CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/execute-code.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020–2024 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
// Execute code in a subprocess.
7
8
import { callback } from "awaiting";
9
import LRU from "lru-cache";
10
import {
11
ChildProcessWithoutNullStreams,
12
spawn,
13
SpawnOptionsWithoutStdio,
14
} from "node:child_process";
15
import { chmod, mkdtemp, rm, writeFile } from "node:fs/promises";
16
import { tmpdir } from "node:os";
17
import { join } from "node:path";
18
import { EventEmitter } from "node:stream";
19
import shellEscape from "shell-escape";
20
21
import getLogger from "@cocalc/backend/logger";
22
import { envToInt } from "@cocalc/backend/misc/env-to-number";
23
import { aggregate } from "@cocalc/util/aggregate";
24
import { callback_opts } from "@cocalc/util/async-utils";
25
import { PROJECT_EXEC_DEFAULT_TIMEOUT_S } from "@cocalc/util/consts/project";
26
import { to_json, trunc, uuid, walltime } from "@cocalc/util/misc";
27
import {
28
ExecuteCodeOutputAsync,
29
ExecuteCodeOutputBlocking,
30
isExecuteCodeOptionsAsyncGet,
31
type ExecuteCodeFunctionWithCallback,
32
type ExecuteCodeOptions,
33
type ExecuteCodeOptionsAsyncGet,
34
type ExecuteCodeOptionsWithCallback,
35
type ExecuteCodeOutput,
36
} from "@cocalc/util/types/execute-code";
37
import { Processes } from "@cocalc/util/types/project-info/types";
38
import { envForSpawn } from "./misc";
39
import { ProcessStats } from "./process-stats";
40
41
const log = getLogger("execute-code");

// All tunables below can be overridden via environment variables with this prefix.
const PREFIX = "COCALC_PROJECT_ASYNC_EXEC";
// Maximum number of async jobs remembered in asyncCache.
const ASYNC_CACHE_MAX = envToInt(`${PREFIX}_CACHE_MAX`, 100);
// Lifetime of an async job's cache entry, in seconds (one hour by default).
const ASYNC_CACHE_TTL_S = envToInt(`${PREFIX}_TTL_S`, 3600);
// for async execution, every that many secs check up on the child-tree
// (this is the upper bound of the monitor's polling interval)
const MONITOR_INTERVAL_S = envToInt(`${PREFIX}_MONITOR_INTERVAL_S`, 60);
// Maximum number of resource-usage samples kept per async job.
const MONITOR_STATS_LENGTH_MAX = envToInt(`${PREFIX}_MONITOR_STATS_LENGTH_MAX`, 100);

log.debug("configuration:", {
  ASYNC_CACHE_MAX,
  ASYNC_CACHE_TTL_S,
  MONITOR_INTERVAL_S,
  MONITOR_STATS_LENGTH_MAX,
});
59
60
// Kinds of events emitted on `updates`; currently only job completion.
type AsyncAwait = "finished";
// Emitter used to wake up callers that wait (async_await) for an async job.
const updates = new EventEmitter();
// Event name for a job, e.g. "finished-<job_id>".
const eventKey = (type: AsyncAwait, job_id: string): string =>
  [type, job_id].join("-");
