Path: blob/master/src/packages/jupyter/kernel/kernel.ts
5738 views
/*1* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.2* License: MS-RSL – see LICENSE.md for details3*/45/*6Jupyter Backend78For interactive testing:910$ node1112> j = require('./dist/kernel'); k = j.kernel({name:'python3', path:'x.ipynb'});13> console.log(JSON.stringify(await k.execute_code_now({code:'2+3'}),0,2))1415*/1617// POOL VERSION - faster to restart but possible subtle issues18const USE_KERNEL_POOL = true;1920// const DEBUG = true; // only for extreme debugging.21const DEBUG = false; // normal mode22if (DEBUG) {23console.log("Enabling low level Jupyter kernel debugging.");24}2526// NOTE: we choose to use node-cleanup instead of the much more27// popular exit-hook, since node-cleanup actually works for us.28// https://github.com/jtlapp/node-cleanup/issues/1629// Also exit-hook is hard to import from commonjs.30import nodeCleanup from "node-cleanup";31import { reuseInFlight } from "@cocalc/util/reuse-in-flight";32import { callback } from "awaiting";33import type { MessageType } from "@cocalc/jupyter/zmq/types";34import { jupyterSockets, type JupyterSockets } from "@cocalc/jupyter/zmq";35import { EventEmitter } from "node:events";36import { unlink } from "@cocalc/backend/misc/async-utils-node";37import { remove_redundant_reps } from "@cocalc/jupyter/ipynb/import-from-ipynb";38import { JupyterActions } from "@cocalc/jupyter/redux/project-actions";39import {40type BlobStoreInterface,41CodeExecutionEmitterInterface,42ExecOpts,43JupyterKernelInterface,44KernelInfo,45} from "@cocalc/jupyter/types/project-interface";46import { JupyterStore } from "@cocalc/jupyter/redux/store";47import { JUPYTER_MIMETYPES } from "@cocalc/jupyter/util/misc";48import type { SyncDB } from "@cocalc/sync/editor/db/sync";49import { retry_until_success, until } from "@cocalc/util/async-utils";50import createChdirCommand from "@cocalc/util/jupyter-api/chdir-commands";51import { key_value_store } from "@cocalc/util/key-value-store";52import {53copy,54deep_copy,55is_array,56len,57merge,58original_path,59path_split,60uuid,61uint8ArrayToBase64,62} from "@cocalc/util/misc";63import { CodeExecutionEmitter } from "@cocalc/jupyter/execute/execute-code";64import {65getLanguage,66get_kernel_data_by_name,67} from "@cocalc/jupyter/kernel/kernel-data";6869import launchJupyterKernel, {70LaunchJupyterOpts,71SpawnedKernel,72killKernel,73} from "@cocalc/jupyter/pool/pool";74// non-pool version75import launchJupyterKernelNoPool from "@cocalc/jupyter/kernel/launch-kernel";76import { kernels } from "./kernels";77import { getAbsolutePathFromHome } from "@cocalc/jupyter/util/fs";78import type { KernelParams } from "@cocalc/jupyter/types/kernel";79import { redux_name } from "@cocalc/util/redux/name";80import { redux } from "@cocalc/jupyter/redux/app";81import { VERSION } from "@cocalc/jupyter/kernel/version";82import type { NbconvertParams } from "@cocalc/util/jupyter/types";83import type { Client } from "@cocalc/sync/client/types";84import { getLogger } from "@cocalc/backend/logger";85import { base64ToBuffer } from "@cocalc/util/base64";86import { sha1 as misc_node_sha1 } from "@cocalc/backend/misc_node";87import { join } from "path";88import { readFile } from "fs/promises";8990const MAX_KERNEL_SPAWN_TIME = 120 * 1000;9192type State = "failed" | "off" | "spawning" | "starting" | "running" | "closed";9394const logger = getLogger("jupyter:kernel");9596// We make it so nbconvert functionality can be dynamically enabled97// by calling this at runtime. The reason is because some users of98// this code (e.g., remote kernels) don't need to provide nbconvert99// functionality, and our implementation has some heavy dependencies,100// e.g., on a big chunk of the react frontend.101let nbconvert: (opts: NbconvertParams) => Promise<void> = async () => {102throw Error("nbconvert is not enabled");103};104export function initNbconvert(f) {105nbconvert = f;106}107108/*109We set a few extra user-specific options for the environment in which110Sage-based Jupyter kernels run; these are more multi-user friendly.111*/112const SAGE_JUPYTER_ENV = merge(copy(process.env), {113PYTHONUSERBASE: `${process.env.HOME}/.local`,114PYTHON_EGG_CACHE: `${process.env.HOME}/.sage/.python-eggs`,115R_MAKEVARS_USER: `${process.env.HOME}/.sage/R/Makevars.user`,116});117118// Initialize the actions and store for working with a specific119// Jupyter notebook. The syncdb is the syncdoc associated to120// the ipynb file, and this function creates the corresponding121// actions and store, which make it possible to work with this122// notebook.123export async function initJupyterRedux(syncdb: SyncDB, client: Client) {124const project_id = syncdb.project_id;125if (project_id == null) {126throw Error("project_id must be defined");127}128if (syncdb.get_state() == "closed") {129throw Error("syncdb must not be closed");130}131132// This path is the file we will watch for changes and save to, which is in the original133// official ipynb format:134const path = original_path(syncdb.get_path());135logger.debug("initJupyterRedux", path);136137const name = redux_name(project_id, path);138if (redux.getStore(name) != null && redux.getActions(name) != null) {139logger.debug(140"initJupyterRedux",141path,142" -- existing actions, so removing them",143);144// The redux info for this notebook already exists, so don't145// try to make it again without first deleting the existing one.146// Having two at once basically results in things feeling hung.147// This should never happen, but we ensure it148// See https://github.com/sagemathinc/cocalc/issues/4331149await removeJupyterRedux(path, project_id);150}151const store = redux.createStore(name, JupyterStore);152const actions = redux.createActions(name, JupyterActions);153154actions._init(project_id, path, syncdb, store, client);155156syncdb.once("error", (err) =>157logger.error("initJupyterRedux", path, "syncdb ERROR", err),158);159syncdb.once("ready", () =>160logger.debug("initJupyterRedux", path, "syncdb ready"),161);162}163164export async function getJupyterRedux(syncdb: SyncDB) {165const project_id = syncdb.project_id;166const path = original_path(syncdb.get_path());167const name = redux_name(project_id, path);168return { actions: redux.getActions(name), store: redux.getStore(name) };169}170171// Remove the store/actions for a given Jupyter notebook,172// and also close the kernel if it is running.173export async function removeJupyterRedux(174path: string,175project_id: string,176): Promise<void> {177logger.debug("removeJupyterRedux", path);178// if there is a kernel, close it179try {180await kernels.get(path)?.close();181} catch (_err) {182// ignore183}184const name = redux_name(project_id, path);185const actions = redux.getActions(name);186if (actions != null) {187try {188await actions.close();189} catch (err) {190logger.debug(191"removeJupyterRedux",192path,193" WARNING -- issue closing actions",194err,195);196}197}198redux.removeStore(name);199redux.removeActions(name);200}201202export function kernel(opts: KernelParams): JupyterKernel {203return new JupyterKernel(opts.name, opts.path, opts.actions, opts.ulimit);204}205206/*207Jupyter Kernel interface.208209The kernel does *NOT* start up until either spawn is explicitly called, or210code execution is explicitly requested. This makes it possible to211call process_output without spawning an actual kernel.212*/213214// Ensure that the kernels all get killed when the process exits.215nodeCleanup(() => {216for (const kernelPath in kernels.kernels) {217// We do NOT await the close since that's not really218// supported or possible in general.219const { _kernel } = kernels.kernels[kernelPath];220if (_kernel) {221killKernel(_kernel);222}223}224});225226// NOTE: keep JupyterKernel implementation private -- use the kernel function227// above, and the interface defined in types.228export class JupyterKernel229extends EventEmitter230implements JupyterKernelInterface231{232// name -- if undefined that means "no actual Jupyter kernel" (i.e., this JupyterKernel exists233// here, but there is no actual separate real Jupyter kernel process and one won't be created).234// Everything should work, except you can't *spawn* such a kernel.235public name: string | undefined;236237// this is a key:value store used mainly for stdin support right now. NOTHING TO DO WITH REDUX!238public store: any;239240public readonly identity: string = uuid();241242private stderr: string = "";243private ulimit?: string;244private _path: string;245private _actions?: JupyterActions;246private _state: State;247private _directory: string;248private _filename: string;249public _kernel?: SpawnedKernel;250private _kernel_info?: KernelInfo;251public _execute_code_queue: CodeExecutionEmitter[] = [];252public sockets?: JupyterSockets;253private has_ensured_running: boolean = false;254private failedError: string = "";255256constructor(257name: string | undefined,258_path: string,259_actions: JupyterActions | undefined,260ulimit: string | undefined,261) {262super();263264this.ulimit = ulimit;265266this.name = name;267this._path = _path;268this._actions = _actions;269270this.store = key_value_store();271const { head, tail } = path_split(getAbsolutePathFromHome(this._path));272this._directory = head;273this._filename = tail;274this.setState("off");275this._execute_code_queue = [];276if (kernels.get(this._path) !== undefined) {277// This happens when we change the kernel for a given file, e.g.,278// from python2 to python3.279// Obviously, it is important to clean up after the old kernel.280kernels.get(this._path)?.close();281}282kernels.set(this._path, this);283this.setMaxListeners(100);284const dbg = this.dbg("constructor");285dbg("done");286}287288get_path = () => {289return this._path;290};291292// no-op if calling it doesn't change the state.293private setState = (state: State): void => {294// state = 'off' --> 'spawning' --> 'starting' --> 'running' --> 'closed'295// 'failed'296if (this._state == state) return;297this._state = state;298this.emit("state", this._state);299this.emit(this._state); // we *SHOULD* use this everywhere, not above.300};301302private setFailed = (error: string): void => {303this.failedError = error;304this.emit("kernel_error", error);305this.setState("failed");306};307308get_state = (): string => {309return this._state;310};311312private spawnedAlready = false;313spawn = async (spawn_opts?: {314env?: { [key: string]: string };315}): Promise<void> => {316if (this._state === "closed") {317// game over!318throw Error("closed -- kernel spawn");319}320if (!this.name) {321// spawning not allowed.322throw Error("cannot spawn since no kernel is set");323}324if (["running", "starting"].includes(this._state)) {325// Already spawned, so no need to do it again.326return;327}328329if (this.spawnedAlready) {330return;331}332this.spawnedAlready = true;333334this.setState("spawning");335const dbg = this.dbg("spawn");336dbg("spawning kernel...");337338// ****339// CRITICAL: anything added to opts better not be specific340// to the kernel path or it will completely break using a341// pool, which makes things massively slower.342// ****343344const opts: LaunchJupyterOpts = {345env: spawn_opts?.env ?? {},346ulimit: this.ulimit,347};348349try {350const kernelData = await get_kernel_data_by_name(this.name);351// This matches "sage", "sage-x.y", and Sage Python3 ("sage -python -m ipykernel")352if (kernelData.argv[0].startsWith("sage")) {353dbg("setting special environment for Sage kernels");354opts.env = merge(opts.env, SAGE_JUPYTER_ENV);355}356} catch (err) {357dbg(`No kernelData available for ${this.name}`);358}359360// Make cocalc default to the colab renderer for cocalc-jupyter, since361// this one happens to work best for us, and they don't have a custom362// one for us. See https://plot.ly/python/renderers/ and363// https://github.com/sagemathinc/cocalc/issues/4259364opts.env.PLOTLY_RENDERER = "colab";365opts.env.COCALC_JUPYTER_KERNELNAME = this.name;366367// !!! WARNING: do NOT add anything new here that depends on that path!!!!368// Otherwise the pool will switch to falling back to not being used, and369// cocalc would then be massively slower.370// Non-uniform customization.371// launchJupyterKernel is explicitly smart enough to deal with opts.cwd372if (this._directory) {373opts.cwd = this._directory;374}375// launchJupyterKernel is explicitly smart enough to deal with opts.env.COCALC_JUPYTER_FILENAME376opts.env.COCALC_JUPYTER_FILENAME = this._path;377// and launchJupyterKernel is NOT smart enough to deal with anything else!378379try {380if (USE_KERNEL_POOL) {381dbg("launching Jupyter kernel, possibly from pool");382this._kernel = await launchJupyterKernel(this.name, opts);383} else {384dbg("launching Jupyter kernel, NOT using pool");385this._kernel = await launchJupyterKernelNoPool(this.name, opts);386}387dbg("finishing kernel setup");388await this.finishSpawningKernel();389} catch (err) {390dbg(`ERROR spawning kernel - ${err}, ${err.stack}`);391// @ts-ignore392if (this._state == "closed") {393throw Error("closed");394}395// console.trace(err);396this.setFailed(397`**Unable to Spawn Jupyter Kernel:**\n\n${err} \n\nTry this in a terminal to help debug this (or contact support): \`jupyter console --kernel=${this.name}\`\n\nOnce you fix the problem, explicitly restart this kernel to test here.`,398);399}400};401402get_spawned_kernel = () => {403return this._kernel;404};405406get_connection_file = (): string | undefined => {407return this._kernel?.connectionFile;408};409410private finishSpawningKernel = async () => {411const dbg = this.dbg("finishSpawningKernel");412dbg("now finishing spawn of kernel...");413414if (DEBUG) {415this.low_level_dbg();416}417418if (!this._kernel) {419throw Error("_kernel must be defined");420}421this._kernel.spawn.on("error", (err) => {422const error = `${err}\n${this.stderr}`;423dbg("kernel error", error);424this.setFailed(error);425});426427// Track stderr from the subprocess itself (the kernel).428// This is useful for debugging broken kernels, etc., and is especially429// useful since it exists even if the kernel sends nothing over any430// zmq sockets (e.g., due to being very broken).431this.stderr = "";432this._kernel.spawn.stderr.on("data", (data) => {433const s = data.toString();434this.stderr += s;435if (this.stderr.length > 5000) {436// truncate if gets long for some reason -- only the end will437// be useful...438this.stderr = this.stderr.slice(this.stderr.length - 4000);439}440});441442this._kernel.spawn.stdout.on("data", (_data) => {443// NOTE: it is very important to read stdout (and stderr above)444// even if we **totally ignore** the data. Otherwise, exec445// might overflow446// https://github.com/sagemathinc/cocalc/issues/5065447});448449dbg("create main channel...", this._kernel.config);450451// This horrible code is because jupyterSockets will just "hang452// forever" if the kernel doesn't get spawned for some reason.453// (TODO: now that I completely rewrote jupytersockets, we could454// just put a timeout there or better checks? not sure.)455// Thus we do some tests, waiting for at least 2 seconds for there456// to be a pid. This is complicated and ugly, and I'm sorry about that,457// but sometimes that's life.458try {459await until(460() => {461if (this._state != "spawning") {462// gave up463return true;464}465if (this.pid()) {466// there's a process :-)467return true;468}469return false;470},471{ start: 100, max: 100, timeout: 3000 },472);473} catch (err) {474// timed out475this.setFailed(`Failed to start kernel process. ${err}`);476return;477}478if (this._state != "spawning") {479// got canceled480return;481}482const pid = this.pid();483if (!pid) {484throw Error("bug");485}486let success = false;487let gaveUp = false;488setTimeout(() => {489if (!success) {490gaveUp = true;491// it's been 30s and the channels didn't work. Let's give up.492// probably the kernel process just failed.493this.setFailed("Failed to start kernel process -- timeout");494// We can't yet "cancel" createMainChannel itself -- that will require495// rewriting that dependency.496// https://github.com/sagemathinc/cocalc/issues/7040497// I did rewrite that -- so let's revisit this!498}499}, MAX_KERNEL_SPAWN_TIME);500const sockets = await jupyterSockets(this._kernel.config, this.identity);501if (gaveUp) {502process.kill(-pid, 9);503return;504}505this.sockets = sockets;506success = true;507dbg("created main channel");508sockets.on("shell", (mesg) => this.emit("shell", mesg));509sockets.on("stdin", (mesg) => this.emit("stdin", mesg));510sockets.on("iopub", (mesg) => {511this.setState("running");512if (mesg.content != null && mesg.content.execution_state != null) {513this.emit("execution_state", mesg.content.execution_state);514}515516if (mesg.content?.comm_id != null) {517// A comm message, which gets handled directly.518this.process_comm_message_from_kernel(mesg);519return;520}521522if (this._actions?.capture_output_message(mesg)) {523// captured an output message -- do not process further524return;525}526527this.emit("iopub", mesg);528});529530this._kernel.spawn.once("exit", (exit_code, signal) => {531if (this._state === "closed") {532return;533}534this.dbg("kernel_exit")(535`spawned kernel terminated with exit code ${exit_code} (signal=${signal}); stderr=${this.stderr}`,536);537const stderr = this.stderr ? `\n...\n${this.stderr}` : "";538if (signal != null) {539this.setFailed(`Kernel last terminated by signal ${signal}.${stderr}`);540} else if (exit_code != null) {541this.setFailed(`Kernel last exited with code ${exit_code}.${stderr}`);542}543this.close();544});545546if (this._state == "spawning") {547// so we can start sending code execution to the kernel, etc.548this.setState("starting");549}550};551552pid = (): number | undefined => {553return this._kernel?.spawn?.pid;554};555556// Signal should be a string like "SIGINT", "SIGKILL".557// See https://nodejs.org/api/process.html#process_process_kill_pid_signal558signal = (signal: string): void => {559const dbg = this.dbg("signal");560const pid = this.pid();561dbg(`pid=${pid}, signal=${signal}`);562if (!pid) {563return;564}565try {566process.kill(-pid, signal); // negative to signal the process group567this.clear_execute_code_queue();568} catch (err) {569dbg(`error: ${err}`);570}571};572573close = (): void => {574this.dbg("close")();575if (this._state === "closed") {576return;577}578this.signal("SIGKILL");579if (this.sockets != null) {580this.sockets.close();581delete this.sockets;582}583this.setState("closed");584if (this.store != null) {585this.store.close();586delete this.store;587}588const kernel = kernels.get(this._path);589if (kernel != null && kernel.identity === this.identity) {590kernels.delete(this._path);591}592this.removeAllListeners();593if (this._kernel != null) {594killKernel(this._kernel);595delete this._kernel;596delete this.sockets;597}598if (this._execute_code_queue != null) {599for (const runningCode of this._execute_code_queue) {600runningCode.close();601}602this._execute_code_queue = [];603}604};605606// public, since we do use it from some other places...607dbg = (f: string): Function => {608return (...args) => {609//console.log(610logger.debug(611`jupyter.Kernel('${this.name ?? "no kernel"}',path='${612this._path613}').${f}`,614...args,615);616};617};618619low_level_dbg = (): void => {620const dbg = (...args) => logger.silly("low_level_debug", ...args);621dbg("Enabling");622if (this._kernel) {623this._kernel.spawn.all?.on("data", (data) =>624dbg("STDIO", data.toString()),625);626}627};628629ensure_running = reuseInFlight(async (): Promise<void> => {630const dbg = this.dbg("ensure_running");631dbg(this._state);632if (this._state == "closed") {633throw Error("closed so not possible to ensure running");634}635if (this._state == "running") {636return;637}638dbg("spawning");639await this.spawn();640if (this.get_state() != "starting" && this.get_state() != "running") {641return;642}643if (this._kernel?.initCode != null) {644for (const code of this._kernel?.initCode ?? []) {645dbg("initCode ", code);646this.execute_code({ code }, true);647}648}649if (!this.has_ensured_running) {650this.has_ensured_running = true;651}652});653654execute_code = (655opts: ExecOpts,656skipToFront = false,657): CodeExecutionEmitterInterface => {658if (opts.halt_on_error === undefined) {659// if not specified, default to true.660opts.halt_on_error = true;661}662if (this._state === "closed") {663throw Error("closed -- kernel -- execute_code");664}665const code = new CodeExecutionEmitter(this, opts);666if (skipToFront) {667this._execute_code_queue.unshift(code);668} else {669this._execute_code_queue.push(code);670}671if (this._execute_code_queue.length == 1) {672// start it going!673this._process_execute_code_queue();674}675return code;676};677678cancel_execute = (id: string): void => {679if (this._state === "closed") {680return;681}682const dbg = this.dbg(`cancel_execute(id='${id}')`);683if (684this._execute_code_queue == null ||685this._execute_code_queue.length === 0686) {687dbg("nothing to do");688return;689}690if (this._execute_code_queue.length > 1) {691dbg(692"mutate this._execute_code_queue removing everything with the given id",693);694for (let i = this._execute_code_queue.length - 1; i--; i >= 1) {695const code = this._execute_code_queue[i];696if (code.id === id) {697dbg(`removing entry ${i} from queue`);698this._execute_code_queue.splice(i, 1);699code.cancel();700}701}702}703// if the currently running computation involves this id, send an704// interrupt signal (that's the best we can do)705if (this._execute_code_queue[0].id === id) {706dbg("interrupting running computation");707this.signal("SIGINT");708}709};710711_process_execute_code_queue = async (): Promise<void> => {712const dbg = this.dbg("_process_execute_code_queue");713dbg(`state='${this._state}'`);714if (this._state === "closed") {715dbg("closed");716return;717}718if (this._execute_code_queue == null) {719dbg("no queue");720return;721}722const n = this._execute_code_queue.length;723if (n === 0) {724dbg("queue is empty");725return;726}727dbg(`queue has ${n} items; ensure kernel running`);728try {729await this.ensure_running();730await this._execute_code_queue[0].go();731} catch (err) {732dbg(`WARNING: error running kernel -- ${err}`);733for (const code of this._execute_code_queue) {734code.throw_error(err);735}736this._execute_code_queue = [];737}738};739740clear_execute_code_queue = (): void => {741const dbg = this.dbg("_clear_execute_code_queue");742// ensure no future queued up evaluation occurs (currently running743// one will complete and new executions could happen)744if (this._state === "closed") {745dbg("no op since state is closed");746return;747}748if (this._execute_code_queue == null) {749dbg("nothing to do since queue is null");750return;751}752dbg(`clearing queue of size ${this._execute_code_queue.length}`);753const mesg = { done: true };754for (const code_execution_emitter of this._execute_code_queue.slice(1)) {755code_execution_emitter.emit_output(mesg);756code_execution_emitter.close();757}758this._execute_code_queue = [];759};760761// This is like execute_code, but async and returns all the results.762// This is used for unit testing and interactive work at763// the terminal and nbgrader and the stateless api.764execute_code_now = async (opts: ExecOpts): Promise<object[]> => {765this.dbg("execute_code_now")();766if (this._state == "closed") {767throw Error("closed");768}769if (this.failedError) {770throw Error(this.failedError);771}772const output = this.execute_code({ halt_on_error: true, ...opts });773const v: object[] = [];774for await (const mesg of output.iter()) {775v.push(mesg);776}777if (this.failedError) {778// kernel failed during call779throw Error(this.failedError);780}781return v;782};783784private saveBlob = (data: string, type: string) => {785const blobs = this._actions?.blobs;786if (blobs == null) {787throw Error("blob store not available");788}789const buf: Buffer = !type.startsWith("text/")790? Buffer.from(data, "base64")791: Buffer.from(data);792793const sha1: string = misc_node_sha1(buf);794blobs.set(sha1, buf);795return sha1;796};797798process_output = (content: any): void => {799if (this._state === "closed") {800return;801}802const dbg = this.dbg("process_output");803if (content.data == null) {804// No data -- https://github.com/sagemathinc/cocalc/issues/6665805// NO do not do this sort of thing. This is exactly the sort of situation where806// content could be very large, and JSON.stringify could use huge amounts of memory.807// If you need to see this for debugging, uncomment it.808// dbg(trunc(JSON.stringify(content), 300));809// todo: FOR now -- later may remove large stdout, stderr, etc...810// dbg("no data, so nothing to do");811return;812}813814remove_redundant_reps(content.data);815816const saveBlob = (data, type) => {817try {818return this.saveBlob(data, type);819} catch (err) {820dbg(`WARNING: Jupyter blob store not working -- ${err}`);821// i think it'll just send the large data on in the usual way instead822// via the output, instead of using the blob store. It's probably just823// less efficient.824}825};826827let type: string;828for (type of JUPYTER_MIMETYPES) {829if (content.data[type] == null) {830continue;831}832if (833type.split("/")[0] === "image" ||834type === "application/pdf" ||835type === "text/html"836) {837// Store all images and PDF and text/html in a binary blob store, so we don't have838// to involve it in realtime sync. It tends to be large, etc.839const sha1 = saveBlob(content.data[type], type);840if (type == "text/html") {841// NOTE: in general, this may or may not get rendered as an iframe --842// we use iframe for backward compatibility.843content.data["iframe"] = sha1;844delete content.data["text/html"];845} else {846content.data[type] = sha1;847}848}849}850};851852call = async (msg_type: string, content?: any): Promise<any> => {853this.dbg("call")(msg_type);854if (!this.has_ensured_running) {855await this.ensure_running();856}857// Do a paranoid double check anyways...858if (this.sockets == null || this._state == "closed") {859throw Error("not running, so can't call");860}861862const message = {863parent_header: {},864metadata: {},865channel: "shell",866content,867header: {868msg_id: uuid(),869username: "",870session: "",871msg_type: msg_type as MessageType,872version: VERSION,873date: new Date().toISOString(),874},875};876877// Send the message878this.sockets.send(message);879880// Wait for the response that has the right msg_id.881let the_mesg: any = undefined;882const wait_for_response = (cb) => {883const f = (mesg) => {884if (mesg.parent_header.msg_id === message.header.msg_id) {885this.removeListener("shell", f);886this.removeListener("closed", g);887mesg = deep_copy(mesg.content);888if (len(mesg.metadata) === 0) {889delete mesg.metadata;890}891the_mesg = mesg;892cb();893}894};895const g = () => {896this.removeListener("shell", f);897this.removeListener("closed", g);898cb("closed - jupyter - kernel - call");899};900this.on("shell", f);901this.on("closed", g);902};903await callback(wait_for_response);904return the_mesg;905};906907complete = async (opts: { code: any; cursor_pos: any }): Promise<any> => {908const dbg = this.dbg("complete");909dbg(`code='${opts.code}', cursor_pos='${opts.cursor_pos}'`);910return await this.call("complete_request", opts);911};912913introspect = async (opts: {914code: any;915cursor_pos: any;916detail_level: any;917}): Promise<any> => {918const dbg = this.dbg("introspect");919dbg(920`code='${opts.code}', cursor_pos='${opts.cursor_pos}', detail_level=${opts.detail_level}`,921);922return await this.call("inspect_request", opts);923};924925kernel_info = reuseInFlight(async (): Promise<KernelInfo> => {926if (this._kernel_info !== undefined) {927return this._kernel_info;928}929const info = await this.call("kernel_info_request");930info.nodejs_version = process.version;931if (this._actions != null) {932info.start_time = this._actions.store.get("start_time");933}934this._kernel_info = info;935return info;936});937938save_ipynb_file = async (opts?): Promise<void> => {939if (this._actions != null) {940await this._actions.save_ipynb_file(opts);941} else {942throw Error("save_ipynb_file -- ERROR: actions not known");943}944};945946more_output = (id: string): any[] => {947if (id == null) {948throw new Error("must specify id");949}950if (this._actions == null) {951throw new Error("must have redux actions");952}953return this._actions.store.get_more_output(id) ?? [];954};955956nbconvert = reuseInFlight(957async (args: string[], timeout?: number): Promise<void> => {958if (timeout === undefined) {959timeout = 60; // seconds960}961if (!is_array(args)) {962throw new Error("args must be an array");963}964args = copy(args);965args.push("--");966args.push(this._filename);967await nbconvert({968args,969timeout,970directory: this._directory,971});972},973);974975load_attachment = async (path: string): Promise<string> => {976const dbg = this.dbg("load_attachment");977dbg(`path='${path}'`);978if (path[0] !== "/") {979path = join(process.env.HOME ?? "", path);980}981const f = async (): Promise<string> => {982const bs = this.get_blob_store();983if (bs == null) {984throw new Error("BlobStore not available");985}986return await bs.readFile(path);987};988try {989return await retry_until_success({990f,991max_time: 30000,992});993} catch (err) {994unlink(path); // TODO: think through again if this is the right thing to do.995throw err;996}997};998999// This is called by project-actions when exporting the notebook1000// to an ipynb file:1001get_blob_store = (): BlobStoreInterface | undefined => {1002const blobs = this._actions?.blobs;1003if (blobs == null) {1004return;1005}1006const t = new TextDecoder();1007return {1008getBase64: (sha1: string): string | undefined => {1009const buf = blobs.get(sha1);1010if (buf === undefined) {1011return buf;1012}1013return uint8ArrayToBase64(buf);1014},10151016getString: (sha1: string): string | undefined => {1017const buf = blobs.get(sha1);1018if (buf === undefined) {1019return buf;1020}1021return t.decode(buf);1022},10231024readFile: async (path: string): Promise<string> => {1025const buf = await readFile(path);1026const sha1: string = misc_node_sha1(buf);1027blobs.set(sha1, buf);1028return sha1;1029},10301031saveBase64: (data: string) => {1032const buf = Buffer.from(data, "base64");1033const sha1: string = misc_node_sha1(buf);1034blobs.set(sha1, buf);1035return sha1;1036},1037};1038};10391040process_comm_message_from_kernel = (mesg): void => {1041if (this._actions == null) {1042return;1043}1044const dbg = this.dbg("process_comm_message_from_kernel");1045// This can be HUGE so don't print out the entire message; e.g., it could contain1046// massive binary data!1047dbg(mesg.header);1048this._actions.process_comm_message_from_kernel(mesg);1049};10501051ipywidgetsGetBuffer = (1052model_id: string,1053// buffer_path is the string[] *or* the JSON of that.1054buffer_path: string | string[],1055): Buffer | undefined => {1056if (typeof buffer_path != "string") {1057buffer_path = JSON.stringify(buffer_path);1058}1059return this._actions?.syncdb.ipywidgets_state?.getBuffer(1060model_id,1061buffer_path,1062);1063};10641065send_comm_message_to_kernel = ({1066msg_id,1067comm_id,1068target_name,1069data,1070buffers64,1071buffers,1072}: {1073msg_id: string;1074comm_id: string;1075target_name: string;1076data: any;1077buffers64?: string[];1078buffers?: Buffer[];1079}): void => {1080if (this.sockets == null) {1081throw Error("sockets not initialized");1082}1083const dbg = this.dbg("send_comm_message_to_kernel");1084// this is HUGE1085// dbg({ msg_id, comm_id, target_name, data, buffers64 });1086if (buffers64 != null && buffers64.length > 0) {1087buffers = buffers64?.map((x) => Buffer.from(base64ToBuffer(x))) ?? [];1088dbg(1089"buffers lengths = ",1090buffers.map((x) => x.byteLength),1091);1092if (this._actions?.syncdb.ipywidgets_state != null) {1093this._actions.syncdb.ipywidgets_state.setModelBuffers(1094comm_id,1095data.buffer_paths,1096buffers,1097false,1098);1099}1100}11011102const message = {1103parent_header: {},1104metadata: {},1105channel: "shell",1106content: { comm_id, target_name, data },1107header: {1108msg_id,1109username: "user",1110session: "",1111msg_type: "comm_msg" as MessageType,1112version: VERSION,1113date: new Date().toISOString(),1114},1115buffers,1116};11171118// HUGE1119// dbg(message);1120// "The Kernel listens for these messages on the Shell channel,1121// and the Frontend listens for them on the IOPub channel." -- docs1122this.sockets.send(message);1123};11241125chdir = async (path: string): Promise<void> => {1126if (!this.name) return; // no kernel, no current directory1127const dbg = this.dbg("chdir");1128dbg({ path });1129let lang;1130try {1131// using probably cached data, so likely very fast1132lang = await getLanguage(this.name);1133} catch (err) {1134dbg("WARNING ", err);1135const info = await this.kernel_info();1136lang = info.language_info?.name ?? "";1137}11381139const absPath = getAbsolutePathFromHome(path);1140const code = createChdirCommand(lang, absPath);1141// code = '' if no command needed, e.g., for sparql.1142if (code) {1143await this.execute_code_now({ code });1144}1145};1146}11471148export function get_kernel_by_pid(pid: number): JupyterKernel | undefined {1149for (const kernel of Object.values(kernels.kernels)) {1150if (kernel.get_spawned_kernel()?.spawn.pid === pid) {1151return kernel;1152}1153}1154return;1155}115611571158