Path: blob/main/src/vs/platform/agentHost/browser/remoteAgentHostProtocolClient.ts
13394 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

// Protocol client for communicating with a remote agent host process.
// Wraps WebSocketClientTransport and SessionClientState to provide a
// higher-level API matching IAgentService.

import { DeferredPromise } from '../../../base/common/async.js';
import { Emitter } from '../../../base/common/event.js';
import { Disposable, IReference } from '../../../base/common/lifecycle.js';
import { Schemas } from '../../../base/common/network.js';
import { hasKey } from '../../../base/common/types.js';
import { URI } from '../../../base/common/uri.js';
import { generateUuid } from '../../../base/common/uuid.js';
import { ILogService } from '../../log/common/log.js';
import { FileSystemProviderErrorCode, IFileService, toFileSystemProviderErrorCode } from '../../files/common/files.js';
import { AgentSession, IAgentConnection, IAgentCreateSessionConfig, IAgentResolveSessionConfigParams, IAgentSessionConfigCompletionsParams, IAgentSessionMetadata, AuthenticateParams, AuthenticateResult } from '../common/agentService.js';
import { AgentSubscriptionManager, type IAgentSubscription } from '../common/state/agentSubscription.js';
import { agentHostAuthority, fromAgentHostUri, toAgentHostUri } from '../common/agentHostUri.js';
import type { ClientNotificationMap, CommandMap, JsonRpcErrorResponse, JsonRpcRequest } from '../common/state/protocol/messages.js';
import type { ActionEnvelope, INotification, IRootConfigChangedAction, SessionAction, TerminalAction } from '../common/state/sessionActions.js';
import { SessionSummary, SessionStatus, ROOT_STATE_URI, StateComponents, type RootState } from '../common/state/sessionState.js';
import { PROTOCOL_VERSION } from '../common/state/sessionCapabilities.js';
import { isJsonRpcNotification, isJsonRpcRequest, isJsonRpcResponse, type ProtocolMessage, type IStateSnapshot } from '../common/state/sessionProtocol.js';
import { isClientTransport, type IProtocolTransport } from '../common/state/sessionTransport.js';
import { AhpErrorCodes } from '../common/state/protocol/errors.js';
import { ContentEncoding, type CreateTerminalParams, type ResolveSessionConfigResult, type SessionConfigCompletionsResult } from '../common/state/protocol/commands.js';
import { decodeBase64, encodeBase64, VSBuffer } from '../../../base/common/buffer.js';

/** Error code used for errors synthesized by this client when the connection closes or is disposed. */
const AHP_CLIENT_CONNECTION_CLOSED = -32000;

/**
 * Error type used to reject requests made through {@link RemoteAgentHostProtocolClient}.
 * Carries the `code` and optional `data` of the JSON-RPC error it was created from.
 */
export class RemoteAgentHostProtocolError extends Error {

	readonly code: number;
	readonly data: unknown | undefined;

	constructor(error: JsonRpcErrorResponse['error']) {
		super(error.message);
		this.code = error.code;
		this.data = error.data;
	}

	/** Creates the error used when the transport reports that the connection closed. */
	static connectionClosed(address: string): RemoteAgentHostProtocolError {
		return new RemoteAgentHostProtocolError({ code: AHP_CLIENT_CONNECTION_CLOSED, message: `Connection closed: ${address}` });
	}

	/** Creates the error used when the client itself is disposed. */
	static disposed(address: string): RemoteAgentHostProtocolError {
		return new RemoteAgentHostProtocolError({ code: AHP_CLIENT_CONNECTION_CLOSED, message: `Connection disposed: ${address}` });
	}
}

/** Request methods sent by this client that are VS Code extensions rather than part of the protocol spec. */
interface IRemoteAgentHostExtensionCommandMap {
	'shutdown': { params: undefined; result: void };
}

/**
 * A protocol-level client for a single remote agent host connection.
 * Manages the WebSocket transport, handshake, subscriptions, action dispatch,
 * and command/response correlation.
 *
 * Implements {@link IAgentConnection} so consumers can program against
 * a single interface regardless of whether the agent host is local or remote.
 */
