CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/kernel/kernel.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Jupyter Backend
8
9
For interactive testing:
10
11
$ node
12
13
> j = require('./dist/kernel'); k = j.kernel({name:'python3', path:'x.ipynb'});
14
> console.log(JSON.stringify(await k.execute_code_now({code:'2+3'}),0,2))
15
16
*/
17
18
// const DEBUG = true; // only for extreme debugging.
19
const DEBUG = false; // normal mode
20
if (DEBUG) {
21
console.log("Enabling low level Jupyter kernel debugging.");
22
}
23
24
// NOTE: we choose to use node-cleanup instead of the much more
25
// popular exit-hook, since node-cleanup actually works for us.
26
// https://github.com/jtlapp/node-cleanup/issues/16
27
// Also exit-hook is hard to import from commonjs.
28
import nodeCleanup from "node-cleanup";
29
import type { Channels, MessageType } from "@nteract/messaging";
30
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
31
import { callback, delay } from "awaiting";
32
import { createMainChannel } from "enchannel-zmq-backend";
33
import { EventEmitter } from "node:events";
34
import { unlink } from "@cocalc/backend/misc/async-utils-node";
35
import {
36
process as iframe_process,
37
is_likely_iframe,
38
} from "@cocalc/jupyter/blobs/iframe";
39
import { remove_redundant_reps } from "@cocalc/jupyter/ipynb/import-from-ipynb";
40
import { JupyterActions } from "@cocalc/jupyter/redux/project-actions";
41
import {
42
CodeExecutionEmitterInterface,
43
ExecOpts,
44
JupyterKernelInterface,
45
KernelInfo,
46
} from "@cocalc/jupyter/types/project-interface";
47
import { JupyterStore } from "@cocalc/jupyter/redux/store";
48
import { JUPYTER_MIMETYPES } from "@cocalc/jupyter/util/misc";
49
import type { SyncDB } from "@cocalc/sync/editor/db/sync";
50
import { retry_until_success } from "@cocalc/util/async-utils";
51
import createChdirCommand from "@cocalc/util/jupyter-api/chdir-commands";
52
import { key_value_store } from "@cocalc/util/key-value-store";
53
import {
54
copy,
55
deep_copy,
56
is_array,
57
len,
58
merge,
59
original_path,
60
path_split,
61
uuid,
62
} from "@cocalc/util/misc";
63
import { CodeExecutionEmitter } from "@cocalc/jupyter/execute/execute-code";
64
import { get_blob_store_sync } from "@cocalc/jupyter/blobs";
65
import {
66
getLanguage,
67
get_kernel_data_by_name,
68
} from "@cocalc/jupyter/kernel/kernel-data";
69
import launchJupyterKernel, {
70
LaunchJupyterOpts,
71
SpawnedKernel,
72
killKernel,
73
} from "@cocalc/jupyter/pool/pool";
74
import { getAbsolutePathFromHome } from "@cocalc/jupyter/util/fs";
75
import type { KernelParams } from "@cocalc/jupyter/types/kernel";
76
import { redux_name } from "@cocalc/util/redux/name";
77
import { redux } from "@cocalc/jupyter/redux/app";
78
import { VERSION } from "@cocalc/jupyter/kernel/version";
79
import type { NbconvertParams } from "@cocalc/jupyter/types/nbconvert";
80
import type { Client } from "@cocalc/sync/client/types";
81
import { getLogger } from "@cocalc/backend/logger";
82
import { base64ToBuffer } from "@cocalc/util/base64";
83
84
// Max time (ms) to wait for the kernel's ZMQ main channel to come up before
// declaring the spawn a failure -- see finish_spawn below.
const MAX_KERNEL_SPAWN_TIME = 120 * 1000;

const logger = getLogger("jupyter:kernel");

