CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/execute/execute-code.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Send code to a kernel to be evaluated, then wait for
8
the results and gather them together.
9
10
TODO: for easy testing/debugging, add an "async run() : Messages[]" method.
11
*/
12
13
import { callback, delay } from "awaiting";
14
import { EventEmitter } from "events";
15
import { VERSION } from "@cocalc/jupyter/kernel/version";
16
import type { JupyterKernelInterface as JupyterKernel } from "@cocalc/jupyter/types/project-interface";
17
import type { MessageType } from "@nteract/messaging";
18
import { bind_methods, copy_with, deep_copy, uuid } from "@cocalc/util/misc";
19
import {
20
CodeExecutionEmitterInterface,
21
ExecOpts,
22
StdinFunction,
23
} from "@cocalc/jupyter/types/project-interface";
24
import { getLogger } from "@cocalc/backend/logger";
25
26
// Logger for this module, obtained from getLogger under the name "jupyter:execute-code".
const log = getLogger("jupyter:execute-code");
27
28
// Lifecycle of a CodeExecutionEmitter: it starts in "init", switches to
// "running" when _go begins the execution, and ends in "closed" once
// close() has run.
type State = "init" | "closed" | "running";
29
30
/**
 * Sends one piece of code to a Jupyter kernel as an "execute_request" and
 * gathers the messages that come back on the kernel's "shell" and "iopub"
 * events (and answers "stdin" requests when a stdin function was supplied).
 *
 * Events emitted by this object:
 *   - "output"   : once per collected message (see emit_output)
 *   - "canceled" : when cancel() is called
 *   - "error"    : when throw_error() is called
 *   - "closed"   : when close() runs; all listeners are removed right after
 *
 * An instance may be run only once, via go() or _go().
 */
