Path: blob/main/src/vs/platform/agentHost/test/node/protocol/testHelpers.ts
13405 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { ChildProcess, fork } from 'child_process';
import { fileURLToPath } from 'url';
import { WebSocket } from 'ws';
import { URI } from '../../../../../base/common/uri.js';
import { SubscribeResult } from '../../../common/state/protocol/commands.js';
import type { ActionEnvelope, SessionAddedNotification } from '../../../common/state/sessionActions.js';
import { PROTOCOL_VERSION } from '../../../common/state/sessionCapabilities.js';
import {
	isJsonRpcNotification,
	isJsonRpcResponse,
	type AhpNotification,
	type JsonRpcErrorResponse,
	type JsonRpcSuccessResponse,
	type INotificationBroadcastParams,
	type ProtocolMessage,
} from '../../../common/state/sessionProtocol.js';

// ---- JSON-RPC test client ---------------------------------------------------

/** Settlement callbacks for one in-flight JSON-RPC request, stored by request id. */
interface IPendingCall {
	resolve: (result: unknown) => void;
	reject: (err: Error) => void;
}

/** A `waitForNotification` caller that is parked until a matching notification arrives. */
interface INotificationWaiter {
	predicate: (n: AhpNotification) => boolean;
	resolve: (n: AhpNotification) => void;
	reject: (err: Error) => void;
}

/**
 * Minimal JSON-RPC 2.0 client over a WebSocket, used by the protocol tests.
 *
 * Responses are matched to requests by id. Every notification received is
 * appended to an internal buffer (see `receivedNotifications` / `clearReceived`)
 * and also offered to callers currently blocked in `waitForNotification`.
 */
export class TestProtocolClient {
	private readonly _ws: WebSocket;
	private _nextId = 1;
	private readonly _pendingCalls = new Map<number, IPendingCall>();
	private readonly _notifications: AhpNotification[] = [];
	private readonly _notifWaiters: INotificationWaiter[] = [];

	/**
	 * Opens a WebSocket to `ws://127.0.0.1:<port>`. Call `connect()` to wait
	 * for the socket to open before sending anything.
	 */
	constructor(port: number) {
		this._ws = new WebSocket(`ws://127.0.0.1:${port}`);
	}

	/**
	 * Resolves once the socket emits `open`; rejects if it emits `error`.
	 * The message handler is only installed after the socket has opened.
	 */
	async connect(): Promise<void> {
		return new Promise<void>((resolve, reject) => {
			this._ws.on('open', () => {
				this._ws.on('message', (data: Buffer | string) => {
					const text = typeof data === 'string' ? data : data.toString('utf-8');
					const msg = JSON.parse(text);
					this._handleMessage(msg);
				});
				resolve();
			});
			this._ws.on('error', reject);
		});
	}

	/**
	 * Routes one decoded message. A response settles (and removes) the pending
	 * call with the same id; a notification resolves every waiter whose
	 * predicate matches and is then appended to the notification buffer.
	 * Anything else is ignored.
	 */
	private _handleMessage(msg: ProtocolMessage): void {
		if (isJsonRpcResponse(msg)) {
			const pending = this._pendingCalls.get(msg.id);
			if (pending) {
				this._pendingCalls.delete(msg.id);
				const errResp = msg as JsonRpcErrorResponse;
				if (errResp.error) {
					pending.reject(new Error(errResp.error.message));
				} else {
					pending.resolve((msg as JsonRpcSuccessResponse).result);
				}
			}
		} else if (isJsonRpcNotification(msg)) {
			const notif = msg;
			// Walk backwards so that splicing a matched waiter does not shift
			// the indices of the entries still to be visited.
			for (let i = this._notifWaiters.length - 1; i >= 0; i--) {
				if (this._notifWaiters[i].predicate(notif)) {
					const waiter = this._notifWaiters.splice(i, 1)[0];
					waiter.resolve(notif);
				}
			}
			this._notifications.push(notif);
		}
	}

	/** Send a JSON-RPC notification (fire-and-forget). */
	notify(method: string, params?: unknown): void {
		this._ws.send(JSON.stringify({ jsonrpc: '2.0', method, params }));
	}

	/**
	 * Send a JSON-RPC request and await the response.
	 *
	 * Resolves with the response `result`; rejects with the response's error
	 * message, with a timeout error after `timeoutMs`, or with
	 * `'Client closed'` if `close()` is called first.
	 */
	call<T>(method: string, params?: unknown, timeoutMs = 5000): Promise<T> {
		const id = this._nextId++;
		this._ws.send(JSON.stringify({ jsonrpc: '2.0', id, method, params }));
		return new Promise<T>((resolve, reject) => {
			const timer = setTimeout(() => {
				this._pendingCalls.delete(id);
				reject(new Error(`Timeout waiting for response to ${method} (id=${id}, ${timeoutMs}ms)`));
			}, timeoutMs);

			this._pendingCalls.set(id, {
				resolve: result => { clearTimeout(timer); resolve(result as T); },
				reject: err => { clearTimeout(timer); reject(err); },
			});
		});
	}

