CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/redux/project-actions.ts
Views: 687
1
/*
 * This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 * License: MS-RSL – see LICENSE.md for details
 */

/*
project-actions: additional actions that are only available in the
backend/project, which "manages" everything.

This code should not *explicitly* require anything that is only
available in the project or requires node to run, so that we can
fully unit test it via mocking of components.

NOTE: these are now also the actions used by remote compute servers.
*/
16
17
import { get_kernel_data } from "@cocalc/jupyter/kernel/kernel-data";
18
import * as immutable from "immutable";
19
import json_stable from "json-stable-stringify";
20
import { debounce } from "lodash";
21
import { JupyterActions as JupyterActions0 } from "@cocalc/jupyter/redux/actions";
22
import { callback2, once } from "@cocalc/util/async-utils";
23
import * as misc from "@cocalc/util/misc";
24
import { OutputHandler } from "@cocalc/jupyter/execute/output-handler";
25
import { RunAllLoop } from "./run-all-loop";
26
import nbconvertChange from "./handle-nbconvert-change";
27
import type { ClientFs } from "@cocalc/sync/client/types";
28
import { kernel as createJupyterKernel } from "@cocalc/jupyter/kernel";
29
import {
30
decodeUUIDtoNum,
31
isEncodedNumUUID,
32
} from "@cocalc/util/compute/manager";
33
import { handleApiRequest } from "@cocalc/jupyter/kernel/websocket-api";
34
import { callback } from "awaiting";
35
import { get_blob_store } from "@cocalc/jupyter/blobs";
36
import { removeJupyterRedux } from "@cocalc/jupyter/kernel";
37
38
/**
 * Lifecycle states of the backend that manages a notebook. The current
 * value is mirrored into the syncdb "settings" record by
 * set_backend_state so that clients can display it.
 */
type BackendState =
  | "init"
  | "ready"
  | "spawning"
  | "starting"
  | "running";
