Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/platform/agentHost/node/protocolServerHandler.ts
13394 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { Emitter } from '../../../base/common/event.js';
7
import { isJsonRpcResponse } from '../../../base/common/jsonRpcProtocol.js';
8
import { Disposable, DisposableStore } from '../../../base/common/lifecycle.js';
9
import { hasKey } from '../../../base/common/types.js';
10
import { URI } from '../../../base/common/uri.js';
11
import { ILogService } from '../../log/common/log.js';
12
import { AHPFileSystemProvider } from '../common/agentHostFileSystemProvider.js';
13
import { AgentSession, type IAgentService } from '../common/agentService.js';
14
import type { CommandMap } from '../common/state/protocol/messages.js';
15
import { ActionEnvelope, ActionType, INotification, isSessionAction, isTerminalAction, type SessionAction, type TerminalAction, type IRootConfigChangedAction } from '../common/state/sessionActions.js';
16
import { MIN_PROTOCOL_VERSION, PROTOCOL_VERSION } from '../common/state/sessionCapabilities.js';
17
import {
18
AHP_AUTH_REQUIRED,
19
AHP_PROVIDER_NOT_FOUND,
20
AHP_SESSION_NOT_FOUND,
21
AHP_UNSUPPORTED_PROTOCOL_VERSION,
22
JsonRpcRequest,
23
isJsonRpcNotification,
24
isJsonRpcRequest,
25
JSON_RPC_INTERNAL_ERROR,
26
JsonRpcErrorCodes,
27
ProtocolError,
28
type AhpServerNotification,
29
type InitializeParams,
30
type JsonRpcResponse,
31
type ReconnectParams,
32
type IStateSnapshot,
33
} from '../common/state/sessionProtocol.js';
34
import { ResponsePartKind, ROOT_STATE_URI, SessionStatus, ToolCallConfirmationReason, ToolCallStatus, ToolResultContentType, type SessionState } from '../common/state/sessionState.js';
35
import type { IProtocolServer, IProtocolTransport } from '../common/state/sessionTransport.js';
36
import { AgentHostStateManager } from './agentHostStateManager.js';
37
38
/** Maximum number of action envelopes kept in the server-side replay buffer. */
const REPLAY_BUFFER_CAPACITY = 1_000;

/**
 * Delay, in milliseconds, handed to `setTimeout` after a client disconnects.
 * When it elapses, the tool calls that client still had in flight for a
 * session are completed as failed.
 */
const CLIENT_TOOL_CALL_DISCONNECT_TIMEOUT = 30 * 1_000;
42
43
/**
 * Wraps a result value in a JSON-RPC 2.0 success envelope that can be passed
 * straight to `transport.send()`.
 *
 * @param id Identifier of the request being answered.
 * @param result Payload to place in the `result` member.
 */
function jsonRpcSuccess(id: number, result: unknown): JsonRpcResponse {
	const response: JsonRpcResponse = { jsonrpc: '2.0', id, result };
	return response;
}
47
48
/**
 * Builds a JSON-RPC 2.0 error envelope that can be passed straight to
 * `transport.send()`.
 *
 * @param id Identifier of the request being answered.
 * @param code Numeric error code.
 * @param message Human-readable error description.
 * @param data Optional extra payload; the `data` member is omitted entirely
 * when this is `undefined` (a `null` value is still included).
 */
function jsonRpcError(id: number, code: number, message: string, data?: unknown): JsonRpcResponse {
	const error: { code: number; message: string; data?: unknown } = { code, message };
	if (data !== undefined) {
		error.data = data;
	}
	return { jsonrpc: '2.0', id, error };
}
52
53
/**
 * Converts an arbitrary thrown value into a JSON-RPC error response.
 *
 * A {@link ProtocolError} keeps its own `code`, `message` and `data`. Any other
 * value is reported with {@link JSON_RPC_INTERNAL_ERROR}: an `Error` contributes
 * its stack trace (or its message when no stack is available), everything else
 * is stringified.
 */
function jsonRpcErrorFrom(id: number, err: unknown): JsonRpcResponse {
	if (err instanceof ProtocolError) {
		return jsonRpcError(id, err.code, err.message, err.data);
	}

	let message: string;
	if (err instanceof Error) {
		message = err.stack ?? err.message;
	} else {
		message = String(err);
	}
	return jsonRpcError(id, JSON_RPC_INTERNAL_ERROR, message);
}
61
62
/**
 * Names of the commands routed through the request dispatcher: every key of
 * {@link CommandMap} except the two handshake commands (`initialize` and
 * `reconnect`), which are answered while the connection is being set up.
 */
type RequestMethod = Exclude<
	keyof CommandMap,
	'initialize' | 'reconnect'
>;
67
68
/**
 * Dispatch table type with one entry per {@link RequestMethod}. Each handler
 * is given the connected client plus that command's `params` type and must
 * resolve to that command's `result` type, so a handler with the wrong shape
 * is rejected at compile time.
 */
