Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/project/exec-stream.ts
2208 views
1
/*
2
Project-side exec-stream service that handles streaming execution requests.
3
Similar to how the project API service works, but specifically for streaming exec.
4
*/
5
6
import { executeStream, StreamEvent } from "@cocalc/backend/exec-stream";
7
import { Message, Subscription } from "@cocalc/conat/core/client";
8
import { projectSubject, EXEC_STREAM_SERVICE } from "@cocalc/conat/names";
9
import { connectToConat } from "@cocalc/project/conat/connection";
10
import { project_id } from "@cocalc/project/data";
11
import { getLogger } from "@cocalc/project/logger";
12
13
const logger = getLogger("project:exec-stream");
14
15
/**
 * Start the project-side exec-stream service.
 *
 * Returns immediately: the service is set up and then runs in the
 * background. A failure while starting or running the service is logged
 * here instead of surfacing as an unhandled promise rejection.
 */
export function init(): void {
  serve().catch((err) => {
    logger.debug(`init: exec-stream service failed -- ${err}`);
  });
}
18
19
/**
 * Subscribe to this project's exec-stream subject and hand the resulting
 * subscription to `listen`, resolving only once `listen` returns.
 */
async function serve() {
  logger.debug("serve: create project exec-stream service");

  const client = connectToConat();

  // This service runs in the project home base, whose compute_server_id
  // is always 0.
  const subject = projectSubject({
    project_id,
    compute_server_id: 0,
    service: EXEC_STREAM_SERVICE,
  });
  logger.debug(
    `serve: creating exec-stream service for project ${project_id} and subject='${subject}'`,
  );

  const subscribeOptions = { queue: "q" };
  const subscription = await client.subscribe(subject, subscribeOptions);
  await listen(subscription, subject);
}
34
35
/**
 * Dispatch every message arriving on the subscription to `handleMessage`.
 *
 * @param api - subscription to iterate over for incoming requests
 * @param subject - subject the subscription is bound to (used for logging only)
 */
async function listen(api: Subscription, subject: string) {
  logger.debug(`Listening on subject='${subject}'`);

  for await (const mesg of api) {
    // Deliberately not awaited, so the loop can pick up the next request
    // while this one is still being handled. Because nothing awaits the
    // promise, a rejection must be handled here or it would become an
    // unhandled promise rejection.
    handleMessage(mesg).catch((err) => {
      logger.debug(`listen: error while handling exec-stream request -- ${err}`);
    });
  }
}
42
43
/**
 * Handle a single exec-stream request.
 *
 * The request payload (`mesg.data`) holds the options passed on to
 * `executeStream`. Each event emitted through the `stream` callback is sent
 * back with `mesg.respondSync`, tagged with a `seq` counter that starts at 0
 * and increases by one per event. The response stream is terminated by one
 * `null` payload.
 *
 * If the payload is missing, claims a different project_id, or
 * `executeStream` throws before the stream has ended, an `{ error }` event
 * followed by the terminating `null` is sent to the requester. This function
 * is called without being awaited, so it is written to never reject.
 *
 * @param mesg - the incoming request message
 */
async function handleMessage(mesg: Message) {
  let seq = 0;
  const respond = ({ type, data, error }: StreamEvent) => {
    mesg.respondSync({ type, data, error, seq });
    seq += 1;
  };

  let done = false;
  const end = () => {
    if (done) return;
    done = true;
    // end response stream with null payload.
    mesg.respondSync(null);
  };

  // A null/undefined event marks the end of the stream; anything arriving
  // after the stream has ended is dropped.
  const stream = (event?: StreamEvent | null) => {
    if (done) return;
    if (event != null) {
      respond(event);
    } else {
      end();
    }
  };

  try {
    // Read the payload inside the try block so that a failure while
    // accessing it is reported to the requester like any other error.
    const options = mesg.data;
    if (options == null || typeof options !== "object") {
      throw Error("exec-stream request is missing its options");
    }

    // SECURITY: verify that the project_id claimed in options matches
    // with our actual project_id
    if (options.project_id !== project_id) {
      throw Error("project_id is invalid");
    }

    // Discard any `stream` value supplied by the requester; only our own
    // callback may be used.
    const { stream: _, project_id: reqProjectId, ...opts } = options;

    // Call the backend executeStream function
    await executeStream({
      ...opts,
      project_id: reqProjectId,
      stream,
    });
  } catch (err) {
    if (done) return;
    try {
      respond({ error: `${err}` });
      end();
    } catch (sendErr) {
      // The error could not be delivered; stop sending and log instead of
      // letting this un-awaited promise reject.
      done = true;
      logger.debug(
        `handleMessage: unable to report error to requester -- ${sendErr}`,
      );
    }
  }
}
91
92