Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/platform/agentHost/browser/remoteAgentHostProtocolClient.ts
13394 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
// Protocol client for communicating with a remote agent host process.
7
// Wraps WebSocketClientTransport and SessionClientState to provide a
8
// higher-level API matching IAgentService.
9
10
import { DeferredPromise } from '../../../base/common/async.js';
11
import { Emitter } from '../../../base/common/event.js';
12
import { Disposable, IReference } from '../../../base/common/lifecycle.js';
13
import { Schemas } from '../../../base/common/network.js';
14
import { hasKey } from '../../../base/common/types.js';
15
import { URI } from '../../../base/common/uri.js';
16
import { generateUuid } from '../../../base/common/uuid.js';
17
import { ILogService } from '../../log/common/log.js';
18
import { FileSystemProviderErrorCode, IFileService, toFileSystemProviderErrorCode } from '../../files/common/files.js';
19
import { AgentSession, IAgentConnection, IAgentCreateSessionConfig, IAgentResolveSessionConfigParams, IAgentSessionConfigCompletionsParams, IAgentSessionMetadata, AuthenticateParams, AuthenticateResult } from '../common/agentService.js';
20
import { AgentSubscriptionManager, type IAgentSubscription } from '../common/state/agentSubscription.js';
21
import { agentHostAuthority, fromAgentHostUri, toAgentHostUri } from '../common/agentHostUri.js';
22
import type { ClientNotificationMap, CommandMap, JsonRpcErrorResponse, JsonRpcRequest } from '../common/state/protocol/messages.js';
23
import type { ActionEnvelope, INotification, IRootConfigChangedAction, SessionAction, TerminalAction } from '../common/state/sessionActions.js';
24
import { SessionSummary, SessionStatus, ROOT_STATE_URI, StateComponents, type RootState } from '../common/state/sessionState.js';
25
import { PROTOCOL_VERSION } from '../common/state/sessionCapabilities.js';
26
import { isJsonRpcNotification, isJsonRpcRequest, isJsonRpcResponse, type ProtocolMessage, type IStateSnapshot } from '../common/state/sessionProtocol.js';
27
import { isClientTransport, type IProtocolTransport } from '../common/state/sessionTransport.js';
28
import { AhpErrorCodes } from '../common/state/protocol/errors.js';
29
import { ContentEncoding, type CreateTerminalParams, type ResolveSessionConfigResult, type SessionConfigCompletionsResult } from '../common/state/protocol/commands.js';
30
import { decodeBase64, encodeBase64, VSBuffer } from '../../../base/common/buffer.js';
31
32
// Error code attached to errors synthesized locally by this client (not
// received from the server) when the connection is closed or the client is
// disposed; see RemoteAgentHostProtocolError.connectionClosed / .disposed.
// NOTE(review): -32000 appears to sit in the JSON-RPC implementation-defined
// server-error range — confirm against the protocol's error code table.
const AHP_CLIENT_CONNECTION_CLOSED = -32000;
33
34
/**
 * Error type used for every failure surfaced by the remote agent host
 * protocol client. It mirrors the shape of a JSON-RPC error object so
 * callers can branch on {@link code} and inspect {@link data}.
 */
export class RemoteAgentHostProtocolError extends Error {

	/** Numeric error code copied from the JSON-RPC error object. */
	readonly code: number;

	/** Optional payload copied from the JSON-RPC error object, if any. */
	readonly data: unknown | undefined;

	/**
	 * @param error The JSON-RPC error object (`code`, `message`, optional `data`).
	 */
	constructor(error: JsonRpcErrorResponse['error']) {
		super(error.message);
		this.code = error.code;
		this.data = error.data;
	}

	/**
	 * Builds the error reported when the underlying transport closes.
	 *
	 * @param address Address of the connection, included in the message.
	 */
	static connectionClosed(address: string): RemoteAgentHostProtocolError {
		return new RemoteAgentHostProtocolError({ code: AHP_CLIENT_CONNECTION_CLOSED, message: `Connection closed: ${address}` });
	}

