Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/execute/output-handler.ts
5787 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Class that handles output messages generated for evaluation of code
8
for a particular cell.
9
10
WARNING: For efficiency reasons (involving syncdb patch sizes),
11
outputs is a map from the (string representations of) the numbers
12
from 0 to n-1, where there are n messages. So watch out.
13
14
OutputHandler emits these events:
15
16
- 'change' -- (save), called when we change cell; if save=true, recommend
17
broadcasting this change to other users ASAP.
18
19
- 'done' -- emitted once when finished; after this, everything is cleaned up
20
21
- 'more_output' -- If we exceed the message limit, emit more_output
22
(mesg, mesg_length) with extra messages.
23
24
- 'process' -- Gets called on any incoming message; it may
25
**mutate** the message, e.g., removing images uses this.
26
27
*/
28
29
import { callback } from "awaiting";
30
import { EventEmitter } from "events";
31
import {
32
close,
33
defaults,
34
required,
35
server_time,
36
len,
37
to_json,
38
is_object,
39
} from "@cocalc/util/misc";
40
41
// Bounds (in milliseconds) for how often output is flagged to be saved:
// the interval starts at the minimum and is backed off toward the maximum.
const MIN_SAVE_INTERVAL_MS = 500;
const MAX_SAVE_INTERVAL_MS = 45000;

/** Current time according to server_time(), as a number of milliseconds. */
function now(): number {
  const stamp = server_time();
  return stamp.valueOf() - 0;
}
45
46
/**
 * Collects the output messages generated while evaluating the code of one
 * cell, and mutates `opts.cell` (output, exec_count, state, start, end,
 * input) accordingly.
 *
 * `cell.output` is either null or a plain object whose keys are the string
 * forms of 0, 1, ..., n-1 (one entry per stored message).
 *
 * Events emitted:
 *  - 'change' (save): the cell was modified; save === true recommends
 *    broadcasting the change right away.
 *  - 'done': emitted once from close(); all listeners are removed afterwards.
 *  - 'more_output' (mesg, mesg_length): a message that was not stored in the
 *    cell because an output limit was exceeded.
 *  - 'process' (mesg): emitted for every non-empty incoming message before it
 *    is stored; listeners may mutate mesg.
 */
export class OutputHandler extends EventEmitter {
  private _opts: any;
  // Number of messages currently stored in cell.output (also the next key).
  private _n: number;
  // Set by clear(wait=true): the output is wiped when the next message arrives.
  private _clear_before_next_output: boolean;
  // Sum of the JSON lengths of the messages counted against max_output_length.
  private _output_length: number;
  // Once true, further messages are only emitted via 'more_output'.
  private _in_more_output_mode: boolean;
  private _state: "ready" | "closed";
  // Pending node-style callback of an in-progress stdin() request, if any.
  private _stdin_cb: any;
  // Handle of the pending "report started" timer, so close() can cancel it.
  private _report_started_timer?: ReturnType<typeof setTimeout>;

  // Never commit output to send to the frontend more frequently than this.saveIntervalMs
  // Otherwise, we'll end up with a large number of patches.
  // We start out with MIN_SAVE_INTERVAL_MS and exponentially back it off to
  // MAX_SAVE_INTERVAL_MS.
  private lastSave: number = 0;
  private saveIntervalMs = MIN_SAVE_INTERVAL_MS;

