CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/project/client.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2023 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
client.ts -- A project viewed as a client for a hub.
8
9
For security reasons, a project does not initiate a TCP connection to a hub,
10
but rather hubs initiate TCP connections to projects:
11
12
* MINUS: This makes various things more complicated, e.g., a project
13
might not have any open connection to a hub, but still "want" to write
14
something to the database; in such a case it is simply out of luck
15
and must wait.
16
17
* PLUS: Security is simpler since a hub initiates the connection to
18
a project. A hub doesn't have to receive TCP connections and decide
19
whether or not to trust what is on the other end of those connections.
20
21
That said, this architecture could change, and very little code would change
22
as a result.
23
*/
24
import EventEmitter from "node:events";
25
import fs from "node:fs";
26
import { join } from "node:path";
27
import { FileSystemClient } from "@cocalc/sync-client/lib/client-fs";
28
import { execute_code, uuidsha1 } from "@cocalc/backend/misc_node";
29
import { CoCalcSocket } from "@cocalc/backend/tcp/enable-messaging-protocol";
30
import { SyncDoc } from "@cocalc/sync/editor/generic/sync-doc";
31
import type { ProjectClient as ProjectClientInterface } from "@cocalc/sync/editor/generic/types";
32
import { SyncString } from "@cocalc/sync/editor/string/sync";
33
import * as synctable2 from "@cocalc/sync/table";
34
import { callback2, once } from "@cocalc/util/async-utils";
35
import { PROJECT_HUB_HEARTBEAT_INTERVAL_S } from "@cocalc/util/heartbeat";
36
import * as message from "@cocalc/util/message";
37
import * as misc from "@cocalc/util/misc";
38
import type { CB } from "@cocalc/util/types/callback";
39
import type { ExecuteCodeOptionsWithCallback } from "@cocalc/util/types/execute-code";
40
import * as blobs from "./blobs";
41
import { symmetric_channel } from "./browser-websocket/symmetric_channel";
42
import { json } from "./common";
43
import * as data from "./data";
44
import initJupyter from "./jupyter/init";
45
import * as kucalc from "./kucalc";
46
import { getLogger } from "./logger";
47
import * as sage_session from "./sage_session";
48
import { getListingsTable } from "@cocalc/project/sync/listings";
49
import { get_synctable } from "./sync/open-synctables";
50
import { get_syncdoc } from "./sync/sync-doc";
51
52
const winston = getLogger("client");

const HOME = process.env.HOME ?? "/home/user";

// Easy way to enable debugging in any project anywhere: create this file.
const DEBUG_FILE = join(HOME, ".smc-DEBUG");

// Verbose debugging is enabled when the marker file exists.  It is also
// always enabled in kucalc, since logs are taken care of by the k8s
// logging infrastructure there.
const DEBUG: boolean = fs.existsSync(DEBUG_FILE) || Boolean(kucalc.IN_KUCALC);

if (!DEBUG) {
  winston.info(
    "create this file to enable very verbose debugging:",
    DEBUG_FILE,
  );
}

winston.info(`DEBUG = ${DEBUG}`);
if (!DEBUG) {
  winston.info(`create ${DEBUG_FILE} for much more verbose logging`);
}
76
77
// Module-level Client instance; assigned by init() and returned by getClient().
let client: Client;
78
79
/**
 * Create the module-level Client instance and return it.
 *
 * @throws Error if a client was already created by a previous call.
 */
export function init() {
  if (client == null) {
    client = new Client();
    return client;
  }
  throw Error("BUG: Client already initialized!");
}
86
87
/**
 * Return the module-level Client instance.
 *
 * @throws Error if init() has not assigned the client yet.
 */