	/**
	 * Builds the error reported when the client itself is disposed.
	 *
	 * @param address Address of the connection, included in the message.
	 */
	static disposed(address: string): RemoteAgentHostProtocolError {
		return new RemoteAgentHostProtocolError({ code: AHP_CLIENT_CONNECTION_CLOSED, message: `Connection disposed: ${address}` });
	}
}
53
54
/**
 * Request methods understood by the remote agent host that this client sends
 * through `_sendExtensionRequest` rather than through the typed `CommandMap`.
 * Each entry maps a method name to its parameter and result types.
 */
interface IRemoteAgentHostExtensionCommandMap {
	/** Takes no parameters and yields no result. */
	'shutdown': { params: undefined; result: void };
}
57
58
/**
 * A protocol-level client for a single remote agent host connection.
 * Manages the transport, handshake, subscriptions, action dispatch,
 * and command/response correlation.
 *
 * Implements {@link IAgentConnection} so consumers can program against
 * a single interface regardless of whether the agent host is local or remote.
 */
export class RemoteAgentHostProtocolClient extends Disposable implements IAgentConnection {

	declare readonly _serviceBrand: undefined;

	/** Identifier generated once per client instance and sent in `initialize`. */
	private readonly _clientId = generateUuid();
	/** Address this client was created for; used in close/dispose error messages. */
	private readonly _address: string;
	private readonly _transport: IProtocolTransport;
	/** Authority derived from the address; used when mapping remote URIs to agent host URIs. */
	private readonly _connectionAuthority: string;
	/** Highest server sequence number observed so far. */
	private _serverSeq = 0;
	/** Next sequence number handed out by {@link nextClientSeq}. */
	private _nextClientSeq = 1;
	/** Path of the default directory reported by the server during `initialize`, if any. */
	private _defaultDirectory: string | undefined;
	private readonly _subscriptionManager: AgentSubscriptionManager;

	private readonly _onDidAction = this._register(new Emitter<ActionEnvelope>());
	readonly onDidAction = this._onDidAction.event;

	private readonly _onDidNotification = this._register(new Emitter<INotification>());
	readonly onDidNotification = this._onDidNotification.event;

	private readonly _onDidClose = this._register(new Emitter<void>());
	readonly onDidClose = this._onDidClose.event;

	/** Pending JSON-RPC requests keyed by request id. */
	private readonly _pendingRequests = new Map<number, DeferredPromise<unknown>>();
	private _nextRequestId = 1;
	/** Set once the connection has been closed or the client disposed. */
	private _isClosed = false;
	/** The error every request is rejected with after close. */
	private _closeError: RemoteAgentHostProtocolError | undefined;

	get clientId(): string {
		return this._clientId;
	}

	get address(): string {
		return this._address;
	}

	get defaultDirectory(): string | undefined {
		return this._defaultDirectory;
	}

	/**
	 * @param address Address of the remote agent host.
	 * @param transport Transport used to exchange protocol messages. The client
	 * takes ownership and disposes it together with itself.
	 */
	constructor(
		address: string,
		transport: IProtocolTransport,
		@ILogService private readonly _logService: ILogService,
		@IFileService private readonly _fileService: IFileService,
	) {
		super();
		this._address = address;
		this._connectionAuthority = agentHostAuthority(address);
		this._transport = transport;
		this._register(this._transport);
		this._register(this._transport.onMessage(msg => this._handleMessage(msg)));
		this._register(this._transport.onClose(() => this._handleClose(RemoteAgentHostProtocolError.connectionClosed(this._address))));

		this._subscriptionManager = this._register(new AgentSubscriptionManager(
			this._clientId,
			() => this.nextClientSeq(),
			msg => this._logService.warn(`[RemoteAgentHostProtocolClient] ${msg}`),
			resource => this.subscribe(resource),
			resource => this.unsubscribe(resource),
		));

		// Forward action envelopes from the transport to the subscription manager
		this._register(this.onDidAction(envelope => {
			this._subscriptionManager.receiveEnvelope(envelope);
		}));
	}