type RequestHandlerMap = {
	[Method in RequestMethod]: (
		client: IConnectedClient,
		params: CommandMap[Method]['params'],
	) => Promise<CommandMap[Method]['result']>;
};
76
77
/**
 * Server-side record of a client that has completed the handshake, together
 * with the resources it is subscribed to.
 */
interface IConnectedClient {
	/** Identifier the client supplied in its `initialize` / `reconnect` params. */
	readonly clientId: string;
	/** Protocol version recorded for this connection during the handshake. */
	readonly protocolVersion: number;
	/** Transport used to send messages to this client. */
	readonly transport: IProtocolTransport;
	/** Resource URIs (as strings) this client currently receives actions for. */
	readonly subscriptions: Set<string>;
	/** Per-connection disposables; disposed when the transport closes or the handler is disposed. */
	readonly disposables: DisposableStore;
}
87
88
/**
 * Protocol-level settings that are not provided by {@link IAgentService}.
 */
export interface IProtocolServerConfig {
	/** Directory reported to clients as `defaultDirectory` in the `initialize` response. */
	readonly defaultDirectory?: string;
}
95
96
/**
97
* Server-side handler that manages protocol connections, routes JSON-RPC
98
* messages to the agent service, and broadcasts actions/notifications
99
* to subscribed clients.
100
*/
101
export class ProtocolServerHandler extends Disposable {
102
103
/** Clients that have completed the handshake, keyed by client id. */
private readonly _clients: Map<string, IConnectedClient> = new Map();
/** Recently emitted action envelopes, oldest first, capped at {@link REPLAY_BUFFER_CAPACITY}. */
private readonly _replayBuffer: ActionEnvelope[] = [];
/** Running disconnect timers, keyed by client id and session. */
private readonly _clientToolCallDisconnectTimeouts: Map<string, ReturnType<typeof setTimeout>> = new Map();

private readonly _onDidChangeConnectionCount = this._register(new Emitter<number>());

/** Fires with the current client count whenever a client connects or disconnects. */
readonly onDidChangeConnectionCount = this._onDidChangeConnectionCount.event;
111
112
/**
 * Subscribes to the protocol server and the state manager:
 * - every new transport is handed to `_handleNewConnection`;
 * - every emitted action envelope is appended to the replay buffer (dropping
 *   the oldest entry once the capacity is exceeded) and then broadcast;
 * - every emitted notification is broadcast to all clients.
 */
constructor(
	private readonly _agentService: IAgentService,
	private readonly _stateManager: AgentHostStateManager,
	private readonly _server: IProtocolServer,
	private readonly _config: IProtocolServerConfig,
	private readonly _clientFileSystemProvider: AHPFileSystemProvider,
	@ILogService private readonly _logService: ILogService,
) {
	super();

	const onConnection = this._server.onConnection(transport => {
		this._handleNewConnection(transport);
	});
	this._register(onConnection);

	const onEnvelope = this._stateManager.onDidEmitEnvelope(envelope => {
		this._replayBuffer.push(envelope);
		const overCapacity = this._replayBuffer.length > REPLAY_BUFFER_CAPACITY;
		if (overCapacity) {
			// Evict the oldest envelope to keep the buffer bounded.
			this._replayBuffer.shift();
		}
		this._broadcastAction(envelope);
	});
	this._register(onEnvelope);

	const onNotification = this._stateManager.onDidEmitNotification(notification => {
		this._broadcastNotification(notification);
	});
	this._register(onNotification);
}
138
139
// ---- Connection handling -------------------------------------------------
140
141
/**
 * Wires up a newly accepted transport.
 *
 * Message handling:
 * - Requests: the first `initialize` / `reconnect` request performs the
 *   handshake and establishes the client. Any other request received before
 *   the handshake is answered with an error. After the handshake, requests
 *   are routed to `_handleRequest`.
 * - Notifications: `unsubscribe` removes a resource from the client's
 *   subscriptions; `dispatchAction` forwards session, terminal and
 *   root-config-changed actions to the agent service. Both are ignored
 *   before the handshake.
 * - Responses: settle the matching pending reverse-RPC request, but only when
 *   the response arrives from the client the request was sent to.
 *
 * When the transport closes, the client is removed (if it is still the
 * registered instance for its id), its pending reverse requests are rejected,
 * disconnect handling runs, and the per-connection disposables are disposed.
 */
private _handleNewConnection(transport: IProtocolTransport): void {
	const disposables = new DisposableStore();
	let client: IConnectedClient | undefined;

	disposables.add(transport.onMessage(msg => {
		if (isJsonRpcRequest(msg)) {
			this._logService.trace(`[ProtocolServer] request: method=${msg.method} id=${msg.id}`);

			// Handle initialize/reconnect as requests that set up the client
			if (!client && msg.method === 'initialize') {
				try {
					const result = this._handleInitialize(msg.params, transport, disposables);
					client = result.client;
					transport.send(jsonRpcSuccess(msg.id, result.response));
				} catch (err) {
					transport.send(jsonRpcErrorFrom(msg.id, err));
				}
				return;
			}
			if (!client && msg.method === 'reconnect') {
				try {
					const result = this._handleReconnect(msg.params, transport, disposables);
					client = result.client;
					transport.send(jsonRpcSuccess(msg.id, result.response));
				} catch (err) {
					transport.send(jsonRpcErrorFrom(msg.id, err));
				}
				return;
			}

			if (!client) {
				// A request always expects a response; dropping it silently would
				// leave the caller waiting forever. Uses the same error code as the
				// unknown-method path.
				transport.send(jsonRpcError(msg.id, JSON_RPC_INTERNAL_ERROR, `Connection not initialized: '${msg.method}' requires a prior 'initialize' or 'reconnect'`));
				return;
			}
			this._handleRequest(client, msg.method, msg.params, msg.id);
		} else if (isJsonRpcNotification(msg)) {
			this._logService.trace(`[ProtocolServer] notification: method=${msg.method}`);
			// Notification — fire-and-forget
			switch (msg.method) {
				case 'unsubscribe':
					if (client) {
						client.subscriptions.delete(msg.params.resource);
					}
					break;
				case 'dispatchAction':
					if (client) {
						this._logService.trace(`[ProtocolServer] dispatchAction: ${JSON.stringify(msg.params.action.type)}`);
						const action = msg.params.action as SessionAction | TerminalAction | IRootConfigChangedAction;
						if (isSessionAction(action) || isTerminalAction(action) || action.type === ActionType.RootConfigChanged) {
							this._agentService.dispatchAction(action, client.clientId, msg.params.clientSeq);
						}
					}
					break;
			}
		} else if (isJsonRpcResponse(msg)) {
			const pending = this._pendingReverseRequests.get(msg.id);
			if (!pending) {
				return;
			}
			// Reverse-request ids are shared across all connections, so make sure
			// the answer comes from the client that was actually asked. Otherwise
			// one client could settle another client's request.
			if (!client || pending.clientId !== client.clientId) {
				this._logService.warn(`[ProtocolServer] Ignoring response id=${msg.id}: it does not belong to this connection`);
				return;
			}
			this._pendingReverseRequests.delete(msg.id);
			if (hasKey(msg, { error: true })) {
				pending.reject(new Error(msg.error?.message ?? 'Reverse RPC error'));
			} else {
				pending.resolve(msg.result);
			}
		}
	}));

	disposables.add(transport.onClose(() => {
		// Only tear down the registration if it still points at this connection;
		// a later handshake with the same client id may have replaced it.
		if (client && this._clients.get(client.clientId) === client) {
			this._logService.info(`[ProtocolServer] Client disconnected: ${client.clientId}, subscriptions=${client.subscriptions.size}`);
			this._clients.delete(client.clientId);
			this._rejectPendingReverseRequests(client.clientId);
			this._handleClientDisconnected(client.clientId);
			this._onDidChangeConnectionCount.fire(this._clients.size);
		}
		disposables.dispose();
	}));

	disposables.add(transport);
}
220
221
// ---- Handshake handlers ----------------------------------------------------
222
223
/**
 * Performs the `initialize` handshake for a new connection.
 *
 * Registers the client, announces the new connection count, registers the
 * client as a file-system authority whose operations are forwarded to the
 * client as reverse requests, and subscribes the client to every requested
 * initial resource for which a snapshot exists.
 *
 * @param params Handshake parameters sent by the client.
 * @param transport Transport the request arrived on.
 * @param disposables Per-connection store that receives the authority registration.
 * @returns The new client record and the payload for the `initialize` response.
 * @throws ProtocolError with {@link AHP_UNSUPPORTED_PROTOCOL_VERSION} when the
 * client's protocol version is below {@link MIN_PROTOCOL_VERSION}.
 */
private _handleInitialize(
	params: InitializeParams,
	transport: IProtocolTransport,
	disposables: DisposableStore,
): { client: IConnectedClient; response: unknown } {
	const { clientId, protocolVersion } = params;
	this._logService.info(`[ProtocolServer] Initialize: clientId=${clientId}, version=${protocolVersion}`);

	if (protocolVersion < MIN_PROTOCOL_VERSION) {
		const reason = `Client protocol version ${protocolVersion} is below minimum ${MIN_PROTOCOL_VERSION}`;
		throw new ProtocolError(AHP_UNSUPPORTED_PROTOCOL_VERSION, reason);
	}

	const client: IConnectedClient = {
		clientId,
		protocolVersion,
		transport,
		subscriptions: new Set(),
		disposables,
	};
	this._clients.set(clientId, client);
	this._onDidChangeConnectionCount.fire(this._clients.size);

	// File-system operations for this authority are served by the client itself.
	const authorityRegistration = this._clientFileSystemProvider.registerAuthority(clientId, {
		resourceList: (uri) => this._sendReverseRequest(clientId, 'resourceList', { uri: uri.toString() }),
		resourceRead: (uri) => this._sendReverseRequest(clientId, 'resourceRead', { uri: uri.toString() }),
		resourceWrite: (request) => this._sendReverseRequest(clientId, 'resourceWrite', request),
		resourceDelete: (request) => this._sendReverseRequest(clientId, 'resourceDelete', request),
		resourceMove: (request) => this._sendReverseRequest(clientId, 'resourceMove', request),
	});
	disposables.add(authorityRegistration);

	const snapshots: IStateSnapshot[] = [];
	for (const uri of params.initialSubscriptions ?? []) {
		const snapshot = this._stateManager.getSnapshot(uri);
		if (!snapshot) {
			// Resources without a snapshot are skipped and not subscribed.
			continue;
		}
		const resource = uri.toString();
		snapshots.push(snapshot);
		client.subscriptions.add(resource);
		this._clearClientToolCallDisconnectTimeout(clientId, resource);
	}

	const response = {
		protocolVersion: PROTOCOL_VERSION,
		serverSeq: this._stateManager.serverSeq,
		snapshots,
		defaultDirectory: this._config.defaultDirectory,
	};
	return { client, response };
}
278
279
/**
 * Performs the `reconnect` handshake for a returning client.
 *
 * The client is registered again and the connection count is announced. If the
 * client's last seen sequence number is at or after the oldest buffered
 * envelope (or the current server sequence when the buffer is empty), all
 * requested subscriptions are restored and the missed, relevant envelopes are
 * returned as a `replay`. Otherwise the client receives fresh `snapshot`s and
 * is subscribed only to the resources that have one.
 *
 * NOTE(review): unlike `_handleInitialize`, no file-system authority is
 * registered here — confirm whether that is intended.
 *
 * @returns The new client record and the payload for the `reconnect` response.
 */
private _handleReconnect(
	params: ReconnectParams,
	transport: IProtocolTransport,
	disposables: DisposableStore,
): { client: IConnectedClient; response: unknown } {
	const { clientId, lastSeenServerSeq } = params;
	this._logService.info(`[ProtocolServer] Reconnect: clientId=${clientId}, lastSeenSeq=${lastSeenServerSeq}`);

	const client: IConnectedClient = {
		clientId,
		protocolVersion: PROTOCOL_VERSION,
		transport,
		subscriptions: new Set(),
		disposables,
	};
	this._clients.set(clientId, client);
	this._onDidChangeConnectionCount.fire(this._clients.size);

	const hasBufferedEnvelopes = this._replayBuffer.length > 0;
	const oldestBuffered = hasBufferedEnvelopes ? this._replayBuffer[0].serverSeq : this._stateManager.serverSeq;
	const canReplay = lastSeenServerSeq >= oldestBuffered;

	if (!canReplay) {
		// The client is too far behind the buffer: hand out full snapshots.
		const snapshots: IStateSnapshot[] = [];
		for (const sub of params.subscriptions) {
			const snapshot = this._stateManager.getSnapshot(sub);
			if (!snapshot) {
				continue;
			}
			snapshots.push(snapshot);
			client.subscriptions.add(sub);
			this._clearClientToolCallDisconnectTimeout(clientId, sub);
		}
		return { client, response: { type: 'snapshot', snapshots } };
	}

	// Subscriptions must be restored first: relevance filtering depends on them.
	for (const sub of params.subscriptions) {
		const resource = sub.toString();
		client.subscriptions.add(resource);
		this._clearClientToolCallDisconnectTimeout(clientId, resource);
	}
	const actions: ActionEnvelope[] = this._replayBuffer.filter(envelope =>
		envelope.serverSeq > lastSeenServerSeq && this._isRelevantToClient(client, envelope));
	return { client, response: { type: 'replay', actions } };
}
326
327
/**
 * Reacts to a client going away. For every known session:
 * - if the client was the session's active client, the active client is reset
 *   to `null`;
 * - if it was the active client or still owns an unfinished tool call, a
 *   disconnect timer is started for that client/session pair.
 */
private _handleClientDisconnected(clientId: string): void {
	for (const session of this._stateManager.getSessionUris()) {
		const state = this._stateManager.getSessionState(session);
		const ownsPendingToolCall = !!state && this._hasPendingClientToolCall(state, clientId);
		const isActiveClient = () => state?.activeClient?.clientId === clientId;

		if (isActiveClient()) {
			this._stateManager.dispatchServerAction({
				type: ActionType.SessionActiveClientChanged,
				session,
				activeClient: null,
			});
		}

		const needsDisconnectTimeout = isActiveClient() || ownsPendingToolCall;
		if (needsDisconnectTimeout) {
			this._startClientToolCallDisconnectTimeout(clientId, session);
		}
	}
}
343
344
/**
 * Tells whether the session's active turn contains a tool call that belongs
 * to `clientId` and is still streaming, running, or pending confirmation.
 * Returns `false` when there is no state or no active turn.
 */
private _hasPendingClientToolCall(state: ReturnType<AgentHostStateManager['getSessionState']>, clientId: string): boolean {
	const turn = state?.activeTurn;
	if (!turn) {
		return false;
	}
	for (const part of turn.responseParts) {
		if (part.kind !== ResponsePartKind.ToolCall) {
			continue;
		}
		if (part.toolCall.toolClientId !== clientId) {
			continue;
		}
		const status = part.toolCall.status;
		if (status === ToolCallStatus.Streaming || status === ToolCallStatus.Running || status === ToolCallStatus.PendingConfirmation) {
			return true;
		}
	}
	return false;
}
353
354
/**
 * Tells whether the session currently has an active client other than
 * `clientId` that offers a tool named `toolName`.
 *
 * The active client is treated as absent when it is `undefined` or `null`:
 * this class dispatches `activeClient: null` when the active client
 * disconnects, so a strict `!== undefined` check could let a `null` value
 * through to a property access.
 *
 * @param state Session state to inspect.
 * @param clientId Client that must NOT be the provider of the tool.
 * @param toolName Name of the tool to look for.
 */
private _hasReplacementActiveClientTool(state: SessionState, clientId: string, toolName: string): boolean {
	const activeClient = state.activeClient;
	if (!activeClient || activeClient.clientId === clientId) {
		return false;
	}
	return activeClient.tools.some(tool => tool.name === toolName);
}
360
361
/**
 * (Re)starts the disconnect timer for a client/session pair. Any timer already
 * running for the pair is cancelled first. When the timer fires it removes
 * itself from the map and completes the client's unfinished tool calls.
 */
private _startClientToolCallDisconnectTimeout(clientId: string, session: string): void {
	this._clearClientToolCallDisconnectTimeout(clientId, session);

	const key = this._clientToolCallDisconnectTimeoutKey(clientId, session);
	const onElapsed = () => {
		this._clientToolCallDisconnectTimeouts.delete(key);
		this._completeDisconnectedClientToolCalls(clientId, session);
	};
	const handle = setTimeout(onElapsed, CLIENT_TOOL_CALL_DISCONNECT_TIMEOUT);
	this._clientToolCallDisconnectTimeouts.set(key, handle);
}
369
370
/**
 * Cancels and forgets the disconnect timer for a client/session pair, if one
 * is running. Does nothing otherwise.
 */
private _clearClientToolCallDisconnectTimeout(clientId: string, session: string): void {
	const key = this._clientToolCallDisconnectTimeoutKey(clientId, session);
	const handle = this._clientToolCallDisconnectTimeouts.get(key);
	if (!handle) {
		return;
	}
	clearTimeout(handle);
	this._clientToolCallDisconnectTimeouts.delete(key);
}
378
379
/** Builds the map key for a client/session pair: both values joined by a newline. */
private _clientToolCallDisconnectTimeoutKey(clientId: string, session: string): string {
	return [clientId, session].join('\n');
}
382
383
/**
 * Fails every tool call in the session's active turn that belongs to
 * `clientId` and is still streaming, running, or pending confirmation.
 *
 * A tool call that is still streaming is first marked ready (confirmation not
 * needed) and then completed. The failure result carries an extra text hint
 * when another active client now offers a tool with the same name.
 */
private _completeDisconnectedClientToolCalls(clientId: string, session: string): void {
	const state = this._stateManager.getSessionState(session);
	const activeTurn = state?.activeTurn;
	if (!activeTurn) {
		return;
	}

	for (const part of activeTurn.responseParts) {
		if (part.kind !== ResponsePartKind.ToolCall) {
			continue;
		}
		const toolCall = part.toolCall;
		if (toolCall.toolClientId !== clientId) {
			continue;
		}
		const isUnfinished = toolCall.status === ToolCallStatus.Streaming
			|| toolCall.status === ToolCallStatus.Running
			|| toolCall.status === ToolCallStatus.PendingConfirmation;
		if (!isUnfinished) {
			continue;
		}

		const mayRetryWithReplacementClient = this._hasReplacementActiveClientTool(state, clientId, toolCall.toolName);

		if (toolCall.status === ToolCallStatus.Streaming) {
			// Move the call out of the streaming state before completing it.
			this._stateManager.dispatchServerAction({
				type: ActionType.SessionToolCallReady,
				session,
				turnId: activeTurn.id,
				toolCallId: toolCall.toolCallId,
				invocationMessage: toolCall.invocationMessage ?? toolCall.displayName,
				confirmed: ToolCallConfirmationReason.NotNeeded,
			});
		}

		this._stateManager.dispatchServerAction({
			type: ActionType.SessionToolCallComplete,
			session,
			turnId: activeTurn.id,
			toolCallId: toolCall.toolCallId,
			result: {
				success: false,
				pastTenseMessage: `${toolCall.displayName} failed`,
				...(mayRetryWithReplacementClient ? { content: [{ type: ToolResultContentType.Text, text: `The client that was running ${toolCall.displayName} disconnected, but another active client now provides ${toolCall.displayName}. You may try calling the tool again.` }] } : {}),
				error: { message: `Client ${clientId} disconnected before completing ${toolCall.displayName}` },
			},
		});
	}
}
421
422
// ---- Requests (expect a response) ---------------------------------------
423
424
/**
 * Typed dispatch table for post-handshake requests (excludes
 * initialize/reconnect, which are handled during the handshake phase).
 *
 * Each handler translates wire-level params (URIs as strings) into the
 * arguments expected by the agent service or state manager and returns the
 * command's result. Errors thrown here are turned into JSON-RPC error
 * responses by `_handleRequest`.
 */
private readonly _requestHandlers: RequestHandlerMap = {
	/**
	 * Subscribes the client to a resource and returns its snapshot. Any failure
	 * that is not already a ProtocolError is reported as "resource not found".
	 */
	subscribe: async (client, params) => {
		try {
			const snapshot = await this._agentService.subscribe(URI.parse(params.resource));
			client.subscriptions.add(params.resource);
			this._clearClientToolCallDisconnectTimeout(client.clientId, params.resource);
			return { snapshot };
		} catch (err) {
			if (err instanceof ProtocolError) {
				throw err;
			}
			throw new ProtocolError(AHP_SESSION_NOT_FOUND, `Resource not found: ${params.resource}`);
		}
	},
	/** Creates a session, optionally forked from a turn of an existing session. */
	createSession: async (_client, params) => {
		// Resolve fork turnId to a 0-based index using the source session's
		// turn list in the state manager.
		let fork: { session: URI; turnIndex: number; turnId: string } | undefined;
		const forkParams = params.fork;
		if (forkParams) {
			const sourceState = this._stateManager.getSessionState(forkParams.session);
			if (!sourceState) {
				throw new ProtocolError(AHP_SESSION_NOT_FOUND, `Fork source session not found: ${forkParams.session}`);
			}
			const turnIndex = sourceState.turns.findIndex(t => t.id === forkParams.turnId);
			if (turnIndex < 0) {
				throw new ProtocolError(AHP_SESSION_NOT_FOUND, `Fork turn ID ${forkParams.turnId} not found in session ${forkParams.session}`);
			}
			fork = { session: URI.parse(forkParams.session), turnIndex, turnId: forkParams.turnId };
		}
		// If the client eagerly claimed the active client role, validate
		// the clientId matches the connection before forwarding.
		if (params.activeClient && params.activeClient.clientId !== _client.clientId) {
			throw new ProtocolError(JsonRpcErrorCodes.InvalidParams, `createSession.activeClient.clientId must match the connection's clientId`);
		}
		let createdSession: URI;
		try {
			createdSession = await this._agentService.createSession({
				provider: params.provider,
				model: params.model,
				workingDirectory: params.workingDirectory ? URI.parse(params.workingDirectory) : undefined,
				session: URI.parse(params.session),
				fork,
				config: params.config,
				activeClient: params.activeClient,
			});
		} catch (err) {
			if (err instanceof ProtocolError) {
				throw err;
			}
			// Any other failure is reported to the client as "provider not found".
			throw new ProtocolError(AHP_PROVIDER_NOT_FOUND, err instanceof Error ? err.message : String(err));
		}
		// Verify the provider honored the client-chosen session URI per the protocol contract
		if (createdSession.toString() !== URI.parse(params.session).toString()) {
			this._logService.warn(`[ProtocolServer] createSession: provider returned URI ${createdSession.toString()} but client requested ${params.session}`);
		}
		return null;
	},
	/** Disposes the given session. */
	disposeSession: async (_client, params) => {
		await this._agentService.disposeSession(URI.parse(params.session));
		return null;
	},
	/** Forwards a resource write to the agent service. */
	resourceWrite: async (_client, params) => {
		return this._agentService.resourceWrite(params);
	},
	/** Lists sessions, converting each entry to its wire representation. */
	listSessions: async () => {
		const sessions = await this._agentService.listSessions();
		const items = sessions.map(s => {
			const provider = AgentSession.provider(s.session);
			if (!provider) {
				throw new Error(`Agent session URI has no provider scheme: ${s.session.toString()}`);
			}
			// Encode isRead/isArchived as status bitmask flags
			let status = s.status ?? SessionStatus.Idle;
			if (s.isRead) {
				status |= SessionStatus.IsRead;
			}
			if (s.isArchived) {
				status |= SessionStatus.IsArchived;
			}
			return {
				resource: s.session.toString(),
				provider,
				title: s.summary ?? 'Session',
				status,
				activity: s.activity,
				createdAt: s.startTime,
				modifiedAt: s.modifiedTime,
				...(s.project ? { project: { uri: s.project.uri.toString(), displayName: s.project.displayName } } : {}),
				model: s.model,
				workingDirectory: s.workingDirectory?.toString(),
				diffs: s.diffs ? [...s.diffs] : undefined,
			};
		});
		return { items };
	},
	/** Resolves the session configuration for a provider / working directory. */
	resolveSessionConfig: async (_client, params) => {
		return this._agentService.resolveSessionConfig({
			provider: params.provider,
			workingDirectory: params.workingDirectory ? URI.parse(params.workingDirectory) : undefined,
			config: params.config,
		});
	},
	/** Returns completions for a session configuration property. */
	sessionConfigCompletions: async (_client, params) => {
		return this._agentService.sessionConfigCompletions({
			provider: params.provider,
			workingDirectory: params.workingDirectory ? URI.parse(params.workingDirectory) : undefined,
			config: params.config,
			property: params.property,
			query: params.query,
		});
	},
	/**
	 * Returns a page of turns that ends just before `params.before` (or at the
	 * end of the turn list when `before` is absent or unknown). The page size
	 * defaults to 50 and is limited to the range 0..100.
	 */
	fetchTurns: async (_client, params) => {
		const state = this._stateManager.getSessionState(params.session);
		if (!state) {
			throw new ProtocolError(AHP_SESSION_NOT_FOUND, `Session not found: ${params.session}`);
		}
		const turns = state.turns;
		// Clamp the lower bound too: a negative limit would push startIndex past
		// endIndex and yield an empty page that still reports `hasMore: true`.
		const limit = Math.max(0, Math.min(params.limit ?? 50, 100));

		let endIndex = turns.length;
		if (params.before) {
			const idx = turns.findIndex(t => t.id === params.before);
			if (idx !== -1) {
				endIndex = idx;
			}
		}

		const startIndex = Math.max(0, endIndex - limit);
		return {
			turns: turns.slice(startIndex, endIndex),
			hasMore: startIndex > 0,
		};
	},
	/** Forwards a resource listing to the agent service. */
	resourceList: async (_client, params) => {
		return this._agentService.resourceList(URI.parse(params.uri));
	},
	/** Forwards a resource read to the agent service. */
	resourceRead: async (_client, params) => {
		return this._agentService.resourceRead(URI.parse(params.uri));
	},
	/** Forwards a resource copy to the agent service. */
	resourceCopy: async (_client, params) => {
		return this._agentService.resourceCopy(params);
	},
	/** Forwards a resource delete to the agent service. */
	resourceDelete: async (_client, params) => {
		return this._agentService.resourceDelete(params);
	},
	/** Forwards a resource move to the agent service. */
	resourceMove: async (_client, params) => {
		return this._agentService.resourceMove(params);
	},
	/** Authenticates for a resource; fails with AHP_AUTH_REQUIRED when not authenticated. */
	authenticate: async (_client, params) => {
		const result = await this._agentService.authenticate(params);
		if (!result.authenticated) {
			throw new ProtocolError(AHP_AUTH_REQUIRED, 'Authentication failed for resource: ' + params.resource);
		}
		return {};
	},
	/** Creates a terminal. */
	createTerminal: async (_client, params) => {
		await this._agentService.createTerminal(params);
		return null;
	},
	/** Disposes the given terminal. */
	disposeTerminal: async (_client, params) => {
		await this._agentService.disposeTerminal(URI.parse(params.terminal));
		return null;
	},
};
592
593
594
// ---- Reverse RPC (server → client requests) ----------------------------
595
596
/** Last id handed out for a server → client request; incremented before each use. */
private _reverseRequestId = 0;
/** Reverse requests awaiting a response, keyed by request id. */
private readonly _pendingReverseRequests = new Map<number, { clientId: string; resolve: (value: unknown) => void; reject: (reason: unknown) => void }>();

/**
 * Sends a JSON-RPC request to a connected client and waits for the response.
 * Used for reverse-RPC operations like reading client-side files.
 *
 * @param clientId Client to send the request to.
 * @param method JSON-RPC method name.
 * @param params JSON-RPC params payload.
 * @returns A promise that resolves with the client's `result`. It rejects if
 * the client is not connected, sending fails, the client answers with an
 * error, the client disconnects, or the server is disposed.
 */
private _sendReverseRequest<T>(clientId: string, method: string, params: unknown): Promise<T> {
	const client = this._clients.get(clientId);
	if (!client) {
		return Promise.reject(new Error(`Client ${clientId} is not connected`));
	}
	const id = ++this._reverseRequestId;
	return new Promise<T>((resolve, reject) => {
		this._pendingReverseRequests.set(id, { clientId, resolve: resolve as (value: unknown) => void, reject });
		const request: JsonRpcRequest = { jsonrpc: '2.0', id, method, params };
		try {
			client.transport.send(request);
		} catch (err) {
			// The request never left, so no response will ever arrive: drop the
			// bookkeeping entry instead of leaving it behind until disconnect.
			this._pendingReverseRequests.delete(id);
			reject(err);
		}
	});
}
616
617
/**
 * Fails every outstanding reverse-RPC request that was sent to `clientId`
 * and removes it from the pending map. Requests for other clients are left
 * untouched.
 */
private _rejectPendingReverseRequests(clientId: string): void {
	this._pendingReverseRequests.forEach((pending, id) => {
		if (pending.clientId !== clientId) {
			return;
		}
		this._pendingReverseRequests.delete(id);
		pending.reject(new Error(`Client ${clientId} disconnected`));
	});
}
628
629
/**
 * Routes a post-handshake request and sends exactly one response for it.
 *
 * Lookup order: the typed handler table first, then the VS Code extension
 * methods. A resolved handler produces a success response (with `null` when
 * the result is nullish); a rejected one is logged and produces an error
 * response. An unrecognized method is answered with an internal-error
 * response.
 */
private _handleRequest(client: IConnectedClient, method: string, params: unknown, id: number): void {
	const sendResult = (result: unknown): void => {
		client.transport.send(jsonRpcSuccess(id, result ?? null));
	};
	const sendFailure = (err: unknown): void => {
		client.transport.send(jsonRpcErrorFrom(id, err));
	};

	// Only own properties count, so names inherited from Object.prototype are not treated as handlers.
	const isTypedMethod = Object.prototype.hasOwnProperty.call(this._requestHandlers, method);
	const typedHandler = isTypedMethod ? this._requestHandlers[method as RequestMethod] : undefined;
	if (typedHandler) {
		const invoke = typedHandler as (client: IConnectedClient, params: unknown) => Promise<unknown>;
		invoke(client, params).then(result => {
			this._logService.trace(`[ProtocolServer] Request '${method}' id=${id} succeeded`);
			sendResult(result);
		}).catch(err => {
			this._logService.error(`[ProtocolServer] Request '${method}' failed`, err);
			sendFailure(err);
		});
		return;
	}

	// VS Code extension methods (not in the typed protocol maps yet)
	const extensionCall = this._handleExtensionRequest(method, params);
	if (extensionCall) {
		extensionCall.then(result => {
			sendResult(result);
		}).catch(err => {
			this._logService.error(`[ProtocolServer] Extension request '${method}' failed`, err);
			sendFailure(err);
		});
		return;
	}

	client.transport.send(jsonRpcError(id, JSON_RPC_INTERNAL_ERROR, `Unknown method: ${method}`));
}
656
657
/**
 * Serves VS Code specific methods that are not part of the typed protocol
 * yet. Currently only `shutdown` is recognized, which is forwarded to the
 * agent service.
 *
 * @returns The pending operation when the method is recognized, otherwise
 * `undefined`.
 */
private _handleExtensionRequest(method: string, _params: unknown): Promise<unknown> | undefined {
	if (method === 'shutdown') {
		return this._agentService.shutdown();
	}
	return undefined;
}
670
671
// ---- Broadcasting -------------------------------------------------------
672
673
/**
 * Sends an action envelope, as an `action` notification, to every connected
 * client for which the envelope is relevant.
 */
private _broadcastAction(envelope: ActionEnvelope): void {
	this._logService.trace(`[ProtocolServer] Broadcasting action: ${envelope.action.type}`);
	const msg: AhpServerNotification<'action'> = { jsonrpc: '2.0', method: 'action', params: envelope };
	this._clients.forEach(client => {
		if (!this._isRelevantToClient(client, envelope)) {
			return;
		}
		client.transport.send(msg);
	});
}
682
683
/**
 * Sends a notification, as a `notification` message, to every connected
 * client regardless of its subscriptions.
 */
private _broadcastNotification(notification: INotification): void {
	const msg: AhpServerNotification<'notification'> = { jsonrpc: '2.0', method: 'notification', params: { notification } };
	this._clients.forEach(client => {
		client.transport.send(msg);
	});
}
689
690
/**
 * Decides whether a client should receive an action envelope, based on its
 * subscriptions:
 * - actions whose type starts with `root/` require a subscription to the root state;
 * - session actions require a subscription to the action's session;
 * - terminal actions require a subscription to the action's terminal;
 * - anything else is never delivered.
 */
private _isRelevantToClient(client: IConnectedClient, envelope: ActionEnvelope): boolean {
	const { action } = envelope;
	const subscriptions = client.subscriptions;

	const isRootAction = action.type.startsWith('root/');
	if (isRootAction) {
		return subscriptions.has(ROOT_STATE_URI);
	}
	if (isSessionAction(action)) {
		return subscriptions.has(action.session);
	}
	return isTerminalAction(action) ? subscriptions.has(action.terminal) : false;
}
703
704
/**
 * Tears the handler down: disposes every client's per-connection disposables,
 * rejects all outstanding reverse-RPC requests, cancels all disconnect
 * timers, empties the replay buffer, and finally disposes the base class.
 */
override dispose(): void {
	this._clients.forEach(client => client.disposables.dispose());
	this._clients.clear();

	this._pendingReverseRequests.forEach(pending => {
		pending.reject(new Error('ProtocolServerHandler disposed'));
	});
	this._pendingReverseRequests.clear();

	this._clientToolCallDisconnectTimeouts.forEach(handle => clearTimeout(handle));
	this._clientToolCallDisconnectTimeouts.clear();

	this._replayBuffer.splice(0);
	super.dispose();
}
720
}
721
722