CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Sign Up · Sign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/project/sync/server.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
SyncTable server channel -- used for supporting realtime sync
8
between project and browser client.
9
10
TODO:
11
12
- [ ] If initial query fails, need to raise exception. Right now it gets
13
silently swallowed in persistent mode...
14
*/
15
16
// How long to wait from when we hit 0 clients until closing this channel.
// Making this short saves memory and cpu.
// Making it longer reduces the potential time to open a file, e.g., if you
// disconnect then reconnect, e.g., by refreshing your browser.
// Related to https://github.com/sagemathinc/cocalc/issues/5627
// and https://github.com/sagemathinc/cocalc/issues/5823
// and https://github.com/sagemathinc/cocalc/issues/5617
// NOTE(review): the delay constant this paragraph describes is not defined
// in this file -- presumably it was moved or removed; confirm and update
// this comment.

// This is a hard upper bound on the number of browser sessions that could
// have the same file open at once. We put some limit on it, to at least
// limit problems from bugs which crash projects (since each connection uses
// memory, and it adds up). Some customers want 100+ simultaneous users,
// so don't set this too low (except for dev)!
const MAX_CONNECTIONS = 500;

// The frontend client code *should* prevent many connections, but some
// old broken clients may not work properly. This must be at least 2,
// since we can have two clients for a given channel at once if a file is
// being closed still, while it is reopened (e.g., when user does this:
// disconnect, change, close, open, reconnect). Also, this setting prevents
// some potentially malicious conduct, and also possible new clients with bugs.
// It is VERY important that this not be too small, since there is often
// a delay/timeout before a channel is properly closed.
const MAX_CONNECTIONS_FROM_ONE_CLIENT = 10;
40
41
import {
42
synctable_no_changefeed,
43
synctable_no_database,
44
SyncTable,
45
VersionedChange,
46
set_debug,
47
} from "@cocalc/sync/table";
48
49
// Only uncomment this for an intense level of debugging.
// set_debug(true);
// Referencing set_debug here keeps the import above "used" while the
// set_debug(true) call stays commented out, so the compiler/linter does
// not flag (or strip) the import.
// @ts-ignore -- typescript nonsense.
const _ = set_debug;
53
54
import { init_syncdoc, getSyncDocFromSyncTable } from "./sync-doc";
55
import { key, register_synctable } from "./open-synctables";
56
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
57
import { once } from "@cocalc/util/async-utils";
58
import { delay } from "awaiting";
59
import { close, deep_copy, len } from "@cocalc/util/misc";
60
import { registerListingsTable } from "./listings";
61
import { register_project_info_table } from "./project-info";
62
import { register_project_status_table } from "./project-status";
63
import { register_usage_info_table } from "./usage-info";
64
import type { MergeType } from "@cocalc/sync/table/synctable";
65
import Client from "@cocalc/sync-client";
66
import { getJupyterRedux } from "@cocalc/jupyter/kernel";
67
68
// Arbitrary database-style query object, keyed by table name.
type Query = { [key: string]: any };

// Minimal subset of the primus "spark" (one per client connection)
// API that this module uses.
interface Spark {
  address: { ip: string };
  id: string;
  conn: {
    id: string;
    write: (obj: any) => boolean;
    once: (str: string, fn: Function) => void;
    on: (str: string, fn: Function) => void;
    writable: boolean;
  };
  write: (obj: any) => boolean;
  // FIX: the rest parameter was untyped, which is an implicit-any error
  // under strict/noImplicitAny; annotate it explicitly.
  end: (...args: any[]) => void;
  on: (str: string, fn: Function) => void;
}

// Minimal subset of a primus channel: broadcast write, event
// subscription, iteration over sparks, and teardown.
interface Channel {
  write: (obj: any) => boolean;
  on: (str: string, fn: Function) => void;
  forEach: (fn: Function) => void;
  destroy: Function;
}

// The only part of primus we need: obtaining a named channel.
interface Primus {
  channel: (str: string) => Channel;
}

