Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/sync/table/synctable.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*

Variations: Instead of making this class really complicated
with many different ways to do sync (e.g, changefeeds, project
websockets, unit testing, etc.), we have one single approach via
a Client that has a certain interface. Then we implement different
Clients that have this interface, in order to support different
ways of orchestrating a SyncTable.
*/

// If true, will log to the console a huge amount of
// info about every get/set
let DEBUG: boolean = false;

/** Turn the verbose debug logging of this module on or off. */
export function set_debug(x: boolean): void {
  DEBUG = x;
}

import { delay } from "awaiting";
import { global_cache_decref } from "./global-cache";
import { EventEmitter } from "events";
import { Map, fromJS, List } from "immutable";
import { keys, throttle } from "lodash";
import { callback2, cancel_scheduled, once } from "@cocalc/util/async-utils";
import { wait } from "@cocalc/util/async-wait";
import { query_function } from "./query-function";
import { assert_uuid, copy, is_array, is_object, len } from "@cocalc/util/misc";
import * as schema from "@cocalc/util/schema";
import mergeDeep from "@cocalc/util/immutable-deep-merge";
import type { Client } from "@cocalc/sync/client/types";
export type { Client };

export type Query = any; // TODO typing
export type QueryOptions = any[]; // TODO typing

// How set() combines the given changes with an existing record.
export type MergeType = "deep" | "shallow" | "none";

// A changed record together with the version assigned to it.
export interface VersionedChange {
  obj: { [key: string]: any };
  version: number;
}

// A changed record together with the time of the change.
export interface TimedChange {
  obj: { [key: string]: any };
  time: number; // ms since epoch
}

/** True if the error message contains the marker "FATAL". */
function is_fatal(err: string): boolean {
  return err.includes("FATAL");
}

import { reuseInFlight } from "@cocalc/util/reuse-in-flight";

import { Changefeed } from "./changefeed";
import { parse_query, to_key } from "./util";

export type State = "disconnected" | "connected" | "closed";

export class SyncTable extends EventEmitter {
  private changefeed?: Changefeed;
  private query: Query;
  private client_query: any;
  private primary_keys: string[];
  private options: QueryOptions;
  public readonly client: Client;
  private throttle_changes?: number;
  private throttled_emit_changes?: Function;
  private last_server_time: number = 0;
  private error: { error: string; query: Query } | undefined = undefined;

  // This can optionally be set on a SyncTable implementation.
  // E.g., it is supported for the version in
  // packages/sync/client/synctable-project
  // so that a table connected to a project can make a change based
  // on fact client disconnected (e.g., clear its cursor).
  public setOnDisconnect?: (changes: any, merge) => void;
  // Optional function that is available for direct
  // communication with project, in case synctable is backed
  // by a project. Clients can send a message using
  // this function, and the project synctable will
  // emit a 'message-from-project' event when it receives such a message.
  public sendMessageToProject?: (data) => void;

  // Immutable map -- the value of this synctable.
  private value?: Map<string, Map<string, any>>;
  private last_save: Map<string, Map<string, any>> = Map();

  // Which records we have changed (and when, by server time),
  // that haven't been sent to the backend.
  private changes: { [key: string]: number } = {};

  // The version of each record.
  private versions: { [key: string]: number } = {};

  // The initial version is only used in the project, where we
  // just assume the clock is right. If this were totally
  // off/changed, then clients would get confused -- until they
  // close and open the file or refresh their browser.
// It might be better to switch to storing the current version number
  // on disk.
  private initial_version: number = Date.now();

  // disconnected <--> connected --> closed
  private state: State;
  public table: string;
  private schema: any;
  private emit_change: Function;
  public reference_count: number = 0;
  public cache_key: string | undefined;
  // Which fields the user is allowed to set/change.
  // Gets updated during init.
  private set_fields: string[] = [];
  // Which fields *must* be included in any set query.
  // Also updated during init.
  private required_set_fields: { [key: string]: boolean } = {};

  // Coerce types and generally do strong checking of all
  // types using the schema. Do this unless you have a very
  // good reason not to!
  private coerce_types: boolean = true;

  // If set, then the table is assumed to be managed
  // entirely externally (using events).
  // This is used by the synctables that are managed
  // entirely by the project (e.g., sync-doc support).
  private no_db_set: boolean = false;

  // Set only for some tables.
  private project_id?: string;

  private last_has_uncommitted_changes?: boolean = undefined;

  // This is used only in synctable-project.ts for a communications channel
  // for Jupyter on compute servers.
  public channel?: any;

  /*
  Create the table in the "disconnected" state, validate the query
  against the schema, set up change-event throttling, and start the
  first connection attempt in the background (it is not awaited).

  Throws if query is an array of queries, or if init_query rejects it.
  */
  constructor(
    query,
    options: any[],
    client: Client,
    throttle_changes?: number,
    coerce_types?: boolean,
    no_db_set?: boolean,
    project_id?: string,
  ) {
    super();

    // Optional arguments override the field defaults only when given.
    if (coerce_types != undefined) {
      this.coerce_types = coerce_types;
    }
    if (no_db_set != undefined) {
      this.no_db_set = no_db_set;
    }
    if (project_id != undefined) {
      this.project_id = project_id;
    }

    if (is_array(query)) {
      throw Error("must be a single query, not array of queries");
    }

    this.set_state("disconnected");

    // Bound once so the very same function objects can later be
    // added to and removed from the changefeed as listeners.
    this.changefeed_on_update = this.changefeed_on_update.bind(this);
    this.changefeed_on_close = this.changefeed_on_close.bind(this);

    this.setMaxListeners(100);
    this.query = parse_query(query);
    this.options = options;
    this.client = client;
    this.throttle_changes = throttle_changes;

    this.init_query();
    this.init_throttle_changes();

    // So only ever runs once at a time.
    this.save = reuseInFlight(this.save.bind(this));
    this.first_connect();
  }

  /* PUBLIC API */

  // is_ready is true if the table has been initialized and not yet closed.
  // It might *not* be currently connected, due to a temporary network
  // disconnect. When is_ready is true you can read and write to this table,
  // but there is no guarantee things aren't temporarily stale.
  public is_ready(): boolean {
    return this.value != null && this.state !== "closed";
  }

  /*
  Return true if there are changes to this synctable that
  have NOT been confirmed as saved to the backend database.
  (Always returns false when not yet initialized.)
  */
  public has_uncommitted_changes(): boolean {
    if (this.state === "closed") {
      return false; // if closed, can't have any uncommitted changes.
    }
    return len(this.changes) !== 0;
  }

