Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/backend/execute-code.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020–2024 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

// Execute code in a subprocess.

import { callback } from "awaiting";
import LRU from "lru-cache";
import {
  ChildProcessWithoutNullStreams,
  spawn,
  SpawnOptionsWithoutStdio,
} from "node:child_process";
import { chmod, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { EventEmitter } from "node:stream";
import shellEscape from "shell-escape";

import getLogger from "@cocalc/backend/logger";
import { envToInt } from "@cocalc/backend/misc/env-to-number";
import { aggregate } from "@cocalc/util/aggregate";
import { callback_opts } from "@cocalc/util/async-utils";
import { PROJECT_EXEC_DEFAULT_TIMEOUT_S } from "@cocalc/util/consts/project";
import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc";
import {
  ExecuteCodeOutputAsync,
  ExecuteCodeOutputBlocking,
  isExecuteCodeOptionsAsyncGet,
  type ExecuteCodeFunctionWithCallback,
  type ExecuteCodeOptions,
  type ExecuteCodeOptionsAsyncGet,
  type ExecuteCodeOptionsWithCallback,
  type ExecuteCodeOutput,
} from "@cocalc/util/types/execute-code";
import { Processes } from "@cocalc/util/types/project-info/types";
import { envForSpawn } from "./misc";
import { ProcessStats } from "./process-stats";

const log = getLogger("execute-code");

const PREFIX = "COCALC_PROJECT_ASYNC_EXEC";
const ASYNC_CACHE_MAX = envToInt(`${PREFIX}_CACHE_MAX`, 100);
const ASYNC_CACHE_TTL_S = envToInt(`${PREFIX}_TTL_S`, 60 * 60);
// for async execution, every that many secs check up on the child-tree
const MONITOR_INTERVAL_S = envToInt(`${PREFIX}_MONITOR_INTERVAL_S`, 60);
const MONITOR_STATS_LENGTH_MAX = envToInt(
  `${PREFIX}_MONITOR_STATS_LENGTH_MAX`,
  100,
);

log.debug("configuration:", {
  ASYNC_CACHE_MAX,
  ASYNC_CACHE_TTL_S,
  MONITOR_INTERVAL_S,
  MONITOR_STATS_LENGTH_MAX,
});

type AsyncAwait = "finished";

// Emits `${type}-${job_id}` events; used to wake up callers that wait
// for an async job to leave the "running" state.
const updates = new EventEmitter();
const eventKey = (type: AsyncAwait, job_id: string): string =>
  `${type}-${job_id}`;

// In-memory record of async jobs, keyed by job_id.
const asyncCache = new LRU<string, ExecuteCodeOutputAsync>({
  max: ASYNC_CACHE_MAX,
  ttl: 1000 * ASYNC_CACHE_TTL_S,
  updateAgeOnGet: true,
  updateAgeOnHas: true,
});

/**
 * Limit `obj.stats` (if it is an array) to the most recent
 * MONITOR_STATS_LENGTH_MAX entries. Modifies `obj` in place.
 */
function truncStats(obj?: ExecuteCodeOutputAsync) {
  if (Array.isArray(obj?.stats)) {
    // truncate to $MONITOR_STATS_LENGTH_MAX, by discarding the initial entries
    obj.stats = obj.stats.slice(obj.stats.length - MONITOR_STATS_LENGTH_MAX);
  }
}

/**
 * Merge `upd` into the cached entry of the given job, store the result in
 * the cache and return it. If the resulting status is not "running", a
 * "finished" event for that job is emitted with the new entry as payload.
 *
 * If both the cached entry and `upd` carry a `stats` array, the update's
 * entries are appended to the cached ones (and truncated), instead of
 * replacing them.
 */
function asyncCacheUpdate(job_id: string, upd): ExecuteCodeOutputAsync {
  const obj = asyncCache.get(job_id);
  let mergedStats: ExecuteCodeOutputAsync["stats"] | undefined = undefined;
  if (Array.isArray(obj?.stats) && Array.isArray(upd.stats)) {
    obj.stats.push(...upd.stats);
    truncStats(obj);
    mergedStats = obj.stats;
  }
  const next: ExecuteCodeOutputAsync = { ...obj, ...upd };
  if (mergedStats != null) {
    // the spread above would otherwise replace the merged stats by upd.stats
    next.stats = mergedStats;
  }
  asyncCache.set(job_id, next);
  if (next.status !== "running") {
    updates.emit(eventKey("finished", next.job_id), next);
  }
  return next;
}

// Async/await interface to executing code.
export async function executeCode(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  return await callback_opts(execute_code)(opts);
}

// Callback interface to executing code.
// This will get deprecated and is only used by some old coffeescript code.
export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
  (opts: ExecuteCodeOptionsWithCallback): void => {
    (async () => {
      try {
        let data = await executeCodeNoAggregate(opts);
        if (isExecuteCodeOptionsAsyncGet(opts) && data.type === "async") {
          // stats could contain a lot of data. we only return it if requested.
          if (opts.async_stats !== true) {
            data = { ...data, stats: undefined };
          }
        }
        opts.cb?.(undefined, data);
      } catch (err) {
        opts.cb?.(err);
      }
    })();
  },
);

