Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/jupyter/pool/pool.ts
Views: 687
/*1* This file is part of CoCalc: Copyright © 2023 Sagemath, Inc.2* License: MS-RSL – see LICENSE.md for details3*/45/*6Launching and managing Jupyter kernels in a pool for7performance.8*/910import { reuseInFlight } from "@cocalc/util/reuse-in-flight";11import { delay } from "awaiting";12import json from "json-stable-stringify";13import nodeCleanup from "node-cleanup";14import { unlinkSync } from "node:fs";15import { mkdir, readFile, writeFile } from "node:fs/promises";16import getLogger from "@cocalc/backend/logger";17import createChdirCommand from "@cocalc/util/jupyter-api/chdir-commands";18import createSetenvCommand from "@cocalc/util/jupyter-api/setenv-commands";19import { exists, unlink } from "@cocalc/backend/misc/async-utils-node";20import { getLanguage } from "@cocalc/jupyter/kernel/kernel-data";21import launchJupyterKernelNoPool, {22LaunchJupyterOpts,23SpawnedKernel,24} from "@cocalc/jupyter/kernel/launch-kernel";25import {26getConfig,27getConfigDir,28getLaunchDelayMS,29getSize,30getTimeoutS,31} from "./pool-params";32import { getAbsolutePathFromHome } from "@cocalc/jupyter/util/fs";3334// any kernel name whose lowercase representation contains one of these strings35// will never use the pool. 
// Any kernel name whose lowercase representation contains one of these
// strings will never use the pool.
// See https://github.com/sagemathinc/cocalc/issues/7041
const BLACKLIST = ["julia"];

/** True if `kernel`, lowercased, contains any BLACKLIST entry. */
function isBlacklisted(kernel: string): boolean {
  const lower = kernel.toLowerCase();
  return BLACKLIST.some((entry) => lower.includes(entry));
}

export type { LaunchJupyterOpts, SpawnedKernel };

const log = getLogger("jupyter:pool");

/**
 * Save `content` (a pool key produced by makeKey) to the pool config file.
 * Best effort: every error is logged at debug level and swallowed, so this
 * never rejects and callers may fire-and-forget it.
 */
async function writeConfig(content: string): Promise<void> {
  try {
    // harmless to call if dir already exists
    await mkdir(getConfigDir(), { recursive: true });
    await writeFile(getConfig(), content);
  } catch (error) {
    log.debug("Error writeConfig -- ", error);
  }
}

/**
 * Read back the pool key last saved by writeConfig.
 * @throws if the config file is missing or unreadable.
 */
async function readConfig(): Promise<string> {
  return (await readFile(getConfig())).toString();
}

// Ready-to-use kernels, grouped by the key returned from makeKey().
const POOL: { [key: string]: SpawnedKernel[] } = {};
// Per key: time (ms since epoch) after which maintainPool kills the pooled
// kernels for that key.
const EXPIRE: { [key: string]: number } = {};

// Make key for cache that describes this kernel. We explicitly omit
// the parameters that aren't generic and would make it not possible to
// put this in a pool:
//   - opts.cwd : current working directory
//   - opts.env.COCALC_JUPYTER_FILENAME
// launchJupyterKernel applies both afterwards via initCode instead.
// The key is stable JSON, which replenishPool parses back into { name, opts }.
function makeKey({ name, opts }) {
  // Copy of opts but delete opts.cwd and opts.env.COCALC_JUPYTER_FILENAME.
  // We don't change opts though!
  const opts0 = { ...opts };
  delete opts0.cwd;
  opts0.env = { ...opts.env };
  delete opts0.env.COCALC_JUPYTER_FILENAME;
  return json({ name, opts: opts0 });
}

/**
 * Launch a Jupyter kernel, handing out a pre-started one from the pool when
 * possible.
 *
 * The pool is bypassed entirely in these cases, and a fresh kernel is
 * launched instead:
 * - the kernel is blacklisted;
 * - its language can't be determined;
 * - the chdir/setenv init commands can't be generated.
 *
 * Otherwise, if the pool for makeKey({ name, opts }) has a kernel, that
 * kernel is returned with `initCode` attached. `initCode` contains commands
 * that chdir into opts.cwd and set COCALC_JUPYTER_FILENAME. Presumably the
 * caller executes them; this function does not. If the pool is empty, a
 * fresh kernel is launched (without initCode).
 *
 * Either way, the pool for that key is then replenished in the background.
 *
 * @param name - name of the kernel
 * @param opts - launch options, passed to launchJupyterKernelNoPool
 * @param size_arg - min number of these in the pool (default: getSize())
 * @param timeout_s_arg - seconds until pooled kernels for this key expire
 *   (default: getTimeoutS())
 */
