Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/editor/generic/sync-doc.ts
5700 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020-2025 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
SyncDoc -- the core class for editing with a synchronized document.
8
9
This code supports both string-doc and db-doc, for editing both
10
strings and small database tables efficiently, with history,
11
undo, save to disk, etc.
12
13
This code is run *both* in browser clients and under node.js
14
in projects, and behaves slightly differently in each case.
15
16
EVENTS:
17
18
- before-change: fired before merging in changes from upstream
19
- ... TODO
20
*/
21
22
// Global toggle for the conat communication layer; automatically disabled
// for test clients (see the SyncDoc constructor).
const USE_CONAT = true;

/* OFFLINE_THRESH_S - If the client becomes disconnected from
the backend for more than this long then---on reconnect---do
extra work to ensure that all snapshots are up to date (in
case snapshots were made when we were offline), and mark the
sent field of patches that weren't saved. I.e., we rebase
all offline changes. */
// const OFFLINE_THRESH_S = 5 * 60; // 5 minutes.

/* How often the local hub will autosave this file to disk if
it has it open and there are unsaved changes. This is very
important since it ensures that a user that edits a file but
doesn't click "Save" and closes their browser (right after
their edits have gone to the database), still has their
file saved to disk soon. This is important, e.g., for homework
getting collected and not missing the last few changes. It turns
out this is what people expect.
Set to 0 to disable. (But don't do that.) */
const FILE_SERVER_AUTOSAVE_S = 45;
// const FILE_SERVER_AUTOSAVE_S = 5;

// How big of files we allow users to open using syncstrings.
const MAX_FILE_SIZE_MB = 32;

// How frequently to check if file is or is not read only.
// The filesystem watcher is NOT sufficient for this, because
// it is NOT triggered on permissions changes. Thus we must
// poll for read only status periodically, unfortunately.
const READ_ONLY_CHECK_INTERVAL_MS = 7500;

// This parameter determines throttling when broadcasting cursor position
// updates. Make this larger to reduce bandwidth at the expense of making
// cursors less responsive.
const CURSOR_THROTTLE_MS = 750;

// NATS is much faster and can handle load, and cursors only use pub/sub,
// so a much shorter throttle is affordable there.
const CURSOR_THROTTLE_NATS_MS = 150;

// Ignore file changes for this long after save to disk, so we don't
// react to our own write.
const RECENT_SAVE_TO_DISK_MS = 2000;

// If true, initialize the various sync tables in parallel rather than
// serially during startup.
const PARALLEL_INIT = true;
65
66
import {
67
COMPUTE_THRESH_MS,
68
COMPUTER_SERVER_CURSOR_TYPE,
69
decodeUUIDtoNum,
70
SYNCDB_PARAMS as COMPUTE_SERVE_MANAGER_SYNCDB_PARAMS,
71
} from "@cocalc/util/compute/manager";
72
73
import { DEFAULT_SNAPSHOT_INTERVAL } from "@cocalc/util/db-schema/syncstring-schema";
74
75
// Opaque compressed-patch payload; currently untyped (TODO: give this a real type).
type XPatch = any;
76
77
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
78
import { SyncTable } from "@cocalc/sync/table/synctable";
79
import {
80
callback2,
81
cancel_scheduled,
82
once,
83
retry_until_success,
84
reuse_in_flight_methods,
85
until,
86
} from "@cocalc/util/async-utils";
87
import { wait } from "@cocalc/util/async-wait";
88
import {
89
auxFileToOriginal,
90
assertDefined,
91
close,
92
endswith,
93
field_cmp,
94
filename_extension,
95
hash_string,
96
keys,
97
minutes_ago,
98
} from "@cocalc/util/misc";
99
import * as schema from "@cocalc/util/schema";
100
import { delay } from "awaiting";
101
import { EventEmitter } from "events";
102
import { Map, fromJS } from "immutable";
103
import { debounce, throttle } from "lodash";
104
import { Evaluator } from "./evaluator";
105
import { HistoryEntry, HistoryExportOptions, export_history } from "./export";
106
import { IpywidgetsState } from "./ipywidgets-state";
107
import { SortedPatchList } from "./sorted-patch-list";
108
import type {
109
Client,
110
CompressedPatch,
111
CursorMap,
112
DocType,
113
Document,
114
FileWatcher,
115
Patch,
116
} from "./types";
117
import { isTestClient, patch_cmp } from "./util";
118
import { CONAT_OPEN_FILE_TOUCH_INTERVAL } from "@cocalc/util/conat";
119
import mergeDeep from "@cocalc/util/immutable-deep-merge";
120
import { JUPYTER_SYNCDB_EXTENSIONS } from "@cocalc/util/jupyter/names";
121
import { LegacyHistory } from "./legacy";
122
import { getLogger } from "@cocalc/conat/client";
123
124
// When true, print extra diagnostics (e.g., stack traces on init failure).
const DEBUG = false;

// Lifecycle states of a SyncDoc: "init" -> "ready" -> "closed".
export type State = "init" | "ready" | "closed";
// Which backend serves data/changefeeds for this document.
export type DataServer = "project" | "database";
128
129
// Base options for opening a synchronized document.
export interface SyncOpts0 {
  project_id: string; // project that contains the document
  path: string; // path of the file within the project
  client: Client; // client interface (browser or project/node)
  patch_interval?: number; // ms throttle of upstream patches (default 250)

  // file_use_interval defaults to 60000.
  // Specify 0 to disable.
  file_use_interval?: number;

  string_id?: string; // normally computed as sha1(project_id, path)
  cursors?: boolean; // if true, also enable cursor tracking
  change_throttle?: number; // ms throttle for "change" events

  // persistent backend session in project, so only close
  // backend when explicitly requested:
  persistent?: boolean;

  // If true, entire sync-doc is assumed ephemeral, in the
  // sense that no edit history gets saved via patches to
  // the database. The one syncstring record for coordinating
  // users does get created in the database.
  ephemeral?: boolean;

  // which data/changefeed server to use
  data_server?: DataServer;
}
156
157
// Full options: SyncOpts0 plus the document type descriptor and a
// factory that builds a Document from its string representation.
export interface SyncOpts extends SyncOpts0 {
  from_str: (str: string) => Document;
  doctype: DocType;
}
161
162
// State maintained while in undo/redo mode (see undo/redo below).
export interface UndoState {
  my_times: number[]; // sorted logical times of this user's patches this session
  pointer: number; // position in my_times; my_times.length means "live state"
  without: number[]; // times currently excluded when computing the shown version
  final?: CompressedPatch; // patch restoring unsaved live changes, if there were any
}
168
169
// NOTE: Do not make multiple SyncDoc's for the same document, especially
// not on the frontend.

// Module-level logger; the debug line below marks when this module loads.
const logger = getLogger("sync-doc");
logger.debug("init");
174
175
export class SyncDoc extends EventEmitter {
  public readonly project_id: string; // project_id that contains the doc
  public readonly path: string; // path of the file corresponding to the doc
  private string_id: string; // canonical id; sha1(project_id, path) unless given
  private my_user_id: number; // this client's index into the users array

  private client: Client;
  private _from_str: (str: string) => Document; // creates a doc from a string.

  // Throttling of incoming upstream patches from project to client.
  private patch_interval: number = 250;

  // This is what's actually output by setInterval -- it's
  // not an amount of time.
  private fileserver_autosave_timer: number = 0;

  // Timer id of the periodic read-only poll (READ_ONLY_CHECK_INTERVAL_MS).
  private read_only_timer: number = 0;

  // throttling of change events -- e.g., is useful for course
  // editor where we have hundreds of changes and the UI gets
  // overloaded unless we throttle and group them.
  private change_throttle: number = 0;

  // file_use_interval throttle: default is 60s for everything
  private file_use_interval: number;
  private throttled_file_use?: Function;

  private cursors: boolean = false; // if true, also provide cursor tracking functionality
  private cursor_map: CursorMap = Map() as CursorMap;
  private cursor_last_time: Date = new Date(0); // time of our own last cursor broadcast

  // doctype: object describing document constructor
  // (used by project to open file)
  private doctype: DocType;

  private state: State = "init";

  // Underlying sync tables: one metadata row, the patch log, and cursors.
  private syncstring_table: SyncTable;
  private patches_table: SyncTable;
  private cursors_table: SyncTable;

  public evaluator?: Evaluator; // cell evaluation support

  public ipywidgets_state?: IpywidgetsState; // jupyter widgets state

  private patch_list?: SortedPatchList; // full ordered patch history

  private last: Document; // last version we merged/broadcast
  private doc: Document; // current live document
  private before_change?: Document; // snapshot before merging upstream changes

  private last_user_change: Date = minutes_ago(60);
  private last_save_to_disk_time: Date = new Date(0);

  private last_snapshot?: number;
  private last_seq?: number;
  private snapshot_interval: number;

  private users: string[]; // account_ids; indexed by user_id from patches

  private settings: Map<string, any> = Map();

  private syncstring_save_state: string = "";

  // patches that this client made during this editing session.
  private my_patches: { [time: string]: XPatch } = {};

  private watch_path?: string;
  private file_watcher?: FileWatcher;

  private handle_patch_update_queue_running: boolean;
  private patch_update_queue: string[] = [];

  private undo_state: UndoState | undefined; // non-null exactly when in undo mode

  private save_to_disk_start_ctime: number | undefined;
  private save_to_disk_end_ctime: number | undefined;

  private persistent: boolean = false;

  private last_has_unsaved_changes?: boolean = undefined;

  private ephemeral: boolean = false;

  private sync_is_disabled: boolean = false;
  private delay_sync_timer: any;

  // static because we want exactly one across all docs!
  private static computeServerManagerDoc?: SyncDoc;

