Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/kernel/kernel.ts
5738 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Jupyter Backend
8
9
For interactive testing:
10
11
$ node
12
13
> j = require('./dist/kernel'); k = j.kernel({name:'python3', path:'x.ipynb'});
14
> console.log(JSON.stringify(await k.execute_code_now({code:'2+3'}),0,2))
15
16
*/
17
18
// POOL VERSION - faster to restart but possible subtle issues.
// When true, spawn() launches kernels via the pool launcher; when false it
// uses the non-pool launcher instead.
const USE_KERNEL_POOL = true;

// Low level debugging switch: set to true only for extreme debugging;
// false is normal mode.
const DEBUG = false;
if (DEBUG) {
  console.log("Enabling low level Jupyter kernel debugging.");
}
26
27
// NOTE: we choose to use node-cleanup instead of the much more
28
// popular exit-hook, since node-cleanup actually works for us.
29
// https://github.com/jtlapp/node-cleanup/issues/16
30
// Also exit-hook is hard to import from commonjs.
31
import nodeCleanup from "node-cleanup";
32
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
33
import { callback } from "awaiting";
34
import type { MessageType } from "@cocalc/jupyter/zmq/types";
35
import { jupyterSockets, type JupyterSockets } from "@cocalc/jupyter/zmq";
36
import { EventEmitter } from "node:events";
37
import { unlink } from "@cocalc/backend/misc/async-utils-node";
38
import { remove_redundant_reps } from "@cocalc/jupyter/ipynb/import-from-ipynb";
39
import { JupyterActions } from "@cocalc/jupyter/redux/project-actions";
40
import {
41
type BlobStoreInterface,
42
CodeExecutionEmitterInterface,
43
ExecOpts,
44
JupyterKernelInterface,
45
KernelInfo,
46
} from "@cocalc/jupyter/types/project-interface";
47
import { JupyterStore } from "@cocalc/jupyter/redux/store";
48
import { JUPYTER_MIMETYPES } from "@cocalc/jupyter/util/misc";
49
import type { SyncDB } from "@cocalc/sync/editor/db/sync";
50
import { retry_until_success, until } from "@cocalc/util/async-utils";
51
import createChdirCommand from "@cocalc/util/jupyter-api/chdir-commands";
52
import { key_value_store } from "@cocalc/util/key-value-store";
53
import {
54
copy,
55
deep_copy,
56
is_array,
57
len,
58
merge,
59
original_path,
60
path_split,
61
uuid,
62
uint8ArrayToBase64,
63
} from "@cocalc/util/misc";
64
import { CodeExecutionEmitter } from "@cocalc/jupyter/execute/execute-code";
65
import {
66
getLanguage,
67
get_kernel_data_by_name,
68
} from "@cocalc/jupyter/kernel/kernel-data";
69
70
import launchJupyterKernel, {
71
LaunchJupyterOpts,
72
SpawnedKernel,
73
killKernel,
74
} from "@cocalc/jupyter/pool/pool";
75
// non-pool version
76
import launchJupyterKernelNoPool from "@cocalc/jupyter/kernel/launch-kernel";
77
import { kernels } from "./kernels";
78
import { getAbsolutePathFromHome } from "@cocalc/jupyter/util/fs";
79
import type { KernelParams } from "@cocalc/jupyter/types/kernel";
80
import { redux_name } from "@cocalc/util/redux/name";
81
import { redux } from "@cocalc/jupyter/redux/app";
82
import { VERSION } from "@cocalc/jupyter/kernel/version";
83
import type { NbconvertParams } from "@cocalc/util/jupyter/types";
84
import type { Client } from "@cocalc/sync/client/types";
85
import { getLogger } from "@cocalc/backend/logger";
86
import { base64ToBuffer } from "@cocalc/util/base64";
87
import { sha1 as misc_node_sha1 } from "@cocalc/backend/misc_node";
88
import { join } from "path";
89
import { readFile } from "fs/promises";
90
91
// Maximum time, in milliseconds, that finishSpawningKernel waits for the
// kernel's sockets before marking the kernel as failed.
const MAX_KERNEL_SPAWN_TIME = 2 * 60 * 1000;

// The lifecycle states a JupyterKernel can be in (see setState).
type State =
  | "failed"
  | "off"
  | "spawning"
  | "starting"
  | "running"
  | "closed";

// Logger shared by everything in this module.
const logger = getLogger("jupyter:kernel");
96
97
// nbconvert functionality is enabled dynamically, by calling initNbconvert
// at runtime.  Some users of this code (e.g., remote kernels) don't need
// to provide nbconvert, and our implementation has some heavy dependencies,
// e.g., on a big chunk of the react frontend.  Until initNbconvert is
// called, every nbconvert request is rejected.
let nbconvert: (opts: NbconvertParams) => Promise<void> = () =>
  Promise.reject(Error("nbconvert is not enabled"));

// Install f as the implementation used for all later nbconvert requests.
export function initNbconvert(f) {
  nbconvert = f;
}
108
109
/*
Environment used for Sage-based Jupyter kernels: a copy of process.env
with a few extra user-specific options that are more multi-user friendly.
*/
const SAGE_JUPYTER_ENV = (() => {
  const home = process.env.HOME;
  return merge(copy(process.env), {
    PYTHONUSERBASE: `${home}/.local`,
    PYTHON_EGG_CACHE: `${home}/.sage/.python-eggs`,
    R_MAKEVARS_USER: `${home}/.sage/R/Makevars.user`,
  });
})();
118
119
/**
 * Initialize the redux actions and store for working with a specific
 * Jupyter notebook.
 *
 * @param syncdb - the syncdoc associated to the ipynb file
 * @param client - passed through to the actions' _init
 * @throws if syncdb has no project_id or is already closed
 */