39
40
export class JupyterActions extends JupyterActions0 {
41
// ---- backend / kernel state ----
// Current backend lifecycle state; mirrored into the syncdb by set_backend_state.
private _backend_state: BackendState = "init";
// Last kernel execution state passed to set_kernel_state.
private _kernel_state: any;
// Pending setTimeout handle that clears kernel_error once the kernel has
// stayed in the "running" state for a while.
private clear_kernel_error?: any;

// ---- execution manager bookkeeping ----
private _initialize_manager_already_done: any;
// Cell ids requested to run but not yet started (see manager_run_cell_enqueue).
private _manager_run_cell_queue: any;
// Cell id -> identity of the kernel running (or about to run) that cell.
private _running_cells: { [id: string]: string };
// Guards against concurrent runs of manager_run_cell_process_queue.
private running_manager_run_cell_process_queue: boolean = false;
private run_all_loop?: RunAllLoop;
private _throttled_ensure_positions_are_unique: any;

// ---- persistence ----
private last_ipynb_save: number = 0;
protected _client: ClientFs; // this has filesystem access, etc.
52
53
/**
 * Run the cell with the given id. Only code cells are executed (via
 * run_code_cell). If `save` is true, a save is requested afterwards for
 * any existing cell. Does nothing when the notebook is read only or the
 * cell does not exist.
 */
public run_cell(
  id: string,
  save: boolean = true,
  no_halt: boolean = false,
): void {
  if (this.store.get("read_only")) return;
  const cell = this.store.getIn(["cells", id]);
  // Running a nonexistent cell is trivially a no-op.
  if (cell == null) return;

  // On the backend we do not try to parse things like "foo?" out of the
  // input (that would need CodeMirror), so code cells go straight to
  // run_code_cell.
  const isCode = cell.get("cell_type", "code") == "code";
  if (isCode) this.run_code_cell(id, save, no_halt);

  if (save) this.save_asap();
}
76
77
/**
 * Set the backend state. If we are the cell runner, also record it in the
 * syncdb "settings" record (with last_backend_state = now) so clients
 * can display it.
 *
 * States:
 *  - 'init'     -- the backend is checking the file on disk, etc.
 *  - 'ready'    -- the backend is set up and ready to use; the kernel isn't running though
 *  - 'spawning' -- the kernel is being spawned (reported by the kernel's "state" event)
 *  - 'starting' -- the kernel itself is activated and currently starting up (e.g., Sage is starting up)
 *  - 'running'  -- the kernel is running and ready to evaluate code
 *
 *   'init' --> 'ready' --> 'spawning' --> 'starting' --> 'running'
 *                /|\                                        |
 *                 |-----------------------------------------|
 *
 * Going from ready to starting happens first when a code execution is requested.
 *
 * When the state becomes 'running', kernel_error is cleared after 3 seconds
 * unless the state changes again in the meantime.
 *
 * @throws if backend_state is not a valid state, or on any attempt to go
 *   back to 'init' from another state.
 */
private set_backend_state(backend_state: BackendState): void {
  this.dbg("set_backend_state")(backend_state);

  // Check just in case Typescript doesn't catch something:
  if (
    ["init", "ready", "spawning", "starting", "running"].indexOf(
      backend_state,
    ) === -1
  ) {
    throw Error(`invalid backend state '${backend_state}'`);
  }
  if (backend_state == "init" && this._backend_state != "init") {
    // Do NOT allow changing the state to init from any other state.
    throw Error(
      `illegal state change '${this._backend_state}' --> '${backend_state}'`,
    );
  }
  this._backend_state = backend_state;

  if (!this.isCellRunner()) {
    return;
  }

  const stored_backend_state = this.syncdb
    .get_one({ type: "settings" })
    ?.get("backend_state");
  if (stored_backend_state != backend_state) {
    this._set({
      type: "settings",
      backend_state,
      last_backend_state: Date.now(),
    });
    this.save_asap();
  }

  // Any pending attempt to clear the kernel error is now stale. Either we
  // left 'running', or we are (re)entering it and schedule a fresh attempt
  // below. Clearing first prevents stacking several timers.
  if (this.clear_kernel_error) {
    clearTimeout(this.clear_kernel_error);
    delete this.clear_kernel_error;
  }
  if (backend_state == "running") {
    // Clear the kernel error only if the kernel successfully starts and
    // stays in the running state for a while.
    this.clear_kernel_error = setTimeout(() => {
      delete this.clear_kernel_error;
      // The actions may have been closed while we were waiting.
      if (this.is_closed()) return;
      this._set({
        type: "settings",
        kernel_error: "",
      });
    }, 3000);
  }
}
146
147
/**
 * Record the kernel's execution state (e.g. "idle") locally and in the
 * syncdb "settings" record, optionally saving. No-op unless we are the
 * cell runner.
 */
set_kernel_state = (state: any, save = false) => {
  if (this.isCellRunner()) {
    this._kernel_state = state;
    this._set({ type: "settings", kernel_state: state }, save);
  }
};
152
153
/**
 * Called exactly once when the manager first starts up after the store is
 * initialized (later calls return immediately). Ensures everything is in
 * a consistent state so that we can react to changes later.
 *
 * @throws if the syncdb's ipywidgets_state is not defined.
 */
async initialize_manager() {
  if (this._initialize_manager_already_done) return;
  const log = this.dbg("initialize_manager");
  log();
  this._initialize_manager_already_done = true;

  // Coalesce bursts of calls to these housekeeping functions.
  this.sync_exec_state = debounce(this.sync_exec_state, 2000);
  this._throttled_ensure_positions_are_unique = debounce(
    this.ensure_positions_are_unique,
    5000,
  );

  // Listen for changes...
  this.syncdb.on("change", this._backend_syncdb_change.bind(this));

  // start_time is used by the kernel_info function of this.jupyter_kernel.
  const start_time = this._client.server_time().valueOf();
  this.setState({ start_time });

  // No nbconvert can be running yet, so clear any stale nbconvert record.
  this.syncdb.delete({ type: "nbconvert" });

  // Info about available kernels is used, e.g., when saving to ipynb
  // format. Intentionally not awaited.
  this.init_kernel_info();

  // Load from disk; if that fails, _first_load waits for the file to
  // change and tries again.
  await this._first_load();

  // Listen for model state changes...
  const widgetsState = this.syncdb.ipywidgets_state;
  if (widgetsState == null) {
    throw Error("syncdb's ipywidgets_state must be defined!");
  }
  widgetsState.on("change", this.handle_ipywidgets_state_change.bind(this));

  this.syncdb.on("cursor_activity", this.checkForComputeServerStateChange);

  // initialize the websocket api
  this.initWebsocketApi();
}
204
205
/**
 * Load the notebook from disk for the first time. If loading throws, wait
 * until the file on disk changes and then try again (recursively). Once a
 * load succeeds, run _init_after_first_load.
 *
 * @throws if the actions are already closed.
 */
private async _first_load() {
  const dbg = this.dbg("_first_load");
  dbg("doing load");
  if (this.is_closed()) {
    throw Error("actions must not be closed");
  }
  try {
    await this.loadFromDiskIfNewer();
  } catch (err) {
    dbg(`load failed -- ${err}; wait for file change and try again`);
    const path = this.store.get("path");
    const watcher = this._client.watch_file({ path });
    try {
      await once(watcher, "change");
      dbg("file changed");
    } finally {
      // Always release the watcher, even if waiting for the change fails.
      watcher.close();
    }
    await this._first_load();
    return;
  }
  dbg("loading worked");
  this._init_after_first_load();
}
226
227
/**
 * Finish initialization after the first successful load from disk:
 * 1. set up the backend kernel wrapper;
 * 2. start the file watcher;
 * 3. mark these actions "ready";
 * 4. call ensure_there_is_a_cell.
 */
private _init_after_first_load() {
  this.dbg("_init_after_first_load")("initializing");
  this.ensure_backend_kernel_setup(); // this may change the syncdb.
  this.init_file_watcher();
  this._state = "ready";
  this.ensure_there_is_a_cell();
}
238
239
/**
 * Backend handler for syncdb "change" events.
 *
 * For each changed "settings" record it:
 * - ensures the kernel is set up;
 * - re-asserts the kernel/backend state that the backend owns;
 * - starts, retunes, or stops the run-all loop according to run_all_loop_s.
 *
 * Afterwards it always runs the general housekeeping (ensure a cell
 * exists, ensure unique positions, sync execution state).
 */
_backend_syncdb_change = (changes: any) => {
  if (this.is_closed()) return;
  const dbg = this.dbg("_backend_syncdb_change");

  changes?.forEach((key) => {
    // NOTE: never return false here -- that would stop an immutable.js forEach.
    if (key.get("type") !== "settings") return;
    dbg("settings change");
    const record = this.syncdb.get_one(key);
    if (record == null) return;

    // ensure kernel is properly configured
    this.ensure_backend_kernel_setup();

    // Only the backend should change kernel and backend state; however,
    // our security model allows otherwise (e.g., via TimeTravel), so put
    // back the values we know to be current.
    if (
      this._kernel_state != null &&
      record.get("kernel_state") !== this._kernel_state
    ) {
      this.set_kernel_state(this._kernel_state, true);
    }
    if (record.get("backend_state") !== this._backend_state) {
      this.set_backend_state(this._backend_state);
    }

    const interval = record.get("run_all_loop_s");
    if (!interval) {
      if (this.run_all_loop != null) {
        // stop it.
        this.run_all_loop.close();
        delete this.run_all_loop;
      }
    } else if (this.run_all_loop == null) {
      this.run_all_loop = new RunAllLoop(this, interval);
    } else {
      // ensure interval is correct
      this.run_all_loop.set_interval(interval);
    }
  });

  this.ensure_there_is_a_cell();
  this._throttled_ensure_positions_are_unique();
  this.sync_exec_state();
};
293
294
/**
 * Ensure that we have a kernel wrapper for the kernel named in the store.
 *
 * - If the file is deleted, do nothing.
 * - If a non-closed kernel with the right name exists, do nothing.
 * - If a non-closed kernel with a different name exists, close it. Its
 *   "closed" listener (restartKernelOnClose, if it was created here)
 *   presumably then calls this again to make the right one.
 * - Otherwise create a new kernel wrapper, reset cell run state, wire up
 *   its events, and set the backend state to "ready".
 *
 * Nothing here waits for the kernel to reach the running state; see
 * ensure_backend_kernel_is_running for that.
 *
 * @throws if the syncdb's ipywidgets_state is not defined.
 */
ensure_backend_kernel_setup = () => {
  const dbg = this.dbg("ensure_backend_kernel_setup");
  if (this.isDeleted()) {
    dbg("file is deleted");
    return;
  }

  const kernel = this.store.get("kernel");
  const current: string | undefined = this.jupyter_kernel?.name;

  if (
    current == kernel &&
    this.jupyter_kernel != null &&
    this.jupyter_kernel.get_state() != "closed"
  ) {
    dbg("everything is properly setup and working");
    return;
  }

  dbg(`kernel='${kernel}', current='${current}'`);
  if (
    this.jupyter_kernel != null &&
    this.jupyter_kernel.get_state() != "closed"
  ) {
    if (current == kernel) {
      dbg("nothing to do");
    } else {
      dbg("kernel changed -- kill running kernel to trigger switch");
      this.jupyter_kernel.close();
    }
    return;
  }

  dbg("make a new kernel");

  // No kernel wrapper object setup at all. Make one.
  this.jupyter_kernel = createJupyterKernel({
    name: kernel,
    path: this.store.get("path"),
    actions: this,
  });

  const widgetsState = this.syncdb.ipywidgets_state;
  if (widgetsState == null) {
    throw Error("syncdb's ipywidgets_state must be defined!");
  }
  widgetsState.clear();

  if (this.jupyter_kernel == null) {
    // to satisfy typescript.
    throw Error("jupyter_kernel must be defined");
  }

  // Save kernel errors so they get reported to the frontend and surfaced
  // to the user: https://github.com/sagemathinc/cocalc/issues/4847
  this.jupyter_kernel.on("kernel_error", (error) => {
    this.set_kernel_error(error);
  });

  // A brand new kernel is clearly not running any cells on the backend.
  this._running_cells = {};
  this.clear_all_cell_run_state();

  this.restartKernelOnClose = () => {
    // When the kernel closes, make sure a new kernel gets setup -- except
    // when these actions themselves are being closed.
    if (this.store == null || this._state !== "ready") return;
    dbg("kernel closed -- make new one.");
    this.ensure_backend_kernel_setup();
  };
  this.jupyter_kernel.once("closed", this.restartKernelOnClose);

  // Track backend state changes other than closing, so they are visible to
  // users, and in time travel / debugging you know what was going on.
  // TODO: Maybe all these need to move to ephemeral table?
  this.jupyter_kernel.on("state", (state) => {
    dbg("jupyter_kernel state --> ", state);
    switch (state) {
      case "off":
      case "closed":
        // things went wrong.
        this._running_cells = {};
        this.clear_all_cell_run_state();
        this.set_backend_state("ready");
        this.jupyter_kernel?.close();
        this.running_manager_run_cell_process_queue = false;
        delete this.jupyter_kernel;
        return;
      case "spawning":
      case "starting":
        this.set_connection_file();
        this.set_backend_state(state);
        return;
      case "running":
        this.set_backend_state(state);
        return;
    }
  });

  this.jupyter_kernel.on("execution_state", this.set_kernel_state);

  this.handle_all_cell_attachments();
  this.set_backend_state("ready");
};
403
404
/**
 * Store the current kernel's connection file in the syncdb "settings"
 * record ("" when there is no kernel).
 */
set_connection_file = () => {
  this._set({
    type: "settings",
    connection_file: this.jupyter_kernel?.get_connection_file() ?? "",
  });
};
411
412
/**
 * Fill the store's "kernels" entry with the data from get_kernel_data,
 * unless it is already set. Failures are logged and otherwise ignored.
 */
init_kernel_info = async () => {
  if (this.store.get("kernels") != null) return;

  const dbg = this.dbg("init_kernel_info");
  dbg("getting");
  let kernels;
  try {
    kernels = await get_kernel_data();
    dbg("success");
  } catch (err) {
    dbg(`FAILED to get kernel info: ${err}`);
    // TODO: what to do?? Saving will be broken...
    return;
  }
  this.setState({ kernels: immutable.fromJS(kernels) });
};
432
433
/**
 * If the backend is in state 'ready', kick it into gear via
 * set_backend_kernel_info. Then wait, via syncdb.wait with timeout
 * argument 60 (presumably seconds), until the syncdb settings report
 * backend_state 'running', or until these actions are closed.
 */
async ensure_backend_kernel_is_running() {
  const dbg = this.dbg("ensure_backend_kernel_is_running");
  if (this._backend_state == "ready") {
    dbg("in state 'ready', so kick it into gear");
    await this.set_backend_kernel_info();
    dbg("done getting kernel info");
  }
  await this.syncdb.wait((s): boolean => {
    // Stop waiting if the actions were closed in the meantime.
    if (this._state === "closed") return true;
    const settings = s.get_one({ type: "settings" });
    if (settings == null) {
      dbg("no settings");
      return false;
    }
    const state = settings.get("backend_state");
    dbg(`state = ${state}`);
    return state == "running";
  }, 60);
}
454
455
/**
 * Called after a cell change has been incorporated into the store, after
 * the syncdb change event.
 *  - If we are the cell runner and the cell just entered state "start",
 *    it is queued for execution.
 *  - Otherwise, if the cell's attachments changed, they are handled.
 */
protected onCellChange(id: string, new_cell: any, old_cell: any) {
  const dbg = this.dbg(`onCellChange(id='${id}')`);
  dbg();
  // Logging the cells (toJS) could be expensive, so only enable if really needed:
  // dbg("new_cell=", new_cell?.toJS(), "old_cell", old_cell?.toJS());

  const justRequested =
    new_cell?.get("state") === "start" && old_cell?.get("state") !== "start";
  if (justRequested && this.isCellRunner()) {
    this.manager_run_cell_enqueue(id);
    // Attachments below only matter for markdown cells, which don't get run.
    return;
  }

  const attachments = new_cell?.get("attachments");
  const attachmentsChanged =
    attachments != null && attachments !== old_cell?.get("attachments");
  if (attachmentsChanged) {
    this.handle_cell_attachments(new_cell);
  }
}
483
484
/**
 * Hook run after syncdb changes are processed.
 *
 * - On the initial call (doInit), the cell runner resets the shared
 *   backend/kernel state, and the execution manager is initialized.
 * - On every call, if a kernel is set, queued cell runs are processed.
 *
 * Both async calls are fire-and-forget. Their failures are logged instead
 * of becoming unhandled promise rejections.
 */
protected __syncdb_change_post_hook(doInit: boolean) {
  const dbg = this.dbg("__syncdb_change_post_hook");
  if (doInit) {
    if (this.isCellRunner()) {
      // Since just opening the actions in the project, definitely the kernel
      // isn't running so set this fact in the shared database. It will make
      // things always be in the right initial state.
      this.syncdb.set({
        type: "settings",
        backend_state: "init",
        kernel_state: "idle",
        kernel_usage: { memory: 0, cpu: 0 },
      });
      this.syncdb.commit();
    }

    // Also initialize the execution manager, which runs cells that have been
    // requested to run.
    this.initialize_manager().catch((err) => {
      dbg(`initialize_manager failed -- ${err}`);
    });
  }
  if (this.store.get("kernel")) {
    this.manager_run_cell_process_queue().catch((err) => {
      dbg(`manager_run_cell_process_queue failed -- ${err}`);
    });
  }
}
507
508
/**
 * Ensure that the cells listed as running *are* exactly the ones actually
 * running or queued up to run:
 *  1. Cells the store says are running (state not null/"done"/"start") but
 *     that we are not running are set to "done".
 *  2. Cells we are running that are gone from the document (or marked
 *     "done") are cancelled in the kernel.
 * Only the cell runner does this, and only once the actions are "ready".
 * Returns the result of this._sync() if any cell state was changed.
 */
sync_exec_state = () => {
  // sync_exec_state is debounced, so it is *expected* to get called
  // after actions have been closed.
  if (this.store == null || this._state !== "ready") {
    // not initialized, so we better not
    // mess with cell state (that is somebody else's responsibility).
    return;
  }
  // we are not the cell runner
  if (!this.isCellRunner()) {
    return;
  }

  const dbg = this.dbg("sync_exec_state");
  let change = false;
  const cells = this.store.get("cells");
  // First verify that all actual cells that are said to be running
  // (according to the store) are in fact running.
  if (cells != null) {
    cells.forEach((cell, id) => {
      const state = cell.get("state");
      if (
        state != null &&
        state != "done" &&
        state != "start" && // regarding "start", see https://github.com/sagemathinc/cocalc/issues/5467
        !this._running_cells?.[id]
      ) {
        dbg(`set cell ${id} with state "${state}" to done`);
        this._set({ type: "cell", id, state: "done" }, false);
        change = true;
      }
    });
  }
  if (this._running_cells != null) {
    // Re-read the cells, since the loop above may have changed them.
    const current_cells = this.store.get("cells");
    // Next verify that every cell actually running is still in the document
    // and listed as running. TimeTravel, deleting cells, etc., can
    // certainly lead to this being necessary. If there are no cells at
    // all, every running cell counts as gone.
    for (const id in this._running_cells) {
      const state = current_cells?.getIn([id, "state"]);
      if (state == null || state === "done") {
        // cell no longer exists or isn't in a running state
        dbg(`tell kernel to not run ${id}`);
        this._cancel_run(id);
      }
    }
  }
  if (change) {
    return this._sync();
  }
};
561
562
/**
 * Ask the kernel to cancel execution of cell `id`. This only happens if
 * the cell is recorded as running on the *current* kernel (its recorded
 * identity matches the kernel's identity).
 */
_cancel_run = (id: any) => {
  const dbg = this.dbg(`_cancel_run ${id}`);
  const kernel = this.jupyter_kernel;
  const identity = this._running_cells?.[id];
  if (kernel == null || identity == null) return;

  if (kernel.identity != identity) {
    dbg("not canceling since wrong identity");
    return;
  }
  dbg("canceling");
  kernel.cancel_execute(id);
};
576
577
/**
 * Note that there is a request to run the given cell. The request is
 * ignored if the cell is already running. You must call
 * manager_run_cell_process_queue for queued cells to actually start
 * running.
 */
protected manager_run_cell_enqueue(id: string) {
  if (this._running_cells?.[id]) return;
  const queue = (this._manager_run_cell_queue =
    this._manager_run_cell_queue ?? {});
  queue[id] = true;
}
588
589
/**
 * Properly start running -- in order of their "start" timestamps -- the
 * cells that have been requested to run (see manager_run_cell_enqueue).
 *
 * - Ensures the kernel is set up and running first.
 * - If starting the kernel fails, the dequeued requests are dropped.
 * - While a run is in progress, further calls return immediately.
 * - If more requests were queued meanwhile, a successful run schedules a
 *   follow-up call.
 */
protected async manager_run_cell_process_queue() {
  if (this.running_manager_run_cell_process_queue) {
    return;
  }
  this.running_manager_run_cell_process_queue = true;
  try {
    const dbg = this.dbg("manager_run_cell_process_queue");
    const queue = this._manager_run_cell_queue;
    if (queue == null) {
      //dbg("queue is null");
      return;
    }
    delete this._manager_run_cell_queue;
    const v: any[] = [];
    for (const id in queue) {
      if (!this._running_cells?.[id]) {
        v.push(this.store.getIn(["cells", id]));
      }
    }

    if (v.length == 0) {
      dbg("no non-running cells");
      return; // nothing to do
    }

    v.sort((a, b) =>
      misc.cmp(
        a != null ? a.get("start") : undefined,
        b != null ? b.get("start") : undefined,
      ),
    );

    dbg(
      `found ${v.length} non-running cell that should be running, so ensuring kernel is running...`,
    );
    this.ensure_backend_kernel_setup();
    try {
      await this.ensure_backend_kernel_is_running();
      if (this._state == "closed") return;
    } catch (err) {
      // if this fails, give up on evaluation.
      dbg(`failed to ensure kernel is running -- ${err}`);
      return;
    }

    dbg(
      `kernel is now running; requesting that each ${v.length} cell gets executed`,
    );
    for (const cell of v) {
      if (cell != null) {
        this.manager_run_cell(cell.get("id"));
      }
    }

    if (this._manager_run_cell_queue != null) {
      // Run it again to process additional entries. Wrap in an arrow
      // function: passing the method itself would lose `this`.
      setTimeout(() => {
        this.manager_run_cell_process_queue().catch((err) => {
          dbg(`error processing additional entries -- ${err}`);
        });
      }, 1);
    }
  } finally {
    this.running_manager_run_cell_process_queue = false;
  }
}
651
652
/**
 * Create a new OutputHandler for the given cell object, wired to the
 * current kernel:
 * - it is closed if the kernel closes before the handler is done;
 * - "more_output" events go to set_more_output;
 * - "process" events go to the kernel's process_output.
 * Any stored "more output" for the cell is reset first.
 *
 * @throws if there is no kernel or it is closed.
 */
protected _output_handler(cell: any) {
  const dbg = this.dbg(`handler(id='${cell.id}')`);
  if (
    this.jupyter_kernel == null ||
    this.jupyter_kernel.get_state() == "closed"
  ) {
    throw Error("jupyter kernel must exist and not be closed");
  }
  this.reset_more_output(cell.id);

  const handler = new OutputHandler({
    cell,
    max_output_length: this.store.get("max_output_length"),
    report_started_ms: 250,
    dbg,
  });

  dbg("setting up jupyter_kernel.once('closed', ...) handler");
  const onKernelClosed = () => {
    dbg("output handler -- closing due to jupyter kernel closed");
    handler.close();
  };
  this.jupyter_kernel.once("closed", onKernelClosed);
  // Once we are done waiting for output from this cell, drop the "closed"
  // listener above. (The output handler removes all of its own listeners
  // when it is finished.)
  handler.once("done", () =>
    this.jupyter_kernel?.removeListener("closed", onKernelClosed),
  );

  handler.on("more_output", (mesg, mesg_length) => {
    this.set_more_output(cell.id, mesg, mesg_length);
  });

  handler.on("process", (mesg) => {
    // Do not log mesg here -- it is often very large!
    const kernel = this.jupyter_kernel;
    if (kernel == null || kernel.get_state() == "closed") return;
    kernel.process_output(mesg);
  });

  return handler;
}
703
704
/**
 * Execute cell `id` in the current kernel. This is the cell-runner side of
 * running a cell.
 *
 * - Records the cell in _running_cells (with the kernel's identity) so
 *   sync_exec_state does not declare it done.
 * - Streams kernel output into the cell via an OutputHandler.
 * - When the handler is done, removes the cell from _running_cells and saves.
 *
 * Returns immediately if the cell is already running or no longer exists.
 * On every early exit the cell is left unmarked, so it can be requested
 * again later.
 *
 * @throws if there is no kernel, or if the output handler cannot be
 *   created (e.g. the kernel is closed).
 */
manager_run_cell = (id: string) => {
  const dbg = this.dbg(`manager_run_cell(id='${id}')`);
  dbg(JSON.stringify(misc.keys(this._running_cells)));

  if (this._running_cells == null) {
    this._running_cells = {};
  }

  if (this._running_cells[id]) {
    dbg("cell already queued to run in kernel");
    return;
  }

  const orig_cell = this.store.get("cells").get(id);
  if (orig_cell == null) {
    // Nothing to do -- cell deleted. Do NOT mark it as running: until the
    // kernel is replaced nothing would clear that mark, and requests to run
    // a cell with this id would be ignored.
    return;
  }

  if (this.jupyter_kernel == null) {
    throw Error("bug -- this is guaranteed by the above");
  }

  // It's important to record the cell as running (with the identity of the
  // kernel running it) so that sync_exec_state doesn't declare it done.
  this._running_cells[id] = this.jupyter_kernel.identity;

  const raw_input: string | undefined = orig_cell.get("input", "");
  const input = raw_input == null ? "" : raw_input.trim();

  const halt_on_error: boolean = !orig_cell.get("no_halt", false);

  const cell: any = {
    id,
    type: "cell",
    kernel: this.store.get("kernel"),
  };

  dbg(`using max_output_length=${this.store.get("max_output_length")}`);
  let handler: OutputHandler;
  try {
    handler = this._output_handler(cell);
  } catch (err) {
    // Undo the bookkeeping above so the cell can be run again later.
    delete this._running_cells[id];
    throw err;
  }

  handler.on("change", (save) => {
    if (!this.store.getIn(["cells", id])) {
      // The cell was deleted, but we just got some output.
      // NOTE: client shouldn't allow deleting running or queued
      // cells, but we still want to do something useful/sensible.
      // We put cell back where it was with same input.
      cell.input = orig_cell.get("input");
      cell.pos = orig_cell.get("pos");
    }
    this.syncdb.set(cell);
    // This is potentially very verbose -- don't do it unless
    // doing low level debugging:
    //dbg(`change (save=${save}): cell='${JSON.stringify(cell)}'`);
    if (save) {
      this.syncdb.save();
    }
  });

  const get_password = (): string => {
    if (this.jupyter_kernel == null) {
      dbg("get_password", id, "no kernel");
      return "";
    }
    const password = this.jupyter_kernel.store.get(id);
    dbg("get_password", id, password);
    this.jupyter_kernel.store.delete(id);
    return password;
  };

  // This is used only for stdin right now.
  const cell_change = (cell_id, new_cell) => {
    if (id === cell_id) {
      dbg("cell_change");
      handler.cell_changed(new_cell, get_password);
    }
  };

  // Set once execution starts. It is declared before the "done" listener
  // so that listener never touches an uninitialized binding.
  let exec: { close: () => void } | undefined = undefined;

  handler.once("done", () => {
    dbg("handler is done");
    this.store.removeListener("cell_change", cell_change);
    exec?.close();
    if (this._running_cells != null) {
      delete this._running_cells[id];
    }
    this.syncdb.save();
    setTimeout(() => this.syncdb?.save(), 100);
  });

  if (this.jupyter_kernel == null) {
    handler.error("Unable to start Jupyter");
    return;
  }

  this.store.on("cell_change", cell_change);

  const execution = this.jupyter_kernel.execute_code({
    code: input,
    id,
    stdin: handler.stdin,
    halt_on_error,
  });
  exec = execution;

  execution.on("output", (mesg) => {
    // uncomment only for specific low level debugging -- see https://github.com/sagemathinc/cocalc/issues/7022
    // dbg(`got mesg='${JSON.stringify(mesg)}'`); // !!!☡ ☡ ☡ -- EXTREME DANGER ☡ ☡ ☡ !!!!

    if (mesg == null) {
      // can't possibly happen, of course.
      const err = "empty mesg";
      dbg(`got error='${err}'`);
      handler.error(err);
      return;
    }
    if (mesg.done) {
      // done is a special internal cocalc message.
      handler.done();
      return;
    }
    if (mesg.content?.transient?.display_id != null) {
      // See https://github.com/sagemathinc/cocalc/issues/2132
      // We find any other outputs in the document with
      // the same transient.display_id, and set their output to
      // this mesg's output.
      this.handleTransientUpdate(mesg);
      if (mesg.msg_type == "update_display_data") {
        // don't also create a new output
        return;
      }
    }

    if (mesg.msg_type === "clear_output") {
      handler.clear(mesg.content.wait);
      return;
    }

    if (mesg.content.comm_id != null) {
      // ignore any comm/widget related messages
      return;
    }

    if (mesg.content.execution_state === "idle") {
      this.store.removeListener("cell_change", cell_change);
      return;
    }
    if (mesg.content.execution_state === "busy") {
      handler.start();
    }
    if (mesg.content.payload != null) {
      if (mesg.content.payload.length > 0) {
        // payload shell message:
        // Despite https://ipython.org/ipython-doc/3/development/messaging.html#payloads saying
        // ""Payloads are considered deprecated, though their replacement is not yet implemented."
        // we fully have to implement them, since they are used to implement (crazy, IMHO)
        // things like %load in the python2 kernel!
        mesg.content.payload.forEach((p) => handler.payload(p));
        return;
      }
    } else {
      // Normal iopub output message
      handler.message(mesg.content);
      return;
    }
  });

  execution.on("error", (err) => {
    dbg(`got error='${err}'`);
    handler.error(err);
  });
};
879
880
/**
 * Forget stored "more output": for all cells when id is null, otherwise
 * only for the given cell. Returns true when an entry for id was deleted,
 * and undefined otherwise.
 */
reset_more_output = (id: any) => {
  if (id == null) {
    delete this.store._more_output;
  }
  const more = this.store._more_output;
  if (more?.[id] != null) {
    return delete more[id];
  }
};
892
893
/**
 * Buffer an output message for cell `id` (length is the caller-supplied
 * size of mesg).
 *
 * The buffer is bounded to about 10 * max_output_length. While it is over
 * that goal, the oldest message is either truncated or discarded:
 * - truncated: its "text" field, else its data["text/plain"], is shortened
 *   (counted in `truncated`), and trimming stops;
 * - discarded: otherwise it is dropped entirely (counted in `discarded`).
 */
set_more_output = (id: any, mesg: any, length: any): void => {
  if (this.store._more_output == null) {
    this.store._more_output = {};
  }
  const all = this.store._more_output;
  if (all[id] == null) {
    all[id] = {
      length: 0,
      messages: [],
      lengths: [],
      discarded: 0,
      truncated: 0,
    };
  }
  const output = all[id];

  output.length += length;
  output.lengths.push(length);
  output.messages.push(mesg);

  const goal_length = 10 * this.store.get("max_output_length");
  while (output.length > goal_length) {
    const oldest = output.messages[0];
    let did_truncate = false;

    // First choice: truncate a top-level text field.
    const textLen = oldest.text != null ? oldest.text.length : undefined;
    if (textLen != null) {
      const need = output.length - goal_length + 50;
      if (textLen > need) {
        // Instead of throwing this message away, truncate its text part;
        // afterwards the message is at least `need` shorter than before.
        oldest.text = misc.trunc(oldest.text, textLen - need);
        did_truncate = true;
      }
    }

    // Otherwise: truncate a text/plain entry in data, which is also safe.
    if (!did_truncate && oldest.data != null) {
      for (const field in oldest.data) {
        if (field !== "text/plain") continue;
        const val = oldest.data[field];
        const valLen = val.length;
        if (valLen != null) {
          const need = output.length - goal_length + 50;
          if (valLen > need) {
            oldest.data[field] = misc.trunc(val, valLen - need);
            did_truncate = true;
          }
        }
      }
    }

    if (did_truncate) {
      const new_len = JSON.stringify(oldest).length;
      output.length -= output.lengths[0] - new_len; // how much we saved
      output.lengths[0] = new_len;
      output.truncated += 1;
      break;
    }

    // Nothing truncatable: drop the oldest message entirely.
    const n = output.lengths.shift();
    output.messages.shift();
    output.length -= n;
    output.discarded += 1;
  }
};
968
969
/**
 * Watch the notebook's file on disk (with debounce: 1000). On a change,
 * if we are the cell runner, call loadFromDiskIfNewer. Load errors are
 * logged and otherwise ignored.
 */
private init_file_watcher() {
  const dbg = this.dbg("file_watcher");
  dbg();
  const path = this.store.get("path");
  this._file_watcher = this._client.watch_file({ path, debounce: 1000 });

  const onChange = async () => {
    if (!this.isCellRunner()) return;
    dbg("change");
    try {
      await this.loadFromDiskIfNewer();
    } catch (err) {
      dbg("failed to load on change", err);
    }
  };
  this._file_watcher.on("change", onChange);
}
989
990
/*
991
* Unfortunately, though I spent two hours on this approach... it just doesn't work,
992
* since, e.g., if the sync file doesn't already exist, it can't be created,
993
* which breaks everything. So disabling for now and re-opening the issue.
994
_sync_file_mode: =>
995
dbg = @dbg("_sync_file_mode"); dbg()
996
* Make the mode of the syncdb file the same as the mode of the .ipynb file.
997
* This is used for read-only status.
998
ipynb_file = @store.get('path')
999
locals =
1000
ipynb_file_ro : undefined
1001
syncdb_file_ro : undefined
1002
syncdb_file = @syncdb.get_path()
1003
async.parallel([
1004
(cb) ->
1005
fs.access ipynb_file, fs.constants.W_OK, (err) ->
1006
* Also store in @_ipynb_file_ro to prevent starting kernel in this case.
1007
@_ipynb_file_ro = locals.ipynb_file_ro = !!err
1008
cb()
1009
(cb) ->
1010
fs.access syncdb_file, fs.constants.W_OK, (err) ->
1011
locals.syncdb_file_ro = !!err
1012
cb()
1013
], ->
1014
if locals.ipynb_file_ro == locals.syncdb_file_ro
1015
return
1016
dbg("mode change")
1017
async.parallel([
1018
(cb) ->
1019
fs.stat ipynb_file, (err, stats) ->
1020
locals.ipynb_stats = stats
1021
cb(err)
1022
(cb) ->
1023
* error if syncdb_file doesn't exist, which is GOOD, since
1024
* in that case we do not want to chmod which would create
1025
* that file as empty and blank it.
1026
fs.stat(syncdb_file, cb)
1027
], (err) ->
1028
if not err
1029
dbg("changing syncb mode to match ipynb mode")
1030
fs.chmod(syncdb_file, locals.ipynb_stats.mode)
1031
else
1032
dbg("error stating ipynb", err)
1033
)
1034
)
1035
*/
1036
1037
// Reload the notebook from the .ipynb file on disk, but only when that file is
// newer than the version we last wrote to disk ourselves -- or when no save has
// ever been recorded, in which case the initial load is mandatory.
private loadFromDiskIfNewer = async () => {
  const log = this.dbg("loadFromDiskIfNewer");

  // mtime (ms) of the last .ipynb file that we explicitly saved.
  // TODO: this breaks the syncdb typescript data hiding. The right fix is to
  // move this info to a new ephemeral state table.
  const lastSave = await this.get_last_ipynb_save();
  log(`syncdb last_ipynb_save=${lastSave}`);

  let mustLoad: boolean;
  if (lastSave == 0) {
    // The very first time we MUST load from the file, of course.
    mustLoad = true;
    log("file changed because FIRST TIME");
  } else {
    const path = this.store.get("path");
    let stats;
    try {
      stats = await callback2(this._client.path_stat, { path });
      log(`stats.mtime = ${stats.mtime}`);
    } catch (_err) {
      // A failure here just means the file doesn't exist. We still record
      // "last load" as now, since the frontend clients need to know that the
      // disk has already been scanned.
      this.set_last_load();
      return;
    }
    const mtime = stats.mtime.getTime();
    mustLoad = mtime > lastSave;
    log({ mtime, last_ipynb_save: lastSave });
  }

  if (!mustLoad) {
    log("disk file NOT changed: NOT loading");
    return;
  }
  log(".ipynb disk file changed ==> loading state from disk");
  try {
    await this.load_ipynb_file();
  } catch (err) {
    log("failed to load on change", err);
  }
};
1082
1083
/**
 * Record in the syncdb that the notebook was just loaded from disk: sets
 * `last_load` (ms since epoch) on the "file" record and commits.
 *
 * @param alsoSetLoad when true, also sets `last_ipynb_save` on the "settings"
 *   record to the same timestamp. (Yes, "load" vs "save" is inconsistent!)
 */
set_last_load = (alsoSetLoad: boolean = false) => {
  const now = Date.now();
  this.syncdb.set({ type: "file", last_load: now });
  if (alsoSetLoad) {
    this.syncdb.set({ type: "settings", last_ipynb_save: now });
  }
  this.syncdb.commit();
};
1096
1097
/* Determine the mtime of the .ipynb file on disk and record it, both on this
   instance (this.last_ipynb_save) and in the syncdb "settings" record, so we
   know that we do not have to load exactly that file back from disk.
   Best effort: any failure is logged as a warning and otherwise ignored. */
private set_last_ipynb_save = async () => {
  const dbg = this.dbg("set_last_ipynb_save");
  let stats;
  try {
    stats = await callback2(this._client.path_stat, {
      path: this.store.get("path"),
    });
  } catch (err) {
    // no-op -- nothing to do.
    dbg(`WARNING -- issue in path_stat ${err}`);
    return;
  }

  // This is ugly (i.e., how we get access), but this is the RIGHT place to
  // save the info.
  // TODO: move this state info to a new ephemeral table.
  try {
    const last_ipynb_save = stats.mtime.getTime();
    this.last_ipynb_save = last_ipynb_save;
    this._set({
      type: "settings",
      last_ipynb_save,
    });
    dbg(`set_last_ipynb_save = ${last_ipynb_save}`);
  } catch (err) {
    dbg(`WARNING -- issue in set_last_ipynb_save ${err}`);
  }
};
1132
1133
// mtime (ms) of the last .ipynb save we know about: the larger of the value
// recorded in the syncdb "settings" record (0 if absent) and the value cached
// on this instance.
private get_last_ipynb_save = async () => {
  const settings = this.syncdb.get_one({ type: "settings" });
  const recorded = settings?.get("last_ipynb_save") ?? 0;
  return Math.max(recorded, this.last_ipynb_save);
};
1138
1139
/**
 * Read the ipynb file from disk and use it to fully set the syncdb's state.
 * We do this when opening a new file, or when the file changes on disk
 * (e.g., a git checkout or something).
 *
 * - Missing, empty, or whitespace-only file: the notebook (including any
 *   previous fatal error) is cleared.
 * - Unreadable file or invalid JSON: a {type: "fatal", error} record is set
 *   and committed (so clients can display it), and an Error is thrown.
 */
load_ipynb_file = async () => {
  const dbg = this.dbg(`load_ipynb_file`);
  dbg("reading file");
  const path = this.store.get("path");

  // Record a fatal error in the syncdb -- committed, so clients actually see
  // it -- and return an Error for the caller to throw.
  const fatal = (error: string): Error => {
    dbg(error);
    this.syncdb.set({ type: "fatal", error });
    this.syncdb.commit();
    return Error(error);
  };

  let content: string;
  try {
    content = await callback2(this._client.path_read, {
      path,
      maxsize_MB: 50,
    });
  } catch (err) {
    // Possibly the file doesn't exist -- in that case set notebook to empty.
    const exists = await callback2(this._client.path_exists, {
      path,
    });
    if (exists) {
      // It would be better to have a button to push instead of
      // suggesting running a command in the terminal, but
      // adding that took 1 second. Better than both would be
      // making it possible to edit huge files :-).
      throw fatal(
        `Error reading ipynb file '${path}': ${err}. Fix this to continue. You can delete all output by typing cc-jupyter-no-output [filename].ipynb in a terminal.`,
      );
    }
    content = "";
  }

  if (content.trim().length === 0) {
    // Blank (or whitespace-only) file, e.g., when creating in CoCalc.
    // This is good, works, etc. -- just clear state, including error.
    this.syncdb.delete();
    this.set_last_load(true);
    return;
  }

  // File is nontrivial -- parse and load.
  let parsed_content;
  try {
    parsed_content = JSON.parse(content);
  } catch (err) {
    throw fatal(
      `Error parsing the ipynb file '${path}': ${err}. You must fix the ipynb file somehow before continuing.`,
    );
  }
  this.syncdb.delete({ type: "fatal" });
  await this.set_to_ipynb(parsed_content);
  this.set_last_load(true);
};
1193
1194
/**
 * Serialize the current notebook as .ipynb JSON and write it to disk.
 *
 * Only the cell runner writes the file. If the file was deleted, the actions
 * are closed (without saving) instead of recreating it. Throws if there is no
 * kernel (needed for the blob store), if kernel info can't be determined, if
 * the ipynb object isn't defined yet, or if writing fails.
 */
save_ipynb_file = async () => {
  const log = this.dbg("save_ipynb_file");
  if (!this.isCellRunner()) {
    log("not cell runner, so NOT saving ipynb file to disk");
    return;
  }
  log("saving to file");

  // If the file was deleted, don't write it back: terminate and clean up.
  if (this.isDeleted()) {
    log("ipynb file is deleted, so NOT saving to disk and closing");
    this.close({ noSave: true });
    return;
  }

  // The kernel gives access to the blob store, which may be needed to save.
  if (this.jupyter_kernel == null) {
    this.ensure_backend_kernel_setup();
  }
  if (this.jupyter_kernel == null) {
    // No kernel is set at all, in which case it's OK that saving isn't possible.
    throw Error("no kernel so cannot save");
  }

  if (this.store.get("kernels") == null) {
    await this.init_kernel_info();
  }
  if (this.store.get("kernels") == null) {
    // Should never happen, except maybe in a very messed up compute
    // environment where the kernelspecs can't be listed.
    throw Error(
      "kernel info not known and can't be determined, so can't save",
    );
  }

  log("going to try to save: getting ipynb object...");
  const blobStore = this.jupyter_kernel.get_blob_store();
  let notebook = this.store.get_ipynb(blobStore);
  const kernelspecMissing = () => notebook?.metadata?.kernelspec?.name == null;
  // If a kernel is set, it must be known well enough that we can fill in its
  // metadata -- see https://github.com/sagemathinc/cocalc/issues/7286
  if (this.store.get("kernel") && kernelspecMissing()) {
    log("kernelspec not known -- try loading kernels again");
    await this.fetch_jupyter_kernels();
    notebook = this.store.get_ipynb(blobStore);
    if (kernelspecMissing()) {
      log("kernelspec STILL not known: metadata will be incomplete");
    }
  }
  log("got ipynb object");

  // json_stable with indent 1 is more diff friendly for users, and more
  // consistent with official Jupyter.
  const serialized = json_stable(notebook, { space: 1 });
  if (serialized == null) {
    log("failed -- ipynb not defined yet");
    throw Error("ipynb not defined yet; can't save");
  }
  log("converted ipynb to stable JSON string", serialized.length);
  try {
    log("writing to disk...");
    await callback2(this._client.write_file, {
      path: this.store.get("path"),
      data: serialized,
    });
    log("succeeded at saving");
    await this.set_last_ipynb_save();
  } catch (err) {
    const message = `error writing file: ${err}`;
    log(message);
    throw Error(message);
  }
};
1270
1271
// When ready, make sure the notebook has a cell. If there are no cells at all,
// or the cell list is empty and we are the cell runner, insert one empty cell
// and mark the notebook as trusted.
ensure_there_is_a_cell = () => {
  if (this._state !== "ready") {
    return;
  }
  const cells = this.store.get("cells");
  const needsCell =
    cells == null || (cells.size === 0 && this.isCellRunner());
  if (!needsCell) {
    return;
  }
  this._set({ type: "cell", id: this.new_id(), pos: 0, input: "" });
  // We are obviously contributing content to this (empty!) notebook.
  return this.set_trust_notebook(true);
};
1287
1288
// Check every cell for attachments that still need to be loaded.
private handle_all_cell_attachments() {
  const cells = this.store.get("cells");
  if (cells == null) {
    return;
  }
  cells.forEach((cell) => {
    this.handle_cell_attachments(cell);
  });
}
1295
1296
/**
 * Start loading, via the Jupyter kernel, every attachment of `cell` whose
 * type is "load". Each such attachment is first set to {type: "loading"},
 * then to {type: "sha1", value: <sha1>} on success or
 * {type: "error", value: <message>} on failure. Does nothing when there is no
 * kernel, so the attachment stays "load" and can be retried later.
 *
 * The loads run in the background; this method does not wait for them. Any
 * unexpected failure is logged, rather than left as an unhandled promise
 * rejection (which can terminate a Node process).
 */
private handle_cell_attachments(cell) {
  if (this.jupyter_kernel == null) {
    // can't do anything
    return;
  }
  const id = cell.get("id");
  const dbg = this.dbg(`handle_cell_attachments(id=${id})`);
  dbg();

  const attachments = cell.get("attachments");
  if (attachments == null) return; // nothing to do

  // Load one attachment (from disk, via the kernel) and record the outcome.
  const loadAttachment = async (name: string, path: string): Promise<void> => {
    const kernel = this.jupyter_kernel;
    if (kernel == null) return; // try later
    this.set_cell_attachment(id, name, {
      type: "loading",
      value: null,
    });
    let sha1: string;
    try {
      sha1 = await kernel.load_attachment(path);
    } catch (err) {
      this.set_cell_attachment(id, name, {
        type: "error",
        value: `${err}`,
      });
      return;
    }
    this.set_cell_attachment(id, name, {
      type: "sha1",
      value: sha1,
    });
  };

  attachments.forEach((x, name) => {
    if (x == null || x.get("type") !== "load") {
      return;
    }
    // Fire-and-forget, but never leave a rejected promise unhandled.
    loadAttachment(name, x.get("value")).catch((err) => {
      dbg(`WARNING -- issue loading attachment '${name}': ${err}`);
    });
  });
}
1332
1333
// handle_ipywidgets_state_change is called when the project's ipywidgets_state
// object changes, e.g., in response to a user moving a slider in the browser.
// For each changed key it crafts a comm message telling the running Jupyter
// kernel about the change, and sends it via send_comm_message_to_kernel.
private handle_ipywidgets_state_change(keys): void {
  if (this.is_closed()) {
    return;
  }
  const log = this.dbg("handle_ipywidgets_state_change");
  log(keys);
  if (this.jupyter_kernel == null) {
    log("no kernel, so ignoring changes to ipywidgets");
    return;
  }
  if (this.syncdb.ipywidgets_state == null) {
    throw Error("syncdb's ipywidgets_state must be defined!");
  }
  for (const key of keys) {
    // Each key is a JSON array; its 2nd and 3rd entries are the widget model
    // id and the kind of change.
    const [, modelId, changeType] = JSON.parse(key);
    log({ key, model_id: modelId, type: changeType });
    switch (changeType) {
      case "value": {
        const state = this.syncdb.ipywidgets_state.get_model_value(modelId);
        // Sending the known buffers along with the value is critical, since
        // otherwise this breaks:
        //   https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20List.html#file-upload
        // Stupidly, the buffer (e.g., an uploaded image) then gets sent to the
        // kernel twice. But it works robustly, and the kernel and nodejs
        // server processes run next to each other, so this isn't so bad.
        const { buffer_paths, buffers } =
          this.syncdb.ipywidgets_state.getKnownBuffers(modelId);
        const data: any = { method: "update", state, buffer_paths };
        this.jupyter_kernel.send_comm_message_to_kernel({
          msg_id: misc.uuid(),
          target_name: "jupyter.widget",
          comm_id: modelId,
          data,
          buffers,
        });
        break;
      }
      case "buffers":
        // TODO: we MIGHT need to implement this... but MAYBE NOT. The file
        // upload widget seems to require it, but it actually just uses the
        // "value" case above, since we explicitly fill in the buffers there.
        // Also, the widget sends an explicit comm upload message that updates
        // the buffer; when send_comm_message_to_kernel (jupyter/kernel/kernel.ts)
        // processes it, those buffers are saved and set in the "value" case
        // above (otherwise they would get removed). If this is ever
        // implemented, be careful not to send the kernel buffers that the
        // kernel itself sent us, since that would be a waste.
        break;
      case "state":
        // TODO: currently ignored, since it seems chatty and pointless, and
        // could probably lead to race conditions with multiple users, etc.
        // It happens right when the widget is created. (A disabled version
        // sent {method: "update", state: getModelSerializedState(model_id)}.)
        break;
      default:
        throw Error(`invalid synctable state -- unknown type '${changeType}'`);
    }
  }
}
1403
1404
/**
 * Pass a comm message received from the kernel on to the syncdb's
 * ipywidgets state. Throws if that state is not defined.
 */
public async process_comm_message_from_kernel(mesg: any): Promise<void> {
  const log = this.dbg("process_comm_message_from_kernel");
  // Never log the whole mesg except for low level debugging: it may contain
  // large buffers, so serializing it could cause enormous load on the server.
  // The header alone is safe.
  log(JSON.stringify(mesg.header));
  const widgets = this.syncdb.ipywidgets_state;
  if (widgets == null) {
    throw Error("syncdb's ipywidgets_state must be defined!");
  }
  await widgets.process_comm_message_from_kernel(mesg);
}
1416
1417
/**
 * Delegate `mesg` to the syncdb ipywidgets state's capture_output_message and
 * return its result (presumably whether the message was captured by an
 * output widget -- TODO confirm). Throws if the ipywidgets state is missing.
 */
public capture_output_message(mesg: any): boolean {
  const widgets = this.syncdb.ipywidgets_state;
  if (widgets == null) {
    throw Error("syncdb's ipywidgets_state must be defined!");
  }
  return widgets.capture_output_message(mesg);
}
1423
1424
/**
 * Project-side close: stop the run-all loop, if any, then remove this
 * notebook's jupyter redux state in the background. That stops the kernel
 * and cleans everything up, so no resources are wasted and the next start is
 * clean. A cleanup failure is only logged.
 */
public close_project_only() {
  const log = this.dbg("close_project_only");
  log();
  if (this.run_all_loop) {
    this.run_all_loop.close();
    delete this.run_all_loop;
  }
  const cleanup = async () => {
    try {
      await removeJupyterRedux(this.store.get("path"), this.project_id);
    } catch (err) {
      log("WARNING -- issue removing jupyter redux", err);
    }
  };
  void cleanup();
}
1442
1443
/**
 * Send `signal` (default "SIGINT") to the Jupyter kernel, if there is one.
 * Not actually async: nothing here is awaited.
 */
public async signal(signal = "SIGINT"): Promise<void> {
  const kernel = this.jupyter_kernel;
  if (kernel != null) {
    kernel.signal(signal);
  }
}
1447
1448
// Forward a change of the nbconvert record, converted to plain JS objects
// (undefined when absent), to the nbconvert change handler.
public handle_nbconvert_change(oldVal, newVal): void {
  const before = oldVal?.toJS();
  const after = newVal?.toJS();
  nbconvertChange(this, before, after);
}
1451
1452
/**
 * Whether this process is responsible for running cells:
 * - never when closed, or when the compute server id can't be determined;
 * - the project, when the compute server id is 0;
 * - a compute server, when its own id (decoded from the syncdb client id)
 *   equals the configured compute server id.
 */
protected isCellRunner = (): boolean => {
  if (this.is_closed()) {
    // Closed, so obviously not the cell runner.
    return false;
  }
  const log = this.dbg("isCellRunner");
  let id;
  try {
    id = this.getComputeServerId();
  } catch (_) {
    // Normal, since debounced. Anyway, if something getComputeServerId
    // depends on (like the syncdb) doesn't work, we are clearly not the
    // cell runner.
    return false;
  }
  log("id = ", id);

  if (id == 0 && this.is_project) {
    // With no remote compute server configured, the project is responsible
    // for evaluating code.
    log("yes we are the cell runner (the project)");
    return true;
  }
  if (!this.is_compute_server) {
    log("NO we are not the cell runner");
    return false;
  }
  // A remote compute server is supposed to be responsible. Are we it?
  try {
    const myId = decodeUUIDtoNum(this.syncdb.client_id());
    const isRunner = myId == id;
    log(isRunner ? "Yes, we are cell runner" : "NOT cell runner");
    return isRunner;
  } catch (err) {
    log(err);
  }
  log("NO we are not the cell runner");
  return false;
};
1489
1490
// Compute server id observed the previous time checkForComputeServerStateChange ran.
private lastComputeServerId = 0;

// Given a client_id (presumably of a client whose state just changed -- verify
// against caller), ignore it unless it is an encoded numeric compute-server
// UUID. Otherwise, if the configured compute server id differs from the last
// one seen, halt and clear all cell run state. Then remember the current id.
private checkForComputeServerStateChange = (client_id) => {
  if (this.is_closed() || !isEncodedNumUUID(client_id)) {
    return;
  }
  const currentId = this.getComputeServerId();
  const changed = currentId != this.lastComputeServerId;
  if (changed) {
    // reset all run state
    this.halt();
    this.clear_all_cell_run_state();
  }
  this.lastComputeServerId = currentId;
};
1506
1507
/*
1508
WebSocket API
1509
1510
1. Handles api requests from the user via the generic websocket message channel
1511
provided by the syncdb.
1512
1513
2. In case a remote compute server connects and registers to handle api messages,
1514
then those are proxied to the remote server, handled there, and proxied back.
1515
*/
1516
1517
// Subscribe to syncdb websocket "message" events. The project receives api
// requests directly from clients; a compute server receives api requests that
// the project proxies to it.
private initWebsocketApi = () => {
  if (this.is_project) {
    this.syncdb.on("message", this.handleMessageFromClient);
    return;
  }
  if (this.is_compute_server) {
    this.syncdb.on("message", this.handleMessageFromProject);
  }
};
1527
1528
// The compute server (if any) that has registered to handle api requests for
// this notebook; null when requests are handled locally.
private remoteApiHandler: {
  // the spark channel connection between project and compute server
  spark: any;
  // sequential id used to pair each request with its response
  id: number;
  // when a response arrives from the compute server, the callback registered
  // under its id gets called
  responseCallbacks: { [id: number]: (err: any, response: any) => void };
} | null = null;
1534
1535
/**
 * Handle a websocket message from a client; this runs in the project.
 *
 * API requests are either handled directly or, if a remote compute server
 * has registered to handle them, forwarded to that compute server, whose
 * response is then proxied back to the client.
 *
 * Events: "register-to-handle-api", "api-request", "api-response",
 * "save-blob-to-project". Anything else gets an "error" reply.
 */
private handleMessageFromClient = async ({ data, spark }) => {
  const dbg = this.dbg("handleMessageFromClient");
  dbg();
  // WARNING: potentially very verbose
  dbg(data);

  // Fail every request still waiting for a response from a remote api
  // handler that is going away. Otherwise the awaiting
  // handleApiRequestViaRemoteApiHandler calls would hang forever, and the
  // clients that made those requests would never get any reply.
  const failPendingRequests = (
    responseCallbacks: { [id: number]: (err: any, response: any) => void },
    reason: string,
  ) => {
    for (const cb of Object.values(responseCallbacks)) {
      cb(Error(reason), undefined);
    }
  };

  switch (data.event) {
    case "register-to-handle-api": {
      if (this.remoteApiHandler?.spark?.id == spark.id) {
        dbg(
          "register-to-handle-api -- it's the current one so nothing to do",
        );
        return;
      }
      const previous = this.remoteApiHandler;
      if (previous != null && previous.spark != null) {
        dbg("register-to-handle-api -- remove existing handler");
        previous.spark.removeAllListeners();
        previous.spark.end();
        this.remoteApiHandler = null;
        failPendingRequests(
          previous.responseCallbacks,
          "remote api handler was replaced",
        );
      }
      // a compute server client is volunteering to handle all api requests until they disconnect
      this.remoteApiHandler = { spark, id: 0, responseCallbacks: {} };
      dbg("register-to-handle-api -- spark.id = ", spark.id);
      spark.on("end", () => {
        dbg(
          "register-to-handle-api -- spark ended, spark.id = ",
          spark.id,
          " and this.remoteApiHandler?.spark.id=",
          this.remoteApiHandler?.spark.id,
        );
        const current = this.remoteApiHandler;
        if (current != null && current.spark.id == spark.id) {
          this.remoteApiHandler = null;
          failPendingRequests(
            current.responseCallbacks,
            "remote api handler disconnected",
          );
        }
      });
      return;
    }

    case "api-request": {
      // Browser client made an api request. This gets handled either locally
      // or via a remote compute server, depending on whether
      // this.remoteApiHandler is set (via the register-to-handle-api event).
      const response = await this.handleApiRequest(data);
      spark.write({
        event: "message",
        data: { event: "api-response", response, id: data.id },
      });
      return;
    }

    case "api-response": {
      // Response from the remote compute server to an api request that we
      // proxied to it.
      if (this.remoteApiHandler == null) {
        // This should only happen if the handler just disconnected, in
        // which case there's no way to respond anyways.
        dbg("WARNING: api-response event but there is no remote api handler");
        return;
      }
      const cb = this.remoteApiHandler.responseCallbacks[data.id];
      if (cb != null) {
        delete this.remoteApiHandler.responseCallbacks[data.id];
        cb(undefined, data);
      } else {
        dbg("WARNING: api-response event for unknown id", data.id);
      }
      return;
    }

    case "save-blob-to-project": {
      // A compute server sent the project a blob to store in the local blob
      // store. Errors are logged rather than thrown: a throw here would only
      // become an unhandled rejection from this event handler.
      if (!this.is_project) {
        dbg(
          "WARNING -- message save-blob-to-project should only be sent to the project",
        );
        return;
      }
      try {
        const blobStore = await get_blob_store();
        await blobStore.save(data.data, data.type, data.ipynb);
      } catch (err) {
        dbg(`WARNING -- issue saving blob to project ${err}`);
      }
      return;
    }

    default: {
      // unknown event so send back error
      spark.write({
        event: "message",
        data: {
          event: "error",
          message: `unknown event ${data.event}`,
          id: data.id,
        },
      });
    }
  }
};
1635
1636
/**
 * Send a blob to the project so it is stored in the project's blob store as
 * well. Called on a compute server whenever something is written to its
 * local blob store; throws if called anywhere else, and does nothing once
 * closed.
 *
 * Fire-and-forget: we do not wait for confirmation that the blob was received
 * (TODO). A failed send -- e.g., when the websocket to the project is
 * disconnected -- is logged instead of becoming an unhandled rejection.
 */
public saveBlobToProject = (data: string, type: string, ipynb?: string) => {
  if (!this.is_compute_server) {
    throw Error(
      "saveBlobToProject should only be called on a compute server",
    );
  }
  const dbg = this.dbg("saveBlobToProject");
  if (this.is_closed()) {
    dbg("called AFTER closed");
    return;
  }
  dbg();
  (async () => {
    try {
      await this.syncdb.sendMessageToProject({
        event: "save-blob-to-project",
        data,
        type,
        ipynb,
      });
    } catch (err) {
      dbg(`WARNING -- issue sending blob to project ${err}`);
    }
  })();
};
1659
1660
// Runs on a remote compute server: handle a message the project proxied to us.
// Only "api-request" is handled. The result is sent back to the project as an
// "api-response" carrying the same id; a failed send is only logged.
private handleMessageFromProject = async (data) => {
  const log = this.dbg("handleMessageFromProject");
  if (this.is_closed()) {
    log("called AFTER closed");
    return;
  }
  log();
  // Deliberately not logging data: the output could be very BIG.
  if (data.event != "api-request") {
    return;
  }
  const response = await this.handleApiRequest(data.request);
  try {
    await this.syncdb.sendMessageToProject({
      event: "api-response",
      id: data.id,
      response,
    });
  } catch (err) {
    // this happens when the websocket is disconnected
    log(`WARNING -- issue responding to message ${err}`);
  }
};
1685
1686
// Handle an api request: proxy it to the registered remote api handler if
// there is one, otherwise handle it here. A local failure is returned as
// { event: "error", message } rather than thrown.
private handleApiRequest = async (data) => {
  if (this.remoteApiHandler != null) {
    return await this.handleApiRequestViaRemoteApiHandler(data);
  }
  const log = this.dbg("handleApiRequest");
  const { path } = data;
  log("handling request in project", path);
  try {
    return await handleApiRequest(path, data.endpoint, data.query);
  } catch (err) {
    log("error -- ", err.message);
    return { event: "error", message: err.message };
  }
};
1700
1701
// Runs in the project: forward an api request to the registered remote api
// handler (a compute server) and wait for its "api-response", which
// handleMessageFromClient delivers through responseCallbacks. Any error is
// returned as { event: "error", message } rather than thrown.
private handleApiRequestViaRemoteApiHandler = async (data) => {
  const log = this.dbg("handleApiRequestViaRemoteApiHandler");
  log(data?.path);
  try {
    if (!this.is_project) {
      throw Error("BUG -- remote api requests only make sense in a project");
    }
    if (this.remoteApiHandler == null) {
      throw Error("BUG -- remote api handler not registered");
    }
    const { id, spark, responseCallbacks } = this.remoteApiHandler;
    // Ask the remote to handle this request; in that remote process it is
    // handled by handleMessageFromProject.
    spark.write({
      event: "message",
      data: { event: "api-request", request: data, id },
    });
    const registerResponseCallback = (cb) => {
      responseCallbacks[id] = cb;
    };
    // advance the sequential protocol message id for the next request
    this.remoteApiHandler.id += 1;
    const reply = await callback(registerResponseCallback);
    return reply.response;
  } catch (err) {
    log("error -- ", err.message);
    return { event: "error", message: err.message };
  }
};
1728
1729
/**
 * Handle a transient cell message. If `mesg.content.transient.display_id` is
 * set, every existing output (in any cell) with that same display_id is
 * replaced by `mesg.content`. Each changed cell is written with a single
 * syncdb.set, and everything is committed once at the end.
 *
 * Returns false when the message has no display_id; otherwise returns
 * undefined, whether or not anything matched.
 */
handleTransientUpdate = (mesg) => {
  const display_id = mesg.content?.transient?.display_id;
  if (!display_id) {
    return false;
  }

  // Are there any transient outputs in the entire document that have this
  // display_id? Search to find them.
  // TODO: we could use a clever data structure to make
  // this faster and more likely to have bugs.
  let matched = false;
  const cells = this.syncdb.get({ type: "cell" });
  for (const cell of cells) {
    const output = cell.get("output");
    if (output == null) {
      continue;
    }
    let updated = output;
    let cellMatched = false;
    for (const [n, val] of output) {
      if (val?.getIn(["transient", "display_id"]) == display_id) {
        // found a match -- replace it
        updated = updated.set(n, immutable.fromJS(mesg.content));
        cellMatched = true;
      }
    }
    if (cellMatched) {
      // one set per cell, even if several of its outputs matched
      this.syncdb.set({ type: "cell", id: cell.get("id"), output: updated });
      matched = true;
    }
  }
  if (matched) {
    this.syncdb.commit();
  }
};
1759
// End Websocket API
1760
}
1761
1762