  /**
   * @param opts.cell (required) object that gets mutated as output arrives.
   * @param opts.max_output_length if given, total JSON length of stored
   *   messages beyond which further messages go to 'more_output'.
   * @param opts.max_output_messages if given, number of stored messages
   *   beyond which further messages go to 'more_output'.
   * @param opts.report_started_ms if given, emit 'change' (save=true) after
   *   this many ms unless output was already stored or the handler closed.
   * @param opts.dbg optional logging function, used for unknown payloads.
   */
  constructor(opts: any) {
    super();
    this._opts = defaults(opts, {
      cell: required, // object; the cell whose output (etc.) will get mutated
      // If given, used to truncate, discard output messages; extra
      // messages are saved and made available.
      max_output_length: undefined,
      max_output_messages: undefined,
      report_started_ms: undefined, // If no messages for this many ms, then we update via set to indicate
      // that cell is being run.
      dbg: undefined,
    });

    // Reset the cell to a fresh "about to run" state.
    const { cell } = this._opts;
    cell.output = null;
    cell.exec_count = null;
    cell.state = "run";
    cell.start = null;
    cell.end = null;

    // Internal state
    this._n = 0;
    this._clear_before_next_output = false;
    this._output_length = 0;
    this._in_more_output_mode = false;
    this._state = "ready";

    // Report that computation started if there is no output soon.
    if (this._opts.report_started_ms != null) {
      this._report_started_timer = setTimeout(
        this._report_started,
        this._opts.report_started_ms,
      );
    }

    // stdin is a prototype method; bind it so it can be passed around.
    this.stdin = this.stdin.bind(this);
  }

  /**
   * Emit 'done', drop all listeners and release internal state. Idempotent.
   *
   * NOTE(review): a stdin() request that is still pending is not settled
   * here, so its promise appears to stay pending forever -- confirm that
   * callers do not rely on it settling.
   */
  close = (): void => {
    if (this._state === "closed") return;
    this._state = "closed";
    // Cancel the "report started" timer so it does not outlive the handler.
    if (this._report_started_timer != null) {
      clearTimeout(this._report_started_timer);
    }
    this.emit("done");
    this.removeAllListeners();
    close(this, new Set(["_state", "close"]));
  };

  /**
   * Discard all stored output, reset the counters and emit 'change' (save).
   *
   * NOTE(review): _in_more_output_mode is deliberately left untouched here,
   * so once a limit was hit, later messages keep going to 'more_output'
   * even after a clear -- confirm this is intended.
   */
  _clear_output = (save?: any): void => {
    if (this._state === "closed") {
      return;
    }
    this._clear_before_next_output = false;
    // clear output message -- we delete all the outputs
    // reset the counter n, save, and are done.
    // IMPORTANT: In Jupyter the clear_output message and everything
    // before it is NOT saved in the notebook output itself
    // (like in Sage worksheets).
    this._opts.cell.output = null;
    this._n = 0;
    this._output_length = 0;
    this.emit("change", save);
  };

  /** Timer callback: emit 'change' (save=true) if nothing was stored yet. */
  _report_started = (): void => {
    if (this._state === "closed" || this._n > 0) {
      // do nothing -- already getting output or done.
      return;
    }
    this.emit("change", true);
  };

  /** Call when computation starts: stamps cell.start and marks the cell busy. */
  start = (): void => {
    if (this._state === "closed") {
      return;
    }
    // Use the same clock as done(), so that end - start is meaningful.
    this._opts.cell.start = now();
    this._opts.cell.state = "busy";
    this.emit("change", true);
  };

  /**
   * Call if an error occurs. An appropriate error message is added to the
   * output and the computation is considered done.
   *
   * @param err the literal string "closed" produces the "kernel terminated"
   *   markdown message; anything else is stringified and stored as stderr.
   */
  error = (err: any): void => {
    if (err === "closed") {
      // See https://github.com/sagemathinc/cocalc/issues/2388
      this.message({
        data: {
          "text/markdown":
            "<font color='red'>**Jupyter Kernel terminated:**</font> This might be caused by running out of memory or hitting a bug in some library (e.g., forking too many processes, trying to access invalid memory, etc.). Consider restarting or upgrading your project or running the relevant code directly in a terminal to track down the cause, as [explained here](https://github.com/sagemathinc/cocalc/wiki/KernelTerminated).",
        },
      });
    } else {
      this.message({
        text: `${err}`,
        name: "stderr",
      });
    }
    this.done();
  };