export async function initJupyterRedux(syncdb: SyncDB, client: Client) {
  const { project_id } = syncdb;
  if (project_id == null) {
    throw Error("project_id must be defined");
  }
  if (syncdb.get_state() == "closed") {
    throw Error("syncdb must not be closed");
  }

  // The file we will watch for changes and save to, which is in the
  // original official ipynb format.
  const path = original_path(syncdb.get_path());
  logger.debug("initJupyterRedux", path);

  const name = redux_name(project_id, path);
  const alreadyExists =
    redux.getStore(name) != null && redux.getActions(name) != null;
  if (alreadyExists) {
    logger.debug(
      "initJupyterRedux",
      path,
      " -- existing actions, so removing them",
    );
    // Never create a second store/actions pair for the same notebook
    // without deleting the existing one first: having two at once
    // basically results in things feeling hung.  This should never
    // happen, but we make sure.
    // See https://github.com/sagemathinc/cocalc/issues/4331
    await removeJupyterRedux(path, project_id);
  }

  const store = redux.createStore(name, JupyterStore);
  const actions = redux.createActions(name, JupyterActions);
  actions._init(project_id, path, syncdb, store, client);

  syncdb.once("error", (err) =>
    logger.error("initJupyterRedux", path, "syncdb ERROR", err),
  );
  syncdb.once("ready", () =>
    logger.debug("initJupyterRedux", path, "syncdb ready"),
  );
}
164
165
/**
 * Look up the redux actions and store belonging to the notebook behind
 * the given syncdb.  Nothing is created here; the returned values are
 * whatever redux.getActions / redux.getStore give for that name.
 */
export async function getJupyterRedux(syncdb: SyncDB) {
  const name = redux_name(
    syncdb.project_id,
    original_path(syncdb.get_path()),
  );
  const actions = redux.getActions(name);
  const store = redux.getStore(name);
  return { actions, store };
}
171
172
/**
 * Remove the redux store/actions for a given Jupyter notebook, and also
 * close the kernel registered for that path, if there is one.
 *
 * Errors from closing the kernel are ignored; errors from closing the
 * actions are only logged.
 */
export async function removeJupyterRedux(
  path: string,
  project_id: string,
): Promise<void> {
  logger.debug("removeJupyterRedux", path);

  // Close the kernel for this path, if any.
  try {
    const existing = kernels.get(path);
    await existing?.close();
  } catch (_err) {
    // ignore
  }

  const name = redux_name(project_id, path);
  const actions = redux.getActions(name);
  if (actions != null) {
    try {
      await actions.close();
    } catch (err) {
      logger.debug(
        "removeJupyterRedux",
        path,
        " WARNING -- issue closing actions",
        err,
      );
    }
  }

  redux.removeStore(name);
  redux.removeActions(name);
}
202
203
/**
 * Create a JupyterKernel from the given parameters.  Use this rather
 * than calling the JupyterKernel constructor directly.
 */
