CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/frontend/client/hub.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
import { callback, delay } from "awaiting";
7
import { throttle } from "lodash";
8
import type { WebappClient } from "./client";
9
import { delete_cookie } from "../misc/cookies";
10
import {
11
copy_without,
12
from_json_socket,
13
to_json_socket,
14
defaults,
15
required,
16
uuid,
17
} from "@cocalc/util/misc";
18
import * as message from "@cocalc/util/message";
19
import {
20
do_anonymous_setup,
21
should_do_anonymous_setup,
22
} from "./anonymous-setup";
23
import {
24
deleteRememberMe,
25
setRememberMe,
26
} from "@cocalc/frontend/misc/remember-me";
27
import { appBasePath } from "@cocalc/frontend/customize/app-base-path";
28
29
// Maximum number of outstanding concurrent messages (that have responses)
30
// to send at once to hub-websocket.
31
const MAX_CONCURRENT: number = 17;
32
33
// just define what we need for code sanity
34
interface PrimusConnection {
35
write: (arg0: string) => void;
36
open: () => void;
37
end: () => void;
38
latency?: number;
39
}
40
41
export interface MessageInfo {
42
count: number;
43
sent: number;
44
sent_length: number;
45
recv: number;
46
recv_length: number;
47
enqueued: number;
48
max_concurrent: number;
49
}
50
51
export class HubClient {
52
private client: WebappClient;
53
private conn?: PrimusConnection;
54
55
private connected: boolean = false;
56
private connection_is_totally_dead: boolean = false;
57
private num_attempts: number = 0;
58
private signed_in: boolean = false;
59
private signed_in_time: number = 0;
60
private signed_in_mesg: object;
61
62
private call_callbacks: {
63
[id: string]: {
64
timeout?: any;
65
error_event: boolean;
66
first: boolean;
67
cb: Function;
68
};
69
} = {};
70
71
private mesg_data: {
72
queue: any[];
73
count: number;
74
sent: number;
75
sent_length: number;
76
recv: number;
77
recv_length: number;
78
} = {
79
queue: [], // messages in the queue to send
80
count: 0, // number of message currently outstanding
81
sent: 0, // total number of messages sent to backend.
82
sent_length: 0, // total amount of data sent
83
recv: 0, // number of messages received from backend
84
recv_length: 0,
85
};
86
87
constructor(client: WebappClient) {
88
this.client = client;
89
90
/* We heavily throttle this, since it's ONLY used for the connections
91
dialog, which users never look at, and it could waste cpu trying to
92
update things for no reason. It also impacts the color of the
93
connection indicator, so throttling will make that color change a
94
bit more laggy. That's probably worth it. */
95
this.emit_mesg_data = throttle(this.emit_mesg_data.bind(this), 2000);
96
97
// never attempt to reconnect more than once per 10s, no matter what.
98
this.reconnect = throttle(this.reconnect.bind(this), 10000);
99
100
// Start attempting to connect to a hub.
101
this.init_hub_websocket();
102
}
103
104
private emit_mesg_data(): void {
105
const info: MessageInfo = copy_without(this.mesg_data, ["queue"]) as any;
106
info.enqueued = this.mesg_data.queue.length;
107
info.max_concurrent = MAX_CONCURRENT;
108
this.client.emit("mesg_info", info);
109
}
110
111
public get_num_attempts(): number {
112
return this.num_attempts;
113
}
114
115
public send(mesg: object): void {
116
//console.log("send at #{misc.mswalltime()}", mesg)
117
const data = to_json_socket(mesg);
118
this.mesg_data.sent_length += data.length;
119
this.emit_mesg_data();
120
this.write_data(data);
121
}
122
123
private write_data(data: string): void {
124
if (this.conn == null) {
125
console.warn(
126
"HubClient.write_data: can't write data since not connected",
127
);
128
return;
129
}
130
try {
131
this.conn.write(data);
132
} catch (err) {
133
console.warn("HubClient.write_data", err);
134
}
135
}
136
137
private delete_websocket_cookie(): void {
138
delete_cookie("SMCSERVERID3");
139
}
140
141
public is_signed_in(): boolean {
142
return this.is_connected() && !!this.signed_in;
143
}
144
145
public set_signed_in(): void {
146
this.signed_in = true;
147
}
148
149
public set_signed_out(): void {
150
this.signed_in = false;
151
}
152
153
public get_signed_in_time(): number {
154
return this.signed_in_time;
155
}
156
157
public get_signed_in_mesg(): object {
158
return this.signed_in_mesg;
159
}
160
161
public is_connected(): boolean {
162
return !!this.connected;
163
}
164
165
public reconnect(): void {
166
if (this.connection_is_totally_dead) {
167
// CRITICAL: See https://github.com/primus/primus#primusopen !
168
this.conn?.open();
169
}
170
}
171
172
public disconnect(): void {
173
if (this.connected) {
174
this.conn?.end();
175
}
176
}
177
178
private ondata(data: string): void {
179
//console.log("got #{data.length} of data")
180
this.mesg_data.recv += 1;
181
this.mesg_data.recv_length += data.length;
182
this.emit_mesg_data();
183
this.handle_json_data(data);
184
}
185
186
private async handle_json_data(data: string): Promise<void> {
187
this.emit_mesg_data();
188
const mesg = from_json_socket(data);
189
// console.log(`handle_json_data: ${data}`);
190
switch (mesg.event) {
191
case "cookies":
192
try {
193
await this.client.account_client.cookies(mesg);
194
} catch (err) {
195
console.warn("Error handling cookie ", mesg, err);
196
}
197
break;
198
199
case "signed_in":
200
this.client.account_id = mesg.account_id;
201
this.set_signed_in();
202
this.signed_in_time = Date.now();
203
setRememberMe(appBasePath);
204
this.signed_in_mesg = mesg;
205
this.client.emit("signed_in", mesg);
206
break;
207
208
case "remember_me_failed":
209
deleteRememberMe(appBasePath);
210
this.client.emit(mesg.event, mesg);
211
break;
212
213
case "version":
214
this.client.emit("new_version", {
215
version: mesg.version,
216
min_version: mesg.min_version,
217
});
218
break;
219
220
case "error":
221
// An error that isn't tagged with an id -- some sort of general problem.
222
if (mesg.id == null) {
223
console.log(`WARNING: ${JSON.stringify(mesg.error)}`);
224
return;
225
}
226
break;
227
228
case "start_metrics":
229
this.client.emit("start_metrics", mesg.interval_s);
230
break;
231
}
232
233
// the call f(null, mesg) below can mutate mesg (!), so we better save the id here.
234
const { id } = mesg;
235
const v = this.call_callbacks[id];
236
if (v != null) {
237
const { cb, error_event } = v;
238
v.first = false;
239
if (error_event && mesg.event === "error") {
240
if (!mesg.error) {
241
// make sure mesg.error is set to something.
242
mesg.error = "error";
243
}
244
cb(mesg.error);
245
} else {
246
cb(undefined, mesg);
247
}
248
if (!mesg.multi_response) {
249
delete this.call_callbacks[id];
250
}
251
}
252
}
253
254
private do_call(opts: any, cb: Function): void {
255
if (opts.cb == null) {
256
// console.log("no opts.cb", opts.message)
257
// A call to the backend, but where we do not wait for a response.
258
// In order to maintain at least roughly our limit on MAX_CONCURRENT,
259
// we simply pretend that this message takes about 150ms
260
// to complete. This helps space things out so the server can
261
// handle requests properly, instead of just discarding them (be nice
262
// to the backend and it will be nice to you).
263
this.send(opts.message);
264
setTimeout(cb, 150);
265
return;
266
}
267
if (opts.message.id == null) {
268
// Assign a uuid (usually we do this)
269
opts.message.id = uuid();
270
}
271
const { id } = opts.message;
272
let called_cb: boolean = false;
273
if (this.call_callbacks[id] != null) {
274
// User is requesting to send a message with the same id as
275
// a currently outstanding message. This typically happens
276
// when disconnecting and reconnecting. It's critical to
277
// clear up the existing call before overwritting
278
// call_callbacks[id]. The point is the message id's are
279
// NOT at all guaranteed to be random.
280
this.clear_call(id);
281
}
282
283
this.call_callbacks[id] = {
284
cb: (...args) => {
285
if (!called_cb) {
286
called_cb = true;
287
cb();
288
}
289
// NOTE: opts.cb is always defined since otherwise
290
// we would have exited above.
291
if (opts.cb != null) {
292
opts.cb(...args);
293
}
294
},
295
error_event: !!opts.error_event,
296
first: true,
297
};
298
299
this.send(opts.message);
300
301
if (opts.timeout) {
302
this.call_callbacks[id].timeout = setTimeout(() => {
303
if (this.call_callbacks[id] == null || this.call_callbacks[id].first) {
304
const error = "Timeout after " + opts.timeout + " seconds";
305
if (!called_cb) {
306
called_cb = true;
307
cb();
308
}
309
if (opts.cb != null) {
310
opts.cb(error, message.error({ id, error }));
311
}
312
delete this.call_callbacks[id];
313
}
314
}, opts.timeout * 1000);
315
} else {
316
// IMPORTANT: No matter what, we call cb within 60s; if we don't do this then
317
// in case opts.timeout isn't set but opts.cb is, but user disconnects,
318
// then cb would never get called, which throws off our call counter.
319
// Note that the input to cb doesn't matter.
320
const f = () => {
321
if (!called_cb) {
322
called_cb = true;
323
cb();
324
}
325
};
326
this.call_callbacks[id].timeout = setTimeout(f, 60 * 1000);
327
}
328
}
329
330
public call(opts: any): void {
331
// This function:
332
// * Modifies the message by adding an id attribute with a random uuid value
333
// * Sends the message to the hub
334
// * When message comes back with that id, call the callback and delete it (if cb opts.cb is defined)
335
// The message will not be seen by @handle_message.
336
// * If the timeout is reached before any messages come back, delete the callback and stop listening.
337
// However, if the message later arrives it may still be handled by @handle_message.
338
opts = defaults(opts, {
339
message: required,
340
timeout: undefined,
341
error_event: false, // if true, turn error events into just a normal err
342
allow_post: undefined, // TODO: deprecated -- completely ignored and not used in any way.
343
cb: undefined,
344
});
345
if (!this.is_connected()) {
346
if (opts.cb != null) {
347
opts.cb("not connected");
348
}
349
return;
350
}
351
this.mesg_data.queue.push(opts);
352
this.mesg_data.sent += 1;
353
this.update_calls();
354
}
355
356
// like call above, but async and error_event defaults to TRUE,
357
// so an exception is raised on resp messages that have event='error'.
358
359
public async async_call(opts: any): Promise<any> {
360
const f = (cb) => {
361
opts.cb = cb;
362
this.call(opts);
363
};
364
if (opts.error_event == null) {
365
opts.error_event = true;
366
}
367
return await callback(f);
368
}
369
370
private update_calls(): void {
371
while (
372
this.mesg_data.queue.length > 0 &&
373
this.mesg_data.count < MAX_CONCURRENT
374
) {
375
this.process_next_call();
376
}
377
}
378
379
private process_next_call(): void {
380
if (this.mesg_data.queue.length === 0) {
381
return;
382
}
383
this.mesg_data.count += 1;
384
const opts = this.mesg_data.queue.shift();
385
this.emit_mesg_data();
386
this.do_call(opts, () => {
387
this.mesg_data.count -= 1;
388
this.emit_mesg_data();
389
this.update_calls();
390
});
391
}
392
393
private clear_call(id: string): void {
394
const obj = this.call_callbacks[id];
395
if (obj == null) return;
396
delete this.call_callbacks[id];
397
obj.cb("disconnect");
398
if (obj.timeout) {
399
clearTimeout(obj.timeout);
400
delete obj.timeout;
401
}
402
}
403
404
private clear_call_queue(): void {
405
for (const id in this.call_callbacks) {
406
this.clear_call(id);
407
}
408
this.emit_mesg_data();
409
}
410
411
private async init_hub_websocket(): Promise<void> {
412
const log = (...mesg) => console.log("hub_websocket -", ...mesg);
413
log("connect");
414
this.client.emit("connecting");
415
416
this.client.on("connected", () => {
417
this.send_version();
418
// Any outstanding calls made before connecting happened
419
// can't possibly succeed, so we clear all outstanding messages.
420
this.clear_call_queue();
421
});
422
423
this.delete_websocket_cookie();
424
// Important: window.Primus is usually defined when we get to the point
425
// of running this code. However, sometimes it doesn't -- timing is random
426
// and whether it is defined here depends on a hub being available to
427
// serve it up. So we just keep trying until it is defined.
428
// There is no need to back off or delay, since we aren't
429
// actually doing anything at all here in terms of work!
430
log("Loading global websocket client library from hub-websocket...");
431
while (window.Primus == null) {
432
await delay(200);
433
}
434
log(
435
"Loaded global websocket library! Now creating websocket connection to hub-websocket...",
436
);
437
const conn = (this.conn = new window.Primus({
438
reconnect: {
439
max: 30000,
440
min: 3000,
441
retries: 1000,
442
},
443
}));
444
445
conn.on("open", async () => {
446
this.connected = true;
447
this.connection_is_totally_dead = false;
448
this.client.emit("connected");
449
log("connected");
450
this.num_attempts = 0;
451
452
conn.removeAllListeners("data");
453
conn.on("data", this.ondata.bind(this));
454
455
if (should_do_anonymous_setup()) {
456
do_anonymous_setup(this.client);
457
}
458
});
459
460
conn.on("outgoing::open", () => {
461
log("connecting");
462
this.client.emit("connecting");
463
});
464
465
conn.on("offline", () => {
466
log("offline");
467
this.connected = this.signed_in = false;
468
this.client.emit("disconnected", "offline");
469
});
470
471
conn.on("online", () => {
472
log("online");
473
});
474
475
conn.on("message", (evt) => {
476
this.ondata(evt.data);
477
});
478
479
conn.on("error", (err) => {
480
log("error: ", err);
481
});
482
// NOTE: we do NOT emit an error event in this case! See
483
// https://github.com/sagemathinc/cocalc/issues/1819
484
// for extensive discussion.
485
486
conn.on("close", () => {
487
log("closed");
488
this.connected = this.signed_in = false;
489
this.client.emit("disconnected", "close");
490
});
491
492
conn.on("end", () => {
493
this.connection_is_totally_dead = true;
494
});
495
496
conn.on("reconnect scheduled", (opts) => {
497
this.num_attempts = opts.attempt;
498
// This just informs everybody that we *are* disconnected.
499
this.client.emit("disconnected", "close");
500
conn.removeAllListeners("data");
501
this.delete_websocket_cookie();
502
log(
503
`reconnect scheduled (attempt ${opts.attempt} out of ${opts.retries})`,
504
);
505
});
506
507
conn.on("reconnect", () => {
508
this.client.emit("connecting");
509
});
510
}
511
512
private send_version(): void {
513
this.send(message.version({ version: this.client.version() }));
514
}
515
516
public fix_connection(): void {
517
this.delete_websocket_cookie();
518
this.conn?.end();
519
this.conn?.open();
520
}
521
522
public latency(): number | void {
523
if (this.connected) {
524
return this.conn?.latency;
525
}
526
}
527
}
528
529