export class RemoteAgentHostProtocolClient extends Disposable implements IAgentConnection {

	declare readonly _serviceBrand: undefined;

	private readonly _clientId = generateUuid();
	private readonly _address: string;
	private readonly _transport: IProtocolTransport;
	private readonly _connectionAuthority: string;
	/** Highest `serverSeq` seen so far (from the handshake result or from action envelopes). */
	private _serverSeq = 0;
	private _nextClientSeq = 1;
	private _defaultDirectory: string | undefined;
	private readonly _subscriptionManager: AgentSubscriptionManager;

	private readonly _onDidAction = this._register(new Emitter<ActionEnvelope>());
	readonly onDidAction = this._onDidAction.event;

	private readonly _onDidNotification = this._register(new Emitter<INotification>());
	readonly onDidNotification = this._onDidNotification.event;

	private readonly _onDidClose = this._register(new Emitter<void>());
	readonly onDidClose = this._onDidClose.event;

	/** Pending JSON-RPC requests keyed by request id. */
	private readonly _pendingRequests = new Map<number, DeferredPromise<unknown>>();
	private _nextRequestId = 1;
	private _isClosed = false;
	/** Set once the connection closed or the client was disposed; new requests are rejected with it. */
	private _closeError: RemoteAgentHostProtocolError | undefined;

	get clientId(): string {
		return this._clientId;
	}

	get address(): string {
		return this._address;
	}

	get defaultDirectory(): string | undefined {
		return this._defaultDirectory;
	}

	constructor(
		address: string,
		transport: IProtocolTransport,
		@ILogService private readonly _logService: ILogService,
		@IFileService private readonly _fileService: IFileService,
	) {
		super();
		this._address = address;
		this._connectionAuthority = agentHostAuthority(address);
		this._transport = transport;
		this._register(this._transport);
		this._register(this._transport.onMessage(msg => this._handleMessage(msg)));
		this._register(this._transport.onClose(() => this._handleClose(RemoteAgentHostProtocolError.connectionClosed(this._address))));

		this._subscriptionManager = this._register(new AgentSubscriptionManager(
			this._clientId,
			() => this.nextClientSeq(),
			msg => this._logService.warn(`[RemoteAgentHostProtocolClient] ${msg}`),
			resource => this.subscribe(resource),
			resource => this.unsubscribe(resource),
		));

		// Forward action envelopes from the transport to the subscription manager
		this._register(this.onDidAction(envelope => {
			this._subscriptionManager.receiveEnvelope(envelope);
		}));
	}

	override dispose(): void {
		// Reject pending requests and fire onDidClose while the emitters are still alive.
		this._handleClose(RemoteAgentHostProtocolError.disposed(this._address));
		super.dispose();
	}

	/**
	 * Connect to the remote agent host and perform the protocol handshake.
	 */
	async connect(): Promise<void> {
		if (isClientTransport(this._transport)) {
			await this._raceClose(this._transport.connect());
		}

		const result = await this._sendRequest('initialize', {
			protocolVersion: PROTOCOL_VERSION,
			clientId: this._clientId,
			initialSubscriptions: [ROOT_STATE_URI],
		});
		this._serverSeq = result.serverSeq;

		// Hydrate root state from the initial snapshot
		for (const snapshot of result.snapshots ?? []) {
			if (snapshot.resource === ROOT_STATE_URI) {
				this._subscriptionManager.handleRootSnapshot(snapshot.state as RootState, snapshot.fromSeq);
			}
		}

		if (result.defaultDirectory) {
			const dir = result.defaultDirectory;
			// The directory is accepted either as a URI string or as URI components.
			if (typeof dir === 'string') {
				this._defaultDirectory = URI.parse(dir).path;
			} else {
				this._defaultDirectory = URI.revive(dir).path;
			}
		}
	}

	// ---- IAgentConnection subscription API ----------------------------------

	get rootState(): IAgentSubscription<RootState> {
		return this._subscriptionManager.rootState;
	}