// Anything with a debug method (e.g., a winston-style logger).
interface Logger {
  debug: Function;
}
99
100
import stringify from "json-stable-stringify";
101
import { sha1 } from "@cocalc/backend/sha1";
102
103
// True when the COCALC_EPHEMERAL_STATE env var is set to "yes"; in that
// mode every channel is forced ephemeral, since database access would be
// denied anyway.
const COCALC_EPHEMERAL_STATE = process.env.COCALC_EPHEMERAL_STATE === "yes";
105
106
/*
Server-side channel wrapping a single SyncTable: it fans table changes out
to all connected browser clients over a primus channel, applies changes
coming back from browsers, and saves/possibly closes the table when the
last client disconnects (unless the table is persistent).
*/
class SyncTableChannel {
  private synctable: SyncTable;
  private client: Client;
  private logger: Logger;
  // Channel name, "sync:" + sha1 of canonicalized (query, options); see channel_name().
  public readonly name: string;
  private query: Query;
  // Remaining synctable/database options after init_options strips the
  // channel-only flags (ephemeral, persistent).
  private options: any[] = [];
  // Stringified query; used only for logging.
  private query_string: string;
  private channel: Channel;
  private closed: boolean = false;
  private closing: boolean = false;
  // Changes each browser asked us to apply on its behalf when it
  // disconnects (queued via the "set-on-disconnect" message), keyed by spark id.
  private setOnDisconnect: {
    [spark_id: string]: { changes: any; merge: MergeType }[];
  } = {};
  // Total connection count across all clients, plus when it last changed.
  private num_connections: { n: number; changed: Date } = {
    n: 0,
    changed: new Date(),
  };

  // If true, do not use a database at all, even on the backend.
  // Table is reset any time this object is created. This is
  // useful, e.g., for tracking user cursor locations or other
  // ephemeral state.
  private ephemeral: boolean = false;

  // If true, do not close even if all clients have disconnected.
  // This is used to keep sessions running, even when all browsers
  // have closed, e.g., state for Sage worksheets, jupyter
  // notebooks, etc., where user may want to close their browser
  // (or just drop a connection temporarily) while a persistent stateful
  // session continues running.
  private persistent: boolean = false;

  // Per-client (conn.id) connection counts, used to enforce
  // MAX_CONNECTIONS_FROM_ONE_CLIENT.
  private connections_from_one_client: { [id: string]: number } = {};

  constructor({
    client,
    primus,
    query,
    options,
    logger,
    name,
  }: {
    client: Client;
    primus: Primus;
    name: string;
    query: Query;
    options: any;
    logger: Logger;
  }) {
    this.name = name;
    this.client = client;
    this.logger = logger;
    this.query = query;
    this.init_options(options);
    if (COCALC_EPHEMERAL_STATE) {
      // No matter what, we set ephemeral true when
      // this env var is set, since all db access
      // will be denied anyways.
      this.ephemeral = true;
    }
    this.query_string = stringify(query); // used only for logging
    this.channel = primus.channel(this.name);
    this.log(
      `creating new sync channel (persistent=${this.persistent}, ephemeral=${this.ephemeral})`,
    );
  }

  // Finish setup: wire channel event handlers, then create the
  // underlying synctable and wait for it to be connected.
  public async init(): Promise<void> {
    this.init_handlers();
    await this.init_synctable();
  }

  // Split incoming options into channel-level flags (ephemeral,
  // persistent), which are consumed here, and everything else, which is
  // passed through to the synctable/database layer via this.options.
  private init_options(options): void {
    if (options == null) {
      return;
    }
    for (const option of deep_copy(options)) {
      // deep_copy so do not mutate input options.
      if (typeof option != "object" || option == null) {
        throw Error("invalid options");
      }
      for (const x of ["ephemeral", "persistent"]) {
        // options that are only for project websocket tables.
        if (option[x] != null) {
          this[x] = option[x];
          delete option[x];
        }
      }
      if (len(option) > 0) {
        // remaining synctable/database options.
        this.options.push(option);
      }
    }
  }