/**
 * Recursively remove the given temporary directory; no-op if it is not set.
 */
async function clean_up_tmp(tempDir: string | undefined) {
  if (tempDir) {
    await rm(tempDir, { force: true, recursive: true });
  }
}

/**
 * Actual implementation, without the aggregate wrapper.
 *
 * - If `opts` is an "async get" request, the cached state of that job is
 *   returned (optionally waiting until it is no longer running). Throws if
 *   the job is not in the cache.
 * - Otherwise defaults are filled into `opts` (which is mutated), and the
 *   command is run either blocking or -- with `opts.async_call` -- in the
 *   background, in which case the initial job record (including the pid,
 *   if spawning worked) is returned immediately.
 */
async function executeCodeNoAggregate(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  if (isExecuteCodeOptionsAsyncGet(opts)) {
    const key = opts.async_get;
    const cached = asyncCache.get(key);
    if (cached != null) {
      const { async_await } = opts;
      if (cached.status === "running" && async_await === true) {
        // resolve as soon as asyncCacheUpdate reports the job as finished
        return new Promise((done) =>
          updates.once(eventKey("finished", key), (data) => done(data)),
        );
      } else {
        return cached;
      }
    } else {
      throw new Error(`Async operation '${key}' does not exist.`);
    }
  }

  opts.args ??= [];
  opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
  opts.ulimit_timeout ??= true;
  opts.err_on_exit ??= true;
  opts.verbose ??= true;

  if (opts.verbose) {
    log.debug(`input: ${opts.command} ${opts.args?.join(" ")}`);
  }
  const s = opts.command.split(/\s+/g); // split on whitespace
  if (opts.args?.length === 0 && s.length > 1) {
    // a command containing whitespace and no explicit args is run via bash
    opts.bash = true;
  } else if (opts.bash && opts.args?.length > 0) {
    // Selected bash, but still passed in args.
    opts.command = shellEscape([opts.command].concat(opts.args));
    opts.args = [];
  }

  if (opts.home == null) {
    opts.home = process.env.HOME;
  }

  if (opts.path == null) {
    opts.path = opts.home;
  } else if (opts.path[0] !== "/") {
    // relative paths are interpreted relative to the home directory
    opts.path = opts.home + "/" + opts.path;
  }

  let tempDir: string | undefined = undefined;
  // Becomes true once the async spawn callback is responsible for removing
  // tempDir. Until then, this function has to clean up itself -- also in
  // async mode, e.g. if writing the script file fails.
  let cleanupDelegated = false;

  try {
    let origCommand = "";
    if (opts.bash) {
      // using bash, which (for better or worse), we do by writing the command to run
      // under bash to a file, then executing that file.
      let cmd: string;
      if (opts.timeout && opts.ulimit_timeout) {
        // This ensures that everything involved with this
        // command really does die no matter what; it's
        // better than killing from outside, since it gets
        // all subprocesses since they inherit the limits.
        // Leave it to the OS.  Note that the argument to ulimit
        // must be a whole number.
        cmd = `ulimit -t ${Math.ceil(opts.timeout)}\n${opts.command}`;
      } else {
        cmd = opts.command;
      }

      // We write the cmd to a file, and replace the command and args
      // with bash and the filename, then do everything below as we would
      // have done anyways.
      origCommand = opts.command;
      opts.command = "bash";
      tempDir = await mkdtemp(join(tmpdir(), "cocalc-"));
      const tempPath = join(tempDir, "a.sh");
      if (opts.verbose) {
        log.debug("writing temp file that contains bash program", tempPath);
      }
      opts.args = [tempPath];
      await writeFile(tempPath, cmd);
      await chmod(tempPath, 0o700);
    }

    if (opts.async_call) {
      // we return an ID, the caller can then use it to query the status
      opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems;
      opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
      const job_id = uuid();
      const start = Date.now();
      const job_config: ExecuteCodeOutputAsync = {
        type: "async",
        stdout: "",
        stderr: "",
        exit_code: 0,
        start,
        job_id,
        status: "running",
      };
      asyncCache.set(job_id, job_config);

      // from here on, the callback below removes the temp dir when done
      cleanupDelegated = true;
      const pid: number | undefined = doSpawn(
        { ...opts, origCommand, job_id, job_config },
        async (err, result) => {
          log.debug("async/doSpawn returned", { err, result });
          try {
            const info: Omit<
              ExecuteCodeOutputAsync,
              "stdout" | "stderr" | "exit_code"
            > = {
              job_id,
              type: "async",
              elapsed_s: (Date.now() - start) / 1000,
              start,
              status: "error",
            };
            if (err) {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `${err}`,
                exit_code: 1,
                ...info,
              });
            } else if (result != null) {
              asyncCacheUpdate(job_id, {
                ...result,
                ...info,
                ...{ status: "completed" },
              });
            } else {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `No result`,
                exit_code: 1,
                ...info,
              });
            }
          } finally {
            await clean_up_tmp(tempDir);
          }
        },
      );

      // pid could be undefined, this means it wasn't possible to spawn a child
      return { ...job_config, pid };
    } else {
      // This is the blocking variant
      return await callback(doSpawn, { ...opts, origCommand });
    }
  } finally {
    // do not delete the tempDir here, if the async job is in charge of it!
    if (!cleanupDelegated) await clean_up_tmp(tempDir);
  }
}