	/**
	 * Marks the connection as closed (rejecting all pending requests and firing
	 * {@link onDidClose}) before releasing registered disposables.
	 */
	override dispose(): void {
		this._handleClose(RemoteAgentHostProtocolError.disposed(this._address));
		super.dispose();
	}

	/**
	 * Connect to the remote agent host and perform the protocol handshake.
	 *
	 * Rejects with the close error if the connection closes (or the client is
	 * disposed) before the handshake completes.
	 */
	async connect(): Promise<void> {
		if (isClientTransport(this._transport)) {
			await this._raceClose(this._transport.connect());
		}

		const result = await this._sendRequest('initialize', {
			protocolVersion: PROTOCOL_VERSION,
			clientId: this._clientId,
			initialSubscriptions: [ROOT_STATE_URI],
		});
		this._serverSeq = result.serverSeq;

		// Hydrate root state from the initial snapshot
		for (const snapshot of result.snapshots ?? []) {
			if (snapshot.resource === ROOT_STATE_URI) {
				this._subscriptionManager.handleRootSnapshot(snapshot.state as RootState, snapshot.fromSeq);
			}
		}

		if (result.defaultDirectory) {
			// The server may report the directory either as a string or as URI components.
			const dir = result.defaultDirectory;
			if (typeof dir === 'string') {
				this._defaultDirectory = URI.parse(dir).path;
			} else {
				this._defaultDirectory = URI.revive(dir).path;
			}
		}
	}

	// ---- IAgentConnection subscription API ----------------------------------

	get rootState(): IAgentSubscription<RootState> {
		return this._subscriptionManager.rootState;
	}

	getSubscription<T>(kind: StateComponents, resource: URI): IReference<IAgentSubscription<T>> {
		return this._subscriptionManager.getSubscription<T>(kind, resource);
	}

	getSubscriptionUnmanaged<T>(_kind: StateComponents, resource: URI): IAgentSubscription<T> | undefined {
		return this._subscriptionManager.getSubscriptionUnmanaged<T>(resource);
	}

	/**
	 * Applies an action optimistically through the subscription manager and
	 * then sends it to the server with the sequence number that was assigned.
	 */
	dispatch(action: SessionAction | TerminalAction | IRootConfigChangedAction): void {
		const seq = this._subscriptionManager.dispatchOptimistic(action);
		this.dispatchAction(action, this._clientId, seq);
	}

	/**
	 * Subscribe to state at a URI. Returns the current state snapshot.
	 */
	async subscribe(resource: URI): Promise<IStateSnapshot> {
		const result = await this._sendRequest('subscribe', { resource: resource.toString() });
		return result.snapshot;
	}

	/**
	 * Unsubscribe from state at a URI. Sent as a notification, so no response
	 * is awaited.
	 */
	unsubscribe(resource: URI): void {
		this._sendNotification('unsubscribe', { resource: resource.toString() });
	}

	/**
	 * Dispatch a client action to the server as a notification.
	 *
	 * @param action The action to send.
	 * @param _clientId Unused here; only `clientSeq` and `action` are put on the wire.
	 * @param clientSeq The client sequence number to tag the action with.
	 */
	dispatchAction(action: SessionAction | TerminalAction | IRootConfigChangedAction, _clientId: string, clientSeq: number): void {
		this._sendNotification('dispatchAction', { clientSeq, action });
	}