  // Debug-log with a per-channel prefix (name, query, CLOSED marker).
  // No-op if no logger was provided.
  private log(...args): void {
    if (this.logger == null) return;
    this.logger.debug(
      `SyncTableChannel('${this.name}', '${this.query_string}'${
        this.closed ? ",CLOSED" : ""
      }): `,
      ...args,
    );
  }

  // Subscribe to primus channel lifecycle events: one spark per
  // connecting browser client.
  private init_handlers(): void {
    this.log("init_handlers");
    this.channel.on("connection", this.new_connection.bind(this));
    this.channel.on("disconnection", this.end_connection.bind(this));
  }

  // Create the backing synctable (database-backed without changefeeds,
  // or purely in-memory when ephemeral), register it where needed, and
  // wait until it is connected before broadcasting its initial state.
  private async init_synctable(): Promise<void> {
    this.log("init_synctable");
    let create_synctable: Function;
    if (this.ephemeral) {
      this.log("init_synctable -- ephemeral (no database)");
      create_synctable = synctable_no_database;
    } else {
      this.log("init_synctable -- persistent (but no changefeeds)");
      create_synctable = synctable_no_changefeed;
    }
    this.synctable = create_synctable(this.query, this.options, this.client);

    // if the synctable closes, then the channel should also close.
    // I think this should happen, e.g., when we "close and halt"
    // a jupyter notebook, which closes the synctable, triggering this.
    this.synctable.once("closed", this.close.bind(this));

    // Tables with a string_id column are file-specific; register them so
    // other project code can look them up (see open-synctables).
    if (this.query[this.synctable.get_table()][0].string_id != null) {
      register_synctable(this.query, this.synctable);
    }
    if (this.synctable.table === "syncstrings") {
      this.log("init_synctable -- syncstrings: also initialize syncdoc...");
      init_syncdoc(this.client, this.synctable);
    }

    this.synctable.on(
      "versioned-changes",
      this.send_versioned_changes_to_browsers.bind(this),
    );

    this.log("created synctable -- waiting for connected state");
    await once(this.synctable, "connected");
    this.log("created synctable -- now connected");

    // broadcast synctable content to all connected clients.
    this.broadcast_synctable_to_browsers();
  }

  // Bump and return the number of open connections from this spark's
  // client (keyed by conn.id, which is shared by all sparks of one browser).
  private increment_connection_count(spark: Spark): number {
    // account for new connection from this particular client.
    let m: undefined | number = this.connections_from_one_client[spark.conn.id];
    if (m === undefined) m = 0;
    return (this.connections_from_one_client[spark.conn.id] = m + 1);
  }

  // Decrement and return this client's connection count, clamped at 0.
  private decrement_connection_count(spark: Spark): number {
    const m: undefined | number =
      this.connections_from_one_client[spark.conn.id];
    if (m === undefined) {
      return 0;
    }
    return (this.connections_from_one_client[spark.conn.id] = Math.max(
      0,
      m - 1,
    ));
  }