export class CodeExecutionEmitter
  extends EventEmitter
  implements CodeExecutionEmitterInterface
{
  readonly kernel: JupyterKernel;
  readonly code: string;
  readonly id?: string;
  readonly stdin?: StdinFunction;
  readonly halt_on_error: boolean;
  // DO NOT set iopub_done or shell_done directly; instead
  // set them using the function set_shell_done and set_iopub_done.
  // This ensures that we call _finish when both vars have been set.
  private iopub_done: boolean = false;
  private shell_done: boolean = false;
  private state: State = "init";
  // Every message passed to emit_output, in order; returned by go().
  private all_output: object[] = [];
  // The execute_request message that is sent to the kernel.
  private _message: any;
  // Callback given to _go; invoked (once) by _finish.
  private _go_cb: Function | undefined = undefined;
  private timeout_ms?: number;
  private timer?: any;
  // Non-empty once execution is being forcibly ended (timeout or kernel
  // closed); the text is passed to the _go callback as the error.
  private killing: string = "";

  /**
   * @param kernel the kernel that will run the code
   * @param opts   code to run, plus optional id, stdin function,
   *               halt_on_error flag and timeout_ms
   */
  constructor(kernel: JupyterKernel, opts: ExecOpts) {
    super();
    this.kernel = kernel;
    this.code = opts.code;
    this.id = opts.id;
    this.stdin = opts.stdin;
    this.halt_on_error = !!opts.halt_on_error;
    this.timeout_ms = opts.timeout_ms;
    this._message = {
      parent_header: {},
      metadata: {},
      channel: "shell",
      header: {
        msg_id: `execute_${uuid()}`,
        username: "",
        session: "",
        msg_type: "execute_request" as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
      content: {
        code: this.code,
        silent: false,
        store_history: true, // so execution_count is updated.
        user_expressions: {},
        allow_stdin: this.stdin != null,
      },
    };

    // The handlers below are passed around as bare functions (kernel.on,
    // setTimeout, callback), so they must be bound to this instance.
    bind_methods(this);
  }

  // Records one output message in all_output and emits it as an "output"
  // event. Message format:
  // https://jupyter-client.readthedocs.io/en/stable/messaging.html#python-api
  emit_output(output: object): void {
    this.all_output.push(output);
    this.emit("output", output);
  }

  // Call this to inform anybody listening that we've canceled
  // this execution, and will NOT be doing it ever, and it
  // was explicitly canceled.
  cancel(): void {
    this.emit("canceled");
  }

  // Stops the timeout timer (if any), marks this emitter as closed, emits
  // "closed" and then drops all listeners. Safe to call more than once.
  close(): void {
    if (this.state === "closed") return;
    if (this.timer != null) {
      clearTimeout(this.timer);
      delete this.timer;
    }
    this.state = "closed";
    this.emit("closed");
    this.removeAllListeners();
  }

  // Emits err as an "error" event and then closes this emitter.
  throw_error(err: unknown): void {
    this.emit("error", err);
    this.close();
  }

  // Handles a "stdin" event from the kernel: asks this.stdin for a
  // value and sends it back to the kernel as an "input_reply" message.
  // Requests belonging to a different execute_request are ignored.
  async _handle_stdin(mesg: any): Promise<void> {
    if (!this.stdin) {
      throw Error("BUG -- stdin handling not supported");
    }
    log.silly("_handle_stdin: STDIN kernel --> server: ", mesg);
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      log.warn(
        "_handle_stdin: STDIN msg_id mismatch:",
        mesg.parent_header.msg_id,
        this._message.header.msg_id
      );
      return;
    }

    let response;
    try {
      response = await this.stdin(
        mesg.content.prompt ? mesg.content.prompt : "",
        !!mesg.content.password
      );
    } catch (err) {
      // A failing stdin function is reported to the kernel as the input
      // value rather than aborting the execution.
      response = `ERROR -- ${err}`;
    }
    log.silly("_handle_stdin: STDIN client --> server", response);
    const m = {
      channel: "stdin",
      parent_header: this._message.header,
      metadata: {},
      header: {
        msg_id: uuid(), // this._message.header.msg_id
        username: "",
        session: "",
        msg_type: "input_reply" as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
      content: {
        value: response,
      },
    };
    log.silly("_handle_stdin: STDIN server --> kernel:", m);
    this.kernel.channel?.next(m);
  }

  // Handles a "shell" event from the kernel. A reply to our request marks
  // the shell side as done; a reply with status "ok" is also recorded as
  // output. Replies to other requests are ignored.
  _handle_shell(mesg: any): void {
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      log.silly(
        `_handle_shell: msg_id mismatch: ${mesg.parent_header.msg_id} != ${this._message.header.msg_id}`
      );
      return;
    }
    log.silly("_handle_shell: got SHELL message -- ", mesg);

    if (mesg.content?.status === "ok") {
      this._push_mesg(mesg);
      this.set_shell_done(true);
    } else {
      log.warn(`_handle_shell: status != ok: ${mesg.content?.status}`);
      // NOTE: I'm adding support for "abort" status, since I was just reading
      // the kernel docs and it exists but is deprecated. Some old kernels
      // might use it and we should thus properly support it:
      // https://jupyter-client.readthedocs.io/en/stable/messaging.html#request-reply
      //
      // 2023-05-11: this was conditional on mesg.content?.status == "error" or == "abort"
      // but in reality, there was also "aborted". Hence this as a catch-all else.
      if (this.halt_on_error) {
        this.kernel.clear_execute_code_queue();
      }
      this.set_shell_done(true);
    }
  }

  // Finishes the execution once both the shell and the iopub side are done.
  private finish_if_done(): void {
    if (this.iopub_done && this.shell_done) {
      this._finish();
    }
  }

  private set_shell_done(value: boolean): void {
    this.shell_done = value;
    this.finish_if_done();
  }

  private set_iopub_done(value: boolean): void {
    this.iopub_done = value;
    this.finish_if_done();
  }

  // Handles an "iopub" event from the kernel. Messages for our request are
  // recorded as output (except comm messages); the iopub side is done when
  // the kernel reports execution_state "idle" or we are killing execution.
  _handle_iopub(mesg: any): void {
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      // iopub message for a different execute request so ignore it.
      return;
    }
    log.silly("_handle_iopub: got IOPUB message -- ", mesg);

    if (mesg.content?.comm_id != null) {
      // A comm message that is a result of execution of this code.
      // IGNORE here -- all comm messages are handled at a higher
      // level in jupyter.ts. Also, this case should never happen, since
      // we do not emit an event from jupyter.ts in this case anyways.
    } else {
      // A normal output message.
      this._push_mesg(mesg);
    }

    this.set_iopub_done(
      !!this.killing || mesg.content?.execution_state === "idle"
    );
  }

  // Called if the kernel is closed for some reason, e.g., crashing.
  private handle_closed(): void {
    log.debug("CodeExecutionEmitter.handle_closed: kernel closed");
    this.killing = "kernel crashed";
    this._finish();
  }

  // Ends the execution: detaches every listener this object put on the
  // kernel, advances the kernel's execute queue, records a final
  // { done: true } output, closes this emitter and calls the _go callback.
  // Does nothing if this emitter is already closed.
  _finish(): void {
    if (this.state === "closed") {
      return;
    }
    this.kernel.removeListener("iopub", this._handle_iopub);
    if (this.stdin != null) {
      this.kernel.removeListener("stdin", this._handle_stdin);
    }
    this.kernel.removeListener("shell", this._handle_shell);
    if (this.kernel._execute_code_queue != null) {
      this.kernel._execute_code_queue.shift(); // finished
      this.kernel._process_execute_code_queue(); // start next exec
    }
    // Must use the same event name ("closed") that _go registered the
    // handler under; otherwise the listener stays on the kernel forever.
    this.kernel.removeListener("closed", this.handle_closed);
    this._push_mesg({ done: true });
    this.close();

    // Finally call the callback that was setup in this._go.
    // This is what makes it possible to await on the entire
    // execution. Also it is important to explicitly
    // signal an error if we had to kill execution due
    // to hitting a timeout, since the kernel may or may
    // not have randomly done so itself in output.
    this._go_cb?.(this.killing);
    this._go_cb = undefined;
  }

  // Records a copy of mesg, reduced to its metadata, content, buffers and
  // done fields plus msg_type (taken from the header, when there is one).
  _push_mesg(mesg: any): void {
    // TODO: mesg isn't a normal javascript object;
    // it's **silently** immutable, which
    // is pretty annoying for our use. For now, we
    // just copy it, which is a waste.
    const header = mesg.header;
    mesg = copy_with(mesg, ["metadata", "content", "buffers", "done"]);
    mesg = deep_copy(mesg);
    if (header !== undefined) {
      mesg.msg_type = header.msg_type;
    }
    this.emit_output(mesg);
  }

  // Runs the code and resolves with all collected output messages once the
  // execution has finished. Rejects if _go reports an error.
  async go(): Promise<object[]> {
    await callback(this._go);
    return this.all_output;
  }

  // Callback-style entry point: starts the execution and arranges for cb to
  // be called when it ends. cb receives an error string, which is empty when
  // the execution was not killed.
  _go(cb: Function): void {
    if (this.state !== "init") {
      cb("may only run once");
      return;
    }
    this.state = "running";
    log.silly("_execute_code", this.code);
    if (this.kernel.get_state() === "closed") {
      log.silly("_execute_code", "kernel.get_state() is closed");
      this.close();
      cb("closed - jupyter - execute_code");
      return;
    }

    this._go_cb = cb; // this._finish will call this.

    if (this.stdin != null) {
      this.kernel.on("stdin", this._handle_stdin);
    }
    this.kernel.on("shell", this._handle_shell);
    this.kernel.on("iopub", this._handle_iopub);

    log.debug("_execute_code: send the message to get things rolling");
    this.kernel.channel?.next(this._message);

    this.kernel.on("closed", this.handle_closed);

    if (this.timeout_ms) {
      // setup a timeout at which point things will get killed if they don't finish
      this.timer = setTimeout(this.timeout, this.timeout_ms);
    }
  }

  // Runs when the execution time limit is hit: sends SIGINT up to three
  // times with growing pauses, then SIGKILL, and finishes the execution
  // with a timeout error.
  private async timeout(): Promise<void> {
    if (this.state === "closed") {
      log.debug(
        "CodeExecutionEmitter.timeout: already finished, so nothing to worry about"
      );
      return;
    }
    this.killing =
      "Timeout Error: execution time limit = " +
      Math.round((this.timeout_ms ?? 0) / 1000) +
      " seconds";
    let tries = 3;
    let d = 1000;
    // The `as State` casts stop the compiler from assuming the state is
    // still what the check above saw; it can change while we await.
    while (this.state !== ("closed" as State) && tries > 0) {
      log.debug(
        "CodeExecutionEmitter.timeout: code still running, so try to interrupt it"
      );
      // Code still running but timeout reached.
      // Keep sending interrupt signal, which will hopefully do something to
      // stop running code (there is no guarantee, of course). We
      // try a few times...
      this.kernel.signal("SIGINT");
      await delay(d);
      d *= 1.3;
      tries -= 1;
    }
    if (this.state !== ("closed" as State)) {
      log.debug(
        "CodeExecutionEmitter.timeout: now try SIGKILL, which should kill things for sure."
      );
      this.kernel.signal("SIGKILL");
      this._finish();
    }
  }
}
344
345