Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/execute-code.ts
5539 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020–2026 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
// Execute code in a subprocess.
7
8
import { callback, delay } from "awaiting";
9
import LRU from "lru-cache";
10
import {
11
ChildProcessWithoutNullStreams,
12
spawn,
13
SpawnOptionsWithoutStdio,
14
} from "node:child_process";
15
import { chmod, mkdtemp, rm, writeFile } from "node:fs/promises";
16
import { tmpdir } from "node:os";
17
import { join } from "node:path";
18
import { EventEmitter } from "node:stream";
19
import shellEscape from "shell-escape";
20
21
import getLogger from "@cocalc/backend/logger";
22
import { envToInt } from "@cocalc/backend/misc/env-to-number";
23
import { aggregate } from "@cocalc/util/aggregate";
24
import { callback_opts } from "@cocalc/util/async-utils";
25
import { PROJECT_EXEC_DEFAULT_TIMEOUT_S } from "@cocalc/util/consts/project";
26
import {
27
to_json,
28
trunc,
29
trunc_middle,
30
uuid,
31
walltime,
32
} from "@cocalc/util/misc";
33
import {
34
ExecuteCodeOutputAsync,
35
ExecuteCodeOutputBlocking,
36
isExecuteCodeOptionsAsyncGet,
37
type ExecuteCodeFunctionWithCallback,
38
type ExecuteCodeOptions,
39
type ExecuteCodeOptionsAsyncGet,
40
type ExecuteCodeOptionsWithCallback,
41
type ExecuteCodeOutput,
42
} from "@cocalc/util/types/execute-code";
43
import { envForSpawn } from "./misc";
44
import { ProcessStats, sumChildren } from "./process-stats";
45
46
// Logger for this module.
const log = getLogger("execute-code");

// Environment-variable prefix for all async-exec tuning knobs below.
const PREFIX = "COCALC_PROJECT_ASYNC_EXEC";
// Maximum number of async job entries kept in the LRU cache.
const ASYNC_CACHE_MAX = envToInt(`${PREFIX}_CACHE_MAX`, 100);
// Time-to-live of a cached async job entry, in seconds.
const ASYNC_CACHE_TTL_S = envToInt(`${PREFIX}_TTL_S`, 60 * 60);
// for async execution, check up on the child process tree every that many seconds
let MONITOR_INTERVAL_S = envToInt(`${PREFIX}_MONITOR_INTERVAL_S`, 60);
53
54
// Override the monitoring interval (in seconds) for async jobs.
// Primarily useful for tests that need faster stat sampling.
export function setMonitorIntervalSeconds(n: number) {
  MONITOR_INTERVAL_S = n;
}
57
58
// Cap on how many monitoring stat samples are kept per async job.
const MONITOR_STATS_LENGTH_MAX = envToInt(
  `${PREFIX}_MONITOR_STATS_LENGTH_MAX`,
  100,
);

// Log the effective configuration once at module load.
log.debug("configuration:", {
  ASYNC_CACHE_MAX,
  ASYNC_CACHE_TTL_S,
  MONITOR_INTERVAL_S,
  MONITOR_STATS_LENGTH_MAX,
});
69
70
// Event types emitted for async jobs; currently only "finished".
type AsyncAwait = "finished";
// Emitter used to wake up callers that are awaiting completion of an async job.
const updates = new EventEmitter();
// Key under which completion events for a given job are emitted.
const eventKey = (type: AsyncAwait, job_id: string): string =>
  `${type}-${job_id}`;