  private useConat: boolean;
  legacy: LegacyHistory;
267
268
  constructor(opts: SyncOpts) {
    super();
    // Canonical string_id is sha1(project_id, path) unless explicitly given.
    if (opts.string_id === undefined) {
      this.string_id = schema.client_db.sha1(opts.project_id, opts.path);
    } else {
      this.string_id = opts.string_id;
    }

    // Copy the supported options directly onto this instance.
    // NOTE(review): "from_patch_str" is not declared in SyncOpts, so that
    // entry never matches anything -- confirm whether it is vestigial.
    for (const field of [
      "project_id",
      "path",
      "client",
      "patch_interval",
      "file_use_interval",
      "change_throttle",
      "cursors",
      "doctype",
      "from_patch_str",
      "persistent",
      "data_server",
      "ephemeral",
    ]) {
      if (opts[field] != undefined) {
        this[field] = opts[field];
      }
    }

    // Interface to this document's legacy (pre-conat) edit history.
    this.legacy = new LegacyHistory({
      project_id: this.project_id,
      path: this.path,
      client: this.client,
    });

    // NOTE: Do not use conat in test mode, since there we use a minimal
    // "fake" client that does all communication internally and doesn't
    // use conat. We also use this for the messages composer.
    this.useConat = USE_CONAT && !isTestClient(opts.client);
    if (this.ephemeral) {
      // So the doctype written to the database reflects the
      // ephemeral state. Here ephemeral determines whether
      // or not patches are written to the database by the
      // project.
      this.doctype.opts = { ...this.doctype.opts, ephemeral: true };
    }
    if (this.cursors) {
      // similarly to ephemeral, but for cursors. We track them
      // on the backend since they can also be very useful, e.g.,
      // with jupyter they are used for connecting remote compute,
      // and **should** also be used for broadcasting load and other
      // status information (TODO).
      this.doctype.opts = { ...this.doctype.opts, cursors: true };
    }
    this._from_str = opts.from_str;

    // Initialize to time when we create the syncstring, so we don't
    // see our own cursor when we refresh the browser (before we move
    // to update this).
    this.cursor_last_time = this.client?.server_time();

    // Ensure these async methods never run concurrently with themselves.
    reuse_in_flight_methods(this, [
      "save",
      "save_to_disk",
      "load_from_disk",
      "handle_patch_update_queue",
    ]);

    if (this.change_throttle) {
      // Group bursts of change events (e.g., course editor where many
      // changes arrive at once).
      this.emit_change = throttle(this.emit_change, this.change_throttle);
    }

    this.setMaxListeners(100);

    this.init();
  }
342
343
/*
344
Initialize everything.
345
This should be called *exactly* once by the constructor,
346
and no other time. It tries to set everything up. If
347
the browser isn't connected to the network, it'll wait
348
until it is (however long, etc.). If this fails, it closes
349
this SyncDoc.
350
*/
351
private initialized = false;
352
private init = async () => {
353
if (this.initialized) {
354
throw Error("init can only be called once");
355
}
356
// const start = Date.now();
357
this.assert_not_closed("init");
358
const log = this.dbg("init");
359
await until(
360
async () => {
361
if (this.state != "init") {
362
return true;
363
}
364
try {
365
log("initializing all tables...");
366
await this.initAll();
367
log("initAll succeeded");
368
return true;
369
} catch (err) {
370
if (this.isClosed()) {
371
return true;
372
}
373
const m = `WARNING: problem initializing ${this.path} -- ${err}`;
374
log(m);
375
if (DEBUG) {
376
console.trace(err);
377
}
378
// log always
379
console.log(m);
380
}
381
log("wait then try again");
382
return false;
383
},
384
{ start: 3000, max: 15000, decay: 1.3 },
385
);
386
387
// Success -- everything initialized with no issues.
388
this.set_state("ready");
389
this.init_watch();
390
this.emit_change(); // from nothing to something.
391
};
392
393
// True if this client is responsible for managing
394
// the state of this document with respect to
395
// the file system. By default, the project is responsible,
396
// but it could be something else (e.g., a compute server!). It's
397
// important that whatever algorithm determines this, it is
398
// a function of state that is eventually consistent.
399
// IMPORTANT: whether or not we are the file server can
400
// change over time, so if you call isFileServer and
401
// set something up (e.g., autosave or a watcher), based
402
// on the result, you need to clear it when the state
403
// changes. See the function handleComputeServerManagerChange.
404
  private isFileServer = reuseInFlight(async () => {
    // NOTE(review): returns undefined (not false) when closed; callers
    // appear to rely only on truthiness -- confirm before tightening.
    if (this.state == "closed") return;
    if (this.client == null || this.client.is_browser()) {
      // browser is never the file server (yet), and doesn't need to do
      // anything related to watching for changes in state.
      // Someday via webassembly or browsers making users files available,
      // etc., we will have this. Not today.
      return false;
    }
    const computeServerManagerDoc = this.getComputeServerManagerDoc();
    const log = this.dbg("isFileServer");
    if (computeServerManagerDoc == null) {
      // no manager doc => classic behavior: the project is the file server.
      log("not using compute server manager for this doc");
      return this.client.is_project();
    }

    const state = computeServerManagerDoc.get_state();
    log("compute server manager doc state: ", state);
    if (state == "closed") {
      log("compute server manager is closed");
      // something really messed up
      return this.client.is_project();
    }
    if (state != "ready") {
      try {
        log(
          "waiting for compute server manager doc to be ready; current state=",
          state,
        );
        await once(computeServerManagerDoc, "ready", 15000);
        log("compute server manager is ready");
      } catch (err) {
        // fall back to the default (project) rather than blocking forever.
        log(
          "WARNING -- failed to initialize computeServerManagerDoc -- err=",
          err,
        );
        return this.client.is_project();
      }
    }

    // id of who the user *wants* to be the file server.
    const path = this.getFileServerPath();
    const fileServerId =
      computeServerManagerDoc.get_one({ path })?.get("id") ?? 0;
    if (this.client.is_project()) {
      log(
        "we are project, so we are fileserver if fileServerId=0 and it is ",
        fileServerId,
      );
      return fileServerId == 0;
    }
    // at this point we have to be a compute server
    const computeServerId = decodeUUIDtoNum(this.client.client_id());
    // this is usually true -- but might not be if we are switching
    // directly from one compute server to another.
    log("we are compute server and ", { fileServerId, computeServerId });
    return fileServerId == computeServerId;
  });
462
463
private getFileServerPath = () => {
464
if (this.path?.endsWith("." + JUPYTER_SYNCDB_EXTENSIONS)) {
465
// treating jupyter as a weird special case here.
466
return auxFileToOriginal(this.path);
467
}
468
return this.path;
469
};
470
471
  // Return the single shared syncdoc that tracks which compute server
  // manages which path, creating it lazily; returns null when opening
  // that manager doc itself (to avoid recursion). Non-browser clients
  // also subscribe to its change events on first creation.
  private getComputeServerManagerDoc = () => {
    if (this.path == COMPUTE_SERVE_MANAGER_SYNCDB_PARAMS.path) {
      // don't want to recursively explode!
      return null;
    }
    if (SyncDoc.computeServerManagerDoc == null) {
      if (this.client.is_project()) {
        // @ts-ignore: TODO!
        SyncDoc.computeServerManagerDoc = this.client.syncdoc({
          path: COMPUTE_SERVE_MANAGER_SYNCDB_PARAMS.path,
        });
      } else {
        // @ts-ignore: TODO!
        SyncDoc.computeServerManagerDoc = this.client.sync_client.sync_db({
          project_id: this.project_id,
          ...COMPUTE_SERVE_MANAGER_SYNCDB_PARAMS,
        });
      }
      if (
        SyncDoc.computeServerManagerDoc != null &&
        !this.client.is_browser()
      ) {
        // start watching for state changes
        SyncDoc.computeServerManagerDoc.on(
          "change",
          this.handleComputeServerManagerChange,
        );
      }
    }
    return SyncDoc.computeServerManagerDoc;
  };
502
503
  // React to changes in the compute server manager doc: if the file server
  // responsible for our path changed, start or stop acting as the file
  // server (filesystem watcher + autosave) accordingly.
  private handleComputeServerManagerChange = async (keys) => {
    if (SyncDoc.computeServerManagerDoc == null) {
      return;
    }
    // Only react if one of the changed rows concerns our path.
    let relevant = false;
    for (const key of keys ?? []) {
      if (key.get("path") == this.path) {
        relevant = true;
        break;
      }
    }
    if (!relevant) {
      return;
    }
    const path = this.getFileServerPath();
    const fileServerId =
      SyncDoc.computeServerManagerDoc.get_one({ path })?.get("id") ?? 0;
    const ourId = this.client.is_project()
      ? 0
      : decodeUUIDtoNum(this.client.client_id());
    // we are considering ourself the file server already if we have
    // either a watcher or autosave on.
    const thinkWeAreFileServer =
      this.file_watcher != null || this.fileserver_autosave_timer;
    const weAreFileServer = fileServerId == ourId;
    if (thinkWeAreFileServer != weAreFileServer) {
      // life has changed! Let's adapt.
      if (thinkWeAreFileServer) {
        // we were acting as the file server, but now we are not.
        await this.save_to_disk_filesystem_owner();
        // Stop doing things we are no longer supposed to do.
        clearInterval(this.fileserver_autosave_timer as any);
        this.fileserver_autosave_timer = 0;
        // stop watching filesystem
        await this.update_watch_path();
      } else {
        // load our state from the disk
        await this.load_from_disk();
        // we were not acting as the file server, but now we need to.
        // Let's step up to the plate.
        // start watching filesystem
        await this.update_watch_path(this.path);
        // enable autosave
        await this.init_file_autosave();
      }
    }
  };
550
551
  // Return id of ACTIVE remote compute server, if one is connected and pinging, or 0
  // if none is connected. This is used by Jupyter to determine who
  // should evaluate code.
  // We always take the smallest id of the remote
  // compute servers, in case there is more than one, so exactly one of them
  // takes control. Always returns 0 if cursors are not enabled for this
  // document, since the cursors table is used to coordinate the compute
  // server.
  getComputeServerId = (): number => {
    if (!this.cursors) {
      return 0;
    }
    // This info is in the "cursors" table instead of the document itself
    // to avoid wasting space in the database longterm. Basically a remote
    // Jupyter client that can provide compute announces this by reporting its
    // cursor to look a certain way.
    const cursors = this.get_cursors({
      maxAge: COMPUTE_THRESH_MS,
      // don't exclude self since getComputeServerId called from the compute
      // server also to know if it is the chosen one.
      excludeSelf: "never",
    });
    const dbg = this.dbg("getComputeServerId");
    dbg("num cursors = ", cursors.size);
    let minId = Infinity;
    // NOTE: similar code is in frontend/jupyter/cursor-manager.ts
    for (const [client_id, cursor] of cursors) {
      if (cursor.getIn(["locs", 0, "type"]) == COMPUTER_SERVER_CURSOR_TYPE) {
        try {
          // compute server ids are encoded in the client_id uuid.
          minId = Math.min(minId, decodeUUIDtoNum(client_id));
        } catch (err) {
          // this should never happen unless a client were being malicious.
          dbg(
            "WARNING -- client_id should encode server id, but is",
            client_id,
          );
        }
      }
    }

    return isFinite(minId) ? minId : 0;
  };
593
594
  // Announce (via the cursors table) that this client is a compute server;
  // see getComputeServerId above for how this is consumed.
  registerAsComputeServer = () => {
    this.setCursorLocsNoThrottle([{ type: COMPUTER_SERVER_CURSOR_TYPE }]);
  };
597
598
  /* Set this user's cursors to the given locs, immediately (no throttle).
     No-op unless the doc is ready and the cursors table exists; throws if
     cursors were never enabled for this document. */
  setCursorLocsNoThrottle = async (
    // locs is 'any' and not any[] because of a codemirror syntax highlighting bug!
    locs: any,
    side_effect: boolean = false,
  ) => {
    if (this.state != "ready") {
      return;
    }
    if (this.cursors_table == null) {
      if (!this.cursors) {
        throw Error("cursors are not enabled");
      }
      // table not initialized yet
      return;
    }
    if (this.useConat) {
      // conat transport: set with a numeric server timestamp; no explicit save.
      const time = this.client.server_time().valueOf();
      const x: {
        user_id: number;
        locs: any;
        time: number;
      } = {
        user_id: this.my_user_id,
        locs,
        time,
      };
      // remember our own last cursor time so we can filter out our own
      // cursor when rendering.
      this.cursor_last_time = new Date(x.time);
      this.cursors_table.set(x);
      return;
    }

    // Legacy (non-conat) transport below.
    const x: {
      string_id?: string;
      user_id: number;
      locs: any[];
      time?: Date;
    } = {
      string_id: this.string_id,
      user_id: this.my_user_id,
      locs,
    };
    const now = this.client.server_time();
    // NOTE(review): x.time is never assigned before this check, so
    // (x.time ?? now) >= now always holds and time is always refreshed;
    // the guard only matters if x.time is ever initialized above -- confirm.
    if (!side_effect || (x.time ?? now) >= now) {
      // the now comparison above is in case the cursor time
      // is in the future (due to clock issues) -- always fix that.
      x.time = now;
    }
    if (x.time != null) {
      // will actually always be non-null due to above
      this.cursor_last_time = x.time;
    }
    this.cursors_table.set(x, "none");
    await this.cursors_table.save();
  };
654
655
  // Throttled version of setCursorLocsNoThrottle -- what UI code should
  // normally call. Conat/NATS pub-sub is cheap, so it gets a much shorter
  // throttle interval than the legacy transport.
  set_cursor_locs: typeof this.setCursorLocsNoThrottle = throttle(
    this.setCursorLocsNoThrottle,
    USE_CONAT ? CURSOR_THROTTLE_NATS_MS : CURSOR_THROTTLE_MS,
    {
      leading: true,
      trailing: true,
    },
  );
663
664
  // Wire up throttled "file was edited" reporting via client.mark_file.
  // Browser clients only; fires on "user-change" events at most once per
  // file_use_interval, and only when this user actually made a patch.
  private init_file_use_interval = (): void => {
    if (this.file_use_interval == null) {
      // default: 60 seconds
      this.file_use_interval = 60 * 1000;
    }

    if (!this.file_use_interval || !this.client.is_browser()) {
      // file_use_interval has to be nonzero, and we only do
      // this for browser user.
      return;
    }

    const file_use = async () => {
      await delay(100); // wait a little so my_patches gets updated.
      // We ONLY count this and record that the file was
      // edited if there was an actual change record in the
      // patches log, by this user, since last time.
      let user_is_active: boolean = false;
      for (const tm in this.my_patches) {
        if (new Date(parseInt(tm)) > this.last_user_change) {
          user_is_active = true;
          break;
        }
      }
      if (!user_is_active) {
        return;
      }
      this.last_user_change = new Date();
      this.client.mark_file?.({
        project_id: this.project_id,
        path: this.path,
        action: "edit",
        ttl: this.file_use_interval,
      });
    };
    this.throttled_file_use = throttle(file_use, this.file_use_interval, {
      leading: true,
    });

    this.on("user-change", this.throttled_file_use as any);
  };
704
705
isClosed = () => (this.state ?? "closed") == "closed";
706
707
  // Transition lifecycle state and emit the state name itself as an event,
  // so listeners can do once(doc, "ready") etc.
  private set_state = (state: State): void => {
    this.state = state;
    this.emit(state);
  };
711
712
get_state = (): State => {
713
return this.state;
714
};
715
716
get_project_id = (): string => {
717
return this.project_id;
718
};
719
720
get_path = (): string => {
721
return this.path;
722
};
723
724
get_string_id = (): string => {
725
return this.string_id;
726
};
727
728
get_my_user_id = (): number => {
729
return this.my_user_id != null ? this.my_user_id : 0;
730
};
731
732
  // Throw (including the given description) if this doc has been closed.
  private assert_not_closed(desc: string): void {
    if (this.state === "closed") {
      //console.trace();
      throw Error(`must not be closed -- ${desc}`);
    }
  }
738
739
  // Set the live document and emit a (debounced) change event.
  // No-op when the new doc equals the current one. By default this also
  // exits undo mode; undo/redo pass exit_undo_mode=false.
  set_doc = (doc: Document, exit_undo_mode: boolean = true): void => {
    if (doc.is_equal(this.doc)) {
      // no change.
      return;
    }
    if (exit_undo_mode) this.undo_state = undefined;
    // console.log(`sync-doc.set_doc("${doc.to_str()}")`);
    this.doc = doc;

    // debounced, so don't immediately alert, in case there are many
    // more sets coming in the same loop:
    this.emit_change_debounced();
  };
752
753
// Convenience function to avoid having to do
754
// get_doc and set_doc constantly.
755
set = (x: any): void => {
756
this.set_doc(this.doc.set(x));
757
};
758
759
delete = (x?: any): void => {
760
this.set_doc(this.doc.delete(x));
761
};
762
763
get = (x?: any): any => {
764
return this.doc.get(x);
765
};
766
767
  // Return one matching record from the current doc (primarily useful
  // for db-docs).
  get_one(x?: any): any {
    return this.doc.get_one(x);
  }
770
771
// Return underlying document, or undefined if document
772
// hasn't been set yet.
773
get_doc = (): Document => {
774
if (this.doc == null) {
775
throw Error("doc must be set");
776
}
777
return this.doc;
778
};
779
780
  // Set this doc from its string representation.
  // NOTE: assigns this.doc directly and does NOT emit a change event,
  // unlike set_doc.
  from_str = (value: string): void => {
    // console.log(`sync-doc.from_str("${value}")`);
    this.doc = this._from_str(value);
  };
785
786
// Return string representation of this doc,
787
// or exception if not yet ready.
788
to_str = (): string => {
789
if (this.doc == null) {
790
throw Error("doc must be set");
791
}
792
return this.doc.to_str();
793
};
794
795
count = (): number => {
796
return this.doc.count();
797
};
798
799
  // Version of the document at a given point in time; if no
  // time specified, gives the version right now.
  // time is a logical timestamp as returned by versions().
  // If not fully initialized, will throw exception.
  version = (time?: number): Document => {
    this.assert_table_is_ready("patches");
    assertDefined(this.patch_list);
    return this.patch_list.value({ time });
  };
807
808
  /* Compute version of document if the patches at the given times
     were simply not included. This is a building block that is
     used for implementing undo functionality for client editors. */
  version_without = (without_times: number[]): Document => {
    this.assert_table_is_ready("patches");
    assertDefined(this.patch_list);
    return this.patch_list.value({ without_times });
  };
816
817
  // Revert document to what it was at the given point in time.
  // There doesn't have to be a patch at exactly that point in
  // time -- if there isn't it just uses the patch before that
  // point in time. (This sets the live doc; it does not delete history.)
  revert = (time: number): void => {
    this.set_doc(this.version(time));
  };
824
825
/* Undo/redo public api.
826
Calling this.undo and this.redo returns the version of
827
the document after the undo or redo operation, and records
828
a commit changing to that.
829
The first time calling this.undo switches into undo
830
state in which additional
831
calls to undo/redo move up and down the stack of changes made
832
by this user during this session.
833
834
Call this.exit_undo_mode() to exit undo/redo mode.
835
836
Undo and redo *only* impact changes made by this user during
837
this session. Other users edits are unaffected, and work by
838
this same user working from another browser tab or session is
839
also unaffected.
840
841
Finally, undo of a past patch by definition means "the state
842
of the document" if that patch was not applied. The impact
843
of undo is NOT that the patch is removed from the patch history.
844
Instead, it records a new patch that is what would have happened
845
had we replayed history with the patches being undone not there.
846
847
Doing any set_doc explicitly exits undo mode automatically.
848
*/
849
undo = (): Document => {
850
const prev = this._undo();
851
this.set_doc(prev, false);
852
this.commit();
853
return prev;
854
};
855
856
redo = (): Document => {
857
const next = this._redo();
858
this.set_doc(next, false);
859
this.commit();
860
return next;
861
};
862
863
  // Internal undo: move the undo pointer back one step and return the
  // corresponding document version (entering undo mode if necessary).
  // See the large undo/redo comment above for the overall contract.
  private _undo(): Document {
    this.assert_is_ready("_undo");
    let state = this.undo_state;
    if (state == null) {
      // not in undo mode
      state = this.initUndoState();
    }
    if (state.pointer === state.my_times.length) {
      // pointing at live state (e.g., happens on entering undo mode)
      const value: Document = this.version(); // last saved version
      const live: Document = this.doc;
      if (!live.is_equal(value)) {
        // User had unsaved changes, so last undo is to revert to version without those.
        state.final = value.make_patch(live); // live redo if needed
        state.pointer -= 1; // most recent timestamp
        return value;
      } else {
        // User had no unsaved changes, so last undo is version without last saved change.
        const tm = state.my_times[state.pointer - 1];
        // skip the live slot AND the last saved change in one step.
        state.pointer -= 2;
        if (tm != null) {
          state.without.push(tm);
          return this.version_without(state.without);
        } else {
          // no undo information during this session
          return value;
        }
      }
    } else {
      // pointing at particular timestamp in the past
      if (state.pointer >= 0) {
        // there is still more to undo
        state.without.push(state.my_times[state.pointer]);
        state.pointer -= 1;
      }
      return this.version_without(state.without);
    }
  }
901
902
  // Internal redo: move the undo pointer forward one step and return the
  // corresponding document version. Inverse of _undo above.
  private _redo(): Document {
    this.assert_is_ready("_redo");
    const state = this.undo_state;
    if (state == null) {
      // nothing to do but return latest live version
      return this.get_doc();
    }
    if (state.pointer === state.my_times.length) {
      // pointing at live state -- nothing to do
      return this.get_doc();
    } else if (state.pointer === state.my_times.length - 1) {
      // one back from live state, so apply unsaved patch to live version
      const value = this.version();
      if (value == null) {
        // see remark in undo -- do nothing
        return this.get_doc();
      }
      state.pointer += 1;
      // NOTE(review): state.final is normally defined in this branch (set on
      // entering undo mode with unsaved changes; the special case below skips
      // this position otherwise) -- confirm apply_patch tolerates undefined.
      return value.apply_patch(state.final);
    } else {
      // at least two back from live state
      state.without.pop();
      state.pointer += 1;
      if (state.final == null && state.pointer === state.my_times.length - 1) {
        // special case when there wasn't any live change
        state.pointer += 1;
      }
      return this.version_without(state.without);
    }
  }
932
933
in_undo_mode = (): boolean => {
934
return this.undo_state != null;
935
};
936
937
  // Leave undo/redo mode; the next undo will re-enter it fresh.
  exit_undo_mode = (): void => {
    this.undo_state = undefined;
  };
940
941
private initUndoState = (): UndoState => {
942
if (this.undo_state != null) {
943
return this.undo_state;
944
}
945
const my_times = keys(this.my_patches).map((x) => parseInt(x));
946
my_times.sort();
947
this.undo_state = {
948
my_times,
949
pointer: my_times.length,
950
without: [],
951
};
952
return this.undo_state;
953
};
954
955
  // Timer callback for periodic autosave to disk. Never throws: failures
  // are logged and will be retried on the next autosave tick.
  private save_to_disk_autosave = async (): Promise<void> => {
    if (this.state !== "ready") {
      return;
    }
    const dbg = this.dbg("save_to_disk_autosave");
    dbg();
    try {
      await this.save_to_disk();
    } catch (err) {
      dbg(`failed -- ${err}`);
    }
  };
967
968
  /* Make it so the local hub project will automatically save
     the file to disk periodically (every FILE_SERVER_AUTOSAVE_S). Only
     runs when this client is the file server and the timer isn't
     already running. */
  private init_file_autosave = async () => {
    // Do not autosave sagews until we resolve
    // https://github.com/sagemathinc/cocalc/issues/974
    // Similarly, do not autosave ipynb because of
    // https://github.com/sagemathinc/cocalc/issues/5216
    if (
      !FILE_SERVER_AUTOSAVE_S ||
      !(await this.isFileServer()) ||
      this.fileserver_autosave_timer ||
      endswith(this.path, ".sagews") ||
      endswith(this.path, "." + JUPYTER_SYNCDB_EXTENSIONS)
    ) {
      return;
    }

    // Explicit cast due to node vs browser typings.
    this.fileserver_autosave_timer = <any>(
      setInterval(this.save_to_disk_autosave, FILE_SERVER_AUTOSAVE_S * 1000)
    );
  };
990
991
  // account_id of the user who made the edit at
  // the given point in time.
  account_id = (time: number): string => {
    this.assert_is_ready("account_id");
    // user_id(time) is an index into the users array of account_ids.
    return this.users[this.user_id(time)];
  };
997
998
  // Integer index of user who made the edit at given
  // point in time. Throws if the patches table is not ready.
  user_id = (time: number): number => {
    this.assert_table_is_ready("patches");
    assertDefined(this.patch_list);
    return this.patch_list.user_id(time);
  };
1005
1006
  // Safe accessor for the single metadata row of the syncstring table.
  // Returns an empty immutable Map when the table is missing or the
  // project has not initialized the row yet, so callers need not null-check.
  private syncstring_table_get_one = (): Map<string, any> => {
    if (this.syncstring_table == null) {
      logger.warn("syncstring_table missing", {
        path: this.path,
        state: this.state,
      });
      return Map();
    }
    const t = this.syncstring_table.get_one();
    if (t == null) {
      // project has not initialized it yet.
      return Map();
    }
    return t;
  };
1021
1022
/* The project calls set_initialized once it has checked for
   the file on disk; this way the frontend knows that the
   syncstring has been initialized in the database, and also
   if there was an error doing the check.
*/
private set_initialized = async (
  error: string,
  read_only: boolean,
  size: number,
): Promise<void> => {
  this.assert_table_is_ready("syncstring");
  this.dbg("set_initialized")({ error, read_only, size });
  await this.set_syncstring_table({
    init: { time: this.client.server_time(), size, error },
    read_only,
    last_active: this.client.server_time(),
  });
};
1041
1042
/* List of logical timestamps of the versions of this string in the sync
   table that we opened to start editing (so starts with what was
   the most recent snapshot when we started).  The list of timestamps
   is sorted from oldest to newest. */
versions = (): number[] => {
  const pl = this.patch_list;
  assertDefined(pl);
  return pl.versions();
};
1050
1051
// Wall-clock time of the given version, if the patch list is loaded
// and knows about it.
wallTime = (version: number): number | undefined => {
  const pl = this.patch_list;
  return pl == null ? undefined : pl.wallTime(version);
};
1054
1055
// newest version of any non-staging known patch on this client,
// including ones just made that might not be in patch_list yet.
newestVersion = (): number | undefined => {
  const pl = this.patch_list;
  return pl == null ? undefined : pl.newest_patch_time();
};
1060
1061
// Whether the given logical timestamp is a known version.
hasVersion = (time: number): boolean => {
  const pl = this.patch_list;
  assertDefined(pl);
  return pl.hasVersion(time);
};
1065
1066
// Oldest version in the currently loaded history.
historyFirstVersion = () => {
  this.assert_table_is_ready("patches");
  const pl = this.patch_list;
  assertDefined(pl);
  return pl.firstVersion();
};
1071
1072
// Newest version in the currently loaded history.
historyLastVersion = () => {
  this.assert_table_is_ready("patches");
  const pl = this.patch_list;
  assertDefined(pl);
  return pl.lastVersion();
};
1077
1078
// Sequential version number of the given timestamp, if known.
historyVersionNumber = (time: number): number | undefined => {
  const pl = this.patch_list;
  return pl == null ? undefined : pl.versionNumber(time);
};
1081
1082
// Logical timestamp of the most recent known version, or 0 when
// there are no versions at all.
last_changed = (): number => {
  const times = this.versions();
  const newest = times[times.length - 1];
  return newest ?? 0;
};
1086
1087
// If any of the underlying tables closes, close the whole syncdoc.
private init_table_close_handlers(): void {
  ["syncstring", "patches", "cursors"].forEach((name) => {
    const table = this[name + "_table"];
    if (table != null) {
      table.on("close", this.close);
    }
  });
}
1095
1096
// more gentle version -- this can cause the project actions
// to be *created* etc.
end = reuseInFlight(async () => {
  const trySaveFirst = this.client.is_browser() && this.state == "ready";
  if (trySaveFirst) {
    try {
      await this.save_to_disk();
    } catch (err) {
      // Has to be non-fatal since we are closing the document, and of
      // course we still need to clean up everything else.  Do nothing.
    }
  }
  this.close();
});
1110
1111
// Close synchronized editing of this string; this stops listening
// for changes and stops broadcasting changes.
// Wrapped in reuseInFlight so concurrent calls share one teardown.
// NOTE: the teardown ORDER below is deliberate -- see the WARNING
// comment about emitting 'closed' before any async work.
close = reuseInFlight(async () => {
  if (this.state == "closed") {
    // already closed -- idempotent
    return;
  }
  const dbg = this.dbg("close");
  dbg("close");

  // stop reacting to compute-server manager changes (static listener)
  SyncDoc.computeServerManagerDoc?.removeListener(
    "change",
    this.handleComputeServerManagerChange,
  );
  //
  // SYNC STUFF
  //

  // WARNING: that 'closed' is emitted at the beginning of the
  // close function (before anything async) for the project is
  // assumed in src/packages/project/sync/sync-doc.ts, because
  // that ensures that the moment close is called we lock trying
  // try create the syncdoc again until closing is finished.
  // (This set_state call emits "closed"):
  this.set_state("closed");

  this.emit("close");

  // must be after the emits above, so clients know
  // what happened and can respond.
  this.removeAllListeners();

  if (this.throttled_file_use != null) {
    // Cancel any pending file_use calls.
    cancel_scheduled(this.throttled_file_use);
    (this.throttled_file_use as any).cancel();
  }

  if (this.emit_change != null) {
    // Cancel any pending change emit calls.
    cancel_scheduled(this.emit_change);
  }

  // stop the periodic disk autosave, if running
  if (this.fileserver_autosave_timer) {
    clearInterval(this.fileserver_autosave_timer as any);
    this.fileserver_autosave_timer = 0;
  }

  // stop the periodic read-only re-check, if running
  if (this.read_only_timer) {
    clearInterval(this.read_only_timer as any);
    this.read_only_timer = 0;
  }

  // drop any patches that were queued but not yet processed
  this.patch_update_queue = [];

  // Stop watching for file changes.  It's important to
  // do this *before* all the await's below, since
  // this syncdoc can't do anything in response to a
  // a file change in its current state.
  this.update_watch_path(); // no input = closes it, if open

  if (this.patch_list != null) {
    // not async -- just a data structure in memory
    this.patch_list.close();
  }

  try {
    this.closeTables();
    dbg("closeTables -- successfully saved all data to database");
  } catch (err) {
    // best effort -- we still finish tearing down below
    dbg(`closeTables -- ERROR -- ${err}`);
  }
  // this avoids memory leaks:
  close(this);

  // after doing that close, we need to keep the state (which just got deleted) as 'closed'
  this.set_state("closed");
  dbg("close done");
});
1189
1190
// Close every underlying table / state object that was created,
// skipping any that never got initialized.
private closeTables = async () => {
  for (const t of [
    this.syncstring_table,
    this.patches_table,
    this.cursors_table,
    this.evaluator,
    this.ipywidgets_state,
  ]) {
    t?.close();
  }
};
1197
1198
// TODO: We **have** to do this on the client, since the backend
// **security model** for accessing the patches table only
// knows the string_id, but not the project_id/path.  Thus
// there is no way currently to know whether or not the client
// has access to the patches, and hence the patches table
// query fails.  This costs significant time -- a roundtrip
// and write to the database -- whenever the user opens a file.
// This fix should be to change the patches schema somehow
// to have the user also provide the project_id and path, thus
// proving they have access to the sha1 hash (string_id), but
// don't actually use the project_id and path as columns in
// the table.  This requires some new idea I guess of virtual
// fields....
// Also, this also establishes the correct doctype.

// Since this MUST succeed before doing anything else. This is critical
// because the patches table can't be opened anywhere if the syncstring
// object doesn't exist, due to how our security works, *AND* that the
// patches table uses the string_id, which is a SHA1 hash.
private ensure_syncstring_exists_in_db = async (): Promise<void> => {
  const dbg = this.dbg("ensure_syncstring_exists_in_db");
  if (this.useConat) {
    // with conat there is no central database row to create
    dbg("skipping -- no database");
    return;
  }

  if (!this.client.is_connected()) {
    dbg("wait until connected...", this.client.is_connected());
    await once(this.client, "connected");
  }

  if (this.client.is_browser() && !this.client.is_signed_in()) {
    // the browser has to sign in, unlike the project (and compute servers)
    await once(this.client, "signed_in");
  }

  // we may have been closed while awaiting above
  if (this.state == ("closed" as State)) return;

  dbg("do syncstring write query...");

  await callback2(this.client.query, {
    query: {
      syncstrings: {
        string_id: this.string_id,
        project_id: this.project_id,
        path: this.path,
        doctype: JSON.stringify(this.doctype),
      },
    },
  });
  dbg("wrote syncstring to db - done.");
};
1250
1251
// Create the appropriate synctable for the given query.  Dispatches on
// which table is being requested (patches / syncstrings / ipywidgets /
// eval_* / other) and on whether conat is in use; falls back to an
// ephemeral synctable (unit tests, messaging composer) otherwise.
// For the patches stream it also repairs an invalid last_seq by
// reloading everything and locating the newest snapshot.
private synctable = async (
  query,
  options: any[],
  throttle_changes?: undefined | number,
): Promise<SyncTable> => {
  this.assert_not_closed("synctable");
  const dbg = this.dbg("synctable");
  if (!this.useConat && !this.ephemeral && this.persistent) {
    // persistent table in a non-ephemeral syncdoc, so ensure that table is
    // persisted to database (not just in memory).
    options = options.concat([{ persistent: true }]);
  }
  if (this.ephemeral) {
    options.push({ ephemeral: true });
  }
  let synctable;
  // scan options for an ephemeral flag so we can pass it through below
  let ephemeral = false;
  for (const x of options) {
    if (x.ephemeral) {
      ephemeral = true;
      break;
    }
  }
  if (this.useConat && query.patches) {
    // patches: an append-only stream, starting from last_seq if known
    synctable = await this.client.synctable_conat(query, {
      obj: {
        project_id: this.project_id,
        path: this.path,
      },
      stream: true,
      atomic: true,
      desc: { path: this.path },
      start_seq: this.last_seq,
      ephemeral,
    });

    if (this.last_seq) {
      // any possibility last_seq is wrong?
      if (!isCompletePatchStream(synctable.dstream)) {
        // we load everything and fix it. This happened
        // for data moving to conat when the seq numbers changed.
        console.log("updating invalid timetravel -- ", this.path);

        synctable.close();
        // reopen WITHOUT start_seq, so the full stream is loaded
        synctable = await this.client.synctable_conat(query, {
          obj: {
            project_id: this.project_id,
            path: this.path,
          },
          stream: true,
          atomic: true,
          desc: { path: this.path },
          ephemeral,
        });

        // also find the correct last_seq:
        // walk backwards to the newest snapshot entry...
        let n = synctable.dstream.length - 1;
        for (; n >= 0; n--) {
          const x = synctable.dstream[n];
          if (x?.is_snapshot) {
            const time = x.time;
            // find the seq number with time
            let m = n - 1;
            let last_seq = 0;
            while (m >= 1) {
              if (synctable.dstream[m].time == time) {
                last_seq = synctable.dstream.seq(m);
                break;
              }
              m -= 1;
            }
            // persist the repaired snapshot/seq bookkeeping
            this.last_seq = last_seq;
            await this.set_syncstring_table({
              last_snapshot: time,
              last_seq,
            });
            this.setLastSnapshot(time);
            break;
          }
        }
        if (n == -1) {
          // no snapshot? should never happen, but just in case.
          delete this.last_seq;
          await this.set_syncstring_table({
            last_seq: undefined,
          });
        }
      }
    }
  } else if (this.useConat && query.syncstrings) {
    // syncstrings: single non-atomic immutable record
    synctable = await this.client.synctable_conat(query, {
      obj: {
        project_id: this.project_id,
        path: this.path,
      },
      stream: false,
      atomic: false,
      immutable: true,
      desc: { path: this.path },
      ephemeral,
    });
  } else if (this.useConat && query.ipywidgets) {
    synctable = await this.client.synctable_conat(query, {
      obj: {
        project_id: this.project_id,
        path: this.path,
      },
      stream: false,
      atomic: true,
      immutable: true,
      // for now just putting a 1-day limit on the ipywidgets table
      // so we don't waste a ton of space.
      config: { max_age: 1000 * 60 * 60 * 24 },
      desc: { path: this.path },
      ephemeral: true, // ipywidgets state always ephemeral
    });
  } else if (this.useConat && (query.eval_inputs || query.eval_outputs)) {
    synctable = await this.client.synctable_conat(query, {
      obj: {
        project_id: this.project_id,
        path: this.path,
      },
      stream: false,
      atomic: true,
      immutable: true,
      config: { max_age: 5 * 60 * 1000 },
      desc: { path: this.path },
      ephemeral: true, // eval state (for sagews) is always ephemeral
    });
  } else if (this.useConat) {
    // any other conat-backed table (e.g., cursors when not pub/sub)
    synctable = await this.client.synctable_conat(query, {
      obj: {
        project_id: this.project_id,
        path: this.path,
      },
      stream: false,
      atomic: true,
      immutable: true,
      desc: { path: this.path },
      ephemeral,
    });
  } else {
    // only used for unit tests and the ephemeral messaging composer
    if (this.client.synctable_ephemeral == null) {
      throw Error(`client does not support sync properly`);
    }
    synctable = await this.client.synctable_ephemeral(
      this.project_id,
      query,
      options,
      throttle_changes,
    );
  }
  // We listen and log error events.  This is useful because in some settings, e.g.,
  // in the project, an eventemitter with no listener for errors, which has an error,
  // will crash the entire process.
  synctable.on("error", (error) => dbg("ERROR", error));
  return synctable;
};
1410
1411
// Open the syncstrings table for this document and wire up the change
// handler.  For an ephemeral project-side doc we just record the
// doctype; otherwise the first update is processed synchronously
// BEFORE subscribing to further changes.
private init_syncstring_table = async (): Promise<void> => {
  // query selecting the one row for this document, with all the
  // columns this class reads.
  const query = {
    syncstrings: [
      {
        string_id: this.string_id,
        project_id: this.project_id,
        path: this.path,
        users: null,
        last_snapshot: null,
        last_seq: null,
        snapshot_interval: null,
        save: null,
        last_active: null,
        init: null,
        read_only: null,
        last_file_change: null,
        doctype: null,
        archived: null,
        settings: null,
      },
    ],
  };
  const dbg = this.dbg("init_syncstring_table");

  dbg("getting table...");
  this.syncstring_table = await this.synctable(query, []);
  if (this.ephemeral && this.client.is_project()) {
    await this.set_syncstring_table({
      doctype: JSON.stringify(this.doctype),
    });
  } else {
    dbg("handling the first update...");
    this.handle_syncstring_update();
  }
  this.syncstring_table.on("change", this.handle_syncstring_update);
};
1447
1448
// Used for internal debug logging.  Returns a log function tagged with
// this file's path and the caller-supplied function name `_f`; when
// DEBUG is off it returns a shared-shape no-op so call sites stay cheap.
// (Typed as a concrete function type rather than the too-loose `Function`.)
private dbg = (_f: string = ""): ((...args: unknown[]) => void) => {
  if (!DEBUG) {
    return () => {};
  }
  return (...args) => {
    logger.debug(this.path, _f, ...args);
  };
};
1458
1459
// Full initialization sequence: interest loop, syncstring row,
// syncstring table, then patch list / cursors / evaluator / ipywidgets,
// close handlers, file-use interval, and finally load-from-disk (file
// server) or wait-for-patches (everyone else).  Emits "init" when done.
// Must only be called once, from the "init" state.
private initAll = async (): Promise<void> => {
  if (this.state !== "init") {
    throw Error("connect can only be called in init state");
  }
  const log = this.dbg("initAll");

  log("update interest");
  this.initInterestLoop();

  log("ensure syncstring exists");
  this.assert_not_closed("initAll -- before ensuring syncstring exists");
  await this.ensure_syncstring_exists_in_db();

  await this.init_syncstring_table();
  this.assert_not_closed("initAll -- successful init_syncstring_table");

  log("patch_list, cursors, evaluator, ipywidgets");
  this.assert_not_closed(
    "initAll -- before init patch_list, cursors, evaluator, ipywidgets",
  );
  if (PARALLEL_INIT) {
    // the four inits are independent, so run them concurrently
    await Promise.all([
      this.init_patch_list(),
      this.init_cursors(),
      this.init_evaluator(),
      this.init_ipywidgets(),
    ]);
    this.assert_not_closed(
      "initAll -- successful init patch_list, cursors, evaluator, and ipywidgets",
    );
  } else {
    await this.init_patch_list();
    this.assert_not_closed("initAll -- successful init_patch_list");
    await this.init_cursors();
    this.assert_not_closed("initAll -- successful init_patch_cursors");
    await this.init_evaluator();
    this.assert_not_closed("initAll -- successful init_evaluator");
    await this.init_ipywidgets();
    this.assert_not_closed("initAll -- successful init_ipywidgets");
  }

  this.init_table_close_handlers();
  this.assert_not_closed("initAll -- successful init_table_close_handlers");

  log("file_use_interval");
  this.init_file_use_interval();

  if (await this.isFileServer()) {
    log("load_from_disk");
    // This sets initialized, which is needed to be fully ready.
    // We keep trying this load from disk until sync-doc is closed
    // or it succeeds.  It may fail if, e.g., the file is too
    // large or is not readable by the user. They are informed to
    // fix the problem... and once they do (and wait up to 10s),
    // this will finish.
    //       if (!this.client.is_browser() && !this.client.is_project()) {
    //         // FAKE DELAY!!!  Just to simulate flakiness / slow network!!!!
    //         await delay(3000);
    //       }
    await retry_until_success({
      f: this.init_load_from_disk,
      max_delay: 10000,
      desc: "syncdoc -- load_from_disk",
    });
    log("done loading from disk");
  } else {
    if (this.patch_list!.count() == 0) {
      // no patches yet: wait until the doc is fully ready OR the first
      // patch arrives, whichever happens first.
      await Promise.race([
        this.waitUntilFullyReady(),
        once(this.patch_list!, "change"),
      ]);
    }
  }
  this.assert_not_closed("initAll -- load from disk");
  this.emit("init");

  this.assert_not_closed("initAll -- after waiting until fully ready");

  if (await this.isFileServer()) {
    log("init file autosave");
    this.init_file_autosave();
  }
  this.update_has_unsaved_changes();
  log("done");
};
1544
1545
// Error message recorded by the project in the init field of the
// syncstring row, or undefined if there is none (or the table is not
// initialized yet).
private init_error = (): string | undefined => {
  let row;
  try {
    row = this.syncstring_table.get_one();
  } catch (_err) {
    // if the table hasn't been initialized yet,
    // it can't be in error state.
    return undefined;
  }
  return row?.get("init")?.get("error");
};
1556
1557
// wait until the syncstring table is ready to be
// used (so extracted from archive, etc.),
// i.e., until the project has set the "init" field; then, for a
// browser client opening a nonempty file, also wait until at least
// one patch has actually arrived.
private waitUntilFullyReady = async (): Promise<void> => {
  this.assert_not_closed("wait_until_fully_ready");
  const dbg = this.dbg("wait_until_fully_ready");
  dbg();

  if (this.client.is_browser() && this.init_error()) {
    // init is set and is in error state. Give the backend a few seconds
    // to try to fix this error before giving up.  The browser client
    // can close and open the file to retry this (as instructed).
    try {
      await this.syncstring_table.wait(() => !this.init_error(), 5);
    } catch (err) {
      // fine -- let the code below deal with this problem...
    }
  }

  // `init` is captured by the is_init closure below, then read after
  // the wait resolves.
  let init;
  const is_init = (t: SyncTable) => {
    this.assert_not_closed("is_init");
    const tbl = t.get_one();
    if (tbl == null) {
      dbg("null");
      return false;
    }
    init = tbl.get("init")?.toJS();
    return init != null;
  };
  dbg("waiting for init...");
  await this.syncstring_table.wait(is_init, 0);
  dbg("init done");
  if (init.error) {
    throw Error(init.error);
  }
  assertDefined(this.patch_list);
  if (init.size == null) {
    // don't crash but warn at least.
    console.warn("SYNC BUG -- init.size must be defined", { init });
  }
  if (
    !this.client.is_project() &&
    this.patch_list.count() === 0 &&
    init.size
  ) {
    dbg("waiting for patches for nontrivial file");
    // normally this only happens in a later event loop,
    // so force it now.
    dbg("handling patch update queue since", this.patch_list.count());
    await this.handle_patch_update_queue();
    assertDefined(this.patch_list);
    dbg("done handling, now ", this.patch_list.count());
    if (this.patch_list.count() === 0) {
      // wait for a change -- i.e., project loading the file from
      // disk and making available...  Because init.size > 0, we know that
      // there must be SOMETHING in the patches table once initialization is done.
      // This is the root cause of https://github.com/sagemathinc/cocalc/issues/2382
      await once(this.patches_table, "change");
      dbg("got patches_table change");
      await this.handle_patch_update_queue();
      dbg("handled update queue");
    }
  }
};
1621
1622
// Throw unless the named table ("syncstring", "patches", ...) exists
// and is in the connected state.
private assert_table_is_ready = (table: string): void => {
  // not using a template string for the lookup only because it breaks codemirror!
  const tbl = this[table + "_table"];
  if (tbl != null && tbl.get_state() == "connected") {
    return;
  }
  throw Error(
    `Table ${table} must be connected. string_id=${this.string_id}`,
  );
};
1630
1631
// Throw (with the given description) unless this syncdoc is ready.
assert_is_ready = (desc: string): void => {
  if (this.state === "ready") {
    return;
  }
  throw Error(`must be ready -- ${desc}`);
};
1636
1637
// Resolve once this syncdoc reaches the "ready" state (immediately if
// it already has).  Throws if the doc is closed.
wait_until_ready = async (): Promise<void> => {
  this.assert_not_closed("wait_until_ready");
  if (this.state === ("ready" as State)) {
    return;
  }
  // wait for a state change to ready.
  await once(this, "ready");
};
1644
1645
/* Calls wait for the corresponding patches SyncTable, if
1646
it has been defined. If it hasn't been defined, it waits
1647
until it is defined, then calls wait. Timeout only starts
1648
when patches_table is already initialized.
1649
*/
1650
wait = async (until: Function, timeout: number = 30): Promise<any> => {
1651
await this.wait_until_ready();
1652
//console.trace("SYNC WAIT -- start...");
1653
const result = await wait({
1654
obj: this,
1655
until,
1656
timeout,
1657
change_event: "change",
1658
});
1659
//console.trace("SYNC WAIT -- got it!");
1660
return result;
1661
};
1662
1663
/* Delete the synchronized string and **all** patches from the database
1664
-- basically delete the complete history of editing this file.
1665
WARNINGS:
1666
(1) If a project has this string open, then things may be messed
1667
up, unless that project is restarted.
1668
(2) Only available for an **admin** user right now!
1669
1670
To use: from a javascript console in the browser as admin, do:
1671
1672
await smc.client.sync_string({
1673
project_id:'9f2e5869-54b8-4890-8828-9aeba9a64af4',
1674
path:'a.txt'}).delete_from_database()
1675
1676
Then make sure project and clients refresh.
1677
1678
WORRY: Race condition where constructor might write stuff as
1679
it is being deleted?
1680
*/
1681
delete_from_database = async (): Promise<void> => {
1682
const queries: object[] = this.ephemeral
1683
? []
1684
: [
1685
{
1686
patches_delete: {
1687
id: [this.string_id],
1688
dummy: null,
1689
},
1690
},
1691
];
1692
queries.push({
1693
syncstrings_delete: {
1694
project_id: this.project_id,
1695
path: this.path,
1696
},
1697
});
1698
1699
const v: Promise<any>[] = [];
1700
for (let i = 0; i < queries.length; i++) {
1701
v.push(callback2(this.client.query, { query: queries[i] }));
1702
}
1703
await Promise.all(v);
1704
};
1705
1706
// Whether the given path exists on disk but is NOT writable by us.
// Returns false both when the path is writable and when it does not
// exist at all.  (Added the explicit `string` type on `path`, which
// was an implicit any.)
private pathExistsAndIsReadOnly = async (path: string): Promise<boolean> => {
  try {
    await callback2(this.client.path_access, {
      path,
      mode: "w",
    });
    // clearly exists and is NOT read only:
    return false;
  } catch (err) {
    // either it doesn't exist or it is read only
    if (await callback2(this.client.path_exists, { path })) {
      // it exists, so is read only and exists
      return true;
    }
    // doesn't exist
    return false;
  }
};
1724
1725
// True if either the document's own path or the (possibly different)
// underlying file-server path exists and is read only.
private file_is_read_only = async (): Promise<boolean> => {
  if (await this.pathExistsAndIsReadOnly(this.path)) {
    return true;
  }
  const fsPath = this.getFileServerPath();
  return fsPath != this.path
    ? await this.pathExistsAndIsReadOnly(fsPath)
    : false;
};
1737
1738
// Re-check the on-disk read-only status and record the result,
// unless we were closed while the (async) check was in flight.
private update_if_file_is_read_only = async (): Promise<void> => {
  const ro = await this.file_is_read_only();
  if (this.state == "closed") {
    return;
  }
  this.set_read_only(ro);
};
1745
1746
// One attempt at the initial load from disk; throws on failure so a
// surrounding retry_until_success keeps trying.
private init_load_from_disk = async (): Promise<void> => {
  if (this.state == "closed") {
    // stop trying, no error -- this is assumed
    // in a retry_until_success elsewhere.
    return;
  }
  const failed = await this.load_from_disk_if_newer();
  if (failed) {
    throw Error("failed to load from disk");
  }
};
1756
1757
// Load the file from disk when the on-disk copy is newer than the last
// recorded edit, or when this is the very first load.  Always records
// the outcome (error / read_only / size) via set_initialized.
// Returns true exactly when an error occurred (the caller retries).
private load_from_disk_if_newer = async (): Promise<boolean> => {
  const last_changed = new Date(this.last_changed());
  // no versions at all means the document has never been loaded
  const firstLoad = this.versions().length == 0;
  const dbg = this.dbg("load_from_disk_if_newer");
  let is_read_only: boolean = false;
  let size: number = 0;
  let error: string = "";
  try {
    dbg("check if path exists");
    if (await callback2(this.client.path_exists, { path: this.path })) {
      // the path exists
      dbg("path exists -- stat file");
      const stats = await callback2(this.client.path_stat, {
        path: this.path,
      });
      if (firstLoad || stats.ctime > last_changed) {
        dbg(
          `disk file changed more recently than edits (or first load), so loading, ${stats.ctime} > ${last_changed}; firstLoad=${firstLoad}`,
        );
        size = await this.load_from_disk();
        if (firstLoad) {
          dbg("emitting first-load event");
          // this event is emited the first time the document is ever loaded from disk.
          this.emit("first-load");
        }
        dbg("loaded");
      } else {
        dbg("stick with database version");
      }
      dbg("checking if read only");
      is_read_only = await this.file_is_read_only();
      dbg("read_only", is_read_only);
    }
  } catch (err) {
    // capture the error; it is reported through set_initialized below
    error = `${err}`;
  }

  await this.set_initialized(error, is_read_only, size);
  dbg("done");
  return !!error;
};
1798
1799
// Build the synctable query describing the columns of the patches
// table for this document, optionally restricted to patches with
// time at or after the given cutoff.
private patch_table_query = (cutoff?: number) => {
  const query = {
    string_id: this.string_id,
    is_snapshot: false, // only used with conat
    // null matches all times; otherwise only patches >= cutoff
    time: cutoff ? { ">=": cutoff } : null,
    wall: null,
    // compressed format patch as a JSON *string*
    patch: null,
    // integer id of user (maps to syncstring table)
    user_id: null,
    // (optional) a snapshot at this point in time
    snapshot: null,
    // info about sequence number, count, etc. of this snapshot
    seq_info: null,
    parents: null,
    version: null,
  };
  if (this.doctype.patch_format != null) {
    // doctype may dictate a specific patch format column
    (query as any).format = this.doctype.patch_format;
  }
  return query;
};
1821
1822
// Single place where this.last_snapshot gets assigned, so we can keep
// it in sync with patch_list.last_snapshot and also be certain about
// the data type (being number or undefined).
private setLastSnapshot(last_snapshot?: number) {
  // runtime guard in addition to the static type, since callers may
  // pass values read from table data
  if (last_snapshot !== undefined && typeof last_snapshot != "number") {
    throw Error("type of last_snapshot must be number or undefined");
  }
  this.last_snapshot = last_snapshot;
}
1830
1831
// Open the patches table, build the SortedPatchList from all known
// patches (loading further history backwards snapshot-by-snapshot as
// needed), compute the current document value from it, and wire up
// change / unsaved-changes handlers.
private init_patch_list = async (): Promise<void> => {
  this.assert_not_closed("init_patch_list - start");
  const dbg = this.dbg("init_patch_list");
  dbg();

  // CRITICAL: note that handle_syncstring_update checks whether
  // init_patch_list is done by testing whether this.patch_list is defined!
  // That is why we first define "patch_list" below, then set this.patch_list
  // to it only after we're done.
  delete this.patch_list;

  const patch_list = new SortedPatchList({
    from_str: this._from_str,
  });

  dbg("opening the table...");
  // only request patches from the last snapshot onward
  const query = { patches: [this.patch_table_query(this.last_snapshot)] };
  this.patches_table = await this.synctable(query, [], this.patch_interval);
  this.assert_not_closed("init_patch_list -- after making synctable");

  const update_has_unsaved_changes = debounce(
    this.update_has_unsaved_changes,
    500,
    { leading: true, trailing: true },
  );

  this.patches_table.on("has-uncommitted-changes", (val) => {
    this.emit("has-uncommitted-changes", val);
  });

  this.on("change", () => {
    update_has_unsaved_changes();
  });

  this.syncstring_table.on("change", () => {
    update_has_unsaved_changes();
  });

  dbg("adding all known patches");
  patch_list.add(this.get_patches());

  dbg("possibly kick off loading more history");
  // walk back through snapshots, loading older stream segments, until
  // the patch list has enough history (or we can make no progress)
  let last_start_seq: null | number = null;
  while (patch_list.needsMoreHistory()) {
    // @ts-ignore
    const dstream = this.patches_table.dstream;
    if (dstream == null) {
      break;
    }
    const snap = patch_list.getOldestSnapshot();
    if (snap == null) {
      break;
    }
    const seq_info = snap.seq_info ?? {
      prev_seq: 1,
    };
    const start_seq = seq_info.prev_seq ?? 1;
    if (last_start_seq != null && start_seq >= last_start_seq) {
      // no progress, e.g., corruption would cause this.
      // "corruption" is EXPECTED, since a user might be submitting
      // patches after being offline, and get disconnected halfway through.
      break;
    }
    last_start_seq = start_seq;
    await dstream.load({ start_seq });
    dbg("load more history");
    patch_list.add(this.get_patches());
    if (start_seq <= 1) {
      // loaded everything
      break;
    }
  }

  //this.patches_table.on("saved", this.handle_offline);
  this.patch_list = patch_list;

  // compute the current document value; fall back to empty on failure
  let doc;
  try {
    doc = patch_list.value();
  } catch (err) {
    console.warn("error getting doc", err);
    doc = this._from_str("");
  }
  this.last = this.doc = doc;
  this.patches_table.on("change", this.handle_patch_update);

  dbg("done");
};
1919
1920
// Set up the code evaluator; only applies to sagews documents --
// a no-op for every other file type.
private init_evaluator = async () => {
  const log = this.dbg("init_evaluator");
  if (filename_extension(this.path) !== "sagews") {
    log("done -- only use init_evaluator for sagews");
    return;
  }
  log("creating the evaluator and waiting for init");
  this.evaluator = new Evaluator(this, this.client, this.synctable);
  await this.evaluator.init();
  log("done");
};
1932
1933
// Set up the ipywidgets state table; only applies to Jupyter syncdb
// documents -- a no-op for every other file type.
// (Fixed copy-paste bug: the debug label said "init_evaluator".)
private init_ipywidgets = async () => {
  const dbg = this.dbg("init_ipywidgets");
  const ext = filename_extension(this.path);
  if (ext != JUPYTER_SYNCDB_EXTENSIONS) {
    dbg("done -- only use ipywidgets for jupyter");
    return;
  }
  dbg("creating the ipywidgets state table, and waiting for init");
  this.ipywidgets_state = new IpywidgetsState(
    this,
    this.client,
    this.synctable,
  );
  await this.ipywidgets_state.init();
  dbg("done");
};
1949
1950
// Set up multi-user cursor tracking, if enabled for this syncdoc.
// With conat, cursors are broadcast over pub/sub (nothing persisted);
// otherwise an ephemeral synctable is used and this.cursor_map is
// seeded from its current contents before subscribing to changes.
private init_cursors = async () => {
  const dbg = this.dbg("init_cursors");
  if (!this.cursors) {
    dbg("done -- do not care about cursors for this syncdoc.");
    return;
  }
  if (this.useConat) {
    dbg("cursors broadcast using pub/sub");
    this.cursors_table = await this.client.pubsub_conat({
      project_id: this.project_id,
      path: this.path,
      name: "cursors",
    });
    this.cursors_table.on(
      "change",
      (obj: { user_id: number; locs: any; time: number }) => {
        const account_id = this.users[obj.user_id];
        if (!account_id) {
          // unknown user index -- ignore
          return;
        }
        if (obj.locs == null && !this.cursor_map.has(account_id)) {
          // gone, and already gone.
          return;
        }
        if (obj.locs != null) {
          // changed
          this.cursor_map = this.cursor_map.set(account_id, fromJS(obj));
        } else {
          // deleted
          this.cursor_map = this.cursor_map.delete(account_id);
        }
        this.emit("cursor_activity", account_id);
      },
    );
    return;
  }

  dbg("getting cursors ephemeral table");
  const query = {
    cursors: [
      {
        string_id: this.string_id,
        user_id: null,
        locs: null,
        time: null,
      },
    ],
  };
  // We make cursors an ephemeral table, since there is no
  // need to persist it to the database, obviously!
  // Also, queue_size:1 makes it so only the last cursor position is
  // saved, e.g., in case of disconnect and reconnect.
  const options = [{ ephemeral: true }, { queue_size: 1 }]; // probably deprecated
  this.cursors_table = await this.synctable(query, options, 1000);
  this.assert_not_closed("init_cursors -- after making synctable");

  // cursors now initialized; first initialize the
  // local this._cursor_map, which tracks positions
  // of cursors by account_id:
  dbg("loading initial state");
  const s = this.cursors_table.get();
  if (s == null) {
    throw Error("bug -- get should not return null once table initialized");
  }
  s.forEach((locs: any, k: string) => {
    if (locs == null) {
      return;
    }
    // key is a JSON-encoded [string_id, user_id] pair
    const u = JSON.parse(k);
    if (u != null) {
      this.cursor_map = this.cursor_map.set(this.users[u[1]], locs);
    }
  });
  this.cursors_table.on("change", this.handle_cursors_change);

  dbg("done");
};
2027
2028
// Change handler for the (non-conat) cursors table: for each changed
// key, update this.cursor_map for the corresponding account and emit
// "cursor_activity".
private handle_cursors_change = (keys) => {
  if (this.state === "closed") {
    return;
  }
  for (const key of keys) {
    // key is a JSON-encoded [string_id, user_id] pair
    const parsed = JSON.parse(key);
    if (parsed == null) {
      continue;
    }
    const account_id = this.users[parsed[1]];
    if (!account_id) {
      // this happens for ephemeral table when project restarts and browser
      // has data it is trying to send.
      continue;
    }
    const locs = this.cursors_table.get(key);
    if (locs == null) {
      if (!this.cursor_map.has(account_id)) {
        // gone, and already gone.
        continue;
      }
      // deleted
      this.cursor_map = this.cursor_map.delete(account_id);
    } else {
      // changed
      this.cursor_map = this.cursor_map.set(account_id, locs);
    }
    this.emit("cursor_activity", account_id);
  }
};
2058
2059
/* Returns *immutable* Map from account_id to list
2060
of cursor positions, if cursors are enabled.
2061
2062
- excludeSelf: do not include our own cursor
2063
- maxAge: only include cursors that have been updated with maxAge ms from now.
2064
*/
2065
get_cursors = ({
2066
maxAge = 60 * 1000,
2067
// excludeSelf:
2068
// 'always' -- *always* exclude self
2069
// 'never' -- never exclude self
2070
// 'heuristic' -- exclude self is older than last set from here, e.g., useful on
2071
// frontend so we don't see our own cursor unless more than one browser.
2072
excludeSelf = "always",
2073
}: {
2074
maxAge?: number;
2075
excludeSelf?: "always" | "never" | "heuristic";
2076
} = {}): CursorMap => {
2077
this.assert_not_closed("get_cursors");
2078
if (!this.cursors) {
2079
throw Error("cursors are not enabled");
2080
}
2081
if (this.cursors_table == null) {
2082
return Map(); // not loaded yet -- so no info yet.
2083
}
2084
const account_id: string = this.client_id();
2085
let map = this.cursor_map;
2086
if (map.has(account_id) && excludeSelf != "never") {
2087
if (
2088
excludeSelf == "always" ||
2089
(excludeSelf == "heuristic" &&
2090
this.cursor_last_time >=
2091
new Date(map.getIn([account_id, "time"], 0) as number))
2092
) {
2093
map = map.delete(account_id);
2094
}
2095
}
2096
// Remove any old cursors, where "old" is by default more than maxAge old.
2097
const now = Date.now();
2098
for (const [client_id, value] of map as any) {
2099
const time = value.get("time");
2100
if (time == null) {
2101
// this should always be set.
2102
map = map.delete(client_id);
2103
continue;
2104
}
2105
if (maxAge) {
2106
// we use abs to implicitly exclude a bad value that is somehow in the future,
2107
// if that were to happen.
2108
if (Math.abs(now - time.valueOf()) >= maxAge) {
2109
map = map.delete(client_id);
2110
continue;
2111
}
2112
}
2113
if (time >= now + 10 * 1000) {
2114
// We *always* delete any cursors more than 10 seconds in the future, since
2115
// that can only happen if a client inserts invalid data (e.g., clock not
2116
// yet synchronized). See https://github.com/sagemathinc/cocalc/issues/7969
2117
map = map.delete(client_id);
2118
continue;
2119
}
2120
}
2121
return map;
2122
};
2123
2124
/* Set settings map. Used for custom configuration just for
2125
this one file, e.g., overloading the spell checker language.
2126
*/
2127
set_settings = async (obj): Promise<void> => {
2128
this.assert_is_ready("set_settings");
2129
await this.set_syncstring_table({
2130
settings: obj,
2131
});
2132
};
2133
2134
// The globally unique id of this client (delegates to the client object).
client_id = () => this.client.client_id();
2137
2138
// Return the per-file settings map (empty immutable Map when unset).
get_settings = (): Map<string, any> => {
  this.assert_is_ready("get_settings");
  const row = this.syncstring_table_get_one();
  return row.get("settings", Map());
};
2143
2144
/*
Commits and saves current live syncdoc to backend.

Function only returns when there is nothing needing
saving.

Save any changes we have as a new patch.

NOTE: wrapped in reuseInFlight, so overlapping calls share one run.
*/
save = reuseInFlight(async () => {
  const dbg = this.dbg("save");
  dbg();
  // We just keep trying while syncdoc is ready and there
  // are changes that have not been saved (due to this.doc
  // changing during the while loop!).
  if (this.doc == null || this.last == null || this.state == "closed") {
    // EXPECTED: this happens after document is closed
    // There's nothing to do regarding save if the table is
    // already closed. Note that we *do* have to save when
    // the table is init stage, since the project has to
    // record the newly opened version of the file to the
    // database! See
    // https://github.com/sagemathinc/cocalc/issues/4986
    return;
  }
  if (this.client?.is_deleted(this.path, this.project_id)) {
    dbg("not saving because deleted");
    return;
  }
  // Compute any patches.  Loop because this.doc may keep changing
  // (e.g., due to user edits) while we are saving.
  while (!this.doc.is_equal(this.last)) {
    dbg("something to save");
    this.emit("user-change");
    // remember the doc we are saving so we can detect concurrent edits.
    const doc = this.doc;
    // TODO: put in a delay if just saved too recently?
    // Or maybe won't matter since not using database?
    if (this.handle_patch_update_queue_running) {
      dbg("wait until the update queue is done");
      await once(this, "handle_patch_update_queue_done");
      // but wait until next loop (so as to check that needed
      // and state still ready).
      continue;
    }
    dbg("Compute new patch.");
    this.sync_remote_and_doc(false);
    // Emit event since this syncstring was
    // changed locally (or we wouldn't have had
    // to save at all).
    if (doc.is_equal(this.doc)) {
      dbg("no change during loop -- done!");
      break;
    }
  }
  if (this.state != "ready") {
    // above async waits could have resulted in state change.
    return;
  }
  await this.handle_patch_update_queue();
  if (this.state != "ready") {
    // state could also change during handle_patch_update_queue.
    return;
  }

  // Ensure all patches are saved to backend.
  // We do this after the above, so that creating the newest patch
  // happens immediately on save, which makes it possible for clients
  // to save current state without having to wait on an async, which is
  // useful to ensure specific undo points (e.g., right before a paste).
  await this.patches_table.save();
});
2212
2213
// Logical time (ms) of the last patch this client committed (set by
// commit_patch); used to keep our patch times strictly increasing.
private timeOfLastCommit: number | undefined = undefined;

// Choose the logical timestamp for the next patch: current server time,
// bumped past our previous commit if identical, then adjusted by the
// patch list so it does not collide with other users' patch times.
private next_patch_time = (): number => {
  let t = this.client.server_time().valueOf();
  if (t == this.timeOfLastCommit) {
    t = this.timeOfLastCommit + 1;
  }
  assertDefined(this.patch_list);
  return this.patch_list.next_available_time(
    t,
    this.my_user_id,
    this.users.length,
  );
};
2227
2228
// Commit a new patch at the given logical time: write it to the patches
// table, add it to the local patch list, and possibly trigger a snapshot.
// - time: logical timestamp (ms); must be unique/increasing (see
//   next_patch_time).
// - patch: the diff to record, in this.doctype's patch format.
private commit_patch = (time: number, patch: XPatch): void => {
  this.timeOfLastCommit = time;
  this.assert_not_closed("commit_patch");
  assertDefined(this.patch_list);
  const obj: any = {
    // version for database
    string_id: this.string_id,
    // logical time -- usually the sync'd walltime, but
    // guaranteed to be increasing.
    time,
    // what we show user
    wall: this.client.server_time().valueOf(),
    patch: JSON.stringify(patch),
    user_id: this.my_user_id,
    is_snapshot: false,
    parents: this.patch_list.getHeads(),
    version: this.patch_list.lastVersion() + 1,
  };

  this.my_patches[time.valueOf()] = obj;

  if (this.doctype.patch_format != null) {
    obj.format = this.doctype.patch_format;
  }

  // If in undo mode put the just-created patch in our
  // without timestamp list, so it won't be included
  // when doing undo/redo.
  if (this.undo_state != null) {
    this.undo_state.without.unshift(time);
  }

  //console.log 'saving patch with time ', time.valueOf()
  let x = this.patches_table.set(obj, "none");
  if (x == null) {
    // TODO: just for NATS right now!
    x = fromJS(obj);
  }
  // FIX: obj.patch is a JSON *string*, so its size in characters is
  // obj.patch.length; the previous code used obj.patch.size, which is
  // always undefined on a string.
  const y = this.processPatch({ x, patch, size: obj.patch.length });
  this.patch_list.add([y]);
  // Since *we* just made a definite change to the document, we're
  // active, so we check if we should make a snapshot. There is the
  // potential of a race condition where more than one clients make
  // a snapshot at the same time -- this would waste a little space
  // in the stream, but is otherwise harmless, since the snapshots
  // are identical.
  this.snapshotIfNecessary();
};
2276
2277
// Return the conat dstream backing the patches table; throws when the
// table is missing or not dstream-backed.
private dstream = () => {
  // @ts-ignore -- in general patches_table might not be a conat one still,
  // or at least dstream is an internal implementation detail.
  const table: any = this.patches_table ?? {};
  const stream = table.dstream;
  if (stream == null) {
    throw Error("dstream must be defined");
  }
  return stream;
};
2286
2287
// return the conat-assigned sequence number of the oldest entry in the
// patch list with the given time, and also:
// - prev_seq -- the sequence number of the patch of the *previous*
//   snapshot (if any), for use in "load more" incremental history loading.
private conatSnapshotSeqInfo = (
  time: number,
): { seq: number; prev_seq?: number } => {
  const dstream = this.dstream();
  // seq = actual sequence number of the message with the patch that we're
  // snapshotting at -- i.e., at time
  let seq: number | undefined = undefined;
  // prev_seq = sequence number of patch of *previous* snapshot, if there is a previous one.
  // This is needed for incremental loading of more history.
  let prev_seq: number | undefined;
  // i tracks the message index so we can ask dstream for the sequence
  // number of the matching message; this is a full linear scan.
  let i = 0;
  for (const mesg of dstream.getAll()) {
    if (mesg.is_snapshot && mesg.time < time) {
      // the seq field of this message has the actual sequence number of the patch
      // that was snapshotted, along with the index of that patch.
      prev_seq = mesg.seq_info.seq;
    }
    if (seq === undefined && mesg.time == time) {
      seq = dstream.seq(i);
    }
    i += 1;
  }
  if (seq == null) {
    throw Error(
      `unable to find message with time '${time}'=${new Date(time)}`,
    );
  }
  return { seq, prev_seq };
};
2320
2321
/* Create and store in the database a snapshot of the state
of the string at the given point in time. This should
be the time of an existing patch.

The point of a snapshot is that if you load all patches recorded
>= this point in time, then you don't need any earlier ones to
reconstruct the document, since otherwise, why have the snapshot at
all, as it does no good. Due to potentially long offline users
putting old data into history, this can fail. However, in the usual
case we should never record a snapshot with this bad property.

NOTE: wrapped in reuseInFlight so overlapping calls share one run.
*/
private snapshot = reuseInFlight(async (time: number): Promise<void> => {
  assertDefined(this.patch_list);
  const x = this.patch_list.patch(time);
  if (x == null) {
    throw Error(`no patch at time ${time}`);
  }
  if (x.snapshot != null) {
    // there is already a snapshot at this point in time,
    // so nothing further to do.
    return;
  }

  // Full document contents at the requested point in time.
  const snapshot: string = this.patch_list.value({ time }).to_str();
  // save the snapshot itself in the patches table.
  const seq_info = this.conatSnapshotSeqInfo(time);
  const obj = {
    size: snapshot.length,
    string_id: this.string_id,
    time,
    wall: time,
    is_snapshot: true,
    snapshot,
    user_id: x.user_id,
    seq_info,
  };
  // also set snapshot in the this.patch_list, which saves a little time
  // and ensures that "(x.snapshot != null)" above works if snapshot is called again.
  this.patch_list.add([obj]);
  this.patches_table.set(obj);
  await this.patches_table.save();
  if (this.state != "ready") {
    // state changed (e.g., closed) during the async save -- stop here.
    return;
  }

  // Record in the syncstring table where the newest snapshot lives, so
  // clients know how far back they need to load history.
  const last_seq = seq_info.seq;
  await this.set_syncstring_table({
    last_snapshot: time,
    last_seq,
  });
  this.setLastSnapshot(time);
  this.last_seq = last_seq;
});
2374
2375
// Have a snapshot every this.snapshot_interval patches, except
// for the very last interval. Throttled (60s) so we don't try to make
// snapshots too frequently, as making them is always optional and
// now part of the UI.
private snapshotIfNecessary = throttle(async (): Promise<void> => {
  if (this.get_state() !== "ready") {
    // especially important due to throttle
    return;
  }
  const dbg = this.dbg("snapshotIfNecessary");
  const interval = this.snapshot_interval;
  const max_size = Math.floor(1.2 * MAX_FILE_SIZE_MB * 1000000);
  dbg("check if we need to make a snapshot:", { interval, max_size });
  assertDefined(this.patch_list);
  const time = this.patch_list.time_of_unmade_periodic_snapshot(
    interval,
    max_size,
  );
  if (time == null) {
    dbg("no need to make a snapshot yet");
    return;
  }
  dbg("yes, try to make a snapshot at time", time);
  try {
    await this.snapshot(time);
  } catch (err) {
    // this is expected to happen sometimes, e.g., when sufficient information
    // isn't known about the stream of patches.
    console.log(
      `(expected) WARNING: client temporarily unable to make a snapshot of ${this.path} -- ${err}`,
    );
  }
}, 60000);
2408
2409
/* Convert one raw entry from the patches table into a Patch object.

- x - patch object (immutable map row from the patches table)
- patch: if given will be used as an actual patch
  instead of x.patch, which is a JSON string.
- size: optional fallback for the patch size in characters.
*/
private processPatch = ({
  x,
  patch,
  size: size0,
}: {
  x: Map<string, any>;
  patch?: any;
  size?: number;
}): Patch => {
  let t = x.get("time");
  if (typeof t != "number") {
    // backwards compat -- older entries stored non-numeric times.
    t = new Date(t).valueOf();
  }
  const time: number = t;
  // wall = human-facing wall-clock time; defaults to the logical time.
  const wall = x.get("wall") ?? time;
  const user_id: number = x.get("user_id");
  let parents: number[] = x.get("parents")?.toJS() ?? [];
  let size: number;
  const is_snapshot = x.get("is_snapshot");
  if (is_snapshot) {
    // for a snapshot, size is the length of the snapshot string itself.
    size = x.get("snapshot")?.length ?? 0;
  } else {
    if (patch == null) {
      /* Do **NOT** use misc.from_json, since we definitely
         do not want to unpack ISO timestamps as Date,
         since patch just contains the raw patches from
         user editing. This was done for a while, which
         led to horrific bugs in some edge cases...
         See https://github.com/sagemathinc/cocalc/issues/1771
      */
      if (x.has("patch")) {
        const p: string = x.get("patch");
        patch = JSON.parse(p);
        size = p.length;
      } else {
        // no patch field at all -- use the empty patch, whose JSON
        // representation "[]" has length 2.
        patch = [];
        size = 2;
      }
    } else {
      // caller supplied the parsed patch; prefer the stored string's
      // length, then the explicit size, then recompute as last resort.
      const p = x.get("patch");
      size = p?.length ?? size0 ?? JSON.stringify(patch).length;
    }
  }

  const obj: Patch = {
    time,
    wall,
    user_id,
    patch,
    size,
    is_snapshot,
    parents,
    version: x.get("version"),
  };
  if (is_snapshot) {
    obj.snapshot = x.get("snapshot"); // this is a string
    obj.seq_info = x.get("seq_info")?.toJS();
    if (obj.snapshot == null || obj.seq_info == null) {
      console.warn("WARNING: message = ", x.toJS());
      throw Error(
        `message with is_snapshot true must also set snapshot and seq_info fields -- time=${time}`,
      );
    }
  }
  return obj;
};
2480
2481
/* Return all patches currently loaded in the patches table,
   sorted by time.  (NOTE: the old time0/time1 range parameters this
   comment once described no longer exist -- everything is returned.)
*/
private get_patches = (): Patch[] => {
  this.assert_table_is_ready("patches");

  // m below is an immutable map with keys the string that
  // is the JSON version of the primary key
  // [string_id, timestamp, user_number].
  let m: Map<string, any> | undefined = this.patches_table.get();
  if (m == null) {
    // won't happen because of assert above.
    throw Error("patches_table must be initialized");
  }
  if (!Map.isMap(m)) {
    // TODO: this is just for proof of concept NATS!!
    m = fromJS(m);
  }
  const v: Patch[] = [];
  m.forEach((x, _) => {
    const p = this.processPatch({ x });
    if (p != null) {
      return v.push(p);
    }
  });
  // sort by time (patch_cmp orders patches chronologically).
  v.sort(patch_cmp);
  return v;
};
2511
2512
// True once the complete edit history has been loaded into the patch
// list; false when the patch list does not exist yet.
hasFullHistory = (): boolean => {
  const list = this.patch_list;
  return list != null && list.hasFullHistory();
};
2518
2519
// returns true if there may be additional history to load
// after loading this. return false if definitely done.
loadMoreHistory = async ({
  all,
}: {
  // if true, loads all history
  all?: boolean;
} = {}): Promise<boolean> => {
  if (this.hasFullHistory() || this.ephemeral || this.patch_list == null) {
    return false;
  }
  let start_seq;
  if (all) {
    start_seq = 1;
  } else {
    const seq_info = this.patch_list.getOldestSnapshot()?.seq_info;
    if (seq_info == null) {
      // nothing more to load
      return false;
    }
    start_seq = seq_info.prev_seq ?? 1;
  }
  // Doing this load triggers change events for all the patch info
  // that gets loaded.
  // TODO: right now we load everything, since the seq_info is wrong
  // from the NATS migration. Maybe this is fine since it is very efficient.
  // NOTE(review): the start_seq computed above is deliberately ignored in
  // this call (see TODO); it is still used for the return value below.
  // @ts-ignore
  await this.patches_table.dstream?.load({ start_seq: 0 });

  // Wait until patch update queue is empty
  while (this.patch_update_queue.length > 0) {
    await once(this, "patch-update-queue-empty");
  }
  return start_seq > 1;
};
2554
2555
// Whether there is edit history for this document in the legacy
// (pre-conat) storage, detected by the presence of a uuid.
legacyHistoryExists = async () => {
  const { uuid } = await this.legacy.getInfo();
  return !!uuid;
};
2559
2560
// Guard so the legacy history is only ever loaded once.
private loadedLegacyHistory = false;

// Load the pre-conat ("legacy") edit history and graft it onto the
// beginning of the current patch list.  Requires the full current
// history to already be loaded.  reuseInFlight: concurrent calls share
// one run.
loadLegacyHistory = reuseInFlight(async () => {
  if (this.loadedLegacyHistory) {
    return;
  }
  this.loadedLegacyHistory = true;
  if (!this.hasFullHistory()) {
    throw Error("must first load full history first");
  }
  const { patches, users } = await this.legacy.getPatches();
  if (this.patch_list == null) {
    return;
  }
  // @ts-ignore - cheating here
  const first = this.patch_list.patches[0];
  if ((first?.parents ?? []).length > 0) {
    throw Error("first patch should have no parents");
  }
  // normalize legacy times to numbers (ms since epoch).
  for (const patch of patches) {
    // @ts-ignore
    patch.time = new Date(patch.time).valueOf();
  }
  patches.sort(field_cmp("time"));
  const v: Patch[] = [];
  // Legacy patches get negative version numbers, counting up to 0, so
  // they order before all current patches.
  let version = -patches.length;
  let i = 0;
  for (const patch of patches) {
    // @ts-ignore
    patch.version = version;
    version += 1;
    if (i > 0) {
      // chain each legacy patch to its predecessor.
      // @ts-ignore
      patch.parents = [patches[i - 1].time];
    } else {
      // @ts-ignore
      patch.parents = [];
    }

    // remap the user_id field from the legacy user list to this.users,
    // appending any users we have not seen before.
    const account_id = users[patch.user_id];
    let user_id = this.users.indexOf(account_id);
    if (user_id == -1) {
      this.users.push(account_id);
      user_id = this.users.length - 1;
    }
    patch.user_id = user_id;

    const p = this.processPatch({ x: fromJS(patch) });
    i += 1;
    v.push(p);
  }
  if (first != null) {
    // Join the two histories: the formerly-first patch becomes a
    // snapshot whose parent is the last legacy patch.
    // @ts-ignore
    first.parents = [patches[patches.length - 1].time];
    first.is_snapshot = true;
    first.snapshot = this.patch_list.value({ time: first.time }).to_str();
  }
  this.patch_list.add(v);
  this.emit("change");
});
2620
2621
show_history = (opts = {}): void => {
2622
assertDefined(this.patch_list);
2623
this.patch_list.show_history(opts);
2624
};
2625
2626
set_snapshot_interval = async (n: number): Promise<void> => {
2627
await this.set_syncstring_table({
2628
snapshot_interval: n,
2629
});
2630
await this.syncstring_table.save();
2631
};
2632
2633
// Timestamp of the most recent completed save-to-disk.
get_last_save_to_disk_time = (): Date => this.last_save_to_disk_time;
2636
2637
// React to a change of the save-to-disk state in the syncstring table:
// emit 'save-to-disk' when a save completes, and -- when we are the file
// server -- actually perform a requested save to disk.
// - state: new save state string (e.g., "requested", "done")
// - time: timestamp associated with the state change
private handle_syncstring_save_state = async (
  state: string,
  time: Date,
): Promise<void> => {
  // Called when the save state changes.

  /* this.syncstring_save_state is used to make it possible to emit a
     'save-to-disk' event, whenever the state changes
     to indicate a save completed.

     NOTE: it is intentional that this.syncstring_save_state is not defined
     the first time this function is called, so that save-to-disk
     with last save time gets emitted on initial load (which, e.g., triggers
     latex compilation properly in case of a .tex file).
  */
  if (state === "done" && this.syncstring_save_state !== "done") {
    this.last_save_to_disk_time = time;
    this.emit("save-to-disk", time);
  }
  const dbg = this.dbg("handle_syncstring_save_state");
  dbg(
    `state='${state}', this.syncstring_save_state='${this.syncstring_save_state}', this.state='${this.state}'`,
  );
  if (
    this.state === "ready" &&
    (await this.isFileServer()) &&
    this.syncstring_save_state !== "requested" &&
    state === "requested"
  ) {
    this.syncstring_save_state = state; // only used in the if above
    dbg("requesting save to disk -- calling save_to_disk");
    // state just changed to requesting a save to disk...
    // so we do it (unless of course syncstring is still
    // being initialized).
    try {
      // Uncomment the following to test simulating a
      // random failure in save_to_disk:
      // if (Math.random() < 0.5) throw Error("CHAOS MONKEY!"); // FOR TESTING ONLY.
      await this.save_to_disk();
    } catch (err) {
      // CRITICAL: we must unset this.syncstring_save_state (and set the save state);
      // otherwise, it stays as "requested" and this if statement would never get
      // run again, thus completely breaking saving this doc to disk.
      // It is normal behavior that *sometimes* this.save_to_disk might
      // throw an exception, e.g., if the file is temporarily deleted
      // or save it called before everything is initialized, or file
      // is temporarily set readonly, or maybe there is a file system error.
      // Of course, the finally below will also take care of this. However,
      // it's nice to record the error here.
      this.syncstring_save_state = "done";
      await this.set_save({ state: "done", error: `${err}` });
      dbg(`ERROR saving to disk in handle_syncstring_save_state-- ${err}`);
    } finally {
      // No matter what, after the above code is run,
      // the save state in the table better be "done".
      // We triple check that here, though of course
      // we believe the logic in save_to_disk and above
      // should always accomplish this.
      dbg("had to set the state to done in finally block");
      if (
        this.state === "ready" &&
        (this.syncstring_save_state != "done" ||
          this.syncstring_table_get_one().getIn(["save", "state"]) != "done")
      ) {
        this.syncstring_save_state = "done";
        await this.set_save({ state: "done", error: "" });
      }
    }
  }
};
2707
2708
// Handler for any change to the syncstring (metadata) table: forwards
// save-state changes, then dispatches to the brand-new-document or
// existing-document update path.
private handle_syncstring_update = async (): Promise<void> => {
  if (this.state === "closed") {
    return;
  }
  if (this.syncstring_table == null) {
    // defensive: an update fired after the table went away.
    logger.warn("handle_syncstring_update without syncstring_table", {
      path: this.path,
      state: this.state,
    });
    return;
  }
  const dbg = this.dbg("handle_syncstring_update");
  dbg();

  const data = this.syncstring_table_get_one();
  const x: any = data != null ? data.toJS() : undefined;

  if (x != null && x.save != null) {
    // NOTE(review): deliberately(?) not awaited -- save handling proceeds
    // in the background while we continue below; confirm before changing.
    this.handle_syncstring_save_state(x.save.state, x.save.time);
  }

  dbg(JSON.stringify(x));
  if (x == null || x.users == null) {
    // no users recorded yet => this is a brand new document.
    dbg("new_document");
    await this.handle_syncstring_update_new_document();
  } else {
    dbg("update_existing");
    await this.handle_syncstring_update_existing_document(x, data);
  }
};
2738
2739
// Initialize the syncstring table for a brand new document: we are the
// first (and only) user, so record ourselves, the doctype and defaults.
private handle_syncstring_update_new_document = async (): Promise<void> => {
  // Brand new document
  this.emit("load-time-estimate", { type: "new", time: 1 });
  this.setLastSnapshot();
  this.last_seq = undefined;
  this.snapshot_interval =
    schema.SCHEMA.syncstrings.user_query?.get?.fields.snapshot_interval ??
    DEFAULT_SNAPSHOT_INTERVAL;

  // Brand new syncstring
  // TODO: worry about race condition with everybody making themselves
  // have user_id 0... and also setting doctype.
  this.my_user_id = 0;
  this.users = [this.client.client_id()];
  const row = {
    string_id: this.string_id,
    project_id: this.project_id,
    path: this.path,
    last_snapshot: this.last_snapshot,
    users: this.users,
    doctype: JSON.stringify(this.doctype),
    last_active: this.client.server_time(),
  };
  this.syncstring_table.set(row);
  await this.syncstring_table.save();
  this.settings = Map();
  this.emit("metadata-change");
  this.emit("settings-change", this.settings);
};
2768
2769
// Update local state from an existing syncstring table row: snapshot
// info, users, settings -- and make sure this client appears in the
// user list.
// - x: plain-JS version of the row; data: the immutable row itself.
private handle_syncstring_update_existing_document = async (
  x: any,
  data: Map<string, any>,
): Promise<void> => {
  if (this.state === "closed") {
    return;
  }
  // Existing document.

  if (this.path == null) {
    // We just opened the file -- emit a load time estimate.
    this.emit("load-time-estimate", { type: "ready", time: 1 });
  }
  // TODO: handle doctype change here (?)
  this.setLastSnapshot(x.last_snapshot);
  this.last_seq = x.last_seq;
  this.snapshot_interval = x.snapshot_interval ?? DEFAULT_SNAPSHOT_INTERVAL;
  this.users = x.users ?? [];
  if (x.project_id) {
    // @ts-ignore
    this.project_id = x.project_id;
  }
  if (x.path) {
    // @ts-ignore
    this.path = x.path;
  }

  // only re-emit settings when the immutable object actually changed.
  const settings = data.get("settings", Map());
  if (settings !== this.settings) {
    this.settings = settings;
    this.emit("settings-change", settings);
  }

  if (this.client != null) {
    // Ensure that this client is in the list of clients
    const client_id: string = this.client_id();
    this.my_user_id = this.users.indexOf(client_id);
    if (this.my_user_id === -1) {
      // we are new -- append ourselves and persist the updated user list.
      this.my_user_id = this.users.length;
      this.users.push(client_id);
      await this.set_syncstring_table({
        users: this.users,
      });
    }
  }
  this.emit("metadata-change");
};
2816
2817
// Make file watching match our role: only the file server watches the
// file on disk; all other clients watch nothing.
private init_watch = async (): Promise<void> => {
  const isServer = await this.isFileServer();
  if (!isServer) {
    // ensures we are NOT watching anything
    await this.update_watch_path();
    return;
  }

  // If path isn't being properly watched, make it so.
  if (this.watch_path !== this.path) {
    await this.update_watch_path(this.path);
  }

  await this.pending_save_to_disk();
};
2831
2832
// If a save-to-disk was requested but has not happened yet, perform it
// now.  Only the file server actually saves; everyone else returns.
private pending_save_to_disk = async (): Promise<void> => {
  this.assert_table_is_ready("syncstring");
  if (!(await this.isFileServer())) {
    return;
  }

  // Check if there is a pending save-to-disk that is needed.
  const row = this.syncstring_table.get_one();
  if (row == null || row.getIn(["save", "state"]) !== "requested") {
    return;
  }
  try {
    await this.save_to_disk();
  } catch (err) {
    const dbg = this.dbg("pending_save_to_disk");
    dbg(`ERROR saving to disk in pending_save_to_disk -- ${err}`);
  }
};
2849
2850
// Point the file watcher at the given path, or stop watching entirely
// when path is undefined.  If the file does not exist yet, first write
// the in-memory document to disk so there is something to watch.  Also
// (re)starts the periodic read-only check on success.
private update_watch_path = async (path?: string): Promise<void> => {
  const dbg = this.dbg("update_watch_path");
  if (this.file_watcher != null) {
    // clean up any currently active watcher before doing anything else.
    dbg("close");
    this.file_watcher.close();
    delete this.file_watcher;
    delete this.watch_path;
  }
  if (path != null && this.client.is_deleted(path, this.project_id)) {
    dbg(`not setting up watching since "${path}" is explicitly deleted`);
    return;
  }
  if (path == null) {
    dbg("not opening another watcher since path is null");
    this.watch_path = path;
    return;
  }
  if (this.watch_path != null) {
    // this case is impossible since we deleted it above if it is was defined.
    dbg("watch_path already defined");
    return;
  }
  dbg("opening watcher...");
  if (this.state === "closed") {
    throw Error("must not be closed");
  }
  this.watch_path = path;
  try {
    if (!(await callback2(this.client.path_exists, { path }))) {
      // re-check deletion: state may have changed during the async call.
      if (this.client.is_deleted(path, this.project_id)) {
        dbg(`not setting up watching since "${path}" is explicitly deleted`);
        return;
      }
      // path does not exist
      dbg(
        `write '${path}' to disk from syncstring in-memory database version`,
      );
      const data = this.to_str();
      await callback2(this.client.write_file, { path, data });
      dbg(`wrote '${path}' to disk`);
    }
  } catch (err) {
    // This can happen, e.g, if path is read only.
    dbg(`could NOT write '${path}' to disk -- ${err}`);
    await this.update_if_file_is_read_only();
    // In this case, can't really setup a file watcher.
    return;
  }

  dbg("now requesting to watch file");
  this.file_watcher = this.client.watch_file({ path });
  this.file_watcher.on("change", this.handle_file_watcher_change);
  this.file_watcher.on("delete", this.handle_file_watcher_delete);
  this.setupReadOnlyTimer();
};
2906
2907
// (Re)start the periodic check of whether the file is read only,
// clearing any previously running timer first.
private setupReadOnlyTimer = () => {
  if (this.read_only_timer) {
    clearInterval(this.read_only_timer as any);
    this.read_only_timer = 0;
  }
  const timer = setInterval(
    this.update_if_file_is_read_only,
    READ_ONLY_CHECK_INTERVAL_MS,
  );
  this.read_only_timer = timer as any;
};
2916
2917
// Respond to a change event from the file watcher: reload from disk
// only when the change could not have been caused by our own recent
// save-to-disk.
private handle_file_watcher_change = async (ctime: Date): Promise<void> => {
  const dbg = this.dbg("handle_file_watcher_change");
  const time: number = ctime.valueOf();
  dbg(
    `file_watcher: change, ctime=${time}, this.save_to_disk_start_ctime=${this.save_to_disk_start_ctime}, this.save_to_disk_end_ctime=${this.save_to_disk_end_ctime}`,
  );
  const neverSaved = this.save_to_disk_start_ctime == null;
  const savedLongAgo =
    this.save_to_disk_end_ctime != null &&
    time - this.save_to_disk_end_ctime >= RECENT_SAVE_TO_DISK_MS;
  if (neverSaved || savedLongAgo) {
    // Either we never saved to disk, or the last attempt
    // to save was at least RECENT_SAVE_TO_DISK_MS ago, and it finished,
    // so definitely this change event was not caused by it.
    dbg("load_from_disk since no recent save to disk");
    await this.load_from_disk();
    return;
  }
};
2936
2937
// The watched file was deleted from disk: record the deletion with the
// client and close this syncdoc.
private handle_file_watcher_delete = async (): Promise<void> => {
  this.assert_is_ready("handle_file_watcher_delete");
  this.dbg("handle_file_watcher_delete")("delete: set_deleted and closing");
  await this.client.set_deleted(this.path, this.project_id);
  this.close();
};
2944
2945
// Replace the live document with the contents of the file on disk
// (blank if the file vanished), record the save state, commit, and save
// to the backend.  Returns the number of characters read.
private load_from_disk = async (): Promise<number> => {
  const path = this.path;
  const dbg = this.dbg("load_from_disk");
  dbg();
  const exists: boolean = await callback2(this.client.path_exists, { path });
  let size: number;
  if (exists) {
    dbg("file exists");
    await this.update_if_file_is_read_only();

    const contents = await callback2<string>(this.client.path_read, {
      path,
      maxsize_MB: MAX_FILE_SIZE_MB,
    });

    size = contents.length;
    dbg(`got it -- length=${size}`);
    this.from_str(contents);
    this.commit();
    // we also know that this is the version on disk, so we update the hash
    await this.set_save({
      state: "done",
      error: "",
      hash: hash_string(contents),
    });
  } else {
    dbg("file no longer exists -- setting to blank");
    size = 0;
    this.from_str("");
  }
  // save new version to database, which we just set via from_str.
  await this.save();
  return size;
};
2979
2980
// Record save-to-disk status in the syncstring table.  A timestamp is
// filled in automatically when none is given.  Skips the write entirely
// when nothing would actually change.
// FIX: previously this mutated the caller's `save` object (assigning
// save.time); we now work on a shallow copy so the argument is untouched.
private set_save = async (save: {
  state: string;
  error: string;
  hash?: number;
  expected_hash?: number;
  time?: number;
}): Promise<void> => {
  this.assert_table_is_ready("syncstring");
  // set timestamp of when the save happened; this can be useful
  // for coordinating running code, etc.... and is just generally useful.
  const save1 = { ...save };
  const cur = this.syncstring_table_get_one().toJS()?.save;
  if (cur != null) {
    if (
      cur.state == save1.state &&
      cur.error == save1.error &&
      cur.hash == (save1.hash ?? cur.hash) &&
      cur.expected_hash == (save1.expected_hash ?? cur.expected_hash) &&
      cur.time == (save1.time ?? cur.time)
    ) {
      // no genuine change, so no point in wasting cycles on updating.
      return;
    }
  }
  if (!save1.time) {
    save1.time = Date.now();
  }
  await this.set_syncstring_table({ save: save1 });
};
3008
3009
// Record the read_only flag for this document in the syncstring table
// (and save), so all clients see the current read-only status.
private set_read_only = async (read_only: boolean): Promise<void> => {
  this.assert_table_is_ready("syncstring");
  await this.set_syncstring_table({ read_only });
};
// Whether the file on disk is read only, as recorded in the syncstring
// table.  NOTE(review): until the read_only field has been populated
// (see wait_until_read_only_known), get("read_only") may be undefined
// even though the declared return type is boolean.
is_read_only = (): boolean => {
  this.assert_table_is_ready("syncstring");
  return this.syncstring_table_get_one().get("read_only");
};
// Resolve once the read_only field of the syncstring table has been
// populated (i.e., we actually know whether the file is read only),
// waiting up to 5 minutes.
wait_until_read_only_known = async (): Promise<void> => {
  await this.wait_until_ready();
  // Predicate: true once the table has a record with read_only set.
  const read_only_defined = (t: SyncTable): boolean => {
    const rec = t.get_one();
    return rec != null && rec.get("read_only") != null;
  };
  await this.syncstring_table.wait(read_only_defined, 5 * 60);
};
/* Returns true if the current live version of this document has
   a different hash than the version most recently saved to disk.
   I.e., if there are changes that have not yet been **saved to
   disk**.  See the other function has_uncommitted_changes below
   for determining whether there are changes that haven't been
   committed to the database yet.  Returns *undefined* if
   initialization not even done yet. */
has_unsaved_changes = (): boolean | undefined => {
  if (this.state !== "ready") {
    return;
  }
  const dbg = this.dbg("has_unsaved_changes");
  try {
    // Compare hash of last-saved-to-disk version with hash of live doc.
    return this.hash_of_saved_version() !== this.hash_of_live_version();
  } catch (err) {
    dbg(
      "exception computing hash_of_saved_version and hash_of_live_version",
      err,
    );
    // This could happen, e.g. when syncstring_table isn't connected
    // in some edge case. Better to just say we don't know then crash
    // everything. See https://github.com/sagemathinc/cocalc/issues/3577
    return;
  }
};
// Returns hash of last version saved to disk (as far as we know),
// as recorded in the syncstring table's save record; undefined if
// not ready or if no hash has been recorded yet.
hash_of_saved_version = (): number | undefined => {
  if (this.state !== "ready") {
    return;
  }
  return this.syncstring_table_get_one().getIn(["save", "hash"]) as
    | number
    | undefined;
};
/* Return hash of the live version of the document,
   or undefined if the document isn't loaded yet.
   (TODO: write faster version of this for syncdb, which
   avoids converting to a string, which is a waste of time.) */
hash_of_live_version = (): number | undefined => {
  if (this.state !== "ready") {
    return;
  }
  // Hash the string serialization of the live document.
  return hash_string(this.doc.to_str());
};
/* Return true if there are changes to this syncstring that
   have not been committed to the database (with the commit
   acknowledged).  This does not mean the file has been
   written to disk; however, it does mean that it is safe for
   the user to close their browser.
*/
has_uncommitted_changes = (): boolean => {
  if (this.state !== "ready") {
    // Not ready: nothing has been committed or can be, so report false.
    return false;
  }
  // Delegate to the patches table, which tracks unacknowledged commits.
  return this.patches_table.has_uncommitted_changes();
};
// Commit any changes to the live document to
// history as a new patch.  Returns true if there
// were changes and false otherwise.  This works
// fine offline, and does not wait until anything
// is saved to the network, etc.
//
// @param emitChangeImmediately -- if true, synchronously emit the
//   'change' event before committing (see caution in the body).
commit = (emitChangeImmediately = false): boolean => {
  // Nothing to do if not initialized or the live doc equals the last
  // committed version.
  if (this.last == null || this.doc == null || this.last.is_equal(this.doc)) {
    return false;
  }
  // console.trace('commit');

  if (emitChangeImmediately) {
    // used for local clients. NOTE: don't do this without explicit
    // request, since it could in some cases cause serious trouble.
    // E.g., for the jupyter backend doing this by default causes
    // an infinite recurse. Having this as an option is important, e.g.,
    // to avoid flicker/delay in the UI.
    this.emit_change();
  }

  // Now save to backend as a new patch:
  this.emit("user-change");
  const patch = this.last.make_patch(this.doc); // must be nontrivial
  this.last = this.doc;
  // ... and save that to patches table
  const time = this.next_patch_time();
  this.commit_patch(time, patch);
  this.save(); // so eventually also gets sent out.
  // Keep the project alive, since a human just edited this file.
  this.touchProject();
  return true;
};
/* Initiates a save of file to disk, then waits for the
   state to change.

   Overall flow: commit live changes -> short-circuit if no unsaved
   changes or file deleted -> fail if read-only -> (browser clients)
   push changes over the network -> perform/request the actual disk
   save -> (browser clients) wait for the file server to report done.
*/
save_to_disk = async (): Promise<void> => {
  if (this.state != "ready") {
    // We just make save_to_disk a successful
    // no operation, if the document is either
    // closed or hasn't finished opening, since
    // there's a lot of code that tries to save
    // on exit/close or automatically, and it
    // is difficult to ensure it all checks state
    // properly.
    return;
  }
  const dbg = this.dbg("save_to_disk");
  if (this.client.is_deleted(this.path, this.project_id)) {
    dbg("not saving to disk because deleted");
    // Mark the save as done so nothing keeps waiting on it.
    await this.set_save({ state: "done", error: "" });
    return;
  }

  // Make sure to include changes to the live document.
  // A side effect of save if we didn't do this is potentially
  // discarding them, which is obviously not good.
  this.commit();

  dbg("initiating the save");
  if (!this.has_unsaved_changes()) {
    dbg("no unsaved changes, so don't save");
    // CRITICAL: this optimization is assumed by
    // autosave, etc.
    await this.set_save({ state: "done", error: "" });
    return;
  }

  if (this.is_read_only()) {
    dbg("read only, so can't save to disk");
    // save should fail if file is read only and there are changes
    throw Error("can't save readonly file with changes to disk");
  }

  // First make sure any changes are saved to the database.
  // One subtle case where this matters is that loading a file
  // with \r's into codemirror changes them to \n...
  if (!(await this.isFileServer())) {
    dbg("browser client -- sending any changes over network");
    await this.save();
    dbg("save done; now do actual save to the *disk*.");
    this.assert_is_ready("save_to_disk - after save");
  }

  try {
    await this.save_to_disk_aux();
  } catch (err) {
    // Document may have been closed while awaiting; then there is
    // nothing more to do.
    if (this.state != "ready") return;
    const error = `save to disk failed -- ${err}`;
    dbg(error);
    if (await this.isFileServer()) {
      // Only the file server records the failure in the table.
      this.set_save({ error, state: "done" });
    }
  }
  if (this.state != "ready") return;

  if (!(await this.isFileServer())) {
    // Browser clients only *requested* the save; wait for the file
    // server to actually perform it and report done.
    dbg("now wait for the save to disk to finish");
    this.assert_is_ready("save_to_disk - waiting to finish");
    await this.wait_for_save_to_disk_done();
  }
  this.update_has_unsaved_changes();
};
/* Export the (currently loaded) history of editing of this
   document to a simple JSON-able object.

   @param options -- see HistoryExportOptions.
   @returns array of HistoryEntry objects, one per patch.
   @throws if the syncstring table or its users field is not
     initialized yet. */
export_history = (options: HistoryExportOptions = {}): HistoryEntry[] => {
  this.assert_is_ready("export_history");
  const info = this.syncstring_table.get_one();
  if (info == null || !info.has("users")) {
    throw Error("syncstring table must be defined and users initialized");
  }
  // Map of user index -> account_id, needed to attribute patches.
  const account_ids: string[] = info.get("users").toJS();
  assertDefined(this.patch_list);
  return export_history(account_ids, this.patch_list, options);
};
// Recompute whether there are unsaved changes and, if the answer
// changed since last time, emit the 'has-unsaved-changes' event so
// the UI can update (e.g., the save button state).
private update_has_unsaved_changes = (): void => {
  if (this.state != "ready") {
    // This can happen, since this is called by a debounced function.
    // Make it a no-op in case we're not ready.
    // See https://github.com/sagemathinc/cocalc/issues/3577
    return;
  }
  const cur = this.has_unsaved_changes();
  if (cur !== this.last_has_unsaved_changes) {
    // Only emit on an actual transition, to avoid event spam.
    this.emit("has-unsaved-changes", cur);
    this.last_has_unsaved_changes = cur;
  }
};
// Wait (with retries) until the syncstring table reports that the
// save to disk is done, i.e., save.state === "done".  Used by browser
// clients after *requesting* a save, since the file server performs
// the actual write.  If the save finished with an error, log it via
// the client (when supported).
private wait_for_save_to_disk_done = async (): Promise<void> => {
  const dbg = this.dbg("wait_for_save_to_disk_done");
  dbg();
  // Predicate for syncstring_table.wait: true once state is "done".
  // NOTE(review): assumes table.get_one() is non-null here; a null
  // record would throw -- TODO confirm that's impossible at this point.
  function until(table): boolean {
    const done = table.get_one().getIn(["save", "state"]) === "done";
    dbg("checking... done=", done);
    return done;
  }

  let last_err: string | undefined = undefined;
  // One attempt: wait up to 15s for "done", then check for a recorded
  // error.  Throws to make retry_until_success try again.
  const f = async () => {
    dbg("f");
    if (
      this.state != "ready" ||
      this.client.is_deleted(this.path, this.project_id)
    ) {
      dbg("not ready or deleted - no longer trying to save.");
      return;
    }
    try {
      dbg("waiting until done...");
      await this.syncstring_table.wait(until, 15);
    } catch (err) {
      dbg("timed out after 15s");
      throw Error("timed out");
    }
    if (
      this.state != "ready" ||
      this.client.is_deleted(this.path, this.project_id)
    ) {
      dbg("not ready or deleted - no longer trying to save.");
      return;
    }
    const err = this.syncstring_table_get_one().getIn(["save", "error"]) as
      | string
      | undefined;
    if (err) {
      dbg("error", err);
      last_err = err;
      throw Error(err);
    }
    dbg("done, with no error.");
    last_err = undefined;
    return;
  };
  await retry_until_success({
    f,
    max_tries: 8,
    desc: "wait_for_save_to_disk_done",
  });
  if (
    this.state != "ready" ||
    this.client.is_deleted(this.path, this.project_id)
  ) {
    return;
  }
  // FIX: the previous condition was `typeof this.client.log_error != null`,
  // which is always true (typeof yields a string, never null).  Check for
  // the method directly instead.
  if (last_err && this.client.log_error != null) {
    this.client.log_error({
      string_id: this.string_id,
      path: this.path,
      project_id: this.project_id,
      error: `Error saving file -- ${last_err}`,
    });
  }
};
/* Auxiliary function 2 for saving to disk:
   If this is associated with
   a project and has a filename.
   A user (web browsers) sets the save state to requested.
   The project sets the state to saving, does the save
   to disk, then sets the state to done.
*/
private save_to_disk_aux = async (): Promise<void> => {
  this.assert_is_ready("save_to_disk_aux");

  // Browser clients only request a save; the file server does it.
  if (!(await this.isFileServer())) {
    return await this.save_to_disk_non_filesystem_owner();
  }

  try {
    return await this.save_to_disk_filesystem_owner();
  } catch (err) {
    // Surface failure to listeners before rethrowing to the caller.
    this.emit("save_to_disk_filesystem_owner", err);
    throw err;
  }
};
// Browser-client side of saving to disk: request that the file server
// save, by setting save.state to "requested" (with the expected hash
// of the current document), unless a save is already requested or
// there is nothing to save.
private save_to_disk_non_filesystem_owner = async (): Promise<void> => {
  this.assert_is_ready("save_to_disk_non_filesystem_owner");

  if (!this.has_unsaved_changes()) {
    /* Browser client has no unsaved changes,
       so don't need to save --
       CRITICAL: this optimization is assumed by autosave.
    */
    return;
  }
  const x = this.syncstring_table.get_one();
  if (x != null && x.getIn(["save", "state"]) === "requested") {
    // Nothing to do -- save already requested, which is
    // all the browser client has to do.
    return;
  }

  // string version of this doc
  const data: string = this.to_str();
  // Hash the file server can verify after writing.
  const expected_hash = hash_string(data);
  await this.set_save({ state: "requested", error: "", expected_hash });
};
// File-server side of saving to disk: write the current document to
// the file, stat it to record the ctime window of our own write (so
// the file watcher can distinguish our write from external edits),
// and record the result (hash or error) in the syncstring table.
// NOTE(review): the set_save calls below are intentionally (?) not
// awaited -- they are fire-and-forget table updates; confirm before
// changing, since awaiting could alter timing.
private save_to_disk_filesystem_owner = async (): Promise<void> => {
  this.assert_is_ready("save_to_disk_filesystem_owner");
  const dbg = this.dbg("save_to_disk_filesystem_owner");

  // check if on-disk version is same as in memory, in
  // which case no save is needed.
  const data = this.to_str(); // string version of this doc
  const hash = hash_string(data);
  dbg("hash = ", hash);

  /*
  // TODO: put this consistency check back in (?).
  const expected_hash = this.syncstring_table
    .get_one()
    .getIn(["save", "expected_hash"]);
  */

  if (hash === this.hash_of_saved_version()) {
    // No actual save to disk needed; still we better
    // record this fact in table in case it
    // isn't already recorded
    this.set_save({ state: "done", error: "", hash });
    return;
  }

  const path = this.path;
  if (!path) {
    const err = "cannot save without path";
    this.set_save({ state: "done", error: err });
    throw Error(err);
  }

  dbg("project - write to disk file", path);
  // set window to slightly earlier to account for clock
  // imprecision.
  // Over an sshfs mount, all stats info is **rounded down
  // to the nearest second**, which this also takes care of.
  this.save_to_disk_start_ctime = Date.now() - 1500;
  this.save_to_disk_end_ctime = undefined;
  try {
    await callback2(this.client.write_file, { path, data });
    this.assert_is_ready("save_to_disk_filesystem_owner -- after write_file");
    const stat = await callback2(this.client.path_stat, { path });
    this.assert_is_ready("save_to_disk_filesystem_owner -- after path_state");
    // End of the window (padded, see note above) during which a file
    // change event is attributed to this save.
    this.save_to_disk_end_ctime = stat.ctime.valueOf() + 1500;
    this.set_save({
      state: "done",
      error: "",
      hash: hash_string(data),
    });
  } catch (err) {
    // Record the failure in the table, then let the caller handle it.
    this.set_save({ state: "done", error: JSON.stringify(err) });
    throw err;
  }
};
/*
When the underlying synctable that defines the state
of the document changes due to new remote patches, this
function is called.
It handles update of the remote version, updating our
live version as a result.

@param changed_keys -- keys of the patches table entries that changed;
  they are queued and processed in a later event loop.
*/
private handle_patch_update = async (changed_keys): Promise<void> => {
  // console.log("handle_patch_update", { changed_keys });
  if (changed_keys == null || changed_keys.length === 0) {
    // this happens right now when we do a save.
    return;
  }

  const dbg = this.dbg("handle_patch_update");
  //dbg(changed_keys);
  if (this.patch_update_queue == null) {
    this.patch_update_queue = [];
  }
  // Queue up all changed keys for batch processing.
  for (const key of changed_keys) {
    this.patch_update_queue.push(key);
  }

  dbg("Clear patch update_queue in a later event loop...");
  // Deferring to a later event loop batches rapid-fire updates.
  await delay(1);
  await this.handle_patch_update_queue();
  dbg("done");
};
/*
Whenever new patches are added to this.patches_table,
their timestamp gets added to this.patch_update_queue.
This drains the queue: it converts queued keys into patches (skipping
our own patches from this session), adds them to the patch list, and
syncs the live document, repeating until the queue is empty or the
document is closed.
*/
private handle_patch_update_queue = async (): Promise<void> => {
  const dbg = this.dbg("handle_patch_update_queue");
  try {
    this.handle_patch_update_queue_running = true;
    while (this.state != "closed" && this.patch_update_queue.length > 0) {
      dbg("queue size = ", this.patch_update_queue.length);
      const v: Patch[] = [];
      for (const key of this.patch_update_queue) {
        let x = this.patches_table.get(key);
        if (x == null) {
          continue;
        }
        if (!Map.isMap(x)) {
          // TODO: my NATS synctable-stream doesn't convert to immutable on get.
          x = fromJS(x);
        }
        // may be null, e.g., when deleted.
        const t = x.get("time");
        // Optimization: only need to process patches that we didn't
        // create ourselves during this session.
        if (t && !this.my_patches[t.valueOf()]) {
          const p = this.processPatch({ x });
          //dbg(`patch=${JSON.stringify(p)}`);
          if (p != null) {
            v.push(p);
          }
        }
      }
      // Clear queue *before* syncing, since syncing can enqueue more.
      this.patch_update_queue = [];
      this.emit("patch-update-queue-empty");
      assertDefined(this.patch_list);
      this.patch_list.add(v);

      dbg("waiting for remote and doc to sync...");
      this.sync_remote_and_doc(v.length > 0);
      await this.patches_table.save();
      if (this.state === ("closed" as State)) return; // closed during await; nothing further to do
      dbg("remote and doc now synced");

      if (this.patch_update_queue.length > 0) {
        // It is very important that next loop happen in a later
        // event loop to avoid the this.sync_remote_and_doc call
        // in this.handle_patch_update_queue above from causing
        // sync_remote_and_doc to get called from within itself,
        // due to synctable changes being emited on save.
        dbg("wait for next event loop");
        await delay(1);
      }
    }
  } finally {
    if (this.state == "closed") return; // got closed, so nothing further to do

    // OK, done and nothing in the queue
    // Notify save() to try again -- it may have
    // paused waiting for this to clear.
    dbg("done");
    this.handle_patch_update_queue_running = false;
    this.emit("handle_patch_update_queue_done");
  }
};
/* Disable and enable sync.  When disabled we still
   collect patches from upstream (but do not apply them
   locally), and changes we make are broadcast into
   the patch stream.  When we re-enable sync, all
   patches are put together in the stream and
   everything is synced as normal.  This is useful, e.g.,
   to make it so a user **actively** editing a document is
   not interrupted by being forced to sync (in particular,
   by the 'before-change' event that they use to update
   the live document).

   Also, delay_sync will delay syncing local with upstream
   for the given number of ms.  Calling it regularly while
   user is actively editing to avoid them being bothered
   by upstream patches getting merged in.

   IMPORTANT: I implemented this, but it is NOT used anywhere
   else in the codebase, so don't trust that it works.
*/

// Stop applying upstream patches to the live document (see note above).
disable_sync = (): void => {
  this.sync_is_disabled = true;
};
// Re-enable sync and immediately merge any accumulated upstream
// patches into the live document (counterpart of disable_sync).
enable_sync = (): void => {
  this.sync_is_disabled = false;
  this.sync_remote_and_doc(true);
};
// Temporarily disable sync, automatically re-enabling it after
// timeout_ms.  Calling this repeatedly (e.g., while a user is actively
// typing) keeps pushing the re-enable further into the future.
delay_sync = (timeout_ms = 2000): void => {
  // Reset any pending re-enable timer before starting a new one.
  clearTimeout(this.delay_sync_timer);
  this.disable_sync();
  this.delay_sync_timer = setTimeout(() => {
    this.enable_sync();
  }, timeout_ms);
};
/*
Merge remote patches and live version to create new live version,
which is equal to result of applying all patches.

@param upstreamPatches -- true when new upstream patches were just
  added, in which case listeners get a chance (via 'before-change')
  to commit their in-flight edits first.
*/
private sync_remote_and_doc = (upstreamPatches: boolean): void => {
  if (this.last == null || this.doc == null || this.sync_is_disabled) {
    return;
  }

  // Critical to save what we have now so it doesn't get overwritten during
  // before-change or setting this.doc below. This caused
  // https://github.com/sagemathinc/cocalc/issues/5871
  this.commit();

  if (upstreamPatches && this.state == "ready") {
    // First save any unsaved changes from the live document, which this
    // sync-doc doesn't actually know the state of. E.g., this is some
    // rapidly changing live editor with changes not yet saved here.
    this.emit("before-change");
    // As a result of the emit in the previous line, all kinds of
    // nontrivial listener code probably just ran, and it should
    // have updated this.doc. We commit this.doc, so that the
    // upstream patches get applied against the correct live this.doc.
    this.commit();
  }

  // Compute the global current state of the document,
  // which is got by applying all patches in order.
  // It is VERY important to do this, even if the
  // document is not yet ready, since it is critical
  // to properly set the state of this.doc to the value
  // of the patch list (e.g., not doing this 100% breaks
  // opening a file for the first time on cocalc-docker).
  assertDefined(this.patch_list);
  const new_remote = this.patch_list.value();
  if (!this.doc.is_equal(new_remote)) {
    // There is a possibility that live document changed, so
    // set to new version.
    this.last = this.doc = new_remote;
    if (this.state == "ready") {
      this.emit("after-change");
      this.emit_change();
    }
  }
};
// Immediately alert all watchers of all changes since
// last time, by emitting 'change' with the diff between the current
// doc and the doc as of the previous emit; then advance the baseline.
private emit_change = (): void => {
  this.emit("change", this.doc?.changes(this.before_change));
  this.before_change = this.doc;
};
// Alert to changes soon, but debounced in case there are a large
// number of calls in a group.  This is called by default.
// The debounce param is 0, since the idea is that this just waits
// until the next "render loop" to avoid huge performance issues
// with a nested for loop of sets.  Doing it this way, massively
// simplifies client code.
emit_change_debounced: typeof this.emit_change = debounce(
  this.emit_change,
  0,
);
// Deep-merge obj into the single record of the syncstring table and
// (by default) save.  No-op when the merge changes nothing, to avoid
// spurious writes/events.
private set_syncstring_table = async (obj, save = true) => {
  const value0 = this.syncstring_table_get_one();
  const value = mergeDeep(value0, fromJS(obj));
  if (value0.equals(value)) {
    // Merged value is identical -- nothing to write.
    return;
  }
  this.syncstring_table.set(value);
  if (save) {
    await this.syncstring_table.save();
  }
};
// this keeps the project from idle timing out -- it happens
// whenever there is an edit to the file by a browser, and
// keeps the project from stopping.
// Throttled to at most once per minute.
private touchProject = throttle(() => {
  if (this.client?.is_browser()) {
    this.client.touch_project?.(this.project_id);
  }
}, 60000);
// For browser clients only: periodically signal interest in this open
// file (touchOpenFile), looping until the document is closed.  This
// lets the backend know a human still has the file open.
private initInterestLoop = async () => {
  if (!this.client.is_browser()) {
    // only browser clients -- so actual humans
    return;
  }
  // One "I'm still here" ping; no-op if closed or unsupported client.
  const touch = async () => {
    if (this.state == "closed" || this.client?.touchOpenFile == null) return;
    await this.client.touchOpenFile({
      path: this.path,
      project_id: this.project_id,
      doctype: this.doctype,
    });
  };
  // then every CONAT_OPEN_FILE_TOUCH_INTERVAL (30 seconds).
  await until(
    async () => {
      if (this.state == "closed") {
        // Returning true stops the until-loop.
        return true;
      }
      await touch();
      return false;
    },
    {
      start: CONAT_OPEN_FILE_TOUCH_INTERVAL,
      max: CONAT_OPEN_FILE_TOUCH_INTERVAL,
    },
  );
};
}
3633
3634
// Decide whether a patch stream contains enough history to be usable:
// either its first entry is the very first commit ever (no parents),
// or a snapshot taken at the same time as the first patch appears
// later in the stream.  A stream that is empty, or that begins with a
// snapshot, is never considered complete.
function isCompletePatchStream(dstream) {
  if (dstream.length == 0) {
    return false;
  }
  const [first] = dstream;
  if (first.is_snapshot) {
    // Must start with an actual patch, not a snapshot marker.
    return false;
  }
  if (first.parents == null) {
    // first ever commit
    return true;
  }
  // Complete iff some later entry is a snapshot with the same time
  // as the first patch.
  return dstream
    .slice(1)
    .some((entry) => entry.is_snapshot && entry.time == first.time);
}