Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/jupyter/execute/execute-code.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
Send code to a kernel to be evaluated, then wait for
the results and gather them together.

TODO: for easy testing/debugging, add an "async run() : Messages[]" method.
*/

import { callback, delay } from "awaiting";
import { EventEmitter } from "events";
import { VERSION } from "@cocalc/jupyter/kernel/version";
import type { JupyterKernelInterface as JupyterKernel } from "@cocalc/jupyter/types/project-interface";
import type { MessageType } from "@nteract/messaging";
import { bind_methods, copy_with, deep_copy, uuid } from "@cocalc/util/misc";
import {
  CodeExecutionEmitterInterface,
  ExecOpts,
  StdinFunction,
} from "@cocalc/jupyter/types/project-interface";
import { getLogger } from "@cocalc/backend/logger";

const log = getLogger("jupyter:execute-code");

// Lifecycle of a single execution: "init" -> "running" -> "closed".
type State = "init" | "closed" | "running";

/**
 * Runs one chunk of code on a Jupyter kernel and collects its output.
 *
 * Events emitted by this object:
 *   - "output":   each output message (and finally `{ done: true }`)
 *   - "canceled": when cancel() is called
 *   - "error":    when throw_error() is called
 *   - "closed":   when the emitter is closed; all listeners are removed
 *                 immediately afterwards.
 *
 * An instance may only be run once (see `_go`).
 */