  /* Gets records from this table.
       - arg = not given: returns everything (as an
               immutable map from key to obj)
       - arg = array of keys; return map from key to obj
       - arg = single key; returns corresponding object

     This is NOT a generic query mechanism.
  */
/* SyncTable is really best thought of as a key:value store! */
  public get(arg?): Map<string, any> | undefined {
    this.assert_not_closed("get");

    if (this.value == null) {
      throw Error("table not yet initialized");
    }

    if (arg == null) {
      // no argument: the whole table
      return this.value;
    }

    if (is_array(arg)) {
      // array of keys: map containing only those records that exist
      let x: Map<string, Map<string, any>> = Map();
      for (const k of arg) {
        const key: string | undefined = to_key(k);
        if (key != null) {
          const y = this.value.get(key);
          if (y != null) {
            x = x.set(key, y);
          }
        }
      }
      return x;
    } else {
      // single key: the corresponding record, if any
      const key = to_key(arg);
      if (key != null) {
        return this.value.get(key);
      }
    }
    // arg could not be converted to a key
    return undefined;
  }

  /* Return the number of records in the table. */
  public size(): number {
    this.assert_not_closed("size");
    if (this.value == null) {
      throw Error("table not yet initialized");
    }
    return this.value.size;
  }

  /*
  Get one record from this table. Especially useful when
  there is only one record, which is an important special
  case (a so-called "wide" table?.)
  */
  public get_one(arg?): Map<string, any> | undefined {
    if (this.value == null) {
      throw Error("table not yet initialized");
    }

    if (arg == null) {
      return this.value.toSeq().first();
    } else {
      // get only returns (at most) one object, so it's "get_one".
      return this.get(arg);
    }
  }

  // Resolve once this.value is set; throws if the
  // "init-value-server" event fires but value is still unset.
  private async wait_until_value(): Promise<void> {
    if (this.value != null) return;
    // can't save until server sends state. We wait.
    await once(this, "init-value-server");
    if (this.value == null) {
      throw Error("bug -- change should initialize value");
    }
  }

  /*
  Ensure any unsent changes are sent to the backend.
  When this function returns there are no unsent changes,
  since it keeps calling _save until nothing has changed
  locally.

  Returns early, leaving changes unsent, if the table is closed,
  not yet initialized, or in an error state.
  */
  public async save(): Promise<void> {
    const dbg = this.dbg("save");
    //console.log("synctable SAVE");
    if (this.state === "closed") {
      // Not possible to save. save is wrapped in
      // reuseInFlight, which debounces, so it's very
      // reasonable that an attempt to call this would
      // finally fire after a close (which is sync).
      // Throwing an error hit would (and did) actually
      // crash projects on the backend in production,
      // so this has to be a warning.
      dbg("WARNING: called save on closed synctable");
      return;
    }
    if (this.value == null) {
      // nothing to save yet
      return;
    }

    while (this.has_uncommitted_changes()) {
      if (this.error) {
        // do not try to save when there's an error since that
        // won't help. Need to attempt to fix it first.
        dbg("WARNING: not saving ", this.error);
        return;
      }
      //console.log("SAVE -- has uncommitted changes, so trying again.");
      if (this.state !== "connected") {
        // wait for state change.
        // This could take a long time, and that is fine.
        // NOTE(review): set_state emits an event named after the new
        // state itself; confirm that a "state" event is emitted
        // somewhere, since otherwise this wait never resolves.
        await once(this, "state");
      }
      if (this.state === "connected") {
        if (!(await this._save())) {
          this.update_has_uncommitted_changes();
          return;
        }
      }
      // else switched to something else (?), so
      // loop around and wait again for a change...
    }
  }

  // Emit "has-uncommitted-changes" only when the value actually flips.
  private update_has_uncommitted_changes(): void {
    const cur = this.has_uncommitted_changes();
    if (cur !== this.last_has_uncommitted_changes) {
      this.emit("has-uncommitted-changes", cur);
      this.last_has_uncommitted_changes = cur;
    }
  }

