Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/sync/editor/generic/evaluator.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

//##############################################################################
//
//    CoCalc: Collaborative Calculation
//    Copyright (C) 2016, Sagemath Inc., MS-RSL.
//
//##############################################################################

/*
Evaluation of code with streaming output built on both the clients and
server (local hub) using a sync_table.  This evaluator is associated
to a syncdoc editing session, and provides code evaluation that
may be used to enhance the experience of document editing.
*/

const stringify = require("json-stable-stringify");

import { SyncTable } from "@cocalc/sync/table/synctable";
import { to_key } from "@cocalc/sync/table/util";
import {
  close,
  copy_with,
  copy_without,
  from_json,
  to_json,
} from "@cocalc/util/misc";
import { FLAGS, MARKERS, sagews } from "@cocalc/util/sagews";
import { ISageSession, SageCallOpts } from "@cocalc/util/types/sage";
import { SyncDoc } from "./sync-doc";
import { Client } from "./types";

type State = "init" | "ready" | "closed";

// What's supported so far.  The project-side dispatcher (handle_input_change)
// only recognizes "sage" and "shell"; "bash" is kept in the type for backward
// compatibility, but is answered with the error `no program 'bash'`.
type Program = "sage" | "shell" | "bash";

// Object whose meaning depends on the program
type Input = any;

/**
 * Code evaluator attached to a syncdoc editing session.
 *
 * Requests are written to an `eval_inputs` synctable and results are read
 * from an `eval_outputs` synctable.  In the browser, `call` writes a request
 * and streams the matching outputs back to a callback.  In the project,
 * changes to the inputs table are dispatched to a Sage session or to the
 * client's shell, and every result is written to the outputs table.
 */
export class Evaluator {
  private syncdoc: SyncDoc;
  private client: Client;
  private inputs_table: SyncTable;
  private outputs_table: SyncTable;
  private sage_session: ISageSession;
  private state: State = "init";
  private table_options: any[] = [];
  private create_synctable: Function;

  // Time used by the most recent `call`; lets `call` hand out strictly
  // increasing timestamps.
  private last_call_time: Date = new Date(0);

  /**
   * @param syncdoc - document this evaluator belongs to
   * @param client - client used for server time, debugging, Sage sessions
   *   and shell commands
   * @param create_synctable - factory invoked as
   *   `create_synctable(query, options, 0)` to make the two tables
   */
  constructor(syncdoc: SyncDoc, client: Client, create_synctable: Function) {
    this.syncdoc = syncdoc;
    this.client = client;
    this.create_synctable = create_synctable;
    if (this.syncdoc.data_server == "project") {
      // options only supported for project...
      this.table_options = [{ ephemeral: true, persistent: true }];
    }
  }

  /**
   * Create the input and output tables and, when running in the project,
   * start handling evaluation requests.  Sets the state to "ready".
   */
  public async init(): Promise<void> {
    // Initialize the inputs and outputs tables in parallel:
    const i = this.init_eval_inputs();
    const o = this.init_eval_outputs();
    await Promise.all([i, o]);

    if (this.client.is_project()) {
      await this.init_project_evaluator();
    }
    this.set_state("ready");
  }

  /**
   * Close both tables and the Sage session (whichever exist), then mark
   * this evaluator as closed.
   */
  public async close(): Promise<void> {
    if (this.inputs_table != null) {
      await this.inputs_table.close();
    }
    if (this.outputs_table != null) {
      await this.outputs_table.close();
    }
    if (this.sage_session != null) {
      this.sage_session.close();
    }
    // NOTE(review): the state is assigned only after close(this), presumably
    // because close() clears this object's fields -- confirm against
    // @cocalc/util/misc.
    close(this);
    this.set_state("closed");
  }

  /**
   * Return a debug logger labelled `Evaluator.<_f>`.  Outside the project
   * the returned function does nothing.
   */
  private dbg(_f: string): Function {
    if (this.client.is_project()) {
      return this.client.dbg(`Evaluator.${_f}`);
    } else {
      return (..._) => {};
    }
  }

  /** Create the synctable of evaluation requests for this document. */
  private async init_eval_inputs(): Promise<void> {
    const query = {
      eval_inputs: [
        {
          string_id: this.syncdoc.get_string_id(),
          input: null,
          time: null,
          user_id: null,
        },
      ],
    };
    this.inputs_table = await this.create_synctable(
      query,
      this.table_options,
      0,
    );
  }