export class CodeExecutionEmitter
  extends EventEmitter
  implements CodeExecutionEmitterInterface
{
  readonly kernel: JupyterKernel;
  readonly code: string;
  readonly id?: string;
  readonly stdin?: StdinFunction;
  readonly halt_on_error: boolean;
  // DO NOT set iopub_done or shell_done directly; instead
  // set them using the function set_shell_done and set_iopub_done.
  // This ensures that we call _finish when both vars have been set.
  private iopub_done: boolean = false;
  private shell_done: boolean = false;
  private state: State = "init";
  // Every message passed to emit_output, in order; returned by go().
  private all_output: object[] = [];
  // The execute_request message that is sent to the kernel.
  private _message: any;
  // Callback given to _go; invoked (once) by _finish.
  private _go_cb: Function | undefined = undefined;
  private timeout_ms?: number;
  private timer?: any;
  // Non-empty string describing why execution is being killed
  // (timeout or kernel closed); empty string if it isn't.
  private killing: string = "";

  /**
   * @param kernel - kernel that the code will be sent to
   * @param opts - code to run plus optional id, stdin handler,
   *   halt_on_error flag and timeout (in milliseconds)
   */
  constructor(kernel: JupyterKernel, opts: ExecOpts) {
    super();
    this.kernel = kernel;
    this.code = opts.code;
    this.id = opts.id;
    this.stdin = opts.stdin;
    this.halt_on_error = !!opts.halt_on_error;
    this.timeout_ms = opts.timeout_ms;
    this._message = {
      parent_header: {},
      metadata: {},
      channel: "shell",
      header: {
        msg_id: `execute_${uuid()}`,
        username: "",
        session: "",
        msg_type: "execute_request" as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
      content: {
        code: this.code,
        silent: false,
        store_history: true, // so execution_count is updated.
        user_expressions: {},
        allow_stdin: this.stdin != null,
      },
    };

    // The methods below are passed around unbound as event listeners and
    // callbacks (e.g. this._handle_iopub, this.timeout), so they need
    // `this` bound. NOTE(review): assumes bind_methods does exactly that.
    bind_methods(this);
  }

  // Emits a valid result
  // result is https://jupyter-client.readthedocs.io/en/stable/messaging.html#python-api
  // Or an array of those when this.all is true
  // The output is also recorded in all_output, which go() returns.
  emit_output(output: object): void {
    this.all_output.push(output);
    this.emit("output", output);
  }

  // Call this to inform anybody listening that we've canceled
  // this execution, and will NOT be doing it ever, and it
  // was explicitly canceled.
  cancel(): void {
    this.emit("canceled");
  }

  /**
   * Mark this emitter as closed: clear the pending timeout timer (if any),
   * emit "closed", and remove all listeners on this emitter. Calling it
   * again once closed is a no-op.
   */
  close(): void {
    if (this.state == "closed") return;
    if (this.timer != null) {
      clearTimeout(this.timer);
      delete this.timer;
    }
    this.state = "closed";
    this.emit("closed");
    this.removeAllListeners();
  }

  /** Emit an "error" event with the given error, then close. */
  throw_error(err): void {
    this.emit("error", err);
    this.close();
  }

  /**
   * Handle an input request from the kernel: ask the stdin function
   * for a response and send it back to the kernel as an "input_reply"
   * on the stdin channel. Requests whose parent msg_id doesn't match our
   * execute request are ignored (with a warning).
   *
   * @throws Error if no stdin function was provided.
   */
  async _handle_stdin(mesg: any): Promise<void> {
    if (!this.stdin) {
      throw Error("BUG -- stdin handling not supported");
    }
    log.silly("_handle_stdin: STDIN kernel --> server: ", mesg);
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      log.warn(
        "_handle_stdin: STDIN msg_id mismatch:",
        mesg.parent_header.msg_id,
        this._message.header.msg_id
      );
      return;
    }

    let response;
    try {
      response = await this.stdin(
        mesg.content.prompt ? mesg.content.prompt : "",
        !!mesg.content.password
      );
    } catch (err) {
      // If the stdin function fails, the error text itself is sent
      // to the kernel as the reply.
      response = `ERROR -- ${err}`;
    }
    log.silly("_handle_stdin: STDIN client --> server", response);
    const m = {
      channel: "stdin",
      parent_header: this._message.header,
      metadata: {},
      header: {
        msg_id: uuid(), // this._message.header.msg_id
        username: "",
        session: "",
        msg_type: "input_reply" as MessageType,
        version: VERSION,
        date: new Date().toISOString(),
      },
      content: {
        value: response,
      },
    };
    log.silly("_handle_stdin: STDIN server --> kernel:", m);
    this.kernel.channel?.next(m);
  }

  /**
   * Handle a message on the shell channel. Messages for other requests
   * are ignored. Any reply to our request marks the shell side as done;
   * only a reply with status "ok" is also recorded as output.
   */
  _handle_shell(mesg: any): void {
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      log.silly(
        `_handle_shell: msg_id mismatch: ${mesg.parent_header.msg_id} != ${this._message.header.msg_id}`
      );
      return;
    }
    log.silly("_handle_shell: got SHELL message -- ", mesg);

    if (mesg.content?.status == "ok") {
      this._push_mesg(mesg);
      this.set_shell_done(true);
    } else {
      log.warn(`_handle_shell: status != ok: ${mesg.content?.status}`);
      // NOTE: I'm adding support for "abort" status, since I was just reading
      // the kernel docs and it exists but is deprecated. Some old kernels
      // might use it and we should thus properly support it:
      // https://jupyter-client.readthedocs.io/en/stable/messaging.html#request-reply
      //
      // 2023-05-11: this was conditional on mesg.content?.status == "error" or == "abort"
      // but in reality, there was also "aborted". Hence this is a catch-all else.
      if (this.halt_on_error) {
        this.kernel.clear_execute_code_queue();
      }
      this.set_shell_done(true);
    }
  }

  // Record the shell state and finish if both channels are now done.
  private set_shell_done(value: boolean): void {
    this.shell_done = value;
    if (this.iopub_done && this.shell_done) {
      this._finish();
    }
  }

  // Record the iopub state and finish if both channels are now done.
  private set_iopub_done(value: boolean): void {
    this.iopub_done = value;
    if (this.iopub_done && this.shell_done) {
      this._finish();
    }
  }

  /**
   * Handle a message on the iopub channel. Messages for other requests
   * are ignored; comm messages are skipped; everything else is recorded
   * as output. The iopub side is considered done once the kernel reports
   * execution_state "idle" (or we are in the process of killing execution).
   */
  _handle_iopub(mesg: any): void {
    if (mesg.parent_header.msg_id !== this._message.header.msg_id) {
      // iopub message for a different execute request so ignore it.
      return;
    }
    log.silly("_handle_iopub: got IOPUB message -- ", mesg);

    if (mesg.content?.comm_id != null) {
      // A comm message that is a result of execution of this code.
      // IGNORE here -- all comm messages are handled at a higher
      // level in jupyter.ts.  Also, this case should never happen, since
      // we do not emit an event from jupyter.ts in this case anyways.
    } else {
      // A normal output message.
      this._push_mesg(mesg);
    }

    this.set_iopub_done(
      !!this.killing || mesg.content?.execution_state == "idle"
    );
  }

  // Called if the kernel is closed for some reason, e.g., crashing.
  private handle_closed(): void {
    log.debug("CodeExecutionEmitter.handle_closed: kernel closed");
    this.killing = "kernel crashed";
    this._finish();
  }

  /**
   * Tear down this execution: detach every listener that _go attached
   * to the kernel, advance the kernel's execute queue, emit the final
   * `{ done: true }` output, close this emitter, and invoke the callback
   * passed to _go (with `this.killing` as the error argument, which is
   * the empty string on normal completion). No-op if already closed.
   */
  _finish(): void {
    if (this.state == "closed") {
      return;
    }
    this.kernel.removeListener("iopub", this._handle_iopub);
    if (this.stdin != null) {
      this.kernel.removeListener("stdin", this._handle_stdin);
    }
    this.kernel.removeListener("shell", this._handle_shell);
    if (this.kernel._execute_code_queue != null) {
      this.kernel._execute_code_queue.shift(); // finished
      this.kernel._process_execute_code_queue(); // start next exec
    }
    // The event name here MUST match the one used in _go
    // (this.kernel.on("closed", ...)); otherwise the listener is never
    // removed and leaks on the kernel for every execution.
    this.kernel.removeListener("closed", this.handle_closed);
    this._push_mesg({ done: true });
    this.close();

    // Finally call the callback that was setup in this._go.
    // This is what makes it possible to await on the entire
    // execution.  Also it is important to explicitly
    // signal an error if we had to kill execution due
    // to hitting a timeout, since the kernel may or may
    // not have randomly done so itself in output.
    this._go_cb?.(this.killing);
    this._go_cb = undefined;
  }

  /**
   * Copy the relevant fields (metadata, content, buffers, done) of a
   * kernel message, tag the copy with the message's msg_type (when it
   * has a header), and emit it as output.
   */
  _push_mesg(mesg): void {
    // TODO: mesg isn't a normal javascript object;
    // it's **silently** immutable, which
    // is pretty annoying for our use.  For now, we
    // just copy it, which is a waste.
    const header = mesg.header;
    mesg = copy_with(mesg, ["metadata", "content", "buffers", "done"]);
    mesg = deep_copy(mesg);
    if (header !== undefined) {
      mesg.msg_type = header.msg_type;
    }
    this.emit_output(mesg);
  }

  /**
   * Run the code and wait for it to finish.
   *
   * @returns all output messages that were emitted during execution
   */
  async go(): Promise<object[]> {
    await callback(this._go);
    return this.all_output;
  }

  /**
   * Callback-style implementation of go(): attach listeners to the
   * kernel, send the execute request, and optionally start the timeout
   * timer. `cb` is called by _finish when execution ends, or immediately
   * with an error string if this emitter was already run or the kernel
   * is closed.
   */
  _go(cb: Function): void {
    if (this.state != "init") {
      cb("may only run once");
      return;
    }
    this.state = "running";
    log.silly("_execute_code", this.code);
    if (this.kernel.get_state() === "closed") {
      log.silly("_execute_code", "kernel.get_state() is closed");
      this.close();
      cb("closed - jupyter - execute_code");
      return;
    }

    this._go_cb = cb; // this._finish will call this.

    if (this.stdin != null) {
      this.kernel.on("stdin", this._handle_stdin);
    }
    this.kernel.on("shell", this._handle_shell);
    this.kernel.on("iopub", this._handle_iopub);

    log.debug("_execute_code: send the message to get things rolling");
    this.kernel.channel?.next(this._message);

    // Removed again in _finish under the same event name.
    this.kernel.on("closed", this.handle_closed);

    if (this.timeout_ms) {
      // setup a timeout at which point things will get killed if they don't finish
      this.timer = setTimeout(this.timeout, this.timeout_ms);
    }
  }

  /**
   * Invoked by the timer when timeout_ms elapses. If execution is still
   * running, send SIGINT up to three times (waiting 1s, then 1.3x longer
   * each time); if it still hasn't closed, send SIGKILL and finish.
   */
  private async timeout(): Promise<void> {
    if (this.state == "closed") {
      log.debug(
        "CodeExecutionEmitter.timeout: already finished, so nothing to worry about"
      );
      return;
    }
    this.killing =
      "Timeout Error: execution time limit = " +
      Math.round((this.timeout_ms ?? 0) / 1000) +
      " seconds";
    let tries = 3;
    let d = 1000;
    // The `as State` casts defeat TypeScript's narrowing from the check
    // above; the state can change while we await below.
    while (this.state != ("closed" as State) && tries > 0) {
      log.debug(
        "CodeExecutionEmitter.timeout: code still running, so try to interrupt it"
      );
      // Code still running but timeout reached.
      // Keep sending interrupt signal, which will hopefully do something to
      // stop running code (there is no guarantee, of course).  We
      // try a few times...
      this.kernel.signal("SIGINT");
      await delay(d);
      d *= 1.3;
      tries -= 1;
    }
    if (this.state != ("closed" as State)) {
      log.debug(
        "CodeExecutionEmitter.timeout: now try SIGKILL, which should kill things for sure."
      );
      this.kernel.signal("SIGKILL");
      this._finish();
    }
  }
}