64
65
// State of async jobs, keyed by job_id. Reading or checking an entry
// refreshes its age, so jobs that are polled stay cached.
const asyncCache = new LRU<string, ExecuteCodeOutputAsync>({
  max: ASYNC_CACHE_MAX,
  ttl: ASYNC_CACHE_TTL_S * 1000, // seconds -> milliseconds
  updateAgeOnGet: true,
  updateAgeOnHas: true,
});
71
72
// Keep only the newest MONITOR_STATS_LENGTH_MAX entries of obj.stats,
// discarding the initial ones. No-op if obj is missing or has no stats array.
function truncStats(obj?: ExecuteCodeOutputAsync) {
  if (obj == null || !Array.isArray(obj.stats)) return;
  const { stats } = obj;
  // may be negative when there are fewer entries than the limit;
  // slice() clamps that to 0, i.e. everything is kept
  const firstKept = stats.length - MONITOR_STATS_LENGTH_MAX;
  obj.stats = stats.slice(firstKept);
}
78
79
// Merge `upd` into the cached state of async job `job_id` and store the result.
//
// If both the cached entry and `upd` carry a `stats` array, the samples are
// concatenated (then truncated to MONITOR_STATS_LENGTH_MAX) instead of
// `upd.stats` replacing the recorded history. Once the merged status is no
// longer "running", the "finished" event for this job is emitted so that
// callers waiting via `async_await` are resolved.
//
// Returns the new cache entry.
function asyncCacheUpdate(job_id: string, upd): ExecuteCodeOutputAsync {
  const obj = asyncCache.get(job_id);
  const next: ExecuteCodeOutputAsync = { ...obj, ...upd };
  if (Array.isArray(obj?.stats) && Array.isArray(upd.stats)) {
    // The spread above would have replaced the history with upd.stats,
    // so rebuild the combined list explicitly.
    next.stats = [...obj.stats, ...upd.stats];
    truncStats(next);
  }
  asyncCache.set(job_id, next);
  if (next.status !== "running") {
    // Use the job_id we were called with: waiters listen on exactly this key,
    // even if the cache entry was evicted and `upd` lacks a job_id.
    updates.emit(eventKey("finished", job_id), next);
  }
  return next;
}
92
93
// Async/await interface to executing code.
// Promise wrapper around the callback-based (aggregated) `execute_code`.
export async function executeCode(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  const run = callback_opts(execute_code);
  return await run(opts);
}
99
100
// Callback interface to executing code.
// This will get deprecated and is only used by some old coffeescript code.
//
// Results are delivered through `opts.cb(err, data)`. For `async_get`
// lookups of async jobs, the (potentially large) `stats` field is stripped
// unless `async_stats === true` was requested.
// NOTE(review): `opts.cb` is invoked inside the try block, so if the callback
// itself throws, it is called a second time with that error.
export const execute_code: ExecuteCodeFunctionWithCallback = aggregate(
  (opts: ExecuteCodeOptionsWithCallback): void => {
    const run = async (): Promise<void> => {
      try {
        const output = await executeCodeNoAggregate(opts);
        const wantsStats =
          !isExecuteCodeOptionsAsyncGet(opts) || opts.async_stats === true;
        if (output.type === "async" && !wantsStats) {
          // stats could contain a lot of data. we only return it if requested.
          opts.cb?.(undefined, { ...output, stats: undefined });
        } else {
          opts.cb?.(undefined, output);
        }
      } catch (err) {
        opts.cb?.(err);
      }
    };
    void run();
  },
);
120
121
// Recursively delete the temporary directory, if one was created.
// Missing paths are ignored (`force: true`); an empty/undefined path is a no-op.
async function clean_up_tmp(tempDir: string | undefined) {
  if (!tempDir) return;
  await rm(tempDir, { recursive: true, force: true });
}
126
127
// actual implementation, without the aggregate wrapper
//
// Two modes:
//  - `async_get`: return the current state of a previously started async job
//    from asyncCache. With `async_await === true` and the job still running,
//    wait until asyncCacheUpdate emits its "finished" event. Throws if the
//    job id is unknown (e.g. never existed or was evicted).
//  - otherwise: fill in option defaults, optionally wrap the command into a
//    temporary bash script, and spawn it -- either blocking until it is done,
//    or (with `async_call`) returning immediately with a job id to poll.
//
// NOTE(review): this mutates the passed-in `opts` object (args, timeout,
// command, bash, home, path, ...); callers should not reuse an opts object.
async function executeCodeNoAggregate(
  opts: ExecuteCodeOptions | ExecuteCodeOptionsAsyncGet,
): Promise<ExecuteCodeOutput> {
  if (isExecuteCodeOptionsAsyncGet(opts)) {
    const key = opts.async_get;
    const cached = asyncCache.get(key);
    if (cached == null) {
      throw new Error(`Async operation '${key}' does not exist.`);
    }
    if (cached.status === "running" && opts.async_await === true) {
      // resolved by asyncCacheUpdate once the job is no longer running
      return new Promise((done) =>
        updates.once(eventKey("finished", key), (data) => done(data)),
      );
    }
    return cached;
  }

  opts.args ??= [];
  opts.timeout ??= PROJECT_EXEC_DEFAULT_TIMEOUT_S;
  opts.ulimit_timeout ??= true;
  opts.err_on_exit ??= true;
  opts.verbose ??= true;

  if (opts.verbose) {
    log.debug(`input: ${opts.command} ${opts.args?.join(" ")}`);
  }
  const s = opts.command.split(/\s+/g); // split on whitespace
  if (opts.args?.length === 0 && s.length > 1) {
    // a command containing whitespace and no separate args is run via bash
    opts.bash = true;
  } else if (opts.bash && opts.args?.length > 0) {
    // Selected bash, but still passed in args: fold them into one
    // shell-escaped command line.
    opts.command = shellEscape([opts.command].concat(opts.args));
    opts.args = [];
  }

  if (opts.home == null) {
    opts.home = process.env.HOME;
  }

  if (opts.path == null) {
    opts.path = opts.home;
  } else if (opts.path[0] !== "/") {
    // relative paths are interpreted relative to the home directory
    opts.path = opts.home + "/" + opts.path;
  }

  let tempDir: string | undefined = undefined;
  // Becomes true once the async completion callback has taken over the
  // responsibility of deleting tempDir. Until then, `finally` cleans it up --
  // this also covers failures between mkdtemp and spawning in async mode.
  let tempDirHandedOff = false;

  try {
    let origCommand = "";
    if (opts.bash) {
      // using bash, which (for better or worse), we do by writing the command to run
      // under bash to a file, then executing that file.
      let cmd: string;
      if (opts.timeout && opts.ulimit_timeout) {
        // This ensures that everything involved with this
        // command really does die no matter what; it's
        // better than killing from outside, since it gets
        // all subprocesses since they inherit the limits.
        // Leave it to the OS. Note that the argument to ulimit
        // must be a whole number.
        cmd = `ulimit -t ${Math.ceil(opts.timeout)}\n${opts.command}`;
      } else {
        cmd = opts.command;
      }

      // We write the cmd to a file, and replace the command and args
      // with bash and the filename, then do everything below as we would
      // have done anyways.
      origCommand = opts.command;
      opts.command = "bash";
      tempDir = await mkdtemp(join(tmpdir(), "cocalc-"));
      const tempPath = join(tempDir, "a.sh");
      if (opts.verbose) {
        log.debug("writing temp file that contains bash program", tempPath);
      }
      opts.args = [tempPath];
      await writeFile(tempPath, cmd);
      await chmod(tempPath, 0o700);
    }

    if (opts.async_call) {
      // we return an ID, the caller can then use it to query the status
      opts.max_output ??= 1024 * 1024; // we limit how much we keep in memory, to avoid problems;
      const job_id = uuid();
      const start = Date.now();
      const job_config: ExecuteCodeOutputAsync = {
        type: "async",
        stdout: "",
        stderr: "",
        exit_code: 0,
        start,
        job_id,
        status: "running",
      };
      asyncCache.set(job_id, job_config);

      const pid: number | undefined = doSpawn(
        { ...opts, origCommand, job_id, job_config },
        async (err, result) => {
          log.debug("async/doSpawn returned", { err, result });
          try {
            const info: Omit<
              ExecuteCodeOutputAsync,
              "stdout" | "stderr" | "exit_code"
            > = {
              job_id,
              type: "async",
              elapsed_s: (Date.now() - start) / 1000,
              start,
              status: "error",
            };
            // doSpawn reports success as (undefined, result). Any other err,
            // including the empty string (nonzero exit with empty stderr),
            // is an error and must not be misreported as "No result".
            if (err != null) {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `${err}`,
                exit_code: 1,
                ...info,
              });
            } else if (result != null) {
              asyncCacheUpdate(job_id, {
                ...result,
                ...info,
                ...{ status: "completed" },
              });
            } else {
              asyncCacheUpdate(job_id, {
                stdout: "",
                stderr: `No result`,
                exit_code: 1,
                ...info,
              });
            }
          } finally {
            await clean_up_tmp(tempDir);
          }
        },
      );
      // doSpawn returned normally, so the callback above now owns tempDir.
      tempDirHandedOff = true;

      // pid could be undefined, this means it wasn't possible to spawn a child
      return { ...job_config, pid };
    } else {
      // This is the blocking variant
      return await callback(doSpawn, { ...opts, origCommand });
    }
  } finally {
    // do not delete the tempDir once the async callback is responsible for it!
    if (!tempDirHandedOff) await clean_up_tmp(tempDir);
  }
}
281
282
// Sum memory (rss), cpu percentage and cpu seconds of process `pid` and all
// of its descendants, following the `children` (ppid -> child pids) map.
// Returns null if `pid` or any descendant is missing from `procs`.
function sumChildren(
  procs: Processes,
  children: { [pid: number]: number[] },
  pid: number,
): { rss: number; pct_cpu: number; cpu_secs: number } | null {
  const root = procs[`${pid}`];
  if (root == null) {
    log.debug(`sumChildren: no process ${pid} in proc`);
    return null;
  }
  const total = {
    rss: root.stat.mem.rss,
    pct_cpu: root.cpu.pct,
    cpu_secs: root.cpu.secs,
  };
  const kids = children[pid] ?? [];
  for (const kid of kids) {
    const subtree = sumChildren(procs, children, kid);
    if (subtree == null) return null;
    total.rss += subtree.rss;
    total.pct_cpu += subtree.pct_cpu;
    total.cpu_secs += subtree.cpu_secs;
  }
  return total;
}
304
305
// Spawn `opts.command` with `opts.args` and collect stdout/stderr/exit code.
//
// `cb` is called at most once: with an error string (spawn failure, timeout
// kill, or nonzero exit when `err_on_exit` is set), or with a "blocking"
// result. When `opts.job_id` is set (async mode), the output collected so far
// is mirrored into asyncCache as it arrives, and a monitor periodically
// records resource usage of the process tree.
//
// Returns the child's pid, or undefined if it could not be spawned.
function doSpawn(
  opts: ExecuteCodeOptions & {
    origCommand: string;
    job_id?: string;
    job_config?: ExecuteCodeOutputAsync;
  },
  cb: (err: string | undefined, result?: ExecuteCodeOutputBlocking) => void,
): number | undefined {
  const start_time = walltime();

  if (opts.verbose) {
    log.debug(
      "spawning",
      opts.command,
      "with args",
      opts.args,
      "and timeout",
      opts.timeout,
      "seconds",
    );
  }

  const spawnOptions: SpawnOptionsWithoutStdio = {
    detached: true, // so we can kill the entire process group if it times out
    cwd: opts.path,
    ...(opts.uid ? { uid: opts.uid } : undefined),
    // the group id belongs in `gid` -- writing it to `uid` would clobber the
    // requested uid and leave the group unset
    ...(opts.gid ? { gid: opts.gid } : undefined),
    env: {
      ...envForSpawn(),
      ...opts.env,
      // when a uid is given, HOME is set to the requested home directory
      ...(opts.uid != null && opts.home ? { HOME: opts.home } : undefined),
    },
  };

  // This is the state, which will be captured in closures
  let child: ChildProcessWithoutNullStreams;
  let ran_code = false;
  let stdout = "";
  let stderr = "";
  let exit_code: undefined | number = undefined;
  let stderr_is_done = false;
  let stdout_is_done = false;
  let killed = false; // only set by the timeout handler below
  let callback_done = false; // set in "finish", which is also called in a timeout
  let timer: NodeJS.Timeout | undefined = undefined;

  // periodically check up on the child process tree and record stats
  // this also keeps the entry in the cache alive, when the ttl is less than the duration of the execution
  async function startMonitor() {
    const pid = child.pid;
    const { job_id, job_config } = opts;
    if (job_id == null || pid == null || job_config == null) return;
    const monitor = new ProcessStats();
    await monitor.init();
    await new Promise((done) => setTimeout(done, 1000));
    if (callback_done) return;

    while (true) {
      if (callback_done) return;
      const { procs } = await monitor.processes(Date.now());
      // reconstruct process tree
      const children: { [pid: number]: number[] } = {};
      for (const p of Object.values(procs)) {
        const { pid, ppid } = p;
        children[ppid] ??= [];
        children[ppid].push(pid);
      }
      // we only consider those, which are the pid itself or one of its children
      const sc = sumChildren(procs, children, pid);
      if (sc == null) {
        // If the process by PID is no longer known, either the process was killed or there are too many running.
        // in any case, stop monitoring and do not update any data.
        return;
      }
      const { rss, pct_cpu, cpu_secs } = sc;
      // ?? fallback, in case the cache "forgot" about it
      const obj = asyncCache.get(job_id) ?? job_config;
      obj.pid = pid;
      obj.stats ??= [];
      obj.stats.push({
        timestamp: Date.now(),
        mem_rss: rss,
        cpu_pct: pct_cpu,
        cpu_secs,
      });
      truncStats(obj);
      asyncCache.set(job_id, obj);

      // initially, we record more frequently, but then we space it out up until the interval (probably 1 minute)
      const elapsed_s = (Date.now() - job_config.start) / 1000;
      // i.e. after 6 minutes, we check every minute
      const next_s = Math.max(1, Math.floor(elapsed_s / 6));
      const wait_s = Math.min(next_s, MONITOR_INTERVAL_S);
      await new Promise((done) => setTimeout(done, wait_s * 1000));
    }
  }

  try {
    child = spawn(opts.command, opts.args, spawnOptions);
    if (child.stdout == null || child.stderr == null) {
      // The docs/examples at https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options
      // suggest that r.stdout and r.stderr are always defined. However, this is
      // definitely NOT the case in edge cases, as we have observed.
      cb("error creating child process -- couldn't spawn child process");
      return;
    }
  } catch (error) {
    // Yes, spawn can cause this error if there is no memory, and there's no
    // event! -- Error: spawn ENOMEM
    ran_code = false;
    cb(`error ${error}`);
    return;
  }

  ran_code = true;

  if (opts.verbose) {
    log.debug("listening for stdout, stderr and exit_code...");
  }

  // Mirror the latest stdout/stderr/pid into the async job's cache entry.
  // No-op when not in async mode (no job_id).
  function update_async(
    job_id: string | undefined,
    aspect: "stdout" | "stderr" | "pid",
    data: string | number,
  ): ExecuteCodeOutputAsync | undefined {
    if (!job_id) return;
    // job_config fallback, in case the cache forgot about it
    const obj = asyncCache.get(job_id) ?? opts.job_config;
    if (obj != null) {
      if (aspect === "pid") {
        if (typeof data === "number") {
          obj.pid = data;
        }
      } else if (typeof data === "string") {
        obj[aspect] = data;
      }
      asyncCache.set(job_id, obj);
    }
    return obj;
  }

  child.stdout.on("data", (data) => {
    data = data.toString();
    if (opts.max_output != null) {
      if (stdout.length < opts.max_output) {
        stdout += data.slice(0, opts.max_output - stdout.length);
      }
    } else {
      stdout += data;
    }
    update_async(opts.job_id, "stdout", stdout);
  });

  child.stderr.on("data", (data) => {
    data = data.toString();
    if (opts.max_output != null) {
      if (stderr.length < opts.max_output) {
        stderr += data.slice(0, opts.max_output - stderr.length);
      }
    } else {
      stderr += data;
    }
    update_async(opts.job_id, "stderr", stderr);
  });

  child.stderr.on("end", () => {
    stderr_is_done = true;
    finish();
  });

  child.stdout.on("end", () => {
    stdout_is_done = true;
    finish();
  });

  // Doc: https://nodejs.org/api/child_process.html#event-exit – read it!
  // TODO: This is not 100% correct, because in case the process is killed (signal TERM),
  // the $code is "null" and a second argument gives the signal (as a string). Hence, after a kill,
  // this code below changes the exit code to 0. This could be a special case, though.
  // It cannot be null, though, because the "finish" callback assumes that stdout, err and exit are set.
  // The local $killed var is only true, if the process has been killed by the timeout – not by another kill.
  child.on("exit", (code) => {
    exit_code = code ?? 0;
    finish();
  });

  // This can happen, e.g., "Error: spawn ENOMEM" if there is no memory. Without this handler,
  // an unhandled exception gets raised, which is nasty.
  // From docs: "Note that the exit-event may or may not fire after an error has occurred. "
  child.on("error", (err) => {
    if (exit_code == null) {
      exit_code = 1;
    }
    stderr += to_json(err);
    // a fundamental issue, we were not running some code
    ran_code = false;
    finish();
  });

  if (opts.job_id && child.pid) {
    update_async(opts.job_id, "pid", child.pid);
    // we don't await it, it runs until $callback_done is true.
    // Failures (e.g. while reading process stats) are logged instead of
    // surfacing as an unhandled promise rejection.
    startMonitor().catch((err) => {
      log.debug("startMonitor failed", err);
    });
  }

  const finish = (err?) => {
    if (!killed && (!stdout_is_done || !stderr_is_done || exit_code == null)) {
      // it wasn't killed and none of stdout, stderr, and exit_code hasn't been set.
      // so we let the rest of them get set before actually finishing up.
      return;
    }
    if (callback_done) {
      // we already finished up.
      return;
    }
    // finally finish up – this will also terminate the monitor
    callback_done = true;

    if (timer != null) {
      clearTimeout(timer);
      timer = undefined;
    }

    if (opts.verbose && log.isEnabled("debug")) {
      log.debug(
        "finished exec of",
        opts.command,
        "took",
        walltime(start_time),
        "seconds",
      );
      log.debug({
        stdout: trunc(stdout, 512),
        stderr: trunc(stderr, 512),
        exit_code,
      });
    }

    if (err) {
      cb(err);
    } else if (opts.err_on_exit && exit_code != 0) {
      const x = opts.origCommand
        ? opts.origCommand
        : `'${opts.command}' (args=${opts.args?.join(" ")})`;
      if (opts.job_id) {
        cb(stderr);
      } else {
        // sync behavor, like it was before
        cb(
          `command '${x}' exited with nonzero code ${exit_code} -- stderr='${trunc(
            stderr,
            1024,
          )}'`,
        );
      }
    } else if (!ran_code) {
      // regardless of opts.err_on_exit !
      const x = opts.origCommand
        ? opts.origCommand
        : `'${opts.command}' (args=${opts.args?.join(" ")})`;
      cb(
        `command '${x}' was not able to run -- stderr='${trunc(stderr, 1024)}'`,
      );
    } else {
      if (opts.max_output != null) {
        if (stdout.length >= opts.max_output) {
          stdout += ` (truncated at ${opts.max_output} characters)`;
        }
        if (stderr.length >= opts.max_output) {
          stderr += ` (truncated at ${opts.max_output} characters)`;
        }
      }
      if (exit_code == null) {
        // if exit-code not set, may have been SIGKILL so we set it to 1
        exit_code = 1;
      }
      cb(undefined, { type: "blocking", stdout, stderr, exit_code });
    }
  };

  if (opts.timeout) {
    // setup a timer that will kill the command after a certain amount of time.
    const f = () => {
      if (child.exitCode != null) {
        // command already exited.
        return;
      }
      if (opts.verbose) {
        log.debug(
          "subprocess did not exit after",
          opts.timeout,
          "seconds, so killing with SIGKILL",
        );
      }
      try {
        killed = true; // we set the kill flag in any case – i.e. process will no longer exist
        if (child.pid != null) {
          process.kill(-child.pid, "SIGKILL"); // this should kill process group
        }
      } catch (err) {
        // Exceptions can happen, which left uncaught messes up calling code big time.
        if (opts.verbose) {
          log.debug("process.kill raised an exception", err);
        }
      }
      finish(`killed command '${opts.command} ${opts.args?.join(" ")}'`);
    };
    timer = setTimeout(f, opts.timeout * 1000);
  }

  return child.pid;
}
617
618