CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/client/synctable-project.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Synctable that uses the project websocket rather than the database.
8
*/
9
10
import { delay } from "awaiting";
11
12
import { SyncTable, synctable_no_database } from "@cocalc/sync/table";
13
import { once, retry_until_success } from "@cocalc/util/async-utils";
14
import { assertDefined } from "@cocalc/util/misc";
15
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
16
import type { AppClient } from "./types";
17
18
// Always wait at least this long between connect attempts. This
19
// avoids flooding the project with connection requests if, e.g., the
20
// client limit for a particular file is reached.
21
const MIN_CONNECT_WAIT_MS = 5000;
22
23
interface Options {
24
project_id: string;
25
query: object;
26
options: any[];
27
client: AppClient;
28
throttle_changes?: undefined | number;
29
id: string;
30
}
31
32
import { EventEmitter } from "events";
33
34
class SyncTableChannel extends EventEmitter {
35
public synctable?: SyncTable;
36
private project_id: string;
37
private client: AppClient;
38
private channel?: any;
39
private websocket?: any;
40
private query: any;
41
private options: any;
42
private key: string;
43
44
private last_connect: number = 0;
45
46
private connected: boolean = false;
47
48
constructor(opts: Options) {
49
super();
50
const { project_id, query, options, client, throttle_changes } = opts;
51
if (query == null) {
52
throw Error("query must be defined");
53
}
54
if (options == null) {
55
throw Error("options must be defined");
56
}
57
this.key = key(opts);
58
this.synctable = synctable_no_database(
59
query,
60
options,
61
client,
62
throttle_changes,
63
[],
64
project_id,
65
);
66
(this.synctable as any).channel = this; // for debugging
67
this.synctable.setOnDisconnect = (changes, merge) => {
68
this.send_mesg_to_project({ event: "set-on-disconnect", changes, merge });
69
};
70
this.synctable.sendMessageToProject = (data) => {
71
this.send_mesg_to_project({ event: "message", data });
72
};
73
this.project_id = project_id;
74
this.client = client;
75
this.query = query;
76
this.options = options;
77
this.init_synctable_handlers();
78
79
this.connect = reuseInFlight(this.connect.bind(this));
80
this.log = this.log.bind(this);
81
this.connect();
82
}
83
84
public is_connected(): boolean {
85
return this.connected;
86
}
87
88
private log = (..._args) => {
89
//console.log("SyncTableChannel", this.query, ..._args);
90
};
91
92
private async connect(): Promise<void> {
93
this.log("connect...");
94
if (this.synctable == null) return;
95
this.set_connected(false);
96
this.clean_up_sockets();
97
98
const time_since_last_connect = Date.now() - this.last_connect;
99
if (time_since_last_connect < MIN_CONNECT_WAIT_MS) {
100
// Last attempt to connect was very recent, so we wait a little before
101
// trying again.
102
await delay(MIN_CONNECT_WAIT_MS - time_since_last_connect);
103
}
104
105
await retry_until_success({
106
max_delay: 5000,
107
f: this.attempt_to_connect.bind(this),
108
desc: "webapp-synctable-connect",
109
log: this.log,
110
});
111
112
this.last_connect = Date.now();
113
}
114
115
private set_connected(connected: boolean): void {
116
if (this.synctable == null) return;
117
this.log("set_connected", connected);
118
this.connected = connected;
119
if (this.synctable.client.set_connected != null) {
120
this.synctable.client.set_connected(connected);
121
}
122
if (connected) {
123
this.emit("connected");
124
} else {
125
this.emit("disconnected");
126
}
127
}
128
// Various things could go wrong, e.g., the websocket breaking
129
// while trying to get the api synctable_channel, touch
130
// project might time out, etc.
131
private async attempt_to_connect(): Promise<void> {
132
// Start with fresh websocket and channel -- old one may be dead.
133
this.clean_up_sockets();
134
// touch_project mainly makes sure that some hub is connected to
135
// the project, so the project can do DB queries. Also
136
// starts the project.
137
this.client.touch_project(this.project_id);
138
// Get a websocket.
139
this.websocket = await this.client.project_client.websocket(
140
this.project_id,
141
);
142
if (this.websocket.state != "online") {
143
// give websocket state one chance to change.
144
// It could change to destroyed or online.
145
this.log(
146
"wait for websocket to connect since state is",
147
this.websocket.state,
148
);
149
await once(this.websocket, "state");
150
}
151
if (this.websocket.state != "online") {
152
// Already offline... let's try again from the top.
153
this.log("websocket failed");
154
throw Error("websocket went offline already");
155
}
156
157
this.log("Get a channel");
158
const api = await this.client.project_client.api(this.project_id);
159
this.channel = await api.synctable_channel(this.query, this.options);
160
161
if (this.websocket.state != "online") {
162
// Already offline... let's try again from the top.
163
throw Error("websocket went offline already");
164
}
165
166
this.channel.on("data", this.handle_mesg_from_project.bind(this));
167
this.websocket.on("offline", this.connect);
168
this.channel.on("close", this.connect);
169
}
170
171
private init_synctable_handlers(): void {
172
assertDefined(this.synctable);
173
this.synctable.on("timed-changes", (timed_changes) => {
174
this.send_mesg_to_project({ timed_changes });
175
});
176
this.synctable.once("closed", this.close.bind(this));
177
}
178
179
private clean_up_sockets(): void {
180
if (this.channel != null) {
181
this.channel.removeListener("close", this.connect);
182
183
// Explicitly emit end -- this is a hack,
184
// since this is the only way to force the
185
// channel clean-up code to run in primus-multiplex,
186
// and it gets run async later if we don't do this.
187
// TODO: rewrite primus-multiplex from scratch.
188
this.channel.emit("end");
189
190
try {
191
this.channel.end();
192
} catch (err) {
193
// no op -- this does happen if channel.conn is destroyed
194
}
195
delete this.channel;
196
}
197
198
if (this.websocket != null) {
199
this.websocket.removeListener("offline", this.connect);
200
delete this.websocket;
201
}
202
}
203
204
private async close(): Promise<void> {
205
delete cache[this.key];
206
this.clean_up_sockets();
207
if (this.synctable != null) {
208
const s = this.synctable;
209
delete this.synctable;
210
await s.close();
211
}
212
}
213
214
private handle_mesg_from_project(mesg): void {
215
this.log("project --> client: ", mesg);
216
if (this.synctable == null) {
217
this.log("project --> client: NO SYNCTABLE");
218
return; // can happen during close
219
}
220
if (mesg == null) {
221
throw Error("mesg must not be null");
222
}
223
if (mesg.error != null) {
224
const { alert_message } = this.client;
225
const message = `Error opening file -- ${
226
mesg.error
227
} -- wait, restart your project or refresh your browser. Query=${JSON.stringify(
228
this.query,
229
)}`;
230
if (alert_message != null) {
231
alert_message({ type: "info", message, timeout: 10 });
232
} else {
233
console.warn(message);
234
}
235
}
236
if (mesg.event == "message") {
237
this.synctable.emit("message", mesg.data);
238
return;
239
}
240
if (mesg.init != null) {
241
this.log("project --> client: init_browser_client");
242
this.synctable.init_browser_client(mesg.init);
243
// after init message, we are now initialized
244
// and in the connected state.
245
this.set_connected(true);
246
}
247
if (mesg.versioned_changes != null) {
248
this.log("project --> client: versioned_changes");
249
this.synctable.apply_changes_to_browser_client(mesg.versioned_changes);
250
}
251
}
252
253
private send_mesg_to_project(mesg): void {
254
this.log("project <-- client: ", mesg);
255
if (!this.connected) {
256
throw Error("must be connected");
257
}
258
if (this.websocket == null) {
259
throw Error("websocket must not be null");
260
}
261
if (this.channel == null) {
262
throw Error("channel must not be null");
263
}
264
if (this.websocket.state != "online") {
265
throw Error(
266
`websocket state must be online but it is '${this.websocket.state}'`,
267
);
268
}
269
this.channel.write(mesg);
270
}
271
}
272
273
// We use a cache to ensure there is at most one synctable
274
// at a time with given defining parameters. This is just
275
// for efficiency and sanity, so we use JSON.stringify instead
276
// of a guranteed stable json.
277
const cache: { [key: string]: SyncTableChannel } = {};
278
279
// ONLY uncomment when developing!
280
// (window as any).channel_cache = cache;
281
282
// The id here is so that the synctables and channels are unique
283
// **for a given syncdoc**. There can be multiple syncdocs for
284
// the same underlying project_id/path, e.g.,
285
// - when timetravel and a document are both open at the same time,
286
// - when a document is closing (and saving offline changes) at the
287
// same time that it is being opened; to see this disconnect from
288
// the network, make changes, clocse the file tab, then open it
289
// again, and reconnect to the network.
290
// See https://github.com/sagemathinc/cocalc/issues/3595 for why this
291
// opts.id below is so important. I tried several different approaches,
292
// and this is the best by far.
293
function key(opts: Options): string {
294
return `${opts.id}-${opts.project_id}-${JSON.stringify(
295
opts.query,
296
)}-${JSON.stringify(opts.options)}`;
297
}
298
299
// NOTE: This function can be called by a LOT of different things at once whenever
300
// waiting to connect to a project. The "await once" inside it creates
301
// a listener on SyncTableChannel, and there is a limit on the number of
302
// those you can create without raising a limit (that was appearing in the
303
// console log a lot). Thus our use of reuseInFlight to prevent this.
304
async function synctable_project0(opts: Options): Promise<SyncTable> {
305
const k = key(opts);
306
// console.log("key = ", k);
307
let t;
308
if (cache[k] !== undefined) {
309
t = cache[k];
310
} else {
311
t = new SyncTableChannel(opts);
312
cache[k] = t;
313
}
314
if (!t.is_connected()) {
315
await once(t, "connected");
316
}
317
return t.synctable;
318
}
319
320
const synctable_project = reuseInFlight(synctable_project0, {
321
createKey: (args) =>
322
JSON.stringify([args[0].project_id, args[0].query, args[0].options]),
323
});
324
325
export default synctable_project;
326
327