/**
 * Sum up rss, cpu percentage and cpu seconds of the process `pid` and all
 * of its descendants, as given by `children` (pid → list of child pids).
 *
 * Returns null if `pid` or any descendant is missing in `procs`.
 */
function sumChildren(
  procs: Processes,
  children: { [pid: number]: number[] },
  pid: number,
): { rss: number; pct_cpu: number; cpu_secs: number } | null {
  const proc = procs[`${pid}`];
  if (proc == null) {
    log.debug(`sumChildren: no process ${pid} in proc`);
    return null;
  }
  let rss = proc.stat.mem.rss;
  let pct_cpu = proc.cpu.pct;
  let cpu_secs = proc.cpu.secs;
  for (const ch of children[pid] ?? []) {
    const sc = sumChildren(procs, children, ch);
    if (sc == null) return null;
    rss += sc.rss;
    pct_cpu += sc.pct_cpu;
    cpu_secs += sc.cpu_secs;
  }
  return { rss, pct_cpu, cpu_secs };
}

/**
 * Spawn `opts.command` with `opts.args` as a detached child process, collect
 * its stdout/stderr (limited by `opts.max_output`, if set) and call `cb`
 * exactly once when it is done, failed, or was killed by the timeout.
 *
 * If `opts.job_id` is set (async mode), the cached job entry is updated with
 * output and pid while the process is running, and the process tree is
 * monitored periodically.
 *
 * @returns the pid of the child, or undefined if spawning was not possible.
 */