	/**
	 * Create a new session on the remote agent host.
	 *
	 * The session URI is generated on the client and returned once the server
	 * has acknowledged the request.
	 *
	 * @throws Error if `config.provider` is missing.
	 */
	async createSession(config?: IAgentCreateSessionConfig): Promise<URI> {
		const provider = config?.provider;
		if (!provider) {
			throw new Error('Cannot create remote agent host session without a provider.');
		}
		const session = AgentSession.uri(provider, generateUuid());
		await this._sendRequest('createSession', {
			session: session.toString(),
			provider,
			model: config?.model,
			workingDirectory: config?.workingDirectory ? fromAgentHostUri(config.workingDirectory).toString() : undefined,
			config: config?.config,
			activeClient: config?.activeClient,
		});
		return session;
	}

	/**
	 * Sends `resolveSessionConfig`, converting the working directory (when
	 * given) with `fromAgentHostUri` before it goes on the wire.
	 */
	async resolveSessionConfig(params: IAgentResolveSessionConfigParams): Promise<ResolveSessionConfigResult> {
		return this._sendRequest('resolveSessionConfig', {
			provider: params.provider,
			workingDirectory: params.workingDirectory ? fromAgentHostUri(params.workingDirectory).toString() : undefined,
			config: params.config,
		});
	}

	/**
	 * Sends `sessionConfigCompletions`, converting the working directory (when
	 * given) with `fromAgentHostUri` before it goes on the wire.
	 */
	async sessionConfigCompletions(params: IAgentSessionConfigCompletionsParams): Promise<SessionConfigCompletionsResult> {
		return this._sendRequest('sessionConfigCompletions', {
			provider: params.provider,
			workingDirectory: params.workingDirectory ? fromAgentHostUri(params.workingDirectory).toString() : undefined,
			config: params.config,
			property: params.property,
			query: params.query,
		});
	}

	/**
	 * Authenticate with the remote agent host using a specific scheme.
	 *
	 * Resolves with `{ authenticated: true }` when the request succeeds; a
	 * failed request rejects instead.
	 */
	async authenticate(params: AuthenticateParams): Promise<AuthenticateResult> {
		await this._sendRequest('authenticate', params);
		return { authenticated: true };
	}

	/**
	 * Gracefully shut down all sessions on the remote host.
	 */
	async shutdown(): Promise<void> {
		await this._sendExtensionRequest('shutdown');
	}

	/**
	 * Dispose a session on the remote agent host.
	 */
	async disposeSession(session: URI): Promise<void> {
		await this._sendRequest('disposeSession', { session: session.toString() });
	}

	/**
	 * Create a new terminal on the remote agent host.
	 */
	async createTerminal(params: CreateTerminalParams): Promise<void> {
		await this._sendRequest('createTerminal', params);
	}

	/**
	 * Dispose a terminal on the remote agent host.
	 */
	async disposeTerminal(terminal: URI): Promise<void> {
		await this._sendRequest('disposeTerminal', { terminal: terminal.toString() });
	}

	/**
	 * List all sessions from the remote agent host.
	 *
	 * Each returned summary is converted to session metadata; working
	 * directories and `file:` project URIs are rewritten to agent host URIs for
	 * this connection.
	 */
	async listSessions(): Promise<IAgentSessionMetadata[]> {
		const result = await this._sendRequest('listSessions', {});
		return result.items.map((s: SessionSummary) => ({
			session: URI.parse(s.resource),
			startTime: s.createdAt,
			modifiedTime: s.modifiedAt,
			...(s.project ? {
				project: {
					uri: this._toLocalProjectUri(URI.parse(s.project.uri)),
					displayName: s.project.displayName,
				}
			} : {}),
			summary: s.title,
			status: s.status,
			activity: s.activity,
			workingDirectory: typeof s.workingDirectory === 'string' ? toAgentHostUri(URI.parse(s.workingDirectory), this._connectionAuthority) : undefined,
			isRead: !!(s.status & SessionStatus.IsRead),
			isArchived: !!(s.status & SessionStatus.IsArchived),
			diffs: s.diffs,
		}));
	}

	/** Rewrites `file:` URIs to agent host URIs; other schemes pass through unchanged. */
	private _toLocalProjectUri(uri: URI): URI {
		return uri.scheme === Schemas.file ? toAgentHostUri(uri, this._connectionAuthority) : uri;
	}

