Path: blob/master/src/packages/backend/execute-code.ts
5539 views
/*
 * This file is part of CoCalc: Copyright © 2020–2026 Sagemath, Inc.
 * License: MS-RSL – see LICENSE.md for details
 */

// Execute code in a subprocess.

import { callback, delay } from "awaiting";
import LRU from "lru-cache";
import {
  ChildProcessWithoutNullStreams,
  spawn,
  SpawnOptionsWithoutStdio,
} from "node:child_process";
import { chmod, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { EventEmitter } from "node:stream";
import shellEscape from "shell-escape";

import getLogger from "@cocalc/backend/logger";
import { envToInt } from "@cocalc/backend/misc/env-to-number";
import { aggregate } from "@cocalc/util/aggregate";
import { callback_opts } from "@cocalc/util/async-utils";
import { PROJECT_EXEC_DEFAULT_TIMEOUT_S } from "@cocalc/util/consts/project";
import {
  to_json,
  trunc,
  trunc_middle,
  uuid,
  walltime,
} from "@cocalc/util/misc";
import {
  ExecuteCodeOutputAsync,
  ExecuteCodeOutputBlocking,
  isExecuteCodeOptionsAsyncGet,
  type ExecuteCodeFunctionWithCallback,
  type ExecuteCodeOptions,
  type ExecuteCodeOptionsAsyncGet,
  type ExecuteCodeOptionsWithCallback,
  type ExecuteCodeOutput,
} from "@cocalc/util/types/execute-code";
import { envForSpawn } from "./misc";
import { ProcessStats, sumChildren } from "./process-stats";

const log = getLogger("execute-code");

// Common prefix of all environment variables that tune the async execution mode.
const PREFIX = "COCALC_PROJECT_ASYNC_EXEC";
// Maximum number of async jobs kept in asyncCache.
const ASYNC_CACHE_MAX = envToInt(`${PREFIX}_CACHE_MAX`, 100);
// Time-to-live (in seconds) of an asyncCache entry.
const ASYNC_CACHE_TTL_S = envToInt(`${PREFIX}_TTL_S`, 60 * 60);
// for async execution, every that many secs check up on the child-tree
// (this is an upper bound: the monitor samples more frequently at the start)
let MONITOR_INTERVAL_S = envToInt(`${PREFIX}_MONITOR_INTERVAL_S`, 60);

/**
 * Override MONITOR_INTERVAL_S, i.e. the longest wait (in seconds) between two
 * samples of an async job's process tree.
 */
export function setMonitorIntervalSeconds(n) {
  MONITOR_INTERVAL_S = n;
}

// Maximum number of stats samples kept per async job; the oldest are dropped.
const MONITOR_STATS_LENGTH_MAX = envToInt(
  `${PREFIX}_MONITOR_STATS_LENGTH_MAX`,
  100,
);

log.debug("configuration:", {
  ASYNC_CACHE_MAX,
  ASYNC_CACHE_TTL_S,
  MONITOR_INTERVAL_S,
  MONITOR_STATS_LENGTH_MAX,
});

type AsyncAwait = "finished";
// Emits eventKey("finished", job_id) with the final ExecuteCodeOutputAsync once
// an async job is no longer running. This is what `async_await` waits for.
const updates = new EventEmitter();
const eventKey = (type: AsyncAwait, job_id: string): string =>
  `${type}-${job_id}`;

/**
 * Status, output and stats of async jobs, keyed by job_id.
 * Bounded to ASYNC_CACHE_MAX entries. Reading or probing an entry refreshes its
 * age (updateAgeOnGet/updateAgeOnHas), so polled jobs are kept around.
 */
export const asyncCache = new LRU<string, ExecuteCodeOutputAsync>({
  max: ASYNC_CACHE_MAX,
  ttl: 1000 * ASYNC_CACHE_TTL_S,
  updateAgeOnGet: true,
  updateAgeOnHas: true,
});

/**
 * Keep at most MONITOR_STATS_LENGTH_MAX entries of `obj.stats`, in place.
 */
function truncStats(obj?: ExecuteCodeOutputAsync) {
  if (Array.isArray(obj?.stats)) {
    // truncate to $MONITOR_STATS_LENGTH_MAX, by discarding the initial entries
    obj.stats = obj.stats.slice(obj.stats.length - MONITOR_STATS_LENGTH_MAX);
  }
}

/**
 * Merge `upd` into the cached entry of `job_id` and store the result.
 *
 * If both the cached entry and `upd` carry `stats`, the samples are
 * concatenated (and truncated) instead of `upd.stats` replacing the history.
 * If the resulting status is not "running", the "finished" event is emitted so
 * that `async_await` callers get released.
 *
 * @returns the new cache entry
 */
function asyncCacheUpdate(job_id: string, upd): ExecuteCodeOutputAsync {
  const obj = asyncCache.get(job_id);
  const next: ExecuteCodeOutputAsync = { ...obj, ...upd };
  if (Array.isArray(obj?.stats) && Array.isArray(upd.stats)) {
    // a plain spread would let upd.stats overwrite the earlier samples
    next.stats = [...obj.stats, ...upd.stats];
    truncStats(next);
  }
  asyncCache.set(job_id, next);
  if (next.status !== "running") {
    // use the known id – `next.job_id` could be missing if the cache evicted obj
    updates.emit(eventKey("finished", job_id), next);
  }
  return next;
}

/**
 * Async/await interface to executing code.
 * See `execute_code` for the meaning of the options.
 */
export async function executeCode(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  return await callback_opts(execute_code)(opts);
}

// Callback interface to executing code.
// This will get deprecated and is only used by some old coffeescript code.
// Concurrent identical calls are combined via `aggregate`.
export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
  (opts: ExecuteCodeOptionsWithCallback): void => {
    (async () => {
      try {
        let data = await executeCodeNoAggregate(opts);
        if (isExecuteCodeOptionsAsyncGet(opts) && data.type === "async") {
          // stats could contain a lot of data. we only return it if requested.
          if (opts.async_stats !== true) {
            data = { ...data, stats: undefined };
          }
        }
        opts.cb?.(undefined, data);
      } catch (err) {
        opts.cb?.(err);
      }
    })();
  },
);

