Path: blob/master/src/packages/backend/exec-stream.test.ts
2208 views
/*1* Test exec-stream streaming functionality2*3* DEVELOPMENT:4*5* pnpm test ./exec-stream.test.ts6*/78import { delay } from "awaiting";9import { ExecuteCodeStreamEvent } from "@cocalc/util/types/execute-code";1011describe("executeStream function - unit tests", () => {12const mockExecuteCode = jest.fn();13const mockAsyncCache = {14get: jest.fn(),15set: jest.fn(),16};1718beforeEach(() => {19// Reset modules and mocks for proper test isolation20jest.resetModules();21jest.clearAllMocks();2223// Re-mock for each test to ensure clean state24jest.doMock("./execute-code", () => ({25executeCode: mockExecuteCode,26asyncCache: mockAsyncCache,27}));2829mockAsyncCache.get.mockClear();30mockAsyncCache.set.mockClear();31});3233it("streams stdout in batches", async () => {34let capturedStreamCB: ((event: ExecuteCodeStreamEvent) => void) | undefined;3536// Mock executeCode to capture the streamCB that executeStream passes to it37mockExecuteCode.mockImplementation(async (options) => {38capturedStreamCB = options.streamCB;39return {40type: "async",41job_id: "test-job-id",42pid: 1234,43status: "running",44start: Date.now(),45};46});4748// Mock asyncCache to return undefined (job not completed)49mockAsyncCache.get.mockReturnValue(undefined);5051const { executeStream } = await import("./exec-stream");52const userCallback = jest.fn(); // This is what the user passes to executeStream5354await executeStream({55project_id: "test-project-id",56command:57"echo 'first'; sleep 0.1; echo 'second'; sleep 0.1; echo 'third'",58bash: true,59stream: userCallback, // User's callback receives processed events60});6162// Verify executeCode was called correctly with streaming enabled63expect(mockExecuteCode).toHaveBeenCalledWith(64expect.objectContaining({65async_call: true,66streamCB: expect.any(Function),67}),68);6970// Simulate what executeCode would do: send streaming events to the captured callback71if (capturedStreamCB) {72capturedStreamCB({ type: "stdout", data: "first\n" });73capturedStreamCB({ type: "stdout", data: "second\n" });74capturedStreamCB({ type: "stdout", data: "third\n" });75capturedStreamCB({76type: "done",77data: {78type: "async",79job_id: "test-job-id",80status: "completed",81stdout: "first\nsecond\nthird\n",82stderr: "",83exit_code: 0,84start: Date.now(),85stats: [],86},87});88}8990// Verify the user's callback received the expected processed events91const calls = userCallback.mock.calls;92expect(calls.length).toBeGreaterThan(0);9394// First call should be initial job info95expect(calls[0][0].type).toBe("job");96expect(calls[0][0].data).toMatchObject({97type: "async",98job_id: "test-job-id",99pid: 1234,100status: "running",101});102103// Then the streaming events104expect(calls[1][0]).toEqual({ type: "stdout", data: "first\n" });105expect(calls[2][0]).toEqual({ type: "stdout", data: "second\n" });106expect(calls[3][0]).toEqual({ type: "stdout", data: "third\n" });107expect(calls[4][0]).toEqual({108type: "done",109data: expect.objectContaining({110stdout: "first\nsecond\nthird\n",111status: "completed",112}),113});114expect(calls[5][0]).toBe(null); // Stream end115});116117it("streams stdout and stderr (real execution alternative)", async () => {118// This test demonstrates the user's suggestion: let executeCode run for real119// instead of mocking and simulating events120121// Temporarily unmock executeCode for this test122jest.unmock("./execute-code");123jest.resetModules();124125// Import fresh executeStream that uses real executeCode126const { executeStream } = await import("./exec-stream");127128const streamEvents: any[] = [];129let streamEnded = false;130131const userCallback = jest.fn((event) => {132if (event) {133streamEvents.push(event);134} else {135streamEnded = true; // null event signals stream end136}137});138139// Run a real command that produces both stdout and stderr output140await executeStream({141project_id: "test-project-id",142command:143"echo 'stdout1'; >&2 echo 'stderr1'; echo 'stdout2'; >&2 echo 'stderr2'",144bash: true,145stream: userCallback,146});147148// Wait for the stream to end (instead of fixed delay)149while (!streamEnded) {150await delay(10); // Small delay to avoid busy waiting151}152153// Verify we got real streaming events154expect(streamEvents.length).toBeGreaterThan(0);155156// Find events by type157const stdoutEvents = streamEvents.filter((e) => e.type === "stdout");158const stderrEvents = streamEvents.filter((e) => e.type === "stderr");159const jobEvent = streamEvents.find((e) => e.type === "job");160const doneEvent = streamEvents.find((e) => e.type === "done");161162// Should have initial job info163expect(jobEvent).toBeDefined();164expect(jobEvent?.data?.job_id).toBeDefined();165expect(jobEvent?.data?.pid).toBeGreaterThan(0);166expect(jobEvent?.data?.status).toBe("running");167168// Should have stdout events from real execution (may be batched)169expect(stdoutEvents.length).toBeGreaterThanOrEqual(1);170expect(stdoutEvents.some((e) => e.data && e.data.includes("stdout1"))).toBe(171true,172);173expect(stdoutEvents.some((e) => e.data && e.data.includes("stdout2"))).toBe(174true,175);176177// Should have stderr events from real execution (may be batched)178expect(stderrEvents.length).toBeGreaterThanOrEqual(1);179expect(stderrEvents.some((e) => e.data && e.data.includes("stderr1"))).toBe(180true,181);182expect(stderrEvents.some((e) => e.data && e.data.includes("stderr2"))).toBe(183true,184);185186// Should have completion event187expect(doneEvent).toBeDefined();188expect(doneEvent?.data?.status).toBe("completed");189expect(doneEvent?.data?.exit_code).toBe(0);190191// Verify event order192const jobIndex = streamEvents.findIndex((e) => e.type === "job");193const doneIndex = streamEvents.findIndex((e) => e.type === "done");194expect(jobIndex).toBe(0);195expect(doneIndex).toBe(streamEvents.length - 1);196197// Re-mock for subsequent tests198jest.doMock("./execute-code", () => ({199executeCode: mockExecuteCode,200asyncCache: mockAsyncCache,201}));202});203204it("streams stderr in batches", async () => {205let capturedStreamCB: ((event: ExecuteCodeStreamEvent) => void) | undefined;206207mockExecuteCode.mockImplementation(async (options) => {208capturedStreamCB = options.streamCB;209return {210type: "async",211job_id: "test-job-id",212pid: 1234,213status: "running",214start: Date.now(),215};216});217218// Mock asyncCache to return undefined (job not completed)219mockAsyncCache.get.mockReturnValue(undefined);220221const { executeStream } = await import("./exec-stream");222const userCallback = jest.fn();223224await executeStream({225project_id: "test-project-id",226command: ">&2 echo 'error1'; sleep 0.1; >&2 echo 'error2'",227bash: true,228stream: userCallback,229});230231// Simulate what executeCode would do: send streaming events to the captured callback232if (capturedStreamCB) {233capturedStreamCB({ type: "stderr", data: "error1\n" });234capturedStreamCB({ type: "stderr", data: "error2\n" });235capturedStreamCB({236type: "done",237data: {238type: "async",239job_id: "test-job-id",240status: "completed",241stdout: "",242stderr: "error1\nerror2\n",243exit_code: 0,244start: Date.now(),245stats: [],246},247});248}249250// Verify the user's callback received the expected processed events251const calls = userCallback.mock.calls;252expect(calls.length).toBeGreaterThan(0);253254// First call should be initial job info255expect(calls[0][0].type).toBe("job");256257// Then the stderr events258expect(calls[1][0]).toEqual({ type: "stderr", data: "error1\n" });259expect(calls[2][0]).toEqual({ type: "stderr", data: "error2\n" });260expect(calls[3][0]).toEqual({261type: "done",262data: expect.objectContaining({263stderr: "error1\nerror2\n",264status: "completed",265}),266});267expect(calls[4][0]).toBe(null); // Stream end268});269270it("streams mixed stdout and stderr with stats", async () => {271let capturedStreamCB: ((event: ExecuteCodeStreamEvent) => void) | undefined;272273mockExecuteCode.mockImplementation(async (options) => {274capturedStreamCB = options.streamCB;275return {276type: "async",277job_id: "test-job-id",278pid: 1234,279status: "running",280start: Date.now(),281};282});283284// Mock asyncCache to return undefined (job not completed)285mockAsyncCache.get.mockReturnValue(undefined);286287const { executeStream } = await import("./exec-stream");288const userCallback = jest.fn();289290await executeStream({291project_id: "test-project-id",292command: "echo 'stdout1'; >&2 echo 'stderr1'; echo 'stdout2'",293bash: true,294stream: userCallback,295});296297// Simulate what executeCode would do: send mixed streaming events to the captured callback298if (capturedStreamCB) {299capturedStreamCB({ type: "stdout", data: "stdout1\n" });300capturedStreamCB({ type: "stderr", data: "stderr1\n" });301capturedStreamCB({302type: "stats",303data: {304timestamp: Date.now(),305cpu_pct: 1.5,306cpu_secs: 0.1,307mem_rss: 1024,308},309});310capturedStreamCB({ type: "stdout", data: "stdout2\n" });311capturedStreamCB({312type: "done",313data: {314type: "async",315job_id: "test-job-id",316status: "completed",317stdout: "stdout1\nstdout2\n",318stderr: "stderr1\n",319exit_code: 0,320start: Date.now(),321stats: [],322},323});324}325326// Verify all events were streamed in order327const calls = userCallback.mock.calls;328// First call should be initial job info329expect(calls[0][0].type).toBe("job");330// Then the streaming events in order331expect(calls[1][0]).toEqual({ type: "stdout", data: "stdout1\n" });332expect(calls[2][0]).toEqual({ type: "stderr", data: "stderr1\n" });333expect(calls[3][0]).toEqual({334type: "stats",335data: expect.objectContaining({336cpu_pct: 1.5,337cpu_secs: 0.1,338mem_rss: 1024,339}),340});341expect(calls[4][0]).toEqual({ type: "stdout", data: "stdout2\n" });342expect(calls[5][0]).toEqual({343type: "done",344data: expect.objectContaining({345stdout: "stdout1\nstdout2\n",346stderr: "stderr1\n",347}),348});349expect(calls[6][0]).toBe(null); // Stream end350});351352it("handles streaming errors", async () => {353let capturedStreamCB: ((event: ExecuteCodeStreamEvent) => void) | undefined;354355mockExecuteCode.mockImplementation(async (options) => {356capturedStreamCB = options.streamCB;357return {358type: "async",359job_id: "test-job-id",360pid: 1234,361status: "running",362start: Date.now(),363};364});365366// Mock asyncCache to return undefined (job not completed)367mockAsyncCache.get.mockReturnValue(undefined);368369const { executeStream } = await import("./exec-stream");370const userCallback = jest.fn();371372await executeStream({373project_id: "test-project-id",374command: "exit 1",375bash: true,376stream: userCallback,377});378379// Simulate what executeCode would do: send error event to the captured callback380if (capturedStreamCB) {381capturedStreamCB({382type: "error",383data: "Command failed with exit code 1",384});385}386387// Verify error event and stream ending388const calls = userCallback.mock.calls;389expect(calls.length).toBeGreaterThan(0);390391// First call should be initial job info392expect(calls[0][0].type).toBe("job");393394// Then the error event395expect(calls[1][0]).toEqual({396error: "Command failed with exit code 1",397});398expect(calls[2][0]).toBe(null); // Stream end399});400401it("handles process spawning errors", async () => {402mockExecuteCode.mockImplementation(async (_options) => {403// Simulate spawning error by throwing404throw new Error("Failed to spawn process");405});406407// Mock asyncCache to return undefined (no job created due to error)408mockAsyncCache.get.mockReturnValue(undefined);409410const { executeStream } = await import("./exec-stream");411const mockStream = jest.fn();412413await executeStream({414project_id: "test-project-id",415command: "nonexistent-command",416stream: mockStream,417});418419// Verify error event and stream ending420const calls = mockStream.mock.calls;421expect(calls.length).toBe(2); // Error event + stream end422423expect(calls[0][0]).toEqual({424error: "Error: Failed to spawn process",425});426expect(calls[1][0]).toBe(null); // Stream end427});428429it("handles jobs that complete immediately", async () => {430mockExecuteCode.mockImplementation(async (_options) => {431return {432type: "async",433job_id: "test-job-id",434pid: 1234,435status: "running",436start: Date.now(),437};438});439440// Mock asyncCache to return a completed job (simulating immediate completion)441const completedJob = {442type: "async",443job_id: "test-job-id",444status: "completed",445stdout: "quick output\n",446stderr: "",447exit_code: 0,448start: Date.now(),449stats: [],450};451mockAsyncCache.get.mockReturnValue(completedJob);452453const { executeStream } = await import("./exec-stream");454const mockStream = jest.fn();455456await executeStream({457project_id: "test-project-id",458command: "echo 'quick output'",459bash: true,460stream: mockStream,461});462463// For immediate completion, the done event should be sent immediately464// without needing to simulate streaming events465466// Verify event order and content467const calls = mockStream.mock.calls;468expect(calls.length).toBeGreaterThan(0);469470// First call should be initial job info471expect(calls[0][0].type).toBe("job");472473// Since job completed immediately, done event should come next474expect(calls[1][0]).toEqual({475type: "done",476data: expect.objectContaining({477status: "completed",478stdout: "quick output\n",479}),480});481expect(calls[2][0]).toBe(null); // Stream end482});483484it("handles error exit codes with streaming", async () => {485let capturedStreamCB: ((event: ExecuteCodeStreamEvent) => void) | undefined;486487mockExecuteCode.mockImplementation(async (options) => {488capturedStreamCB = options.streamCB;489return {490type: "async",491job_id: "test-job-id",492pid: 1234,493status: "running",494start: Date.now(),495};496});497498// Mock asyncCache to return undefined (job not completed)499mockAsyncCache.get.mockReturnValue(undefined);500501const { executeStream } = await import("./exec-stream");502const userCallback = jest.fn();503504await executeStream({505project_id: "test-project-id",506command: "exit 42",507bash: true,508stream: userCallback,509});510511// Simulate what executeCode would do: send error completion to the captured callback512if (capturedStreamCB) {513capturedStreamCB({514type: "done",515data: {516type: "async",517job_id: "test-job-id",518status: "error",519stdout: "",520stderr: "exit 42 failed",521exit_code: 42,522start: Date.now(),523stats: [],524},525});526}527528// Verify error completion event529const calls = userCallback.mock.calls;530expect(calls.length).toBeGreaterThan(0);531532// First call should be initial job info533expect(calls[0][0].type).toBe("job");534535// Then the done event with error status536expect(calls[1][0]).toEqual({537type: "done",538data: expect.objectContaining({539status: "error",540exit_code: 42,541stderr: "exit 42 failed",542}),543});544expect(calls[2][0]).toBe(null); // Stream end545});546547it("handles job creation failure", async () => {548mockExecuteCode.mockResolvedValue({549type: "blocking", // Wrong type - should be async550stdout: "some output",551});552553const { executeStream } = await import("./exec-stream");554const mockStream = jest.fn();555556await executeStream({557project_id: "test-project-id",558command: "echo test",559bash: true,560stream: mockStream,561});562563// Verify error event and stream ending564const calls = mockStream.mock.calls;565expect(calls.length).toBe(2); // Error event + stream end566567expect(calls[0][0]).toEqual({568error: "Failed to create async job for streaming",569});570expect(calls[1][0]).toBe(null); // Stream end571});572573it("handles timeout scenarios", async () => {574// For this test, let's use real executeCode to test actual timeout behavior575jest.unmock("./execute-code");576jest.resetModules();577578// Import fresh executeStream that uses real executeCode579const { executeStream } = await import("./exec-stream");580581const streamEvents: any[] = [];582583const userCallback = jest.fn((event) => {584if (event) {585streamEvents.push(event);586}587});588589// Start a long-running command with a short timeout590await executeStream({591project_id: "test-project-id",592command: "sleep 5", // 5 second command593bash: true,594timeout: 1, // 1 second timeout595stream: userCallback,596});597598// Wait for the command to either complete or timeout599await delay(2500); // Wait longer than the 1-second timeout600601// Verify we got events602expect(streamEvents.length).toBeGreaterThan(0);603604// Verify stream ended (should have null event at the end)605expect(userCallback).toHaveBeenLastCalledWith(null);606607// Find events by type608const jobEvent = streamEvents.find((e) => e.type === "job");609const doneEvent = streamEvents.find((e) => e.type === "done");610const errorEvents = streamEvents.filter((e) => e.error);611612// Should have initial job info613expect(jobEvent).toBeDefined();614expect(jobEvent?.data?.job_id).toBeDefined();615616// Check what actually happened - either timeout occurred or command completed617if (doneEvent) {618// If we got a done event, check if it indicates an error/timeout619if (620doneEvent.data?.status === "error" ||621doneEvent.data?.exit_code !== 0622) {623// Command was terminated (possibly due to timeout)624} else {625// Command completed normally626}627} else if (errorEvents.length > 0) {628// Error events received629} else {630// No completion events received - job may still be running631// Let's check if we can query the job status manually632const { executeCode } = await import("./execute-code");633try {634await executeCode({635async_get: jobEvent?.data?.job_id,636async_await: false,637});638// Manual job status check completed639} catch (err) {640// Manual job status check failed641}642}643644// At minimum, we should have the job event645expect(jobEvent).toBeDefined();646647// The test should verify that timeout option was passed correctly648// Even if the actual timeout doesn't trigger in test environment,649// we can verify the option was set properly650expect(jobEvent?.data?.job_id).toBeDefined();651652// Re-mock for subsequent tests653jest.doMock("./execute-code", () => ({654executeCode: mockExecuteCode,655asyncCache: mockAsyncCache,656}));657});658659it("handles non-existent executable", async () => {660mockExecuteCode.mockImplementation(async (_options) => {661// Simulate the error that occurs when executable doesn't exist662throw new Error("spawn foobar ENOENT");663});664665const { executeStream } = await import("./exec-stream");666const mockStream = jest.fn();667668await executeStream({669project_id: "test-project-id",670command: "foobar",671args: ["baz"],672stream: mockStream,673});674675// Verify error event and stream ending676const calls = mockStream.mock.calls;677expect(calls.length).toBe(2); // Error event + stream end678679expect(calls[0][0]).toEqual({680error: "Error: spawn foobar ENOENT",681});682expect(calls[1][0]).toBe(null); // Stream end683});684});685686// Integration tests using real executeCode687describe("exec-stream integration tests", () => {688beforeAll(() => {689// Unmock executeCode for integration tests690jest.unmock("./execute-code");691jest.resetModules();692});693694it("streams real bash script output in batches with delays", async () => {695// Import fresh executeStream that uses real executeCode696const { executeStream } = await import("./exec-stream");697698const streamEvents: any[] = [];699const mockStream = jest.fn((event) => {700if (event) {701streamEvents.push(event);702}703});704705// Create a bash script that outputs to both stdout and stderr with delays706const bashScript = `707echo "stdout batch 1"708sleep 0.1709>&2 echo "stderr batch 1"710sleep 0.1711echo "stdout batch 2"712sleep 0.1713>&2 echo "stderr batch 2"714sleep 0.1715echo "stdout batch 3"716`;717718await executeStream({719project_id: "test-project-id",720command: bashScript.trim(),721bash: true,722stream: mockStream,723});724725// Wait for the streaming to complete (increase timeout for reliability)726await delay(1500);727728// Verify we got the expected stream events729expect(streamEvents.length).toBeGreaterThan(0);730731// Find events by type732const stdoutEvents = streamEvents.filter((e) => e.type === "stdout");733const stderrEvents = streamEvents.filter((e) => e.type === "stderr");734const jobEvent = streamEvents.find((e) => e.type === "job");735const doneEvent = streamEvents.find((e) => e.type === "done");736737// Should have initial job info as first event738expect(jobEvent).toBeDefined();739expect(jobEvent?.data?.job_id).toBeDefined();740expect(jobEvent?.data?.pid).toBeGreaterThan(0);741expect(jobEvent?.data?.status).toBe("running");742743// Should have multiple stdout batches744expect(stdoutEvents.length).toBeGreaterThanOrEqual(3);745expect(stdoutEvents.some((e) => e.data && e.data.includes("batch 1"))).toBe(746true,747);748expect(stdoutEvents.some((e) => e.data && e.data.includes("batch 2"))).toBe(749true,750);751expect(stdoutEvents.some((e) => e.data && e.data.includes("batch 3"))).toBe(752true,753);754755// Should have stderr batches756expect(stderrEvents.length).toBeGreaterThanOrEqual(2);757expect(stderrEvents.some((e) => e.data && e.data.includes("batch 1"))).toBe(758true,759);760expect(stderrEvents.some((e) => e.data && e.data.includes("batch 2"))).toBe(761true,762);763764// Should have completion event765expect(doneEvent).toBeDefined();766expect(doneEvent?.data?.status).toBe("completed");767expect(doneEvent?.data?.exit_code).toBe(0);768769// Verify event order: job first, then streaming events, then done770const jobIndex = streamEvents.findIndex((e) => e.type === "job");771const doneIndex = streamEvents.findIndex((e) => e.type === "done");772expect(jobIndex).toBe(0); // Job event should be first773expect(doneIndex).toBe(streamEvents.length - 1); // Done event should be last774});775776it("handles process monitoring with stats streaming", async () => {777// Import fresh executeStream that uses real executeCode778const { executeStream } = await import("./exec-stream");779780const streamEvents: any[] = [];781const mockStream = jest.fn((event) => {782if (event) {783streamEvents.push(event);784}785});786787// Run a longer task to get stats788const bashScript = `789echo "Starting CPU intensive task"790python3 -c "791import time792import os793print(f'PID: {os.getpid()}')794t0=time.time()795while t0+2>time.time():796sum([_ for _ in range(10**6)])797print('CPU task completed')798"799echo "Task finished"800`;801802await executeStream({803project_id: "test-project-id",804command: bashScript.trim(),805bash: true,806stream: mockStream,807});808809// Wait for completion (longer timeout for CPU intensive task)810await delay(5000);811812// Verify we got events813expect(streamEvents.length).toBeGreaterThan(0);814815// Find events by type816const jobEvent = streamEvents.find((e) => e.type === "job");817const doneEvent = streamEvents.find((e) => e.type === "done");818const statsEvents = streamEvents.filter((e) => e.type === "stats");819const stdoutEvents = streamEvents.filter((e) => e.type === "stdout");820821// Should have initial job info822expect(jobEvent).toBeDefined();823expect(jobEvent?.data?.job_id).toBeDefined();824expect(jobEvent?.data?.status).toBe("running");825826// Should have stdout events from the script827expect(stdoutEvents.length).toBeGreaterThan(0);828expect(829stdoutEvents.some(830(e) => e.data && e.data.includes("Starting CPU intensive task"),831),832).toBe(true);833expect(834stdoutEvents.some((e) => e.data && e.data.includes("Task finished")),835).toBe(true);836837// Check if we have stats events (may not be generated in all environments)838if (statsEvents.length > 0) {839// Verify stats structure if we have stats840const statsEvent = statsEvents[0];841expect(statsEvent.data).toMatchObject({842timestamp: expect.any(Number),843cpu_pct: expect.any(Number),844cpu_secs: expect.any(Number),845mem_rss: expect.any(Number),846// pid may not be present in all stats formats847});848849// Stats should have reasonable values850expect(statsEvent.data.cpu_pct).toBeGreaterThanOrEqual(0);851expect(statsEvent.data.mem_rss).toBeGreaterThan(0);852} else {853// If no stats events, just log a warning but don't fail854console.warn(855"No stats events generated - this may be normal in test environment",856);857}858859// Should have completion event860expect(doneEvent).toBeDefined();861expect(doneEvent?.data?.status).toBe("completed");862expect(doneEvent?.data?.exit_code).toBe(0);863expect(doneEvent?.data?.stats).toBeDefined();864865// Verify event order: job first, then streaming events, then done866const jobIndex = streamEvents.findIndex((e) => e.type === "job");867const doneIndex = streamEvents.findIndex((e) => e.type === "done");868expect(jobIndex).toBe(0); // Job event should be first869expect(doneIndex).toBe(streamEvents.length - 1); // Done event should be last870}, 15000); // 15 second timeout871872it("handles command errors with proper done events", async () => {873// Import fresh executeStream that uses real executeCode874const { executeStream } = await import("./exec-stream");875876const streamEvents: any[] = [];877const mockStream = jest.fn((event) => {878if (event) {879streamEvents.push(event);880}881});882883// Run a command that will fail884await executeStream({885project_id: "test-project-id",886command: "exit 123",887bash: true,888stream: mockStream,889});890891// Wait for completion892await delay(1000);893894// Verify we got events895expect(streamEvents.length).toBeGreaterThan(0);896897// Find events by type898const jobEvent = streamEvents.find((e) => e.type === "job");899const doneEvent = streamEvents.find((e) => e.type === "done");900901// Should have initial job info902expect(jobEvent).toBeDefined();903expect(jobEvent?.data?.job_id).toBeDefined();904expect(jobEvent?.data?.status).toBe("running");905906// Should have completion event with error status907expect(doneEvent).toBeDefined();908expect(doneEvent?.data?.status).toBe("error");909expect(doneEvent?.data?.exit_code).toBe(123);910911// Verify event order: job first, then done912const jobIndex = streamEvents.findIndex((e) => e.type === "job");913const doneIndex = streamEvents.findIndex((e) => e.type === "done");914expect(jobIndex).toBe(0); // Job event should be first915expect(doneIndex).toBe(streamEvents.length - 1); // Done event should be last916});917918it("handles invalid commands with proper error events", async () => {919// Import fresh executeStream that uses real executeCode920const { executeStream } = await import("./exec-stream");921922const streamEvents: any[] = [];923const mockStream = jest.fn((event) => {924if (event) {925streamEvents.push(event);926}927});928929// Run a command that doesn't exist930await executeStream({931project_id: "test-project-id",932command: "this-command-does-not-exist-12345",933stream: mockStream,934});935936// Wait for completion937await delay(1000);938939// Verify we got events940expect(streamEvents.length).toBeGreaterThan(0);941942// Find events by type943const jobEvent = streamEvents.find((e) => e.type === "job");944const doneEvent = streamEvents.find((e) => e.type === "done");945946// Should have initial job info947expect(jobEvent).toBeDefined();948expect(jobEvent?.data?.job_id).toBeDefined();949expect(jobEvent?.data?.status).toBe("running");950951// Should have completion event with error status952expect(doneEvent).toBeDefined();953expect(doneEvent?.data?.status).toBe("error");954expect(doneEvent?.data?.exit_code).toBeGreaterThan(0); // Non-zero exit code955956// Verify event order: job first, then done957const jobIndex = streamEvents.findIndex((e) => e.type === "job");958const doneIndex = streamEvents.findIndex((e) => e.type === "done");959expect(jobIndex).toBe(0); // Job event should be first960expect(doneIndex).toBe(streamEvents.length - 1); // Done event should be last961});962});963964965