  /*
  set -- Changes (or creates) one entry in the table.
  The input field changes is either an Immutable.js Map or a JS Object map.
  If changes does not have the primary key then a random record is updated,
  and there *must* be at least one record.
  */
/*
  (documentation of set, continued)
  Exception: computed primary
  keys will be computed (see stuff about computed primary keys above).
  The second parameter 'merge' can be one of three values:
    'deep'   : (DEFAULT) deep merges the changes into the record, keep as much info as possible.
    'shallow': shallow merges, replacing keys by corresponding values
    'none'   : do no merging at all -- just replace record completely
  Raises an exception if something goes wrong doing the set.
  Returns updated value otherwise.

  DOES NOT cause a save.

  NOTE: we always use db schema to ensure types are correct,
  converting if necessary. This has a performance impact,
  but is worth it for sanity's sake!!!
  */
  public set(
    changes: any,
    merge: MergeType = "deep",
    fire_change_event: boolean = true,
  ): any {
    if (this.value == null) {
      throw Error("can't set until table is initialized");
    }

    if (!Map.isMap(changes)) {
      changes = fromJS(changes);
      if (!is_object(changes)) {
        throw Error(
          "type error -- changes must be an immutable.js Map or JS map",
        );
      }
    }
    if (DEBUG) {
      //console.log(`set('${this.table}'): ${JSON.stringify(changes.toJS())}`);
    }
    // For sanity!
    changes = this.do_coerce_types(changes);
    // Ensure that each key is allowed to be set.
    if (this.client_query.set == null) {
      throw Error(`users may not set ${this.table}`);
    }

    const can_set = this.client_query.set.fields;
    // forEach rather than map: this is validation only, no result is built.
    changes.forEach((_, k) => {
      if (can_set[k] === undefined) {
        throw Error(`users may not set ${this.table}.${k}`);
      }
    });
    // Determine the primary key's value
    let key: string | undefined = this.obj_to_key(changes);
    if (key == null) {
      // attempt to compute primary key if it is a computed primary key
      let key0 = this.computed_primary_key(changes);
      key = to_key(key0);
      if (key == null && this.primary_keys.length === 1) {
        // use a "random" primary key from existing data
        key0 = key = this.value.keySeq().first();
      }
      if (key == null) {
        throw Error(
          `must specify primary key ${this.primary_keys.join(
            ",",
          )}, have at least one record, or have a computed primary key`,
        );
      }
      // Now key is defined
      if (this.primary_keys.length === 1) {
        changes = changes.set(this.primary_keys[0], key0);
      } else if (this.primary_keys.length > 1) {
        if (key0 == null) {
          // to satisfy typescript.
          throw Error("bug -- computed primary key must be an array");
        }
        let i = 0;
        for (const pk of this.primary_keys) {
          changes = changes.set(pk, key0[i]);
          i += 1;
        }
      }
    }

    // Get the current value
    const cur = this.value.get(key);
    let new_val;

    if (cur == null) {
      // No record with the given primary key. Require that
      // all the this.required_set_fields are specified, or
      // it will become impossible to sync this table to
      // the backend.
      for (const k in this.required_set_fields) {
        if (changes.get(k) == null) {
          throw Error(`must specify field '${k}' for new records`);
        }
      }
      // If no current value, then next value is easy -- it is just
      // the changes, whatever the merge strategy.
      new_val = changes;
    } else {
      // Use the appropriate merge strategy to get the next val.
      switch (merge) {
        case "deep":
          new_val = mergeDeep(cur, changes);
          break;
        case "shallow":
          new_val = cur.merge(changes);
          break;
        case "none":
          new_val = changes;
          break;
        default:
          throw Error("merge must be one of 'deep', 'shallow', 'none'");
      }
    }

    if (new_val.equals(cur)) {
      // nothing actually changed, so nothing further to do.
      return new_val;
    }

    // clear error state -- the change may be just what is needed
    // to fix the error, e.g., attempting to save an invalid account
    // setting, then fixing it.
    this.clearError();

    for (const field in this.required_set_fields) {
      if (!new_val.has(field)) {
        throw Error(
          `missing required set field ${field} of table ${this.table}`,
        );
      }
    }

    // Something changed:
    this.value = this.value.set(key, new_val);
    this.changes[key] = this.unique_server_time();
    this.update_has_uncommitted_changes();
    if (this.client.is_project()) {
      // project assigns versions
      const version = this.increment_version(key);
      const obj = new_val.toJS();
      this.emit("versioned-changes", [{ obj, version }]);
    } else {
      // browser gets them assigned...
      this.null_version(key);
      // also touch to indicate activity and make sure project running,
      // in some cases.
      this.touch_project();
    }
    if (fire_change_event) {
      this.emit_change([key]);
    }

    return new_val;
  }

  // Best-effort touch of the project (if this table has a project_id);
  // failures are logged and otherwise ignored.
  private async touch_project(): Promise<void> {
    if (this.project_id != null) {
      try {
        await this.client.touch_project(this.project_id);
      } catch (err) {
        // not fatal
        console.warn("touch_project -- ", this.project_id, err);
      }
    }
  }

  // Synchronous close: does NOT attempt a final save.
  public close_no_async(): void {
    if (this.state === "closed") {
      // already closed
      return;
    }
    // decrement the reference to this synctable
    if (global_cache_decref(this)) {
      // close: not zero -- so don't close it yet --
      // still in use by possibly multiple clients
      return;
    }

    if (this.throttled_emit_changes != null) {
      cancel_scheduled(this.throttled_emit_changes);
      delete this.throttled_emit_changes;
    }

    this.client.removeListener("disconnected", this.disconnected);
    this.close_changefeed();
    // Emit "closed" before dropping listeners so that they still see it.
    this.set_state("closed");
    this.removeAllListeners();
    delete this.value;
  }

  // Close the table. Unless fatal is true, first try to save
  // any unsent changes.
  public async close(fatal: boolean = false): Promise<void> {
    if (this.state === "closed") {
      // already closed
      return;
    }
    if (!fatal) {
      // do a last attempt at a save (so we don't lose data),
      // then really close.
      await this.save(); // attempt last save to database.
      /*
      The moment the sync part of _save is done, we remove listeners
      and clear everything up. It's critical that as soon as close
      is called that there be no possible way any further connect
      events (etc) can make this SyncTable
      do anything!! That finality assumption is made
      elsewhere (e.g in @cocalc/project).
      */
    }
    this.close_no_async();
  }

  // Delegates to the imported wait helper, re-checking the until
  // function on every (unthrottled) change event.
  public async wait(until: Function, timeout: number = 30): Promise<any> {
    this.assert_not_closed("wait");

    return await wait({
      obj: this,
      until,
      timeout,
      change_event: "change-no-throttle",
    });
  }

  /* INTERNAL PRIVATE METHODS */

  // Initial connection attempt (started by the constructor); if it
  // fails, log a warning and close the table without saving.
  private async first_connect(): Promise<void> {
    try {
      await this.connect();
      this.update_has_uncommitted_changes();
    } catch (err) {
      console.warn(
        `synctable: failed to connect (table=${this.table}), error=${err}`,
        this.query,
      );
      this.close(true);
    }
  }

  // Record the new state and emit an event named after it.
  private set_state(state: State): void {
    this.state = state;
    this.emit(state);
  }

  public get_state(): State {
    return this.state;
  }

  public get_table(): string {
    return this.table;
  }

  private set_throttle_changes(): void {
    // No throttling of change events, unless explicitly requested
    // *or* part of the schema.
    if (this.throttle_changes != null) return;
    const t = schema.SCHEMA[this.table];
    if (t == null) return;
    const u = t.user_query;
    if (u == null) return;
    const g = u.get;
    if (g == null) return;
    this.throttle_changes = g.throttle_changes;
  }

  // Define this.emit_change, either unthrottled or throttled.
  private init_throttle_changes(): void {
    this.set_throttle_changes();

    if (!this.throttle_changes) {
      // no throttling: both events fire immediately
      this.emit_change = (changed_keys: string[]) => {
        this.emit("change", changed_keys);
        this.emit("change-no-throttle", changed_keys);
      };
      return;
    }

    // throttle emitting of change events
    let all_changed_keys = {};
    const do_emit_changes = () => {
      //console.log("#{this.table} -- emitting changes", keys(all_changed_keys))
      // CRITICAL: some code depends on emitting change even
      // for the *empty* list of keys!
      // E.g., projects page won't load for new users. This
      // is the *change* from not loaded to being loaded,
      // which does make sense.
      this.emit("change", keys(all_changed_keys));
      all_changed_keys = {};
    };
    this.throttled_emit_changes = throttle(
      do_emit_changes,
      this.throttle_changes,
    );
    this.emit_change = (changed_keys) => {
      //console.log("emit_change", changed_keys);
      this.dbg("emit_change")(changed_keys);
      //console.log("#{this.table} -- queue changes", changed_keys)
      // accumulate keys until the throttled "change" event fires
      for (const key of changed_keys) {
        all_changed_keys[key] = true;
      }
      this.emit("change-no-throttle", changed_keys);
      if (this.throttled_emit_changes != null) {
        this.throttled_emit_changes();
      }
    };
  }

