Path: blob/master/src/packages/project/exec-stream.ts
2208 views
/*1Project-side exec-stream service that handles streaming execution requests.2Similar to how the project API service works, but specifically for streaming exec.3*/45import { executeStream, StreamEvent } from "@cocalc/backend/exec-stream";6import { Message, Subscription } from "@cocalc/conat/core/client";7import { projectSubject, EXEC_STREAM_SERVICE } from "@cocalc/conat/names";8import { connectToConat } from "@cocalc/project/conat/connection";9import { project_id } from "@cocalc/project/data";10import { getLogger } from "@cocalc/project/logger";1112const logger = getLogger("project:exec-stream");1314export function init() {15serve();16}1718async function serve() {19logger.debug("serve: create project exec-stream service");20const cn = connectToConat();21const subject = projectSubject({22project_id,23compute_server_id: 0, // This is the project home base, always 024service: EXEC_STREAM_SERVICE,25});2627logger.debug(28`serve: creating exec-stream service for project ${project_id} and subject='${subject}'`,29);30const api = await cn.subscribe(subject, { queue: "q" });31await listen(api, subject);32}3334async function listen(api: Subscription, subject: string) {35logger.debug(`Listening on subject='${subject}'`);3637for await (const mesg of api) {38handleMessage(mesg);39}40}4142async function handleMessage(mesg: Message) {43const options = mesg.data;4445let seq = 0;46const respond = ({ type, data, error }: StreamEvent) => {47mesg.respondSync({ type, data, error, seq });48seq += 1;49};5051let done = false;52const end = () => {53if (done) return;54done = true;55// end response stream with null payload.56mesg.respondSync(null);57};5859const stream = (event: StreamEvent) => {60if (done) return;61if (event != null) {62respond(event);63} else {64end();65}66};6768try {69// SECURITY: verify that the project_id claimed in options matches70// with our actual project_id71if (options.project_id != project_id) {72throw Error("project_id is invalid");73}7475const { stream: _, project_id: reqProjectId, ...opts } = options;7677// Call the backend executeStream function78await executeStream({79...opts,80project_id: reqProjectId,81stream,82});83} catch (err) {84if (!done) {85respond({ error: `${err}` });86end();87}88}89}909192