Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/exec-stream.ts
2208 views
1
/*
 * Backend exec-stream functionality for streaming code execution.
 * Core streaming logic that can be used by different services.
 */
5
6
import { unreachable } from "@cocalc/util/misc";
7
import {
8
ExecuteCodeOutput,
9
ExecuteCodeOutputAsync,
10
ExecuteCodeStats,
11
ExecuteCodeStreamEvent,
12
} from "@cocalc/util/types/execute-code";
13
import { asyncCache, executeCode } from "./execute-code";
14
import getLogger from "./logger";
15
import { abspath } from "./misc_node";
16
17
/**
 * A single message handed to the `stream` callback of `executeStream`.
 *
 * All fields are optional: output/progress messages carry `type` and `data`,
 * while failures are reported with only `error` set.
 */
export type StreamEvent = {
  /**
   * Kind of message: `"job"` for the initial job info emitted by
   * `executeStream`, otherwise the type of the underlying execute-code event.
   */
  type?: ExecuteCodeStreamEvent["type"] | "job";
  /** Payload of the message; its shape depends on `type`. */
  data?: ExecuteCodeStreamEvent["data"];
  /** Human-readable error description; set when something went wrong. */
  error?: string;
};
22
23
// Module-level logger, created under the name "backend:exec-stream".
const logger = getLogger("backend:exec-stream");
24
25
// Maximum number of stats entries kept per job; the oldest entries are dropped first.
const MONITOR_STATS_LENGTH_MAX = 100;

/**
 * Return a copy of `stats` holding at most the last
 * MONITOR_STATS_LENGTH_MAX entries.
 *
 * @param stats - accumulated stats entries, oldest first
 * @returns a new array; the input is not modified
 */
function truncStats(stats: ExecuteCodeStats): ExecuteCodeStats {
  // Clamp at 0: slice() interprets a negative start as an offset from the
  // end, which would silently drop entries from arrays that are shorter than
  // the limit (e.g. 60 entries -> slice(-40) -> only 40 returned).
  const start = Math.max(0, stats.length - MONITOR_STATS_LENGTH_MAX);
  return stats.slice(start);
}
30
31
/**
 * Options accepted by `executeStream`.
 *
 * `stream`, `debug`, `project_id` and `waitForCompletion` are consumed (or
 * dropped) by `executeStream` itself; every other field is forwarded to
 * `executeCode`.
 */
export interface ExecuteStreamOptions {
  /** Command to run; `executeStream` falls back to "" when it is missing. */
  command?: string;
  /** Arguments for the command. */
  args?: string[];
  /** Path forwarded to `executeCode` (presumably the working directory -- confirm there). */
  path?: string;
  /** When truthy, `path` is not converted to an absolute path locally. */
  compute_server_id?: number;
  bash?: boolean;
  /** Extra environment variables, name -> value. */
  env?: Record<string, string>;
  timeout?: number;
  max_output?: number;
  verbose?: boolean;
  /** Not used by `executeStream` and not forwarded to `executeCode`. */
  project_id?: string;
  /** Free-form text written to the debug log when the call starts. */
  debug?: string;
  /**
   * Receives every stream event; it is called with `null` once to signal
   * that the stream has ended.
   */
  stream: (event: StreamEvent | null) => void;
  /** Not used by `executeStream` and not forwarded to `executeCode`. */
  waitForCompletion?: boolean;
}
46
47
/**
 * Start a command as an async job and forward its output to `options.stream`
 * while it runs.
 *
 * Messages passed to `options.stream`:
 *  - `{ type: "job", data }` with the initial async job info,
 *  - `{ type: "stdout" | "stderr" | "stats", data }` as reported by executeCode,
 *  - a terminal `{ type: "done", data }` or `{ error }` message,
 *  - `null`, marking the end of the stream.
 *
 * Errors are reported through the stream rather than thrown.
 *
 * @param options - execution options plus the `stream` callback
 * @returns the async job (the cached copy if it has already finished), or
 *   `undefined` if no async job could be created or an exception was caught
 */