export function getClient(): Client {
  if (client != null) {
    return client;
  }
  throw Error("BUG: Client not initialized!");
}
93
94
// Set to true by the Client constructor, which throws if it is already true,
// so that at most one Client is ever constructed in this process.
let ALREADY_CREATED = false;
95
96
// Type of the callbacks stored while waiting for a hub response; in this file
// they are invoked with the message received from the hub (see handle_mesg).
type HubCB = CB<any, { event: "error"; error?: string }>;
97
98
/**
 * A project viewed as a client of one or more hubs.
 *
 * Hubs open sockets to the project; each such socket is registered through
 * active_socket() and is then used to send messages to that hub (call, query,
 * save_blob).  The client emits "connected" when a socket is registered and
 * "disconnected" when the last registered socket goes away.
 */
export class Client extends EventEmitter implements ProjectClientInterface {
  private project_id: string;
  private _connected: boolean;

  // Callbacks waiting for a response from a hub, keyed by message id.
  private _hub_callbacks: {
    [key: string]: HubCB;
  };
  // Sockets registered via active_socket(), keyed by socket id, together with
  // the callbacks of requests that were sent over that particular socket.
  private _hub_client_sockets: {
    [id: string]: {
      socket: CoCalcSocket;
      callbacks?: { [id: string]: HubCB | CB<any, string> };
      activity: Date;
    };
  };
  // Socket used for each changefeed query, keyed by the query message id.
  private _changefeed_sockets: any;
  private _open_syncstrings?: { [key: string]: SyncString };

  // use to define a logging function that is cleanly used internally:
  // dbg("name") returns a function that logs with the prefix "Client.name"
  // when DEBUG is enabled, and a no-op function otherwise.
  dbg = (f: string) => {
    if (DEBUG && winston) {
      return (...m) => {
        return winston.debug(`Client.${f}`, ...m);
      };
    } else {
      return function (..._) {};
    }
  };

  // Filesystem operations are delegated to FileSystemClient.  NOTE: the
  // assignments below read this.filesystemClient, so it must be declared first.
  private filesystemClient = new FileSystemClient();
  write_file = this.filesystemClient.write_file;
  path_read = this.filesystemClient.path_read;
  path_stat = this.filesystemClient.path_stat;
  file_size_async = this.filesystemClient.file_size_async;
  file_stat_async = this.filesystemClient.file_stat_async;
  watch_file = this.filesystemClient.watch_file;

  /**
   * @throws Error if a Client was already constructed in this process.
   */
  constructor() {
    super();
    if (ALREADY_CREATED) {
      throw Error("BUG: Client already created!");
    }
    ALREADY_CREATED = true;
    this.project_id = data.project_id;
    this.dbg("constructor")();
    this.setMaxListeners(300); // every open file/table/sync db listens for connect event, which adds up.
    // initialize the caches
    this._hub_callbacks = {};
    this._hub_client_sockets = {};
    this._changefeed_sockets = {};
    this._connected = false;

    if (kucalc.IN_KUCALC) {
      kucalc.init(this);
    }

    // Methods such as this.call and this._query_cancel are passed around
    // unbound (e.g., to callback2), so bind them all to this instance.
    misc.bind_methods(this);

    initJupyter();
  }

  // There is nobody to show an alert to in a project, so this only logs
  // the type, title and message (when debugging is enabled).
  public alert_message({
    type = "default",
    title,
    message,
  }: {
    type?: "default";
    title?: string;
    message: string;
    block?: boolean;
    timeout?: number; // time in seconds
  }): void {
    this.dbg("alert_message")(type, title, message);
  }

  // Close every open syncstring and forget about them.
  // todo: more could be closed...
  public close(): void {
    if (this._open_syncstrings != null) {
      // Iterate over the syncstrings themselves.  (A for...in loop over the
      // array of keys would yield array indices rather than the keys.)
      for (const syncstring of Object.values(this._open_syncstrings)) {
        syncstring.close();
      }
      delete this._open_syncstrings;
    }
  }

  // account_id or project_id of this client
  public client_id(): string {
    return this.project_id;
  }

  public get_project_id(): string {
    return this.project_id;
  }