function doSpawn(
  opts: ExecuteCodeOptions & {
    origCommand: string;
    job_id?: string;
    job_config?: ExecuteCodeOutputAsync;
  },
  cb: (err: string | undefined, result?: ExecuteCodeOutputBlocking) => void,
): number | undefined {
  const start_time = walltime();

  if (opts.verbose) {
    log.debug(
      "spawning",
      opts.command,
      "with args",
      opts.args,
      "and timeout",
      opts.timeout,
      "seconds",
    );
  }

  const spawnOptions: SpawnOptionsWithoutStdio = {
    detached: true, // so we can kill the entire process group if it times out
    cwd: opts.path,
    ...(opts.uid ? { uid: opts.uid } : undefined),
    // the group id belongs into "gid" -- it must not overwrite "uid"
    ...(opts.gid ? { gid: opts.gid } : undefined),
    env: {
      ...envForSpawn(),
      ...opts.env,
      ...(opts.uid != null && opts.home ? { HOME: opts.home } : undefined),
    },
  };

  // This is the state, which will be captured in closures
  let child: ChildProcessWithoutNullStreams;
  let ran_code = false;
  let stdout = "";
  let stderr = "";
  let exit_code: undefined | number = undefined;
  let stderr_is_done = false;
  let stdout_is_done = false;
  let killed = false;
  let callback_done = false; // set in "finish", which is also called in a timeout
  let timer: NodeJS.Timeout | undefined = undefined;

  // periodically check up on the child process tree and record stats
  // this also keeps the entry in the cache alive, when the ttl is less than the duration of the execution
  async function startMonitor() {
    const pid = child.pid;
    const { job_id, job_config } = opts;
    if (job_id == null || pid == null || job_config == null) return;
    const monitor = new ProcessStats();
    await monitor.init();
    await new Promise((done) => setTimeout(done, 1000));
    if (callback_done) return;

    while (true) {
      if (callback_done) return;
      const { procs } = await monitor.processes(Date.now());
      // reconstruct process tree
      const children: { [pid: number]: number[] } = {};
      for (const p of Object.values(procs)) {
        children[p.ppid] ??= [];
        children[p.ppid].push(p.pid);
      }
      // we only consider those, which are the pid itself or one of its children
      const sc = sumChildren(procs, children, pid);
      if (sc == null) {
        // If the process by PID is no longer known, either the process was killed or there are too many running.
        // in any case, stop monitoring and do not update any data.
        return;
      }
      const { rss, pct_cpu, cpu_secs } = sc;
      // ?? fallback, in case the cache "forgot" about it
      const obj = asyncCache.get(job_id) ?? job_config;
      obj.pid = pid;
      obj.stats ??= [];
      obj.stats.push({
        timestamp: Date.now(),
        mem_rss: rss,
        cpu_pct: pct_cpu,
        cpu_secs,
      });
      truncStats(obj);
      asyncCache.set(job_id, obj);

      // initially, we record more frequently, but then we space it out up until the interval (probably 1 minute)
      const elapsed_s = (Date.now() - job_config.start) / 1000;
      // i.e. after 6 minutes, we check every minute
      const next_s = Math.max(1, Math.floor(elapsed_s / 6));
      const wait_s = Math.min(next_s, MONITOR_INTERVAL_S);
      await new Promise((done) => setTimeout(done, wait_s * 1000));
    }
  }

  try {
    child = spawn(opts.command, opts.args, spawnOptions);
    if (child.stdout == null || child.stderr == null) {
      // The docs/examples at https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options
      // suggest that r.stdout and r.stderr are always defined.  However, this is
      // definitely NOT the case in edge cases, as we have observed.
      cb("error creating child process -- couldn't spawn child process");
      return;
    }
  } catch (error) {
    // Yes, spawn can cause this error if there is no memory, and there's no
    // event! --  Error: spawn ENOMEM
    ran_code = false;
    cb(`error ${error}`);
    return;
  }

  ran_code = true;

  if (opts.verbose) {
    log.debug("listening for stdout, stderr and exit_code...");
  }

  // In async mode (job_id set), store the given aspect in the cached job entry.
  function update_async(
    job_id: string | undefined,
    aspect: "stdout" | "stderr" | "pid",
    data: string | number,
  ): ExecuteCodeOutputAsync | undefined {
    if (!job_id) return;
    // job_config fallback, in case the cache forgot about it
    const obj = asyncCache.get(job_id) ?? opts.job_config;
    if (obj != null) {
      if (aspect === "pid") {
        if (typeof data === "number") {
          obj.pid = data;
        }
      } else if (typeof data === "string") {
        obj[aspect] = data;
      }
      asyncCache.set(job_id, obj);
    }
    return obj;
  }

  child.stdout.on("data", (data) => {
    data = data.toString();
    if (opts.max_output != null) {
      // only keep output up to max_output characters
      if (stdout.length < opts.max_output) {
        stdout += data.slice(0, opts.max_output - stdout.length);
      }
    } else {
      stdout += data;
    }
    update_async(opts.job_id, "stdout", stdout);
  });

  child.stderr.on("data", (data) => {
    data = data.toString();
    if (opts.max_output != null) {
      // only keep output up to max_output characters
      if (stderr.length < opts.max_output) {
        stderr += data.slice(0, opts.max_output - stderr.length);
      }
    } else {
      stderr += data;
    }
    update_async(opts.job_id, "stderr", stderr);
  });

  child.stderr.on("end", () => {
    stderr_is_done = true;
    finish();
  });

  child.stdout.on("end", () => {
    stdout_is_done = true;
    finish();
  });

  // Doc: https://nodejs.org/api/child_process.html#event-exit – read it!
  // TODO: This is not 100% correct, because in case the process is killed (signal TERM),
  // the $code is "null" and a second argument gives the signal (as a string). Hence, after a kill,
  // this code below changes the exit code to 0. This could be a special case, though.
  // It cannot be null, though, because the "finish" callback assumes that stdout, err and exit are set.
  // The local $killed var is only true, if the process has been killed by the timeout – not by another kill.
  child.on("exit", (code) => {
    exit_code = code ?? 0;
    finish();
  });

  // This can happen, e.g., "Error: spawn ENOMEM" if there is no memory.  Without this handler,
  // an unhandled exception gets raised, which is nasty.
  // From docs: "Note that the exit-event may or may not fire after an error has occurred. "
  child.on("error", (err) => {
    if (exit_code == null) {
      exit_code = 1;
    }
    stderr += to_json(err);
    // a fundamental issue, we were not running some code
    ran_code = false;
    finish();
  });

  if (opts.job_id && child.pid) {
    update_async(opts.job_id, "pid", child.pid);
    // we don't await it, it runs until $callback_done is true.
    // A failing monitor must not cause an unhandled promise rejection.
    startMonitor().catch((err) => {
      log.debug("monitoring the process failed", err);
    });
  }

  // Human readable description of the command, used in error messages.
  const commandLabel = (): string =>
    opts.origCommand
      ? opts.origCommand
      : `'${opts.command}' (args=${opts.args?.join(" ")})`;

  // Called from all event handlers and the timeout; reports the result via
  // cb once, as soon as everything is done (or the process was killed).
  const finish = (err?: string) => {
    if (!killed && (!stdout_is_done || !stderr_is_done || exit_code == null)) {
      // it wasn't killed and at least one of stdout, stderr, and exit_code hasn't been set.
      // so we let the rest of them get set before actually finishing up.
      return;
    }
    if (callback_done) {
      // we already finished up.
      return;
    }
    // finally finish up – this will also terminate the monitor
    callback_done = true;

    if (timer != null) {
      clearTimeout(timer);
      timer = undefined;
    }

    if (opts.verbose && log.isEnabled("debug")) {
      log.debug(
        "finished exec of",
        opts.command,
        "took",
        walltime(start_time),
        "seconds",
      );
      log.debug({
        stdout: trunc(stdout, 512),
        stderr: trunc(stderr, 512),
        exit_code,
      });
    }

    if (err) {
      cb(err);
    } else if (opts.err_on_exit && exit_code != 0) {
      if (opts.job_id) {
        cb(stderr);
      } else {
        // sync behavior, like it was before
        const x = commandLabel();
        cb(
          `command '${x}' exited with nonzero code ${exit_code} -- stderr='${trunc(
            stderr,
            1024,
          )}'`,
        );
      }
    } else if (!ran_code) {
      // regardless of opts.err_on_exit !
      const x = commandLabel();
      cb(
        `command '${x}' was not able to run -- stderr='${trunc(stderr, 1024)}'`,
      );
    } else {
      if (opts.max_output != null) {
        if (stdout.length >= opts.max_output) {
          stdout += ` (truncated at ${opts.max_output} characters)`;
        }
        if (stderr.length >= opts.max_output) {
          stderr += ` (truncated at ${opts.max_output} characters)`;
        }
      }
      if (exit_code == null) {
        // if exit-code not set, may have been SIGKILL so we set it to 1
        exit_code = 1;
      }
      cb(undefined, { type: "blocking", stdout, stderr, exit_code });
    }
  };

  if (opts.timeout) {
    // setup a timer that will kill the command after a certain amount of time.
    const f = () => {
      if (child.exitCode != null) {
        // command already exited.
        return;
      }
      if (opts.verbose) {
        log.debug(
          "subprocess did not exit after",
          opts.timeout,
          "seconds, so killing with SIGKILL",
        );
      }
      try {
        killed = true; // we set the kill flag in any case – i.e. process will no longer exist
        if (child.pid != null) {
          process.kill(-child.pid, "SIGKILL"); // this should kill process group
        }
      } catch (err) {
        // Exceptions can happen, which left uncaught messes up calling code big time.
        if (opts.verbose) {
          log.debug("process.kill raised an exception", err);
        }
      }
      finish(`killed command '${opts.command} ${opts.args?.join(" ")}'`);
    };
    timer = setTimeout(f, opts.timeout * 1000);
  }

  return child.pid;
}