CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/execute/output-handler.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Class that handles output messages generated for evaluation of code
8
for a particular cell.
9
10
WARNING: For efficiency reasons (involving syncdb patch sizes),
11
outputs is a map from the (string representations of) the numbers
12
from 0 to n-1, where there are n messages. So watch out.
13
14
OutputHandler emits two events:
15
16
- 'change' -- (save), called when we change cell; if save=true, recommend
17
broadcasting this change to other users ASAP.
18
19
- 'done' -- emited once when finished; after this, everything is cleaned up
20
21
- 'more_output' -- If we exceed the message limit, emit more_output (mesg, mesg_length)
22
with extra messages.
23
24
- 'process' -- Gets called on any incoming message; it may
25
**mutate** the message, e.g., removing images uses this.
26
27
*/
28
29
import { callback } from "awaiting";
30
import { EventEmitter } from "events";
31
import {
32
close,
33
defaults,
34
required,
35
server_time,
36
len,
37
to_json,
38
is_object,
39
} from "@cocalc/util/misc";
40
41
const now = () => server_time().valueOf() - 0;
42
43
const MIN_SAVE_INTERVAL_MS = 500;
44
const MAX_SAVE_INTERVAL_MS = 45000;
45
46
export class OutputHandler extends EventEmitter {
47
private _opts: any;
48
private _n: number;
49
private _clear_before_next_output: boolean;
50
private _output_length: number;
51
private _in_more_output_mode: any;
52
private _state: any;
53
private _stdin_cb: any;
54
55
// Never commit output to send to the frontend more frequently than this.saveIntervalMs
56
// Otherwise, we'll end up with a large number of patches.
57
// We start out with MIN_SAVE_INTERVAL_MS and exponentially back it off to
58
// MAX_SAVE_INTERVAL_MS.
59
private lastSave: number = 0;
60
private saveIntervalMs = MIN_SAVE_INTERVAL_MS;
61
62
constructor(opts: any) {
63
super();
64
this._opts = defaults(opts, {
65
cell: required, // object; the cell whose output (etc.) will get mutated
66
max_output_length: undefined, // If given, used to truncate, discard output messages; extra
67
// messages are saved and made available.
68
report_started_ms: undefined, // If no messages for this many ms, then we update via set to indicate
69
// that cell is being run.
70
dbg: undefined,
71
});
72
const { cell } = this._opts;
73
cell.output = null;
74
cell.exec_count = null;
75
cell.state = "run";
76
cell.start = null;
77
cell.end = null;
78
// Internal state
79
this._n = 0;
80
this._clear_before_next_output = false;
81
this._output_length = 0;
82
this._in_more_output_mode = false;
83
this._state = "ready";
84
// Report that computation started if there is no output soon.
85
if (this._opts.report_started_ms != null) {
86
setTimeout(this._report_started, this._opts.report_started_ms);
87
}
88
89
this.stdin = this.stdin.bind(this);
90
}
91
92
close = (): void => {
93
if (this._state == "closed") return;
94
this._state = "closed";
95
this.emit("done");
96
this.removeAllListeners();
97
close(this, new Set(["_state", "close"]));
98
};
99
100
_clear_output = (save?: any): void => {
101
if (this._state === "closed") {
102
return;
103
}
104
this._clear_before_next_output = false;
105
// clear output message -- we delete all the outputs
106
// reset the counter n, save, and are done.
107
// IMPORTANT: In Jupyter the clear_output message and everything
108
// before it is NOT saved in the notebook output itself
109
// (like in Sage worksheets).
110
this._opts.cell.output = null;
111
this._n = 0;
112
this._output_length = 0;
113
this.emit("change", save);
114
};
115
116
_report_started = (): void => {
117
if (this._state == "closed" || this._n > 0) {
118
// do nothing -- already getting output or done.
119
return;
120
}
121
this.emit("change", true);
122
};
123
124
// Call when computation starts
125
start = () => {
126
if (this._state === "closed") {
127
return;
128
}
129
this._opts.cell.start = (new Date() as any) - 0;
130
this._opts.cell.state = "busy";
131
this.emit("change", true);
132
};
133
134
// Call error if an error occurs. An appropriate error message is generated.
135
// Computation is considered done.
136
error = (err: any): void => {
137
if (err === "closed") {
138
// See https://github.com/sagemathinc/cocalc/issues/2388
139
this.message({
140
data: {
141
"text/markdown":
142
"<font color='red'>**Jupyter Kernel terminated:**</font> This might be caused by running out of memory or hitting a bug in some library (e.g., forking too many processes, trying to access invalid memory, etc.). Consider restarting or upgrading your project or running the relevant code directly in a terminal to track down the cause, as [explained here](https://github.com/sagemathinc/cocalc/wiki/KernelTerminated).",
143
},
144
});
145
} else {
146
this.message({
147
text: `${err}`,
148
name: "stderr",
149
});
150
}
151
this.done();
152
};
153
154
// Call done exactly once when done
155
done = (): void => {
156
if (this._state === "closed") {
157
return;
158
}
159
this._opts.cell.state = "done";
160
if (this._opts.cell.start == null) {
161
this._opts.cell.start = now();
162
}
163
this._opts.cell.end = now();
164
this.emit("change", true);
165
this.close();
166
};
167
168
// Handle clear
169
clear = (wait: any): void => {
170
if (wait) {
171
// wait until next output before clearing.
172
this._clear_before_next_output = true;
173
return;
174
}
175
this._clear_output();
176
};
177
178
_clean_mesg = (mesg: any): void => {
179
delete mesg.execution_state;
180
delete mesg.code;
181
delete mesg.status;
182
delete mesg.source;
183
for (const k in mesg) {
184
const v = mesg[k];
185
if (is_object(v) && len(v) === 0) {
186
delete mesg[k];
187
}
188
}
189
};
190
191
private _push_mesg = (mesg: any, save?: boolean): void => {
192
if (this._state === "closed") {
193
return;
194
}
195
196
if (save == null) {
197
const n = now();
198
if (n - this.lastSave > this.saveIntervalMs) {
199
save = true;
200
this.lastSave = n;
201
this.saveIntervalMs = Math.min(
202
MAX_SAVE_INTERVAL_MS,
203
this.saveIntervalMs * 1.1
204
);
205
}
206
} else if (save == true) {
207
this.lastSave = now();
208
}
209
210
if (this._opts.cell.output === null) {
211
this._opts.cell.output = {};
212
}
213
this._opts.cell.output[`${this._n}`] = mesg;
214
this._n += 1;
215
this.emit("change", save);
216
};
217
218
set_input = (input: any, save = true): void => {
219
if (this._state === "closed") {
220
return;
221
}
222
this._opts.cell.input = input;
223
this.emit("change", save);
224
};
225
226
// Process incoming messages. This may mutate mesg.
227
message = (mesg: any): void => {
228
let has_exec_count: any;
229
if (this._state === "closed") {
230
return;
231
}
232
233
if (this._opts.cell.end) {
234
// ignore any messages once we're done.
235
return;
236
}
237
238
// record execution_count, if there.
239
if (mesg.execution_count != null) {
240
has_exec_count = true;
241
this._opts.cell.exec_count = mesg.execution_count;
242
delete mesg.execution_count;
243
} else {
244
has_exec_count = false;
245
}
246
247
// delete useless fields
248
this._clean_mesg(mesg);
249
250
if (len(mesg) === 0) {
251
// don't even bother saving this message; nothing useful here.
252
return;
253
}
254
255
if (has_exec_count) {
256
// message that has an execution count
257
mesg.exec_count = this._opts.cell.exec_count;
258
}
259
260
// hook to process message (e.g., this may mutate mesg,
261
// e.g., to remove big images)
262
this.emit("process", mesg);
263
264
if (this._clear_before_next_output) {
265
this._clear_output(false);
266
}
267
268
if (!this._opts.max_output_length) {
269
this._push_mesg(mesg);
270
return;
271
}
272
273
// worry about length
274
const s = JSON.stringify(mesg);
275
const mesg_length = (s && s.length) || 0;
276
this._output_length += mesg_length;
277
278
if (this._output_length <= this._opts.max_output_length) {
279
this._push_mesg(mesg);
280
return;
281
}
282
283
// Check if we have entered the mode were output gets put in
284
// the set_more_output buffer.
285
if (!this._in_more_output_mode) {
286
this._push_mesg({ more_output: true });
287
this._in_more_output_mode = true;
288
}
289
this.emit("more_output", mesg, mesg_length);
290
};
291
292
async stdin(prompt: string, password: boolean): Promise<string> {
293
// See docs for stdin option to execute_code in backend jupyter.coffee
294
this._push_mesg({ name: "input", opts: { prompt, password } });
295
// Now we wait until the output message we just included has its
296
// value set. Then we call cb with that value.
297
// This weird thing below sets this._stdin_cb, then
298
// waits for this._stdin_cb to be called, which happens
299
// when cell_changed gets called.
300
return await callback((cb) => (this._stdin_cb = cb));
301
}
302
303
// Call this when the cell changes; only used for stdin right now.
304
cell_changed = (cell: any, get_password: any): void => {
305
if (this._state === "closed") {
306
return;
307
}
308
if (this._stdin_cb == null) {
309
return;
310
}
311
const output = cell != null ? cell.get("output") : undefined;
312
if (output == null) {
313
return;
314
}
315
const value = output.getIn([`${output.size - 1}`, "value"]);
316
if (value != null) {
317
let x = value;
318
if (this._opts.cell.output) {
319
const n = `${len(this._opts.cell.output) - 1}`;
320
if (
321
get_password != null &&
322
this._opts.cell.output[n] &&
323
this._opts.cell.output[n].opts != null &&
324
this._opts.cell.output[n].opts.password
325
) {
326
// In case of a password, the value is NEVER placed in the document.
327
// Instead the value is submitted to the backend via https, with
328
// a random identifier put in the value.
329
x = get_password(); // get actual password
330
}
331
if (this._opts.cell.output[`${n}`] != null) {
332
this._opts.cell.output[`${n}`].value = value;
333
} // sync output-handler view of output with syncdb
334
}
335
this._stdin_cb(undefined, x);
336
delete this._stdin_cb;
337
}
338
};
339
340
payload = (payload: any): void => {
341
if (this._state === "closed") {
342
return;
343
}
344
if (payload.source === "set_next_input") {
345
this.set_input(payload.text);
346
} else if (payload.source === "page") {
347
// Just handle as a normal message; and we don't show in the pager,
348
// which doesn't make sense for multiple users.
349
// This happens when requesting help for r:
350
// https://github.com/sagemathinc/cocalc/issues/1933
351
this.message(payload);
352
} else {
353
// No idea what to do with this...
354
if (typeof this._opts.dbg === "function") {
355
this._opts.dbg(`Unknown PAYLOAD: ${to_json(payload)}`);
356
}
357
}
358
};
359
}
360
361