export async function executeStream(
  options: ExecuteStreamOptions,
): Promise<ExecuteCodeOutput | undefined> {
  // `project_id` and `waitForCompletion` are pulled out only so that they are
  // not part of `opts`, which is forwarded to executeCode below.
  const { stream, debug, project_id, waitForCompletion, ...opts } = options;

  // Log debug message for debugging purposes
  if (debug) {
    logger.debug(`executeStream: ${debug}`);
  }

  let job: ExecuteCodeOutput | undefined;

  try {
    // Set once a terminal message ("done" or an error) plus `null` was sent.
    let done = false;
    let stats: ExecuteCodeStats = [];

    // Create streaming callback, passed into execute-code::executeCode call
    const streamCB = (event: ExecuteCodeStreamEvent) => {
      if (done) {
        logger.debug(
          `executeStream: ignoring event type=${event.type} because stream is done`,
        );
        return;
      }

      logger.debug(`executeStream: received event type=${event.type}`);

      switch (event.type) {
        case "stdout":
          stream({
            type: "stdout",
            data: event.data,
          });
          break;

        case "stderr":
          stream({
            type: "stderr",
            data: event.data,
          });
          break;

        case "stats":
          // Stats are accumulated in the stats array for the final result.
          // Events without a timestamp are neither stored nor forwarded.
          if (
            event.data &&
            typeof event.data === "object" &&
            "timestamp" in event.data
          ) {
            stats.push(event.data as ExecuteCodeStats[0]);
            // Keep stats array bounded
            if (stats.length > MONITOR_STATS_LENGTH_MAX) {
              stats.splice(0, stats.length - MONITOR_STATS_LENGTH_MAX);
            }
            stream({
              type: "stats",
              data: event.data,
            });
          }
          break;

        case "done": {
          logger.debug(`executeStream: processing done event`);
          const result = event.data as ExecuteCodeOutputAsync;
          // Include accumulated stats in final result
          result.stats = truncStats(stats);
          stream({
            type: "done",
            data: result,
          });
          done = true;
          stream(null); // End the stream
          break;
        }

        case "error":
          logger.debug(`executeStream: processing error event`);
          stream({ error: event.data as string });
          done = true;
          stream(null);
          break;

        default:
          unreachable(event.type);
      }
    };

    // Start an async execution job with streaming callback.
    // `...opts` must come first: properties listed later win, so spreading it
    // last would overwrite the normalized `command` and `path` again.
    job = await executeCode({
      ...opts,
      command: opts.command || "",
      path: opts.compute_server_id ? opts.path : abspath(opts.path ?? ""),
      async_call: true, // Force async mode for streaming
      streamCB, // Add the streaming callback
    });

    if (job?.type !== "async") {
      stream({ error: "Failed to create async job for streaming" });
      stream(null);
      return undefined;
    }

    // Get the current job status from cache in case it completed immediately
    const currentJob = asyncCache.get(job.job_id);
    const alreadyFinished =
      currentJob != null && currentJob.status !== "running";

    if (done) {
      // streamCB has already delivered the terminal message and closed the
      // stream; sending "job"/"done"/null now would write to an ended stream
      // and terminate it a second time.
      return alreadyFinished ? currentJob : job;
    }

    // Send initial job info with full async structure
    const initialJobInfo: ExecuteCodeOutputAsync = {
      type: "async",
      job_id: job.job_id,
      pid: job.pid,
      status: currentJob?.status ?? job.status,
      start: job.start,
      stdout: currentJob?.stdout ?? "",
      stderr: currentJob?.stderr ?? "",
      exit_code: currentJob?.exit_code ?? 0, // Default to 0, will be updated when job completes
      stats: currentJob?.stats ?? [],
    };

    stream({
      type: "job",
      data: initialJobInfo,
    });

    // If job already completed, send done event immediately
    if (currentJob && currentJob.status !== "running") {
      logger.debug(
        `executeStream: job ${job.job_id} already completed, sending done event`,
      );
      stream({
        type: "done",
        data: currentJob,
      });
      done = true; // make streamCB ignore any late events
      stream(null);
      return currentJob;
    }

    // Stats monitoring is now handled by execute-code.ts via streamCB
  } catch (err) {
    stream({ error: `${err}` });
    stream(null); // End the stream
    return undefined;
  }

  // Return the job object so caller can wait for completion if desired
  return job;
}
192
193