  /**
   * Call exactly once when the computation is done: marks the cell done,
   * stamps cell.end (and cell.start if start() was never called), emits
   * 'change' (save=true) and closes the handler.
   */
  done = (): void => {
    if (this._state === "closed") {
      return;
    }
    const { cell } = this._opts;
    cell.state = "done";
    if (cell.start == null) {
      cell.start = now();
    }
    cell.end = now();
    this.emit("change", true);
    this.close();
  };

  /**
   * Handle a clear request.
   *
   * @param wait if truthy, defer clearing until the next output message
   *   arrives; otherwise clear immediately.
   */
  clear = (wait: any): void => {
    if (wait) {
      // wait until next output before clearing.
      this._clear_before_next_output = true;
      return;
    }
    this._clear_output();
  };

  /**
   * Strip fields that are not stored with the output, plus any top-level
   * field whose value is an empty object. Mutates mesg.
   */
  _clean_mesg = (mesg: any): void => {
    delete mesg.execution_state;
    delete mesg.code;
    delete mesg.status;
    delete mesg.source;
    // Colab sends non-standard messages like {"request":{"delayMillis":500}}
    // Let's ignore them https://github.com/sagemathinc/cocalc/issues/8460
    delete mesg.request;
    for (const key of Object.keys(mesg)) {
      const value = mesg[key];
      if (is_object(value) && len(value) === 0) {
        delete mesg[key];
      }
    }
  };

  /**
   * Append mesg to cell.output and emit 'change'.
   *
   * @param save if omitted, it becomes true only when more than
   *   saveIntervalMs has passed since the last save (and the interval is then
   *   grown by 10%, capped at MAX_SAVE_INTERVAL_MS); otherwise it is passed
   *   through to the 'change' event unchanged.
   */
  private _push_mesg = (mesg: any, save?: boolean): void => {
    if (this._state === "closed") {
      return;
    }

    if (save == null) {
      const t = now();
      if (t - this.lastSave > this.saveIntervalMs) {
        save = true;
        this.lastSave = t;
        this.saveIntervalMs = Math.min(
          MAX_SAVE_INTERVAL_MS,
          this.saveIntervalMs * 1.1,
        );
      }
    } else if (save === true) {
      this.lastSave = now();
    }

    const { cell } = this._opts;
    if (cell.output === null) {
      cell.output = {};
    }
    cell.output[`${this._n}`] = mesg;
    this._n += 1;
    this.emit("change", save);
  };

  /** Replace the input of the cell and emit 'change' (save). */
  set_input = (input: any, save = true): void => {
    if (this._state === "closed") {
      return;
    }
    this._opts.cell.input = input;
    this.emit("change", save);
  };

  /**
   * Process one incoming message. This may mutate mesg.
   *
   * The message is ignored if the handler is closed, if cell.end is already
   * set, or if nothing remains after cleaning. Otherwise it is stored in the
   * cell, or emitted via 'more_output' once an output limit is exceeded.
   */
  message = (mesg: any): void => {
    if (this._state === "closed") {
      return;
    }

    const { cell } = this._opts;
    if (cell.end) {
      // ignore any messages once we're done.
      return;
    }

    // record execution_count, if there.
    const has_exec_count = mesg.execution_count != null;
    if (has_exec_count) {
      cell.exec_count = mesg.execution_count;
      delete mesg.execution_count;
    }

    // delete useless fields
    this._clean_mesg(mesg);

    if (len(mesg) === 0) {
      // don't even bother saving this message; nothing useful here.
      return;
    }

    if (has_exec_count) {
      // message that has an execution count
      mesg.exec_count = cell.exec_count;
    }

    // hook to process message (e.g., this may mutate mesg,
    // e.g., to remove big images)
    this.emit("process", mesg);

    if (this._clear_before_next_output) {
      this._clear_output(false);
    }

    // Measured after the 'process' hook, so it reflects what would be stored.
    const mesg_length = JSON.stringify(mesg).length;

    if (this._in_more_output_mode) {
      this.emit("more_output", mesg, mesg_length);
      return;
    }

    // check if limits exceeded:
    this._output_length += mesg_length;

    const { max_output_length, max_output_messages } = this._opts;
    const notTooLong =
      max_output_length == null || this._output_length <= max_output_length;
    const notTooMany =
      max_output_messages == null || this._n < max_output_messages;

    if (notTooLong && notTooMany) {
      // limits NOT exceeded
      this._push_mesg(mesg);
      return;
    }

    // Switch to too much output mode: store a marker in the cell, and hand
    // this and all later messages to 'more_output' listeners instead.
    this._push_mesg({ more_output: true });
    this._in_more_output_mode = true;
    this.emit("more_output", mesg, mesg_length);
  };