	/**
	 * List the contents of a directory on the remote host's filesystem.
	 */
	async resourceList(uri: URI): Promise<CommandMap['resourceList']['result']> {
		return await this._sendRequest('resourceList', { uri: uri.toString() });
	}

	/**
	 * Read the content of a resource on the remote host.
	 */
	async resourceRead(uri: URI): Promise<CommandMap['resourceRead']['result']> {
		return this._sendRequest('resourceRead', { uri: uri.toString() });
	}

	/** Sends a `resourceWrite` request with the given parameters. */
	async resourceWrite(params: CommandMap['resourceWrite']['params']): Promise<CommandMap['resourceWrite']['result']> {
		return this._sendRequest('resourceWrite', params);
	}

	/** Sends a `resourceCopy` request with the given parameters. */
	async resourceCopy(params: CommandMap['resourceCopy']['params']): Promise<CommandMap['resourceCopy']['result']> {
		return this._sendRequest('resourceCopy', params);
	}

	/** Sends a `resourceDelete` request with the given parameters. */
	async resourceDelete(params: CommandMap['resourceDelete']['params']): Promise<CommandMap['resourceDelete']['result']> {
		return this._sendRequest('resourceDelete', params);
	}

	/** Sends a `resourceMove` request with the given parameters. */
	async resourceMove(params: CommandMap['resourceMove']['params']): Promise<CommandMap['resourceMove']['result']> {
		return this._sendRequest('resourceMove', params);
	}

	/**
	 * Routes an incoming message: reverse requests are served locally,
	 * responses settle the matching pending request, and notifications are
	 * re-emitted through the corresponding event.
	 */
	private _handleMessage(msg: ProtocolMessage): void {
		if (isJsonRpcRequest(msg)) {
			this._handleReverseRequest(msg.id, msg.method, msg.params);
		} else if (isJsonRpcResponse(msg)) {
			const pending = this._pendingRequests.get(msg.id);
			if (pending) {
				this._pendingRequests.delete(msg.id);
				if (hasKey(msg, { error: true })) {
					this._logService.warn(`[RemoteAgentHostProtocol] Request ${msg.id} failed:`, msg.error);
					pending.error(this._toProtocolError(msg.error));
				} else {
					pending.complete(msg.result);
				}
			} else {
				this._logService.warn(`[RemoteAgentHostProtocol] Received response for unknown request id ${msg.id}`);
			}
		} else if (isJsonRpcNotification(msg)) {
			switch (msg.method) {
				case 'action': {
					// Protocol envelope → VS Code envelope (superset of action types)
					const envelope = msg.params;
					this._serverSeq = Math.max(this._serverSeq, envelope.serverSeq);
					this._onDidAction.fire(envelope);
					break;
				}
				case 'notification': {
					const notification = msg.params.notification;
					this._logService.trace(`[RemoteAgentHostProtocol] Notification: ${notification.type}`);
					this._onDidNotification.fire(notification);
					break;
				}
				default:
					this._logService.trace(`[RemoteAgentHostProtocol] Unhandled method: ${msg.method}`);
					break;
			}
		} else {
			this._logService.warn(`[RemoteAgentHostProtocol] Unrecognized message:`, JSON.stringify(msg));
		}
	}

	/**
	 * Transitions to the closed state exactly once: records the error, rejects
	 * every pending request with it, and fires {@link onDidClose}. Later calls
	 * are ignored.
	 */
	private _handleClose(error: RemoteAgentHostProtocolError): void {
		if (this._isClosed) {
			return;
		}

		this._isClosed = true;
		this._closeError = error;
		this._rejectPendingRequests(error);
		this._onDidClose.fire();
	}