export function kernel(opts: KernelParams): JupyterKernel {
  const { name, path, actions, ulimit } = opts;
  return new JupyterKernel(name, path, actions, ulimit);
}
206
207
/*
208
Jupyter Kernel interface.
209
210
The kernel does *NOT* start up until either spawn is explicitly called, or
211
code execution is explicitly requested. This makes it possible to
212
call process_output without spawning an actual kernel.
213
*/
214
215
// Ensure that the kernels all get killed when the process exits.
nodeCleanup(() => {
  for (const path in kernels.kernels) {
    // We only kill the spawned process here; we do NOT await a close,
    // since that's not really supported or possible in general.
    const spawned = kernels.kernels[path]._kernel;
    if (spawned) {
      killKernel(spawned);
    }
  }
});
226
227
// NOTE: keep JupyterKernel implementation private -- use the kernel function
228
// above, and the interface defined in types.
229
export class JupyterKernel
230
extends EventEmitter
231
implements JupyterKernelInterface
232
{
233
// name -- if undefined that means "no actual Jupyter kernel" (i.e., this
// JupyterKernel exists here, but there is no actual separate real Jupyter
// kernel process and one won't be created).  Everything should work,
// except you can't *spawn* such a kernel.
public name: string | undefined;

// A key:value store used mainly for stdin support right now.
// NOTHING TO DO WITH REDUX!
public store: any;

// Unique id of this JupyterKernel object; passed to jupyterSockets and
// compared in close() before removing the entry from `kernels`.
public readonly identity: string = uuid();

// Most recent stderr output of the kernel process (truncated when long).
private stderr = "";
// Passed through to the kernel launcher as opts.ulimit.
private ulimit?: string;
// Path of the notebook, as given to the constructor.
private _path: string;
private _actions?: JupyterActions;
// Current lifecycle state; only changed via setState.
private _state: State;
// Head and tail of the absolute version of _path.
private _directory: string;
private _filename: string;
// The spawned kernel process, once spawn() has launched it.
public _kernel?: SpawnedKernel;
// Cached result of kernel_info().
private _kernel_info?: KernelInfo;
// Code executions waiting to run.
public _execute_code_queue: CodeExecutionEmitter[] = [];
// Sockets for talking to the kernel; set by finishSpawningKernel.
public sockets?: JupyterSockets;
// Set by ensure_running once the kernel is starting or running.
private has_ensured_running = false;
// Message passed to the most recent setFailed call ("" if none).
private failedError = "";
256
257
/**
 * Create the kernel object and register it in `kernels` under its path.
 * No kernel process is started here.
 *
 * @param name - kernel name, or undefined for "no actual Jupyter kernel"
 * @param _path - path of the notebook
 * @param _actions - actions for the notebook, if available
 * @param ulimit - passed to the kernel launcher when spawning
 */
constructor(
  name: string | undefined,
  _path: string,
  _actions: JupyterActions | undefined,
  ulimit: string | undefined,
) {
  super();

  this.name = name;
  this.ulimit = ulimit;
  this._path = _path;
  this._actions = _actions;
  this.store = key_value_store();

  const split = path_split(getAbsolutePathFromHome(this._path));
  this._directory = split.head;
  this._filename = split.tail;

  this.setState("off");
  this._execute_code_queue = [];

  // There may already be a kernel for this path; this happens when we
  // change the kernel for a given file, e.g., from python2 to python3.
  // Obviously, it is important to clean up after the old kernel.
  kernels.get(this._path)?.close();
  kernels.set(this._path, this);

  this.setMaxListeners(100);
  this.dbg("constructor")("done");
}
288
289
// The notebook path this kernel was created for.
get_path = () => this._path;
292
293
// Move to the given state and announce it.  This is a no-op if calling it
// doesn't change the state.
//
//   'off' --> 'spawning' --> 'starting' --> 'running' --> 'closed'
//   'failed'
private setState = (state: State): void => {
  if (this._state != state) {
    this._state = state;
    // Emitted both as a generic "state" event and as an event named
    // after the state itself.
    this.emit("state", this._state);
    this.emit(this._state); // we *SHOULD* use this everywhere, not above.
  }
};
302
303
// Record the error message, emit it as a "kernel_error" event, then
// switch to the "failed" state.
private setFailed = (error: string): void => {
  this.failedError = error;
  this.emit("kernel_error", error);
  this.setState("failed");
};
308
309
// The current lifecycle state, as a plain string.
get_state = (): string => this._state;
312
313
// Set as soon as spawn() starts launching; later spawn() calls on this
// object then return immediately.
private spawnedAlready = false;

/**
 * Launch the kernel process and finish setting it up.
 *
 * Returns without doing anything if the kernel is already running or
 * starting, or if spawn was already attempted on this object.  If the
 * launch fails, the kernel is put in the "failed" state (via setFailed)
 * instead of the error being thrown.
 *
 * @param spawn_opts - optional extra environment variables for the
 *   kernel process.  The given env object is copied, never modified.
 * @throws if this kernel is closed (before or during spawning), or if
 *   no kernel name is set.
 */
spawn = async (spawn_opts?: {
  env?: { [key: string]: string };
}): Promise<void> => {
  if (this._state === "closed") {
    // game over!
    throw Error("closed -- kernel spawn");
  }
  if (!this.name) {
    // spawning not allowed.
    throw Error("cannot spawn since no kernel is set");
  }
  if (["running", "starting"].includes(this._state)) {
    // Already spawned, so no need to do it again.
    return;
  }

  if (this.spawnedAlready) {
    return;
  }
  this.spawnedAlready = true;

  this.setState("spawning");
  const dbg = this.dbg("spawn");
  dbg("spawning kernel...");

  // ****
  // CRITICAL: anything added to opts better not be specific
  // to the kernel path or it will completely break using a
  // pool, which makes things massively slower.
  // ****

  const opts: LaunchJupyterOpts = {
    // Shallow copy, so that the entries added below (and the merge for
    // Sage kernels) never leak into the caller's env object.
    env: { ...spawn_opts?.env },
    ulimit: this.ulimit,
  };

  try {
    const kernelData = await get_kernel_data_by_name(this.name);
    // This matches "sage", "sage-x.y", and Sage Python3 ("sage -python -m ipykernel")
    if (kernelData.argv[0].startsWith("sage")) {
      dbg("setting special environment for Sage kernels");
      opts.env = merge(opts.env, SAGE_JUPYTER_ENV);
    }
  } catch (err) {
    dbg(`No kernelData available for ${this.name}`);
  }

  // Make cocalc default to the colab renderer for cocalc-jupyter, since
  // this one happens to work best for us, and they don't have a custom
  // one for us.  See https://plot.ly/python/renderers/ and
  // https://github.com/sagemathinc/cocalc/issues/4259
  opts.env.PLOTLY_RENDERER = "colab";
  opts.env.COCALC_JUPYTER_KERNELNAME = this.name;

  // !!! WARNING: do NOT add anything new here that depends on that path!!!!
  // Otherwise the pool will switch to falling back to not being used, and
  // cocalc would then be massively slower.
  // Non-uniform customization.
  // launchJupyterKernel is explicitly smart enough to deal with opts.cwd
  if (this._directory) {
    opts.cwd = this._directory;
  }
  // launchJupyterKernel is explicitly smart enough to deal with opts.env.COCALC_JUPYTER_FILENAME
  opts.env.COCALC_JUPYTER_FILENAME = this._path;
  // and launchJupyterKernel is NOT smart enough to deal with anything else!

  try {
    if (USE_KERNEL_POOL) {
      dbg("launching Jupyter kernel, possibly from pool");
      this._kernel = await launchJupyterKernel(this.name, opts);
    } else {
      dbg("launching Jupyter kernel, NOT using pool");
      this._kernel = await launchJupyterKernelNoPool(this.name, opts);
    }
    dbg("finishing kernel setup");
    await this.finishSpawningKernel();
  } catch (err) {
    dbg(`ERROR spawning kernel - ${err}, ${err.stack}`);
    // The kernel may have been closed while we were awaiting above.
    // @ts-ignore
    if (this._state == "closed") {
      throw Error("closed");
    }
    // console.trace(err);
    this.setFailed(
      `**Unable to Spawn Jupyter Kernel:**\n\n${err} \n\nTry this in a terminal to help debug this (or contact support): \`jupyter console --kernel=${this.name}\`\n\nOnce you fix the problem, explicitly restart this kernel to test here.`,
    );
  }
};
402
403
// The spawned kernel process, or undefined if there is none.
get_spawned_kernel = () => this._kernel;
406
407
// The connectionFile of the spawned kernel, or undefined if there is no
// spawned kernel.
get_connection_file = (): string | undefined => this._kernel?.connectionFile;
410
411
/**
 * Second half of spawn(): hook up the already launched kernel process
 * (this._kernel), wait for it to have a pid, connect the sockets and
 * forward their messages as events on this object.
 *
 * On success the state moves from "spawning" to "starting"; the first
 * iopub message then moves it to "running".  If the process never gets
 * a pid, or the sockets are not connected within MAX_KERNEL_SPAWN_TIME,
 * the kernel is marked failed via setFailed.
 *
 * @throws if this._kernel is not set, or if connecting the sockets throws.
 */
private finishSpawningKernel = async () => {
  const dbg = this.dbg("finishSpawningKernel");
  dbg("now finishing spawn of kernel...");

  if (DEBUG) {
    this.low_level_dbg();
  }

  if (!this._kernel) {
    throw Error("_kernel must be defined");
  }
  this._kernel.spawn.on("error", (err) => {
    const error = `${err}\n${this.stderr}`;
    dbg("kernel error", error);
    this.setFailed(error);
  });

  // Track stderr from the subprocess itself (the kernel).
  // This is useful for debugging broken kernels, etc., and is especially
  // useful since it exists even if the kernel sends nothing over any
  // zmq sockets (e.g., due to being very broken).
  this.stderr = "";
  this._kernel.spawn.stderr.on("data", (data) => {
    const s = data.toString();
    this.stderr += s;
    if (this.stderr.length > 5000) {
      // truncate if gets long for some reason -- only the end will
      // be useful...
      this.stderr = this.stderr.slice(this.stderr.length - 4000);
    }
  });

  this._kernel.spawn.stdout.on("data", (_data) => {
    // NOTE: it is very important to read stdout (and stderr above)
    // even if we **totally ignore** the data. Otherwise, exec
    // might overflow
    // https://github.com/sagemathinc/cocalc/issues/5065
  });

  dbg("create main channel...", this._kernel.config);

  // This horrible code is because jupyterSockets will just "hang
  // forever" if the kernel doesn't get spawned for some reason.
  // (TODO: now that I completely rewrote jupytersockets, we could
  // just put a timeout there or better checks?  not sure.)
  // Thus we first poll (with a 3 second timeout) until there is a pid.
  // This is complicated and ugly, and I'm sorry about that, but
  // sometimes that's life.
  try {
    await until(
      () => {
        if (this._state != "spawning") {
          // gave up
          return true;
        }
        if (this.pid()) {
          // there's a process :-)
          return true;
        }
        return false;
      },
      { start: 100, max: 100, timeout: 3000 },
    );
  } catch (err) {
    // timed out
    this.setFailed(`Failed to start kernel process. ${err}`);
    return;
  }
  if (this._state != "spawning") {
    // got canceled
    return;
  }
  const pid = this.pid();
  if (!pid) {
    throw Error("bug");
  }

  let success = false;
  let gaveUp = false;
  const giveUpTimer = setTimeout(() => {
    if (success) {
      return;
    }
    gaveUp = true;
    // MAX_KERNEL_SPAWN_TIME has passed and the sockets are still not
    // connected.  Let's give up; probably the kernel process just failed.
    // A kernel that was closed in the meantime must stay closed, so it
    // is not marked as failed.
    if (this._state !== "closed") {
      this.setFailed("Failed to start kernel process -- timeout");
    }
    // We can't yet "cancel" jupyterSockets itself.
    // https://github.com/sagemathinc/cocalc/issues/7040
    // I did rewrite that -- so let's revisit this!
  }, MAX_KERNEL_SPAWN_TIME);

  let sockets: JupyterSockets;
  try {
    sockets = await jupyterSockets(this._kernel.config, this.identity);
  } finally {
    // Whether connecting worked, failed or came too late: the timer must
    // not fire afterwards and overwrite the actual outcome.
    clearTimeout(giveUpTimer);
  }
  if (gaveUp) {
    // The sockets arrived after we gave up: release them and make sure
    // the process group is gone.  It may already have exited, in which
    // case process.kill throws -- that is fine.
    sockets.close();
    try {
      process.kill(-pid, 9);
    } catch (err) {
      dbg(`error killing process group of ${pid}: ${err}`);
    }
    return;
  }
  if (this.get_state() === "closed") {
    // close() ran while we were connecting and has already cleaned up
    // everything else, so these sockets have no owner.
    sockets.close();
    return;
  }
  this.sockets = sockets;
  success = true;
  dbg("created main channel");
  sockets.on("shell", (mesg) => this.emit("shell", mesg));
  sockets.on("stdin", (mesg) => this.emit("stdin", mesg));
  sockets.on("iopub", (mesg) => {
    // Any iopub traffic means the kernel is up.
    this.setState("running");
    if (mesg.content != null && mesg.content.execution_state != null) {
      this.emit("execution_state", mesg.content.execution_state);
    }

    if (mesg.content?.comm_id != null) {
      // A comm message, which gets handled directly.
      this.process_comm_message_from_kernel(mesg);
      return;
    }

    if (this._actions?.capture_output_message(mesg)) {
      // captured an output message -- do not process further
      return;
    }

    this.emit("iopub", mesg);
  });

  this._kernel.spawn.once("exit", (exit_code, signal) => {
    if (this._state === "closed") {
      return;
    }
    this.dbg("kernel_exit")(
      `spawned kernel terminated with exit code ${exit_code} (signal=${signal}); stderr=${this.stderr}`,
    );
    const stderr = this.stderr ? `\n...\n${this.stderr}` : "";
    if (signal != null) {
      this.setFailed(`Kernel last terminated by signal ${signal}.${stderr}`);
    } else if (exit_code != null) {
      this.setFailed(`Kernel last exited with code ${exit_code}.${stderr}`);
    }
    this.close();
  });

  if (this._state == "spawning") {
    // so we can start sending code execution to the kernel, etc.
    this.setState("starting");
  }
};
552
553
// Process id of the spawned kernel, or undefined if there is none.
pid = (): number | undefined => this._kernel?.spawn?.pid;
556
557
// Send a signal to the kernel's process group and, if that worked, clear
// the queue of pending code executions.  Does nothing when there is no pid.
// Signal should be a string like "SIGINT", "SIGKILL".
// See https://nodejs.org/api/process.html#process_process_kill_pid_signal
signal = (signal: string): void => {
  const dbg = this.dbg("signal");
  const pid = this.pid();
  dbg(`pid=${pid}, signal=${signal}`);
  if (pid) {
    try {
      // negative pid, so the whole process group gets the signal
      process.kill(-pid, signal);
      this.clear_execute_code_queue();
    } catch (err) {
      dbg(`error: ${err}`);
    }
  }
};
573
574
// Shut this kernel down for good: kill the process, close the sockets
// and the key:value store, unregister from `kernels`, drop all event
// listeners and close whatever is left in the execution queue.
// Calling close on an already closed kernel does nothing.
close = (): void => {
  this.dbg("close")();
  if (this._state === "closed") {
    return;
  }

  this.signal("SIGKILL");

  if (this.sockets != null) {
    this.sockets.close();
    delete this.sockets;
  }

  // Listeners still get the "closed" event, since they are only
  // removed further below.
  this.setState("closed");

  if (this.store != null) {
    this.store.close();
    delete this.store;
  }

  // Only unregister if the registered kernel for this path is us.
  const registered = kernels.get(this._path);
  if (registered != null && registered.identity === this.identity) {
    kernels.delete(this._path);
  }

  this.removeAllListeners();

  if (this._kernel != null) {
    killKernel(this._kernel);
    delete this._kernel;
    delete this.sockets;
  }

  if (this._execute_code_queue != null) {
    for (const pending of this._execute_code_queue) {
      pending.close();
    }
    this._execute_code_queue = [];
  }
};
606
607
// Make a debug logging function whose messages are prefixed with this
// kernel's name and path and the given label f.
// public, since we do use it from some other places...
dbg =
  (f: string): Function =>
  (...args) => {
    // The prefix is built on every call, from the current name and path.
    const prefix = `jupyter.Kernel('${this.name ?? "no kernel"}',path='${
      this._path
    }').${f}`;
    //console.log(
    logger.debug(prefix, ...args);
  };
619
620
// Log everything arriving on the spawned process's combined `all`
// stream (when there is one) at the "silly" log level.
low_level_dbg = (): void => {
  const log = (...args) => logger.silly("low_level_debug", ...args);
  log("Enabling");
  this._kernel?.spawn.all?.on("data", (data) => log("STDIO", data.toString()));
};
629
630
// Make sure the kernel has been spawned, then queue the spawned kernel's
// initCode (if any).  Returns right away if already running, and throws
// if the kernel is closed.  Wrapped in reuseInFlight.
ensure_running = reuseInFlight(async (): Promise<void> => {
  const dbg = this.dbg("ensure_running");
  dbg(this._state);
  if (this._state == "closed") {
    throw Error("closed so not possible to ensure running");
  }
  if (this._state == "running") {
    return;
  }

  dbg("spawning");
  await this.spawn();

  const isStarting = this.get_state() == "starting";
  if (!isStarting && this.get_state() != "running") {
    // spawning didn't work out
    return;
  }

  // NOTE(review): every snippet is queued with skipToFront=true, so with
  // several snippets the later ones end up ahead of the earlier ones --
  // confirm that this is intended.
  for (const code of this._kernel?.initCode ?? []) {
    dbg("initCode ", code);
    this.execute_code({ code }, true);
  }

  this.has_ensured_running = true;
});
654
655
// Queue code for execution and return its emitter.  If the new entry
// is the only one in the queue, processing is started right away.
//
//   opts        - what to run; opts.halt_on_error is set to true on the
//                 given object if it was undefined.
//   skipToFront - put the entry at the front of the queue, not the back.
//
// Throws if the kernel is closed.
execute_code = (
  opts: ExecOpts,
  skipToFront = false,
): CodeExecutionEmitterInterface => {
  if (opts.halt_on_error === undefined) {
    // if not specified, default to true.
    opts.halt_on_error = true;
  }
  if (this._state === "closed") {
    throw Error("closed -- kernel -- execute_code");
  }

  const emitter = new CodeExecutionEmitter(this, opts);
  const queue = this._execute_code_queue;
  if (skipToFront) {
    queue.unshift(emitter);
  } else {
    queue.push(emitter);
  }
  if (queue.length == 1) {
    // start it going!
    this._process_execute_code_queue();
  }
  return emitter;
};
678
679
// Cancel every execution with the given id.
//
// Entries that are still waiting (index 1 and up) are removed from the
// queue and cancelled.  The entry at index 0 is the running computation
// and can't be removed; if it has the given id, the kernel gets a SIGINT
// instead, which is the best we can do.
cancel_execute = (id: string): void => {
  if (this._state === "closed") {
    return;
  }
  const dbg = this.dbg(`cancel_execute(id='${id}')`);
  if (
    this._execute_code_queue == null ||
    this._execute_code_queue.length === 0
  ) {
    dbg("nothing to do");
    return;
  }
  if (this._execute_code_queue.length > 1) {
    dbg(
      "mutate this._execute_code_queue removing everything with the given id",
    );
    // Walk backwards so that splicing doesn't shift the entries we have
    // yet to look at; stop before index 0, which is handled below.
    for (let i = this._execute_code_queue.length - 1; i >= 1; i--) {
      const code = this._execute_code_queue[i];
      if (code.id === id) {
        dbg(`removing entry ${i} from queue`);
        this._execute_code_queue.splice(i, 1);
        code.cancel();
      }
    }
  }
  // if the currently running computation involves this id, send an
  // interrupt signal (that's the best we can do)
  if (this._execute_code_queue[0]?.id === id) {
    dbg("interrupting running computation");
    this.signal("SIGINT");
  }
};
711
712
// Make sure the kernel is running and start the first entry of the
// execution queue.  If either step throws, the error is handed to every
// queued entry and the queue is emptied.
_process_execute_code_queue = async (): Promise<void> => {
  const dbg = this.dbg("_process_execute_code_queue");
  dbg(`state='${this._state}'`);
  if (this._state === "closed") {
    dbg("closed");
    return;
  }
  if (this._execute_code_queue == null) {
    dbg("no queue");
    return;
  }
  const size = this._execute_code_queue.length;
  if (size === 0) {
    dbg("queue is empty");
    return;
  }

  dbg(`queue has ${size} items; ensure kernel running`);
  try {
    await this.ensure_running();
    await this._execute_code_queue[0].go();
  } catch (err) {
    dbg(`WARNING: error running kernel -- ${err}`);
    for (const queued of this._execute_code_queue) {
      queued.throw_error(err);
    }
    this._execute_code_queue = [];
  }
};
740
741
// Ensure no future queued up evaluation occurs (the currently running
// one will complete, and new executions could happen).  Every entry
// after the first is told it is done and closed; then the queue is reset.
clear_execute_code_queue = (): void => {
  const dbg = this.dbg("_clear_execute_code_queue");
  if (this._state === "closed") {
    dbg("no op since state is closed");
    return;
  }
  if (this._execute_code_queue == null) {
    dbg("nothing to do since queue is null");
    return;
  }
  dbg(`clearing queue of size ${this._execute_code_queue.length}`);

  const done = { done: true };
  const waiting = this._execute_code_queue.slice(1);
  for (const emitter of waiting) {
    emitter.emit_output(done);
    emitter.close();
  }
  this._execute_code_queue = [];
};
761
762
// This is like execute_code, but async and returns all the results.
// This is used for unit testing and interactive work at
// the terminal and nbgrader and the stateless api.
//
// Throws if the kernel is closed, or if it has failed -- either before
// the call or by the time all output has been collected.
execute_code_now = async (opts: ExecOpts): Promise<object[]> => {
  this.dbg("execute_code_now")();
  if (this._state == "closed") {
    throw Error("closed");
  }
  if (this.failedError) {
    throw Error(this.failedError);
  }

  // halt_on_error defaults to true here, unless opts says otherwise.
  const execution = this.execute_code({ halt_on_error: true, ...opts });
  const messages: object[] = [];
  for await (const mesg of execution.iter()) {
    messages.push(mesg);
  }

  if (this.failedError) {
    // kernel failed during call
    throw Error(this.failedError);
  }
  return messages;
};
784
785
// Store data in the blob store under its sha1 hash and return that hash.
// Data whose type starts with "text/" is stored as the string itself;
// anything else is decoded from base64 first.
// Throws if there is no blob store.
private saveBlob = (data: string, type: string) => {
  const blobs = this._actions?.blobs;
  if (blobs == null) {
    throw Error("blob store not available");
  }

  let buf: Buffer;
  if (type.startsWith("text/")) {
    buf = Buffer.from(data);
  } else {
    buf = Buffer.from(data, "base64");
  }

  const sha1: string = misc_node_sha1(buf);
  blobs.set(sha1, buf);
  return sha1;
};
798
799
// Post-process the content of an output message in place: images, PDF
// and text/html are moved into the blob store and replaced by the sha1
// of the blob (text/html becomes the "iframe" entry).
//
// If the blob store is not working, the data is left in the message
// unchanged, so it is still sent in the usual way via the output; that
// is just less efficient.
process_output = (content: any): void => {
  if (this._state === "closed") {
    return;
  }
  const dbg = this.dbg("process_output");
  if (content.data == null) {
    // No data -- https://github.com/sagemathinc/cocalc/issues/6665
    // NO do not do this sort of thing.  This is exactly the sort of situation where
    // content could be very large, and JSON.stringify could use huge amounts of memory.
    // If you need to see this for debugging, uncomment it.
    // dbg(trunc(JSON.stringify(content), 300));
    // todo: FOR now -- later may remove large stdout, stderr, etc...
    // dbg("no data, so nothing to do");
    return;
  }

  remove_redundant_reps(content.data);

  let type: string;
  for (type of JUPYTER_MIMETYPES) {
    if (content.data[type] == null) {
      continue;
    }
    const useBlobStore =
      type.split("/")[0] === "image" ||
      type === "application/pdf" ||
      type === "text/html";
    if (!useBlobStore) {
      continue;
    }

    // Store all images and PDF and text/html in a binary blob store, so we
    // don't have to involve it in realtime sync.  It tends to be large, etc.
    let sha1: string;
    try {
      sha1 = this.saveBlob(content.data[type], type);
    } catch (err) {
      dbg(`WARNING: Jupyter blob store not working -- ${err}`);
      // Keep the original data; replacing it with a missing hash would
      // throw the output away.
      continue;
    }

    if (type == "text/html") {
      // NOTE: in general, this may or may not get rendered as an iframe --
      // we use iframe for backward compatibility.
      content.data["iframe"] = sha1;
      delete content.data["text/html"];
    } else {
      content.data[type] = sha1;
    }
  }
};
852
853
// Send a request of the given msg_type on the shell channel and wait
// for the reply whose parent msg_id matches.  Resolves to a deep copy of
// the reply's content.  Rejects if the kernel gets closed while waiting;
// throws if there are no sockets or the kernel is already closed.
call = async (msg_type: string, content?: any): Promise<any> => {
  this.dbg("call")(msg_type);
  if (!this.has_ensured_running) {
    await this.ensure_running();
  }
  // Do a paranoid double check anyways...
  if (this.sockets == null || this._state == "closed") {
    throw Error("not running, so can't call");
  }

  const request = {
    parent_header: {},
    metadata: {},
    channel: "shell",
    content,
    header: {
      msg_id: uuid(),
      username: "",
      session: "",
      msg_type: msg_type as MessageType,
      version: VERSION,
      date: new Date().toISOString(),
    },
  };

  // Send the message
  this.sockets.send(request);

  // Wait for the response that has the right msg_id.
  let reply: any = undefined;
  const waitForReply = (cb) => {
    const stopListening = () => {
      this.removeListener("shell", onShell);
      this.removeListener("closed", onClosed);
    };
    const onShell = (mesg) => {
      if (mesg.parent_header.msg_id !== request.header.msg_id) {
        // a reply to some other request
        return;
      }
      stopListening();
      mesg = deep_copy(mesg.content);
      if (len(mesg.metadata) === 0) {
        delete mesg.metadata;
      }
      reply = mesg;
      cb();
    };
    const onClosed = () => {
      stopListening();
      cb("closed - jupyter - kernel - call");
    };
    this.on("shell", onShell);
    this.on("closed", onClosed);
  };
  await callback(waitForReply);
  return reply;
};
907
908
// Send a complete_request with the given code and cursor position and
// return the kernel's reply.
complete = async (opts: { code: any; cursor_pos: any }): Promise<any> => {
  const { code, cursor_pos } = opts;
  this.dbg("complete")(`code='${code}', cursor_pos='${cursor_pos}'`);
  return await this.call("complete_request", opts);
};
913
914
// Send an inspect_request with the given code, cursor position and
// detail level and return the kernel's reply.
introspect = async (opts: {
  code: any;
  cursor_pos: any;
  detail_level: any;
}): Promise<any> => {
  const { code, cursor_pos, detail_level } = opts;
  this.dbg("introspect")(
    `code='${code}', cursor_pos='${cursor_pos}', detail_level=${detail_level}`,
  );
  return await this.call("inspect_request", opts);
};
925
926
// Return the kernel's reply to kernel_info_request, extended with the
// node.js version and (when actions are available) the start_time from
// the store.  The result is cached after the first successful call.
kernel_info = reuseInFlight(async (): Promise<KernelInfo> => {
  const cached = this._kernel_info;
  if (cached !== undefined) {
    return cached;
  }
  const info = await this.call("kernel_info_request");
  info.nodejs_version = process.version;
  if (this._actions != null) {
    info.start_time = this._actions.store.get("start_time");
  }
  this._kernel_info = info;
  return info;
});
938
939
// Save the ipynb file via the actions; throws if there are no actions.
save_ipynb_file = async (opts?): Promise<void> => {
  if (this._actions == null) {
    throw Error("save_ipynb_file -- ERROR: actions not known");
  }
  await this._actions.save_ipynb_file(opts);
};
946
947
// Return what the store's get_more_output gives for the given id, or []
// if it gives nothing.  Throws if id is missing or there are no actions.
more_output = (id: string): any[] => {
  if (id == null) {
    throw new Error("must specify id");
  }
  const actions = this._actions;
  if (actions == null) {
    throw new Error("must have redux actions");
  }
  return actions.store.get_more_output(id) ?? [];
};
956
957
// Run nbconvert on this notebook's file, in the notebook's directory.
//
//   args    - nbconvert arguments; "--" and the notebook's filename are
//             appended to a copy of them.
//   timeout - in seconds; 60 if not given.
//
// Throws if args is not an array.  Wrapped in reuseInFlight.
nbconvert = reuseInFlight(
  async (args: string[], timeout?: number): Promise<void> => {
    if (timeout === undefined) {
      timeout = 60; // seconds
    }
    if (!is_array(args)) {
      throw new Error("args must be an array");
    }
    // work on a copy, so that the caller's array stays untouched
    const fullArgs = copy(args);
    fullArgs.push("--", this._filename);
    await nbconvert({
      args: fullArgs,
      timeout,
      directory: this._directory,
    });
  },
);
975
976
// Read the file at the given path into the blob store and return the
// sha1 of its content.  A path that doesn't start with "/" is taken
// relative to the HOME directory.
//
// Reading is retried for up to 30 seconds.  If it still fails, we try to
// remove the file and then rethrow the read error.
load_attachment = async (path: string): Promise<string> => {
  const dbg = this.dbg("load_attachment");
  dbg(`path='${path}'`);
  if (path[0] !== "/") {
    path = join(process.env.HOME ?? "", path);
  }
  const f = async (): Promise<string> => {
    const bs = this.get_blob_store();
    if (bs == null) {
      throw new Error("BlobStore not available");
    }
    return await bs.readFile(path);
  };
  try {
    return await retry_until_success({
      f,
      max_time: 30000,
    });
  } catch (err) {
    // TODO: think through again if removing the file is the right thing to do.
    try {
      // This fails, e.g., when the file doesn't exist, which is a likely
      // reason for ending up here.  That failure must neither become an
      // unhandled rejection nor hide the read error.
      await unlink(path);
    } catch (unlinkErr) {
      dbg(`WARNING: unable to remove '${path}' -- ${unlinkErr}`);
    }
    throw err;
  }
};
999
1000
// Return an interface to the blob store of the actions, or undefined if
// there is none.  Blobs are keyed by the sha1 of their content.
// This is called by project-actions when exporting the notebook
// to an ipynb file:
get_blob_store = (): BlobStoreInterface | undefined => {
  const blobs = this._actions?.blobs;
  if (blobs == null) {
    return;
  }
  const decoder = new TextDecoder();

  // Store buf under its sha1 and return that sha1.
  const put = (buf: Buffer): string => {
    const sha1: string = misc_node_sha1(buf);
    blobs.set(sha1, buf);
    return sha1;
  };

  return {
    // The blob as base64, or undefined if there is no such blob.
    getBase64: (sha1: string): string | undefined => {
      const buf = blobs.get(sha1);
      return buf === undefined ? undefined : uint8ArrayToBase64(buf);
    },

    // The blob decoded as text, or undefined if there is no such blob.
    getString: (sha1: string): string | undefined => {
      const buf = blobs.get(sha1);
      return buf === undefined ? undefined : decoder.decode(buf);
    },

    // Store the content of the file at path.
    readFile: async (path: string): Promise<string> => {
      return put(await readFile(path));
    },

    // Store base64 encoded data.
    saveBase64: (data: string) => {
      return put(Buffer.from(data, "base64"));
    },
  };
};
1040
1041
// Hand a comm message from the kernel over to the actions.  Without
// actions the message is dropped.
process_comm_message_from_kernel = (mesg): void => {
  const actions = this._actions;
  if (actions == null) {
    return;
  }
  // Only log the header: the message can be HUGE; e.g., it could
  // contain massive binary data!
  this.dbg("process_comm_message_from_kernel")(mesg.header);
  actions.process_comm_message_from_kernel(mesg);
};
1051
1052
// Look up a buffer in the ipywidgets state of the syncdb.  Gives
// undefined if there are no actions or no ipywidgets state.
ipywidgetsGetBuffer = (
  model_id: string,
  // buffer_path is the string[] *or* the JSON of that.
  buffer_path: string | string[],
): Buffer | undefined => {
  const key =
    typeof buffer_path == "string" ? buffer_path : JSON.stringify(buffer_path);
  return this._actions?.syncdb.ipywidgets_state?.getBuffer(model_id, key);
};
1065
1066
// Send a comm_msg to the kernel on the shell channel.
//
// If non-empty buffers64 (base64 encoded buffers) are given, they are
// decoded and used as the message's buffers, replacing `buffers`, and
// are also recorded in the ipywidgets state when that is available.
//
// Throws if the sockets are not initialized.
send_comm_message_to_kernel = ({
  msg_id,
  comm_id,
  target_name,
  data,
  buffers64,
  buffers,
}: {
  msg_id: string;
  comm_id: string;
  target_name: string;
  data: any;
  buffers64?: string[];
  buffers?: Buffer[];
}): void => {
  if (this.sockets == null) {
    throw Error("sockets not initialized");
  }
  const dbg = this.dbg("send_comm_message_to_kernel");
  // this is HUGE
  // dbg({ msg_id, comm_id, target_name, data, buffers64 });
  if (buffers64 != null && buffers64.length > 0) {
    buffers = buffers64?.map((b64) => Buffer.from(base64ToBuffer(b64))) ?? [];
    dbg(
      "buffers lengths = ",
      buffers.map((buf) => buf.byteLength),
    );
    const widgets = this._actions?.syncdb.ipywidgets_state;
    if (widgets != null) {
      widgets.setModelBuffers(comm_id, data.buffer_paths, buffers, false);
    }
  }

  const header = {
    msg_id,
    username: "user",
    session: "",
    msg_type: "comm_msg" as MessageType,
    version: VERSION,
    date: new Date().toISOString(),
  };
  const message = {
    parent_header: {},
    metadata: {},
    channel: "shell",
    content: { comm_id, target_name, data },
    header,
    buffers,
  };

  // HUGE
  // dbg(message);
  // "The Kernel listens for these messages on the Shell channel,
  // and the Frontend listens for them on the IOPub channel." -- docs
  this.sockets.send(message);
};
1125
1126
// Change the kernel's current directory to path, by running the chdir
// command for the kernel's language.  Does nothing if there is no
// kernel name, or if there is no such command for the language.
chdir = async (path: string): Promise<void> => {
  if (!this.name) {
    // no kernel, no current directory
    return;
  }
  const dbg = this.dbg("chdir");
  dbg({ path });

  let lang;
  try {
    // using probably cached data, so likely very fast
    lang = await getLanguage(this.name);
  } catch (err) {
    dbg("WARNING ", err);
    // fall back to asking the kernel itself
    const info = await this.kernel_info();
    lang = info.language_info?.name ?? "";
  }

  const code = createChdirCommand(lang, getAbsolutePathFromHome(path));
  if (!code) {
    // code = '' if no command needed, e.g., for sparql.
    return;
  }
  await this.execute_code_now({ code });
};
1147
}
1148
1149
/**
 * Find the registered kernel whose spawned process has the given pid.
 *
 * @returns the first such kernel, or undefined if there is none
 */
export function get_kernel_by_pid(pid: number): JupyterKernel | undefined {
  return Object.values(kernels.kernels).find(
    (kernel) => kernel.get_spawned_kernel()?.spawn.pid === pid,
  );
}
1157
1158