Path: blob/master/src/packages/jupyter/stateless-api/kernel.ts
5796 views
import { kernel as createKernel } from "@cocalc/jupyter/kernel";
import type { JupyterKernelInterface } from "@cocalc/jupyter/types/project-interface";
import { run_cell } from "@cocalc/jupyter/nbgrader/jupyter-run";
import { mkdtemp } from "fs/promises";
import { rmSync } from "fs";
import { tmpdir } from "os";
import { join } from "path";
import getLogger from "@cocalc/backend/logger";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { type Limits } from "@cocalc/util/jupyter/nbgrader-types";
import { closeAll as closeAllLaunches } from "@cocalc/jupyter/kernel/launch-kernel";

const log = getLogger("jupyter:stateless-api:kernel");

export const DEFAULT_POOL_SIZE = 2;
const DEFAULT_POOL_TIMEOUT_S = 3600;

// When we idle timeout we always keep at least this many kernels around. We don't go to 0.
const MIN_POOL_SIZE = 1;

// Default `ulimit` string passed to createKernel for kernels whose name has no
// override registered via Kernel.setUlimit.
// NOTE(review): the units below assume the flags are interpreted by bash's
// `ulimit` builtin -- confirm against @cocalc/jupyter/kernel.
// -n = max open files
// -f = max size of any single file written, in 1024-byte blocks (10485760 ~ 10 GiB)
// -t = max cputime is 30 seconds
// -v = max virtual memory, in KiB (3000000 ~ 3GB)
const DEFAULT_ULIMIT = "-n 1000 -f 10485760 -t 30 -v 3000000";

/**
 * A Jupyter kernel used by the stateless execution API.
 *
 * Each instance gets its own fresh temporary directory (`mkdtemp` under the OS
 * temp dir) and a kernel created by `createKernel` for the notebook path
 * `<tempDir>/execute.ipynb`. Instances are normally obtained via
 * {@link Kernel.getFromPool}, which keeps a few kernels per kernel name
 * started ahead of time so requests do not pay for a cold start.
 */
export default class Kernel {
  /** Idle, not-yet-handed-out kernels, per kernel name. Oldest first. */
  private static pools: { [kernelName: string]: Kernel[] } = {};
  /** Timestamp (ms) of the most recent pool request, per kernel name. */
  private static last_active: { [kernelName: string]: number } = {};
  /** Per-kernel-name ulimit overrides (see setUlimit). */
  private static ulimit: { [kernelName: string]: string } = {};

  private kernel?: JupyterKernelInterface;
  private tempDir?: string;
  private state?: "closed" | undefined = undefined;

  constructor(private kernelName: string) {
    // Registered so the module-level exit/signal handlers can clean it up;
    // close() removes it again so closed kernels are not retained forever.
    kernels.push(this);
  }

  /** Return the pool for `kernelName`, creating an empty one if needed. */
  private static getPool(kernelName: string): Kernel[] {
    let pool = Kernel.pools[kernelName];
    if (pool == null) {
      pool = Kernel.pools[kernelName] = [];
    }
    return pool;
  }

  // changing ulimit only impacts NEWLY **created** kernels.
  static setUlimit(kernelName: string, ulimit: string) {
    Kernel.ulimit[kernelName] = ulimit;
  }

  /**
   * Record activity for `kernelName` and arm a timer. If no further pool
   * request for that name arrives within `timeout_s` seconds, the pool is
   * shrunk to MIN_POOL_SIZE by closing the oldest kernels.
   * A `timeout_s` of 0 disables the timeout.
   */
  private static setIdleTimeout(kernelName: string, timeout_s: number) {
    if (!timeout_s) {
      // 0 = no timeout
      return;
    }
    const now = Date.now();
    Kernel.last_active[kernelName] = now;
    setTimeout(() => {
      if (Kernel.last_active[kernelName] > now) {
        // kernel was requested after now; that request armed its own timer.
        return;
      }
      // No recent request for kernelName: shrink the pool to MIN_POOL_SIZE
      // instead of closing everything.
      const poolToShrink = Kernel.pools[kernelName] ?? [];
      if (poolToShrink.length <= MIN_POOL_SIZE) {
        return;
      }
      const numToClose = poolToShrink.length - MIN_POOL_SIZE;
      // kernels are pushed to the end and taken from the front, so the
      // front of the pool holds the oldest kernels.
      for (let i = 0; i < numToClose; i++) {
        poolToShrink[i].close();
      }
      // keep only the most recent kernels
      Kernel.pools[kernelName] = poolToShrink.slice(numToClose);
    }, timeout_s * 1000);
  }

  /**
   * Get an initialized kernel for `kernelName`.
   *
   * @param kernelName name of the Jupyter kernel to start.
   * @param size number of kernels to keep waiting in the pool. If `size <= 0`
   *   no pool is used and a new kernel is created and initialized directly.
   * @param timeout_s idle timeout (seconds) after which the pool is shrunk to
   *   MIN_POOL_SIZE; 0 disables it.
   * @returns a kernel removed from the pool (or newly created); it is never
   *   returned to the pool automatically.
   * @throws whatever kernel initialization throws; the failed kernel is
   *   closed before the error propagates.
   */
  static async getFromPool(
    kernelName: string,
    {
      size = DEFAULT_POOL_SIZE,
      timeout_s = DEFAULT_POOL_TIMEOUT_S,
    }: { size?: number; timeout_s?: number } = {},
  ): Promise<Kernel> {
    if (size <= 0) {
      // not using a pool -- just create and return kernel
      return await Kernel.initOrClose(new Kernel(kernelName));
    }
    Kernel.setIdleTimeout(kernelName, timeout_s);
    const pool = Kernel.getPool(kernelName);
    let i = 1;
    while (pool.length <= size) {
      // <= since going to remove one below
      const k = new Kernel(kernelName);
      pool.push(k);
      // we cause this kernel to get init'd soon, but NOT immediately, since starting
      // several at once just makes them all take much longer exactly when the user
      // most wants to use their new kernel
      setTimeout(
        async () => {
          try {
            await k.init();
          } catch (err) {
            // init() resets its partial state on failure, so this kernel
            // will simply retry when it is taken from the pool.
            log.debug("Failed to pre-init Jupyter kernel -- ", kernelName, err);
          }
        },
        // stagger startup by a few seconds, though kernels that are needed will start ASAP.
        Math.random() * 3000 * i,
      );
      i += 1;
    }
    const k = pool.shift() as Kernel;
    // it's ok to call again due to reuseInFlight and that no-op after init.
    return await Kernel.initOrClose(k);
  }

