Path: blob/master/src/packages/backend/exec-stream.ts
2208 views
/*
 * Backend exec-stream functionality for streaming code execution.
 * Core streaming logic that can be used by different services.
 */

import { unreachable } from "@cocalc/util/misc";
import {
  ExecuteCodeOutput,
  ExecuteCodeOutputAsync,
  ExecuteCodeStats,
  ExecuteCodeStreamEvent,
} from "@cocalc/util/types/execute-code";
import { asyncCache, executeCode } from "./execute-code";
import getLogger from "./logger";
import { abspath } from "./misc_node";

/**
 * Event handed to the `stream` callback of {@link executeStream}.
 *
 * Either `type`/`data` are set (a "job" event or one of the event types
 * forwarded from execute-code), or `error` carries an error message.
 */
export type StreamEvent = {
  type?: "job" | ExecuteCodeStreamEvent["type"];
  data?: ExecuteCodeStreamEvent["data"];
  error?: string;
};

const logger = getLogger("backend:exec-stream");

const MONITOR_STATS_LENGTH_MAX = 100; // Max stats entries

/**
 * Return a copy of `stats` holding at most the last
 * MONITOR_STATS_LENGTH_MAX entries.
 */
function truncStats(stats: ExecuteCodeStats): ExecuteCodeStats {
  return stats.slice(-MONITOR_STATS_LENGTH_MAX);
}

/**
 * Options accepted by {@link executeStream}.
 *
 * Everything except `stream`, `debug`, `project_id` and `waitForCompletion`
 * is forwarded to `executeCode`.
 */
export interface ExecuteStreamOptions {
  command?: string;
  args?: string[];
  path?: string;
  compute_server_id?: number;
  bash?: boolean;
  env?: { [key: string]: string };
  timeout?: number;
  max_output?: number;
  verbose?: boolean;
  project_id?: string;
  /** If set, logged once via the module logger when the call starts. */
  debug?: string;
  /** Receives every stream event; called with `null` to mark the end of the stream. */
  stream: (event: StreamEvent | null) => void;
  // NOTE(review): not read by executeStream itself -- presumably consumed by
  // the caller, which gets the job object back; confirm.
  waitForCompletion?: boolean;
}

/**
 * Start `executeCode` in async mode and forward its events to
 * `options.stream`.
 *
 * Event sequence seen by `stream`:
 *  - a "job" event with the initial job info (unless the stream has already
 *    ended by then),
 *  - "stdout" / "stderr" / "stats" events as they arrive,
 *  - a final "done" event or an `{ error }` event,
 *  - `null` exactly once, marking the end of the stream.
 *
 * @param options - execution options plus the `stream` callback.
 * @returns the job returned by `executeCode` (or the cached job when it is no
 *   longer "running" right after start); `undefined` if no async job could
 *   be created or an exception was caught.
 */
export async function executeStream(
  options: ExecuteStreamOptions,
): Promise<ExecuteCodeOutput | undefined> {
  // project_id and waitForCompletion are not used below; destructuring them
  // here keeps them out of `opts`, which is spread into the executeCode call.
  const { stream, debug, project_id, waitForCompletion, ...opts } = options;

  // Log debug message for debugging purposes
  if (debug) {
    logger.debug(`executeStream: ${debug}`);
  }

  // All output goes through emit()/end() so the consumer never receives an
  // event after the terminating `null`, and receives that `null` only once --
  // even if streamCB finishes the stream before `await executeCode` resumes.
  let ended = false;
  const emit = (event: StreamEvent): void => {
    if (!ended) {
      stream(event);
    }
  };
  const end = (): void => {
    if (!ended) {
      ended = true;
      stream(null); // End the stream
    }
  };

  let job: ExecuteCodeOutput | undefined;

  try {
    const stats: ExecuteCodeStats = [];

    // Create streaming callback, passed into execute-code::executeCode call
    const streamCB = (event: ExecuteCodeStreamEvent) => {
      if (ended) {
        logger.debug(
          `executeStream: ignoring event type=${event.type} because stream is done`,
        );
        return;
      }

      logger.debug(`executeStream: received event type=${event.type}`);

      switch (event.type) {
        case "stdout":
          emit({ type: "stdout", data: event.data });
          break;

        case "stderr":
          emit({ type: "stderr", data: event.data });
          break;

        case "stats":
          // Stats are accumulated in the stats array for the final result
          if (
            event.data &&
            typeof event.data === "object" &&
            "timestamp" in event.data
          ) {
            stats.push(event.data as ExecuteCodeStats[0]);
            // Keep stats array bounded
            if (stats.length > MONITOR_STATS_LENGTH_MAX) {
              stats.splice(0, stats.length - MONITOR_STATS_LENGTH_MAX);
            }
            emit({ type: "stats", data: event.data });
          }
          break;

        case "done": {
          logger.debug(`executeStream: processing done event`);
          const result = event.data as ExecuteCodeOutputAsync;
          // Include accumulated stats in final result (mutates the event's
          // data object in place, as before).
          result.stats = truncStats(stats);
          emit({ type: "done", data: result });
          end();
          break;
        }

        case "error":
          logger.debug(`executeStream: processing error event`);
          emit({ error: event.data as string });
          end();
          break;

        default:
          unreachable(event.type);
      }
    };

    // Start an async execution job with streaming callback.
    // `...opts` must come FIRST: the computed `command`/`path` and the forced
    // `async_call`/`streamCB` below have to win over whatever the caller
    // passed, otherwise the abspath() normalisation is silently discarded.
    job = await executeCode({
      ...opts,
      command: opts.command ?? "",
      path: opts.compute_server_id ? opts.path : abspath(opts.path ?? ""),
      async_call: true, // Force async mode for streaming
      streamCB, // Add the streaming callback
    });

    if (job?.type !== "async") {
      emit({ error: "Failed to create async job for streaming" });
      end();
      return undefined;
    }

    // Send initial job info with full async structure
    // Get the current job status from cache in case it completed immediately
    const currentJob = asyncCache.get(job.job_id);
    const initialJobInfo: ExecuteCodeOutputAsync = {
      type: "async",
      job_id: job.job_id,
      pid: job.pid,
      status: currentJob?.status ?? job.status,
      start: job.start,
      stdout: currentJob?.stdout ?? "",
      stderr: currentJob?.stderr ?? "",
      exit_code: currentJob?.exit_code ?? 0, // Default to 0, will be updated when job completes
      stats: currentJob?.stats ?? [],
    };

    emit({ type: "job", data: initialJobInfo });

    // If job already completed, send done event immediately (a no-op when
    // streamCB has already delivered "done"/"error" and ended the stream).
    if (currentJob && currentJob.status !== "running") {
      logger.debug(
        `executeStream: job ${job.job_id} already completed, sending done event`,
      );
      emit({ type: "done", data: currentJob });
      end();
      return currentJob;
    }

    // Stats monitoring is now handled by execute-code.ts via streamCB
  } catch (err) {
    // Log as well, since emit() drops the event if the stream already ended.
    logger.debug(`executeStream: caught error -- ${err}`);
    emit({ error: `${err}` });
    end();
    return undefined;
  }

  // Return the job object so caller can wait for completion if desired
  return job;
}