  // true since this client is a project
  public is_project(): boolean {
    return true;
  }

  public is_browser(): boolean {
    return false;
  }

  public is_compute_server(): boolean {
    return false;
  }

  // false since this client is not a user
  public is_user(): boolean {
    return false;
  }

  public is_signed_in(): boolean {
    return true;
  }

  // true if at least one hub socket is currently registered
  public is_connected(): boolean {
    return this._connected;
  }

  // We trust the time on our own compute servers (unlike random user's browser).
  public server_time(): Date {
    return new Date();
  }

  // Declare that the given socket is active right now and can be used for
  // communication with some hub (the one the socket is connected to).
  // The first time a socket is seen it is registered, a periodic heartbeat
  // check is started for it, and "connected" is emitted.  For an already
  // registered socket only its activity timestamp is updated.
  public active_socket(socket: CoCalcSocket): void {
    const dbg = this.dbg(
      `active_socket(id=${socket.id},ip='${socket.remoteAddress}')`,
    );
    let x = this._hub_client_sockets[socket.id];
    if (x == null) {
      dbg();
      x = this._hub_client_sockets[socket.id] = {
        socket,
        callbacks: {},
        activity: new Date(),
      };
      let heartbeat_interval: ReturnType<typeof setInterval> | undefined =
        undefined;
      // Tear down everything associated with this socket.  Safe to call more
      // than once: after the first call heartbeat_interval is undefined.
      const socket_end = (): void => {
        if (heartbeat_interval == null) {
          // already destroyed it
          return;
        }
        dbg("ending socket");
        clearInterval(heartbeat_interval);
        heartbeat_interval = undefined;
        if (x.callbacks != null) {
          for (const id in x.callbacks) {
            // TODO: is this right?  Should we call the callback an {event:error} object?
            // NOTE(review): the callbacks registered by call() treat their
            // first argument as the hub's response, so this string is passed
            // on to the caller as a successful response -- confirm intent.
            const cb = x.callbacks[id] as CB<any, string>;
            cb?.("socket closed");
          }
          delete x.callbacks; // so additional trigger of end doesn't do anything
        }
        delete this._hub_client_sockets[socket.id];
        dbg(
          `number of active sockets now equals ${misc.len(
            this._hub_client_sockets,
          )}`,
        );
        if (misc.len(this._hub_client_sockets) === 0) {
          this._connected = false;
          dbg("lost all active sockets");
          this.emit("disconnected");
        }
        socket.end();
        socket.destroy();
      };

      socket.on("end", socket_end);
      socket.on("error", socket_end);

      // The socket is considered dead if no heartbeat was recorded on it
      // within 1.5 heartbeat intervals.
      const check_heartbeat = (): void => {
        if (
          socket.heartbeat == null ||
          Date.now() - socket.heartbeat.getTime() >=
            1.5 * PROJECT_HUB_HEARTBEAT_INTERVAL_S * 1000
        ) {
          dbg("heartbeat failed");
          socket_end();
        } else {
          dbg("heartbeat -- socket is working");
        }
      };

      heartbeat_interval = setInterval(
        check_heartbeat,
        1.5 * PROJECT_HUB_HEARTBEAT_INTERVAL_S * 1000,
      );

      if (misc.len(this._hub_client_sockets) >= 1) {
        dbg("CONNECTED!");
        this._connected = true;
        this.emit("connected");
      }
    } else {
      x.activity = new Date();
    }
  }