  /**
   * Initialize `k`; if that fails, close it (so its process and temp dir are
   * not leaked, since the caller never receives it) and rethrow.
   */
  private static async initOrClose(k: Kernel): Promise<Kernel> {
    try {
      await k.init();
    } catch (err) {
      k.close();
      throw err;
    }
    return k;
  }

  /**
   * Create the temp dir, start the kernel and run an empty cell, checking
   * after each await whether close() was called in the meantime.
   * Concurrent calls share one in-flight promise (reuseInFlight); calls after
   * a successful init, or after close(), are no-ops. On failure all partial
   * state is released so a later call can retry from scratch.
   */
  private init = reuseInFlight(async () => {
    if (this.kernel != null || this.state == "closed") {
      // already initialized (or closed)
      return;
    }
    try {
      this.tempDir = await mkdtemp(join(tmpdir(), "cocalc"));
      if (this.state == "closed") {
        this.close();
        return;
      }
      const path = `${this.tempDir}/execute.ipynb`;
      this.kernel = createKernel({
        name: this.kernelName,
        path,
        ulimit: Kernel.ulimit[this.kernelName] ?? DEFAULT_ULIMIT,
      });
      await this.kernel.ensure_running();
      if (this.state == "closed") {
        this.close();
        return;
      }
      await this.kernel.execute_code_now({ code: "" });
      if (this.state == "closed") {
        this.close();
        return;
      }
    } catch (err) {
      // Without this, `this.kernel` would stay set and every later init()
      // would treat the broken kernel as initialized.
      this.releaseResources();
      throw err;
    }
  });

  // empty all pools and do not refill
  static closeAll() {
    for (const kernelName in Kernel.pools) {
      for (const kernel of Kernel.pools[kernelName]) {
        kernel.close();
      }
    }
    Kernel.pools = {};
    Kernel.last_active = {};
  }

  /**
   * Run `code` as a single code cell and return the cell's outputs.
   *
   * @param limits passed to run_cell; the default allows 30s and ~5MB of
   *   output. A missing `total_output` is set to 0 on the given object.
   * @throws Error("kernel already closed") if there is no running kernel.
   */
  execute = async (
    code: string,
    limits: Partial<Limits> = {
      timeout_ms: 30000,
      timeout_ms_per_cell: 30000,
      max_output: 5000000,
      max_output_per_cell: 1000000,
      start_time: Date.now(),
      total_output: 0,
    },
  ) => {
    if (this.kernel == null) {
      throw Error("kernel already closed");
    }

    if (limits.total_output == null) {
      limits.total_output = 0;
    }
    const cell = { cell_type: "code", source: [code], outputs: [] };
    await run_cell(this.kernel, limits, cell);
    return cell.outputs;
  };

  /** Change the kernel's working directory; no-op if there is no kernel. */
  chdir = async (path: string) => {
    if (this.kernel == null) return;
    await this.kernel.chdir(path);
  };

  // this is not used anywhere
  returnToPool = async (): Promise<void> => {
    if (this.kernel == null) {
      throw Error("kernel already closed");
    }
    const pool = Kernel.getPool(this.kernelName);
    pool.push(this);
  };

  /**
   * Permanently close this kernel: mark it closed (so any in-flight init
   * stops), stop the kernel, delete its temp dir, and unregister it from the
   * module-level cleanup list. Safe to call more than once.
   */
  close = () => {
    this.state = "closed";
    this.releaseResources();
    const idx = kernels.indexOf(this);
    if (idx !== -1) {
      kernels.splice(idx, 1);
    }
  };

  /**
   * Best-effort teardown of the underlying kernel and temp dir, logging (not
   * throwing) errors. Does not change `state`, so init() may run again.
   */
  private releaseResources() {
    try {
      this.kernel?.close();
    } catch (err) {
      log.warn("Error closing kernel", err);
    } finally {
      delete this.kernel;
    }
    if (this.tempDir) {
      try {
        rmSync(this.tempDir, { force: true, recursive: true });
      } catch (err) {
        log.warn("Error cleaning up temporary directory", err);
      } finally {
        delete this.tempDir;
      }
    }
  }
}

// Clean up after any kernel created here
const kernels: Kernel[] = [];
function closeAll() {
  closeAllLaunches();
  // Kernel.close() removes the kernel from `kernels`, so iterate over a copy.
  for (const kernel of [...kernels]) {
    kernel.close();
  }
  kernels.length = 0;
}

process.once("exit", () => {
  closeAll();
});

["SIGINT", "SIGTERM", "SIGQUIT"].forEach((sig) => {
  process.once(sig, () => {
    closeAll();
    // Adding a listener for these signals disables Node's default "terminate
    // the process" behavior. Our `once` listener has already been removed; if
    // nothing else handles this signal, re-raise it so the process still exits.
    if (process.listenerCount(sig) === 0) {
      process.kill(process.pid, sig);
    }
  });
});