  /** Create the synctable of evaluation results for this document. */
  private async init_eval_outputs(): Promise<void> {
    const query = {
      eval_outputs: [
        {
          string_id: this.syncdoc.get_string_id(),
          output: null,
          time: null,
          number: null,
        },
      ],
    };
    this.outputs_table = await this.create_synctable(
      query,
      this.table_options,
      0,
    );
    this.outputs_table.setMaxListeners(200); // in case of many evaluations at once.
  }

  private set_state(state: State): void {
    this.state = state;
  }

  /** @throws Error if this evaluator has been closed. */
  private assert_not_closed(): void {
    if (this.state === "closed") {
      throw Error("closed -- sync evaluator");
    }
  }

  /** @throws Error if the client is not the project. */
  private assert_is_project(): void {
    if (!this.client.is_project()) {
      throw Error("BUG -- this code should only run in the project.");
    }
  }

  /** @throws Error if the client is the project. */
  private assert_is_browser(): void {
    if (this.client.is_project()) {
      throw Error("BUG -- this code should only run in the web browser.");
    }
  }

  /**
   * Request an evaluation (browser only).
   *
   * The request is written to the inputs table under a timestamp that is
   * strictly later than the one used by the previous call on this object.
   * If given, `opts.cb` is called repeatedly with results as they appear,
   * in message-number order, until a message with `done` set is delivered.
   *
   * @throws Error if the evaluator is closed or this is not a browser client.
   */
  public call(opts: { program: Program; input: Input; cb?: Function }): void {
    this.assert_not_closed();
    this.assert_is_browser();
    const dbg = this.dbg("call");
    dbg(opts.program, opts.input, opts.cb != undefined);

    let time = this.client.server_time();
    // Perturb time if it is <= last time when this client did an evaluation,
    // so that every call made through this object gets a distinct time.
    if (time <= this.last_call_time) {
      // slightly later
      time = new Date(this.last_call_time.valueOf() + 1);
    }
    this.last_call_time = time;

    const user_id: number = this.syncdoc.get_my_user_id();
    const obj = {
      string_id: this.syncdoc.get_string_id(),
      time,
      user_id,
      input: copy_without(opts, "cb"),
    };
    dbg(JSON.stringify(obj));
    this.inputs_table.set(obj);
    // root cause of https://github.com/sagemathinc/cocalc/issues/1589
    this.inputs_table.save();

    if (opts.cb == null) {
      // Fire and forget -- no need to listen for responses.
      dbg("no cb defined, so fire and forget");
      return;
    }

    // Listen for output until we receive a message with mesg.done true.
    // Messages that arrived ahead of their turn, indexed by message number.
    const messages = {};

    // output may appear in random order, so we use mesg_number
    // to sort it out.
    let mesg_number = 0;

    const send = (mesg) => {
      dbg("send", mesg);
      if (mesg.done) {
        this.outputs_table.removeListener("change", handle_output);
      }
      if (opts.cb != null) {
        opts.cb(mesg);
      }
    };

    const handle_output = (keys: string[]) => {
      dbg("handle_output", keys);
      this.assert_not_closed();
      for (const key of keys) {
        // The parsed key holds the time at index 1 and the message number
        // at index 2.
        const t = from_json(key);
        if (t[1].valueOf() != time.valueOf()) {
          dbg("not our eval", t[1].valueOf(), time.valueOf());
          continue;
        }
        const x = this.outputs_table.get(key);
        if (x == null) {
          dbg("x is null");
          continue;
        }
        const y = x.get("output");
        if (y == null) {
          dbg("y is null");
          continue;
        }
        dbg("y = ", JSON.stringify(y.toJS()));
        const mesg = y.toJS();
        if (mesg == null) {
          dbg("probably never happens, but makes typescript happy.");
          continue;
        }
        // OK, we called opts.cb on output mesg with the given timestamp and user_id...
        delete mesg.id; // waste of space

        // Messages may arrive in somewhat random order.  This *DOES HAPPEN*,
        // since changes are output from the project by computing a diff of
        // a synctable, and then an array of objects sent out... and
        // the order in that diff is random.
        // E.g. this in a Sage worksheet would break:
        //    for i in range(20): print i; sys.stdout.flush()
        if (t[2] !== mesg_number) {
          // Not the next message, so put message in the
          // set of messages that arrived too early.
          dbg("put message in holding", t[2], mesg_number);
          messages[t[2]] = mesg;
          continue;
        }

        // Finally, the right message to handle next.
        // Inform caller of result
        send(mesg);
        mesg_number += 1;

        // Then, push out any messages that arrived earlier
        // that are ready to send.
        while (messages[mesg_number] != null) {
          send(messages[mesg_number]);
          delete messages[mesg_number];
          mesg_number += 1;
        }
      }
    };

    this.outputs_table.on("change", handle_output);
  }