  // Handle a mesg coming back from some hub. If we have a callback we call it
  // for the given message, then return true. Otherwise, return
  // false, meaning something else should try to handle this message.
  public handle_mesg(mesg, socket) {
    const dbg = this.dbg(`handle_mesg(${misc.trunc_middle(json(mesg), 512)})`);
    const f = this._hub_callbacks[mesg.id];
    if (f != null) {
      dbg("calling callback");
      if (!mesg.multi_response) {
        // Single response: nothing more will arrive for this id.
        delete this._hub_callbacks[mesg.id];
        // The socket may already have been removed from the table (or never
        // registered), so do not assume that its entry exists.
        delete this._hub_client_sockets[socket.id]?.callbacks?.[mesg.id];
      }
      try {
        f(mesg);
      } catch (err) {
        dbg(`WARNING: error handling message from client. -- ${err}`);
      }
      return true;
    } else {
      dbg("no callback");
      return false;
    }
  }

  // Get a socket connection to the hub from one in our cache; choose one at random.
  // There is obviously no guarantee to get the same hub if you call this twice!
  // Returns undefined if there are currently no connections from any hub to us
  // (in which case, the project must wait).
  public get_hub_socket() {
    const socket_ids = misc.keys(this._hub_client_sockets);
    this.dbg("get_hub_socket")(
      `there are ${socket_ids.length} sockets -- ${JSON.stringify(socket_ids)}`,
    );
    if (socket_ids.length === 0) {
      return;
    }
    return this._hub_client_sockets[misc.random_choice(socket_ids)].socket;
  }

  // Send a message to some hub server and await a response (if cb defined).
  // If no socket is given, a random registered one is used (and stored in
  // opts.socket); if there is none, cb is called with an error.
  // If opts.message.id is not set and a cb is given, a new uuid is assigned.
  public call(opts: {
    message: any;
    timeout?: number; // timeout in seconds; if specified call will error out after this much time
    socket?: CoCalcSocket; // if specified, use this socket
    cb?: CB<any, string>; // awaits response if given
  }) {
    const dbg = this.dbg(`call(message=${json(opts.message)})`);
    dbg();
    // set socket to best one if no socket specified
    const socket = opts.socket ?? (opts.socket = this.get_hub_socket());
    if (socket == null) {
      dbg("no sockets");
      // currently, due to the security model, there's no way out of this; that will change...
      opts.cb?.("no hubs currently connected to this project");
      return;
    }
    if (opts.cb != null) {
      const socket_id = socket.id;
      let timer;
      if (opts.timeout) {
        dbg("configure timeout");
        const fail = () => {
          dbg("failed");
          // Forget the pending callback everywhere it was registered, so it
          // does not linger in the per-socket table after timing out.
          delete this._hub_callbacks[opts.message.id];
          delete this._hub_client_sockets[socket_id]?.callbacks?.[
            opts.message.id
          ];
          opts.cb?.(`timeout after ${opts.timeout}s`);
          delete opts.cb;
        };
        timer = setTimeout(fail, opts.timeout * 1000);
      }
      if (opts.message.id == null) {
        opts.message.id = misc.uuid();
      }
      // Translate the hub's response message into the (err, resp) convention.
      const cb = (this._hub_callbacks[opts.message.id] = (resp) => {
        if (timer != null) {
          clearTimeout(timer);
          timer = undefined;
        }
        if (resp?.event === "error") {
          opts.cb?.(resp.error ? resp.error : "error");
        } else {
          opts.cb?.(undefined, resp);
        }
      });
      // An explicitly given socket might not be registered (anymore), in
      // which case there is no per-socket callback table to record this in.
      const callbacks = this._hub_client_sockets[socket_id]?.callbacks;
      if (callbacks != null) {
        callbacks[opts.message.id] = cb;
      }
    }
    // Finally, send the message
    return socket.write_mesg("json", opts.message);
  }