  // Return a logging function for method _f; a no-op unless DEBUG is set.
  private dbg(_f?: string): Function {
    if (!DEBUG) {
      return () => {};
    }
    if (this.client.is_project()) {
      return this.client.dbg(
        `SyncTable('${JSON.stringify(this.query)}').${_f}`,
      );
    } else {
      return (...args) => {
        console.log(`synctable("${this.table}").${_f}: `, ...args);
      };
    }
  }

  private async connect(): Promise<void> {
    const dbg = this.dbg("connect");
    dbg();
    this.assert_not_closed("connect");
    if (this.state === "connected") {
      return;
    }

    // 1. save, in case we have any local unsaved changes,
    // then sync with upstream.
    if (this.value != null) {
      dbg("send off any local unsaved changes first");
      await this.save();
    }

    // 2. Now actually setup the changefeed.
    // (Even if this.no_db_set is set, this still may do
    // an initial query to the database. However, the changefeed
    // does nothing further.)
    dbg("actually setup changefeed");
    await this.create_changefeed();

    dbg("connect should have succeeded");
  }

  // Create the changefeed, load the initial records it returns,
  // switch to "connected" and emit the resulting change event.
  // On failure the table is closed and the error rethrown.
  private async create_changefeed(): Promise<void> {
    const dbg = this.dbg("create_changefeed");
    if (this.get_state() == "closed") {
      dbg("closed so don't do anything ever again");
      return;
    }
    dbg("creating changefeed connection...");
    let initval;
    try {
      initval = await this.create_changefeed_connection();
    } catch (err) {
      dbg("failed to create changefeed", err.toString());
      // Typically this happens if synctable closed while
      // creating the connection...
      this.close();
      throw err;
    }
    if (this.state == "closed") {
      return;
    }
    dbg("got changefeed, now initializing table data");
    this.init_changefeed_handlers();
    const changed_keys = this.update_all(initval);
    dbg("setting state to connected");
    this.set_state("connected");

    // NOTE: Can't emit change event until after
    // switching state to connected, which is why
    // we do it here.
    this.emit_change(changed_keys);
  }

  private close_changefeed(): void {
    if (this.changefeed == null) return;
    this.remove_changefeed_handlers();
    this.changefeed.close();
    delete this.changefeed;
  }

  // Keep trying to connect a fresh Changefeed, waiting longer after
  // each non-fatal failure; a fatal error closes the table and is
  // rethrown. Returns what Changefeed.connect resolves to.
  private async create_changefeed_connection(): Promise<any[]> {
    let delay_ms: number = 500;
    while (true) {
      this.close_changefeed();
      this.changefeed = new Changefeed(this.changefeed_options());
      await this.wait_until_ready_to_query_db();
      try {
        return await this.changefeed.connect();
      } catch (err) {
        if (is_fatal(err.toString())) {
          console.warn("FATAL creating initial changefeed", this.table, err);
          this.close(true);
          throw err;
        }
        // This can happen because we might suddenly NOT be ready
        // to query db immediately after we are ready...
        console.warn(
          `${this.table} -- failed to connect -- ${err}; will retry`,
        );
        await delay(delay_ms);
        // the delay stops growing once it has reached 8000ms
        if (delay_ms < 8000) {
          delay_ms *= 1.3;
        }
      }
    }
  }

  private async wait_until_ready_to_query_db(): Promise<void> {
    const dbg = this.dbg("wait_until_ready_to_query_db");

    // Wait until we're ready to query the database.
    let client_state: string;

    if (this.schema.anonymous || this.client.is_project()) {
      // For anonymous tables (and for project accessing db),
      // this just means the client is connected.
      client_state = "connected";
    } else {
      // For non-anonymous tables, the client
      // has to actually be signed in.
      client_state = "signed_in";
    }

    // i.e., client.is_connected() or client.is_signed_in()
    if (this.client[`is_${client_state}`]()) {
      dbg("state already achieved -- no need to wait");
      return;
    }

    await once(this.client, client_state);
    dbg(`success -- client emited ${client_state}`);
  }

  // Options passed to the Changefeed constructor.
  private changefeed_options() {
    return {
      do_query: query_function(this.client.query, this.table),
      query_cancel: this.client.query_cancel.bind(this.client),
      options: this.options,
      query: this.query,
      table: this.table,
    };
  }

  private init_changefeed_handlers(): void {
    if (this.changefeed == null) return;
    this.changefeed.on("update", this.changefeed_on_update);
    this.changefeed.on("close", this.changefeed_on_close);
  }

  private remove_changefeed_handlers(): void {
    if (this.changefeed == null) return;
    this.changefeed.removeListener("update", this.changefeed_on_update);
    this.changefeed.removeListener("close", this.changefeed_on_close);
  }

  private changefeed_on_update(change): void {
    this.update_change(change);
  }

  // The changefeed closed: mark the table disconnected and try to
  // create a new changefeed in the background.
  private changefeed_on_close(): void {
    this.set_state("disconnected");
    // create_changefeed rethrows on failure (after closing the table),
    // and nothing awaits this event handler, so handle the rejection
    // here rather than leave it unhandled.
    this.create_changefeed().catch((err) => {
      console.warn(
        `synctable: failed to recreate changefeed (table=${this.table}), error=${err}`,
      );
    });
  }

  private disconnected(why: string): void {
    const dbg = this.dbg("disconnected");
    dbg(`why=${why}`);
    if (this.state === "disconnected") {
      dbg("already disconnected");
      return;
    }
    this.set_state("disconnected");
  }

  // Placeholder that init_query replaces with the real implementation.
  private obj_to_key(_): string | undefined {
    // Return string key used in the immutable map in
    // which this table is stored.
    throw Error("this.obj_to_key must be set during initialization");
  }