	/**
	 * Awaits `promise`, but rejects with the close error if the connection is
	 * already closed or closes first.
	 */
	private async _raceClose<T>(promise: Promise<T>): Promise<T> {
		if (this._closeError) {
			return Promise.reject(this._closeError);
		}

		let closeListener = Disposable.None;
		const closePromise = new Promise<never>((_resolve, reject) => {
			closeListener = this.onDidClose(() => reject(this._closeError));
		});

		try {
			return await Promise.race([promise, closePromise]);
		} finally {
			// Always detach so the listener does not outlive the race.
			closeListener.dispose();
		}
	}

	/**
	 * Handles reverse RPC requests from the server (e.g. resourceList,
	 * resourceRead). Performs the operation against the local file service and
	 * sends a result or error response carrying the same request id.
	 *
	 * Malformed requests (missing or non-object `params`, missing or
	 * wrongly-typed fields) and synchronous failures are answered with an
	 * error response rather than throwing out of the message handler.
	 */
	private _handleReverseRequest(id: number, method: string, params: unknown): void {
		const sendResult = (result: unknown) => {
			this._transport.send({ jsonrpc: '2.0', id, result });
		};
		const sendError = (err: unknown) => {
			const fsCode = toFileSystemProviderErrorCode(err instanceof Error ? err : undefined);
			// Fallback code when the failure does not map to a known file-system error.
			let code = -32000;
			switch (fsCode) {
				case FileSystemProviderErrorCode.FileNotFound: code = AhpErrorCodes.NotFound; break;
				case FileSystemProviderErrorCode.NoPermissions: code = AhpErrorCodes.PermissionDenied; break;
				case FileSystemProviderErrorCode.FileExists: code = AhpErrorCodes.AlreadyExists; break;
			}
			this._transport.send({ jsonrpc: '2.0', id, error: { code, message: err instanceof Error ? err.message : String(err) } });
		};
		const handle = (fn: () => Promise<unknown>) => {
			let operation: Promise<unknown>;
			try {
				operation = fn();
			} catch (err) {
				// A synchronous throw (e.g. while parsing a URI) must still be answered.
				sendError(err);
				return;
			}
			operation.then(sendResult, sendError);
		};
		const isNonEmptyString = (value: unknown): value is string => typeof value === 'string' && value.length > 0;

		// The peer may omit `params` or send a non-object; treat that as "no fields".
		const p: Record<string, unknown> = typeof params === 'object' && params !== null ? params as Record<string, unknown> : {};
		switch (method) {
			case 'resourceList': {
				const uri = p.uri;
				if (!isNonEmptyString(uri)) { sendError(new Error('Missing uri')); return; }
				return handle(async () => {
					const stat = await this._fileService.resolve(URI.parse(uri));
					return { entries: (stat.children ?? []).map(c => ({ name: c.name, type: c.isDirectory ? 'directory' as const : 'file' as const })) };
				});
			}
			case 'resourceRead': {
				const uri = p.uri;
				if (!isNonEmptyString(uri)) { sendError(new Error('Missing uri')); return; }
				return handle(async () => {
					const content = await this._fileService.readFile(URI.parse(uri));
					return { data: encodeBase64(content.value), encoding: ContentEncoding.Base64 };
				});
			}
			case 'resourceWrite': {
				const { uri, data } = p;
				// An empty string is valid content (an empty file); only a missing or non-string `data` is rejected.
				if (!isNonEmptyString(uri) || typeof data !== 'string') { sendError(new Error('Missing uri or data')); return; }
				return handle(async () => {
					const writeUri = URI.parse(uri);
					const buf = p.encoding === ContentEncoding.Base64
						? decodeBase64(data)
						: VSBuffer.fromString(data);
					if (p.createOnly) {
						await this._fileService.createFile(writeUri, buf, { overwrite: false });
					} else {
						await this._fileService.writeFile(writeUri, buf);
					}
					return {};
				});
			}
			case 'resourceDelete': {
				const uri = p.uri;
				if (!isNonEmptyString(uri)) { sendError(new Error('Missing uri')); return; }
				return handle(() => this._fileService.del(URI.parse(uri), { recursive: !!p.recursive }).then(() => ({})));
			}
			case 'resourceMove': {
				const { source, destination } = p;
				if (!isNonEmptyString(source) || !isNonEmptyString(destination)) { sendError(new Error('Missing source or destination')); return; }
				// The third argument is "overwrite": allowed unless the caller asked to fail on an existing target.
				return handle(() => this._fileService.move(URI.parse(source), URI.parse(destination), !p.failIfExists).then(() => ({})));
			}
			default:
				this._logService.warn(`[RemoteAgentHostProtocol] Unhandled reverse request: ${method}`);
				sendError(new Error(`Unknown method: ${method}`));
		}
	}

