GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/redux/project-actions.ts
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
project-actions: additional actions that are only available in the
8
backend/project, which "manages" everything.
9
10
This code should not *explicitly* require anything that is only
11
available in the project or requires node to run, so that we can
12
fully unit test it via mocking of components.
13
14
NOTE: these are now also the actions used by remote compute servers.
15
*/
16
17
import { get_kernel_data } from "@cocalc/jupyter/kernel/kernel-data";
18
import * as immutable from "immutable";
19
import json_stable from "json-stable-stringify";
20
import { debounce } from "lodash";
21
import { JupyterActions as JupyterActions0 } from "@cocalc/jupyter/redux/actions";
22
import { callback2, once } from "@cocalc/util/async-utils";
23
import * as misc from "@cocalc/util/misc";
24
import { OutputHandler } from "@cocalc/jupyter/execute/output-handler";
25
import { RunAllLoop } from "./run-all-loop";
26
import nbconvertChange from "./handle-nbconvert-change";
27
import type { ClientFs } from "@cocalc/sync/client/types";
28
import { kernel as createJupyterKernel } from "@cocalc/jupyter/kernel";
29
import {
30
decodeUUIDtoNum,
31
isEncodedNumUUID,
32
} from "@cocalc/util/compute/manager";
33
import { handleApiRequest } from "@cocalc/jupyter/kernel/websocket-api";
34
import { callback } from "awaiting";
35
import { get_blob_store } from "@cocalc/jupyter/blobs";
36
import { removeJupyterRedux } from "@cocalc/jupyter/kernel";
37
38
// see https://github.com/sagemathinc/cocalc/issues/8060
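// Maximum throttle delay (in milliseconds) between syncdb saves while a cell is producing output.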
39
const MAX_OUTPUT_SAVE_DELAY = 30000;
40
41
type BackendState = "init" | "ready" | "spawning" | "starting" | "running";
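// See the state diagram in set_backend_state() below for the allowed transitions.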
42
43
export class JupyterActions extends JupyterActions0 {
44
private _backend_state: BackendState = "init";
45
private _initialize_manager_already_done: any;
46
private _kernel_state: any;
47
private _manager_run_cell_queue: any;
48
private _running_cells: { [id: string]: string };
49
private _throttled_ensure_positions_are_unique: any;
50
private run_all_loop?: RunAllLoop;
51
private clear_kernel_error?: any;
52
private running_manager_run_cell_process_queue: boolean = false;
53
private last_ipynb_save: number = 0;
54
protected _client: ClientFs; // this has filesystem access, etc.
55
56
public run_cell(
57
id: string,
58
save: boolean = true,
59
no_halt: boolean = false,
60
): void {
61
if (this.store.get("read_only")) return;
62
const cell = this.store.getIn(["cells", id]);
63
if (cell == null) {
64
// it is trivial to run a cell that does not exist -- nothing needs to be done.
65
return;
66
}
67
const cell_type = cell.get("cell_type", "code");
68
if (cell_type == "code") {
69
// when the backend is running code, just don't worry about
70
// trying to parse things like "foo?" out. We can't do
71
// it without CodeMirror, and it isn't worth it for that
72
// application.
73
this.run_code_cell(id, save, no_halt);
74
}
75
if (save) {
76
this.save_asap();
77
}
78
}
79
80
private set_backend_state(backend_state: BackendState): void {
81
this.dbg("set_backend_state")(backend_state);
82
83
/*
84
The backend states, which are put in the syncdb so clients
85
can display this:
86
87
- 'init' -- the backend is checking the file on disk, etc.
88
- 'ready' -- the backend is set up and ready to use; the kernel isn't running though
89
- 'spawning' -- the kernel process is being spawned
- 'starting' -- the kernel itself is activated and currently starting up (e.g., Sage is starting up)
90
- 'running' -- the kernel is running and ready to evaluate code
91
92
93
'init' --> 'ready' --> 'spawning' --> 'starting' --> 'running'
94
             /|\                                        |
95
              |-----------------------------------------|
96
97
Going from 'ready' to 'starting' happens the first time a code execution is requested.
98
*/
99
100
// Check just in case TypeScript doesn't catch something:
101
if (
102
["init", "ready", "spawning", "starting", "running"].indexOf(
103
backend_state,
104
) === -1
105
) {
106
throw Error(`invalid backend state '${backend_state}'`);
107
}
108
if (backend_state == "init" && this._backend_state != "init") {
109
// Do NOT allow changing the state to init from any other state.
110
throw Error(
111
`illegal state change '${this._backend_state}' --> '${backend_state}'`,
112
);
113
}
114
this._backend_state = backend_state;
115
116
if (this.isCellRunner()) {
117
const stored_backend_state = this.syncdb
118
.get_one({ type: "settings" })
119
?.get("backend_state");
120
121
if (stored_backend_state != backend_state) {
122
this._set({
123
type: "settings",
124
backend_state,
125
last_backend_state: Date.now(),
126
});
127
this.save_asap();
128
}
129
130
// The following clears kernel_error, but only if things are actually working.
131
if (backend_state == "running") {
132
// clear kernel error if kernel successfully starts and stays
133
// in running state for a while.
134
this.clear_kernel_error = setTimeout(() => {
135
this._set({
136
type: "settings",
137
kernel_error: "",
138
});
139
}, 3000);
140
} else {
141
// change to a different state; cancel attempt to clear kernel error
142
if (this.clear_kernel_error) {
143
clearTimeout(this.clear_kernel_error);
144
delete this.clear_kernel_error;
145
}
146
}
147
}
148
}
149
150
set_kernel_state = (state: any, save = false) => {
151
if (!this.isCellRunner()) return;
152
this._kernel_state = state;
153
this._set({ type: "settings", kernel_state: state }, save);
154
};
155
156
// Called exactly once when the manager first starts up after the store is initialized.
157
// Here we ensure everything is in a consistent state so that we can react
158
// to changes later.
159
async initialize_manager() {
160
if (this._initialize_manager_already_done) {
161
return;
162
}
163
const dbg = this.dbg("initialize_manager");
164
dbg();
165
this._initialize_manager_already_done = true;
166
167
this.sync_exec_state = debounce(this.sync_exec_state, 2000);
168
this._throttled_ensure_positions_are_unique = debounce(
169
this.ensure_positions_are_unique,
170
5000,
171
);
172
// Listen for changes...
173
this.syncdb.on("change", this._backend_syncdb_change.bind(this));
174
175
this.setState({
176
// used by the kernel_info function of this.jupyter_kernel
177
start_time: this._client.server_time().valueOf(),
178
});
179
180
// clear nbconvert state on init, since no nbconvert can be running yet
181
this.syncdb.delete({ type: "nbconvert" });
182
183
// Initialize info about available kernels, which is used e.g., for
184
// saving to ipynb format.
185
this.init_kernel_info();
186
187
// We try once to load from disk. If it fails, then
188
// a record with type:'fatal'
189
// is created in the database; if it succeeds, that record is deleted.
190
// Try again only when the file changes.
191
await this._first_load();
192
193
// Listen for model state changes...
194
if (this.syncdb.ipywidgets_state == null) {
195
throw Error("syncdb's ipywidgets_state must be defined!");
196
}
197
this.syncdb.ipywidgets_state.on(
198
"change",
199
this.handle_ipywidgets_state_change.bind(this),
200
);
201
202
this.syncdb.on("cursor_activity", this.checkForComputeServerStateChange);
203
204
// initialize the websocket api
205
this.initWebsocketApi();
206
}
207
208
private async _first_load() {
209
const dbg = this.dbg("_first_load");
210
dbg("doing load");
211
if (this.is_closed()) {
212
throw Error("actions must not be closed");
213
}
214
try {
215
await this.loadFromDiskIfNewer();
216
} catch (err) {
217
dbg(`load failed -- ${err}; wait for file change and try again`);
218
const path = this.store.get("path");
219
const watcher = this._client.watch_file({ path });
220
await once(watcher, "change");
221
dbg("file changed");
222
watcher.close();
223
await this._first_load();
224
return;
225
}
226
dbg("loading worked");
227
this._init_after_first_load();
228
}
229
230
private _init_after_first_load() {
231
const dbg = this.dbg("_init_after_first_load");
232
233
dbg("initializing");
234
this.ensure_backend_kernel_setup(); // this may change the syncdb.
235
236
this.init_file_watcher();
237
238
this._state = "ready";
239
this.ensure_there_is_a_cell();
240
}
241
242
_backend_syncdb_change = (changes: any) => {
243
if (this.is_closed()) {
244
return;
245
}
246
const dbg = this.dbg("_backend_syncdb_change");
247
if (changes != null) {
248
changes.forEach((key) => {
249
switch (key.get("type")) {
250
case "settings":
251
dbg("settings change");
252
var record = this.syncdb.get_one(key);
253
if (record != null) {
254
// ensure kernel is properly configured
255
this.ensure_backend_kernel_setup();
256
// only the backend should change kernel and backend state;
257
// however, our security model allows otherwise (e.g., via TimeTravel).
258
if (
259
record.get("kernel_state") !== this._kernel_state &&
260
this._kernel_state != null
261
) {
262
this.set_kernel_state(this._kernel_state, true);
263
}
264
if (record.get("backend_state") !== this._backend_state) {
265
this.set_backend_state(this._backend_state);
266
}
267
268
if (record.get("run_all_loop_s")) {
269
if (this.run_all_loop == null) {
270
this.run_all_loop = new RunAllLoop(
271
this,
272
record.get("run_all_loop_s"),
273
);
274
} else {
275
// ensure interval is correct
276
this.run_all_loop.set_interval(record.get("run_all_loop_s"));
277
}
278
} else if (
279
!record.get("run_all_loop_s") &&
280
this.run_all_loop != null
281
) {
282
// stop it.
283
this.run_all_loop.close();
284
delete this.run_all_loop;
285
}
286
}
287
break;
288
}
289
});
290
}
291
292
this.ensure_there_is_a_cell();
293
this._throttled_ensure_positions_are_unique();
294
this.sync_exec_state();
295
};
296
297
// ensure_backend_kernel_setup ensures that we have a connection
298
// to the proper type of kernel.
299
// If running is true, starts the kernel and waits until running.
300
ensure_backend_kernel_setup = () => {
301
const dbg = this.dbg("ensure_backend_kernel_setup");
302
if (this.isDeleted()) {
303
dbg("file is deleted");
304
return;
305
}
306
307
const kernel = this.store.get("kernel");
308
309
let current: string | undefined = undefined;
310
if (this.jupyter_kernel != null) {
311
current = this.jupyter_kernel.name;
312
if (current == kernel && this.jupyter_kernel.get_state() != "closed") {
313
dbg("everything is properly setup and working");
314
return;
315
}
316
}
317
318
dbg(`kernel='${kernel}', current='${current}'`);
319
if (
320
this.jupyter_kernel != null &&
321
this.jupyter_kernel.get_state() != "closed"
322
) {
323
if (current != kernel) {
324
dbg("kernel changed -- kill running kernel to trigger switch");
325
this.jupyter_kernel.close();
326
return;
327
} else {
328
dbg("nothing to do");
329
return;
330
}
331
}
332
333
dbg("make a new kernel");
334
335
// No kernel wrapper object setup at all. Make one.
336
this.jupyter_kernel = createJupyterKernel({
337
name: kernel,
338
path: this.store.get("path"),
339
actions: this,
340
});
341
342
if (this.syncdb.ipywidgets_state == null) {
343
throw Error("syncdb's ipywidgets_state must be defined!");
344
}
345
this.syncdb.ipywidgets_state.clear();
346
347
if (this.jupyter_kernel == null) {
348
// to satisfy TypeScript.
349
throw Error("jupyter_kernel must be defined");
350
}
351
352
// save so it gets reported to the frontend and surfaced to the user:
353
// https://github.com/sagemathinc/cocalc/issues/4847
354
this.jupyter_kernel.on("kernel_error", (error) => {
355
this.set_kernel_error(error);
356
});
357
358
// Since we just made a new kernel, clearly no cells are running on the backend.
359
this._running_cells = {};
360
this.clear_all_cell_run_state();
361
362
this.restartKernelOnClose = () => {
363
// When the kernel closes, make sure a new kernel gets setup.
364
if (this.store == null || this._state !== "ready") {
365
// This event can also happen when this actions object is being closed,
366
// in which case obviously we shouldn't make a new kernel.
367
return;
368
}
369
dbg("kernel closed -- make new one.");
370
this.ensure_backend_kernel_setup();
371
};
372
373
this.jupyter_kernel.once("closed", this.restartKernelOnClose);
374
375
// Track backend state changes other than closing, so they
376
// are visible to user etc.
377
// TODO: Maybe all these need to move to ephemeral table?
378
// There's a good argument that recording these is useful though, so when
379
// looking at time travel or debugging, you know what was going on.
380
this.jupyter_kernel.on("state", (state) => {
381
dbg("jupyter_kernel state --> ", state);
382
switch (state) {
383
case "off":
384
case "closed":
385
// things went wrong.
386
this._running_cells = {};
387
this.clear_all_cell_run_state();
388
this.set_backend_state("ready");
389
this.jupyter_kernel?.close();
390
this.running_manager_run_cell_process_queue = false;
391
delete this.jupyter_kernel;
392
return;
393
case "spawning":
394
case "starting":
395
this.set_connection_file(); // yes, fall through
396
case "running":
397
this.set_backend_state(state);
398
}
399
});
400
401
this.jupyter_kernel.on("execution_state", this.set_kernel_state);
402
403
this.handle_all_cell_attachments();
404
this.set_backend_state("ready");
405
};
406
407
set_connection_file = () => {
408
const connection_file = this.jupyter_kernel?.get_connection_file() ?? "";
409
this._set({
410
type: "settings",
411
connection_file,
412
});
413
};
414
415
init_kernel_info = async () => {
416
let kernels0 = this.store.get("kernels");
417
if (kernels0 != null) {
418
return;
419
}
420
const dbg = this.dbg("init_kernel_info");
421
dbg("getting");
422
let kernels;
423
try {
424
kernels = await get_kernel_data();
425
dbg("success");
426
} catch (err) {
427
dbg(`FAILED to get kernel info: ${err}`);
428
// TODO: what to do?? Saving will be broken...
429
return;
430
}
431
this.setState({
432
kernels: immutable.fromJS(kernels),
433
});
434
};
435
436
async ensure_backend_kernel_is_running() {
437
const dbg = this.dbg("ensure_backend_kernel_is_running");
438
if (this._backend_state == "ready") {
439
dbg("in state 'ready', so kick it into gear");
440
await this.set_backend_kernel_info();
441
dbg("done getting kernel info");
442
}
443
const is_running = (s): boolean => {
444
if (this._state === "closed") return true;
445
const t = s.get_one({ type: "settings" });
446
if (t == null) {
447
dbg("no settings");
448
return false;
449
} else {
450
const state = t.get("backend_state");
451
dbg(`state = ${state}`);
452
return state == "running";
453
}
454
};
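// Wait until the settings record in syncdb reports backend_state == 'running'
// (the 60 is presumably a timeout in seconds).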
455
await this.syncdb.wait(is_running, 60);
456
}
457
458
// onCellChange is called after a cell change has been
459
// incorporated into the store after the syncdb change event.
460
// - If we are responsible for running cells, then it ensures
461
// that the cell gets computed.
462
// - We also handle attachments for markdown cells.
463
protected onCellChange(id: string, new_cell: any, old_cell: any) {
464
const dbg = this.dbg(`onCellChange(id='${id}')`);
465
dbg();
466
// this logging could be expensive due to toJS, so only uncomment
467
// if really needed
468
// dbg("new_cell=", new_cell?.toJS(), "old_cell", old_cell?.toJS());
469
470
if (
471
new_cell?.get("state") === "start" &&
472
old_cell?.get("state") !== "start" &&
473
this.isCellRunner()
474
) {
475
this.manager_run_cell_enqueue(id);
476
// attachments below only happen for markdown cells, which don't get run, so
477
// we can return here:
478
return;
479
}
480
481
const attachments = new_cell?.get("attachments");
482
if (attachments != null && attachments !== old_cell?.get("attachments")) {
483
this.handle_cell_attachments(new_cell);
484
}
485
}
486
487
protected __syncdb_change_post_hook(doInit: boolean) {
488
if (doInit) {
489
if (this.isCellRunner()) {
490
// Since we are just opening the actions in the project, the kernel definitely
491
// isn't running, so record this fact in the shared database. That ensures
492
// things always start in the right initial state.
493
this.syncdb.set({
494
type: "settings",
495
backend_state: "init",
496
kernel_state: "idle",
497
kernel_usage: { memory: 0, cpu: 0 },
498
});
499
this.syncdb.commit();
500
}
501
502
// Also initialize the execution manager, which runs cells that have been
503
// requested to run.
504
this.initialize_manager();
505
}
506
if (this.store.get("kernel")) {
507
this.manager_run_cell_process_queue();
508
}
509
}
510
511
// Ensure that the cells listed as running *are* exactly the
512
// ones actually running or queued up to run.
513
sync_exec_state = () => {
514
// sync_exec_state is debounced, so it is *expected* to get called
515
// after actions have been closed.
516
if (this.store == null || this._state !== "ready") {
517
// not initialized, so we better not
518
// mess with cell state (that is somebody else's responsibility).
519
return;
520
}
521
// we are not the cell runner
522
if (!this.isCellRunner()) {
523
return;
524
}
525
526
const dbg = this.dbg("sync_exec_state");
527
let change = false;
528
const cells = this.store.get("cells");
529
// First verify that all actual cells that are said to be running
530
// (according to the store) are in fact running.
531
if (cells != null) {
532
cells.forEach((cell, id) => {
533
const state = cell.get("state");
534
if (
535
state != null &&
536
state != "done" &&
537
state != "start" && // regarding "start", see https://github.com/sagemathinc/cocalc/issues/5467
538
!this._running_cells?.[id]
539
) {
540
dbg(`set cell ${id} with state "${state}" to done`);
541
this._set({ type: "cell", id, state: "done" }, false);
542
change = true;
543
}
544
});
545
}
546
if (this._running_cells != null) {
547
const cells = this.store.get("cells");
548
// Next verify that every cell actually running is still in the document
549
// and listed as running. TimeTravel, deleting cells, etc., can
550
// certainly lead to this being necessary.
551
for (const id in this._running_cells) {
552
const state = cells.getIn([id, "state"]);
553
if (state == null || state === "done") {
554
// cell no longer exists or isn't in a running state
555
dbg(`tell kernel to not run ${id}`);
556
this._cancel_run(id);
557
}
558
}
559
}
560
if (change) {
561
return this._sync();
562
}
563
};
564
565
_cancel_run = (id: any) => {
566
const dbg = this.dbg(`_cancel_run ${id}`);
567
// All these checks are so we only cancel if it is actually running
568
// with the current kernel...
569
if (this._running_cells == null || this.jupyter_kernel == null) return;
570
const identity = this._running_cells[id];
571
if (identity == null) return;
572
if (this.jupyter_kernel.identity == identity) {
573
dbg("canceling");
574
this.jupyter_kernel.cancel_execute(id);
575
} else {
576
dbg("not canceling since wrong identity");
577
}
578
};
579
580
// Record that there is a request to run a given cell.
581
// You must call manager_run_cell_process_queue for them to actually start running.
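// Rough flow (a sketch of how these pieces fit together):
//   onCellChange --> manager_run_cell_enqueue(id)
//   --> manager_run_cell_process_queue() --> manager_run_cell(id) --> kernel.execute_code(...)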
582
protected manager_run_cell_enqueue(id: string) {
583
if (this._running_cells?.[id]) {
584
return;
585
}
586
if (this._manager_run_cell_queue == null) {
587
this._manager_run_cell_queue = {};
588
}
589
this._manager_run_cell_queue[id] = true;
590
}
591
592
// properly start running -- in order -- the cells that have been requested to run
593
protected async manager_run_cell_process_queue() {
594
if (this.running_manager_run_cell_process_queue) {
595
return;
596
}
597
this.running_manager_run_cell_process_queue = true;
598
try {
599
const dbg = this.dbg("manager_run_cell_process_queue");
600
const queue = this._manager_run_cell_queue;
601
if (queue == null) {
602
//dbg("queue is null");
603
return;
604
}
605
delete this._manager_run_cell_queue;
606
const v: any[] = [];
607
for (const id in queue) {
608
if (!this._running_cells?.[id]) {
609
v.push(this.store.getIn(["cells", id]));
610
}
611
}
612
613
if (v.length == 0) {
614
dbg("no non-running cells");
615
return; // nothing to do
616
}
617
618
v.sort((a, b) =>
619
misc.cmp(
620
a != null ? a.get("start") : undefined,
621
b != null ? b.get("start") : undefined,
622
),
623
);
624
625
dbg(
626
`found ${v.length} non-running cell(s) that should be running, so ensuring kernel is running...`,
627
);
628
this.ensure_backend_kernel_setup();
629
try {
630
await this.ensure_backend_kernel_is_running();
631
if (this._state == "closed") return;
632
} catch (err) {
633
// if this fails, give up on evaluation.
634
return;
635
}
636
637
dbg(
638
`kernel is now running; requesting that each of the ${v.length} cells gets executed`,
639
);
640
for (const cell of v) {
641
if (cell != null) {
642
this.manager_run_cell(cell.get("id"));
643
}
644
}
645
646
if (this._manager_run_cell_queue != null) {
647
// run it again to process additional entries.
648
setTimeout(this.manager_run_cell_process_queue, 1);
649
}
650
} finally {
651
this.running_manager_run_cell_process_queue = false;
652
}
653
}
654
655
// returns new output handler for this cell.
656
protected _output_handler(cell: any) {
657
const dbg = this.dbg(`handler(id='${cell.id}')`);
658
if (
659
this.jupyter_kernel == null ||
660
this.jupyter_kernel.get_state() == "closed"
661
) {
662
throw Error("jupyter kernel must exist and not be closed");
663
}
664
this.reset_more_output(cell.id);
665
666
const handler = new OutputHandler({
667
cell,
668
max_output_length: this.store.get("max_output_length"),
669
report_started_ms: 250,
670
dbg,
671
});
672
673
dbg("setting up jupyter_kernel.once('closed', ...) handler");
674
const handleKernelClose = () => {
675
dbg("output handler -- closing due to jupyter kernel closed");
676
handler.close();
677
};
678
this.jupyter_kernel.once("closed", handleKernelClose);
679
// remove the "closed" handler we just defined above once
680
// we are done waiting for output from this cell.
681
// The output handler removes all listeners whenever it is
682
// finished, so we don't have to remove this listener for done.
683
handler.once("done", () =>
684
this.jupyter_kernel?.removeListener("closed", handleKernelClose),
685
);
686
687
handler.on("more_output", (mesg, mesg_length) => {
688
this.set_more_output(cell.id, mesg, mesg_length);
689
});
690
691
handler.on("process", (mesg) => {
692
// Do not enable -- mesg often very large!
693
// dbg("handler.on('process')", mesg);
694
if (
695
this.jupyter_kernel == null ||
696
this.jupyter_kernel.get_state() == "closed"
697
) {
698
return;
699
}
700
this.jupyter_kernel.process_output(mesg);
701
// dbg("handler -- after processing ", mesg);
702
});
703
704
return handler;
705
}
706
707
manager_run_cell = (id: string) => {
708
const dbg = this.dbg(`manager_run_cell(id='${id}')`);
709
dbg(JSON.stringify(misc.keys(this._running_cells)));
710
711
if (this._running_cells == null) {
712
this._running_cells = {};
713
}
714
715
if (this._running_cells[id]) {
716
dbg("cell already queued to run in kernel");
717
return;
718
}
719
720
// It's important to set this._running_cells[id] to a truthy value so that
721
// sync_exec_state doesn't declare this cell done. The kernel identity
722
// will get set properly below in case it changes.
723
this._running_cells[id] = this.jupyter_kernel?.identity ?? "none";
724
725
const orig_cell = this.store.get("cells").get(id);
726
if (orig_cell == null) {
727
// nothing to do -- cell deleted
728
return;
729
}
730
731
let input: string | undefined = orig_cell.get("input", "");
732
if (input == null) {
733
input = "";
734
} else {
735
input = input.trim();
736
}
737
738
const halt_on_error: boolean = !orig_cell.get("no_halt", false);
739
740
if (this.jupyter_kernel == null) {
741
throw Error("bug -- this is guaranteed by the above");
742
}
743
this._running_cells[id] = this.jupyter_kernel.identity;
744
745
const cell: any = {
746
id,
747
type: "cell",
748
kernel: this.store.get("kernel"),
749
};
750
751
dbg(`using max_output_length=${this.store.get("max_output_length")}`);
752
const handler = this._output_handler(cell);
753
754
// exponentiallyThrottledSaved calls this.syncdb?.save, but
755
// it throttles the calls, and does so using exponential backoff
756
// up to MAX_OUTPUT_SAVE_DELAY milliseconds. Basically every
757
// time exponentiallyThrottledSaved is called it increases the
758
// interval used for throttling by multiplying saveThrottleMs by 1.3
759
// until saveThrottleMs gets to MAX_OUTPUT_SAVE_DELAY. There is no
760
// need at all to do a trailing call, since other code handles that.
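// For example (rough arithmetic): the minimum gap between saves grows like
// 1, 1.3, 1.69, 2.2, ... ms, i.e., 1.3^n, hitting the 30s cap after roughly 40 calls.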
761
let saveThrottleMs = 1;
762
let lastCall = 0;
763
const exponentiallyThrottledSaved = () => {
764
const now = Date.now();
765
if (now - lastCall < saveThrottleMs) {
766
return;
767
}
768
lastCall = now;
769
saveThrottleMs = Math.min(1.3 * saveThrottleMs, MAX_OUTPUT_SAVE_DELAY);
770
this.syncdb?.save();
771
};
772
773
handler.on("change", (save) => {
774
if (!this.store.getIn(["cells", id])) {
775
// The cell was deleted, but we just got some output
776
// NOTE: client shouldn't allow deleting running or queued
777
// cells, but we still want to do something useful/sensible.
778
// We put the cell back where it was, with the same input.
779
cell.input = orig_cell.get("input");
780
cell.pos = orig_cell.get("pos");
781
}
782
this.syncdb.set(cell);
783
// This is potentially very verbose -- don't do it unless
784
// doing low level debugging:
785
//dbg(`change (save=${save}): cell='${JSON.stringify(cell)}'`);
786
if (save) {
787
exponentiallyThrottledSaved();
788
}
789
});
790
791
handler.once("done", () => {
792
dbg("handler is done");
793
this.store.removeListener("cell_change", cell_change);
794
exec.close();
795
if (this._running_cells != null) {
796
delete this._running_cells[id];
797
}
798
this.syncdb?.save();
799
setTimeout(() => this.syncdb?.save(), 100);
800
});
801
802
if (this.jupyter_kernel == null) {
803
handler.error("Unable to start Jupyter");
804
return;
805
}
806
807
const get_password = (): string => {
808
if (this.jupyter_kernel == null) {
809
dbg("get_password", id, "no kernel");
810
return "";
811
}
812
const password = this.jupyter_kernel.store.get(id);
813
dbg("get_password", id, password);
814
this.jupyter_kernel.store.delete(id);
815
return password;
816
};
817
818
// This is used only for stdin right now.
819
const cell_change = (cell_id, new_cell) => {
820
if (id === cell_id) {
821
dbg("cell_change");
822
handler.cell_changed(new_cell, get_password);
823
}
824
};
825
this.store.on("cell_change", cell_change);
826
827
const exec = this.jupyter_kernel.execute_code({
828
code: input,
829
id,
830
stdin: handler.stdin,
831
halt_on_error,
832
});
833
834
exec.on("output", (mesg) => {
835
// uncomment only for specific low level debugging -- see https://github.com/sagemathinc/cocalc/issues/7022
836
// dbg(`got mesg='${JSON.stringify(mesg)}'`); // !!!☡ ☡ ☡ -- EXTREME DANGER ☡ ☡ ☡ !!!!
837
838
if (mesg == null) {
839
// can't possibly happen, of course.
840
const err = "empty mesg";
841
dbg(`got error='${err}'`);
842
handler.error(err);
843
return;
844
}
845
if (mesg.done) {
846
// done is a special internal cocalc message.
847
handler.done();
848
return;
849
}
850
if (mesg.content?.transient?.display_id != null) {
851
// See https://github.com/sagemathinc/cocalc/issues/2132
852
// We find any other outputs in the document with
853
// the same transient.display_id, and set their output to
854
// this mesg's output.
855
this.handleTransientUpdate(mesg);
856
if (mesg.msg_type == "update_display_data") {
857
// don't also create a new output
858
return;
859
}
860
}
861
862
if (mesg.msg_type === "clear_output") {
863
handler.clear(mesg.content.wait);
864
return;
865
}
866
867
if (mesg.content.comm_id != null) {
868
// ignore any comm/widget related messages
869
return;
870
}
871
872
if (mesg.content.execution_state === "idle") {
873
this.store.removeListener("cell_change", cell_change);
874
return;
875
}
876
if (mesg.content.execution_state === "busy") {
877
handler.start();
878
}
879
if (mesg.content.payload != null) {
880
if (mesg.content.payload.length > 0) {
881
// payload shell message:
882
// Despite https://ipython.org/ipython-doc/3/development/messaging.html#payloads saying
883
// ""Payloads are considered deprecated, though their replacement is not yet implemented."
884
// we fully have to implement them, since they are used to implement (crazy, IMHO)
885
// things like %load in the python2 kernel!
886
mesg.content.payload.map((p) => handler.payload(p));
887
return;
888
}
889
} else {
890
// Normal iopub output message
891
handler.message(mesg.content);
892
return;
893
}
894
});
895
896
exec.on("error", (err) => {
897
dbg(`got error='${err}'`);
898
handler.error(err);
899
});
900
};
901
902
reset_more_output = (id: any) => {
903
if (id == null) {
904
delete this.store._more_output;
905
}
906
if (
907
(this.store._more_output != null
908
? this.store._more_output[id]
909
: undefined) != null
910
) {
911
return delete this.store._more_output[id];
912
}
913
};
914
915
set_more_output = (id: any, mesg: any, length: any): void => {
916
if (this.store._more_output == null) {
917
this.store._more_output = {};
918
}
919
const output =
920
this.store._more_output[id] != null
921
? this.store._more_output[id]
922
: (this.store._more_output[id] = {
923
length: 0,
924
messages: [],
925
lengths: [],
926
discarded: 0,
927
truncated: 0,
928
});
929
930
output.length += length;
931
output.lengths.push(length);
932
output.messages.push(mesg);
933
934
const goal_length = 10 * this.store.get("max_output_length");
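// Keep at most ~10x max_output_length of buffered "more output" per cell:
// truncate the text of the oldest message when possible, otherwise drop it entirely.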
935
while (output.length > goal_length) {
936
let need: any;
937
let did_truncate = false;
938
939
// check if there is a text field, which we can truncate
940
let len =
941
output.messages[0].text != null
942
? output.messages[0].text.length
943
: undefined;
944
if (len != null) {
945
need = output.length - goal_length + 50;
946
if (len > need) {
947
// Instead of throwing this message away, let's truncate its text part. After
948
// doing this, the message is at least need shorter than it was before.
949
output.messages[0].text = misc.trunc(
950
output.messages[0].text,
951
len - need,
952
);
953
did_truncate = true;
954
}
955
}
956
957
// check if there is a text/plain field, which we can thus also safely truncate
958
if (!did_truncate && output.messages[0].data != null) {
959
for (const field in output.messages[0].data) {
960
if (field === "text/plain") {
961
const val = output.messages[0].data[field];
962
len = val.length;
963
if (len != null) {
964
need = output.length - goal_length + 50;
965
if (len > need) {
966
// Instead of throwing this message away, let's truncate its text part. After
967
// doing this, the message is at least need shorter than it was before.
968
output.messages[0].data[field] = misc.trunc(val, len - need);
969
did_truncate = true;
970
}
971
}
972
}
973
}
974
}
975
976
if (did_truncate) {
977
const new_len = JSON.stringify(output.messages[0]).length;
978
output.length -= output.lengths[0] - new_len; // how much we saved
979
output.lengths[0] = new_len;
980
output.truncated += 1;
981
break;
982
}
983
984
const n = output.lengths.shift();
985
output.messages.shift();
986
output.length -= n;
987
output.discarded += 1;
988
}
989
};
990
991
private init_file_watcher() {
992
const dbg = this.dbg("file_watcher");
993
dbg();
994
this._file_watcher = this._client.watch_file({
995
path: this.store.get("path"),
996
debounce: 1000,
997
});
998
999
this._file_watcher.on("change", async () => {
1000
if (!this.isCellRunner()) {
1001
return;
1002
}
1003
dbg("change");
1004
try {
1005
await this.loadFromDiskIfNewer();
1006
} catch (err) {
1007
dbg("failed to load on change", err);
1008
}
1009
});
1010
}
1011
1012
/*
1013
* Unfortunately, though I spent two hours on this approach... it just doesn't work,
1014
* since, e.g., if the sync file doesn't already exist, it can't be created,
1015
* which breaks everything. So disabling for now and re-opening the issue.
1016
_sync_file_mode: =>
1017
dbg = @dbg("_sync_file_mode"); dbg()
1018
* Make the mode of the syncdb file the same as the mode of the .ipynb file.
1019
* This is used for read-only status.
1020
ipynb_file = @store.get('path')
1021
locals =
1022
ipynb_file_ro : undefined
1023
syncdb_file_ro : undefined
1024
syncdb_file = @syncdb.get_path()
1025
async.parallel([
1026
(cb) ->
1027
fs.access ipynb_file, fs.constants.W_OK, (err) ->
1028
* Also store in @_ipynb_file_ro to prevent starting kernel in this case.
1029
@_ipynb_file_ro = locals.ipynb_file_ro = !!err
1030
cb()
1031
(cb) ->
1032
fs.access syncdb_file, fs.constants.W_OK, (err) ->
1033
locals.syncdb_file_ro = !!err
1034
cb()
1035
], ->
1036
if locals.ipynb_file_ro == locals.syncdb_file_ro
1037
return
1038
dbg("mode change")
1039
async.parallel([
1040
(cb) ->
1041
fs.stat ipynb_file, (err, stats) ->
1042
locals.ipynb_stats = stats
1043
cb(err)
1044
(cb) ->
1045
* error if syncdb_file doesn't exist, which is GOOD, since
1046
* in that case we do not want to chmod which would create
1047
* that file as empty and blank it.
1048
fs.stat(syncdb_file, cb)
1049
], (err) ->
1050
if not err
1051
dbg("changing syncb mode to match ipynb mode")
1052
fs.chmod(syncdb_file, locals.ipynb_stats.mode)
1053
else
1054
dbg("error stating ipynb", err)
1055
)
1056
)
1057
*/
1058
1059
// Load file from disk if it is newer than
1060
// the last time we saved to disk.
1061
private loadFromDiskIfNewer = async () => {
1062
const dbg = this.dbg("loadFromDiskIfNewer");
1063
// Get mtime of last .ipynb file that we explicitly saved.
1064
1065
// TODO: breaking the syncdb typescript data hiding. The
1066
// right fix will be to move
1067
// this info to a new ephemeral state table.
1068
const last_ipynb_save = await this.get_last_ipynb_save();
1069
dbg(`syncdb last_ipynb_save=${last_ipynb_save}`);
1070
let file_changed;
1071
if (last_ipynb_save == 0) {
1072
// we MUST load from file the first time, of course.
1073
file_changed = true;
1074
dbg("file changed because FIRST TIME");
1075
} else {
1076
const path = this.store.get("path");
1077
let stats;
1078
try {
1079
stats = await callback2(this._client.path_stat, { path });
1080
dbg(`stats.mtime = ${stats.mtime}`);
1081
} catch (err) {
1082
// This err just means the file doesn't exist.
1083
// We set the 'last load' to now in this case, since
1084
// the frontend clients need to know that we
1085
// have already scanned the disk.
1086
this.set_last_load();
1087
return;
1088
}
1089
const mtime = stats.mtime.getTime();
1090
file_changed = mtime > last_ipynb_save;
1091
dbg({ mtime, last_ipynb_save });
1092
}
1093
if (file_changed) {
1094
dbg(".ipynb disk file changed ==> loading state from disk");
1095
try {
1096
await this.load_ipynb_file();
1097
} catch (err) {
1098
dbg("failed to load on change", err);
1099
}
1100
} else {
1101
dbg("disk file NOT changed: NOT loading");
1102
}
1103
};
1104
1105
// if alsoSetLoad is true, we also set the "last_ipynb_save" time.
1106
set_last_load = (alsoSetLoad: boolean = false) => {
1107
const last_load = new Date().getTime();
1108
this.syncdb.set({
1109
type: "file",
1110
last_load,
1111
});
1112
if (alsoSetLoad) {
1113
// yes, the load vs. save naming is inconsistent!
1114
this.syncdb.set({ type: "settings", last_ipynb_save: last_load });
1115
}
1116
this.syncdb.commit();
1117
};
1118
1119
/* Determine timestamp of aux .ipynb file, and record it here,
1120
so we know that we do not have to load exactly that file
1121
back from disk. */
1122
private set_last_ipynb_save = async () => {
1123
let stats;
1124
try {
1125
stats = await callback2(this._client.path_stat, {
1126
path: this.store.get("path"),
1127
});
1128
} catch (err) {
1129
// no-op -- nothing to do.
1130
this.dbg("set_last_ipynb_save")(`WARNING -- issue in path_stat ${err}`);
1131
return;
1132
}
1133
1134
// This is ugly (i.e., how we get access), but I need to get this done.
1135
// This is the RIGHT place to save the info though.
1136
// TODO: move this state info to new ephemeral table.
1137
try {
1138
const last_ipynb_save = stats.mtime.getTime();
1139
this.last_ipynb_save = last_ipynb_save;
1140
this._set({
1141
type: "settings",
1142
last_ipynb_save,
1143
});
1144
this.dbg("stats.mtime.getTime()")(
1145
`set_last_ipynb_save = ${last_ipynb_save}`,
1146
);
1147
} catch (err) {
1148
this.dbg("set_last_ipynb_save")(
1149
`WARNING -- issue in set_last_ipynb_save ${err}`,
1150
);
1151
return;
1152
}
1153
};
1154
1155
private get_last_ipynb_save = async () => {
1156
const x =
1157
this.syncdb.get_one({ type: "settings" })?.get("last_ipynb_save") ?? 0;
1158
return Math.max(x, this.last_ipynb_save);
1159
};
1160
1161
load_ipynb_file = async () => {
1162
/*
1163
Read the ipynb file from disk. Fully use the ipynb file to
1164
set the syncdb's state. We do this when opening a new file, or when
1165
the file changes on disk (e.g., a git checkout or something).
1166
*/
1167
const dbg = this.dbg(`load_ipynb_file`);
1168
dbg("reading file");
1169
const path = this.store.get("path");
1170
let content: string;
1171
try {
1172
content = await callback2(this._client.path_read, {
1173
path,
1174
maxsize_MB: 50,
1175
});
1176
} catch (err) {
1177
// possibly file doesn't exist -- set notebook to empty.
1178
const exists = await callback2(this._client.path_exists, {
1179
path,
1180
});
1181
if (!exists) {
1182
content = "";
1183
} else {
1184
// It would be better to have a button to push instead of
1185
// suggesting running a command in the terminal, but
1186
// adding that took 1 second. Better than both would be
1187
// making it possible to edit huge files :-).
1188
const error = `Error reading ipynb file '${path}': ${err.toString()}. Fix this to continue. You can delete all output by typing cc-jupyter-no-output [filename].ipynb in a terminal.`;
1189
this.syncdb.set({ type: "fatal", error });
1190
throw Error(error);
1191
}
1192
}
1193
if (content.length === 0) {
1194
// Blank file, e.g., when creating in CoCalc.
1195
// This is good, works, etc. -- just clear state, including error.
1196
this.syncdb.delete();
1197
this.set_last_load(true);
1198
return;
1199
}
1200
1201
// File is nontrivial -- parse and load.
1202
let parsed_content;
1203
try {
1204
parsed_content = JSON.parse(content);
1205
} catch (err) {
1206
const error = `Error parsing the ipynb file '${path}': ${err}. You must fix the ipynb file somehow before continuing.`;
1207
dbg(error);
1208
this.syncdb.set({ type: "fatal", error });
1209
throw Error(error);
1210
}
1211
this.syncdb.delete({ type: "fatal" });
1212
await this.set_to_ipynb(parsed_content);
1213
this.set_last_load(true);
1214
};
1215
1216
save_ipynb_file = async () => {
1217
const dbg = this.dbg("save_ipynb_file");
1218
if (!this.isCellRunner()) {
1219
dbg("not cell runner, so NOT saving ipynb file to disk");
1220
return;
1221
}
1222
dbg("saving to file");
1223
1224
// Check first if file was deleted, in which case instead of saving to disk,
1225
// we should terminate and clean up everything.
1226
if (this.isDeleted()) {
1227
dbg("ipynb file is deleted, so NOT saving to disk and closing");
1228
this.close({ noSave: true });
1229
return;
1230
}
1231
1232
if (this.jupyter_kernel == null) {
1233
// The kernel is needed to get access to the blob store, which
1234
// may be needed to save to disk.
1235
this.ensure_backend_kernel_setup();
1236
if (this.jupyter_kernel == null) {
1237
// still null? This would happen if no kernel is set at all,
1238
// in which case it's OK that saving isn't possible.
1239
throw Error("no kernel so cannot save");
1240
}
1241
}
1242
if (this.store.get("kernels") == null) {
1243
await this.init_kernel_info();
1244
if (this.store.get("kernels") == null) {
1245
// This should never happen, but maybe could in case of a very
1246
// messed up compute environment where the kernelspecs can't be listed.
1247
throw Error(
1248
"kernel info not known and can't be determined, so can't save",
1249
);
1250
}
1251
}
1252
dbg("going to try to save: getting ipynb object...");
1253
const blob_store = this.jupyter_kernel.get_blob_store();
1254
let ipynb = this.store.get_ipynb(blob_store);
1255
if (this.store.get("kernel")) {
1256
// if a kernel is set, check that it was sufficiently known that
1257
// we can fill in data about it -- see https://github.com/sagemathinc/cocalc/issues/7286
1258
if (ipynb?.metadata?.kernelspec?.name == null) {
1259
dbg("kernelspec not known -- try loading kernels again");
1260
await this.fetch_jupyter_kernels();
1261
// and again grab the ipynb
1262
ipynb = this.store.get_ipynb(blob_store);
1263
if (ipynb?.metadata?.kernelspec?.name == null) {
1264
dbg("kernelspec STILL not known: metadata will be incomplete");
1265
}
1266
}
1267
}
1268
dbg("got ipynb object");
1269
// We use json_stable (and indent 1) to be more diff friendly to user,
1270
// and more consistent with official Jupyter.
1271
const data = json_stable(ipynb, { space: 1 });
1272
if (data == null) {
1273
dbg("failed -- ipynb not defined yet");
1274
throw Error("ipynb not defined yet; can't save");
1275
}
1276
dbg("converted ipynb to stable JSON string", data?.length);
1277
//dbg(`got string version '${data}'`)
1278
try {
1279
dbg("writing to disk...");
1280
await callback2(this._client.write_file, {
1281
path: this.store.get("path"),
1282
data,
1283
});
1284
dbg("succeeded at saving");
1285
await this.set_last_ipynb_save();
1286
} catch (err) {
1287
const e = `error writing file: ${err}`;
1288
dbg(e);
1289
throw Error(e);
1290
}
1291
};
1292
1293
ensure_there_is_a_cell = () => {
1294
if (this._state !== "ready") {
1295
return;
1296
}
1297
const cells = this.store.get("cells");
1298
if (cells == null || (cells.size === 0 && this.isCellRunner())) {
1299
this._set({
1300
type: "cell",
1301
id: this.new_id(),
1302
pos: 0,
1303
input: "",
1304
});
1305
// We are obviously contributing content to this (empty!) notebook.
1306
return this.set_trust_notebook(true);
1307
}
1308
};
1309
1310
private handle_all_cell_attachments() {
1311
// Check if any cell attachments need to be loaded.
1312
const cells = this.store.get("cells");
1313
cells?.forEach((cell) => {
1314
this.handle_cell_attachments(cell);
1315
});
1316
}
1317
1318
private handle_cell_attachments(cell) {
1319
if (this.jupyter_kernel == null) {
1320
// can't do anything
1321
return;
1322
}
1323
const dbg = this.dbg(`handle_cell_attachments(id=${cell.get("id")})`);
1324
dbg();
1325
1326
const attachments = cell.get("attachments");
1327
if (attachments == null) return; // nothing to do
1328
attachments.forEach(async (x, name) => {
1329
if (x == null) return;
1330
if (x.get("type") === "load") {
1331
if (this.jupyter_kernel == null) return; // try later
1332
// need to load from disk
1333
this.set_cell_attachment(cell.get("id"), name, {
1334
type: "loading",
1335
value: null,
1336
});
1337
let sha1: string;
1338
try {
1339
sha1 = await this.jupyter_kernel.load_attachment(x.get("value"));
1340
} catch (err) {
1341
this.set_cell_attachment(cell.get("id"), name, {
1342
type: "error",
1343
value: `${err}`,
1344
});
1345
return;
1346
}
1347
this.set_cell_attachment(cell.get("id"), name, {
1348
type: "sha1",
1349
value: sha1,
1350
});
1351
}
1352
});
1353
}
1354
1355
// handle_ipywidgets_state_change is called when the project ipywidgets_state
1356
// object changes, e.g., in response to a user moving a slider in the browser.
1357
// It crafts a comm message that is sent to the running Jupyter kernel telling
1358
// it about this change by calling send_comm_message_to_kernel.
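// Each key is a JSON-encoded array whose second and third entries are the
// model_id and the kind of change ('value', 'buffers', or 'state').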
1359
private handle_ipywidgets_state_change(keys): void {
1360
if (this.is_closed()) {
1361
return;
1362
}
1363
const dbg = this.dbg("handle_ipywidgets_state_change");
1364
dbg(keys);
1365
if (this.jupyter_kernel == null) {
1366
dbg("no kernel, so ignoring changes to ipywidgets");
1367
return;
1368
}
1369
if (this.syncdb.ipywidgets_state == null) {
1370
throw Error("syncdb's ipywidgets_state must be defined!");
1371
}
1372
for (const key of keys) {
1373
const [, model_id, type] = JSON.parse(key);
1374
dbg({ key, model_id, type });
1375
let data: any;
1376
if (type === "value") {
1377
const state = this.syncdb.ipywidgets_state.get_model_value(model_id);
1378
// Saving the buffers on change is critical since otherwise this breaks:
1379
// https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20List.html#file-upload
1380
// Note that stupidly the buffer (e.g., image upload) gets sent to the kernel twice.
1381
// But it does work robustly, and the kernel and nodejs server processes run next to each
1382
// other, so this isn't so bad.
1383
const { buffer_paths, buffers } =
1384
this.syncdb.ipywidgets_state.getKnownBuffers(model_id);
1385
data = { method: "update", state, buffer_paths };
1386
this.jupyter_kernel.send_comm_message_to_kernel({
1387
msg_id: misc.uuid(),
1388
target_name: "jupyter.widget",
1389
comm_id: model_id,
1390
data,
1391
buffers,
1392
});
1393
} else if (type === "buffers") {
1394
// TODO: we MIGHT need to implement this... but MAYBE NOT. An example where this seems like it might be
1395
// required is by the file upload widget, but actually that just uses the value type above, since
1396
// we explicitly fill in the widgets there; also there is an explicit comm upload message that
1397
// the widget sends out that updates the buffer, and in send_comm_message_to_kernel in jupyter/kernel/kernel.ts
1398
// when processing that message, we save those buffers and make sure they are set in the
1399
// value case above (otherwise they would get removed).
1400
// https://ipywidgets.readthedocs.io/en/latest/examples/Widget%20List.html#file-upload
1401
// which creates a buffer from the content of the file, then sends it to the backend,
1402
// which sees a change and has to write that buffer to the kernel (here) so that
1403
// the running python process can actually do something with the file contents (e.g.,
1404
// process data, save file to disk, etc).
1405
// We need to be careful though to not send buffers to the kernel that the kernel sent us,
1406
// since that would be a waste.
1407
} else if (type === "state") {
1408
// TODO: currently ignoring this, since it seems chatty and pointless,
1409
// and could lead to race conditions probably with multiple users, etc.
1410
// It happens right when the widget is created.
1411
/*
1412
const state = this.syncdb.ipywidgets_state.getModelSerializedState(model_id);
1413
data = { method: "update", state };
1414
this.jupyter_kernel.send_comm_message_to_kernel(
1415
misc.uuid(),
1416
model_id,
1417
data
1418
);
1419
*/
1420
} else {
1421
throw Error(`invalid synctable state -- unknown type '${type}'`);
1422
}
1423
}
1424
}
1425
1426
public async process_comm_message_from_kernel(mesg: any): Promise<void> {
1427
const dbg = this.dbg("process_comm_message_from_kernel");
1428
// serializing the full message could cause enormous load on the server, since
1429
// the mesg may contain large buffers. Only do this for low-level debugging!
1430
// dbg(mesg); // EXTREME DANGER!
1431
// This should be safe:
1432
dbg(JSON.stringify(mesg.header));
1433
if (this.syncdb.ipywidgets_state == null) {
1434
throw Error("syncdb's ipywidgets_state must be defined!");
1435
}
1436
await this.syncdb.ipywidgets_state.process_comm_message_from_kernel(mesg);
1437
}
1438
1439
public capture_output_message(mesg: any): boolean {
1440
if (this.syncdb.ipywidgets_state == null) {
1441
throw Error("syncdb's ipywidgets_state must be defined!");
1442
}
1443
return this.syncdb.ipywidgets_state.capture_output_message(mesg);
1444
}
1445
1446
public close_project_only() {
1447
const dbg = this.dbg("close_project_only");
1448
dbg();
1449
if (this.run_all_loop) {
1450
this.run_all_loop.close();
1451
delete this.run_all_loop;
1452
}
1453
// this stops the kernel and cleans everything up
1454
// so no resources are wasted and next time starting
1455
// is clean
1456
(async () => {
1457
try {
1458
await removeJupyterRedux(this.store.get("path"), this.project_id);
1459
} catch (err) {
1460
dbg("WARNING -- issue removing jupyter redux", err);
1461
}
1462
})();
1463
}
1464
1465
// not actually async...
1466
public async signal(signal = "SIGINT"): Promise<void> {
1467
this.jupyter_kernel?.signal(signal);
1468
}
1469
1470
public handle_nbconvert_change(oldVal, newVal): void {
1471
nbconvertChange(this, oldVal?.toJS(), newVal?.toJS());
1472
}
1473
1474
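// isCellRunner determines whether this process (the project or a compute server)
// is the one responsible for actually evaluating cells.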
protected isCellRunner = (): boolean => {
1475
if (this.is_closed()) {
1476
// it's closed, so obviously not the cell runner.
1477
return false;
1478
}
1479
const dbg = this.dbg("isCellRunner");
1480
let id;
1481
try {
1482
id = this.getComputeServerId();
1483
} catch (_) {
1484
// This is normal since the call is debounced;
1485
// in any case, if anything that getComputeServerId depends on
1486
// (like syncdb) doesn't work, then we are clearly
1487
// not the cell runner.
1488
return false;
1489
}
1490
dbg("id = ", id);
1491
if (id == 0 && this.is_project) {
1492
dbg("yes we are the cell runner (the project)");
1493
// when no remote compute servers are configured, the project is
1494
// responsible for evaluating code.
1495
return true;
1496
}
1497
if (this.is_compute_server) {
1498
// a remote compute server is supposed to be responsible. Are we it?
1499
try {
1500
const myId = decodeUUIDtoNum(this.syncdb.client_id());
1501
const isRunner = myId == id;
1502
dbg(isRunner ? "Yes, we are cell runner" : "NOT cell runner");
1503
return isRunner;
1504
} catch (err) {
1505
dbg(err);
1506
}
1507
}
1508
dbg("NO we are not the cell runner");
1509
return false;
1510
};
1511
1512
private lastComputeServerId = 0;
1513
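// Called on cursor_activity: if the compute server responsible for this notebook
// changed, halt and clear all cell run state.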
private checkForComputeServerStateChange = (client_id) => {
1514
if (this.is_closed()) {
1515
return;
1516
}
1517
if (!isEncodedNumUUID(client_id)) {
1518
return;
1519
}
1520
const id = this.getComputeServerId();
1521
if (id != this.lastComputeServerId) {
1522
// reset all run state
1523
this.halt();
1524
this.clear_all_cell_run_state();
1525
}
1526
this.lastComputeServerId = id;
1527
};
1528
1529
/*
1530
WebSocket API
1531
1532
1. Handles api requests from the user via the generic websocket message channel
1533
provided by the syncdb.
1534
1535
2. In case a remote compute server connects and registers to handle api messages,
1536
then those are proxied to the remote server, handled there, and proxied back.
1537
*/
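// Rough request/response flow (a sketch of the above):
//   browser --api-request--> project.handleMessageFromClient
//     --> (proxied, if a compute server registered) compute server.handleMessageFromProject
//     --api-response--> project (responseCallbacks[id]) --> browser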
1538
1539
private initWebsocketApi = () => {
1540
if (this.is_project) {
1541
// only the project receives these messages from clients.
1542
this.syncdb.on("message", this.handleMessageFromClient);
1543
} else if (this.is_compute_server) {
1544
// compute servers receive messages from the project,
1545
// proxying an api request from a client.
1546
this.syncdb.on("message", this.handleMessageFromProject);
1547
}
1548
};
1549
1550
private remoteApiHandler: null | {
1551
spark: any; // the spark channel connection between project and compute server
1552
id: number; // this is a sequential id used for request/response pairing
1553
// when we get a response from the compute server, one of these callbacks gets called:
1554
responseCallbacks: { [id: number]: (err: any, response: any) => void };
1555
} = null;
1556
1557
private handleMessageFromClient = async ({ data, spark }) => {
1558
// This is called in the project to handle api requests.
1559
// It either handles them directly, or if there is a remote
1560
// compute server, it forwards them to the remote compute server,
1561
// then proxies the response back to the client.
1562
1563
const dbg = this.dbg("handleMessageFromClient");
1564
dbg();
1565
// WARNING: potentially very verbose
1566
dbg(data);
1567
switch (data.event) {
1568
case "register-to-handle-api": {
1569
if (this.remoteApiHandler?.spark?.id == spark.id) {
1570
dbg(
1571
"register-to-handle-api -- it's the current one so nothing to do",
1572
);
1573
return;
1574
}
1575
if (this.remoteApiHandler?.spark != null) {
1576
dbg("register-to-handle-api -- remove existing handler");
1577
this.remoteApiHandler.spark.removeAllListeners();
1578
this.remoteApiHandler.spark.end();
1579
this.remoteApiHandler = null;
1580
}
1581
// a compute server client is volunteering to handle all api requests until they disconnect
1582
this.remoteApiHandler = { spark, id: 0, responseCallbacks: {} };
1583
dbg("register-to-handle-api -- spark.id = ", spark.id);
1584
spark.on("end", () => {
1585
dbg(
1586
"register-to-handle-api -- spark ended, spark.id = ",
1587
spark.id,
1588
" and this.remoteApiHandler?.spark.id=",
1589
this.remoteApiHandler?.spark.id,
1590
);
1591
if (this.remoteApiHandler?.spark.id == spark.id) {
1592
this.remoteApiHandler = null;
1593
}
1594
});
1595
return;
1596
}
1597
1598
case "api-request": {
1599
// browser client made an api request. This will get handled
1600
// either locally or via a remote compute server, depending on
1601
// whether this.remoteApiHandler is set (via the
1602
// register-to-handle-api event above).
1603
const response = await this.handleApiRequest(data);
1604
spark.write({
1605
event: "message",
1606
data: { event: "api-response", response, id: data.id },
1607
});
1608
return;
1609
}
1610
1611
case "api-response": {
1612
// handling api request that we proxied to a remote compute server.
1613
// We are handling the response from the remote compute server.
1614
if (this.remoteApiHandler == null) {
1615
dbg("WARNING: api-response event but there is no remote api handler");
1616
// api-response event can't be handled because no remote api handler is registered
1617
// This should only happen if the requesting spark just disconnected, so there's no way to
1618
// respond anyway.
1619
return;
1620
}
1621
const cb = this.remoteApiHandler.responseCallbacks[data.id];
1622
if (cb != null) {
1623
delete this.remoteApiHandler.responseCallbacks[data.id];
1624
cb(undefined, data);
1625
} else {
1626
dbg("WARNING: api-response event for unknown id", data.id);
1627
}
1628
return;
1629
}
1630
1631
case "save-blob-to-project": {
1632
if (!this.is_project) {
1633
throw Error(
1634
"message save-blob-to-project should only be sent to the project",
1635
);
1636
}
1637
// A compute server sent the project a blob to store
1638
// in the local blob store.
1639
const blobStore = await get_blob_store();
1640
blobStore.save(data.data, data.type, data.ipynb);
1641
return;
1642
}
1643
1644
default: {
1645
// unknown event so send back error
1646
spark.write({
1647
event: "message",
1648
data: {
1649
event: "error",
1650
message: `unknown event ${data.event}`,
1651
id: data.id,
1652
},
1653
});
1654
}
1655
}
1656
};
1657
1658
// this should only be called on a compute server.
1659
public saveBlobToProject = (data: string, type: string, ipynb?: string) => {
1660
if (!this.is_compute_server) {
1661
throw Error(
1662
"saveBlobToProject should only be called on a compute server",
1663
);
1664
}
1665
const dbg = this.dbg("saveBlobToProject");
1666
if (this.is_closed()) {
1667
dbg("called AFTER closed");
1668
return;
1669
}
1670
// This is called on a compute server whenever something is
1671
// written to its local blob store. TODO: We do not wait for
1672
// confirmation that the blob was sent yet, though.
1673
dbg();
1674
this.syncdb.sendMessageToProject({
1675
event: "save-blob-to-project",
1676
data,
1677
type,
1678
ipynb,
1679
});
1680
};
1681
1682
private handleMessageFromProject = async (data) => {
1683
const dbg = this.dbg("handleMessageFromProject");
1684
if (this.is_closed()) {
1685
dbg("called AFTER closed");
1686
return;
1687
}
1688
// This is called on the remote compute server to handle api requests.
1689
dbg();
1690
// output could be very BIG:
1691
// dbg(data);
1692
if (data.event == "api-request") {
1693
const response = await this.handleApiRequest(data.request);
1694
try {
1695
await this.syncdb.sendMessageToProject({
1696
event: "api-response",
1697
id: data.id,
1698
response,
1699
});
1700
} catch (err) {
1701
// this happens when the websocket is disconnected
1702
dbg(`WARNING -- issue responding to message ${err}`);
1703
}
1704
return;
1705
}
1706
};
1707
1708
private handleApiRequest = async (data) => {
1709
if (this.remoteApiHandler != null) {
1710
return await this.handleApiRequestViaRemoteApiHandler(data);
1711
}
1712
const dbg = this.dbg("handleApiRequest");
1713
const { path, endpoint, query } = data;
1714
dbg("handling request in project", path);
1715
try {
1716
return await handleApiRequest(path, endpoint, query);
1717
} catch (err) {
1718
dbg("error -- ", err.message);
1719
return { event: "error", message: err.message };
1720
}
1721
};
1722
1723
private handleApiRequestViaRemoteApiHandler = async (data) => {
1724
const dbg = this.dbg("handleApiRequestViaRemoteApiHandler");
1725
dbg(data?.path);
1726
try {
1727
if (!this.is_project) {
1728
throw Error("BUG -- remote api requests only make sense in a project");
1729
}
1730
if (this.remoteApiHandler == null) {
1731
throw Error("BUG -- remote api handler not registered");
1732
}
1733
// Send a message to the remote asking it to handle this api request,
1734
// which calls the function handleMessageFromProject from above in that remote process.
1735
const { id, spark, responseCallbacks } = this.remoteApiHandler;
1736
spark.write({
1737
event: "message",
1738
data: { event: "api-request", request: data, id },
1739
});
1740
const waitForResponse = (cb) => {
1741
responseCallbacks[id] = cb;
1742
};
1743
this.remoteApiHandler.id += 1; // increment sequential protocol message tracker id
1744
return (await callback(waitForResponse)).response;
1745
} catch (err) {
1746
dbg("error -- ", err.message);
1747
return { event: "error", message: err.message };
1748
}
1749
};
1750
1751
// Handle transient cell messages.
1752
handleTransientUpdate = (mesg) => {
1753
const display_id = mesg.content?.transient?.display_id;
1754
if (!display_id) {
1755
return false;
1756
}
1757
1758
let matched = false;
1759
// are there any transient outputs in the entire document that
1760
// have this display_id? search to find them.
1761
// TODO: we could use a clever data structure to make
1762
// this faster and more likely to have bugs.
1763
const cells = this.syncdb.get({ type: "cell" });
1764
for (let cell of cells) {
1765
let output = cell.get("output");
1766
if (output != null) {
1767
for (const [n, val] of output) {
1768
if (val.getIn(["transient", "display_id"]) == display_id) {
1769
// found a match -- replace it
1770
output = output.set(n, immutable.fromJS(mesg.content));
1771
this.syncdb.set({ type: "cell", id: cell.get("id"), output });
1772
matched = true;
1773
}
1774
}
1775
}
1776
}
1777
if (matched) {
1778
this.syncdb.commit();
1779
}
1780
};
1781
// End Websocket API
1782
}
1783
1784