// Cache of async job state, keyed by job_id.  Gets and has-checks refresh the
// TTL, so jobs that are actively polled stay alive in the cache.
export const asyncCache = new LRU<string, ExecuteCodeOutputAsync>({
  max: ASYNC_CACHE_MAX,
  ttl: 1000 * ASYNC_CACHE_TTL_S,
  updateAgeOnGet: true,
  updateAgeOnHas: true,
});
81
82
// Trim the stats array of an async job record in place, discarding the oldest
// entries so that at most MONITOR_STATS_LENGTH_MAX samples remain.
function truncStats(obj?: ExecuteCodeOutputAsync) {
  if (obj == null || !Array.isArray(obj.stats)) {
    return;
  }
  obj.stats = obj.stats.slice(obj.stats.length - MONITOR_STATS_LENGTH_MAX);
}
88
89
// Merge `upd` into the cached record for the async job `job_id`: append and
// truncate stats, store the merged record back into the cache, and notify any
// waiters via the "finished" event once the job is no longer running.
function asyncCacheUpdate(job_id: string, upd): ExecuteCodeOutputAsync {
  const prev = asyncCache.get(job_id);
  if (Array.isArray(prev?.stats) && Array.isArray(upd.stats)) {
    prev.stats.push(...upd.stats);
    truncStats(prev);
  }
  const merged: ExecuteCodeOutputAsync = { ...prev, ...upd };
  asyncCache.set(job_id, merged);
  if (merged.status !== "running") {
    updates.emit(eventKey("finished", merged.job_id), merged);
  }
  return merged;
}
102
103
// Async/await interface to executing code.
104
export async function executeCode(
105
opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
106
): Promise<ExecuteCodeOutput> {
107
return await callback_opts(execute_code)(opts);
108
}
109
110
// Callback interface to executing code.
111
// This will get deprecated and is only used by some old coffeescript code.
112
export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
113
(opts: ExecuteCodeOptionsWithCallback): void => {
114
(async () => {
115
try {
116
let data = await executeCodeNoAggregate(opts);
117
if (isExecuteCodeOptionsAsyncGet(opts) && data.type === "async") {
118
// stats could contain a lot of data. we only return it if requested.
119
if (opts.async_stats !== true) {
120
data = { ...data, stats: undefined };
121
}
122
}
123
opts.cb?.(undefined, data);
124
} catch (err) {
125
opts.cb?.(err);
126
}
127
})();
128
},
129
);
130
131
// Recursively delete the given temporary directory, if one was created.
// Failures are logged as a warning and never propagated to the caller.
export async function cleanUpTempDir(tempDir: string | undefined) {
  if (!tempDir) {
    return;
  }
  try {
    await rm(tempDir, { recursive: true, force: true });
  } catch (err) {
    console.log("WARNING: issue cleaning up tempDir", err);
  }
}
140
141
// actual implementation, without the aggregate wrapper
//
// Handles three modes of operation:
//  1. async_get: look up (and possibly await completion of) a previously
//     started async job in the cache.
//  2. async_call: spawn the child and return immediately with a job handle;
//     the final result is written into asyncCache by the spawn callback.
//  3. blocking (default): spawn the child and resolve when it has finished.
//
// NOTE: this function mutates `opts` in place (defaults, bash rewriting,
// path resolution) before handing it to doSpawn.
async function executeCodeNoAggregate(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  if (isExecuteCodeOptionsAsyncGet(opts)) {
    const key = opts.async_get;
    const cached = asyncCache.get(key);
    if (cached != null) {
      const { async_await } = opts;
      if (cached.status === "running" && async_await === true) {
        // caller wants to block until the job finishes: wait for the
        // "finished" event emitted by asyncCacheUpdate
        return new Promise((done) =>
          updates.once(eventKey("finished", key), (data) => done(data)),
        );
      } else {
        return cached;
      }
    } else {
      throw new Error(`Async operation '${key}' does not exist.`);
    }
  }

  // fill in defaults (in place)
  opts.args ??= [];
  opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
  opts.ulimit_timeout ??= true;
  opts.err_on_exit ??= true;
  opts.verbose ??= false;

  if (opts.verbose) {
    log.debug(`input: ${opts.command} ${opts.args?.join(" ")}`);
  }
  const s = opts.command.split(/\s+/g); // split on whitespace
  if (opts.args?.length === 0 && s.length > 1) {
    // a multi-word command with no args is treated as a shell command line
    opts.bash = true;
  } else if (opts.bash && opts.args?.length > 0) {
    // Selected bash, but still passed in args.
    opts.command = shellEscape([opts.command].concat(opts.args));
    opts.args = [];
  }

  if (opts.home == null) {
    opts.home = process.env.HOME;
  }

  // resolve the working directory: relative paths are taken relative to home,
  // and an explicit cwd overrides everything
  if (opts.path == null) {
    opts.path = opts.home;
  } else if (opts.path[0] !== "/") {
    opts.path = opts.home + "/" + opts.path;
  }
  if (opts.cwd) {
    opts.path = opts.cwd;
  }

  let tempDir: string | undefined = undefined;

  try {
    let origCommand = "";
    if (opts.bash) {
      // using bash, which (for better or worse), we do by writing the command to run
      // under bash to a file, then executing that file.
      let cmd: string;
      if (opts.timeout && opts.ulimit_timeout) {
        // This ensures that everything involved with this
        // command really does die no matter what; it's
        // better than killing from outside, since it gets
        // all subprocesses since they inherit the limits.
        // Leave it to the OS. Note that the argument to ulimit
        // must be a whole number.
        cmd = `ulimit -t ${Math.ceil(opts.timeout)}\n${opts.command}`;
      } else {
        cmd = opts.command;
      }

      // We write the cmd to a file, and replace the command and args
      // with bash and the filename, then do everything below as we would
      // have done anyways.
      origCommand = opts.command;
      opts.command = "bash";
      tempDir = await mkdtemp(join(tmpdir(), "cocalc-"));
      const tempPath = join(tempDir, "a.sh");
      if (opts.verbose) {
        log.debug("writing temp file that contains bash program", tempPath);
      }
      opts.args = [tempPath];
      await writeFile(tempPath, cmd);
      await chmod(tempPath, 0o700);
    }

    if (opts.async_call) {
      // we return an ID, the caller can then use it to query the status
      opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems;
      opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
      const job_id = uuid();
      const start = Date.now();
      const job_config: ExecuteCodeOutputAsync = {
        type: "async",
        stdout: "",
        stderr: "",
        exit_code: 0,
        start,
        job_id,
        status: "running",
      };
      asyncCache.set(job_id, job_config);

      // the callback below runs when the child finally exits; it records the
      // outcome in asyncCache and only then removes the temp dir
      const child = doSpawn(
        { ...opts, origCommand, job_id, job_config },
        async (err, result) => {
          log.debug("async/doSpawn returned", {
            err,
            result: {
              type: result?.type,
              stdout: trunc_middle(result?.stdout),
              stderr: trunc_middle(result?.stderr),
              exit_code: result?.exit_code,
            },
          });
          try {
            const info: Omit<
              ExecuteCodeOutputAsync,
              "stdout" | "stderr" | "exit_code"
            > = {
              job_id,
              type: "async",
              elapsed_s: (Date.now() - start) / 1000,
              start,
              status: "error",
            };
            if (err) {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `${err}`,
                exit_code: 1,
                ...info,
              });
            } else if (result != null) {
              asyncCacheUpdate(job_id, {
                ...result,
                ...info,
                ...{ status: "completed" },
              });
            } else {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `No result`,
                exit_code: 1,
                ...info,
              });
            }
          } finally {
            await cleanUpTempDir(tempDir);
          }
        },
      );
      const pid = child?.pid;

      // pid could be undefined, this means it wasn't possible to spawn a child
      return { ...job_config, pid };
    } else {
      // This is the blocking variant
      return await callback(doSpawn, { ...opts, origCommand });
    }
  } finally {
    // do not delete the tempDir in async mode! (the async callback above
    // is responsible for cleanup once the child exits)
    if (!opts.async_call) {
      await cleanUpTempDir(tempDir);
    }
  }
}
309
310
function doSpawn(
311
opts: ExecuteCodeOptions & {
312
origCommand: string;
313
job_id?: string;
314
job_config?: ExecuteCodeOutputAsync;
315
},
316
cb?: (err: string | undefined, result?: ExecuteCodeOutputBlocking) => void,
317
) {
318
const start_time = walltime();
319
320
if (opts.verbose) {
321
log.debug(
322
"spawning",
323
opts.command,
324
"with args",
325
opts.args,
326
"and timeout",
327
opts.timeout,
328
"seconds",
329
);
330
}
331
332
const spawnOptions: SpawnOptionsWithoutStdio = {
333
detached: true, // so we can kill the entire process group if it times out
334
cwd: opts.path,
335
...(opts.uid ? { uid: opts.uid } : undefined),
336
...(opts.gid ? { uid: opts.gid } : undefined),
337
env: {
338
...envForSpawn(),
339
...opts.env,
340
...(opts.uid != null && opts.home ? { HOME: opts.home } : undefined),
341
},
342
};
343
344
// This is the state, which will be captured in closures
345
let child: ChildProcessWithoutNullStreams;
346
let ran_code = false;
347
let stdout = "";
348
let stderr = "";
349
let exit_code: undefined | number = undefined;
350
let stderr_is_done = false;
351
let stdout_is_done = false;
352
let killed = false;
353
let callback_done = false; // set in "finish", which is also called in a timeout
354
let timer: NodeJS.Timeout | undefined = undefined;
355
356
// periodically check up on the child process tree and record stats
357
// this also keeps the entry in the cache alive, when the ttl is less than the duration of the execution
358
async function startMonitor() {
359
const pid = child.pid;
360
const { job_id, job_config } = opts;
361
if (job_id == null || pid == null || job_config == null) return;
362
const monitor = ProcessStats.getInstance();
363
await delay(1000);
364
if (callback_done) return;
365
366
while (true) {
367
if (callback_done) return;
368
const { procs } = await monitor.processes(Date.now(), "execute-code");
369
// reconstruct process tree
370
const children: { [pid: number]: number[] } = {};
371
for (const p of Object.values(procs)) {
372
const { pid, ppid } = p;
373
children[ppid] ??= [];
374
children[ppid].push(pid);
375
}
376
// we only consider those, which are the pid itself or one of its children
377
const sc = sumChildren(procs, children, pid);
378
if (sc == null) {
379
// If the process by PID is no longer known, either the process was killed or there are too many running.
380
// in any case, stop monitoring and do not update any data.
381
return;
382
}
383
const { rss, cpu_pct: pct_cpu, cpu_secs } = sc;
384
// ?? fallback, in case the cache "forgot" about it
385
const obj = asyncCache.get(job_id) ?? job_config;
386
obj.pid = pid;
387
obj.stats ??= [];
388
const statEntry = {
389
timestamp: Date.now(),
390
mem_rss: rss,
391
cpu_pct: pct_cpu,
392
cpu_secs,
393
};
394
obj.stats.push(statEntry);
395
truncStats(obj);
396
asyncCache.set(job_id, obj);
397
// Stream stats update if callback provided
398
if (opts.streamCB) {
399
opts.streamCB({ type: "stats", data: statEntry });
400
}
401
402
// initially, we record more frequently, but then we space it out up until the interval (probably 1 minute)
403
const elapsed_s = (Date.now() - job_config.start) / 1000;
404
// i.e. after 6 minutes, we check every minute
405
const next_s = Math.max(1, Math.floor(elapsed_s / 6));
406
const wait_s = Math.min(next_s, MONITOR_INTERVAL_S);
407
await delay(wait_s * 1000);
408
}
409
}
410
411
try {
412
child = spawn(opts.command, opts.args, spawnOptions);
413
if (child.stdout == null || child.stderr == null) {
414
// The docs/examples at https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options
415
// suggest that r.stdout and r.stderr are always defined. However, this is
416
// definitely NOT the case in edge cases, as we have observed.
417
const errorMsg =
418
"error creating child process -- couldn't spawn child process";
419
// For streaming, also send error event
420
if (opts.streamCB && opts.async_call) {
421
opts.streamCB({ type: "error", data: errorMsg });
422
}
423
cb?.(errorMsg);
424
return;
425
}
426
} catch (error) {
427
// Yes, spawn can cause this error if there is no memory, and there's no
428
// event! -- Error: spawn ENOMEM
429
ran_code = false;
430
const errorMsg = `error ${error}`;
431
// For streaming, also send error event
432
if (opts.streamCB && opts.async_call) {
433
opts.streamCB({ type: "error", data: errorMsg });
434
}
435
cb?.(errorMsg);
436
return;
437
}
438
439
ran_code = true;
440
441
if (opts.verbose) {
442
log.debug("listening for stdout, stderr and exit_code...");
443
}
444
445
// Batching mechanism for streaming to reduce message frequency -- otherwise there could be 100msg/s to process
446
let streamBatchTimer: NodeJS.Timeout | undefined;
447
const streamBuffer = { stdout: "", stderr: "" };
448
449
// Send batched stream data
450
const sendBatchedStream = () => {
451
if (!opts.streamCB) return;
452
453
const hasStdout = streamBuffer.stdout.length > 0;
454
const hasStderr = streamBuffer.stderr.length > 0;
455
456
if (hasStdout || hasStderr) {
457
// Send stdout if available
458
if (hasStdout) {
459
opts.streamCB({ type: "stdout", data: streamBuffer.stdout });
460
streamBuffer.stdout = "";
461
}
462
// Send stderr if available
463
if (hasStderr) {
464
opts.streamCB({ type: "stderr", data: streamBuffer.stderr });
465
streamBuffer.stderr = "";
466
}
467
}
468
};
469
470
// Flush any remaining buffered data and cleanup
471
const flushStreamBuffer = () => {
472
if (streamBatchTimer) {
473
clearInterval(streamBatchTimer);
474
streamBatchTimer = undefined;
475
}
476
sendBatchedStream();
477
};
478
479
// Start batch timer if streaming is enabled, every 100ms
480
if (opts.streamCB) {
481
streamBatchTimer = setInterval(sendBatchedStream, 100);
482
}
483
484
function update_async(
485
job_id: string | undefined,
486
aspect: "stdout" | "stderr" | "pid",
487
data: string | number,
488
): ExecuteCodeOutputAsync | undefined {
489
if (!job_id) return;
490
// job_config fallback, in case the cache forgot about it
491
const obj = asyncCache.get(job_id) ?? opts.job_config;
492
if (obj != null) {
493
if (aspect === "pid") {
494
if (typeof data === "number") {
495
obj.pid = data;
496
}
497
} else if (typeof data === "string") {
498
obj[aspect] = data;
499
}
500
asyncCache.set(job_id, obj);
501
}
502
return obj;
503
}
504
505
child.stdout.on("data", (data) => {
506
data = data.toString();
507
const prevLength = stdout.length;
508
if (opts.max_output != null) {
509
if (stdout.length < opts.max_output) {
510
const newData = data.slice(0, opts.max_output - stdout.length);
511
stdout += newData;
512
// Buffer the new portion for batched streaming
513
if (opts.streamCB && stdout.length > prevLength) {
514
streamBuffer.stdout += newData;
515
}
516
}
517
} else {
518
stdout += data;
519
// Buffer the new data for batched streaming
520
if (opts.streamCB) {
521
streamBuffer.stdout += data;
522
}
523
}
524
update_async(opts.job_id, "stdout", stdout);
525
});
526
527
child.stderr.on("data", (data) => {
528
data = data.toString();
529
const prevLength = stderr.length;
530
if (opts.max_output != null) {
531
if (stderr.length < opts.max_output) {
532
const newData = data.slice(0, opts.max_output - stderr.length);
533
stderr += newData;
534
// Buffer the new portion for batched streaming
535
if (opts.streamCB && stderr.length > prevLength) {
536
streamBuffer.stderr += newData;
537
}
538
}
539
} else {
540
stderr += data;
541
// Buffer the new data for batched streaming
542
if (opts.streamCB) {
543
streamBuffer.stderr += data;
544
}
545
}
546
update_async(opts.job_id, "stderr", stderr);
547
});
548
549
child.stderr.on("end", () => {
550
stderr_is_done = true;
551
finish();
552
});
553
554
child.stdout.on("end", () => {
555
stdout_is_done = true;
556
finish();
557
});
558
559
// Doc: https://nodejs.org/api/child_process.html#event-exit – read it!
560
// TODO: This is not 100% correct, because in case the process is killed (signal TERM),
561
// the $code is "null" and a second argument gives the signal (as a string). Hence, after a kill,
562
// this code below changes the exit code to 0. This could be a special case, though.
563
// It cannot be null, though, because the "finish" callback assumes that stdout, err and exit are set.
564
// The local $killed var is only true, if the process has been killed by the timeout – not by another kill.
565
child.on("exit", (code) => {
566
exit_code = code ?? 0;
567
finish();
568
});
569
570
// This can happen, e.g., "Error: spawn ENOMEM" if there is no memory. Without this handler,
571
// an unhandled exception gets raised, which is nasty.
572
// From docs: "Note that the exit-event may or may not fire after an error has occurred. "
573
child.on("error", (err) => {
574
if (exit_code == null) {
575
exit_code = 1;
576
}
577
stderr += to_json(err);
578
// a fundamental issue, we were not running some code
579
ran_code = false;
580
// For streaming, flush buffer and send error event
581
if (opts.streamCB && opts.async_call && opts.job_id) {
582
flushStreamBuffer(); // Flush any buffered data first
583
const errorResult: ExecuteCodeOutputAsync = {
584
type: "async",
585
job_id: opts.job_id,
586
stdout,
587
stderr,
588
exit_code: exit_code ?? 1,
589
status: "error",
590
elapsed_s: walltime(start_time),
591
start: opts.job_config?.start ?? Date.now(),
592
pid: child.pid,
593
stats: opts.job_config?.stats,
594
};
595
opts.streamCB({ type: "done", data: errorResult });
596
}
597
finish();
598
});
599
600
if (opts.job_id && child.pid) {
601
// we don't await it, it runs until $callback_done is true
602
update_async(opts.job_id, "pid", child.pid);
603
startMonitor();
604
}
605
606
const finish = (err?) => {
607
if (!killed && (!stdout_is_done || !stderr_is_done || exit_code == null)) {
608
// it wasn't killed and none of stdout, stderr, and exit_code hasn't been set.
609
// so we let the rest of them get set before actually finishing up.
610
return;
611
}
612
if (callback_done) {
613
// we already finished up.
614
return;
615
}
616
617
// Safety check: if we're using streaming and the process has exited but streams aren't done,
618
// force completion after a short delay to prevent hanging
619
if (
620
opts.streamCB &&
621
exit_code != null &&
622
(!stdout_is_done || !stderr_is_done)
623
) {
624
setTimeout(() => {
625
if (!callback_done) {
626
stdout_is_done = true;
627
stderr_is_done = true;
628
finish(err);
629
}
630
}, 1000); // Wait 1 second for streams to complete
631
return;
632
}
633
// finally finish up – this will also terminate the monitor
634
callback_done = true;
635
636
// Flush any remaining buffered stream data before finishing
637
if (opts.streamCB) {
638
flushStreamBuffer();
639
}
640
641
if (timer != null) {
642
clearTimeout(timer);
643
timer = undefined;
644
}
645
646
if (opts.verbose && log.isEnabled("debug")) {
647
log.debug(
648
"finished exec of",
649
opts.command,
650
"took",
651
walltime(start_time),
652
"seconds",
653
);
654
log.debug({
655
stdout: trunc(stdout, 512),
656
stderr: trunc(stderr, 512),
657
exit_code,
658
});
659
}
660
661
// Handle timeout case first - this takes precedence over other error conditions
662
if (err && killed) {
663
// Process was killed due to timeout
664
if (opts.job_id) {
665
// For async with streaming, send timeout error in done event
666
if (opts.streamCB) {
667
const errorResult: ExecuteCodeOutputAsync = {
668
type: "async",
669
job_id: opts.job_id,
670
stdout,
671
stderr,
672
exit_code: 1, // Timeout is always an error
673
status: "error",
674
elapsed_s: walltime(start_time),
675
start: opts.job_config?.start ?? Date.now(),
676
pid: child.pid,
677
stats: opts.job_config?.stats,
678
};
679
opts.streamCB({ type: "done", data: errorResult });
680
}
681
// For streaming, don't call cb with error - let the stream handle it
682
if (!opts.streamCB) {
683
cb?.(err);
684
}
685
} else {
686
// sync behavior, like it was before
687
cb?.(err);
688
}
689
} else if (err) {
690
cb?.(err);
691
} else if (opts.err_on_exit && exit_code != 0) {
692
const x = opts.origCommand
693
? opts.origCommand
694
: `'${opts.command}' (args=${opts.args?.join(" ")})`;
695
if (opts.job_id) {
696
// For async with streaming, send error in done event
697
if (opts.streamCB) {
698
const errorResult: ExecuteCodeOutputAsync = {
699
type: "async",
700
job_id: opts.job_id,
701
stdout,
702
stderr,
703
exit_code: exit_code ?? 1,
704
status: "error",
705
elapsed_s: walltime(start_time),
706
start: opts.job_config?.start ?? Date.now(),
707
pid: child.pid,
708
stats: opts.job_config?.stats,
709
};
710
opts.streamCB({ type: "done", data: errorResult });
711
}
712
// For streaming, don't call cb with error - let the stream handle it
713
if (!opts.streamCB) {
714
cb?.(stderr);
715
}
716
} else {
717
// sync behavor, like it was before
718
cb?.(
719
`command '${x}' exited with nonzero code ${exit_code} -- stderr='${trunc(
720
stderr,
721
1024,
722
)}'`,
723
);
724
}
725
} else if (!ran_code) {
726
// regardless of opts.err_on_exit !
727
const x = opts.origCommand
728
? opts.origCommand
729
: `'${opts.command}' (args=${opts.args?.join(" ")})`;
730
cb?.(
731
`command '${x}' was not able to run -- stderr='${trunc(stderr, 1024)}'`,
732
);
733
} else {
734
if (opts.max_output != null) {
735
if (stdout.length >= opts.max_output) {
736
stdout += ` (truncated at ${opts.max_output} characters)`;
737
}
738
if (stderr.length >= opts.max_output) {
739
stderr += ` (truncated at ${opts.max_output} characters)`;
740
}
741
}
742
if (exit_code == null) {
743
// if exit-code not set, may have been SIGKILL so we set it to 1
744
exit_code = 1;
745
}
746
const result = { type: "blocking" as const, stdout, stderr, exit_code };
747
// For async with streaming, send the final done event
748
if (opts.streamCB && opts.async_call) {
749
const finalResult: ExecuteCodeOutputAsync = {
750
type: "async",
751
job_id: opts.job_id!,
752
stdout,
753
stderr,
754
exit_code: exit_code ?? 0,
755
status: "completed",
756
elapsed_s: walltime(start_time),
757
start: opts.job_config?.start ?? Date.now(),
758
pid: child.pid,
759
stats: opts.job_config?.stats,
760
};
761
opts.streamCB({ type: "done", data: finalResult });
762
}
763
cb?.(undefined, result);
764
}
765
};
766
767
if (opts.timeout) {
768
// setup a timer that will kill the command after a certain amount of time.
769
const f = () => {
770
if (child.exitCode != null) {
771
// command already exited.
772
return;
773
}
774
if (opts.verbose) {
775
log.debug(
776
"subprocess did not exit after",
777
opts.timeout,
778
"seconds, so killing with SIGKILL",
779
);
780
}
781
try {
782
killed = true; // we set the kill flag in any case – i.e. process will no longer exist
783
if (child.pid != null) {
784
process.kill(-child.pid, "SIGKILL"); // this should kill process group
785
}
786
} catch (err) {
787
// Exceptions can happen, which left uncaught messes up calling code big time.
788
if (opts.verbose) {
789
log.debug("process.kill raised an exception", err);
790
}
791
}
792
finish(`killed command '${opts.command} ${opts.args?.join(" ")}'`);
793
};
794
timer = setTimeout(f, opts.timeout * 1000);
795
}
796
797
return child;
798
}
799
800