	getSubscription<T>(kind: StateComponents, resource: URI): IReference<IAgentSubscription<T>> {
		return this._subscriptionManager.getSubscription<T>(kind, resource);
	}

	getSubscriptionUnmanaged<T>(_kind: StateComponents, resource: URI): IAgentSubscription<T> | undefined {
		return this._subscriptionManager.getSubscriptionUnmanaged<T>(resource);
	}

	/**
	 * Apply an action optimistically through the subscription manager and
	 * send it to the server with the sequence number the manager assigned.
	 */
	dispatch(action: SessionAction | TerminalAction | IRootConfigChangedAction): void {
		const seq = this._subscriptionManager.dispatchOptimistic(action);
		this.dispatchAction(action, this._clientId, seq);
	}

	/**
	 * Subscribe to state at a URI. Returns the current state snapshot.
	 */
	async subscribe(resource: URI): Promise<IStateSnapshot> {
		const result = await this._sendRequest('subscribe', { resource: resource.toString() });
		return result.snapshot;
	}

	/**
	 * Unsubscribe from state at a URI.
	 */
	unsubscribe(resource: URI): void {
		this._sendNotification('unsubscribe', { resource: resource.toString() });
	}

	/**
	 * Dispatch a client action to the server as a notification, tagged with
	 * the caller-supplied `clientSeq`. The `_clientId` argument is not sent.
	 */
	dispatchAction(action: SessionAction | TerminalAction | IRootConfigChangedAction, _clientId: string, clientSeq: number): void {
		this._sendNotification('dispatchAction', { clientSeq, action });
	}

	/**
	 * Create a new session on the remote agent host.
	 *
	 * @returns The URI of the new session, generated locally from the provider and a fresh UUID.
	 * @throws If `config.provider` is not set.
	 */
	async createSession(config?: IAgentCreateSessionConfig): Promise<URI> {
		const provider = config?.provider;
		if (!provider) {
			throw new Error('Cannot create remote agent host session without a provider.');
		}
		const session = AgentSession.uri(provider, generateUuid());
		await this._sendRequest('createSession', {
			session: session.toString(),
			provider,
			model: config?.model,
			workingDirectory: config?.workingDirectory ? fromAgentHostUri(config.workingDirectory).toString() : undefined,
			config: config?.config,
			activeClient: config?.activeClient,
		});
		return session;
	}

	async resolveSessionConfig(params: IAgentResolveSessionConfigParams): Promise<ResolveSessionConfigResult> {
		return this._sendRequest('resolveSessionConfig', {
			provider: params.provider,
			workingDirectory: params.workingDirectory ? fromAgentHostUri(params.workingDirectory).toString() : undefined,
			config: params.config,
		});
	}

	async sessionConfigCompletions(params: IAgentSessionConfigCompletionsParams): Promise<SessionConfigCompletionsResult> {
		return this._sendRequest('sessionConfigCompletions', {
			provider: params.provider,
			workingDirectory: params.workingDirectory ? fromAgentHostUri(params.workingDirectory).toString() : undefined,
			config: params.config,
			property: params.property,
			query: params.query,
		});
	}

	/**
	 * Authenticate with the remote agent host using a specific scheme.
	 * Resolves with `{ authenticated: true }` when the request succeeds;
	 * a failed request rejects instead.
	 */
	async authenticate(params: AuthenticateParams): Promise<AuthenticateResult> {
		await this._sendRequest('authenticate', params);
		return { authenticated: true };
	}

	/**
	 * Gracefully shut down all sessions on the remote host.
	 */
	async shutdown(): Promise<void> {
		await this._sendExtensionRequest('shutdown');
	}

	/**
	 * Dispose a session on the remote agent host.
	 */
	async disposeSession(session: URI): Promise<void> {
		await this._sendRequest('disposeSession', { session: session.toString() });
	}

	/**
	 * Create a new terminal on the remote agent host.
	 */
	async createTerminal(params: CreateTerminalParams): Promise<void> {
		await this._sendRequest('createTerminal', params);
	}