  /**
   * Build the per-evaluation hook used for Sage `execute_code` requests
   * (project only).
   *
   * The returned function takes an output message and, 5 seconds later,
   * checks whether the output line for `output_uuid` in the document is at
   * least as long as the line accumulated here.  If it is shorter, the
   * accumulated line is written into the document and committed.
   */
  private execute_sage_code_hook(output_uuid: string): Function {
    this.assert_is_project();
    const dbg = this.dbg(`execute_sage_code_hook('${output_uuid}')`);
    dbg();
    this.assert_not_closed();

    // We track the output_line from within this project, and compare
    // to what is set in the document (by the user).  If they go out
    // of sync for a while, we fill in the result.
    // TODO: since it's now possible to know whether or not users are
    // connected... maybe we could use that instead?
    let output_line = MARKERS.output;

    const hook = (mesg) => {
      dbg(`processing mesg '${to_json(mesg)}'`);
      let content = this.syncdoc.to_str();
      let i = content.indexOf(MARKERS.output + output_uuid);
      if (i === -1) {
        // no cell anymore, so do nothing further right now.
        return;
      }
      // Skip past the marker and the uuid.
      // NOTE(review): 37 presumably is 1 marker character + a 36-character
      // uuid -- confirm.
      i += 37;
      const n = content.indexOf("\n", i);
      if (n === -1) {
        // corrupted? -- don't try further right now.
        return;
      }
      // This is what the frontend also does:
      output_line +=
        stringify(copy_without(mesg, ["id", "event"])) + MARKERS.output;

      if (output_line.length - 1 <= n - i) {
        // Things are looking fine (at least, the line is long enough).
        // TODO: try instead comparing actual content, not just length?
        // Or maybe don't... since this stupid code will all get deleted anyways
        // when we rewrite sagews handling.
        return;
      }

      dbg("browser client didn't maintain sync promptly. fixing");
      dbg(
        `sage_execute_code: i=${i}, n=${n}, output_line.length=${output_line.length}`,
      );
      dbg(`output_line='${output_line}', sync_line='${content.slice(i, n)}'`);
      const x = content.slice(0, i);
      content = x + output_line + content.slice(n);
      if (mesg.done) {
        let j = x.lastIndexOf(MARKERS.cell);
        if (j !== -1) {
          j = x.lastIndexOf("\n", j);
          // NOTE(review): presumably the 36-character cell uuid that follows
          // the newline and the cell marker -- confirm.
          const cell_id = x.slice(j + 2, j + 38);
          const S = sagews(content);
          S.remove_cell_flag(cell_id, FLAGS.running);
          S.set_cell_flag(cell_id, FLAGS.this_session);
          content = S.content;
        }
      }
      this.syncdoc.from_str(content);
      this.syncdoc.commit();
    };

    return (mesg) => {
      setTimeout(() => hook(mesg), 5000);
    };
  }

  /**
   * Handle one key of the inputs table (project only).
   *
   * Does nothing if an output numbered 0 already exists for this request.
   * Otherwise the request is dispatched to the evaluator for its program,
   * and every output is written to the outputs table with an increasing
   * `number`.  Malformed requests and unknown programs are answered with a
   * single `{ error, done: true }` output.
   *
   * @throws Error if the evaluator is closed, this is not the project, the
   *   row for `key` is missing, or the row has no input.
   */
  private handle_input_change(key: string): void {
    this.assert_not_closed();
    this.assert_is_project();

    const dbg = this.dbg("handle_input_change");
    dbg(`change: ${key}`);

    const t = from_json(key);
    const string_id = t[0];
    const time = t[1];
    let number = 0;
    // Key of the first output of this evaluation.  Its number stays 0 even
    // as `number` is incremented below.
    const id = [string_id, time, 0];
    if (this.outputs_table.get(to_key(id)) != null) {
      dbg("already being handled");
      return;
    }
    dbg(`no outputs yet with key ${to_json(id)}`);
    const r = this.inputs_table.get(key);
    if (r == null) {
      dbg("deleting from input?");
      // happens when deleting from input table (if that is
      // ever supported, e.g., for maybe trimming old evals...)
      throw Error("deleting from input not implemented");
    }
    const input = r.get("input");
    if (input == null) {
      throw Error("input must be specified");
    }
    const x = input.toJS();
    dbg("x = ", x);
    if (x == null) {
      throw Error("BUG: can't happen");
    }
    if (x.program == null || x.input == null) {
      this.outputs_table.set({
        string_id,
        time,
        number,
        output: {
          error: "must specify both program and input",
          done: true,
        },
      });
      this.outputs_table.save();
      return;
    }

    let f;
    switch (x.program) {
      case "sage":
        f = this.evaluate_using_sage;
        break;
      case "shell":
        f = this.evaluate_using_shell;
        break;
      default:
        this.outputs_table.set({
          string_id,
          time,
          number,
          output: {
            error: `no program '${x.program}'`,
            done: true,
          },
        });
        this.outputs_table.save();
        return;
    }
    f = f.bind(this);

    let hook: Function;
    if (
      x.program === "sage" &&
      x.input.event === "execute_code" &&
      x.input.output_uuid != null
    ) {
      hook = this.execute_sage_code_hook(x.input.output_uuid);
    } else {
      // no op
      hook = (_) => {};
    }

    f(x.input, (output) => {
      if (this.state == "closed") {
        return;
      }

      dbg(`got output='${to_json(output)}'; id=${to_json(id)}`);
      hook(output);
      this.outputs_table.set({ string_id, time, number, output });
      this.outputs_table.save();
      number += 1;
    });
  }