// We make it so nbconvert functionality can be dynamically enabled
// by calling this at runtime. The reason is because some users of
// this code (e.g., remote kernels) don't need to provide nbconvert
// functionality, and our implementation has some heavy dependencies,
// e.g., on a big chunk of the react frontend.
// Until initNbconvert is called, any use of nbconvert throws.
let nbconvert: (opts: NbconvertParams) => Promise<void> = async () => {
  throw Error("nbconvert is not enabled");
};
// Install the real nbconvert implementation, replacing the throwing stub above.
export function initNbconvert(f) {
  nbconvert = f;
}
99
100
/*
We set a few extra user-specific options for the environment in which
Sage-based Jupyter kernels run; these are more multi-user friendly.
*/
// A copy of process.env with Python/R per-user paths rooted in $HOME, so
// Sage kernels write user-local caches instead of shared locations.
const SAGE_JUPYTER_ENV = merge(copy(process.env), {
  PYTHONUSERBASE: `${process.env.HOME}/.local`,
  PYTHON_EGG_CACHE: `${process.env.HOME}/.sage/.python-eggs`,
  R_MAKEVARS_USER: `${process.env.HOME}/.sage/R/Makevars.user`,
});
109
110
// Initialize the actions and store for working with a specific
// Jupyter notebook. The syncdb is the syncdoc associated to
// the ipynb file, and this function creates the corresponding
// actions and store, which make it possible to work with this
// notebook.
//
// Throws if syncdb has no project_id or is already closed.  If redux
// state for this notebook already exists it is torn down first (see
// issue 4331 below).
export async function initJupyterRedux(syncdb: SyncDB, client: Client) {
  const project_id = syncdb.project_id;
  if (project_id == null) {
    throw Error("project_id must be defined");
  }
  if (syncdb.get_state() == "closed") {
    throw Error("syncdb must not be closed");
  }

  // This path is the file we will watch for changes and save to, which is in the original
  // official ipynb format:
  const path = original_path(syncdb.get_path());
  logger.debug("initJupyterRedux", path);

  const name = redux_name(project_id, path);
  if (redux.getStore(name) != null && redux.getActions(name) != null) {
    logger.debug(
      "initJupyterRedux",
      path,
      " -- existing actions, so removing them",
    );
    // The redux info for this notebook already exists, so don't
    // try to make it again without first deleting the existing one.
    // Having two at once basically results in things feeling hung.
    // This should never happen, but we ensure it
    // See https://github.com/sagemathinc/cocalc/issues/4331
    await removeJupyterRedux(path, project_id);
  }
  const store = redux.createStore(name, JupyterStore);
  const actions = redux.createActions(name, JupyterActions);

  actions._init(project_id, path, syncdb, store, client);

  // Log syncdb lifecycle events for debugging; "once" since each fires at most once.
  syncdb.once("error", (err) =>
    logger.error("initJupyterRedux", path, "syncdb ERROR", err),
  );
  syncdb.once("ready", () =>
    logger.debug("initJupyterRedux", path, "syncdb ready"),
  );
}
155
156
export async function getJupyterRedux(syncdb: SyncDB) {
157
const project_id = syncdb.project_id;
158
const path = original_path(syncdb.get_path());
159
const name = redux_name(project_id, path);
160
return { actions: redux.getActions(name), store: redux.getStore(name) };
161
}
162
163
// Remove the store/actions for a given Jupyter notebook,
164
// and also close the kernel if it is running.
165
export async function removeJupyterRedux(
166
path: string,
167
project_id: string,
168
): Promise<void> {
169
logger.debug("removeJupyterRedux", path);
170
// if there is a kernel, close it
171
try {
172
await get_existing_kernel(path)?.close();
173
} catch (_err) {
174
// ignore
175
}
176
const name = redux_name(project_id, path);
177
const actions = redux.getActions(name);
178
if (actions != null) {
179
try {
180
await actions.close();
181
} catch (err) {
182
logger.debug(
183
"removeJupyterRedux",
184
path,
185
" WARNING -- issue closing actions",
186
err,
187
);
188
}
189
}
190
redux.removeStore(name);
191
redux.removeActions(name);
192
}
193
194
// Public factory for JupyterKernel instances; see the class notes below.
export function kernel(opts: KernelParams): JupyterKernel {
  const { name, path, actions, ulimit } = opts;
  return new JupyterKernel(name, path, actions, ulimit);
}
197
198
/*
199
Jupyter Kernel interface.
200
201
The kernel does *NOT* start up until either spawn is explicitly called, or
202
code execution is explicitly requested. This makes it possible to
203
call process_output without spawning an actual kernel.
204
*/
205
// Registry of all kernels in this process, keyed by notebook path.
// There is at most one kernel per path -- the JupyterKernel constructor
// closes any previous kernel registered under the same path.
const _jupyter_kernels: { [path: string]: JupyterKernel } = {};