	/**
	 * Dispose a terminal on the remote agent host.
	 */
	async disposeTerminal(terminal: URI): Promise<void> {
		await this._sendRequest('disposeTerminal', { terminal: terminal.toString() });
	}

	/**
	 * List all sessions from the remote agent host.
	 */
	async listSessions(): Promise<IAgentSessionMetadata[]> {
		const result = await this._sendRequest('listSessions', {});
		return result.items.map((s: SessionSummary) => ({
			session: URI.parse(s.resource),
			startTime: s.createdAt,
			modifiedTime: s.modifiedAt,
			...(s.project ? {
				project: {
					uri: this._toLocalProjectUri(URI.parse(s.project.uri)),
					displayName: s.project.displayName,
				}
			} : {}),
			summary: s.title,
			status: s.status,
			activity: s.activity,
			workingDirectory: typeof s.workingDirectory === 'string' ? toAgentHostUri(URI.parse(s.workingDirectory), this._connectionAuthority) : undefined,
			isRead: !!(s.status & SessionStatus.IsRead),
			isArchived: !!(s.status & SessionStatus.IsArchived),
			diffs: s.diffs,
		}));
	}

	/** Rewrites `file:` project URIs into agent-host URIs for this connection; other schemes pass through. */
	private _toLocalProjectUri(uri: URI): URI {
		return uri.scheme === Schemas.file ? toAgentHostUri(uri, this._connectionAuthority) : uri;
	}

	/**
	 * List the contents of a directory on the remote host's filesystem.
	 */
	async resourceList(uri: URI): Promise<CommandMap['resourceList']['result']> {
		return await this._sendRequest('resourceList', { uri: uri.toString() });
	}

	/**
	 * Read the content of a resource on the remote host.
	 */
	async resourceRead(uri: URI): Promise<CommandMap['resourceRead']['result']> {
		return this._sendRequest('resourceRead', { uri: uri.toString() });
	}

	async resourceWrite(params: CommandMap['resourceWrite']['params']): Promise<CommandMap['resourceWrite']['result']> {
		return this._sendRequest('resourceWrite', params);
	}

	async resourceCopy(params: CommandMap['resourceCopy']['params']): Promise<CommandMap['resourceCopy']['result']> {
		return this._sendRequest('resourceCopy', params);
	}

	async resourceDelete(params: CommandMap['resourceDelete']['params']): Promise<CommandMap['resourceDelete']['result']> {
		return this._sendRequest('resourceDelete', params);
	}

	async resourceMove(params: CommandMap['resourceMove']['params']): Promise<CommandMap['resourceMove']['result']> {
		return this._sendRequest('resourceMove', params);
	}

	/**
	 * Routes an incoming transport message: reverse requests are answered
	 * locally, responses settle the matching pending request, and
	 * notifications are re-fired on the corresponding emitter.
	 */
	private _handleMessage(msg: ProtocolMessage): void {
		if (isJsonRpcRequest(msg)) {
			this._handleReverseRequest(msg.id, msg.method, msg.params);
		} else if (isJsonRpcResponse(msg)) {
			const pending = this._pendingRequests.get(msg.id);
			if (pending) {
				this._pendingRequests.delete(msg.id);
				if (hasKey(msg, { error: true })) {
					this._logService.warn(`[RemoteAgentHostProtocol] Request ${msg.id} failed:`, msg.error);
					pending.error(this._toProtocolError(msg.error));
				} else {
					pending.complete(msg.result);
				}
			} else {
				this._logService.warn(`[RemoteAgentHostProtocol] Received response for unknown request id ${msg.id}`);
			}
		} else if (isJsonRpcNotification(msg)) {
			switch (msg.method) {
				case 'action': {
					// Protocol envelope → VS Code envelope (superset of action types)
					const envelope = msg.params;
					this._serverSeq = Math.max(this._serverSeq, envelope.serverSeq);
					this._onDidAction.fire(envelope);
					break;
				}
				case 'notification': {
					const notification = msg.params.notification;
					this._logService.trace(`[RemoteAgentHostProtocol] Notification: ${notification.type}`);
					this._onDidNotification.fire(notification);
					break;
				}
				default:
					this._logService.trace(`[RemoteAgentHostProtocol] Unhandled method: ${msg.method}`);
					break;
			}
		} else {
			this._logService.warn(`[RemoteAgentHostProtocol] Unrecognized message:`, JSON.stringify(msg));
		}
	}