/**
 * Best-effort recursive removal of a temporary directory.
 * Errors are logged, never thrown. Does nothing if `tempDir` is not set.
 */
export async function cleanUpTempDir(tempDir: string | undefined) {
  if (tempDir) {
    try {
      await rm(tempDir, { force: true, recursive: true });
    } catch (err) {
      console.log("WARNING: issue cleaning up tempDir", err);
    }
  }
}

/**
 * Actual implementation, without the aggregate wrapper.
 *
 * The three modes are:
 * - `async_get`: look up an async job in asyncCache. With `async_await`, wait
 *   until it is no longer running. Throws if the job id is unknown.
 * - `async_call`: start the command, return immediately with a job id, and
 *   record the progress in asyncCache.
 * - otherwise: run the command and resolve with its blocking result.
 *
 * NOTE: `opts` is normalized in place (defaults, path, bash wrapping).
 */
async function executeCodeNoAggregate(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  if (isExecuteCodeOptionsAsyncGet(opts)) {
    const key = opts.async_get;
    const cached = asyncCache.get(key);
    if (cached != null) {
      const { async_await } = opts;
      if (cached.status === "running" && async_await === true) {
        return new Promise((done) =>
          updates.once(eventKey("finished", key), (data) => done(data)),
        );
      } else {
        return cached;
      }
    } else {
      throw new Error(`Async operation '${key}' does not exist.`);
    }
  }

  opts.args ??= [];
  opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
  opts.ulimit_timeout ??= true;
  opts.err_on_exit ??= true;
  opts.verbose ??= false;

  if (opts.verbose) {
    log.debug(`input: ${opts.command} ${opts.args?.join(" ")}`);
  }
  const s = opts.command.split(/\s+/g); // split on whitespace
  if (opts.args?.length === 0 && s.length > 1) {
    // a command line without separate args: let bash parse it
    opts.bash = true;
  } else if (opts.bash && opts.args?.length > 0) {
    // Selected bash, but still passed in args.
    opts.command = shellEscape([opts.command].concat(opts.args));
    opts.args = [];
  }

  if (opts.home == null) {
    opts.home = process.env.HOME;
  }

  if (opts.path == null) {
    opts.path = opts.home;
  } else if (opts.path[0] !== "/") {
    // relative paths are relative to the home directory
    opts.path = opts.home + "/" + opts.path;
  }
  if (opts.cwd) {
    opts.path = opts.cwd;
  }

  let tempDir: string | undefined = undefined;

  try {
    let origCommand = "";
    if (opts.bash) {
      // using bash, which (for better or worse), we do by writing the command to run
      // under bash to a file, then executing that file.
      let cmd: string;
      if (opts.timeout && opts.ulimit_timeout) {
        // This ensures that everything involved with this
        // command really does die no matter what; it's
        // better than killing from outside, since it gets
        // all subprocesses since they inherit the limits.
        // Leave it to the OS. Note that the argument to ulimit
        // must be a whole number.
        cmd = `ulimit -t ${Math.ceil(opts.timeout)}\n${opts.command}`;
      } else {
        cmd = opts.command;
      }

      // We write the cmd to a file, and replace the command and args
      // with bash and the filename, then do everything below as we would
      // have done anyways.
      origCommand = opts.command;
      opts.command = "bash";
      tempDir = await mkdtemp(join(tmpdir(), "cocalc-"));
      const tempPath = join(tempDir, "a.sh");
      if (opts.verbose) {
        log.debug("writing temp file that contains bash program", tempPath);
      }
      opts.args = [tempPath];
      await writeFile(tempPath, cmd);
      await chmod(tempPath, 0o700);
    }

    if (opts.async_call) {
      // we return an ID, the caller can then use it to query the status
      opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems;
      const job_id = uuid();
      const start = Date.now();
      const job_config: ExecuteCodeOutputAsync = {
        type: "async",
        stdout: "",
        stderr: "",
        exit_code: 0,
        start,
        job_id,
        status: "running",
      };
      asyncCache.set(job_id, job_config);

      const child = doSpawn(
        { ...opts, origCommand, job_id, job_config },
        // Called exactly once when the job ends: records the final state in
        // asyncCache (which also releases async_await waiters) and removes
        // the temp dir.
        async (err, result) => {
          log.debug("async/doSpawn returned", {
            err,
            result: {
              type: result?.type,
              stdout: trunc_middle(result?.stdout),
              stderr: trunc_middle(result?.stderr),
              exit_code: result?.exit_code,
            },
          });
          try {
            const info: Omit<
              ExecuteCodeOutputAsync,
              "stdout" | "stderr" | "exit_code"
            > = {
              job_id,
              type: "async",
              elapsed_s: (Date.now() - start) / 1000,
              start,
              status: "error",
            };
            if (err) {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `${err}`,
                exit_code: 1,
                ...info,
              });
            } else if (result != null) {
              asyncCacheUpdate(job_id, {
                ...result,
                ...info,
                status: "completed",
              });
            } else {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `No result`,
                exit_code: 1,
                ...info,
              });
            }
          } finally {
            await cleanUpTempDir(tempDir);
          }
        },
      );
      const pid = child?.pid;

      // pid could be undefined, this means it wasn't possible to spawn a child
      return { ...job_config, pid };
    } else {
      // This is the blocking variant
      return await callback(doSpawn, { ...opts, origCommand });
    }
  } finally {
    // do not delete the tempDir in async mode – the job's callback does that
    if (!opts.async_call) {
      await cleanUpTempDir(tempDir);
    }
  }
}