  // Validate this.query against the schema and derive table, schema,
  // client_query, primary_keys, obj_to_key, set_fields and
  // required_set_fields from it. Throws on an invalid query.
  private init_query(): void {
    // Check that the query is probably valid, and
    // record the table and schema
    const tables = keys(this.query);
    if (len(tables) !== 1) {
      throw Error("must query only a single table");
    }
    this.table = tables[0];
    this.schema = schema.SCHEMA[this.table];
    if (this.schema == null) {
      throw Error(`unknown schema for table ${this.table}`);
    }
    if (this.client.is_project()) {
      this.client_query = this.schema.project_query;
    } else {
      this.client_query = this.schema.user_query;
    }
    if (this.client_query == null) {
      throw Error(`no query schema allowing queries to ${this.table}`);
    }
    if (!is_array(this.query[this.table])) {
      throw Error("must be a multi-document query");
    }
    this.primary_keys = schema.client_db.primary_keys(this.table);
    // Check that all primary keys are in the query.
    for (const primary_key of this.primary_keys) {
      if (this.query[this.table][0][primary_key] === undefined) {
        throw Error(
          `must include each primary key in query of table '${this.table}', but you missed '${primary_key}'`,
        );
      }
    }
    // Check that all keys in the query are allowed by the schema.
    for (const query_key of keys(this.query[this.table][0])) {
      if (this.client_query.get.fields[query_key] === undefined) {
        throw Error(
          `every key in query of table '${this.table}' must` +
            ` be a valid user get field in the schema but '${query_key}' is not`,
        );
      }
    }

    // Function this.obj_to_key to extract primary key from object
    if (this.primary_keys.length === 1) {
      // very common case
      const pk = this.primary_keys[0];
      this.obj_to_key = (obj) => {
        if (obj == null) {
          return;
        }
        if (Map.isMap(obj)) {
          return to_key(obj.get(pk));
        } else {
          return to_key(obj[pk]);
        }
      };
    } else {
      // compound primary key
      this.obj_to_key = (obj) => {
        if (obj == null) {
          return;
        }
        // the key is built from the values of all primary keys, in
        // order; there is no key if any of them is missing.
        const v: any[] = [];
        if (Map.isMap(obj)) {
          for (const pk of this.primary_keys) {
            const a = obj.get(pk);
            if (a == null) {
              return;
            }
            v.push(a);
          }
        } else {
          for (const pk of this.primary_keys) {
            const a = obj[pk];
            if (a == null) {
              return;
            }
            v.push(a);
          }
        }
        return to_key(v);
      };
    }

    if (this.client_query != null && this.client_query.set != null) {
      // Initialize set_fields and required_set_fields.
      const set = this.client_query.set;
      for (const field of keys(this.query[this.table][0])) {
        if (set.fields != null && set.fields[field]) {
          this.set_fields.push(field);
        }
        if (set.required_fields != null && set.required_fields[field]) {
          this.required_set_fields[field] = true;
        }
      }
    }
  }

  /* Send all unsent changes.
     This function must not be called more than once at a time.
     Returns boolean:
        false -- there are no additional changes to be saved
        true -- new changes may have appeared during the _save that
                need to be saved.

     If writing to the database results in an error (but not due to no network),
     then an error state is set (which client can consult), an event is emitted,
     and we do not try to write to the database again until that error
     state is cleared.
  */
/* One way the error state can be cleared is by changing the table. */
  private async _save(): Promise<boolean> {
    //console.log("_save");
    const dbg = this.dbg("_save");
    dbg();
    if (this.get_state() == "closed") return false;
    if (this.client_query.set == null) {
      // Nothing to do -- can never set anything for this table.
      // There are some tables (e.g., stats) where the remote values
      // could change while user is offline, and the code below would
      // result in warnings.
      return false;
    }
    //console.log("_save", this.table);
    // The table may get closed during any of the awaits below,
    // hence the repeated state checks.
    dbg("waiting for network");
    await this.wait_until_ready_to_query_db();
    if (this.get_state() == "closed") return false;
    dbg("waiting for value");
    await this.wait_until_value();
    if (this.get_state() == "closed") return false;
    if (len(this.changes) === 0) return false;
    if (this.value == null) {
      throw Error("value must not be null");
    }

    // Send our changes to the server.
    const query: any[] = [];
    const timed_changes: TimedChange[] = [];
    const proposed_keys: { [key: string]: boolean } = {};
    // Snapshot, so that at the end we can tell which records were
    // changed again while this save was in progress.
    const changes = copy(this.changes);
    //console.log("_save: send ", changes);
    for (const key in this.changes) {
      if (this.versions[key] === 0) {
        proposed_keys[key] = true;
      }
      const x = this.value.get(key);
      if (x == null) {
        throw Error("delete is not implemented");
      }
      const obj = x.toJS();

      if (!this.no_db_set) {
        // qobj is the db query version of obj, or at least the part
        // of it that expresses what changed.
        const qobj = {};
        // Set the primary key part:
        if (this.primary_keys.length === 1) {
          qobj[this.primary_keys[0]] = key;
        } else {
          // unwrap compound primary key
          const v = JSON.parse(key);
          let i = 0;
          for (const primary_key of this.primary_keys) {
            qobj[primary_key] = v[i];
            i += 1;
          }
        }
        // Can only send set_field sets to the database. Of these,
        // only send what actually changed.
        const prev = this.last_save.get(key);
        for (const k of this.set_fields) {
          if (!x.has(k)) continue;
          if (prev == null) {
            qobj[k] = obj[k];
            continue;
          }

          // Convert to List to get a clean way to *compare* no
          // matter whether they are immutable.js objects or not!
          const a = List([x.get(k)]);
          const b = List([prev.get(k)]);
          if (!a.equals(b)) {
            qobj[k] = obj[k];
          }
        }

        // required fields are always sent, changed or not
        for (const k in this.required_set_fields) {
          if (qobj[k] == null) {
            qobj[k] = obj[k];
          }
        }

        query.push({ [this.table]: qobj });
      }
      timed_changes.push({ obj, time: this.changes[key] });
    }
    dbg("sending timed-changes", timed_changes);
    this.emit("timed-changes", timed_changes);

    if (!this.no_db_set) {
      try {
        // what gets recorded as saved is the value as of when the
        // query was sent, not whatever it is once the query returns
        const value = this.value;
        dbg("doing database query");
        await callback2(this.client.query, {
          query,
          options: [{ set: true }], // force it to be a set query
          timeout: 120, // give it some time (especially if it is long)
        });
        this.last_save = value; // success -- don't have to save this stuff anymore...
      } catch (err) {
        this.setError(err, query);
        dbg("db query failed", err);
        if (is_fatal(err.toString())) {
          console.warn("FATAL doing set", this.table, err);
          this.close(true);
          throw err;
        }
        // NOTE: we do not show entire log since the number
        // of entries in the query can be very large and just
        // converting them all to text could use a lot of memory (?).
        console.warn(
          `_save('${this.table}') set query error:`,
          err,
          " queries: ",
          query[0],
          "...",
          query.length - 1,
          " omitted",
        );
        return true;
      }
    }

    if (this.get_state() == "closed") return false;
    if (this.value == null) {
      // should not happen
      return false;
    }

    if (this.no_db_set) {
      // Not using changefeeds, so have to depend on other mechanisms
      // to update state. Wait until changes to proposed keys are
      // acknowledged by their version being assigned.
      try {
        dbg("waiting until versions are updated");
        await this.wait_until_versions_are_updated(proposed_keys, 5000);
      } catch (err) {
        dbg("waiting for versions timed out / failed");
        // took too long -- try again to send and receive changes.
        return true;
      }
    }

    dbg("Record that we successfully sent these changes");
    for (const key in changes) {
      // a record changed again in the meantime has a different
      // time stamp now, so it stays marked as unsent
      if (changes[key] == this.changes[key]) {
        delete this.changes[key];
      }
    }
    this.update_has_uncommitted_changes();

    const is_done = len(this.changes) === 0;
    dbg("done? ", is_done);
    return !is_done;
  }