  // Handle a new browser connection: enforce connection limits (with a
  // deliberate delay before rejecting, to throttle retry storms), wait for
  // the synctable to be usable, send it the full initial state, and start
  // processing its messages.
  private async new_connection(spark: Spark): Promise<void> {
    // Now handle the connection
    const n = this.num_connections.n + 1;
    this.num_connections = { n, changed: new Date() };

    // account for new connection from this particular client.
    const m = this.increment_connection_count(spark);

    this.log(
      `new connection from (address=${spark.address.ip}, conn=${spark.conn.id}) -- ${spark.id} -- num_connections = ${n} (from this client = ${m})`,
    );

    if (m > MAX_CONNECTIONS_FROM_ONE_CLIENT) {
      const error = `Too many connections (${m} > ${MAX_CONNECTIONS_FROM_ONE_CLIENT}) from this client. You might need to refresh your browser.`;
      this.log(
        `${error} Waiting 15s, then killing new connection from ${spark.id}...`,
      );
      await delay(15000); // minimize impact of client trying again, which it should do...
      this.decrement_connection_count(spark);
      spark.end({ error });
      return;
    }

    if (n > MAX_CONNECTIONS) {
      const error = `Too many connections (${n} > ${MAX_CONNECTIONS})`;
      this.log(
        `${error} Waiting 5s, then killing new connection from ${spark.id}`,
      );
      await delay(5000); // minimize impact of client trying again, which it should do
      this.decrement_connection_count(spark);
      spark.end({ error });
      return;
    }

    if (this.closed) {
      this.log(`table closed: killing new connection from ${spark.id}`);
      this.decrement_connection_count(spark);
      spark.end();
      return;
    }
    if (this.synctable != null && this.synctable.get_state() == "closed") {
      this.log(`table state closed: killing new connection from ${spark.id}`);
      this.decrement_connection_count(spark);
      spark.end();
      return;
    }
    if (
      this.synctable != null &&
      this.synctable.get_state() == "disconnected"
    ) {
      // Because synctable is being initialized for the first time,
      // or it temporarily disconnected (e.g., lost hub), and is
      // trying to reconnect. So just wait for it to connect.
      await once(this.synctable, "connected");
    }

    // Now that table is connected, we can send initial mesg to browser
    // with table state.
    this.send_synctable_to_browser(spark);

    spark.on("data", async (mesg) => {
      try {
        await this.handle_mesg_from_browser(spark, mesg);
      } catch (err) {
        // Report the problem back to the browser rather than crashing.
        spark.write({ error: `error handling mesg -- ${err}` });
        this.log("error handling mesg -- ", err, err.stack);
      }
    });
  }

  // Handle a browser disconnecting: update counts, apply any changes the
  // browser queued via "set-on-disconnect", then save (and maybe close).
  private async end_connection(spark: Spark): Promise<void> {
    // This should never go below 0 (that would be a bug), but let's
    // just ensure it doesn't since if it did that would weirdly break
    // things for users as the table would keep trying to close.
    const n = Math.max(0, this.num_connections.n - 1);
    this.num_connections = { n, changed: new Date() };

    const m = this.decrement_connection_count(spark);
    this.log(
      `spark event -- end connection ${spark.address.ip} -- ${spark.id} -- num_connections = ${n} (from this client = ${m})`,
    );

    if (!this.closed) {
      try {
        const x = this.setOnDisconnect[spark.id];
        this.log("do setOnDisconnect", x);
        if (x != null) {
          for (const { changes, merge } of x) {
            this.synctable.set(changes, merge);
          }
          delete this.setOnDisconnect[spark.id];
        }
      } catch (err) {
        // Best-effort: a failed set must not prevent the save/close below.
        this.log("setOnDisconnect error", err);
      }
    }

    // NOTE: deliberately not awaited -- runs in the background.
    this.check_if_should_save_or_close();
  }

  // Send the full current table state to a single browser client.
  private send_synctable_to_browser(spark: Spark): void {
    if (this.closed || this.closing || this.synctable == null) return;
    this.log("send_synctable_to_browser");
    spark.write({ init: this.synctable.initial_version_for_browser_client() });
  }

  // Send the full current table state to every connected browser client.
  private broadcast_synctable_to_browsers(): void {
    if (this.closed || this.closing || this.synctable == null) return;
    this.log("broadcast_synctable_to_browsers");
    const x = { init: this.synctable.initial_version_for_browser_client() };
    this.channel.write(x);
  }

  /* This is called when a user disconnects. This always triggers a save to
     disk. It may also trigger closing the file in some cases. */
  private async check_if_should_save_or_close() {
    if (this.closed) {
      // don't bother if either already closed
      return;
    }
    this.log("check_if_should_save_or_close: save to disk if possible");
    try {
      await this.save_if_possible();
    } catch (err) {
      // the name "save if possible" suggests this should be non-fatal.
      this.log(
        "check_if_should_save_or_close: WARNING: unable to save -- ",
        err,
      );
    }
    const { n } = this.num_connections ?? {};
    // Only consider closing when nobody is connected and the table was
    // not marked persistent.
    this.log("check_if_should_save_or_close", { n });
    if (!this.persistent && n === 0) {
      this.log("check_if_should_save_or_close: close if possible");
      await this.close_if_possible();
    }
  }