/**
 * Spawn `opts.command` with `opts.args` in its own process group and collect
 * stdout/stderr (each capped at `opts.max_output` characters, if set).
 *
 * `cb` is invoked once the process has exited and both output streams have
 * ended, or when the timeout killed it:
 * - `cb(errorString)` on failure (spawn error, timeout, or non-zero exit with
 *   `err_on_exit`);
 * - `cb(undefined, result)` otherwise.
 *
 * In async mode (`opts.job_id` set) the process tree is sampled into
 * asyncCache. If `opts.streamCB` is given, output is forwarded in batches
 * every 100ms, followed by at most one terminal "done" event.
 *
 * @returns the child process, or undefined if spawning failed synchronously
 */
function doSpawn(
  opts: ExecuteCodeOptions & {
    origCommand: string;
    job_id?: string;
    job_config?: ExecuteCodeOutputAsync;
  },
  cb?: (err: string | undefined, result?: ExecuteCodeOutputBlocking) => void,
) {
  const start_time = walltime();

  if (opts.verbose) {
    log.debug(
      "spawning",
      opts.command,
      "with args",
      opts.args,
      "and timeout",
      opts.timeout,
      "seconds",
    );
  }

  const spawnOptions: SpawnOptionsWithoutStdio = {
    detached: true, // so we can kill the entire process group if it times out
    cwd: opts.path,
    ...(opts.uid ? { uid: opts.uid } : undefined),
    // the group id goes into `gid` – it must not clobber `uid`
    ...(opts.gid ? { gid: opts.gid } : undefined),
    env: {
      ...envForSpawn(),
      ...opts.env,
      ...(opts.uid != null && opts.home ? { HOME: opts.home } : undefined),
    },
  };

  // This is the state, which will be captured in closures
  let child: ChildProcessWithoutNullStreams;
  let ran_code = false;
  let stdout = "";
  let stderr = "";
  let exit_code: undefined | number = undefined;
  let stderr_is_done = false;
  let stdout_is_done = false;
  let killed = false;
  let callback_done = false; // set in "finish", which is also called in a timeout
  let timer: NodeJS.Timeout | undefined = undefined;

  // periodically check up on the child process tree and record stats
  // this also keeps the entry in the cache alive, when the ttl is less than the duration of the execution
  async function startMonitor() {
    const pid = child.pid;
    const { job_id, job_config } = opts;
    if (job_id == null || pid == null || job_config == null) return;
    const monitor = ProcessStats.getInstance();
    await delay(1000);
    if (callback_done) return;

    while (true) {
      if (callback_done) return;
      const { procs } = await monitor.processes(Date.now(), "execute-code");
      // reconstruct process tree: parent pid -> pids of its direct children
      const children: { [pid: number]: number[] } = {};
      for (const p of Object.values(procs)) {
        const { pid, ppid } = p;
        children[ppid] ??= [];
        children[ppid].push(pid);
      }
      // we only consider those, which are the pid itself or one of its children
      const sc = sumChildren(procs, children, pid);
      if (sc == null) {
        // If the process by PID is no longer known, either the process was killed or there are too many running.
        // in any case, stop monitoring and do not update any data.
        return;
      }
      const { rss, cpu_pct: pct_cpu, cpu_secs } = sc;
      // ?? fallback, in case the cache "forgot" about it
      const obj = asyncCache.get(job_id) ?? job_config;
      obj.pid = pid;
      obj.stats ??= [];
      const statEntry = {
        timestamp: Date.now(),
        mem_rss: rss,
        cpu_pct: pct_cpu,
        cpu_secs,
      };
      obj.stats.push(statEntry);
      truncStats(obj);
      asyncCache.set(job_id, obj);
      // Stream stats update if callback provided
      if (opts.streamCB) {
        opts.streamCB({ type: "stats", data: statEntry });
      }

      // initially, we record more frequently, but then we space it out up until the interval (probably 1 minute)
      const elapsed_s = (Date.now() - job_config.start) / 1000;
      // i.e. after 6 minutes, we check every minute
      const next_s = Math.max(1, Math.floor(elapsed_s / 6));
      const wait_s = Math.min(next_s, MONITOR_INTERVAL_S);
      await delay(wait_s * 1000);
    }
  }

  try {
    child = spawn(opts.command, opts.args, spawnOptions);
    if (child.stdout == null || child.stderr == null) {
      // The docs/examples at https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options
      // suggest that r.stdout and r.stderr are always defined. However, this is
      // definitely NOT the case in edge cases, as we have observed.
      const errorMsg =
        "error creating child process -- couldn't spawn child process";
      // For streaming, also send error event
      if (opts.streamCB && opts.async_call) {
        opts.streamCB({ type: "error", data: errorMsg });
      }
      cb?.(errorMsg);
      return;
    }
  } catch (error) {
    // Yes, spawn can cause this error if there is no memory, and there's no
    // event! -- Error: spawn ENOMEM
    ran_code = false;
    const errorMsg = `error ${error}`;
    // For streaming, also send error event
    if (opts.streamCB && opts.async_call) {
      opts.streamCB({ type: "error", data: errorMsg });
    }
    cb?.(errorMsg);
    return;
  }

  ran_code = true;

  if (opts.verbose) {
    log.debug("listening for stdout, stderr and exit_code...");
  }

  // Batching mechanism for streaming to reduce message frequency -- otherwise there could be 100msg/s to process
  let streamBatchTimer: NodeJS.Timeout | undefined;
  const streamBuffer = { stdout: "", stderr: "" };

  // Send batched stream data (stdout first, then stderr), then clear the buffer
  const sendBatchedStream = () => {
    if (!opts.streamCB) return;
    if (streamBuffer.stdout.length > 0) {
      opts.streamCB({ type: "stdout", data: streamBuffer.stdout });
      streamBuffer.stdout = "";
    }
    if (streamBuffer.stderr.length > 0) {
      opts.streamCB({ type: "stderr", data: streamBuffer.stderr });
      streamBuffer.stderr = "";
    }
  };

  // Flush any remaining buffered data and cleanup
  const flushStreamBuffer = () => {
    if (streamBatchTimer) {
      clearInterval(streamBatchTimer);
      streamBatchTimer = undefined;
    }
    sendBatchedStream();
  };

  // Start batch timer if streaming is enabled, every 100ms
  if (opts.streamCB) {
    streamBatchTimer = setInterval(sendBatchedStream, 100);
  }

  // Mirror the current stdout/stderr/pid of an async job into asyncCache.
  function update_async(
    job_id: string | undefined,
    aspect: "stdout" | "stderr" | "pid",
    data: string | number,
  ): ExecuteCodeOutputAsync | undefined {
    if (!job_id) return;
    // job_config fallback, in case the cache forgot about it
    const obj = asyncCache.get(job_id) ?? opts.job_config;
    if (obj != null) {
      if (aspect === "pid") {
        if (typeof data === "number") {
          obj.pid = data;
        }
      } else if (typeof data === "string") {
        obj[aspect] = data;
      }
      asyncCache.set(job_id, obj);
    }
    return obj;
  }

  // Snapshot of the current state as an async result, for "done" stream events.
  const asyncResult = (
    job_id: string,
    status: ExecuteCodeOutputAsync["status"],
    code: number,
  ): ExecuteCodeOutputAsync => ({
    type: "async",
    job_id,
    stdout,
    stderr,
    exit_code: code,
    status,
    elapsed_s: walltime(start_time),
    start: opts.job_config?.start ?? Date.now(),
    pid: child.pid,
    stats: opts.job_config?.stats,
  });

  // A stream consumer gets at most one terminal "done" event – both the
  // "error" handler and finish() may want to report completion.
  let stream_done_sent = false;
  const sendStreamDone = (data: ExecuteCodeOutputAsync) => {
    if (stream_done_sent || !opts.streamCB) return;
    stream_done_sent = true;
    opts.streamCB({ type: "done", data });
  };

  child.stdout.on("data", (data) => {
    data = data.toString();
    const prevLength = stdout.length;
    if (opts.max_output != null) {
      if (stdout.length < opts.max_output) {
        const newData = data.slice(0, opts.max_output - stdout.length);
        stdout += newData;
        // Buffer the new portion for batched streaming
        if (opts.streamCB && stdout.length > prevLength) {
          streamBuffer.stdout += newData;
        }
      }
    } else {
      stdout += data;
      // Buffer the new data for batched streaming
      if (opts.streamCB) {
        streamBuffer.stdout += data;
      }
    }
    update_async(opts.job_id, "stdout", stdout);
  });

  child.stderr.on("data", (data) => {
    data = data.toString();
    const prevLength = stderr.length;
    if (opts.max_output != null) {
      if (stderr.length < opts.max_output) {
        const newData = data.slice(0, opts.max_output - stderr.length);
        stderr += newData;
        // Buffer the new portion for batched streaming
        if (opts.streamCB && stderr.length > prevLength) {
          streamBuffer.stderr += newData;
        }
      }
    } else {
      stderr += data;
      // Buffer the new data for batched streaming
      if (opts.streamCB) {
        streamBuffer.stderr += data;
      }
    }
    update_async(opts.job_id, "stderr", stderr);
  });

  child.stderr.on("end", () => {
    stderr_is_done = true;
    finish();
  });

  child.stdout.on("end", () => {
    stdout_is_done = true;
    finish();
  });

  // Doc: https://nodejs.org/api/child_process.html#event-exit – read it!
  // TODO: if the process is terminated by a signal, $code is null and the signal
  // is passed as a second argument. Hence, after a kill, the code below sets the
  // exit code to 0. This could be a special case, though.
  // It must not stay null, because "finish" waits until stdout, stderr and the
  // exit code are all set. The local $killed var is only true if the process
  // has been killed by our timeout – not by another kill.
  child.on("exit", (code) => {
    exit_code = code ?? 0;
    finish();
  });

  // This can happen, e.g., "Error: spawn ENOMEM" if there is no memory. Without this handler,
  // an unhandled exception gets raised, which is nasty.
  // From docs: "Note that the exit-event may or may not fire after an error has occurred. "
  child.on("error", (err) => {
    if (exit_code == null) {
      exit_code = 1;
    }
    stderr += to_json(err);
    // a fundamental issue, we were not running some code
    ran_code = false;
    // For streaming, flush buffer and send error event
    if (opts.streamCB && opts.async_call && opts.job_id) {
      flushStreamBuffer(); // Flush any buffered data first
      sendStreamDone(asyncResult(opts.job_id, "error", exit_code ?? 1));
    }
    finish();
  });

  if (opts.job_id && child.pid) {
    update_async(opts.job_id, "pid", child.pid);
    // Not awaited: it runs until $callback_done is true. Monitoring is best
    // effort, so a failure must not surface as an unhandled promise rejection.
    startMonitor().catch((err) => {
      log.debug("execute-code: process monitor failed", err);
    });
  }

  // Human readable description of the command, for error messages.
  const describeCommand = (): string =>
    opts.origCommand
      ? opts.origCommand
      : `'${opts.command}' (args=${opts.args?.join(" ")})`;

  // Called by every event above and by the timeout; completes at most once.
  // `err` is only passed in when the timeout killed the process.
  const finish = (err?: string) => {
    if (callback_done) {
      // we already finished up.
      return;
    }
    const streams_done = stdout_is_done && stderr_is_done;

    // Safety net for streaming: once the process has exited but the output
    // streams haven't ended (presumably e.g. when a background process
    // inherited the pipes), force completion after a short delay to prevent
    // hanging. This must come before the guard below, which would otherwise
    // return first for every process that was not killed.
    if (opts.streamCB && exit_code != null && !streams_done) {
      setTimeout(() => {
        if (!callback_done) {
          stdout_is_done = true;
          stderr_is_done = true;
          finish(err);
        }
      }, 1000); // Wait 1 second for streams to complete
      return;
    }

    if (!killed && (!streams_done || exit_code == null)) {
      // it wasn't killed and stdout, stderr or exit_code is still outstanding,
      // so we let the rest of them get set before actually finishing up.
      return;
    }

    // finally finish up – this will also terminate the monitor
    callback_done = true;

    // Flush any remaining buffered stream data before finishing
    if (opts.streamCB) {
      flushStreamBuffer();
    }

    if (timer != null) {
      clearTimeout(timer);
      timer = undefined;
    }

    if (opts.verbose && log.isEnabled("debug")) {
      log.debug(
        "finished exec of",
        opts.command,
        "took",
        walltime(start_time),
        "seconds",
      );
      log.debug({
        stdout: trunc(stdout, 512),
        stderr: trunc(stderr, 512),
        exit_code,
      });
    }

    // In async mode, cb is what records the final state in asyncCache and
    // removes the temp dir – so it must be called in every branch, also when
    // streaming (the stream consumer additionally gets its "done" event).
    if (err && killed) {
      // Killed due to the timeout – this takes precedence over other error conditions
      if (opts.job_id) {
        // Timeout is always an error
        sendStreamDone(asyncResult(opts.job_id, "error", 1));
      }
      cb?.(err);
    } else if (err) {
      cb?.(err);
    } else if (opts.err_on_exit && exit_code != 0) {
      if (opts.job_id) {
        sendStreamDone(asyncResult(opts.job_id, "error", exit_code ?? 1));
        cb?.(stderr);
      } else {
        // sync behavior, like it was before
        cb?.(
          `command '${describeCommand()}' exited with nonzero code ${exit_code} -- stderr='${trunc(
            stderr,
            1024,
          )}'`,
        );
      }
    } else if (!ran_code) {
      // regardless of opts.err_on_exit !
      cb?.(
        `command '${describeCommand()}' was not able to run -- stderr='${trunc(stderr, 1024)}'`,
      );
    } else {
      if (opts.max_output != null) {
        if (stdout.length >= opts.max_output) {
          stdout += ` (truncated at ${opts.max_output} characters)`;
        }
        if (stderr.length >= opts.max_output) {
          stderr += ` (truncated at ${opts.max_output} characters)`;
        }
      }
      if (exit_code == null) {
        // if exit-code not set, may have been SIGKILL so we set it to 1
        exit_code = 1;
      }
      const result = { type: "blocking" as const, stdout, stderr, exit_code };
      // For async with streaming, send the final done event
      if (opts.streamCB && opts.async_call && opts.job_id) {
        sendStreamDone(asyncResult(opts.job_id, "completed", exit_code));
      }
      cb?.(undefined, result);
    }
  };

  if (opts.timeout) {
    // setup a timer that will kill the command after a certain amount of time.
    const f = () => {
      if (child.exitCode != null) {
        // command already exited.
        return;
      }
      if (opts.verbose) {
        log.debug(
          "subprocess did not exit after",
          opts.timeout,
          "seconds, so killing with SIGKILL",
        );
      }
      try {
        killed = true; // we set the kill flag in any case – i.e. process will no longer exist
        if (child.pid != null) {
          process.kill(-child.pid, "SIGKILL"); // this should kill process group
        }
      } catch (err) {
        // Exceptions can happen, which left uncaught messes up calling code big time.
        if (opts.verbose) {
          log.debug("process.kill raised an exception", err);
        }
      }
      finish(`killed command '${opts.command} ${opts.args?.join(" ")}'`);
    };
    timer = setTimeout(f, opts.timeout * 1000);
  }

  return child;
}