	/**
	 * Wait for a server notification matching a predicate.
	 *
	 * Already-buffered notifications are checked first and the first match is
	 * returned immediately (it stays in the buffer). Otherwise the caller is
	 * parked until a matching notification arrives, `timeoutMs` elapses, or
	 * the client is closed.
	 */
	waitForNotification(predicate: (n: AhpNotification) => boolean, timeoutMs = 5000): Promise<AhpNotification> {
		const existing = this._notifications.find(predicate);
		if (existing) {
			return Promise.resolve(existing);
		}

		return new Promise<AhpNotification>((resolve, reject) => {
			const timer = setTimeout(() => {
				// Remove by object identity: the stored callbacks are wrappers,
				// so comparing against the raw `resolve` would never match and
				// the timed-out waiter would stay registered forever.
				const idx = this._notifWaiters.indexOf(waiter);
				if (idx >= 0) {
					this._notifWaiters.splice(idx, 1);
				}
				reject(new Error(`Timeout waiting for notification (${timeoutMs}ms)`));
			}, timeoutMs);

			const waiter: INotificationWaiter = {
				predicate,
				resolve: n => { clearTimeout(timer); resolve(n); },
				// Clear the timer on rejection too, so `close()` does not leave
				// a pending timeout behind for every outstanding waiter.
				reject: err => { clearTimeout(timer); reject(err); },
			};
			this._notifWaiters.push(waiter);
		});
	}

	/** Return all received notifications matching a predicate (a copy of the whole buffer when omitted). */
	receivedNotifications(predicate?: (n: AhpNotification) => boolean): AhpNotification[] {
		return predicate ? this._notifications.filter(predicate) : [...this._notifications];
	}

	/** Send a raw string over the WebSocket without JSON serialization. */
	sendRaw(data: string): void {
		this._ws.send(data);
	}

	/**
	 * Wait for the next raw message from the server and return it JSON-parsed.
	 * Uses its own temporary `message` listener, which is removed again on
	 * both the success and the timeout path.
	 */
	waitForRawMessage(timeoutMs = 5000): Promise<unknown> {
		return new Promise((resolve, reject) => {
			const timer = setTimeout(() => {
				cleanup();
				reject(new Error(`Timeout waiting for raw message (${timeoutMs}ms)`));
			}, timeoutMs);
			const onMsg = (data: Buffer | string) => {
				cleanup();
				const text = typeof data === 'string' ? data : data.toString('utf-8');
				resolve(JSON.parse(text));
			};
			const cleanup = () => {
				clearTimeout(timer);
				this._ws.removeListener('message', onMsg);
			};
			this._ws.on('message', onMsg);
		});
	}

	/**
	 * Reject every outstanding notification waiter and pending call with
	 * `'Client closed'`, then close the socket.
	 */
	close(): void {
		for (const w of this._notifWaiters) {
			w.reject(new Error('Client closed'));
		}
		this._notifWaiters.length = 0;
		for (const [, p] of this._pendingCalls) {
			p.reject(new Error('Client closed'));
		}
		this._pendingCalls.clear();
		this._ws.close();
	}

	/** Drop all buffered notifications. */
	clearReceived(): void {
		this._notifications.length = 0;
	}
}

// ---- Server process lifecycle -----------------------------------------------

/** A forked agent host server together with the port it reported on startup. */
export interface IServerHandle {
	process: ChildProcess;
	port: number;
}

/**
 * Forks `agentHostServerMain.js` with the given arguments and environment and
 * resolves once the child prints `READY:<port>` on stdout.
 *
 * Rejects if the child emits `error`, exits, or does not report readiness
 * within `startupTimeoutMs` (in which case the child is killed). `label` is
 * the prefix used in the rejection messages.
 */