  // Do a project_query.
  // Throws if options is given but is not an array.  If no hub socket is
  // available, cb is called with an error.
  public query({
    query,
    options,
    changes,
    //standby = false, // **IGNORED**
    timeout = 30,
    cb,
  }: {
    query: any; // a query (see schema.js)
    options?: { [key: string]: any }[]; // options to the query, e.g., [{limit:5}] )
    changes?: boolean; // whether or not to create a changefeed
    //standby: boolean; // **IGNORED**
    timeout?: number; // how long in *seconds* wait for initial result from hub database call (default: 30)
    cb: CB<any, string>;
  }) {
    if (options != null && !misc.is_array(options)) {
      throw Error("options must be an array");
    }
    const mesg = message.query({
      id: misc.uuid(),
      query,
      options,
      changes,
      multi_response: changes,
    });
    const socket = this.get_hub_socket();
    if (socket == null) {
      // It will try later when one is available...
      cb("no hub socket available");
      return;
    }
    if (changes) {
      // Record socket for this changefeed in @_changefeed_sockets
      this._changefeed_sockets[mesg.id] = socket;
      // CRITICAL: On error or end, send an end error to the synctable, so that it will
      // attempt to reconnect (and also stop writing to the socket).
      // This is important, since for project clients
      // the disconnected event is only emitted when *all* connections from
      // hubs to the local_hub end.  If two connections s1 and s2 are open,
      // and s1 is used for a sync table, and s1 closes (e.g., hub1 is restarted),
      // then s2 is still open and no 'disconnected' event is emitted.  Nonetheless,
      // it's important for the project to consider the synctable broken and
      // try to reconnect it, which in this case it would do using s2.
      socket.on("error", () => {
        cb("socket-end");
      });
      socket.on("end", () => {
        cb("socket-end");
      });
    }
    return this.call({
      message: mesg,
      timeout,
      socket,
      cb,
    });
  }

  // Cancel an outstanding changefeed query, using the same socket that the
  // changefeed was created on.
  private _query_cancel(opts: { id: string; cb?: CB }) {
    const socket = this._changefeed_sockets[opts.id];
    if (socket == null) {
      // nothing to do
      return opts.cb?.();
    } else {
      return this.call({
        message: message.query_cancel({ id: opts.id }),
        timeout: 30,
        socket,
        cb: opts.cb,
      });
    }
  }

  // ASYNC version
  public async query_cancel(id) {
    return await callback2(this._query_cancel, { id });
  }

  public sync_table(query, options?: any, throttle_changes = undefined) {
    return synctable2.synctable(query, options, this, throttle_changes);
  }

  // We leave in the project_id for consistency with the browser UI.
  // And maybe someday we'll have tables managed across projects (?).
  public async synctable_project(_project_id: string, query, _options) {
    // TODO: this is ONLY for syncstring tables (syncstrings, patches, cursors).
    // Also, options are ignored -- since we use whatever was selected by the frontend.
    const the_synctable = await get_synctable(query, this);
    // To provide same API, must also wait until done initializing.
    if (the_synctable.get_state() !== "connected") {
      await once(the_synctable, "connected");
    }
    if (the_synctable.get_state() !== "connected") {
      throw Error(
        "Bug -- state of synctable must be connected " + JSON.stringify(query),
      );
    }
    return the_synctable;
  }

  // WARNING: making two of the exact same sync_string or sync_db will definitely
  // lead to corruption!

  // Get the synchronized doc with the given path.  Returns undefined
  // if currently no such sync-doc.
  public syncdoc({ path }: { path: string }): SyncDoc | undefined {
    return get_syncdoc(path);
  }

  public symmetric_channel(name) {
    return symmetric_channel(name);
  }

  public path_access(opts: { path: string; mode: string; cb: CB }): void {
    // mode: sub-sequence of 'rwxf' -- see https://nodejs.org/api/fs.html#fs_class_fs_stats
    // cb(err); err = if any access fails; err=undefined if all access is OK
    let access = 0;
    for (const s of opts.mode) {
      // Look the flag up in fs.constants (R_OK, W_OK, X_OK, F_OK); reading
      // these directly off of the fs module is deprecated in Node.js.
      access |= fs.constants[s.toUpperCase() + "_OK"];
    }
    return fs.access(opts.path, access, opts.cb);
  }