// Ensure that the kernels all get killed when the process exits.
nodeCleanup(() => {
  for (const kernelPath in _jupyter_kernels) {
    // We do NOT await the close since that's not really
    // supported or possible in general.
    // Reach into the private _kernel field and kill the child process
    // directly -- synchronous-enough for an exit hook.
    const { _kernel } = _jupyter_kernels[kernelPath] as any;
    if (_kernel) {
      killKernel(_kernel);
    }
  }
});
218
219
// NOTE: keep JupyterKernel implementation private -- use the kernel function
220
// above, and the interface defined in types.
221
class JupyterKernel extends EventEmitter implements JupyterKernelInterface {
222
// name -- if undefined that means "no actual Jupyter kernel" (i.e., this JupyterKernel exists
223
// here, but there is no actual separate real Jupyter kernel process and one won't be created).
224
// Everything should work, except you can't *spawn* such a kernel.
225
public name: string | undefined;
226
227
public store: any; // this is a key:value store used mainly for stdin support right now. NOTHING TO DO WITH REDUX!
228
public readonly identity: string = uuid();
229
230
private stderr: string = "";
231
private ulimit?: string;
232
private _path: string;
233
private _actions?: JupyterActions;
234
private _state: string;
235
private _directory: string;
236
private _filename: string;
237
private _kernel?: SpawnedKernel;
238
private _kernel_info?: KernelInfo;
239
public _execute_code_queue: CodeExecutionEmitter[] = [];
240
public channel?: Channels;
241
private has_ensured_running: boolean = false;
242
243
constructor(
244
name: string | undefined,
245
_path: string,
246
_actions: JupyterActions | undefined,
247
ulimit: string | undefined,
248
) {
249
super();
250
251
this.ulimit = ulimit;
252
this.spawn = reuseInFlight(this.spawn.bind(this));
253
254
this.kernel_info = reuseInFlight(this.kernel_info.bind(this));
255
this.nbconvert = reuseInFlight(this.nbconvert.bind(this));
256
this.ensure_running = reuseInFlight(this.ensure_running.bind(this));
257
258
this.close = this.close.bind(this);
259
this.process_output = this.process_output.bind(this);
260
261
this.name = name;
262
this._path = _path;
263
this._actions = _actions;
264
265
this.store = key_value_store();
266
const { head, tail } = path_split(getAbsolutePathFromHome(this._path));
267
this._directory = head;
268
this._filename = tail;
269
this._set_state("off");
270
this._execute_code_queue = [];
271
if (_jupyter_kernels[this._path] !== undefined) {
272
// This happens when we change the kernel for a given file, e.g., from python2 to python3.
273
// Obviously, it is important to clean up after the old kernel.
274
_jupyter_kernels[this._path].close();
275
}
276
_jupyter_kernels[this._path] = this;
277
this.setMaxListeners(100);
278
const dbg = this.dbg("constructor");
279
dbg("done");
280
}
281
282
public get_path() {
283
return this._path;
284
}
285
286
// no-op if calling it doesn't change the state.
287
private _set_state(state: string): void {
288
// state = 'off' --> 'spawning' --> 'starting' --> 'running' --> 'closed'
289
if (this._state == state) return;
290
this._state = state;
291
this.emit("state", this._state);
292
this.emit(this._state); // we *SHOULD* use this everywhere, not above.
293
}
294
295
get_state(): string {
296
return this._state;
297
}
298
299
async spawn(spawn_opts?: { env?: { [key: string]: string } }): Promise<void> {
300
if (this._state === "closed") {
301
// game over!
302
throw Error("closed -- kernel spawn");
303
}
304
if (!this.name) {
305
// spawning not allowed.
306
throw Error("cannot spawn since no kernel is set");
307
}
308
if (["running", "starting"].includes(this._state)) {
309
// Already spawned, so no need to do it again.
310
return;
311
}
312
this._set_state("spawning");
313
const dbg = this.dbg("spawn");
314
dbg("spawning kernel...");
315
316
// ****
317
// CRITICAL: anything added to opts better not be specific
318
// to the kernel path or it will completely break using a
319
// pool, which makes things massively slower.
320
// ****
321
322
const opts: LaunchJupyterOpts = {
323
env: spawn_opts?.env ?? {},
324
...(this.ulimit != null ? { ulimit: this.ulimit } : undefined),
325
};
326
327
try {
328
const kernelData = await get_kernel_data_by_name(this.name);
329
// This matches "sage", "sage-x.y", and Sage Python3 ("sage -python -m ipykernel")
330
if (kernelData.argv[0].startsWith("sage")) {
331
dbg("setting special environment for Sage kernels");
332
opts.env = merge(opts.env, SAGE_JUPYTER_ENV);
333
}
334
} catch (err) {
335
dbg(`No kernelData available for ${this.name}`);
336
}
337
338
// Make cocalc default to the colab renderer for cocalc-jupyter, since
339
// this one happens to work best for us, and they don't have a custom
340
// one for us. See https://plot.ly/python/renderers/ and
341
// https://github.com/sagemathinc/cocalc/issues/4259
342
opts.env.PLOTLY_RENDERER = "colab";
343
opts.env.COCALC_JUPYTER_KERNELNAME = this.name;
344
345
// !!! WARNING: do NOT add anything new here that depends on that path!!!!
346
// Otherwise the pool will switch to falling back to not being used, and
347
// cocalc would then be massively slower.
348
// Non-uniform customization.
349
// launchJupyterKernel is explicitly smart enough to deal with opts.cwd
350
if (this._directory) {
351
opts.cwd = this._directory;
352
}
353
// launchJupyterKernel is explicitly smart enough to deal with opts.env.COCALC_JUPYTER_FILENAME
354
opts.env.COCALC_JUPYTER_FILENAME = this._path;
355
// and launchJupyterKernel is NOT smart enough to deal with anything else!
356
357
try {
358
dbg("launching kernel interface...");
359
this._kernel = await launchJupyterKernel(this.name, opts);
360
await this.finish_spawn();
361
} catch (err) {
362
dbg("ERROR spawning kernel", err);
363
if (this._state === "closed") {
364
throw Error("closed -- kernel spawn later");
365
}
366
this._set_state("off");
367
throw err;
368
}
369
370
// NOW we do path-related customizations:
371
// TODO: we will set each of these after getting a kernel from the pool
372
// expose path of jupyter notebook -- https://github.com/sagemathinc/cocalc/issues/5165
373
//opts.env.COCALC_JUPYTER_FILENAME = this._path;
374
// if (this._directory !== "") {
375
// opts.cwd = this._directory;
376
// }
377
}
378
379
get_spawned_kernel() {
380
return this._kernel;
381
}
382
383
public get_connection_file(): string | undefined {
384
return this._kernel?.connectionFile;
385
}
386
387
private finish_spawn = async () => {
388
const dbg = this.dbg("finish_spawn");
389
dbg("now finishing spawn of kernel...");
390
391
if (DEBUG) {
392
this.low_level_dbg();
393
}
394
395
if (!this._kernel) {
396
throw Error("_kernel must be defined");
397
}
398
this._kernel.spawn.on("error", (err) => {
399
const error = `${err}\n${this.stderr}`;
400
dbg("kernel error", error);
401
this.emit("kernel_error", error);
402
this._set_state("off");
403
});
404
405
// Track stderr from the subprocess itself (the kernel).
406
// This is useful for debugging broken kernels, etc., and is especially
407
// useful since it exists even if the kernel sends nothing over any
408
// zmq channels (e.g., due to being very broken).
409
this.stderr = "";
410
this._kernel.spawn.stderr.on("data", (data) => {
411
const s = data.toString();
412
this.stderr += s;
413
if (this.stderr.length > 5000) {
414
// truncate if gets long for some reason -- only the end will
415
// be useful...
416
this.stderr = this.stderr.slice(this.stderr.length - 4000);
417
}
418
});
419
420
this._kernel.spawn.stdout.on("data", (_data) => {
421
// NOTE: it is very important to read stdout (and stderr above)
422
// even if we **totally ignore** the data. Otherwise, execa saves
423
// some amount then just locks up and doesn't allow flushing the
424
// output stream. This is a "nice" feature of execa, since it means
425
// no data gets dropped. See https://github.com/sagemathinc/cocalc/issues/5065
426
});
427
428
dbg("create main channel...", this._kernel.config);
429
430
// This horrible code is becacuse createMainChannel will just "hang
431
// forever" if the kernel doesn't get spawned for some reason.
432
// Thus we do some tests, waiting for at least 2 seconds for there
433
// to be a pid. This is complicated and ugly, and I'm sorry about that,
434
// but sometimes that's life.
435
let i = 0;
436
while (i < 20 && this._state == "spawning" && !this._kernel?.spawn?.pid) {
437
i += 1;
438
await delay(100);
439
}
440
if (this._state != "spawning" || !this._kernel?.spawn?.pid) {
441
if (this._state == "spawning") {
442
this.emit("kernel_error", "Failed to start kernel process.");
443
this._set_state("off");
444
}
445
return;
446
}
447
const local = { success: false, gaveUp: false };
448
setTimeout(() => {
449
if (!local.success) {
450
local.gaveUp = true;
451
// it's been 30s and the channels didn't work. Let's give up.
452
// probably the kernel process just failed.
453
this.emit("kernel_error", "Failed to start kernel process.");
454
this._set_state("off");
455
// We can't "cancel" createMainChannel itself -- that will require
456
// rewriting that dependency.
457
// https://github.com/sagemathinc/cocalc/issues/7040
458
}
459
}, MAX_KERNEL_SPAWN_TIME);
460
const channel = await createMainChannel(
461
this._kernel.config,
462
"",
463
this.identity,
464
);
465
if (local.gaveUp) {
466
return;
467
}
468
this.channel = channel;
469
local.success = true;
470
dbg("created main channel");
471
472
this.channel?.subscribe((mesg) => {
473
switch (mesg.channel) {
474
case "shell":
475
this._set_state("running");
476
this.emit("shell", mesg);
477
break;
478
case "stdin":
479
this.emit("stdin", mesg);
480
break;
481
case "iopub":
482
this._set_state("running");
483
if (mesg.content != null && mesg.content.execution_state != null) {
484
this.emit("execution_state", mesg.content.execution_state);
485
}
486
487
if (mesg.content?.comm_id != null) {
488
// A comm message, which gets handled directly.
489
this.process_comm_message_from_kernel(mesg);
490
break;
491
}
492
493
if (this._actions?.capture_output_message(mesg)) {
494
// captured an output message -- do not process further
495
break;
496
}
497
498
this.emit("iopub", mesg);
499
break;
500
}
501
});
502
503
this._kernel.spawn.on("exit", (exit_code, signal) => {
504
if (this._state === "closed") {
505
return;
506
}
507
this.dbg("kernel_exit")(
508
`spawned kernel terminated with exit code ${exit_code} (signal=${signal}); stderr=${this.stderr}`,
509
);
510
const stderr = this.stderr ? `\n...\n${this.stderr}` : "";
511
if (signal != null) {
512
this.emit(
513
"kernel_error",
514
`Kernel last terminated by signal ${signal}.${stderr}`,
515
);
516
} else if (exit_code != null) {
517
this.emit(
518
"kernel_error",
519
`Kernel last exited with code ${exit_code}.${stderr}`,
520
);
521
}
522
this.close();
523
});
524
525
// so we can start sending code execution to the kernel, etc.
526
this._set_state("starting");
527
};
528
529
// Signal should be a string like "SIGINT", "SIGKILL".
530
// See https://nodejs.org/api/process.html#process_process_kill_pid_signal
531
signal(signal: string): void {
532
const dbg = this.dbg("signal");
533
const spawn = this._kernel != null ? this._kernel.spawn : undefined;
534
const pid = spawn?.pid;
535
dbg(`pid=${pid}, signal=${signal}`);
536
if (pid == null) {
537
return;
538
}
539
try {
540
this.clear_execute_code_queue();
541
process.kill(-pid, signal); // negative to kill the process group
542
} catch (err) {
543
dbg(`error: ${err}`);
544
}
545
}
546
547
// This is async, but the process.kill happens *before*
548
// anything async. That's important for cleaning these
549
// up when the project terminates.
550
async close(): Promise<void> {
551
this.dbg("close")();
552
if (this._state === "closed") {
553
return;
554
}
555
this._set_state("closed");
556
if (this.store != null) {
557
this.store.close();
558
delete this.store;
559
}
560
const kernel = _jupyter_kernels[this._path];
561
if (kernel != null && kernel.identity === this.identity) {
562
delete _jupyter_kernels[this._path];
563
}
564
this.removeAllListeners();
565
if (this._kernel != null) {
566
killKernel(this._kernel);
567
delete this._kernel;
568
delete this.channel;
569
}
570
if (this._execute_code_queue != null) {
571
for (const code_snippet of this._execute_code_queue) {
572
code_snippet.close();
573
}
574
this._execute_code_queue = [];
575
}
576
}
577
578
// public, since we do use it from some other places...
579
dbg(f: string): Function {
580
return (...args) => {
581
//console.log(
582
logger.debug(
583
`jupyter.Kernel('${this.name ?? "no kernel"}',path='${
584
this._path
585
}').${f}`,
586
...args,
587
);
588
};
589
}
590
591
low_level_dbg(): void {
592
const dbg = (...args) => logger.silly("low_level_debug", ...args);
593
dbg("Enabling");
594
if (this._kernel) {
595
this._kernel.spawn.all?.on("data", (data) =>
596
dbg("STDIO", data.toString()),
597
);
598
}
599
// for low level debugging only...
600
this.channel?.subscribe((mesg) => {
601
dbg(mesg);
602
});
603
}
604
605
async ensure_running(): Promise<void> {
606
const dbg = this.dbg("ensure_running");
607
dbg(this._state);
608
if (this._state == "closed") {
609
throw Error("closed so not possible to ensure running");
610
}
611
if (this._state == "running") {
612
return;
613
}
614
dbg("spawning");
615
await this.spawn();
616
if (this._kernel?.initCode != null) {
617
for (const code of this._kernel?.initCode ?? []) {
618
dbg("initCode ", code);
619
await new CodeExecutionEmitter(this, { code }).go();
620
}
621
}
622
if (!this.has_ensured_running) {
623
this.has_ensured_running = true;
624
}
625
}
626
627
execute_code(
628
opts: ExecOpts,
629
skipToFront = false,
630
): CodeExecutionEmitterInterface {
631
if (opts.halt_on_error === undefined) {
632
// if not specified, default to true.
633
opts.halt_on_error = true;
634
}
635
if (this._state === "closed") {
636
throw Error("closed -- kernel -- execute_code");
637
}
638
const code = new CodeExecutionEmitter(this, opts);
639
if (skipToFront) {
640
this._execute_code_queue.unshift(code);
641
} else {
642
this._execute_code_queue.push(code);
643
}
644
if (this._execute_code_queue.length == 1) {
645
// start it going!
646
this._process_execute_code_queue();
647
}
648
return code;
649
}
650
651
cancel_execute(id: string): void {
652
if (this._state === "closed") {
653
return;
654
}
655
const dbg = this.dbg(`cancel_execute(id='${id}')`);
656
if (
657
this._execute_code_queue == null ||
658
this._execute_code_queue.length === 0
659
) {
660
dbg("nothing to do");
661
return;
662
}
663
if (this._execute_code_queue.length > 1) {
664
dbg(
665
"mutate this._execute_code_queue removing everything with the given id",
666
);
667
for (let i = this._execute_code_queue.length - 1; i--; i >= 1) {
668
const code = this._execute_code_queue[i];
669
if (code.id === id) {
670
dbg(`removing entry ${i} from queue`);
671
this._execute_code_queue.splice(i, 1);
672
code.cancel();
673
}
674
}
675
}
676
// if the currently running computation involves this id, send an
677
// interrupt signal (that's the best we can do)
678
if (this._execute_code_queue[0].id === id) {
679
dbg("interrupting running computation");
680
this.signal("SIGINT");
681
}
682
}
683
684
async _process_execute_code_queue(): Promise<void> {
685
const dbg = this.dbg("_process_execute_code_queue");
686
dbg(`state='${this._state}'`);
687
if (this._state === "closed") {
688
dbg("closed");
689
return;
690
}
691
if (this._execute_code_queue == null) {
692
dbg("no queue");
693
return;
694
}
695
const n = this._execute_code_queue.length;
696
if (n === 0) {
697
dbg("queue is empty");
698
return;
699
}
700
dbg(
701
`queue has ${n} items; ensure kernel running`,
702
this._execute_code_queue,
703
);
704
try {
705
await this.ensure_running();
706
this._execute_code_queue[0].go();
707
} catch (err) {
708
dbg(`error running kernel -- ${err}`);
709
for (const code of this._execute_code_queue) {
710
code.throw_error(err);
711
}
712
this._execute_code_queue = [];
713
}
714
}
715
716
public clear_execute_code_queue(): void {
717
const dbg = this.dbg("_clear_execute_code_queue");
718
// ensure no future queued up evaluation occurs (currently running
719
// one will complete and new executions could happen)
720
if (this._state === "closed") {
721
dbg("no op since state is closed");
722
return;
723
}
724
if (this._execute_code_queue == null) {
725
dbg("nothing to do since queue is null");
726
return;
727
}
728
dbg(`clearing queue of size ${this._execute_code_queue.length}`);
729
const mesg = { done: true };
730
for (const code_execution_emitter of this._execute_code_queue.slice(1)) {
731
code_execution_emitter.emit_output(mesg);
732
code_execution_emitter.close();
733
}
734
this._execute_code_queue = [];
735
}
736
737
// This is like execute_code, but async and returns all the results,
738
// and does not use the internal execution queue.
739
// This is used for unit testing and interactive work at the terminal and nbgrader and the stateless api.
740
async execute_code_now(opts: ExecOpts): Promise<object[]> {
741
this.dbg("execute_code_now")();
742
if (this._state === "closed") {
743
throw Error("closed -- kernel -- execute_code_now");
744
}
745
if (opts.halt_on_error === undefined) {
746
// if not specified, default to true.
747
opts.halt_on_error = true;
748
}
749
await this.ensure_running();
750
return await new CodeExecutionEmitter(this, opts).go();
751
}
752
753
process_output(content: any): void {
754
if (this._state === "closed") {
755
return;
756
}
757
const dbg = this.dbg("process_output");
758
if (content.data == null) {
759
// No data -- https://github.com/sagemathinc/cocalc/issues/6665
760
// NO do not do this sort of thing. This is exactly the sort of situation where
761
// content could be very large, and JSON.stringify could use huge amounts of memory.
762
// If you need to see this for debugging, uncomment it.
763
// dbg(trunc(JSON.stringify(content), 300));
764
// todo: FOR now -- later may remove large stdout, stderr, etc...
765
// dbg("no data, so nothing to do");
766
return;
767
}
768
769
remove_redundant_reps(content.data);
770
771
let saveToBlobStore;
772
try {
773
const blob_store = get_blob_store_sync();
774
saveToBlobStore = (
775
data: string,
776
type: string,
777
ipynb?: string,
778
): string => {
779
const sha1 = blob_store.save(data, type, ipynb);
780
if (this._actions?.is_compute_server) {
781
this._actions?.saveBlobToProject(data, type, ipynb);
782
}
783
return sha1;
784
};
785
} catch (err) {
786
dbg(`WARNING: Jupyter blob store is not available -- ${err}`);
787
// there is nothing to process without the blob store to save
788
// data in!
789
return;
790
}
791
792
let type: string;
793
for (type of JUPYTER_MIMETYPES) {
794
if (content.data[type] == null) {
795
continue;
796
}
797
if (type.split("/")[0] === "image" || type === "application/pdf") {
798
// Store all images and PDF in the blob store:
799
content.data[type] = saveToBlobStore(content.data[type], type);
800
} else if (type === "text/html" && is_likely_iframe(content.data[type])) {
801
// Likely iframe, so we treat it as such. This is very important, e.g.,
802
// because of Sage's iframe 3d graphics. We parse
803
// and remove these and serve them from the backend.
804
// {iframe: sha1 of srcdoc}
805
content.data["iframe"] = iframe_process(
806
content.data[type],
807
saveToBlobStore,
808
);
809
delete content.data[type];
810
}
811
}
812
}
813
814
async call(msg_type: string, content?: any): Promise<any> {
815
this.dbg("call")(msg_type);
816
if (!this.has_ensured_running) {
817
await this.ensure_running();
818
}
819
// Do a paranoid double check anyways...
820
if (this.channel == null || this._state == "closed") {
821
throw Error("not running, so can't call");
822
}
823
824
const message = {
825
parent_header: {},
826
metadata: {},
827
channel: "shell",
828
content,
829
header: {
830
msg_id: uuid(),
831
username: "",
832
session: "",
833
msg_type: msg_type as MessageType,
834
version: VERSION,
835
date: new Date().toISOString(),
836
},
837
};
838
839
// Send the message
840
this.channel?.next(message);
841
842
// Wait for the response that has the right msg_id.
843
let the_mesg: any = undefined;
844
const wait_for_response = (cb) => {
845
const f = (mesg) => {
846
if (mesg.parent_header.msg_id === message.header.msg_id) {
847
this.removeListener("shell", f);
848
this.removeListener("closed", g);
849
mesg = deep_copy(mesg.content);
850
if (len(mesg.metadata) === 0) {
851
delete mesg.metadata;
852
}
853
the_mesg = mesg;
854
cb();
855
}
856
};
857
const g = () => {
858
this.removeListener("shell", f);
859
this.removeListener("closed", g);
860
cb("closed - jupyter - kernel - call");
861
};
862
this.on("shell", f);
863
this.on("closed", g);
864
};
865
await callback(wait_for_response);
866
return the_mesg;
867
}
868
869
async complete(opts: { code: any; cursor_pos: any }): Promise<any> {
870
const dbg = this.dbg("complete");
871
dbg(`code='${opts.code}', cursor_pos='${opts.cursor_pos}'`);
872
return await this.call("complete_request", opts);
873
}
874
875
async introspect(opts: {
876
code: any;
877
cursor_pos: any;
878
detail_level: any;
879
}): Promise<any> {
880
const dbg = this.dbg("introspect");
881
dbg(
882
`code='${opts.code}', cursor_pos='${opts.cursor_pos}', detail_level=${opts.detail_level}`,
883
);
884
return await this.call("inspect_request", opts);
885
}
886
887
async kernel_info(): Promise<KernelInfo> {
888
if (this._kernel_info !== undefined) {
889
return this._kernel_info;
890
}
891
const info = await this.call("kernel_info_request");
892
info.nodejs_version = process.version;
893
if (this._actions != null) {
894
info.start_time = this._actions.store.get("start_time");
895
}
896
this._kernel_info = info;
897
return info;
898
}
899
900
async save_ipynb_file(): Promise<void> {
901
if (this._actions != null) {
902
await this._actions.save_ipynb_file();
903
} else {
904
throw Error("save_ipynb_file -- ERROR: actions not known");
905
}
906
}
907
908
more_output(id: string): any[] {
909
if (id == null) {
910
throw new Error("must specify id");
911
}
912
if (this._actions == null) {
913
throw new Error("must have redux actions");
914
}
915
return this._actions.store.get_more_output(id) || [];
916
}
917
918
async nbconvert(args: string[], timeout?: number): Promise<void> {
919
if (timeout === undefined) {
920
timeout = 60; // seconds
921
}
922
if (!is_array(args)) {
923
throw new Error("args must be an array");
924
}
925
args = copy(args);
926
args.push("--");
927
args.push(this._filename);
928
await nbconvert({
929
args,
930
timeout,
931
directory: this._directory,
932
});
933
}
934
935
// TODO: double check that this actually returns sha1
936
async load_attachment(path: string): Promise<string> {
937
const dbg = this.dbg("load_attachment");
938
dbg(`path='${path}'`);
939
if (path[0] !== "/") {
940
path = process.env.HOME + "/" + path;
941
}
942
async function f(): Promise<string> {
943
const bs = get_blob_store_sync();
944
if (bs == null) throw new Error("BlobStore not available");
945
return bs.readFile(path, "base64");
946
}
947
try {
948
return await retry_until_success({
949
f: f,
950
max_time: 30000,
951
});
952
} catch (err) {
953
unlink(path); // TODO: think through again if this is the right thing to do.
954
throw err;
955
}
956
}
957
958
// This is called by project-actions when exporting the notebook
959
// to an ipynb file:
960
get_blob_store() {
961
return get_blob_store_sync();
962
}
963
964
process_attachment(base64, mime): string | undefined {
965
const blob_store = get_blob_store_sync();
966
return blob_store?.save(base64, mime);
967
}
968
969
process_comm_message_from_kernel(mesg): void {
970
if (this._actions == null) {
971
return;
972
}
973
const dbg = this.dbg("process_comm_message_from_kernel");
974
// This can be HUGE so don't print out the entire message; e.g., it could contain
975
// massive binary data!
976
dbg(mesg.header);
977
this._actions.process_comm_message_from_kernel(mesg);
978
}
979
980
public ipywidgetsGetBuffer(
981
model_id: string,
982
// buffer_path is the string[] *or* the JSON of that.
983
buffer_path: string | string[],
984
): Buffer | undefined {
985
if (typeof buffer_path != "string") {
986
buffer_path = JSON.stringify(buffer_path);
987
}
988
return this._actions?.syncdb.ipywidgets_state?.getBuffer(
989
model_id,
990
buffer_path,
991
);
992
}
993
994
public send_comm_message_to_kernel({
995
msg_id,
996
comm_id,
997
target_name,
998
data,
999
buffers64,
1000
buffers,
1001
}: {
1002
msg_id: string;
1003
comm_id: string;
1004
target_name: string;
1005
data: any;
1006
buffers64?: string[];
1007
buffers?: Buffer[];
1008
}): void {
1009
const dbg = this.dbg("send_comm_message_to_kernel");
1010
dbg({ msg_id, comm_id, target_name, data, buffers64 });
1011
if (buffers64 != null && buffers64.length > 0) {
1012
buffers = buffers64?.map((x) => Buffer.from(base64ToBuffer(x))) ?? [];
1013
dbg(
1014
"buffers lengths = ",
1015
buffers.map((x) => x.byteLength),
1016
);
1017
if (this._actions?.syncdb.ipywidgets_state != null) {
1018
this._actions.syncdb.ipywidgets_state.setModelBuffers(
1019
comm_id,
1020
data.buffer_paths,
1021
buffers,
1022
false,
1023
);
1024
}
1025
}
1026
1027
const message = {
1028
parent_header: {},
1029
metadata: {},
1030
channel: "shell",
1031
content: { comm_id, target_name, data },
1032
header: {
1033
msg_id,
1034
username: "user",
1035
session: "",
1036
msg_type: "comm_msg" as MessageType,
1037
version: VERSION,
1038
date: new Date().toISOString(),
1039
},
1040
buffers,
1041
};
1042
1043
dbg(message);
1044
// "The Kernel listens for these messages on the Shell channel,
1045
// and the Frontend listens for them on the IOPub channel." -- docs
1046
this.channel?.next(message);
1047
}
1048
1049
async chdir(path: string): Promise<void> {
1050
if (!this.name) return; // no kernel, no current directory
1051
const dbg = this.dbg("chdir");
1052
dbg({ path });
1053
let lang;
1054
try {
1055
// using probably cached data, so likely very fast
1056
lang = await getLanguage(this.name);
1057
} catch (err) {
1058
dbg("WARNING ", err);
1059
const info = await this.kernel_info();
1060
lang = info.language_info?.name ?? "";
1061
}
1062
1063
const absPath = getAbsolutePathFromHome(path);
1064
const code = createChdirCommand(lang, absPath);
1065
if (code) {
1066
// returns '' if no command needed, e.g., for sparql.
1067
await this.execute_code_now({ code });
1068
}
1069
}
1070
}
1071
1072
// Look up the kernel registered for the given notebook path, if any.
export function get_existing_kernel(path: string): JupyterKernel | undefined {
  const registered = _jupyter_kernels[path];
  return registered;
}
1075
1076
// Find the kernel whose spawned subprocess has the given pid, if any.
export function get_kernel_by_pid(pid: number): JupyterKernel | undefined {
  return Object.values(_jupyter_kernels).find(
    (kernel) => kernel.get_spawned_kernel()?.spawn.pid === pid,
  );
}
1084
1085