  // Enter the error state (save() does nothing while it is set)
  // and emit an "error" event.
  private setError(error: string, query: Query): void {
    this.error = { error, query };
    this.emit("error", this.error);
  }

  // Leave the error state and emit "clear-error".
  public clearError(): void {
    this.error = undefined;
    this.emit("clear-error");
  }

  // Wait until every key of proposed_keys has a positive version,
  // removing keys from proposed_keys (in place) as they get one.
  // Throws if a positive timeout_ms runs out first.
  private async wait_until_versions_are_updated(
    proposed_keys: { [key: string]: boolean },
    timeout_ms: number,
  ): Promise<void> {
    const start_ms = Date.now();
    while (len(proposed_keys) > 0) {
      for (const key in proposed_keys) {
        if (this.versions[key] > 0) {
          delete proposed_keys[key];
        }
      }
      if (len(proposed_keys) > 0) {
        const remaining_ms = timeout_ms - (Date.now() - start_ms);
        if (timeout_ms > 0 && remaining_ms <= 0) {
          // Out of time. Do not hand a zero or negative timeout to
          // once -- how it treats one is not visible from here.
          throw Error("timeout waiting for versions to be updated");
        }
        const updated: string[] = await once(
          this,
          "increased-versions",
          remaining_ms,
        );
        for (const key of updated) {
          delete proposed_keys[key];
        }
      }
    }
  }

  // Return modified immutable Map, with all types coerced to be
  // as specified in the schema, if possible, or throw an exception.
  private do_coerce_types(
    changes: Map<string | number, any>,
  ): Map<string | number, any> {
    if (!Map.isMap(changes)) {
      changes = Map(changes);
    }
    if (!this.coerce_types) {
      // no-op if coerce_types isn't set.
      return changes;
    }
    const t = schema.SCHEMA[this.table];
    if (t == null) {
      throw Error(`Missing schema for table ${this.table}`);
    }
    const fields = copy(t.fields);
    if (fields == null) {
      throw Error(`Missing fields part of schema for table ${this.table}`);
    }
    // For a virtual table the type specs come from the table it names.
    let specs;
    if (t.virtual != null) {
      if (t.virtual === true) {
        throw Error(`t.virtual can't be true for ${this.table}`);
      }
      const x = schema.SCHEMA[t.virtual];
      if (x == null) {
        throw Error(`invalid virtual table spec for ${this.table}`);
      }
      specs = copy(x.fields);
      if (specs == null) {
        throw Error(`invalid virtual table spec for ${this.table}`);
      }
    } else {
      specs = fields;
    }

    if (typeof this.query != "string") {
      // explicit query (not just from schema)
      let x = this.query[this.table];
      if (is_array(x)) {
        x = x[0];
      }
      // only fields that occur in the query may be set
      for (const k in fields) {
        if (x[k] === undefined) {
          delete fields[k];
        }
      }
    }
    return Map(
      changes.map((value, field) => {
        if (typeof field !== "string") {
          // satisfy typescript.
          return;
        }
        if (value == null) {
          // do not coerce null types
          return value;
        }
        if (fields[field] == null) {
          //console.warn(changes, fields);
          throw Error(
            `Cannot coerce: no field '${field}' in table '${this.table}'`,
          );
        }
        const spec = specs[field];
        if (spec == null) {
          // e.g., the field is missing from a virtual table's spec
          throw Error(`Cannot coerce: no type info for field ${field}`);
        }
        let desired: string | undefined = spec.type || spec.pg_type;
        if (desired == null) {
          throw Error(`Cannot coerce: no type info for field ${field}`);
        }
        desired = desired.toLowerCase();

        const actual = typeof value;
        if (desired === actual) {
          return value;
        }

        // We can add more or less later...
        if (desired === "string" || desired.slice(0, 4) === "char") {
          if (actual !== "string") {
            // ensure is a string
            return `${value}`;
          }
          return value;
        }
        if (desired === "timestamp") {
          if (!(value instanceof Date)) {
            // make it a Date object. (usually converting from string rep)
            return new Date(value);
          }
          return value;
        }
        if (desired === "integer") {
          // always fine to do this -- will truncate floats, fix strings, etc.
          return parseInt(value);
        }
        if (desired === "number") {
          // actual wasn't number, so parse:
          return parseFloat(value);
        }
        if (desired === "array") {
          if (!List.isList(value)) {
            value = fromJS(value);
            if (!List.isList(value)) {
              throw Error(
                `field ${field} of table ${this.table} (value=${changes.get(
                  field,
                )}) must convert to an immutable.js List`,
              );
            }
          }
          return value;
        }
        if (desired === "map") {
          if (!Map.isMap(value)) {
            value = Map(value);
            if (!Map.isMap(value)) {
              throw Error(
                `field ${field} of table ${this.table} (value=${changes.get(
                  field,
                )}) must convert to an immutable.js Map`,
              );
            }
          }
          return value;
        }
        if (desired === "boolean") {
          // actual wasn't boolean, so coerce.
          return !!value;
        }
        if (desired === "uuid") {
          assert_uuid(value);
          return value;
        }
        return value;
      }),
    );
  }

