CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/sync/editor/generic/evaluator.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
//##############################################################################
7
//
8
// CoCalc: Collaborative Calculation
9
// Copyright (C) 2016, Sagemath Inc., MS-RSL.
10
//
11
//##############################################################################
12
13
/*
Evaluation of code with streaming output, built on both the clients and the
server (local hub) using sync tables (one for inputs, one for outputs).
An evaluator is associated with a syncdoc editing session, and provides
code evaluation that may be used to enhance the experience of document editing.
*/
19
20
const stringify = require("json-stable-stringify");
21
22
import { SyncTable } from "@cocalc/sync/table/synctable";
23
import { to_key } from "@cocalc/sync/table/util";
24
import {
25
close,
26
copy_with,
27
copy_without,
28
from_json,
29
to_json,
30
} from "@cocalc/util/misc";
31
import { FLAGS, MARKERS, sagews } from "@cocalc/util/sagews";
32
import { ISageSession, SageCallOpts } from "@cocalc/util/types/sage";
33
import { SyncDoc } from "./sync-doc";
34
import { Client } from "./types";
35
36
// Lifecycle of an Evaluator: it starts in "init", becomes "ready" once
// init() finishes, and ends in "closed" after close().
type State = "init" | "ready" | "closed";

// What's supported so far.
// "shell" is the name that the project-side dispatcher switches on when
// routing a request to the shell evaluator, so it must be expressible here.
type Program = "sage" | "bash" | "shell";

