CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Sign Up | Sign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/table/synctable.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
8
Variations: Instead of making this class really complicated
9
with many different ways to do sync (e.g, changefeeds, project
10
websockets, unit testing, etc.), we have one single approach via
11
a Client that has a certain interface. Then we implement different
12
Clients that have this interface, in order to support different
13
ways of orchestrating a SyncTable.
14
*/
15
16
// When true, a huge amount of information about every
// get/set is logged (see the dbg method of SyncTable).
let DEBUG = false;

/** Switch the verbose debug logging of this module on or off. */
export function set_debug(x: boolean): void {
  DEBUG = x;
}
23
24
import { delay } from "awaiting";
25
import { global_cache_decref } from "./global-cache";
26
import { EventEmitter } from "events";
27
import { Map, fromJS, List } from "immutable";
28
import { keys, throttle } from "lodash";
29
import { callback2, cancel_scheduled, once } from "@cocalc/util/async-utils";
30
import { wait } from "@cocalc/util/async-wait";
31
import { query_function } from "./query-function";
32
import { assert_uuid, copy, is_array, is_object, len } from "@cocalc/util/misc";
33
import * as schema from "@cocalc/util/schema";
34
import mergeDeep from "@cocalc/util/immutable-deep-merge";
35
import type { Client } from "@cocalc/sync/client/types";
36
export type { Client };
37
38
/** A database query describing the table (not yet precisely typed). */
export type Query = any; // TODO typing

/** Options passed along with the query (not yet precisely typed). */
export type QueryOptions = any[]; // TODO typing

/** How SyncTable.set combines changes with an existing record. */
export type MergeType = "deep" | "shallow" | "none";

/** A record together with the version number assigned to it. */
export interface VersionedChange {
  obj: Record<string, any>;
  version: number;
}

/** A record together with the time at which it was changed. */
export interface TimedChange {
  obj: Record<string, any>;
  time: number; // ms since epoch
}
52
53
/** True if the error text contains the (case-sensitive) marker "FATAL". */
function is_fatal(err: string): boolean {
  return err.includes("FATAL");
}
56
57
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
58
59
import { Changefeed } from "./changefeed";
60
import { parse_query, to_key } from "./util";
61
62
/** Connection state of a SyncTable. */
export type State =
  | "disconnected"
  | "connected"
  | "closed";
63
64
export class SyncTable extends EventEmitter {
65
private changefeed?: Changefeed;
  private query: Query;
  private client_query: any;
  private primary_keys: string[];
  private options: QueryOptions;
  public readonly client: Client;
  private throttle_changes?: number;
  private throttled_emit_changes?: Function;
  private last_server_time = 0;
  private error: { error: string; query: Query } | undefined = undefined;

  // Optional hook that a SyncTable implementation may set.
  // E.g., it is supported for the version in
  //    packages/sync/client/synctable-project
  // so that a table connected to a project can make a change
  // when the client disconnects (e.g., clear its cursor).
  public setOnDisconnect?: (changes: any, merge) => void;
  // Optional function for direct communication with the project,
  // in case this synctable is backed by a project.  Clients send
  // a message with this function, and the project synctable
  // emits a 'message-from-project' event when it receives one.
  public sendMessageToProject?: (data) => void;

  // Immutable map -- the value of this synctable.
  private value?: Map<string, Map<string, any>>;
  private last_save: Map<string, Map<string, any>> = Map();

  // Records we have changed (and when, by server time)
  // that haven't been sent to the backend yet.
  private changes: { [key: string]: number } = {};

  // The version of each record.
  private versions: { [key: string]: number } = {};

  // The initial version is only used in the project, where we
  // just assume the clock is right.  If this were totally
  // off/changed, then clients would get confused -- until they
  // close and open the file or refresh their browser.  It might
  // be better to switch to storing the current version number
  // on disk.
  private initial_version = Date.now();

  // disconnected <--> connected --> closed
  private state: State;
  public table: string;
  private schema: any;
  private emit_change: Function;
  public reference_count = 0;
  public cache_key: string | undefined;
  // Fields the user is allowed to set/change; filled in during init.
  private set_fields: string[] = [];
  // Fields that *must* be included in any set query; also
  // filled in during init.
  private required_set_fields: { [key: string]: boolean } = {};

  // Coerce types and generally do strong checking of all
  // types using the schema.  Do this unless you have a very
  // good reason not to!
  private coerce_types = true;

  // If set, then the table is assumed to be managed
  // entirely externally (using events).
  // This is used by the synctables that are managed
  // entirely by the project (e.g., sync-doc support).
  private no_db_set = false;

  // Set only for some tables.
  private project_id?: string;

  private last_has_uncommitted_changes?: boolean = undefined;