  // TODO: exists is deprecated.  "To check if a file exists
  // without manipulating it afterwards, fs.access() is
  // recommended."
  public path_exists(opts: { path: string; cb: CB }) {
    const dbg = this.dbg(`checking if path (='${opts.path}') exists`);
    dbg();
    return fs.exists(opts.path, (exists) => {
      dbg(`returned ${exists}`);
      opts.cb(undefined, exists);
    }); // err actually never happens with node.js, so we change api to be more consistent
  }

  // Size of file in bytes (divide by 1000 for K, by 10^6 for MB.)
  public file_size(opts: { filename: string; cb: CB }): void {
    this.path_stat({
      path: opts.filename,
      cb: (err, stat) => {
        opts.cb(err, stat?.size);
      },
    });
  }

  // execute a command using the shell or a subprocess -- see docs for execute_code in misc_node.
  public shell(opts: ExecuteCodeOptionsWithCallback): void {
    execute_code(opts);
  }

  // return new sage session -- the code that actually calls this is in the @cocalc/sync package
  // in "packages/sync/editor/generic/evaluator.ts"
  public sage_session({
    path,
  }: {
    path: string; // the path to the *worksheet* file
  }): sage_session.SageSessionType {
    return sage_session.sage_session({ path, client: this });
  }

  // Save a blob to the central db blobstore.
  // The sha1 is optional.
  // The blob is sent over a randomly chosen hub socket; cb is called with an
  // error if no hub is connected or if the response has an error field.
  public save_blob({
    blob,
    sha1,
    uuid: optsUUID,
    cb,
  }: {
    blob: Buffer; // Buffer of data
    sha1?: string;
    uuid?: string; // if given then uuid must be derived from sha1 hash
    cb?: (err: string | undefined, resp?: any) => void;
  }) {
    const uuid = optsUUID ?? uuidsha1(blob, sha1);
    const dbg = this.dbg(`save_blob(uuid='${uuid}')`);
    const hub = this.get_hub_socket();
    if (hub == null) {
      dbg("fail -- no global hubs");
      cb?.(
        "no global hubs are connected to the local hub, so nowhere to send file",
      );
      return;
    }
    dbg("sending blob mesg");
    hub.write_mesg("blob", { uuid, blob });
    dbg("waiting for response");
    blobs.receive_save_blob_message({
      sha1: uuid,
      cb: (resp): void => {
        if (resp?.error) {
          dbg(`fail -- '${resp.error}'`);
          cb?.(resp.error, resp);
        } else {
          dbg("success");
          cb?.(undefined, resp);
        }
      },
    });
  }

  // Not implemented: always calls cb (if given) with an error.
  public get_blob(opts: {
    blob: Buffer; // Buffer of data
    sha1?: string;
    uuid?: string; // if given is uuid derived from sha1
    cb?: (err: string) => void; // (err, resp)
  }) {
    const dbg = this.dbg("get_blob");
    dbg(opts.sha1);
    opts.cb?.("get_blob: not implemented");
  }

  // no-op; assumed async api
  touch_project(_project_id: string) {}

  // Request the history of the syncdoc with the given string_id from a hub.
  async get_syncdoc_history(string_id: string, patches = false) {
    const dbg = this.dbg("get_syncdoc_history");
    dbg(string_id, patches);
    const mesg = message.get_syncdoc_history({
      string_id,
      patches,
    });
    return await callback2(this.call, { message: mesg });
  }

  // Return true if the file was explicitly deleted.
  // Returns undefined if don't know (there is no listings table).
  // Returns false if definitely not.
  public is_deleted(filename: string, _project_id: string) {
    return getListingsTable()?.isDeleted(filename);
  }

  // Mark the file as deleted in the listings table (if there is one).
  public async set_deleted(
    filename: string,
    _project_id?: string,
  ): Promise<void> {
    // project_id is ignored
    const listings = getListingsTable();
    return await listings?.setDeleted(filename);
  }
}
643
644