  /**
   * Request input from the user: stores an "input" message in the cell
   * output and resolves once cell_changed() sees a value for it.
   *
   * @param prompt text to show to the user.
   * @param password whether the input is a password.
   * @returns the value that was entered.
   */
  async stdin(prompt: string, password: boolean): Promise<string> {
    // See docs for stdin option to execute_code in backend jupyter.coffee
    this._push_mesg({ name: "input", opts: { prompt, password } });
    // Now we wait until the output message we just included has its
    // value set. Then we call cb with that value.
    // This weird thing below sets this._stdin_cb, then
    // waits for this._stdin_cb to be called, which happens
    // when cell_changed gets called.
    return await callback((cb) => (this._stdin_cb = cb));
  }

  /**
   * Call this when the cell changes; only used for stdin right now.
   *
   * @param cell object exposing get("output"); the returned output must
   *   provide `size` and `getIn(path)` (presumably an immutable.js Map --
   *   verify against caller).
   * @param get_password optional function returning the actual password when
   *   the pending input message was a password prompt.
   */
  cell_changed = (cell: any, get_password: any): void => {
    if (this._state === "closed") {
      return;
    }
    if (this._stdin_cb == null) {
      return;
    }
    const output = cell != null ? cell.get("output") : undefined;
    if (output == null) {
      return;
    }
    const value = output.getIn([`${output.size - 1}`, "value"]);
    if (value == null) {
      // The input message has not been answered yet.
      return;
    }

    let answer = value;
    const ownOutput = this._opts.cell.output;
    if (ownOutput) {
      const last = ownOutput[`${len(ownOutput) - 1}`];
      if (get_password != null && last?.opts?.password) {
        // In case of a password, the value is NEVER placed in the document.
        // Instead the value is submitted to the backend via https, with
        // a random identifier put in the value.
        answer = get_password(); // get actual password
      }
      if (last != null) {
        // sync output-handler view of output with syncdb
        last.value = value;
      }
    }

    // Detach the callback before invoking it, so it can only fire once.
    const cb = this._stdin_cb;
    this._stdin_cb = undefined;
    cb(undefined, answer);
  };

  /**
   * Handle a payload: "set_next_input" replaces the cell input, "page" is
   * treated as a normal output message, anything else is only logged via
   * opts.dbg (when that is a function).
   */
  payload = (payload: any): void => {
    if (this._state === "closed") {
      return;
    }
    if (payload.source === "set_next_input") {
      this.set_input(payload.text);
    } else if (payload.source === "page") {
      // Just handle as a normal message; and we don't show in the pager,
      // which doesn't make sense for multiple users.
      // This happens when requesting help for r:
      // https://github.com/sagemathinc/cocalc/issues/1933
      this.message(payload);
    } else if (typeof this._opts.dbg === "function") {
      // No idea what to do with this...
      this._opts.dbg(`Unknown PAYLOAD: ${to_json(payload)}`);
    }
  };
}
372
373