Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/platform/agentHost/test/node/protocol/testHelpers.ts
13405 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { ChildProcess, fork } from 'child_process';
7
import { fileURLToPath } from 'url';
8
import { WebSocket } from 'ws';
9
import { URI } from '../../../../../base/common/uri.js';
10
import { SubscribeResult } from '../../../common/state/protocol/commands.js';
11
import type { ActionEnvelope, SessionAddedNotification } from '../../../common/state/sessionActions.js';
12
import { PROTOCOL_VERSION } from '../../../common/state/sessionCapabilities.js';
13
import {
14
isJsonRpcNotification,
15
isJsonRpcResponse,
16
type AhpNotification,
17
type JsonRpcErrorResponse,
18
type JsonRpcSuccessResponse,
19
type INotificationBroadcastParams,
20
type ProtocolMessage,
21
} from '../../../common/state/sessionProtocol.js';
22
23
// ---- JSON-RPC test client ---------------------------------------------------
24
25
interface IPendingCall {
26
resolve: (result: unknown) => void;
27
reject: (err: Error) => void;
28
}
29
30
export class TestProtocolClient {
31
private readonly _ws: WebSocket;
32
private _nextId = 1;
33
private readonly _pendingCalls = new Map<number, IPendingCall>();
34
private readonly _notifications: AhpNotification[] = [];
35
private readonly _notifWaiters: { predicate: (n: AhpNotification) => boolean; resolve: (n: AhpNotification) => void; reject: (err: Error) => void }[] = [];
36
37
constructor(port: number) {
38
this._ws = new WebSocket(`ws://127.0.0.1:${port}`);
39
}
40
41
async connect(): Promise<void> {
42
return new Promise<void>((resolve, reject) => {
43
this._ws.on('open', () => {
44
this._ws.on('message', (data: Buffer | string) => {
45
const text = typeof data === 'string' ? data : data.toString('utf-8');
46
const msg = JSON.parse(text);
47
this._handleMessage(msg);
48
});
49
resolve();
50
});
51
this._ws.on('error', reject);
52
});
53
}
54
55
private _handleMessage(msg: ProtocolMessage): void {
56
if (isJsonRpcResponse(msg)) {
57
const pending = this._pendingCalls.get(msg.id);
58
if (pending) {
59
this._pendingCalls.delete(msg.id);
60
const errResp = msg as JsonRpcErrorResponse;
61
if (errResp.error) {
62
pending.reject(new Error(errResp.error.message));
63
} else {
64
pending.resolve((msg as JsonRpcSuccessResponse).result);
65
}
66
}
67
} else if (isJsonRpcNotification(msg)) {
68
const notif = msg;
69
for (let i = this._notifWaiters.length - 1; i >= 0; i--) {
70
if (this._notifWaiters[i].predicate(notif)) {
71
const waiter = this._notifWaiters.splice(i, 1)[0];
72
waiter.resolve(notif);
73
}
74
}
75
this._notifications.push(notif);
76
}
77
}
78
79
/** Send a JSON-RPC notification (fire-and-forget). */
80
notify(method: string, params?: unknown): void {
81
this._ws.send(JSON.stringify({ jsonrpc: '2.0', method, params }));
82
}
83
84
/** Send a JSON-RPC request and await the response. */
85
call<T>(method: string, params?: unknown, timeoutMs = 5000): Promise<T> {
86
const id = this._nextId++;
87
this._ws.send(JSON.stringify({ jsonrpc: '2.0', id, method, params }));
88
return new Promise<T>((resolve, reject) => {
89
const timer = setTimeout(() => {
90
this._pendingCalls.delete(id);
91
reject(new Error(`Timeout waiting for response to ${method} (id=${id}, ${timeoutMs}ms)`));
92
}, timeoutMs);
93
94
this._pendingCalls.set(id, {
95
resolve: result => { clearTimeout(timer); resolve(result as T); },
96
reject: err => { clearTimeout(timer); reject(err); },
97
});
98
});
99
}
100
101
/** Wait for a server notification matching a predicate. */
102
waitForNotification(predicate: (n: AhpNotification) => boolean, timeoutMs = 5000): Promise<AhpNotification> {
103
const existing = this._notifications.find(predicate);
104
if (existing) {
105
return Promise.resolve(existing);
106
}
107
108
return new Promise<AhpNotification>((resolve, reject) => {
109
const timer = setTimeout(() => {
110
const idx = this._notifWaiters.findIndex(w => w.resolve === resolve);
111
if (idx >= 0) {
112
this._notifWaiters.splice(idx, 1);
113
}
114
reject(new Error(`Timeout waiting for notification (${timeoutMs}ms)`));
115
}, timeoutMs);
116
117
this._notifWaiters.push({
118
predicate,
119
resolve: n => { clearTimeout(timer); resolve(n); },
120
reject,
121
});
122
});
123
}
124
125
/** Return all received notifications matching a predicate. */
126
receivedNotifications(predicate?: (n: AhpNotification) => boolean): AhpNotification[] {
127
return predicate ? this._notifications.filter(predicate) : [...this._notifications];
128
}
129
130
/** Send a raw string over the WebSocket without JSON serialization. */
131
sendRaw(data: string): void {
132
this._ws.send(data);
133
}
134
135
/** Wait for the next raw message from the server. */
136
waitForRawMessage(timeoutMs = 5000): Promise<unknown> {
137
return new Promise((resolve, reject) => {
138
const timer = setTimeout(() => {
139
cleanup();
140
reject(new Error(`Timeout waiting for raw message (${timeoutMs}ms)`));
141
}, timeoutMs);
142
const onMsg = (data: Buffer | string) => {
143
cleanup();
144
const text = typeof data === 'string' ? data : data.toString('utf-8');
145
resolve(JSON.parse(text));
146
};
147
const cleanup = () => {
148
clearTimeout(timer);
149
this._ws.removeListener('message', onMsg);
150
};
151
this._ws.on('message', onMsg);
152
});
153
}
154
155
close(): void {
156
for (const w of this._notifWaiters) {
157
w.reject(new Error('Client closed'));
158
}
159
this._notifWaiters.length = 0;
160
for (const [, p] of this._pendingCalls) {
161
p.reject(new Error('Client closed'));
162
}
163
this._pendingCalls.clear();
164
this._ws.close();
165
}
166
167
clearReceived(): void {
168
this._notifications.length = 0;
169
}
170
}
171
172
// ---- Server process lifecycle -----------------------------------------------
173
174
export interface IServerHandle {
175
process: ChildProcess;
176
port: number;
177
}
178
179
export async function startServer(options?: { readonly quiet?: boolean; readonly userDataDir?: string; readonly env?: NodeJS.ProcessEnv }): Promise<IServerHandle> {
180
return new Promise((resolve, reject) => {
181
const serverPath = fileURLToPath(new URL('../../../node/agentHostServerMain.js', import.meta.url));
182
const args = ['--enable-mock-agent', '--port', '0', '--without-connection-token'];
183
if (options?.quiet ?? true) {
184
args.push('--quiet');
185
}
186
if (options?.userDataDir) {
187
args.push('--user-data-dir', options.userDataDir);
188
}
189
const child = fork(serverPath, args, {
190
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
191
env: options?.env ? { ...process.env, ...options.env } : process.env,
192
});
193
194
const timer = setTimeout(() => {
195
child.kill();
196
reject(new Error('Server startup timed out'));
197
}, 10_000);
198
199
child.stdout!.on('data', (data: Buffer) => {
200
const text = data.toString();
201
const match = text.match(/READY:(\d+)/);
202
if (match) {
203
clearTimeout(timer);
204
resolve({ process: child, port: parseInt(match[1], 10) });
205
}
206
});
207
208
child.stderr!.on('data', () => {
209
// Intentionally swallowed - the test runner fails if console.error is used.
210
});
211
212
child.on('error', err => {
213
clearTimeout(timer);
214
reject(err);
215
});
216
217
child.on('exit', code => {
218
clearTimeout(timer);
219
reject(new Error(`Server exited prematurely with code ${code}`));
220
});
221
});
222
}
223
224
/**
225
* Start the agent host server with the real Copilot SDK agent (no mock agent).
226
* The server is started with logging enabled so the CopilotAgent is registered.
227
*/
228
export async function startRealServer(): Promise<IServerHandle> {
229
return new Promise((resolve, reject) => {
230
const serverPath = fileURLToPath(new URL('../../../node/agentHostServerMain.js', import.meta.url));
231
const args = ['--port', '0', '--without-connection-token'];
232
const child = fork(serverPath, args, {
233
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
234
});
235
236
const timer = setTimeout(() => {
237
child.kill();
238
reject(new Error('Real server startup timed out'));
239
}, 30_000);
240
241
child.stdout!.on('data', (data: Buffer) => {
242
const text = data.toString();
243
const match = text.match(/READY:(\d+)/);
244
if (match) {
245
clearTimeout(timer);
246
resolve({ process: child, port: parseInt(match[1], 10) });
247
}
248
});
249
250
child.stderr!.on('data', () => {
251
// Intentionally swallowed - the test runner fails if console.error is used.
252
});
253
254
child.on('error', err => {
255
clearTimeout(timer);
256
reject(err);
257
});
258
259
child.on('exit', code => {
260
clearTimeout(timer);
261
reject(new Error(`Real server exited prematurely with code ${code}`));
262
});
263
});
264
}
265
266
// ---- Helpers ----------------------------------------------------------------
267
268
let sessionCounter = 0;
269
270
export function nextSessionUri(): string {
271
return URI.from({ scheme: 'mock', path: `/test-session-${++sessionCounter}` }).toString();
272
}
273
274
export function isActionNotification(n: AhpNotification, actionType: string): boolean {
275
if (n.method !== 'action') {
276
return false;
277
}
278
const envelope = n.params as unknown as ActionEnvelope;
279
return envelope.action.type === actionType;
280
}
281
282
export function getActionEnvelope(n: AhpNotification): ActionEnvelope {
283
return n.params as unknown as ActionEnvelope;
284
}
285
286
/** Perform handshake, create a session, subscribe, and return its URI. */
287
export async function createAndSubscribeSession(c: TestProtocolClient, clientId: string, workingDirectory?: string): Promise<string> {
288
await c.call('initialize', { protocolVersion: PROTOCOL_VERSION, clientId });
289
290
await c.call('createSession', { session: nextSessionUri(), provider: 'mock', workingDirectory });
291
292
const notif = await c.waitForNotification(n =>
293
n.method === 'notification' && (n.params as INotificationBroadcastParams).notification.type === 'notify/sessionAdded'
294
);
295
const realSessionUri = ((notif.params as INotificationBroadcastParams).notification as SessionAddedNotification).summary.resource;
296
297
await c.call<SubscribeResult>('subscribe', { resource: realSessionUri });
298
c.clearReceived();
299
300
return realSessionUri;
301
}
302
303
export function dispatchTurnStarted(c: TestProtocolClient, session: string, turnId: string, text: string, clientSeq: number): void {
304
c.notify('dispatchAction', {
305
clientSeq,
306
action: {
307
type: 'session/turnStarted',
308
session,
309
turnId,
310
userMessage: { text },
311
},
312
});
313
}
314
315