export default async function launchJupyterKernel(
  name: string, // name of the kernel
  opts: LaunchJupyterOpts,
  size_arg?: number, // min number of these in the pool
  timeout_s_arg?: number,
): Promise<SpawnedKernel> {
  const size: number = size_arg ?? getSize();
  const timeout_s: number = timeout_s_arg ?? getTimeoutS();
  if (isBlacklisted(name)) {
    log.debug(`not using kernel pool for ${name} because it is blacklisted`);
    return await launchJupyterKernelNoPool(name, opts);
  }
  let language;
  try {
    language = await getLanguage(name);
  } catch (error) {
    log.error("Failed to get language of kernel -- not using pool", error);
    return await launchJupyterKernelNoPool(name, opts);
  }

  // Commands that make a generic pooled kernel behave as if it had been
  // launched with this specific cwd / notebook filename.
  const initCode: string[] = [];
  if (opts.cwd) {
    try {
      const absPath = getAbsolutePathFromHome(opts.cwd);
      initCode.push(createChdirCommand(language, absPath));
    } catch (error) {
      log.error("Failed to get chdir command -- not using pool", error);
      return await launchJupyterKernelNoPool(name, opts);
    }
  }
  if (opts.env?.COCALC_JUPYTER_FILENAME) {
    try {
      initCode.push(
        createSetenvCommand(
          language,
          "COCALC_JUPYTER_FILENAME",
          opts.env.COCALC_JUPYTER_FILENAME,
        ),
      );
    } catch (error) {
      log.error("Failed to get setenv command -- not using pool", error);
      return await launchJupyterKernelNoPool(name, opts);
    }
  }

  const key = makeKey({ name, opts });
  log.debug("launchJupyterKernel", key);
  try {
    if (POOL[key] == null) {
      POOL[key] = [];
    }
    // shift() on an empty array returns undefined and leaves it unchanged.
    const pooled = POOL[key].shift();
    if (pooled != null) {
      replenishInBackground(key, size, timeout_s);
      return { ...pooled, initCode };
    }
    const kernel = await launchJupyterKernelNoPool(name, opts);

    // we don't start replenishing the pool until the kernel is initialized,
    // since we don't want to slow down creating the kernel itself!
    replenishInBackground(key, size, timeout_s);

    // we do NOT include the initCode here; it's not needed since this kernel
    // isn't from the pool.
    return kernel;
  } catch (error) {
    log.error("Failed to launch Jupyter kernel", error);
    throw error;
  }
}

/**
 * Top up POOL[key] until it holds `size` kernels.
 *
 * Kernels are launched one at a time, with getLaunchDelayMS() of delay before
 * each one. Each launch:
 * - saves `key` via writeConfig, so fillWhenEmpty can later repopulate it;
 * - pushes EXPIRE[key] out to at least now + timeout_s seconds.
 *
 * Blacklisted kernels are skipped. Launch failures are logged and rethrown.
 *
 * Don't replenish pool for same key twice at same time, or pool could end up
 * a little too big: reuseInFlight (keyed on `key`) shares one in-flight run
 * per key.
 */
const replenishPool = reuseInFlight(
  async (key: string, size_arg?: number, timeout_s_arg?: number) => {
    const { name, opts } = JSON.parse(key);
    if (isBlacklisted(name)) {
      log.debug(
        "replenishPool",
        key,
        ` -- skipping since ${name} is blacklisted`,
      );
      return;
    }
    const size: number = size_arg ?? getSize();
    const timeout_s: number = timeout_s_arg ?? getTimeoutS();
    log.debug("replenishPool", key, { size, timeout_s });
    try {
      if (POOL[key] == null) {
        POOL[key] = [];
      }
      const pool = POOL[key];
      while (pool.length < size) {
        log.debug("replenishPool - creating a kernel", key);
        // writeConfig never rejects (it logs its own errors), so there is no
        // need to wait for it before launching.
        void writeConfig(key);
        await delay(getLaunchDelayMS());
        const kernel = await launchJupyterKernelNoPool(name, opts);
        pool.push(kernel);
        EXPIRE[key] = Math.max(EXPIRE[key] ?? 0, Date.now() + 1000 * timeout_s);
      }
    } catch (error) {
      log.error("Failed to replenish Jupyter kernel pool", error);
      throw error;
    }
  },
  {
    createKey: (args) => args[0],
  },
);