// Object whose meaning depends on the program
type Input = any;
43
44
export class Evaluator {
45
// Collaborators handed to the constructor.
private syncdoc: SyncDoc;
private client: Client;
// Factory used to build the two sync tables below.
private create_synctable: Function;

// Options passed to create_synctable; stays empty unless the constructor
// decides otherwise.
private table_options: any[] = [];

// Requests written by browsers (eval_inputs) and the results written by
// the project (eval_outputs).
private inputs_table: SyncTable;
private outputs_table: SyncTable;

// Created lazily by ensure_sage_session_exists.
private sage_session: ISageSession;

private state: State = "init";

// Timestamp used by the most recent call(); epoch 0 means "never called".
private last_call_time = new Date(0);
55
56
/**
 * Store the collaborators; no tables are created until init() is called.
 *
 * @param syncdoc - the document this evaluator is attached to
 * @param client - client object (browser or project)
 * @param create_synctable - factory later used to create the inputs and
 *   outputs tables
 */
constructor(syncdoc: SyncDoc, client: Client, create_synctable: Function) {
  this.create_synctable = create_synctable;
  this.client = client;
  this.syncdoc = syncdoc;

  // Table options are only supported when the project is the data server.
  const served_by_project = this.syncdoc.data_server === "project";
  if (served_by_project) {
    this.table_options = [{ ephemeral: true, persistent: true }];
  }
}
65
66
/**
 * Create the inputs and outputs tables, hook up the project-side evaluator
 * when running in the project, and then mark the evaluator as "ready".
 */
public async init(): Promise<void> {
  // Both table initialisations are started before either is awaited, so
  // they proceed in parallel.
  await Promise.all([this.init_eval_inputs(), this.init_eval_outputs()]);

  if (this.client.is_project()) {
    await this.init_project_evaluator();
  }

  this.set_state("ready");
}
77
78
/**
 * Shut the evaluator down: close whichever tables and sage session exist,
 * then mark the evaluator as "closed".
 */
public async close(): Promise<void> {
  const inputs = this.inputs_table;
  if (inputs != null) {
    await inputs.close();
  }

  const outputs = this.outputs_table;
  if (outputs != null) {
    await outputs.close();
  }

  const session = this.sage_session;
  if (session != null) {
    session.close();
  }

  // Generic helper from @cocalc/util/misc (implementation not visible here;
  // presumably it clears this object's data fields -- TODO confirm).
  // The state is set afterwards so that "closed" is what remains.
  close(this);
  this.set_state("closed");
}
91
92
/**
 * Return a logging function for the method named `_f`.
 * In the project this is the client's logger tagged `Evaluator.<_f>`;
 * everywhere else it is a function that does nothing.
 */
private dbg(_f): Function {
  if (!this.client.is_project()) {
    return (..._) => {};
  }
  return this.client.dbg(`Evaluator.${_f}`);
}
99
100
/**
 * Create the sync table of evaluation requests (eval_inputs) for this
 * document and store it in this.inputs_table.
 */
private async init_eval_inputs(): Promise<void> {
  // null fields are the ones to be filled in by the query.
  const row = {
    string_id: this.syncdoc.get_string_id(),
    input: null,
    time: null,
    user_id: null,
  };
  const query = { eval_inputs: [row] };
  // NOTE(review): the meaning of the trailing 0 is defined by
  // create_synctable, which is not visible here.
  const table = await this.create_synctable(query, this.table_options, 0);
  this.inputs_table = table;
}
117
118
/**
 * Create the sync table of evaluation results (eval_outputs) for this
 * document and store it in this.outputs_table.
 */
private async init_eval_outputs(): Promise<void> {
  // null fields are the ones to be filled in by the query.
  const row = {
    string_id: this.syncdoc.get_string_id(),
    output: null,
    time: null,
    number: null,
  };
  const query = { eval_outputs: [row] };
  // NOTE(review): the meaning of the trailing 0 is defined by
  // create_synctable, which is not visible here.
  const table = await this.create_synctable(query, this.table_options, 0);
  this.outputs_table = table;
  // Every in-flight call() adds its own "change" listener, so raise the
  // limit in case of many evaluations at once.
  this.outputs_table.setMaxListeners(200);
}
136
137
/** Record the evaluator's current lifecycle state. */
private set_state(state: State): void {
  this.state = state;
}
140
141
/**
 * Guard used at the top of operations that need a live evaluator.
 * @throws Error if the evaluator has been closed.
 */
private assert_not_closed(): void {
  if (this.state !== "closed") {
    return;
  }
  throw Error("closed -- sync evaluator");
}
146
147
/**
 * Guard for code paths that must run inside the project.
 * @throws Error if the client is not the project.
 */
private assert_is_project(): void {
  if (this.client.is_project()) {
    return;
  }
  throw Error("BUG -- this code should only run in the project.");
}
152
153
/**
 * Guard for code paths that must run in the web browser.
 * @throws Error if the client is the project.
 */
private assert_is_browser(): void {
  const in_project = this.client.is_project();
  if (in_project) {
    throw Error("BUG -- this code should only run in the web browser.");
  }
}
158
159
/**
 * Request an evaluation (browser only).
 *
 * The request is written to the inputs table. If opts.cb is given, it is
 * called repeatedly with result messages as they appear in the outputs
 * table, in message-number order, until a message with `done` set is
 * delivered.
 *
 * @throws Error if the evaluator is closed or this runs in the project.
 */
public call(opts: { program: Program; input: Input; cb?: Function }): void {
  this.assert_not_closed();
  this.assert_is_browser();
  const dbg = this.dbg("call");
  dbg(opts.program, opts.input, opts.cb != undefined);

  // The timestamp identifies this evaluation, so it has to differ from
  // every earlier one made by this client: if the server time has not
  // advanced past the previous call, use "previous + 1ms" instead.
  const now = this.client.server_time();
  const time =
    now <= this.last_call_time
      ? new Date(this.last_call_time.valueOf() + 1)
      : now;
  this.last_call_time = time;

  const user_id: number = this.syncdoc.get_my_user_id();
  const request = {
    string_id: this.syncdoc.get_string_id(),
    time,
    user_id,
    input: copy_without(opts, "cb"),
  };
  dbg(JSON.stringify(request));
  this.inputs_table.set(request);
  // root cause of https://github.com/sagemathinc/cocalc/issues/1589
  this.inputs_table.save();

  if (opts.cb == null) {
    // Fire and forget -- no need to listen for responses.
    dbg("no cb defined, so fire and forget");
    return;
  }

  // From here on: listen for output until a message with mesg.done true
  // has been delivered.

  // Messages that arrived before their turn, keyed by message number.
  const held = {};
  // Number of the next message the caller is waiting for.
  let next_number = 0;

  // Hand one message to the caller; stop listening once it is the last.
  const deliver = (mesg) => {
    dbg("send", mesg);
    if (mesg.done) {
      this.outputs_table.removeListener("change", on_change);
    }
    if (opts.cb != null) {
      opts.cb(mesg);
    }
  };

  const on_change = (keys: string[]) => {
    dbg("handle_output", keys);
    this.assert_not_closed();
    for (const key of keys) {
      // Keys decode to [string_id, time, number].
      const parts = from_json(key);
      if (parts[1].valueOf() != time.valueOf()) {
        dbg("not our eval", parts[1].valueOf(), time.valueOf());
        continue;
      }
      const row = this.outputs_table.get(key);
      if (row == null) {
        dbg("x is null");
        continue;
      }
      const output = row.get("output");
      if (output == null) {
        dbg("y is null");
        continue;
      }
      const mesg = output.toJS();
      dbg("y = ", JSON.stringify(mesg));
      if (mesg == null) {
        dbg("probably never happens, but makes typescript happy.");
        continue;
      }
      delete mesg.id; // waste of space

      // Messages may arrive in somewhat random order. This *DOES HAPPEN*,
      // since changes are output from the project by computing a diff of
      // a synctable, and then an array of objects sent out... and
      // the order in that diff is random.
      // E.g. this in a Sage worksheet would break:
      //    for i in range(20): print i; sys.stdout.flush()
      const seq = parts[2];
      if (seq === next_number) {
        // Exactly the message the caller is waiting for.
        deliver(mesg);
        next_number += 1;

        // Flush any directly following messages that arrived early.
        while (held[next_number] != null) {
          deliver(held[next_number]);
          delete held[next_number];
          next_number += 1;
        }
      } else {
        // Arrived too early; park it until its predecessors show up.
        dbg("put message in holding", seq, next_number);
        held[seq] = mesg;
      }
    }
  };

  this.outputs_table.on("change", on_change);
}
271
272
/**
 * Build the per-evaluation hook used for sage `execute_code` requests
 * (project only).
 *
 * The returned function takes an output message and, 5 seconds later,
 * checks whether the output line for cell output `output_uuid` in the
 * document is at least as long as the line this project has accumulated
 * from the messages seen so far. If it is shorter, the project writes its
 * own accumulated line into the document and commits.
 *
 * @throws Error if not running in the project or if the evaluator is closed.
 */
private execute_sage_code_hook(output_uuid: string): Function {
  this.assert_is_project();
  const dbg = this.dbg(`execute_sage_code_hook('${output_uuid}')`);
  dbg();
  this.assert_not_closed();

  // Offset from the start of "<output marker><output_uuid>" to the text
  // that follows it.
  // NOTE(review): presumably a 1-char marker + 36-char uuid -- confirm.
  const AFTER_MARKER_AND_UUID = 37;
  // Offsets, relative to the newline preceding a cell line, that delimit
  // the cell id. NOTE(review): presumably skips "\n" + 1-char cell marker
  // and then spans a 36-char uuid -- confirm.
  const CELL_ID_START = 2;
  const CELL_ID_END = 38;
  const DELAY_MS = 5000;

  // We track the output line from within this project, and compare it
  // to what is set in the document (by the user). If they go out
  // of sync for a while, we fill in the result.
  // TODO: since it's now possible to know whether or not users are
  // connected... maybe we could use that instead?
  let output_line = MARKERS.output;

  const reconcile = (mesg) => {
    dbg(`processing mesg '${to_json(mesg)}'`);
    let doc = this.syncdoc.to_str();

    const marker_at = doc.indexOf(MARKERS.output + output_uuid);
    if (marker_at === -1) {
      // no cell anymore, so do nothing further right now.
      return;
    }
    const i = marker_at + AFTER_MARKER_AND_UUID;
    const n = doc.indexOf("\n", i);
    if (n === -1) {
      // corrupted? -- don't try further right now.
      return;
    }

    // This is what the frontend also does:
    const piece = stringify(copy_without(mesg, ["id", "event"]));
    output_line += piece + MARKERS.output;

    const doc_line_is_long_enough = output_line.length - 1 <= n - i;
    if (doc_line_is_long_enough) {
      // Things are looking fine (at least, the line is long enough).
      // TODO: try instead comparing actual content, not just length?
      // Or maybe don't... since this code will all get deleted anyways
      // when sagews handling is rewritten.
      return;
    }

    dbg("browser client didn't maintain sync promptly. fixing");
    dbg(
      `sage_execute_code: i=${i}, n=${n}, output_line.length=${output_line.length}`,
    );
    dbg(`output_line='${output_line}', sync_line='${doc.slice(i, n)}'`);

    // Replace everything between the uuid and the end of that line.
    const head = doc.slice(0, i);
    doc = head + output_line + doc.slice(n);

    if (mesg.done) {
      // Evaluation finished: update the flags of the cell that precedes
      // this output (the last cell marker before it).
      const cell_marker_at = head.lastIndexOf(MARKERS.cell);
      if (cell_marker_at !== -1) {
        const j = head.lastIndexOf("\n", cell_marker_at);
        const cell_id = head.slice(j + CELL_ID_START, j + CELL_ID_END);
        const S = sagews(doc);
        S.remove_cell_flag(cell_id, FLAGS.running);
        S.set_cell_flag(cell_id, FLAGS.this_session);
        doc = S.content;
      }
    }

    this.syncdoc.from_str(doc);
    this.syncdoc.commit();
  };

  // Delay the check so the browser gets a chance to update the document.
  return (mesg) => {
    setTimeout(() => reconcile(mesg), DELAY_MS);
  };
}
339
340
/**
 * React to a new or changed row in the inputs table (project only): look
 * up the request, pick the evaluator for its program, run it, and write
 * each result message to the outputs table with an increasing number.
 *
 * Requests that already have an output numbered 0 are skipped, since they
 * are already being handled.
 *
 * @param key - key of the inputs-table row; decodes to [string_id, time, ...]
 * @throws Error if the evaluator is closed, if not running in the project,
 *   if the row no longer exists, or if it has no input.
 */
private handle_input_change(key: string): void {
  this.assert_not_closed();
  this.assert_is_project();

  const dbg = this.dbg("handle_input_change");
  dbg(`change: ${key}`);

  const t = from_json(key);
  const string_id = t[0];
  const time = t[1];
  // Number of the next output message for this evaluation.
  let number = 0;
  // Key of the first output message of this evaluation.
  const id = [string_id, time, 0];
  if (this.outputs_table.get(to_key(id)) != null) {
    dbg("already being handled");
    return;
  }
  dbg(`no outputs yet with key ${to_json(id)}`);

  const r = this.inputs_table.get(key);
  if (r == null) {
    // happens when deleting from input table (if that is
    // ever supported, e.g., for maybe trimming old evals...)
    dbg("deleting from input?");
    throw Error("deleting from input not implemented");
  }
  const input = r.get("input");
  if (input == null) {
    throw Error("input must be specified");
  }
  const x = input.toJS();
  dbg("x = ", x);
  if (x == null) {
    throw Error("BUG: can't happen");
  }

  // Write one message to the outputs table and advance the counter.
  const write_output = (output): void => {
    this.outputs_table.set({ string_id, time, number, output });
    this.outputs_table.save();
    number += 1;
  };

  if (x.program == null || x.input == null) {
    write_output({
      error: "must specify both program and input",
      done: true,
    });
    return;
  }

  let f: Function;
  switch (x.program) {
    case "sage":
      f = this.evaluate_using_sage;
      break;
    case "shell":
    case "bash":
      // "bash" is the name declared in the Program type; treat it as a
      // synonym for "shell" so a well-typed request is not rejected.
      f = this.evaluate_using_shell;
      break;
    default:
      write_output({
        error: `no program '${x.program}'`,
        done: true,
      });
      return;
  }

  let hook: Function;
  if (
    x.program === "sage" &&
    x.input.event === "execute_code" &&
    x.input.output_uuid != null
  ) {
    hook = this.execute_sage_code_hook(x.input.output_uuid);
  } else {
    // no op
    hook = (_) => {};
  }

  const on_output = (output): void => {
    if (this.state == "closed") {
      return;
    }
    dbg(`got output='${to_json(output)}'; id=${to_json(id)}`);
    hook(output);
    write_output(output);
  };

  // evaluate_using_sage is async, so the call may produce a promise that
  // rejects. Do not let that rejection go unhandled: report it as a final
  // error message so the requester is not left waiting for "done".
  Promise.resolve(f.call(this, x.input, on_output)).catch((err) => {
    dbg(`evaluation failed: ${err}`);
    on_output({ error: `${err}`, done: true });
  });
}
435
436
/**
 * Runs only in the project.
 *
 * Start handling evaluation requests: subscribe to changes of the inputs
 * table and then process every row that is already in it.
 *
 * @throws Error if not running in the project.
 */
private async init_project_evaluator(): Promise<void> {
  this.assert_is_project();

  const dbg = this.dbg("init_project_evaluator");
  dbg("init");

  const on_inputs_changed = async (keys) => {
    for (const key of keys) {
      await this.handle_input_change(key);
    }
  };
  this.inputs_table.on("change", on_inputs_changed);

  /* CRITICAL: inputs that were written just moments before this object
     got created must be handled too. Why? The first input is typically
     the user evaluating a cell in their worksheet to start things
     running, and they may do that moments before the worksheet gets
     opened on the backend. Without the pass below, that eval is often
     missed, and confusion and frustration ensue. */
  const pending = this.inputs_table.get();
  if (pending == null) {
    return;
  }
  dbg(`handle ${pending.size} pending evaluations`);
  for (const key of pending.keys()) {
    if (key == null) {
      continue;
    }
    await this.handle_input_change(key);
  }
}
466
467
/**
 * Create this.sage_session for the document's path unless one already
 * exists.
 */
private ensure_sage_session_exists(): void {
  if (this.sage_session == null) {
    this.dbg("ensure_sage_session_exists")();
    // This code only runs in the project, where client
    // has a sage_session method.
    const path = this.syncdoc.get_path();
    this.sage_session = this.client.sage_session({ path });
  }
}
476
477
/**
 * Runs only in the project.
 *
 * Forward `input` to the sage session, creating the session first if
 * needed. For "execute_code" requests the input is reduced to the fields
 * listed below and the session's socket is initialised when the session
 * is not running. Failures while preparing the session are reported as
 * cb({ error, done: true }).
 *
 * @throws Error if not running in the project.
 */
private async evaluate_using_sage(
  input: SageCallOpts["input"],
  cb: SageCallOpts["cb"],
): Promise<void> {
  this.assert_is_project();
  const dbg = this.dbg("evaluate_using_sage");
  dbg();

  // TODO: input also may have -- uuid, output_uuid, timeout
  if (input.event === "execute_code") {
    const kept_fields = ["code", "data", "preparse", "event", "id"];
    input = copy_with(input, kept_fields);
    dbg(
      "ensure sage session is running, so we can actually execute the code",
    );
  }

  try {
    this.ensure_sage_session_exists();
    // Only executing code needs the socket (and hence a running process);
    // other events, e.g., 'status', don't need a running sage session.
    if (input.event === "execute_code" && !this.sage_session.is_running()) {
      dbg("sage session is not running, so init socket");
      await this.sage_session.init_socket();
    }
  } catch (error) {
    cb({ error, done: true });
    return;
  }

  dbg("send call to backend sage session manager", to_json(input));
  await this.sage_session.call({ input, cb });
}
511
512
/**
 * Runs only in the project.
 *
 * Hand `input` to the client's shell and report the outcome through `cb`
 * as one message with `done` set (plus `error` when the shell reported
 * one). Note that `input` is modified: its `cb` property is set.
 *
 * @throws Error if not running in the project.
 */
private evaluate_using_shell(input: Input, cb: Function): void {
  this.assert_is_project();
  const dbg = this.dbg("evaluate_using_shell");
  dbg();

  const on_finished = (err, output) => {
    const result = output == null ? {} : output;
    if (err) {
      result.error = err;
    }
    result.done = true;
    cb(result);
  };

  input.cb = on_finished;
  this.client.shell(input);
}
530
}
531
532