  // Used only in synctable-project.ts as a communications channel
  // for Jupyter on compute servers.
  public channel?: any;
141
142
/**
 * Create the table and immediately start connecting (in the background).
 *
 * @param query - a single query (not an array of queries) on one table
 * @param options - query options
 * @param client - the Client used for all communication
 * @param throttle_changes - if given, used to throttle "change" events
 * @param coerce_types - overrides the default (true) when given
 * @param no_db_set - overrides the default (false) when given
 * @param project_id - set only for some tables
 * @throws if query is an array, or if init_query rejects the query
 */
constructor(
  query,
  options: any[],
  client: Client,
  throttle_changes?: number,
  coerce_types?: boolean,
  no_db_set?: boolean,
  project_id?: string,
) {
  super();

  // Optional settings only replace the field defaults when provided.
  if (coerce_types != null) this.coerce_types = coerce_types;
  if (no_db_set != null) this.no_db_set = no_db_set;
  if (project_id != null) this.project_id = project_id;

  if (is_array(query)) {
    throw Error("must be a single query, not array of queries");
  }

  this.set_state("disconnected");

  // These are registered as (and removed from being) event listeners,
  // so they need a stable bound identity.
  this.changefeed_on_update = this.changefeed_on_update.bind(this);
  this.changefeed_on_close = this.changefeed_on_close.bind(this);

  this.setMaxListeners(100);
  this.query = parse_query(query);
  this.options = options;
  this.client = client;
  this.throttle_changes = throttle_changes;

  this.init_query();
  this.init_throttle_changes();

  // Wrap save so that it only ever runs once at a time.
  this.save = reuseInFlight(this.save.bind(this));
  this.first_connect();
}
185
186
/* PUBLIC API */
187
188
// True once the table has a value and as long as it has not been closed.
// The table might *not* be connected right now (e.g., a temporary
// network disconnect): while is_ready is true you can read and write,
// but what you read may be temporarily stale.
public is_ready(): boolean {
  if (this.state === "closed") {
    return false;
  }
  return this.value != null;
}
195
196
/*
True if this synctable has changes that have NOT yet been
confirmed as saved to the backend.  A closed table never
has uncommitted changes.
*/
public has_uncommitted_changes(): boolean {
  return this.state !== "closed" && len(this.changes) !== 0;
}
207
208
/* Read records from this table.
     - no arg (or null): the whole table, as an immutable
       map from key to record
     - array of keys: immutable map from key to record, containing
       only those keys that are present
     - single key: the corresponding record (undefined if missing)

   This is NOT a generic query mechanism.  A SyncTable is best
   thought of as a key:value store!

   Throws if the table is closed or not yet initialized.
*/
public get(arg?): Map<string, any> | undefined {
  this.assert_not_closed("get");

  const value = this.value;
  if (value == null) {
    throw Error("table not yet initialized");
  }
  if (arg == null) {
    return value;
  }

  if (!is_array(arg)) {
    // single key
    const key = to_key(arg);
    return key == null ? undefined : value.get(key);
  }

  // several keys -- collect the records that exist
  let found: Map<string, Map<string, any>> = Map();
  for (const k of arg) {
    const key: string | undefined = to_key(k);
    if (key == null) continue;
    const record = value.get(key);
    if (record != null) {
      found = found.set(key, record);
    }
  }
  return found;
}
247
248
/* Number of records in the table; throws if closed or not initialized. */
public size(): number {
  this.assert_not_closed("size");
  const value = this.value;
  if (value == null) {
    throw Error("table not yet initialized");
  }
  return value.size;
}
256
257
/*
Get one record from this table.  With no argument this is the
first record, which is especially useful in the important special
case of a table with exactly one record.  With an argument it is
the same as get(arg).
*/
public get_one(arg?): Map<string, any> | undefined {
  if (this.value == null) {
    throw Error("table not yet initialized");
  }
  // get returns (at most) one object for a single key, so it's "get_one".
  return arg == null ? this.value.toSeq().first() : this.get(arg);
}
274
275
// Resolve once this.value is set, waiting for the
// "init-value-server" event if it is not set yet.
private async wait_until_value(): Promise<void> {
  if (this.value == null) {
    // can't save until server sends state.  We wait.
    await once(this, "init-value-server");
    if (this.value == null) {
      throw Error("bug -- change should initialize value");
    }
  }
}
283
284
/*
Send any unsent changes to the backend, calling _save
repeatedly for as long as it reports that more may remain.

Returns early (without saving) if the table is closed, has no
value yet, or is in an error state.
*/
public async save(): Promise<void> {
  const dbg = this.dbg("save");
  if (this.state === "closed") {
    // Saving is impossible.  save is wrapped in reuseInFlight, so a
    // call can easily end up firing after a (synchronous) close.
    // Throwing here would (and did) crash projects on the backend
    // in production, so this must only be a warning.
    dbg("WARNING: called save on closed synctable");
    return;
  }
  if (this.value == null) {
    // nothing to save yet
    return;
  }

  while (this.has_uncommitted_changes()) {
    if (this.error) {
      // Saving won't help while there is an error; it has to be
      // fixed first.
      dbg("WARNING: not saving ", this.error);
      return;
    }
    if (this.state !== "connected") {
      // Wait for a state change, however long that takes.
      await once(this, "state");
    }
    if (this.state === "connected") {
      const more = await this._save();
      if (!more) {
        this.update_has_uncommitted_changes();
        return;
      }
    }
    // otherwise the state is something else, so go around
    // and wait again for a change...
  }
}
332
333
// Emit "has-uncommitted-changes" (with the new value) whenever
// has_uncommitted_changes() differs from the last value reported.
private update_has_uncommitted_changes(): void {
  const now = this.has_uncommitted_changes();
  if (now === this.last_has_uncommitted_changes) {
    return;
  }
  this.emit("has-uncommitted-changes", now);
  this.last_has_uncommitted_changes = now;
}
340
341
/*
set -- Changes (or creates) one entry in the table.

  changes: an Immutable.js Map or a plain JS object.  If it does not
     contain the primary key, then a computed primary key is used when
     there is one; otherwise, for a table with a single primary key,
     the first existing record is updated (so there *must* be at
     least one record).
  merge: how changes are combined with an existing record:
     'deep'   : (DEFAULT) deep merges the changes into the record,
                keeping as much info as possible.
     'shallow': shallow merges, replacing keys by corresponding values
     'none'   : do no merging at all -- just replace record completely
  fire_change_event: if true (the default), emit a change event for
     the affected key.

Throws if the table is not initialized, changes has the wrong type,
a field may not be set, no primary key can be determined, a required
field is missing, or merge is invalid.
Returns the new value of the record otherwise.

DOES NOT cause a save.

NOTE: the db schema is always used to ensure types are correct
(via do_coerce_types), converting if necessary.  This has a
performance impact, but is worth it for sanity's sake!!!
*/
public set(
  changes: any,
  merge: MergeType = "deep",
  fire_change_event: boolean = true,
): any {
  if (this.value == null) {
    throw Error("can't set until table is initialized");
  }

  if (!Map.isMap(changes)) {
    changes = fromJS(changes);
    if (!is_object(changes)) {
      throw Error(
        "type error -- changes must be an immutable.js Map or JS map",
      );
    }
  }
  // For sanity!
  changes = this.do_coerce_types(changes);
  // Ensure that each key is allowed to be set.
  if (this.client_query.set == null) {
    throw Error(`users may not set ${this.table}`);
  }

  const can_set = this.client_query.set.fields;
  // forEach (not map): we only validate, no new collection is needed.
  changes.forEach((_, k) => {
    if (can_set[k] === undefined) {
      throw Error(`users may not set ${this.table}.${k}`);
    }
  });
  // Determine the primary key's value
  let key: string | undefined = this.obj_to_key(changes);
  if (key == null) {
    // attempt to compute primary key if it is a computed primary key
    let key0 = this.computed_primary_key(changes);
    key = to_key(key0);
    if (key == null && this.primary_keys.length === 1) {
      // use a "random" primary key from existing data
      key0 = key = this.value.keySeq().first();
    }
    if (key == null) {
      throw Error(
        `must specify primary key ${this.primary_keys.join(
          ",",
        )}, have at least one record, or have a computed primary key`,
      );
    }
    // Now key is defined; record the primary key field(s) in changes.
    if (this.primary_keys.length === 1) {
      changes = changes.set(this.primary_keys[0], key0);
    } else if (this.primary_keys.length > 1) {
      if (key0 == null) {
        // to satisfy typescript.
        throw Error("bug -- computed primary key must be an array");
      }
      let i = 0;
      for (const pk of this.primary_keys) {
        changes = changes.set(pk, key0[i]);
        i += 1;
      }
    }
  }

  // Get the current value
  const cur = this.value.get(key);
  let new_val;

  if (cur == null) {
    // No record with the given primary key.  Require that
    // all the this.required_set_fields are specified, or
    // it will become impossible to sync this table to
    // the backend.
    for (const k in this.required_set_fields) {
      if (changes.get(k) == null) {
        throw Error(`must specify field '${k}' for new records`);
      }
    }
    // There is nothing to merge with, so the new value is
    // just the changes, whatever the merge strategy.
    new_val = changes;
  } else {
    // Use the appropriate merge strategy to get the next val.
    switch (merge) {
      case "deep":
        new_val = mergeDeep(cur, changes);
        break;
      case "shallow":
        new_val = cur.merge(changes);
        break;
      case "none":
        new_val = changes;
        break;
      default:
        throw Error("merge must be one of 'deep', 'shallow', 'none'");
    }
  }

  if (new_val.equals(cur)) {
    // nothing actually changed, so nothing further to do.
    return new_val;
  }

  // clear error state -- the change may be just what is needed
  // to fix the error, e.g., attempting to save an invalid account
  // setting, then fixing it.
  this.clearError();

  for (const field in this.required_set_fields) {
    if (!new_val.has(field)) {
      throw Error(
        `missing required set field ${field} of table ${this.table}`,
      );
    }
  }

  // Something changed:
  this.value = this.value.set(key, new_val);
  this.changes[key] = this.unique_server_time();
  this.update_has_uncommitted_changes();
  if (this.client.is_project()) {
    // project assigns versions
    const version = this.increment_version(key);
    const obj = new_val.toJS();
    this.emit("versioned-changes", [{ obj, version }]);
  } else {
    // browser gets them assigned...
    this.null_version(key);
    // also touch to indicate activity and make sure project running,
    // in some cases.
    this.touch_project();
  }
  if (fire_change_event) {
    this.emit_change([key]);
  }

  return new_val;
}
499
500
// Ask the client to touch the project this table belongs to, if a
// project_id is set.  A failure is only logged, never thrown.
private async touch_project(): Promise<void> {
  if (this.project_id == null) {
    return;
  }
  try {
    await this.client.touch_project(this.project_id);
  } catch (err) {
    // not fatal
    console.warn("touch_project -- ", this.project_id, err);
  }
}
510
511
// Synchronously close this table, without attempting a last save.
// Does nothing if already closed, or if global_cache_decref reports
// that the table is still referenced by other users.
public close_no_async(): void {
  // Short-circuit matters: the reference count is only decremented
  // when the table is not already closed.  A true result from
  // global_cache_decref means the count is not zero, i.e. the table
  // is still in use by possibly multiple clients.
  if (this.state === "closed" || global_cache_decref(this)) {
    return;
  }

  const pending_emit = this.throttled_emit_changes;
  if (pending_emit != null) {
    cancel_scheduled(pending_emit);
    delete this.throttled_emit_changes;
  }

  this.client.removeListener("disconnected", this.disconnected);
  this.close_changefeed();
  this.set_state("closed");
  this.removeAllListeners();
  delete this.value;
}
534
535
// Close this table.  Unless fatal is true, first make a last attempt
// at saving (so we don't lose data), then really close.
public async close(fatal: boolean = false): Promise<void> {
  if (this.state === "closed") {
    // already closed
    return;
  }
  if (fatal) {
    this.close_no_async();
    return;
  }
  await this.save(); // attempt last save to database.
  /*
  The moment the save is done, we remove listeners and clear
  everything up.  It's critical that as soon as close is called
  there be no possible way any further connect events (etc.)
  can make this SyncTable do anything!!  That finality
  assumption is made elsewhere (e.g. in @cocalc/project).
  */
  this.close_no_async();
}
555
556
// Delegate to the generic wait helper, which re-checks `until`
// on every "change-no-throttle" event of this table.
// NOTE(review): timeout is presumably in seconds -- confirm
// against @cocalc/util/async-wait.
public async wait(until: Function, timeout: number = 30): Promise<any> {
  this.assert_not_closed("wait");

  const result = await wait({
    obj: this,
    until,
    timeout,
    change_event: "change-no-throttle",
  });
  return result;
}
566
567
/* INTERNAL PRIVATE METHODS */
568
569
// Initial connection attempt, started by the constructor.  If
// connecting fails, log a warning and close the table as fatal.
private async first_connect(): Promise<void> {
  try {
    await this.connect();
    this.update_has_uncommitted_changes();
  } catch (err) {
    const message = `synctable: failed to connect (table=${this.table}), error=${err}`;
    console.warn(message, this.query);
    this.close(true);
  }
}
581
582
// Record the new state and announce it.
//
// Two events are emitted: one named after the state itself
// ("disconnected", "connected" or "closed"), and a generic "state"
// event carrying the new state.  The generic event is what
// save() waits on (via once(this, "state")) when it is called
// while not connected; without it such a save could never resume.
private set_state(state: State): void {
  this.state = state;
  this.emit(state);
  this.emit("state", state);
}
586
587
// Current connection state of this table.
public get_state(): State {
  const { state } = this;
  return state;
}
590
591
// Name of the database table this SyncTable queries.
public get_table(): string {
  const { table } = this;
  return table;
}
594
595
// Change events are not throttled unless throttling was explicitly
// requested *or* is part of the schema: when no value was given,
// take it from the user get query of this table's schema, if any.
private set_throttle_changes(): void {
  if (this.throttle_changes != null) {
    return;
  }
  const get_spec = schema.SCHEMA[this.table]?.user_query?.get;
  if (get_spec == null) {
    return;
  }
  this.throttle_changes = get_spec.throttle_changes;
}
607
608
// Define this.emit_change.  Without throttling it emits "change" and
// "change-no-throttle" right away.  With throttling, every call emits
// "change-no-throttle" immediately, while "change" is emitted through
// a throttled function with all keys accumulated since the last emit.
private init_throttle_changes(): void {
  this.set_throttle_changes();

  if (!this.throttle_changes) {
    this.emit_change = (changed_keys: string[]) => {
      this.emit("change", changed_keys);
      this.emit("change-no-throttle", changed_keys);
    };
    return;
  }

  // Keys changed since "change" was last emitted.
  let pending_keys = {};
  const flush = () => {
    // CRITICAL: some code depends on a change being emitted even
    // for the *empty* list of keys!
    // E.g., projects page won't load for new users.  This
    // is the *change* from not loaded to being loaded,
    // which does make sense.
    this.emit("change", keys(pending_keys));
    pending_keys = {};
  };
  this.throttled_emit_changes = throttle(flush, this.throttle_changes);

  this.emit_change = (changed_keys) => {
    this.dbg("emit_change")(changed_keys);
    for (const key of changed_keys) {
      pending_keys[key] = true;
    }
    this.emit("change-no-throttle", changed_keys);
    // (the throttled function is deleted when the table is closed)
    if (this.throttled_emit_changes != null) {
      this.throttled_emit_changes();
    }
  };
}
648
649
// Return a logging function labeled with _f.  It is a no-op unless
// DEBUG is on; in a project the client's dbg is used, otherwise
// the console.
private dbg(_f?: string): Function {
  if (!DEBUG) {
    return () => {};
  }
  if (!this.client.is_project()) {
    return (...args) => {
      console.log(`synctable("${this.table}").${_f}: `, ...args);
    };
  }
  const label = `SyncTable('${JSON.stringify(this.query)}').${_f}`;
  return this.client.dbg(label);
}
663
664
// Connect the table: save local changes (if there is a value),
// then set up the changefeed.  Throws if the table is closed;
// does nothing if it is already connected.
private async connect(): Promise<void> {
  const dbg = this.dbg("connect");
  dbg();
  this.assert_not_closed("connect");
  if (this.state === "connected") {
    return;
  }

  // Step 1: send off any local unsaved changes,
  // before syncing with upstream.
  const have_value = this.value != null;
  if (have_value) {
    dbg("send off any local unsaved changes first");
    await this.save();
  }

  // Step 2: actually set up the changefeed.
  // (Even if this.no_db_set is set, this still may do
  // an initial query to the database.  However, the changefeed
  // does nothing further.)
  dbg("actually setup changefeed");
  await this.create_changefeed();

  dbg("connect should have succeeded");
}
688
689
// Establish the changefeed connection, load the initial value it
// returns into the table, switch to "connected", and emit a change
// event for the keys that changed.
//
// Does nothing if the table is closed.  If the table gets closed
// while the connection is being established, the changefeed that
// was created in the meantime is closed again instead of being
// left open.  If creating the connection fails, the table is
// closed and the error is rethrown.
private async create_changefeed(): Promise<void> {
  const dbg = this.dbg("create_changefeed");
  if (this.get_state() == "closed") {
    dbg("closed so don't do anything ever again");
    return;
  }
  dbg("creating changefeed connection...");
  let initval;
  try {
    initval = await this.create_changefeed_connection();
  } catch (err) {
    dbg("failed to create changefeed", err.toString());
    // Typically this happens if synctable closed while
    // creating the connection...
    this.close();
    throw err;
  }
  if (this.state == "closed") {
    // The retry loop in create_changefeed_connection may have
    // installed a fresh changefeed after the close happened;
    // nothing else would ever close it, so do it here.
    this.close_changefeed();
    return;
  }
  dbg("got changefeed, now initializing table data");
  this.init_changefeed_handlers();
  const changed_keys = this.update_all(initval);
  dbg("setting state to connected");
  this.set_state("connected");

  // NOTE: Can't emit change event until after
  // switching state to connected, which is why
  // we do it here.
  this.emit_change(changed_keys);
}
720
721
// Detach our handlers from the current changefeed (if any),
// close it and forget it.
private close_changefeed(): void {
  if (this.changefeed != null) {
    this.remove_changefeed_handlers();
    this.changefeed.close();
    delete this.changefeed;
  }
}
727
728
// Keep trying to create and connect a new Changefeed until it works,
// and return what connect() resolves to.  Between failed attempts we
// wait, starting at 500ms and growing by a factor of 1.3 as long as
// the delay is below 8000ms.  An error whose text contains "FATAL"
// closes the table and is rethrown.
private async create_changefeed_connection(): Promise<any[]> {
  let retry_ms: number = 500;
  for (;;) {
    // always start from a fresh changefeed
    this.close_changefeed();
    this.changefeed = new Changefeed(this.changefeed_options());
    await this.wait_until_ready_to_query_db();
    try {
      return await this.changefeed.connect();
    } catch (err) {
      const fatal = is_fatal(err.toString());
      if (fatal) {
        console.warn("FATAL creating initial changefeed", this.table, err);
        this.close(true);
        throw err;
      }
      // This can happen because we might suddenly NOT be ready
      // to query db immediately after we are ready...
      console.warn(
        `${this.table} -- failed to connect -- ${err}; will retry`,
      );
      await delay(retry_ms);
      if (retry_ms < 8000) {
        retry_ms *= 1.3;
      }
    }
  }
}
754
755
// Resolve once the client is in the state needed to query the
// database: "connected" for anonymous tables and for projects,
// "signed_in" for everything else.
private async wait_until_ready_to_query_db(): Promise<void> {
  const dbg = this.dbg("wait_until_ready_to_query_db");

  // For anonymous tables (and for a project accessing the db),
  // being ready just means the client is connected.  For all other
  // tables the client has to actually be signed in.
  const connected_suffices = this.schema.anonymous || this.client.is_project();
  const client_state: string = connected_suffices ? "connected" : "signed_in";

  const already = this.client[`is_${client_state}`]();
  if (already) {
    dbg("state already achieved -- no need to wait");
    return;
  }

  await once(this.client, client_state);
  dbg(`success -- client emited ${client_state}`);
}
779
780
// Options object from which a new Changefeed for this table is built.
private changefeed_options() {
  const { client, table, options, query } = this;
  return {
    do_query: query_function(client.query, table),
    query_cancel: client.query_cancel.bind(client),
    options,
    query,
    table,
  };
}
789
790
// Start listening to the "update" and "close" events of the
// current changefeed (if there is one).
private init_changefeed_handlers(): void {
  const feed = this.changefeed;
  if (feed == null) return;
  feed.on("update", this.changefeed_on_update);
  feed.on("close", this.changefeed_on_close);
}
795
796
// Stop listening to the "update" and "close" events of the
// current changefeed (if there is one).
private remove_changefeed_handlers(): void {
  const feed = this.changefeed;
  if (feed == null) return;
  feed.removeListener("update", this.changefeed_on_update);
  feed.removeListener("close", this.changefeed_on_close);
}
801
802
// Listener for the changefeed's "update" event (bound in the
// constructor): apply the change to this table.
private changefeed_on_update(change): void {
  this.update_change(change);
}
805
806
// Listener for the changefeed's "close" event (bound in the
// constructor): mark the table disconnected and try to set up a
// new changefeed in the background.
//
// create_changefeed rethrows when it cannot create the connection
// (after closing the table).  Since this is an event handler there
// is nobody to propagate that error to, so it is logged here
// rather than left as an unhandled promise rejection.
private changefeed_on_close(): void {
  this.set_state("disconnected");
  this.create_changefeed().catch((err) => {
    console.warn(
      `synctable: failed to recreate changefeed (table=${this.table}), error=${err}`,
    );
  });
}
810
811
// Switch to the "disconnected" state, unless already in it.
private disconnected(why: string): void {
  const dbg = this.dbg("disconnected");
  dbg(`why=${why}`);
  if (this.state !== "disconnected") {
    this.set_state("disconnected");
    return;
  }
  dbg("already disconnected");
}
820
821
// Placeholder: maps a record to the string key under which it is
// stored in the immutable map holding this table.  init_query
// replaces it with the real implementation; until then it throws.
private obj_to_key(_): string | undefined {
  const message = "this.obj_to_key must be set during initialization";
  throw Error(message);
}
826
827
// Validate this.query against the schema and derive from it:
// table, schema, client_query, primary_keys, obj_to_key,
// set_fields and required_set_fields.
// Throws if the query is not a multi-document query on exactly one
// known table, misses a primary key, or uses a field that is not a
// get field of the schema.
private init_query(): void {
  // Check that the query is probably valid, and
  // record the table and schema
  const tables = keys(this.query);
  if (len(tables) !== 1) {
    throw Error("must query only a single table");
  }
  this.table = tables[0];
  this.schema = schema.SCHEMA[this.table];
  if (this.schema == null) {
    throw Error(`unknown schema for table ${this.table}`);
  }
  // Projects and users are governed by different parts of the schema.
  this.client_query = this.client.is_project()
    ? this.schema.project_query
    : this.schema.user_query;
  if (this.client_query == null) {
    throw Error(`no query schema allowing queries to ${this.table}`);
  }
  if (!is_array(this.query[this.table])) {
    throw Error("must be a multi-document query");
  }
  this.primary_keys = schema.client_db.primary_keys(this.table);
  // Every primary key has to be in the query.
  for (const primary_key of this.primary_keys) {
    if (this.query[this.table][0][primary_key] === undefined) {
      throw Error(
        `must include each primary key in query of table '${this.table}', but you missed '${primary_key}'`,
      );
    }
  }
  // Every key in the query has to be allowed by the schema.
  for (const query_key of keys(this.query[this.table][0])) {
    if (this.client_query.get.fields[query_key] === undefined) {
      throw Error(
        `every key in query of table '${this.table}' must` +
          ` be a valid user get field in the schema but '${query_key}' is not`,
      );
    }
  }

  // Define this.obj_to_key, which extracts the primary key
  // from a record given as an immutable Map or a plain object.
  const read_field = (obj, field: string) =>
    Map.isMap(obj) ? obj.get(field) : obj[field];

  if (this.primary_keys.length === 1) {
    // very common case
    const pk = this.primary_keys[0];
    this.obj_to_key = (obj) => {
      if (obj == null) {
        return;
      }
      return to_key(read_field(obj, pk));
    };
  } else {
    // compound primary key: undefined as soon as any part is missing
    this.obj_to_key = (obj) => {
      if (obj == null) {
        return;
      }
      const parts: any[] = [];
      for (const pk of this.primary_keys) {
        const part = read_field(obj, pk);
        if (part == null) {
          return;
        }
        parts.push(part);
      }
      return to_key(parts);
    };
  }

  // Initialize set_fields and required_set_fields.
  const set = this.client_query?.set;
  if (set != null) {
    for (const field of keys(this.query[this.table][0])) {
      if (set.fields != null && set.fields[field]) {
        this.set_fields.push(field);
      }
      if (set.required_fields != null && set.required_fields[field]) {
        this.required_set_fields[field] = true;
      }
    }
  }
}
924
925
/* Send all unsent changes.
   This function must not be called more than once at a time.
   Returns boolean:
       false -- there are no additional changes to be saved
       true -- new changes may have appeared during the _save that
               need to be saved.

   If writing to the database fails, then an error state is set
   (via setError) and true is returned; save does not try again
   while that error state is set.  One way it gets cleared is by
   changing the table.  An error whose text contains "FATAL"
   closes the table and is rethrown.
*/
private async _save(): Promise<boolean> {
  const dbg = this.dbg("_save");
  dbg();
  if (this.get_state() == "closed") return false;
  if (this.client_query.set == null) {
    // Nothing to do -- can never set anything for this table.
    // There are some tables (e.g., stats) where the remote values
    // could change while user is offline, and the code below would
    // result in warnings.
    return false;
  }
  dbg("waiting for network");
  await this.wait_until_ready_to_query_db();
  if (this.get_state() == "closed") return false;
  dbg("waiting for value");
  await this.wait_until_value();
  if (this.get_state() == "closed") return false;
  if (len(this.changes) === 0) return false;
  if (this.value == null) {
    throw Error("value must not be null");
  }

  // Collect what has to be sent to the server.
  const query: any[] = [];
  const timed_changes: TimedChange[] = [];
  const proposed_keys: { [key: string]: boolean } = {};
  // Snapshot of the change times, to detect below which records
  // were changed again while we were sending.
  const sent = copy(this.changes);
  for (const key in this.changes) {
    if (this.versions[key] === 0) {
      proposed_keys[key] = true;
    }
    const record = this.value.get(key);
    if (record == null) {
      throw Error("delete is not implemented");
    }
    const obj = record.toJS();

    if (!this.no_db_set) {
      // qobj is the db query version of obj, or at least the part
      // of it that expresses what changed.
      const qobj = {};
      // The primary key part:
      if (this.primary_keys.length === 1) {
        qobj[this.primary_keys[0]] = key;
      } else {
        // unwrap compound primary key
        const parts = JSON.parse(key);
        this.primary_keys.forEach((primary_key, i) => {
          qobj[primary_key] = parts[i];
        });
      }
      // Only set_fields can be sent to the database.  Of these,
      // only send what actually changed since the last save.
      const prev = this.last_save.get(key);
      for (const field of this.set_fields) {
        if (!record.has(field)) continue;
        // Wrapping in a List gives a clean way to *compare* no
        // matter whether the values are immutable.js objects or not!
        const unchanged =
          prev != null &&
          List([record.get(field)]).equals(List([prev.get(field)]));
        if (!unchanged) {
          qobj[field] = obj[field];
        }
      }

      // Required fields are always sent.
      for (const field in this.required_set_fields) {
        if (qobj[field] == null) {
          qobj[field] = obj[field];
        }
      }

      query.push({ [this.table]: qobj });
    }
    timed_changes.push({ obj, time: this.changes[key] });
  }
  dbg("sending timed-changes", timed_changes);
  this.emit("timed-changes", timed_changes);

  if (!this.no_db_set) {
    try {
      // remember exactly the value that is being written
      const value = this.value;
      dbg("doing database query");
      await callback2(this.client.query, {
        query,
        options: [{ set: true }], // force it to be a set query
        timeout: 120, // give it some time (especially if it is long)
      });
      this.last_save = value; // success -- don't have to save this stuff anymore...
    } catch (err) {
      this.setError(err, query);
      dbg("db query failed", err);
      if (is_fatal(err.toString())) {
        console.warn("FATAL doing set", this.table, err);
        this.close(true);
        throw err;
      }
      // NOTE: we do not show entire log since the number
      // of entries in the query can be very large and just
      // converting them all to text could use a lot of memory (?).
      console.warn(
        `_save('${this.table}') set query error:`,
        err,
        " queries: ",
        query[0],
        "...",
        query.length - 1,
        " omitted",
      );
      return true;
    }
  }

  if (this.get_state() == "closed") return false;
  if (this.value == null) {
    // should not happen
    return false;
  }

  if (this.no_db_set) {
    // Not using changefeeds, so have to depend on other mechanisms
    // to update state.  Wait until changes to proposed keys are
    // acknowledged by their version being assigned.
    try {
      dbg("waiting until versions are updated");
      await this.wait_until_versions_are_updated(proposed_keys, 5000);
    } catch (err) {
      dbg("waiting for versions timed out / failed");
      // took too long -- try again to send and receive changes.
      return true;
    }
  }

  dbg("Record that we successfully sent these changes");
  // A record that was changed again in the meantime has a
  // different time now, and stays marked as changed.
  for (const key in sent) {
    if (sent[key] === this.changes[key]) {
      delete this.changes[key];
    }
  }
  this.update_has_uncommitted_changes();

  const is_done = len(this.changes) === 0;
  dbg("done? ", is_done);
  return !is_done;
}
1091
1092
/**
 * Remember the most recent error together with the query that
 * triggered it, and announce it through the "error" event.
 *
 * @param error - description of what went wrong
 * @param query - the query that was being run when the error occurred
 */
private setError(error: string, query: Query): void {
  const info = { error, query };
  this.error = info;
  this.emit("error", info);
}
1096
1097
/**
 * Forget any previously recorded error and tell listeners about it
 * via the "clear-error" event.
 */
public clearError(): void {
  // Reset first, so listeners of "clear-error" observe no error.
  this.error = undefined;
  this.emit("clear-error");
}
1101
1102
/**
 * Wait until every key in proposed_keys has been acknowledged, i.e.,
 * has a positive entry in this.versions or was reported by an
 * "increased-versions" event.
 *
 * NOTE: proposed_keys is mutated -- keys are deleted from it as they
 * get acknowledged.
 *
 * @param proposed_keys - keys whose acknowledgement we are waiting for
 * @param timeout_ms - total time budget (in ms) for the whole wait
 * @throws if the time budget is used up, or if waiting for the
 *         "increased-versions" event fails.
 */
private async wait_until_versions_are_updated(
  proposed_keys: { [key: string]: boolean },
  timeout_ms: number,
): Promise<void> {
  const start_ms = Date.now();
  while (len(proposed_keys) > 0) {
    // Drop every key that already has a positive version.
    for (const key in proposed_keys) {
      if (this.versions[key] > 0) {
        delete proposed_keys[key];
      }
    }
    if (len(proposed_keys) === 0) {
      break;
    }
    const remaining_ms = timeout_ms - (Date.now() - start_ms);
    if (timeout_ms > 0 && remaining_ms <= 0) {
      // The budget is exhausted.  Fail explicitly instead of handing a
      // non-positive timeout to `once`.
      // NOTE(review): presumably `once` treats a non-positive timeout
      // as "no timeout", which would make this wait hang -- confirm.
      throw Error("timed out waiting for versions to be updated");
    }
    const keys: string[] = await once(
      this,
      "increased-versions",
      remaining_ms,
    );
    for (const key of keys) {
      delete proposed_keys[key];
    }
  }
}
1126
1127
// Return modified immutable Map, with all types coerced to be
1128
// as specified in the schema, if possible, or throw an exception.
1129
/**
 * Coerce each field of one record to the type declared in the schema.
 *
 * @param changes - the record (converted to an immutable.js Map if it
 *        is not one already)
 * @returns the record as an immutable.js Map; returned without any
 *          coercion when this.coerce_types is not set
 * @throws if the schema (or the schema of the underlying virtual
 *         table) is missing, if a non-null field is not part of the
 *         table/query, if a field has no type info, or if a value
 *         can't be converted to the required List/Map
 */
private do_coerce_types(
  changes: Map<string | number, any>,
): Map<string | number, any> {
  if (!Map.isMap(changes)) {
    changes = Map(changes);
  }
  if (!this.coerce_types) {
    // coercion is disabled, so there is nothing more to do.
    return changes;
  }

  const table_schema = schema.SCHEMA[this.table];
  if (table_schema == null) {
    throw Error(`Missing schema for table ${this.table}`);
  }
  const fields = copy(table_schema.fields);
  if (fields == null) {
    throw Error(`Missing fields part of schema for table ${this.table}`);
  }

  // Type specs come from the table itself, unless the table is virtual,
  // in which case they come from the table it is based on.  In the
  // non-virtual case specs and fields are deliberately the same object.
  let specs = fields;
  if (table_schema.virtual != null) {
    if (table_schema.virtual === true) {
      throw Error(`t.virtual can't be true for ${this.table}`);
    }
    const base_schema = schema.SCHEMA[table_schema.virtual];
    if (base_schema == null) {
      throw Error(`invalid virtual table spec for ${this.table}`);
    }
    specs = copy(base_schema.fields);
    if (specs == null) {
      throw Error(`invalid virtual table spec for ${this.table}`);
    }
  }

  if (typeof this.query != "string") {
    // explicit query (not just from schema): only the fields that
    // appear in the query are allowed.
    const from_query = this.query[this.table];
    const requested = is_array(from_query) ? from_query[0] : from_query;
    for (const name in fields) {
      if (requested[name] === undefined) {
        delete fields[name];
      }
    }
  }

  // Coerce a single value of the record.
  const coerce_value = (value: any, field: string | number): any => {
    if (typeof field !== "string") {
      // satisfy typescript.
      return undefined;
    }
    if (value == null) {
      // null values are never coerced
      return value;
    }
    if (fields[field] == null) {
      throw Error(
        `Cannot coerce: no field '${field}' in table '${this.table}'`,
      );
    }
    const spec = specs[field];
    const declared: string | undefined = spec.type || spec.pg_type;
    if (declared == null) {
      throw Error(`Cannot coerce: no type info for field ${field}`);
    }
    const desired = declared.toLowerCase();
    const actual = typeof value;
    if (desired === actual) {
      return value;
    }

    // We can add more or less later...
    if (desired.startsWith("char")) {
      return actual !== "string" ? `${value}` : value;
    }
    switch (desired) {
      case "string":
        // ensure is a string
        return actual !== "string" ? `${value}` : value;

      case "timestamp":
        // make it a Date object (usually converting from string rep)
        return value instanceof Date ? value : new Date(value);

      case "integer":
        return parseInt(value);

      case "number":
        // actual wasn't number, so parse:
        return parseFloat(value);

      case "array":
        if (!List.isList(value)) {
          value = fromJS(value);
          if (!List.isList(value)) {
            throw Error(
              `field ${field} of table ${this.table} (value=${changes.get(
                field,
              )}) must convert to an immutable.js List`,
            );
          }
        }
        return value;

      case "map":
        if (!Map.isMap(value)) {
          value = Map(value);
          if (!Map.isMap(value)) {
            throw Error(
              `field ${field} of table ${this.table} (value=${changes.get(
                field,
              )}) must convert to an immutable.js Map`,
            );
          }
        }
        return value;

      case "boolean":
        // actual wasn't boolean, so coerce.
        return !!value;

      case "uuid":
        assert_uuid(value);
        return value;

      default:
        // unknown type: leave the value alone.
        return value;
    }
  };

  return Map(changes.map(coerce_value));
}
1265
1266
/*
1267
Handle an update of all records from the database.
1268
This happens on initialization, and also if we
1269
disconnect and reconnect.
1270
*/
1271
/**
 * Replace the entire in-memory table by the given records.
 *
 * @param v - array of records; records for which obj_to_key returns
 *        null are skipped
 * @returns the primary keys of all records that were stored
 * @throws if the table is closed
 */
private update_all(v: any[]): any[] {
  if (this.state === "closed") {
    // Updates from the db are meaningless once we are closed.
    throw Error("makes no sense to do update_all when state is closed.");
  }

  this.emit("before-change");

  // Index the records by primary key, and give each one the
  // initial version number.
  const by_key = {};
  for (const record of v) {
    const key = this.obj_to_key(record);
    if (key == null) {
      continue;
    }
    by_key[key] = record;
    this.versions[key] = this.initial_version;
  }
  // Every key counts as changed.
  const changed_keys = keys(by_key);
  this.emit("increased-versions", changed_keys);

  this.value = fromJS(by_key);
  if (this.value == null) {
    throw Error("bug");
  }
  this.last_save = this.value;

  if (this.coerce_types) {
    // Make sure every record has the types required by the database
    // schema.  This matters, e.g., when the client db query is mocked:
    // JSON is involved then, so timestamps don't arrive as Date objects.
    this.value = this.value.map((record) => {
      if (record == null) {
        throw Error("val must not be null");
      }
      return this.do_coerce_types(record);
    }) as Map<string, Map<string, any>>;
  }

  // When not yet connected, the function that sets up the changefeed
  // emits the initial change.  If we are already connected (e.g.,
  // on reconnect) it is our job to tell listeners which keys changed.
  if (this.state === "connected") {
    this.emit_change(changed_keys);
  }

  this.emit("init-value-server");
  return changed_keys;
}
1329
1330
/**
 * Describe the complete current state of the table as a list of
 * versioned changes -- one {obj, version} entry per record.
 *
 * @returns one VersionedChange for every record in this.value
 * @throws if the table value is not initialized, or if any record,
 *         key or version is null
 */
public initial_version_for_browser_client(): VersionedChange[] {
  const table = this.value;
  if (table == null) {
    throw Error("value must not be null");
  }
  const result: VersionedChange[] = [];
  table.forEach((record, key) => {
    if (record == null) {
      throw Error("val must be non-null");
    }
    const obj = record.toJS();
    if (obj == null) {
      throw Error("obj must be non-null");
    }
    if (key == null) {
      throw Error("key must not be null");
    }
    const version = this.versions[key];
    if (version == null) {
      throw Error("version must not be null");
    }
    result.push({ obj, version });
  });
  return result;
}
1355
1356
/**
 * Apply the given initial versioned changes to this table, then
 * re-queue for sending any record we had before that the init
 * did not include.
 *
 * Why: imagine the project synctable is killed after it has
 * acknowledged a change C from a web browser client, but before it
 * saved C to the central PostgreSQL database (or someday, maybe a
 * local SQLite database).  When the project synctable is resurrected,
 * it uses the database for its initial state, so it knows nothing
 * about C.  The browser thinks that C was successfully written and
 * broadcast to everybody, so it doesn't send C again, and the browser
 * and the project would be out of sync forever.
 *
 * We only care about lost changes that some browser knows about -- if
 * no browser knows about them, then the fact that they are lost won't
 * break sync.  Also, for file editing, data is regularly saved to
 * disk, so if the browser sends a change that is lost due to the
 * project being killed before writing to the database, and then the
 * browser terminates too, that change is completely lost.  However,
 * everybody will start again with at least the last version of the
 * file **saved to disk**, which is basically what people may expect as
 * a worst case.
 *
 * The solution is to look at the key:value pairs we know about that
 * the project didn't just send back to us.  If any were reported as
 * committed but have vanished, we mark them as unsent and send them
 * again.
 *
 * @param changes - the versioned records received as part of init
 */
public init_browser_client(changes: VersionedChange[]): void {
  const dbg = this.dbg("init_browser_client");
  dbg(`applying ${changes.length} versioned changes`);
  // The value before doing init (which happens precisely when the
  // project synctable is reset).
  const before = this.value;
  const received_keys = this.apply_changes_to_browser_client(changes);
  if (before == null) {
    // there was no earlier state, so nothing can have been lost.
    return;
  }

  before.forEach((_val, key) => {
    if (key == null || received_keys[key]) {
      // received as part of init
      return;
    }
    const pending = this.changes[key];
    if (pending && this.versions[key] == 0) {
      // not even sent yet
      return;
    }
    // This key was known and confirmed sent before init, but it
    // didn't get sent back this time.  So it was lost somehow,
    // e.g., due to not getting saved to the database and the project
    // (or table in the project) getting restarted.
    dbg(`found lost: key=${key}`);
    // So we will try to send it out again.
    if (!pending) {
      this.changes[key] = this.unique_server_time();
      this.update_has_uncommitted_changes();
    }
    // The project lost it, so we must not consider it as having
    // any version assigned by the project.
    this.null_version(key);
  });

  if (len(this.changes) > 0) {
    // kick off a save of our unsaved lost work back to the project.
    this.save();
  }
}
1411
1412
/**
 * Apply versioned changes to this table.  A change is ignored when we
 * already have a strictly larger version for its key; otherwise the
 * record replaces ours and we adopt its version.
 *
 * Emits "before-change", then "increased-versions" (if any version
 * was set) and a change event (if any record really changed).
 *
 * @param changes - the versioned records to apply
 * @returns object whose keys are the primary keys of ALL the given
 *          changes, including the ignored ones
 * @throws if the table is closed or a record has no primary key
 */
public apply_changes_to_browser_client(changes: VersionedChange[]): {
  [key: string]: boolean;
} {
  const dbg = this.dbg("apply_changes_to_browser_client");
  dbg("got ", changes.length, "changes");
  this.assert_not_closed("apply_changes_to_browser_client");
  if (this.value == null) {
    // first time the synctable gets initialized.
    this.value = Map();
  }

  this.emit("before-change");
  const received_keys: { [key: string]: boolean } = {};
  const modified: string[] = [];
  const bumped: string[] = [];
  for (const { obj, version } of changes) {
    const record = this.do_coerce_types(fromJS(obj));
    const key = this.obj_to_key(record);
    if (key == null) {
      throw Error("object results in null key");
    }
    received_keys[key] = true;
    const known_version = this.versions[key] || 0;
    if (known_version > version) {
      // what we have is newer -- nothing further to do.
      continue;
    }
    if (this.handle_new_val(record, undefined, "insert", false)) {
      // the record really is different from what we had.
      modified.push(key);
    }
    // Adopt the newer version number.
    this.versions[key] = version;
    bumped.push(key);
  }

  if (bumped.length > 0) {
    this.emit("increased-versions", bumped);
  }
  if (modified.length > 0) {
    this.emit_change(modified);
  }
  return received_keys;
}
1458
1459
/**
 * Apply timestamped changes to this table (judging from the debug
 * messages: the project applying changes sent by a browser client).
 *
 * For each change: if we already recorded a strictly later change time
 * for that key, our current record and version are queued to be pushed
 * out again.  Otherwise the record is inserted and -- only if that
 * actually changed the table -- its version is incremented and the
 * change time is recorded.
 *
 * Emits a change event and "versioned-changes" when there is anything
 * to report.
 *
 * @param changes - records together with the time they were changed
 * @throws if a change has no obj or no primary key, or if the internal
 *         state for a key with a more recent change is missing
 */
public apply_changes_from_browser_client(changes: TimedChange[]): void {
  const dbg = this.dbg("apply_changes_from_browser_client");
  dbg("project <-- changes -- client", JSON.stringify(changes));
  const modified: string[] = [];
  const outgoing: VersionedChange[] = [];
  for (const { obj, time } of changes) {
    if (obj == null) {
      throw Error("obj must not be null");
    }
    const record = this.do_coerce_types(fromJS(obj));
    // the key must be computed from the coerced record!
    const key = this.obj_to_key(record);
    if (key == null) {
      throw Error("object results in null key");
    }
    const known_time = this.changes[key];
    if (known_time != null && known_time > time) {
      dbg("already have a more recent version");
      // We already have a more recent update to this object.
      // We push that newer version out again, just in case.
      if (this.value == null) {
        throw Error("value must not be null");
      }
      const current: any = this.value.get(key);
      if (current == null) {
        throw Error(`there must be an object in this.value with key ${key}`);
      }
      const current_obj = current.toJS();
      const version = this.versions[key];
      if (version == null) {
        throw Error(`object with key ${key} must have a version`);
      }
      outgoing.push({ obj: current_obj, version });
    } else if (this.handle_new_val(record, undefined, "insert", false)) {
      const version = this.increment_version(key);
      this.changes[key] = time;
      this.update_has_uncommitted_changes();
      outgoing.push({ obj: record.toJS(), version });
      modified.push(key);
    }
  }
  if (modified.length > 0) {
    this.emit_change(modified);
  }
  if (outgoing.length > 0) {
    this.emit("versioned-changes", outgoing);
  }
  dbg("project -- versioned --> clients", JSON.stringify(outgoing));
}
1510
1511
/**
 * Move the version of the given key forward: a key without a version
 * gets this.initial_version, any other key is bumped by one.  Emits
 * "increased-versions" for the key.
 *
 * @param key - primary key of the record
 * @returns the version of the key after the event was emitted
 */
private increment_version(key: string): number {
  const current = this.versions[key];
  this.versions[key] = current == null ? this.initial_version : current + 1;
  this.emit("increased-versions", [key]);
  return this.versions[key];
}
1520
1521
/**
 * Reset the version of the given key to 0.
 *
 * @param key - primary key of the record
 */
private null_version(key: string): void {
  const NO_VERSION = 0;
  this.versions[key] = NO_VERSION;
}
1524
1525
/*
1526
Apply one incoming change from the database to the
1527
in-memory table.
1528
*/
1529
/**
 * Apply a single change to the in-memory table and emit a change
 * event if the table was really modified.
 *
 * @param change - object with new_val, old_val and action fields,
 *        which are handed to handle_new_val
 */
private update_change(change): void {
  if (this.state === "closed") {
    // A few more updates may arrive even after the changefeed
    // is canceled; those are simply dropped.
    return;
  }
  if (this.value == null) {
    console.warn(`update_change(${this.table}): ignored`);
    return;
  }
  this.emit("before-change");
  const { new_val, old_val, action } = change;
  const key = this.handle_new_val(new_val, old_val, action, this.coerce_types);
  if (key != null) {
    // the change really did modify the record with this key.
    this.emit_change([key]);
  }
}
1557
1558
// Returns current time (in ms since epoch) on server,
1559
// but if there are multiple requests at the same time,
1560
// the clock is artificially incremented to ensure uniqueness.
1561
// Also, this time is thus always strictly increasing.
1562
/**
 * @returns this.client.server_time() as a number, except that when it
 *          is not larger than the previously returned time, the
 *          previous time plus one is returned instead.  The result is
 *          remembered in this.last_server_time.
 */
private unique_server_time(): number {
  const now = this.client.server_time().valueOf();
  const previous = this.last_server_time;
  // Step past the previous value if the clock didn't move forward.
  const unique = now <= previous ? previous + 1 : now;
  this.last_server_time = unique;
  return unique;
}
1570
1571
// - returns key only if obj actually changed things.
1572
/**
 * Apply one insert/update/delete to this.value.
 *
 * @param new_val - the new record; used unless action is "delete"
 * @param old_val - the record being removed; used only when action is
 *        "delete"
 * @param action - "delete" removes the record; "update" shallow-merges
 *        new_val into the existing record (if there is one); anything
 *        else replaces the record by new_val
 * @param coerce - if true (and this.coerce_types is set), coerce the
 *        record to the schema types first
 * @returns the primary key of the record if this.value was changed,
 *          and undefined otherwise
 * @throws if this.value isn't initialized or the relevant record
 *         (old_val for delete, new_val otherwise) is null
 */
private handle_new_val(
  new_val: any,
  old_val: any,
  action: string,
  coerce: boolean,
): string | undefined {
  if (this.value == null) {
    // to satisfy typescript.
    throw Error("value must be initialized");
  }

  if (action === "delete") {
    let removed = fromJS(old_val);
    if (removed == null) {
      throw Error("old_val must not be null for delete action");
    }
    if (coerce && this.coerce_types) {
      removed = this.do_coerce_types(removed);
    }
    const removed_key = this.obj_to_key(removed);
    if (removed_key != null && this.value.has(removed_key)) {
      this.value = this.value.delete(removed_key);
      return removed_key;
    }
    // already gone
    return undefined;
  }

  let record = fromJS(new_val);
  if (record == null) {
    throw Error("new_val must not be null for insert or update action");
  }
  if (coerce && this.coerce_types) {
    record = this.do_coerce_types(record);
  }
  const key = this.obj_to_key(record);
  if (key == null) {
    // The primary key is null or missing.  That shouldn't happen
    // (maybe in some edge case), but we don't let it break
    // everything, so this is ignored rather than an error.
    return undefined;
  }
  const existing = this.value.get(key);
  if (action === "update" && existing != null) {
    // Updates are shallow *merged* into the existing record,
    // whereas an insert replaces the whole record.
    record = existing.merge(record);
  }
  if (record.equals(existing)) {
    // nothing actually changed.
    return undefined;
  }
  this.value = this.value.set(key, record);
  return key;
}
1626
1627
/*
1628
obj is an immutable.js Map without the primary key
1629
set. If the database schema defines a way to compute
1630
the primary key from other keys, try to use it here.
1631
This function returns the computed primary key (array or string)
1632
if it works, and returns undefined otherwise.
1633
*/
1634
/**
 * Compute the primary key of obj using the functions that
 * this.client_query.set.fields provides for the primary key fields.
 *
 * @param obj - immutable.js Map; each function receives obj.toJS()
 * @returns the computed value when there is exactly one primary key,
 *          otherwise an array with one computed value per primary key;
 *          undefined as soon as a primary key field has no function
 */
private computed_primary_key(obj): string[] | string | undefined {
  // Returns the computed value wrapped in an object, so that "no
  // function available" (undefined) can't be confused with a
  // function that happens to return undefined.
  const compute = (pk: string): { value: any } | undefined => {
    const f = this.client_query.set.fields[pk];
    if (typeof f !== "function") {
      return undefined;
    }
    return { value: f(obj.toJS(), schema.client_db) };
  };

  if (this.primary_keys.length === 1) {
    const single = compute(this.primary_keys[0]);
    return single === undefined ? undefined : single.value;
  }

  const parts: string[] = [];
  for (const pk of this.primary_keys) {
    const part = compute(pk);
    if (part === undefined) {
      return undefined;
    }
    parts.push(part.value);
  }
  return parts;
}
1656
1657
/**
 * Throw an error if this synctable is closed.
 *
 * @param desc - short description of the attempted operation; it is
 *        included in the error message
 * @throws if this.state is "closed"
 */
private assert_not_closed(desc: string): void {
  if (this.state !== "closed") {
    return;
  }
  //console.trace();
  throw Error(`the synctable "${this.table}" must not be closed -- ${desc}`);
}
1665
1666
// **WARNING:** Right now this *barely* works at all... due to
1667
// barely being implemented since I mostly haven't needed it.
1668
// It will delete the object from the database, but if some
1669
// client still has the object, they can end up just writing
1670
// it back.
1671
/**
 * Delete the entry whose primary key is described by obj, by doing
 * a direct database query with the delete option set.
 *
 * The table spec must have set.delete = true.  The object is NOT
 * removed from this table here; that is expected to happen slightly
 * later as a side effect of the database change.  This therefore works
 * differently than changing or creating entries, at least right now
 * (implementing it properly is a lot of work, and it isn't used much).
 *
 * @param obj - describes the primary key of the entry to delete
 */
public async delete(obj): Promise<void> {
  await callback2(this.client.query, {
    query: { [this.table]: obj },
    options: [{ delete: true }],
  });
}
1686
}
1687
1688