  // Process one message from a browser: either queue set-on-disconnect
  // changes, re-emit a generic "message" event for backend listeners, or
  // apply timed changes to the synctable and save.
  private async handle_mesg_from_browser(
    spark: Spark,
    mesg: any,
  ): Promise<void> {
    // do not log the actual mesg, since it can be huge and make the logfile dozens of MB.
    // Temporarily enable as needed for debugging purposes.
    //this.log("handle_mesg_from_browser ", { mesg });
    if (this.closed) {
      throw Error("received mesg from browser AFTER close");
    }
    if (mesg == null) {
      throw Error("mesg must not be null");
    }
    if (mesg.event == "set-on-disconnect") {
      this.log("set-on-disconnect", mesg, spark.id);
      if (this.setOnDisconnect[spark.id] == null) {
        this.setOnDisconnect[spark.id] = [];
      }
      this.setOnDisconnect[spark.id].push(mesg);
      return;
    }
    if (mesg.event == "message") {
      // generic messages from any client to the project can be
      // handled by backend code by listening for message events.
      this.synctable.emit("message", {
        data: mesg.data,
        spark,
        channel: this.channel,
      });
      return;
    }
    if (mesg.timed_changes != null) {
      this.synctable.apply_changes_from_browser_client(mesg.timed_changes);
    }
    await this.synctable.save();
  }

  // Broadcast a batch of versioned changes from the synctable to every
  // connected browser (wired up in init_synctable).
  private send_versioned_changes_to_browsers(
    versioned_changes: VersionedChange[],
  ): void {
    if (this.closed) return;
    this.log("send_versioned_changes_to_browsers");
    const x = { versioned_changes };
    this.channel.write(x);
  }

  // Save the table to the database, and for syncstrings also save the
  // underlying document to disk (with extra handling for Jupyter, whose
  // on-disk .ipynb is derived from the hidden syncdb file).
  private async save_if_possible(): Promise<void> {
    if (this.closed || this.closing) {
      return; // closing or already closed
    }
    this.log("save_if_possible: saves changes to database");
    await this.synctable.save();
    if (this.synctable.table === "syncstrings") {
      this.log("save_if_possible: also fetch syncdoc");
      const syncdoc = getSyncDocFromSyncTable(this.synctable);
      if (syncdoc != null) {
        const path = syncdoc.get_path();
        this.log("save_if_possible: saving syncdoc to disk", { path });
        if (path.endsWith(".sage-jupyter2")) {
          // treat jupyter notebooks in a special way, since they have
          // an aux .ipynb file that the syncdoc doesn't know about. In
          // this case we save the ipynb to disk, not just the hidden
          // syncdb file.
          const { actions } = await getJupyterRedux(syncdoc);
          if (actions == null) {
            this.log("save_if_possible: jupyter -- actions is null");
          } else {
            if (!actions.isCellRunner()) {
              // NOTE(review): presumably only the cell-runner session
              // owns the .ipynb on disk -- confirm; we skip the disk
              // save entirely in that case (early return).
              this.log("save_if_possible: jupyter -- not cell runner");
              return;
            }
            this.log("save_if_possible: jupyter -- saving to ipynb");
            await actions.save_ipynb_file();
          }
        }
        await syncdoc.save_to_disk();
      } else {
        this.log("save_if_possible: no syncdoc");
      }
    }
  }

  // Close the channel if (still) nobody is connected.
  private async close_if_possible(): Promise<void> {
    if (this.closed || this.closing) {
      return; // closing or already closed
    }
    const { n, changed } = this.num_connections;
    // NOTE: this local "delay" (ms since the count last changed) shadows
    // the imported delay() helper; it is only used for logging here.
    const delay = Date.now() - changed.valueOf();
    this.log(
      `close_if_possible: there are ${n} connections and delay=${delay}`,
    );
    if (n === 0) {
      this.log(`close_if_possible: close this SyncTableChannel atomically`);
      // actually close
      this.close();
    } else {
      this.log(`close_if_possible: NOT closing this SyncTableChannel`);
    }
  }