function launchServer(args: string[], env: NodeJS.ProcessEnv, startupTimeoutMs: number, label: string): Promise<IServerHandle> {
	return new Promise((resolve, reject) => {
		const serverPath = fileURLToPath(new URL('../../../node/agentHostServerMain.js', import.meta.url));
		const child = fork(serverPath, args, {
			stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
			env,
		});

		const timer = setTimeout(() => {
			child.kill();
			reject(new Error(`${label} startup timed out`));
		}, startupTimeoutMs);

		child.stdout!.on('data', (data: Buffer) => {
			const text = data.toString();
			const match = text.match(/READY:(\d+)/);
			if (match) {
				clearTimeout(timer);
				resolve({ process: child, port: parseInt(match[1], 10) });
			}
		});

		child.stderr!.on('data', () => {
			// Intentionally swallowed - the test runner fails if console.error is used.
		});

		child.on('error', err => {
			clearTimeout(timer);
			reject(err);
		});

		// Once the promise has resolved this rejection is a no-op, so a later
		// (expected) exit of the child does not surface as an error.
		child.on('exit', code => {
			clearTimeout(timer);
			reject(new Error(`${label} exited prematurely with code ${code}`));
		});
	});
}

/**
 * Start the agent host server with the mock agent enabled
 * (`--enable-mock-agent --port 0 --without-connection-token`).
 *
 * @param options.quiet Pass `--quiet` to the server; defaults to `true`.
 * @param options.userDataDir When set, passed as `--user-data-dir`.
 * @param options.env Extra environment variables, merged over `process.env`.
 * @returns The child process and the port it reported; startup times out after 10 seconds.
 */
export async function startServer(options?: { readonly quiet?: boolean; readonly userDataDir?: string; readonly env?: NodeJS.ProcessEnv }): Promise<IServerHandle> {
	const args = ['--enable-mock-agent', '--port', '0', '--without-connection-token'];
	if (options?.quiet ?? true) {
		args.push('--quiet');
	}
	if (options?.userDataDir) {
		args.push('--user-data-dir', options.userDataDir);
	}
	const env = options?.env ? { ...process.env, ...options.env } : process.env;
	return launchServer(args, env, 10_000, 'Server');
}

/**
 * Start the agent host server with the real Copilot SDK agent (no mock agent).
 * The server is started with logging enabled so the CopilotAgent is registered.
 * Startup times out after 30 seconds.
 */
export async function startRealServer(): Promise<IServerHandle> {
	// `process.env` is what `fork` uses when no `env` option is given.
	return launchServer(['--port', '0', '--without-connection-token'], process.env, 30_000, 'Real server');
}

// ---- Helpers ----------------------------------------------------------------

/** Counter behind `nextSessionUri`; incremented on every call for the lifetime of the module. */
let sessionCounter = 0;

/**
 * Returns a new session URI string with scheme `mock` and path
 * `/test-session-<n>`, where `<n>` increases by one on each call.
 */
export function nextSessionUri(): string {
	return URI.from({ scheme: 'mock', path: `/test-session-${++sessionCounter}` }).toString();
}

/**
 * Whether `n` is an `action` notification whose envelope carries an action of
 * type `actionType`. Notifications with any other method return `false`
 * without their params being inspected.
 */
export function isActionNotification(n: AhpNotification, actionType: string): boolean {
	if (n.method !== 'action') {
		return false;
	}
	const envelope = n.params as unknown as ActionEnvelope;
	return envelope.action.type === actionType;
}

/**
 * Returns the notification's params viewed as an `ActionEnvelope`. This is a
 * type cast only: nothing is validated at runtime.
 */
export function getActionEnvelope(n: AhpNotification): ActionEnvelope {
	return n.params as unknown as ActionEnvelope;
}

/**
 * Perform handshake, create a session, subscribe, and return its URI.
 *
 * The returned URI is the `summary.resource` of the `notify/sessionAdded`
 * notification, not the URI that was sent in the `createSession` request.
 * The client's notification buffer is cleared before returning.
 */
export async function createAndSubscribeSession(c: TestProtocolClient, clientId: string, workingDirectory?: string): Promise<string> {
	await c.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId });

	await c.call('createSession', { session: nextSessionUri(), provider: 'mock', workingDirectory });

	const notif = await c.waitForNotification(n =>
		n.method === 'notification' && (n.params as INotificationBroadcastParams).notification.type === 'notify/sessionAdded'
	);
	const realSessionUri = ((notif.params as INotificationBroadcastParams).notification as SessionAddedNotification).summary.resource;

	await c.call<SubscribeResult>('subscribe', { resource: realSessionUri });
	c.clearReceived();

	return realSessionUri;
}

/**
 * Sends a `dispatchAction` notification carrying a `session/turnStarted`
 * action for `session`, with `text` as the user message. Fire-and-forget:
 * no response is awaited.
 */
export function dispatchTurnStarted(c: TestProtocolClient, session: string, turnId: string, text: string, clientSeq: number): void {
	c.notify('dispatchAction', {
		clientSeq,
		action: {
			type: 'session/turnStarted',
			session,
			turnId,
			userMessage: { text },
		},
	});
}