  // Runs only in the project
  /**
   * Start handling changes to the inputs table, then handle every request
   * that is already in the table.
   */
  private async init_project_evaluator(): Promise<void> {
    this.assert_is_project();

    const dbg = this.dbg("init_project_evaluator");
    dbg("init");
    this.inputs_table.on("change", async (keys) => {
      for (const key of keys) {
        await this.handle_input_change(key);
      }
    });
    /* CRITICAL: it's very important to handle all the inputs
       that may have happened just moments before
       this object got created.  Why? The first input is
       the user trying to frickin' evaluate a cell
       in their worksheet to start things running... and they
       might somehow do that moments before the worksheet
       gets opened on the backend; if we don't do the
       following, then often this eval is missed, and
       confusion and frustration ensues. */
    const v = this.inputs_table.get();
    if (v != null) {
      dbg(`handle ${v.size} pending evaluations`);
      for (const key of v.keys()) {
        if (key != null) {
          await this.handle_input_change(key);
        }
      }
    }
  }

  /** Create the Sage session for this document's path if there is none yet. */
  private ensure_sage_session_exists(): void {
    if (this.sage_session != null) return;
    this.dbg("ensure_sage_session_exists")();
    // This code only runs in the project, where client
    // has a sage_session method.
    this.sage_session = this.client.sage_session({
      path: this.syncdoc.get_path(),
    });
  }

  // Runs only in the project
  /**
   * Evaluate `input` with the Sage session, delivering results through `cb`.
   *
   * For `execute_code` events the input is reduced to the fields `code`,
   * `data`, `preparse`, `event` and `id`, and the session socket is
   * initialized first if the session is not running.  Any failure, whether
   * while preparing the session or while making the call, is reported as
   * `cb({ error, done: true })` rather than by rejecting.
   */
  private async evaluate_using_sage(
    input: SageCallOpts["input"],
    cb: SageCallOpts["cb"],
  ): Promise<void> {
    this.assert_is_project();
    const dbg = this.dbg("evaluate_using_sage");
    dbg();

    // TODO: input also may have -- uuid, output_uuid, timeout
    if (input.event === "execute_code") {
      input = copy_with(input, ["code", "data", "preparse", "event", "id"]);
      dbg(
        "ensure sage session is running, so we can actually execute the code",
      );
    }
    try {
      this.ensure_sage_session_exists();
      if (input.event === "execute_code") {
        // We only need to actually create the socket, which makes a running process,
        // if we are going to execute code.  The other events, e.g., 'status' don't
        // need a running sage session.
        if (!this.sage_session.is_running()) {
          dbg("sage session is not running, so init socket");
          await this.sage_session.init_socket();
        }
      }
    } catch (error) {
      cb({ error, done: true });
      return;
    }
    dbg("send call to backend sage session manager", to_json(input));
    try {
      await this.sage_session.call({ input, cb });
    } catch (error) {
      // handle_input_change does not await the promise returned by this
      // method, so a rejection here would go unhandled.  Report it to the
      // caller as a final message instead.
      cb({ error, done: true });
    }
  }

  // Runs only in the project
  /**
   * Run `input` through the client's shell.  `cb` is called once with the
   * output object, which has `done` set and `error` set if the shell
   * reported an error.  Note that `input.cb` is overwritten on the object
   * that was passed in.
   */
  private evaluate_using_shell(input: Input, cb: Function): void {
    this.assert_is_project();
    const dbg = this.dbg("evaluate_using_shell");
    dbg();

    input.cb = (err, output) => {
      if (output == null) {
        output = {};
      }
      if (err) {
        output.error = err;
      }
      output.done = true;
      cb(output);
    };
    this.client.shell(input);
  }
}