/**
 * Fire-and-forget wrapper around replenishPool.
 *
 * replenishPool logs and then rethrows launch failures. Without this
 * .catch(), such a failure would be an unhandled promise rejection. Unless a
 * process-wide handler is installed, that terminates Node.js >= 15.
 */
function replenishInBackground(
  key: string,
  size: number,
  timeout_s: number,
): void {
  replenishPool(key, size, timeout_s).catch((error) => {
    // already logged at error level inside replenishPool
    log.debug("replenishInBackground -- replenishPool failed", key, `${error}`);
  });
}

/*
If every pool is empty, refill the pool for the kernel key that replenishPool
most recently saved to the config file (via writeConfig). This repopulates
the pool when nothing is currently pooled, e.g., right after maintainPool
expired all pooled kernels.

All errors are non-fatal and only logged. For example, the config file may be
missing or corrupt, or the saved kernel may no longer be available after the
image changed.
*/
async function fillWhenEmpty() {
  const anyPooled = Object.values(POOL).some((pool) => pool.length > 0);
  if (anyPooled) {
    // nothing to do
    return;
  }
  // pool is empty, so possibly put something in it.
  try {
    // this can throw, e.g., a corrupt file
    const key = await readConfig();
    if (key) {
      // this can definitely throw, e.g., change image and then available
      // kernels change. No need to crash the entire project in that case!
      await replenishPool(key);
    }
  } catch (error) {
    console.log("fillWhenEmpty -- A non-fatal error occurred:", error);
    log.error("fillWhenEmpty -- A non-fatal error occurred:", error);
  }
}

/**
 * Periodic maintenance, scheduled by init(). Kills every pooled kernel whose
 * key's EXPIRE time has passed, then calls fillWhenEmpty (not awaited; it
 * never rejects).
 */
async function maintainPool() {
  log.debug("maintainPool", { EXPIRE });
  const now = Date.now();
  for (const key in EXPIRE) {
    if (EXPIRE[key] < now) {
      log.debug("maintainPool -- expiring key=", key);
      const pool = POOL[key] ?? [];
      // shift one at a time (rather than snapshotting), so kernels pushed
      // while we are awaiting are expired too.
      while (pool.length > 0) {
        const kernel = pool.shift() as SpawnedKernel;
        try {
          await killKernel(kernel);
        } catch (error) {
          // won't happen
          log.error("Failed to kill Jupyter kernel", error);
        }
      }
    }
  }
  void fillWhenEmpty();
}

/** Start pool maintenance: run it now and then every 30 seconds. */
export function init() {
  // DO NOT create the pool if we're running under jest testing, since
  // then tests don't exit cleanly.
  if (process.env.NODE_ENV !== "test") {
    setInterval(maintainPool, 30 * 1000);
    maintainPool();
  }
}

// On process exit, do the following for every pooled kernel, best effort:
// - send SIGTERM to its process group (negative pid);
// - delete its connection file.
// The two steps are independent. A kernel that already exited makes
// process.kill throw, but its connection file must still be removed.
nodeCleanup(() => {
  for (const key in POOL) {
    for (const kernel of POOL[key]) {
      const pid = kernel.spawn?.pid;
      if (pid) {
        try {
          process.kill(-pid, "SIGTERM");
        } catch (_) {}
      }
      try {
        unlinkSync(kernel.connectionFile);
      } catch (_) {}
    }
  }
});

/**
 * Terminate a spawned kernel. Steps, in order:
 * 1. detach all listeners from its spawn object;
 * 2. SIGTERM its process group (if it has a pid);
 * 3. call spawn.close() if available;
 * 4. delete its connection file if it exists.
 * Never throws: every failure is logged.
 */
export async function killKernel(kernel: SpawnedKernel) {
  kernel.spawn?.removeAllListeners();
  try {
    if (kernel.spawn?.pid) {
      log.debug("killKernel pid=", kernel.spawn.pid);
      try {
        process.kill(-kernel.spawn.pid, "SIGTERM");
      } catch (error) {
        log.error("Failed to send SIGTERM to Jupyter kernel", error);
      }
    }
    kernel.spawn?.close?.();
    if (await exists(kernel.connectionFile)) {
      try {
        await unlink(kernel.connectionFile);
      } catch (error) {
        log.error(
          `Failed to delete Jupyter kernel connection file ${kernel.connectionFile}`,
          error,
        );
      }
    }
  } catch (error) {
    log.error("Failed to kill Jupyter kernel", error);
  }
}