	/**
	 * Marks the client as closed, rejects all pending requests with `error`
	 * and fires `onDidClose`. Only the first call has an effect.
	 */
	private _handleClose(error: RemoteAgentHostProtocolError): void {
		if (this._isClosed) {
			return;
		}

		this._isClosed = true;
		this._closeError = error;
		this._rejectPendingRequests(error);
		this._onDidClose.fire();
	}

	/**
	 * Awaits `promise`, but rejects with the close error if the connection
	 * closes first (or has already closed).
	 */
	private async _raceClose<T>(promise: Promise<T>): Promise<T> {
		if (this._closeError) {
			return Promise.reject(this._closeError);
		}

		let closeListener = Disposable.None;
		const closePromise = new Promise<never>((_resolve, reject) => {
			closeListener = this.onDidClose(() => reject(this._closeError));
		});

		try {
			return await Promise.race([promise, closePromise]);
		} finally {
			closeListener.dispose();
		}
	}

	/**
	 * Handles reverse RPC requests from the server (resourceList,
	 * resourceRead, resourceWrite, resourceDelete, resourceMove) by
	 * performing the operation on the local file service and sending a
	 * result or error response with the same request id.
	 */
	private _handleReverseRequest(id: number, method: string, params: unknown): void {
		const sendResult = (result: unknown) => {
			this._transport.send({ jsonrpc: '2.0', id, result });
		};
		const sendError = (err: unknown) => {
			// Map well-known file system failures onto protocol error codes;
			// everything else is reported with the generic -32000 code.
			const fsCode = toFileSystemProviderErrorCode(err instanceof Error ? err : undefined);
			let code = -32000;
			switch (fsCode) {
				case FileSystemProviderErrorCode.FileNotFound: code = AhpErrorCodes.NotFound; break;
				case FileSystemProviderErrorCode.NoPermissions: code = AhpErrorCodes.PermissionDenied; break;
				case FileSystemProviderErrorCode.FileExists: code = AhpErrorCodes.AlreadyExists; break;
			}
			this._transport.send({ jsonrpc: '2.0', id, error: { code, message: err instanceof Error ? err.message : String(err) } });
		};
		const handle = (fn: () => Promise<unknown>) => {
			// A synchronous throw while starting the operation (e.g. while
			// parsing a URI) must still produce an error response, otherwise
			// the server would wait forever for this request id.
			let operation: Promise<unknown>;
			try {
				operation = fn();
			} catch (err) {
				sendError(err);
				return;
			}
			operation.then(sendResult, sendError);
		};

		// Requests may arrive without `params`; treat that as an empty object
		// so the per-method validation below reports the missing fields.
		const p: Record<string, unknown> = typeof params === 'object' && params !== null ? params as Record<string, unknown> : {};
		switch (method) {
			case 'resourceList':
				if (!p.uri) { sendError(new Error('Missing uri')); return; }
				return handle(async () => {
					const stat = await this._fileService.resolve(URI.parse(p.uri as string));
					return { entries: (stat.children ?? []).map(c => ({ name: c.name, type: c.isDirectory ? 'directory' as const : 'file' as const })) };
				});
			case 'resourceRead':
				if (!p.uri) { sendError(new Error('Missing uri')); return; }
				return handle(async () => {
					const content = await this._fileService.readFile(URI.parse(p.uri as string));
					return { data: encodeBase64(content.value), encoding: ContentEncoding.Base64 };
				});
			case 'resourceWrite':
				// An empty string is valid content (an empty file), so check the
				// type of `data` rather than its truthiness.
				if (!p.uri || typeof p.data !== 'string') { sendError(new Error('Missing uri or data')); return; }
				return handle(async () => {
					const writeUri = URI.parse(p.uri as string);
					const buf = p.encoding === ContentEncoding.Base64
						? decodeBase64(p.data as string)
						: VSBuffer.fromString(p.data as string);
					if (p.createOnly) {
						await this._fileService.createFile(writeUri, buf, { overwrite: false });
					} else {
						await this._fileService.writeFile(writeUri, buf);
					}
					return {};
				});
			case 'resourceDelete':
				if (!p.uri) { sendError(new Error('Missing uri')); return; }
				return handle(async () => {
					await this._fileService.del(URI.parse(p.uri as string), { recursive: !!p.recursive });
					return {};
				});
			case 'resourceMove':
				if (!p.source || !p.destination) { sendError(new Error('Missing source or destination')); return; }
				return handle(async () => {
					// Third argument is the overwrite flag: overwrite unless the caller asked to fail.
					await this._fileService.move(URI.parse(p.source as string), URI.parse(p.destination as string), !p.failIfExists);
					return {};
				});
			default:
				this._logService.warn(`[RemoteAgentHostProtocol] Unhandled reverse request: ${method}`);
				sendError(new Error(`Unknown method: ${method}`));
		}
	}