  // Tear everything down: remove from the global registry, destroy the
  // primus channel, close the synctable, and free this object's state.
  // Statement order matters here -- do not reorder.
  private close(): void {
    if (this.closed) {
      return;
    }
    this.log("close: closing");
    this.closing = true;
    delete synctable_channels[this.name];
    this.channel.destroy();
    this.synctable.close_no_async();
    this.log("close: closed");
    // close() from misc presumably clears this object's attributes --
    // hence the warning below and why this.closed is re-set afterwards.
    close(this); // don't call this.log after this!
    this.closed = true;
  }

  // Expose the backing synctable (used by the registration helpers in
  // synctable_channel0).
  public get_synctable(): SyncTable {
    return this.synctable;
  }
}
531
532
// Global registry of all currently-open channels in this project, keyed
// by channel name; a channel removes itself from here in close().
const synctable_channels: { [name: string]: SyncTableChannel } = {};
533
534
// reuseInFlight key: two synctable_channel0 calls are considered the
// same exactly when their (query, options) positional arguments match.
function createKey(args): string {
  const [, , , query, options] = args;
  return stringify([query, options]);
}
537
538
// Compute a stable channel name ("sync:" + sha1) for this (query, options)
// pair; stability across project restarts lets reconnecting browsers find
// the same channel again.
function channel_name(query: any, options: any[]): string {
  // stable identifier to this query + options across
  // project restart, etc. We first make the options
  // as canonical as we can:
  const opts = {};
  for (const x of options) {
    for (const key in x) {
      opts[key] = x[key];
    }
  }
  // It's critical that we dedup the synctables having
  // to do with sync-doc's. A problem case is multiple
  // queries for the same table, due to the time cutoff
  // for patches after making a snapshot.
  let q: string;
  try {
    q = key(query);
  } catch {
    // throws an error if the table doesn't have a string_id;
    // that's fine - in this case, just make a key out of the query.
    // NOTE: this assigns the raw query object to a string-typed
    // variable; stringify below handles either form.
    q = query;
  }
  const y = stringify([q, opts]);
  const s = sha1(y);
  return `sync:${s}`;
}
564
565
// Create (or reuse) the channel serving the given query and return its
// name. On first creation, well-known tables (listings, project_info,
// project_status, usage_info) are also hooked up to their project-side
// managers.
async function synctable_channel0(
  client: any,
  primus: any,
  logger: any,
  query: any,
  options: any[],
): Promise<string> {
  const name = channel_name(query, options);
  logger.debug("synctable_channel", JSON.stringify(query), name);
  if (synctable_channels[name] === undefined) {
    synctable_channels[name] = new SyncTableChannel({
      client,
      primus,
      name,
      query,
      options,
      logger,
    });
    await synctable_channels[name].init();
    // Registration dispatch based on which table the query targets:
    if (query?.listings != null) {
      registerListingsTable(synctable_channels[name].get_synctable(), query);
    } else if (query?.project_info != null) {
      register_project_info_table(
        synctable_channels[name].get_synctable(),
        logger,
        client.client_id(),
      );
    } else if (query?.project_status != null) {
      register_project_status_table(
        synctable_channels[name].get_synctable(),
        logger,
        client.client_id(),
      );
    } else if (query?.usage_info != null) {
      register_usage_info_table(
        synctable_channels[name].get_synctable(),
        client.client_id(),
      );
    }
  }
  return name;
}
607
608
// Public entry point. reuseInFlight dedups concurrent calls with the same
// (query, options) key -- see createKey -- so simultaneous open requests
// for one file share a single in-flight channel creation.
export const synctable_channel = reuseInFlight(synctable_channel0, {
  createKey,
});
611
612