  /*
  Handle an update of all records from the database.
  This happens on initialization, and also if we
  disconnect and reconnect.
  */
  private update_all(v: any[]): any[] {
    //const dbg = this.dbg("update_all");

    if (this.state === "closed") {
      // nothing to do -- just ignore updates from db
      throw Error("makes no sense to do update_all when state is closed.");
    }

    this.emit("before-change");
    // Restructure the array of records in v as a mapping
    // from the primary key to the corresponding record.
    const x = {};
    for (const y of v) {
      const key = this.obj_to_key(y);
      if (key != null) {
        x[key] = y;
        // initialize all version numbers
        this.versions[key] = this.initial_version;
      }
    }
    const changed_keys = keys(x); // of course all keys have been changed.
    this.emit("increased-versions", changed_keys);

    this.value =
fromJS(x);1294if (this.value == null) {1295throw Error("bug");1296}1297this.last_save = this.value;1298if (this.coerce_types) {1299// Ensure all values are properly coerced, as specified1300// in the database schema. This is important, e.g., since1301// when mocking the client db query, JSON is involved and1302// timestamps are not parsed to Date objects.1303this.value = <Map<string, Map<string, any>>>this.value.map((val, _) => {1304if (val == null) {1305throw Error("val must not be null");1306}1307return this.do_coerce_types(val);1308});1309}13101311// It's possibly that nothing changed (e.g., typical case1312// on reconnect!) so we check.1313// If something really did change, we set the server1314// state to what we just got, and1315// also inform listeners of which records changed (by giving keys).1316//console.log("update_all: changed_keys=", changed_keys)1317if (this.state === "connected") {1318// When not yet connected, initial change is emitted1319// by function that sets up the changefeed. 
We are1320// connected here, so we are responsible for emitting1321// this change.1322this.emit_change(changed_keys);1323}13241325this.emit("init-value-server");1326return changed_keys;1327}13281329public initial_version_for_browser_client(): VersionedChange[] {1330if (this.value == null) {1331throw Error("value must not be null");1332}1333const x: VersionedChange[] = [];1334this.value.forEach((val, key) => {1335if (val == null) {1336throw Error("val must be non-null");1337}1338const obj = val.toJS();1339if (obj == null) {1340throw Error("obj must be non-null");1341}1342if (key == null) {1343throw Error("key must not be null");1344}1345const version = this.versions[key];1346if (version == null) {1347throw Error("version must not be null");1348}13491350x.push({ obj, version });1351});1352return x;1353}13541355public init_browser_client(changes: VersionedChange[]): void {1356const dbg = this.dbg("init_browser_client");1357dbg(`applying ${changes.length} versioned changes`);1358// The value before doing init (which happens precisely when project1359// synctable is reset). See note below.1360const before = this.value;1361const received_keys = this.apply_changes_to_browser_client(changes);1362if (before != null) {1363before.forEach((_, key) => {1364if (key == null || received_keys[key]) return; // received as part of init1365if (this.changes[key] && this.versions[key] == 0) return; // not event sent yet1366// This key was known and confirmed sent before init, but1367// didn't get sent back this time. 
So it was lost somehow,1368// e.g., due to not getting saved to the database and the project1369// (or table in the project) getting restarted.1370dbg(`found lost: key=${key}`);1371// So we will try to send out it again.1372if (!this.changes[key]) {1373this.changes[key] = this.unique_server_time();1374this.update_has_uncommitted_changes();1375}1376// So we don't view it as having any known version1377// assigned by project, since the project lost it.1378this.null_version(key);1379});1380if (len(this.changes) > 0) {1381this.save(); // kick off a save of our unsaved lost work back to the project.1382}1383}1384/*1385NOTE: The problem solved here is the following. Imagine the project1386synctable is killed, and it has acknowledge a change C from a1387web browser client, but has NOT saved that change to the central1388postgreSQL database (or someday, maybe a local SQLite database).1389Then when the project synctable is resurrected, it uses the database1390for its initial state, and it knows nothing about C. The1391browser thinks that C has been successfully written and broadcast1392to everybody, so the browser doesn't send C again. The result is1393that the browser and the project would be forever out of sync.1394Note that we only care about lost changes that some browser knows1395about -- if no browser knows about them, then the fact they are1396lost won't break sync. Also, for file editing, data is regularly1397saved to disk, so if the browser sends a change that is lost due to1398the project being killed before writing to the database, then the1399browser terminates too, then that change is completely lost. However,1400everybody will start again with at least the last version of the file1401**saved to disk,** which is basically what people may expect as a1402worst case.14031404The solution to the above problem is to look at what key:value pairs1405we know about that the project didn't just send back to us. 
If there1406are any that were reported as committed, but they vanished, then we1407set them as unsent and send them again.1408*/1409}14101411public apply_changes_to_browser_client(changes: VersionedChange[]): {1412[key: string]: boolean;1413} {1414const dbg = this.dbg("apply_changes_to_browser_client");1415dbg("got ", changes.length, "changes");1416this.assert_not_closed("apply_changes_to_browser_client");1417if (this.value == null) {1418// initializing the synctable for the first time.1419this.value = Map();1420}14211422this.emit("before-change");1423const changed_keys: string[] = [];1424const increased_versions: string[] = [];1425const received_keys: { [key: string]: boolean } = {};1426for (const change of changes) {1427const { obj, version } = change;1428const new_val = this.do_coerce_types(fromJS(obj));1429const key = this.obj_to_key(new_val);1430if (key == null) {1431throw Error("object results in null key");1432}1433received_keys[key] = true;1434const cur_version = this.versions[key] ? 
this.versions[key] : 0;1435if (cur_version > version) {1436// nothing further to do.1437continue;1438}1439if (this.handle_new_val(new_val, undefined, "insert", false)) {1440// really did make a change.1441changed_keys.push(key);1442}1443// Update our version number to the newer version.1444this.versions[key] = version;1445increased_versions.push(key);1446}14471448if (increased_versions.length > 0) {1449this.emit("increased-versions", increased_versions);1450}14511452if (changed_keys.length > 0) {1453this.emit_change(changed_keys);1454}1455return received_keys;1456}14571458public apply_changes_from_browser_client(changes: TimedChange[]): void {1459const dbg = this.dbg("apply_changes_from_browser_client");1460dbg("project <-- changes -- client", JSON.stringify(changes));1461const changed_keys: string[] = [];1462const versioned_changes: VersionedChange[] = [];1463for (const change of changes) {1464const { obj, time } = change;1465if (obj == null) {1466throw Error("obj must not be null");1467}1468const new_val = this.do_coerce_types(fromJS(obj));1469const key = this.obj_to_key(new_val); // must have been coerced!1470if (key == null) {1471throw Error("object results in null key");1472}1473const cur_time = this.changes[key];1474if (cur_time != null && cur_time > time) {1475dbg("already have a more recent version");1476// We already have a more recent update to this object.1477// We push that new version out again, just in case.1478if (this.value == null) {1479throw Error("value must not be null");1480}1481let obj: any = this.value.get(key);1482if (obj == null) {1483throw Error(`there must be an object in this.value with key ${key}`);1484}1485obj = obj.toJS();1486const version = this.versions[key];1487if (version == null) {1488throw Error(`object with key ${key} must have a version`);1489}1490versioned_changes.push({ obj, version });1491continue;1492}1493if (this.handle_new_val(new_val, undefined, "insert", false)) {1494const version = 
this.increment_version(key);1495this.changes[key] = time;1496this.update_has_uncommitted_changes();1497versioned_changes.push({ obj: new_val.toJS(), version });1498changed_keys.push(key);1499}1500}1501if (changed_keys.length > 0) {1502this.emit_change(changed_keys);1503}1504if (versioned_changes.length > 0) {1505this.emit("versioned-changes", versioned_changes);1506}1507dbg("project -- versioned --> clients", JSON.stringify(versioned_changes));1508}15091510private increment_version(key: string): number {1511if (this.versions[key] == null) {1512this.versions[key] = this.initial_version;1513} else {1514this.versions[key] += 1;1515}1516this.emit("increased-versions", [key]);1517return this.versions[key];1518}15191520private null_version(key: string): void {1521this.versions[key] = 0;1522}15231524/*1525Apply one incoming change from the database to the1526in-memory table.1527*/1528private update_change(change): void {1529if (this.state === "closed") {1530// We might get a few more updates even after1531// canceling the changefeed, so we just ignore them.1532return;1533}1534if (this.value == null) {1535console.warn(`update_change(${this.table}): ignored`);1536return;1537}1538this.emit("before-change");1539const changed_keys: string[] = [];1540const key = this.handle_new_val(1541change.new_val,1542change.old_val,1543change.action,1544this.coerce_types,1545);1546if (key != null) {1547changed_keys.push(key);1548}15491550//console.log("update_change: changed_keys=", changed_keys)1551if (changed_keys.length > 0) {1552//console.log("_update_change: change")1553this.emit_change(changed_keys);1554}1555}15561557// Returns current time (in ms since epoch) on server,1558// but if there are multiple requests at the same time,1559// the clock is artificially incremented to ensure uniqueness.1560// Also, this time is thus always strictly increasing.1561private unique_server_time(): number {1562let tm = this.client.server_time().valueOf();1563if (tm <= this.last_server_time) {1564tm = 
this.last_server_time + 1;1565}1566this.last_server_time = tm;1567return tm;1568}15691570// - returns key only if obj actually changed things.1571private handle_new_val(1572new_val: any,1573old_val: any,1574action: string,1575coerce: boolean,1576): string | undefined {1577if (this.value == null) {1578// to satisfy typescript.1579throw Error("value must be initialized");1580}15811582if (action === "delete") {1583old_val = fromJS(old_val);1584if (old_val == null) {1585throw Error("old_val must not be null for delete action");1586}1587if (coerce && this.coerce_types) {1588old_val = this.do_coerce_types(old_val);1589}1590const key = this.obj_to_key(old_val);1591if (key == null || !this.value.has(key)) {1592return; // already gone1593}1594this.value = this.value.delete(key);1595return key;1596}15971598new_val = fromJS(new_val);1599if (new_val == null) {1600throw Error("new_val must not be null for insert or update action");1601}1602if (coerce && this.coerce_types) {1603new_val = this.do_coerce_types(new_val);1604}1605const key = this.obj_to_key(new_val);1606if (key == null) {1607// This means the primary key is null or missing, which1608// shouldn't happen. Maybe it could in some edge case.1609// For now, we shouldn't let this break everything, so:1610return undefined;1611// throw Error("key must not be null");1612}1613const cur_val = this.value.get(key);1614if (action === "update" && cur_val != null) {1615// For update actions, we shallow *merge* in the change.1616// For insert action, we just replace the whole thing.1617new_val = cur_val.merge(new_val);1618}1619if (!new_val.equals(cur_val)) {1620this.value = this.value.set(key, new_val);1621return key;1622}1623return undefined;1624}16251626/*1627obj is an immutable.js Map without the primary key1628set. 
If the database schema defines a way to compute1629the primary key from other keys, try to use it here.1630This function returns the computed primary key (array or string)1631if it works, and returns undefined otherwise.1632*/1633private computed_primary_key(obj): string[] | string | undefined {1634let f;1635if (this.primary_keys.length === 1) {1636f = this.client_query.set.fields[this.primary_keys[0]];1637if (typeof f === "function") {1638return f(obj.toJS(), schema.client_db);1639} else {1640return;1641}1642} else {1643const v: string[] = [];1644for (const pk of this.primary_keys) {1645f = this.client_query.set.fields[pk];1646if (typeof f === "function") {1647v.push(f(obj.toJS(), schema.client_db));1648} else {1649return;1650}1651}1652return v;1653}1654}16551656private assert_not_closed(desc: string): void {1657if (this.state === "closed") {1658//console.trace();1659throw Error(1660`the synctable "${this.table}" must not be closed -- ${desc}`,1661);1662}1663}16641665// **WARNING:** Right now this *barely* works at all... due to1666// barely being implemented since I mostly haven't needed it.1667// It will delete the object from the database, but if some1668// client still has the object, they can end up just writing1669// it back.1670public async delete(obj): Promise<void> {1671// Table spec must have set.delete = true.1672// This function does a direct database query to delete1673// the entry with primary key described by obj from1674// the database. That will have the side effect slightly1675// later of removing the object from this table. This1676// thus works differently than making changes or1677// creating new entries, at least right now (since1678// implementing this properly is a lot of work but1679// not used much).16801681const query = { [this.table]: obj };1682const options = [{ delete: true }];1683await callback2(this.client.query, { query, options });1684}1685}168616871688