	/** Send a typed JSON-RPC notification for a protocol-defined method. */
	private _sendNotification<M extends keyof ClientNotificationMap>(method: M, params: ClientNotificationMap[M]['params']): void {
		// Generic M can't satisfy the distributive AhpNotification union directly
		// eslint-disable-next-line local/code-no-dangerous-type-assertions
		this._transport.send({ jsonrpc: '2.0' as const, method, params } as ProtocolMessage);
	}

	/**
	 * Send a typed JSON-RPC request for a protocol-defined method.
	 *
	 * The returned promise settles when the matching response arrives, or
	 * rejects with the close error if the connection closes first.
	 */
	private _sendRequest<M extends keyof CommandMap>(method: M, params: CommandMap[M]['params']): Promise<CommandMap[M]['result']> {
		return this._issueRequest<CommandMap[M]['result']>(id => {
			// Generic M can't satisfy the distributive AhpRequest union directly
			// eslint-disable-next-line local/code-no-dangerous-type-assertions
			this._transport.send({ jsonrpc: '2.0' as const, id, method, params } as ProtocolMessage);
		});
	}

	/** Send a JSON-RPC request for a VS Code extension method (not in the protocol spec). */
	private _sendExtensionRequest<M extends keyof IRemoteAgentHostExtensionCommandMap>(method: M, params?: IRemoteAgentHostExtensionCommandMap[M]['params']): Promise<IRemoteAgentHostExtensionCommandMap[M]['result']> {
		return this._issueRequest<IRemoteAgentHostExtensionCommandMap[M]['result']>(id => {
			const request: JsonRpcRequest = { jsonrpc: '2.0', id, method, params };
			this._transport.send(request);
		});
	}

	/**
	 * Shared request bookkeeping: allocates a request id, registers a pending
	 * entry for it, and invokes `send` to put the message on the wire.
	 *
	 * If the connection is already closed the returned promise is rejected
	 * with the close error and nothing is sent. If `send` throws, the pending
	 * entry is removed again (so it is not left to be rejected unobserved on
	 * close) and the error is rethrown to the caller.
	 */
	private _issueRequest<T>(send: (id: number) => void): Promise<T> {
		if (this._closeError) {
			return Promise.reject(this._closeError);
		}

		const id = this._nextRequestId++;
		const deferred = new DeferredPromise<unknown>();
		this._pendingRequests.set(id, deferred);
		try {
			send(id);
		} catch (err) {
			this._pendingRequests.delete(id);
			throw err;
		}
		return deferred.p as Promise<T>;
	}

	/** Wraps a JSON-RPC error object in a {@link RemoteAgentHostProtocolError}. */
	private _toProtocolError(error: JsonRpcErrorResponse['error']): RemoteAgentHostProtocolError {
		return new RemoteAgentHostProtocolError(error);
	}

	/** Rejects every pending request with `error` and empties the pending map. */
	private _rejectPendingRequests(error: RemoteAgentHostProtocolError): void {
		for (const pending of this._pendingRequests.values()) {
			pending.error(error);
		}
		this._pendingRequests.clear();
	}

	/**
	 * Get the next client sequence number for optimistic dispatch.
	 */
	nextClientSeq(): number {
		return this._nextClientSeq++;
	}
}
529
530