	/** Send a typed JSON-RPC notification for a protocol-defined method. */
	private _sendNotification<M extends keyof ClientNotificationMap>(method: M, params: ClientNotificationMap[M]['params']): void {
		// Generic M can't satisfy the distributive AhpNotification union directly
		// eslint-disable-next-line local/code-no-dangerous-type-assertions
		this._transport.send({ jsonrpc: '2.0' as const, method, params } as ProtocolMessage);
	}

	/**
	 * Send a typed JSON-RPC request for a protocol-defined method.
	 * The returned promise settles when the matching response arrives, or
	 * rejects with the close error if the connection closes first.
	 */
	private _sendRequest<M extends keyof CommandMap>(method: M, params: CommandMap[M]['params']): Promise<CommandMap[M]['result']> {
		if (this._closeError) {
			return Promise.reject(this._closeError);
		}

		const id = this._nextRequestId++;
		const deferred = new DeferredPromise<unknown>();
		this._pendingRequests.set(id, deferred);
		try {
			// Generic M can't satisfy the distributive AhpRequest union directly
			// eslint-disable-next-line local/code-no-dangerous-type-assertions
			this._transport.send({ jsonrpc: '2.0' as const, id, method, params } as ProtocolMessage);
		} catch (err) {
			// Nothing was sent, so no response can ever arrive for this id.
			this._pendingRequests.delete(id);
			throw err;
		}
		return deferred.p as Promise<CommandMap[M]['result']>;
	}

	/** Send a JSON-RPC request for a VS Code extension method (not in the protocol spec). */
	private _sendExtensionRequest<M extends keyof IRemoteAgentHostExtensionCommandMap>(method: M, params?: IRemoteAgentHostExtensionCommandMap[M]['params']): Promise<IRemoteAgentHostExtensionCommandMap[M]['result']> {
		if (this._closeError) {
			return Promise.reject(this._closeError);
		}

		const id = this._nextRequestId++;
		const deferred = new DeferredPromise<unknown>();
		this._pendingRequests.set(id, deferred);
		const request: JsonRpcRequest = { jsonrpc: '2.0', id, method, params };
		try {
			this._transport.send(request);
		} catch (err) {
			// Nothing was sent, so no response can ever arrive for this id.
			this._pendingRequests.delete(id);
			throw err;
		}
		return deferred.p as Promise<IRemoteAgentHostExtensionCommandMap[M]['result']>;
	}

	private _toProtocolError(error: JsonRpcErrorResponse['error']): RemoteAgentHostProtocolError {
		return new RemoteAgentHostProtocolError(error);
	}

	/** Rejects every pending request with `error` and empties the pending map. */
	private _rejectPendingRequests(error: RemoteAgentHostProtocolError): void {
		for (const pending of this._pendingRequests.values()) {
			pending.error(error);
		}
		this._pendingRequests.clear();
	}

	/**
	 * Get the next client sequence number for optimistic dispatch.
	 */
	nextClientSeq(): number {
		return this._nextClientSeq++;
	}
}