Path: blob/main/src/vs/platform/agentHost/node/protocolServerHandler.ts
13394 views
/*---------------------------------------------------------------------------------------------1* Copyright (c) Microsoft Corporation. All rights reserved.2* Licensed under the MIT License. See License.txt in the project root for license information.3*--------------------------------------------------------------------------------------------*/45import { Emitter } from '../../../base/common/event.js';6import { isJsonRpcResponse } from '../../../base/common/jsonRpcProtocol.js';7import { Disposable, DisposableStore } from '../../../base/common/lifecycle.js';8import { hasKey } from '../../../base/common/types.js';9import { URI } from '../../../base/common/uri.js';10import { ILogService } from '../../log/common/log.js';11import { AHPFileSystemProvider } from '../common/agentHostFileSystemProvider.js';12import { AgentSession, type IAgentService } from '../common/agentService.js';13import type { CommandMap } from '../common/state/protocol/messages.js';14import { ActionEnvelope, ActionType, INotification, isSessionAction, isTerminalAction, type SessionAction, type TerminalAction, type IRootConfigChangedAction } from '../common/state/sessionActions.js';15import { MIN_PROTOCOL_VERSION, PROTOCOL_VERSION } from '../common/state/sessionCapabilities.js';16import {17AHP_AUTH_REQUIRED,18AHP_PROVIDER_NOT_FOUND,19AHP_SESSION_NOT_FOUND,20AHP_UNSUPPORTED_PROTOCOL_VERSION,21JsonRpcRequest,22isJsonRpcNotification,23isJsonRpcRequest,24JSON_RPC_INTERNAL_ERROR,25JsonRpcErrorCodes,26ProtocolError,27type AhpServerNotification,28type InitializeParams,29type JsonRpcResponse,30type ReconnectParams,31type IStateSnapshot,32} from '../common/state/sessionProtocol.js';33import { ResponsePartKind, ROOT_STATE_URI, SessionStatus, ToolCallConfirmationReason, ToolCallStatus, ToolResultContentType, type SessionState } from '../common/state/sessionState.js';34import type { IProtocolServer, IProtocolTransport } from '../common/state/sessionTransport.js';35import { AgentHostStateManager } from './agentHostStateManager.js';3637/** Default capacity of the server-side action replay buffer. */38const REPLAY_BUFFER_CAPACITY = 1000;3940const CLIENT_TOOL_CALL_DISCONNECT_TIMEOUT = 30_000;4142/** Build a JSON-RPC success response suitable for transport.send(). */43function jsonRpcSuccess(id: number, result: unknown): JsonRpcResponse {44return { jsonrpc: '2.0', id, result };45}4647/** Build a JSON-RPC error response suitable for transport.send(). */48function jsonRpcError(id: number, code: number, message: string, data?: unknown): JsonRpcResponse {49return { jsonrpc: '2.0', id, error: { code, message, ...(data !== undefined ? { data } : {}) } };50}5152/** Build a JSON-RPC error response from an unknown thrown value, preserving {@link ProtocolError} fields. */53function jsonRpcErrorFrom(id: number, err: unknown): JsonRpcResponse {54if (err instanceof ProtocolError) {55return jsonRpcError(id, err.code, err.message, err.data);56}57const message = err instanceof Error ? (err.stack ?? err.message) : String(err);58return jsonRpcError(id, JSON_RPC_INTERNAL_ERROR, message);59}6061/**62* Methods handled by the request dispatcher. Excludes `initialize` and63* `reconnect` which are handled during the handshake phase.64*/65type RequestMethod = Exclude<keyof CommandMap, 'initialize' | 'reconnect'>;6667/**68* Typed handler map: each key is a request method, each value is a handler69* that receives the correctly-typed params and must return the correctly-typed70* result. The compiler will error if a handler returns the wrong shape.71*/72type RequestHandlerMap = {73[M in RequestMethod]: (client: IConnectedClient, params: CommandMap[M]['params']) => Promise<CommandMap[M]['result']>;74};7576/**77* Represents a connected protocol client with its subscription state.78*/79interface IConnectedClient {80readonly clientId: string;81readonly protocolVersion: number;82readonly transport: IProtocolTransport;83readonly subscriptions: Set<string>;84readonly disposables: DisposableStore;85}8687/**88* Configuration for protocol-level concerns outside of IAgentService.89*/90export interface IProtocolServerConfig {91/** Default directory returned to clients during the initialize handshake. */92readonly defaultDirectory?: string;93}9495/**96* Server-side handler that manages protocol connections, routes JSON-RPC97* messages to the agent service, and broadcasts actions/notifications98* to subscribed clients.99*/100export class ProtocolServerHandler extends Disposable {101102private readonly _clients = new Map<string, IConnectedClient>();103private readonly _replayBuffer: ActionEnvelope[] = [];104private readonly _clientToolCallDisconnectTimeouts = new Map<string, ReturnType<typeof setTimeout>>();105106private readonly _onDidChangeConnectionCount = this._register(new Emitter<number>());107108/** Fires with the current client count whenever a client connects or disconnects. */109readonly onDidChangeConnectionCount = this._onDidChangeConnectionCount.event;110111constructor(112private readonly _agentService: IAgentService,113private readonly _stateManager: AgentHostStateManager,114private readonly _server: IProtocolServer,115private readonly _config: IProtocolServerConfig,116private readonly _clientFileSystemProvider: AHPFileSystemProvider,117@ILogService private readonly _logService: ILogService,118) {119super();120121this._register(this._server.onConnection(transport => {122this._handleNewConnection(transport);123}));124125this._register(this._stateManager.onDidEmitEnvelope(envelope => {126this._replayBuffer.push(envelope);127if (this._replayBuffer.length > REPLAY_BUFFER_CAPACITY) {128this._replayBuffer.shift();129}130this._broadcastAction(envelope);131}));132133this._register(this._stateManager.onDidEmitNotification(notification => {134this._broadcastNotification(notification);135}));136}137138// ---- Connection handling -------------------------------------------------139140private _handleNewConnection(transport: IProtocolTransport): void {141const disposables = new DisposableStore();142let client: IConnectedClient | undefined;143144disposables.add(transport.onMessage(msg => {145if (isJsonRpcRequest(msg)) {146this._logService.trace(`[ProtocolServer] request: method=${msg.method} id=${msg.id}`);147148// Handle initialize/reconnect as requests that set up the client149if (!client && msg.method === 'initialize') {150try {151const result = this._handleInitialize(msg.params, transport, disposables);152client = result.client;153transport.send(jsonRpcSuccess(msg.id, result.response));154} catch (err) {155transport.send(jsonRpcErrorFrom(msg.id, err));156}157return;158}159if (!client && msg.method === 'reconnect') {160try {161const result = this._handleReconnect(msg.params, transport, disposables);162client = result.client;163transport.send(jsonRpcSuccess(msg.id, result.response));164} catch (err) {165transport.send(jsonRpcErrorFrom(msg.id, err));166}167return;168}169170if (!client) {171return;172}173this._handleRequest(client, msg.method, msg.params, msg.id);174} else if (isJsonRpcNotification(msg)) {175this._logService.trace(`[ProtocolServer] notification: method=${msg.method}`);176// Notification — fire-and-forget177switch (msg.method) {178case 'unsubscribe':179if (client) {180client.subscriptions.delete(msg.params.resource);181}182break;183case 'dispatchAction':184if (client) {185this._logService.trace(`[ProtocolServer] dispatchAction: ${JSON.stringify(msg.params.action.type)}`);186const action = msg.params.action as SessionAction | TerminalAction | IRootConfigChangedAction;187if (isSessionAction(action) || isTerminalAction(action) || action.type === ActionType.RootConfigChanged) {188this._agentService.dispatchAction(action, client.clientId, msg.params.clientSeq);189}190}191break;192}193} else if (isJsonRpcResponse(msg)) {194const pending = this._pendingReverseRequests.get(msg.id);195if (pending) {196this._pendingReverseRequests.delete(msg.id);197if (hasKey(msg, { error: true })) {198pending.reject(new Error(msg.error?.message ?? 'Reverse RPC error'));199} else {200pending.resolve(msg.result);201}202}203}204}));205206disposables.add(transport.onClose(() => {207if (client && this._clients.get(client.clientId) === client) {208this._logService.info(`[ProtocolServer] Client disconnected: ${client.clientId}, subscriptions=${client.subscriptions.size}`);209this._clients.delete(client.clientId);210this._rejectPendingReverseRequests(client.clientId);211this._handleClientDisconnected(client.clientId);212this._onDidChangeConnectionCount.fire(this._clients.size);213}214disposables.dispose();215}));216217disposables.add(transport);218}219220// ---- Handshake handlers ----------------------------------------------------221222private _handleInitialize(223params: InitializeParams,224transport: IProtocolTransport,225disposables: DisposableStore,226): { client: IConnectedClient; response: unknown } {227this._logService.info(`[ProtocolServer] Initialize: clientId=${params.clientId}, version=${params.protocolVersion}`);228229if (params.protocolVersion < MIN_PROTOCOL_VERSION) {230throw new ProtocolError(231AHP_UNSUPPORTED_PROTOCOL_VERSION,232`Client protocol version ${params.protocolVersion} is below minimum ${MIN_PROTOCOL_VERSION}`,233);234}235236const client: IConnectedClient = {237clientId: params.clientId,238protocolVersion: params.protocolVersion,239transport,240subscriptions: new Set(),241disposables,242};243this._clients.set(params.clientId, client);244this._onDidChangeConnectionCount.fire(this._clients.size);245246disposables.add(this._clientFileSystemProvider.registerAuthority(params.clientId, {247resourceList: (uri) => this._sendReverseRequest(params.clientId, 'resourceList', { uri: uri.toString() }),248resourceRead: (uri) => this._sendReverseRequest(params.clientId, 'resourceRead', { uri: uri.toString() }),249resourceWrite: (params_) => this._sendReverseRequest(params.clientId, 'resourceWrite', params_),250resourceDelete: (params_) => this._sendReverseRequest(params.clientId, 'resourceDelete', params_),251resourceMove: (params_) => this._sendReverseRequest(params.clientId, 'resourceMove', params_),252}));253254255const snapshots: IStateSnapshot[] = [];256if (params.initialSubscriptions) {257for (const uri of params.initialSubscriptions) {258const snapshot = this._stateManager.getSnapshot(uri);259if (snapshot) {260snapshots.push(snapshot);261client.subscriptions.add(uri.toString());262this._clearClientToolCallDisconnectTimeout(params.clientId, uri.toString());263}264}265}266267return {268client,269response: {270protocolVersion: PROTOCOL_VERSION,271serverSeq: this._stateManager.serverSeq,272snapshots,273defaultDirectory: this._config.defaultDirectory,274},275};276}277278private _handleReconnect(279params: ReconnectParams,280transport: IProtocolTransport,281disposables: DisposableStore,282): { client: IConnectedClient; response: unknown } {283this._logService.info(`[ProtocolServer] Reconnect: clientId=${params.clientId}, lastSeenSeq=${params.lastSeenServerSeq}`);284285const client: IConnectedClient = {286clientId: params.clientId,287protocolVersion: PROTOCOL_VERSION,288transport,289subscriptions: new Set(),290disposables,291};292this._clients.set(params.clientId, client);293this._onDidChangeConnectionCount.fire(this._clients.size);294295const oldestBuffered = this._replayBuffer.length > 0 ? this._replayBuffer[0].serverSeq : this._stateManager.serverSeq;296const canReplay = params.lastSeenServerSeq >= oldestBuffered;297298if (canReplay) {299const actions: ActionEnvelope[] = [];300for (const sub of params.subscriptions) {301client.subscriptions.add(sub.toString());302this._clearClientToolCallDisconnectTimeout(params.clientId, sub.toString());303}304for (const envelope of this._replayBuffer) {305if (envelope.serverSeq > params.lastSeenServerSeq) {306if (this._isRelevantToClient(client, envelope)) {307actions.push(envelope);308}309}310}311return { client, response: { type: 'replay', actions } };312} else {313const snapshots: IStateSnapshot[] = [];314for (const sub of params.subscriptions) {315const snapshot = this._stateManager.getSnapshot(sub);316if (snapshot) {317snapshots.push(snapshot);318client.subscriptions.add(sub);319this._clearClientToolCallDisconnectTimeout(params.clientId, sub);320}321}322return { client, response: { type: 'snapshot', snapshots } };323}324}325326private _handleClientDisconnected(clientId: string): void {327for (const session of this._stateManager.getSessionUris()) {328const state = this._stateManager.getSessionState(session);329const ownsPendingToolCall = state ? this._hasPendingClientToolCall(state, clientId) : false;330if (state?.activeClient?.clientId === clientId) {331this._stateManager.dispatchServerAction({332type: ActionType.SessionActiveClientChanged,333session,334activeClient: null,335});336}337if (state?.activeClient?.clientId === clientId || ownsPendingToolCall) {338this._startClientToolCallDisconnectTimeout(clientId, session);339}340}341}342343private _hasPendingClientToolCall(state: ReturnType<AgentHostStateManager['getSessionState']>, clientId: string): boolean {344const activeTurn = state?.activeTurn;345if (!activeTurn) {346return false;347}348return activeTurn.responseParts.some(part => part.kind === ResponsePartKind.ToolCall349&& part.toolCall.toolClientId === clientId350&& (part.toolCall.status === ToolCallStatus.Streaming || part.toolCall.status === ToolCallStatus.Running || part.toolCall.status === ToolCallStatus.PendingConfirmation));351}352353private _hasReplacementActiveClientTool(state: SessionState, clientId: string, toolName: string): boolean {354const activeClient = state.activeClient;355return activeClient !== undefined356&& activeClient.clientId !== clientId357&& activeClient.tools.some(tool => tool.name === toolName);358}359360private _startClientToolCallDisconnectTimeout(clientId: string, session: string): void {361this._clearClientToolCallDisconnectTimeout(clientId, session);362const key = this._clientToolCallDisconnectTimeoutKey(clientId, session);363this._clientToolCallDisconnectTimeouts.set(key, setTimeout(() => {364this._clientToolCallDisconnectTimeouts.delete(key);365this._completeDisconnectedClientToolCalls(clientId, session);366}, CLIENT_TOOL_CALL_DISCONNECT_TIMEOUT));367}368369private _clearClientToolCallDisconnectTimeout(clientId: string, session: string): void {370const key = this._clientToolCallDisconnectTimeoutKey(clientId, session);371const timeout = this._clientToolCallDisconnectTimeouts.get(key);372if (timeout) {373clearTimeout(timeout);374this._clientToolCallDisconnectTimeouts.delete(key);375}376}377378private _clientToolCallDisconnectTimeoutKey(clientId: string, session: string): string {379return `${clientId}\n${session}`;380}381382private _completeDisconnectedClientToolCalls(clientId: string, session: string): void {383const state = this._stateManager.getSessionState(session);384const activeTurn = state?.activeTurn;385if (!activeTurn) {386return;387}388for (const part of activeTurn.responseParts) {389if (part.kind !== ResponsePartKind.ToolCall) {390continue;391}392const toolCall = part.toolCall;393if (toolCall.toolClientId === clientId && (toolCall.status === ToolCallStatus.Streaming || toolCall.status === ToolCallStatus.Running || toolCall.status === ToolCallStatus.PendingConfirmation)) {394const mayRetryWithReplacementClient = this._hasReplacementActiveClientTool(state, clientId, toolCall.toolName);395if (toolCall.status === ToolCallStatus.Streaming) {396this._stateManager.dispatchServerAction({397type: ActionType.SessionToolCallReady,398session,399turnId: activeTurn.id,400toolCallId: toolCall.toolCallId,401invocationMessage: toolCall.invocationMessage ?? toolCall.displayName,402confirmed: ToolCallConfirmationReason.NotNeeded,403});404}405this._stateManager.dispatchServerAction({406type: ActionType.SessionToolCallComplete,407session,408turnId: activeTurn.id,409toolCallId: toolCall.toolCallId,410result: {411success: false,412pastTenseMessage: `${toolCall.displayName} failed`,413...(mayRetryWithReplacementClient ? { content: [{ type: ToolResultContentType.Text, text: `The client that was running ${toolCall.displayName} disconnected, but another active client now provides ${toolCall.displayName}. You may try calling the tool again.` }] } : {}),414error: { message: `Client ${clientId} disconnected before completing ${toolCall.displayName}` },415},416});417}418}419}420421// ---- Requests (expect a response) ---------------------------------------422423/**424* Methods handled by the request dispatcher (excludes initialize/reconnect425* which are handled during the handshake phase).426*/427private readonly _requestHandlers: RequestHandlerMap = {428subscribe: async (client, params) => {429try {430const snapshot = await this._agentService.subscribe(URI.parse(params.resource));431client.subscriptions.add(params.resource);432this._clearClientToolCallDisconnectTimeout(client.clientId, params.resource);433return { snapshot };434} catch (err) {435if (err instanceof ProtocolError) {436throw err;437}438throw new ProtocolError(AHP_SESSION_NOT_FOUND, `Resource not found: ${params.resource}`);439}440},441createSession: async (_client, params) => {442let createdSession: URI;443// Resolve fork turnId to a 0-based index using the source session's444// turn list in the state manager.445let fork: { session: URI; turnIndex: number; turnId: string } | undefined;446if (params.fork) {447const sourceState = this._stateManager.getSessionState(params.fork.session);448if (!sourceState) {449throw new ProtocolError(AHP_SESSION_NOT_FOUND, `Fork source session not found: ${params.fork.session}`);450}451const turnIndex = sourceState.turns.findIndex(t => t.id === params.fork!.turnId);452if (turnIndex < 0) {453throw new ProtocolError(AHP_SESSION_NOT_FOUND, `Fork turn ID ${params.fork.turnId} not found in session ${params.fork.session}`);454}455fork = { session: URI.parse(params.fork.session), turnIndex, turnId: params.fork.turnId };456}457// If the client eagerly claimed the active client role, validate458// the clientId matches the connection before forwarding.459if (params.activeClient && params.activeClient.clientId !== _client.clientId) {460throw new ProtocolError(JsonRpcErrorCodes.InvalidParams, `createSession.activeClient.clientId must match the connection's clientId`);461}462try {463createdSession = await this._agentService.createSession({464provider: params.provider,465model: params.model,466workingDirectory: params.workingDirectory ? URI.parse(params.workingDirectory) : undefined,467session: URI.parse(params.session),468fork,469config: params.config,470activeClient: params.activeClient,471});472} catch (err) {473if (err instanceof ProtocolError) {474throw err;475}476throw new ProtocolError(AHP_PROVIDER_NOT_FOUND, err instanceof Error ? err.message : String(err));477}478// Verify the provider honored the client-chosen session URI per the protocol contract479if (createdSession.toString() !== URI.parse(params.session).toString()) {480this._logService.warn(`[ProtocolServer] createSession: provider returned URI ${createdSession.toString()} but client requested ${params.session}`);481}482return null;483},484disposeSession: async (_client, params) => {485await this._agentService.disposeSession(URI.parse(params.session));486return null;487},488resourceWrite: async (_client, params) => {489return this._agentService.resourceWrite(params);490},491listSessions: async () => {492const sessions = await this._agentService.listSessions();493const items = sessions.map(s => {494const provider = AgentSession.provider(s.session);495if (!provider) {496throw new Error(`Agent session URI has no provider scheme: ${s.session.toString()}`);497}498// Encode isRead/isArchived as status bitmask flags499let status = s.status ?? SessionStatus.Idle;500if (s.isRead) {501status |= SessionStatus.IsRead;502}503if (s.isArchived) {504status |= SessionStatus.IsArchived;505}506return {507resource: s.session.toString(),508provider,509title: s.summary ?? 'Session',510status,511activity: s.activity,512createdAt: s.startTime,513modifiedAt: s.modifiedTime,514...(s.project ? { project: { uri: s.project.uri.toString(), displayName: s.project.displayName } } : {}),515model: s.model,516workingDirectory: s.workingDirectory?.toString(),517diffs: s.diffs ? [...s.diffs] : undefined,518};519});520return { items };521},522resolveSessionConfig: async (_client, params) => {523return this._agentService.resolveSessionConfig({524provider: params.provider,525workingDirectory: params.workingDirectory ? URI.parse(params.workingDirectory) : undefined,526config: params.config,527});528},529sessionConfigCompletions: async (_client, params) => {530return this._agentService.sessionConfigCompletions({531provider: params.provider,532workingDirectory: params.workingDirectory ? URI.parse(params.workingDirectory) : undefined,533config: params.config,534property: params.property,535query: params.query,536});537},538fetchTurns: async (_client, params) => {539const state = this._stateManager.getSessionState(params.session);540if (!state) {541throw new ProtocolError(AHP_SESSION_NOT_FOUND, `Session not found: ${params.session}`);542}543const turns = state.turns;544const limit = Math.min(params.limit ?? 50, 100);545546let endIndex = turns.length;547if (params.before) {548const idx = turns.findIndex(t => t.id === params.before);549if (idx !== -1) {550endIndex = idx;551}552}553554const startIndex = Math.max(0, endIndex - limit);555return {556turns: turns.slice(startIndex, endIndex),557hasMore: startIndex > 0,558};559},560resourceList: async (_client, params) => {561return this._agentService.resourceList(URI.parse(params.uri));562},563resourceRead: async (_client, params) => {564return this._agentService.resourceRead(URI.parse(params.uri));565},566resourceCopy: async (_client, params) => {567return this._agentService.resourceCopy(params);568},569resourceDelete: async (_client, params) => {570return this._agentService.resourceDelete(params);571},572resourceMove: async (_client, params) => {573return this._agentService.resourceMove(params);574},575authenticate: async (_client, params) => {576const result = await this._agentService.authenticate(params);577if (!result.authenticated) {578throw new ProtocolError(AHP_AUTH_REQUIRED, 'Authentication failed for resource: ' + params.resource);579}580return {};581},582createTerminal: async (_client, params) => {583await this._agentService.createTerminal(params);584return null;585},586disposeTerminal: async (_client, params) => {587await this._agentService.disposeTerminal(URI.parse(params.terminal));588return null;589},590};591592593// ---- Reverse RPC (server → client requests) ----------------------------594595private _reverseRequestId = 0;596private readonly _pendingReverseRequests = new Map<number, { clientId: string; resolve: (value: unknown) => void; reject: (reason: unknown) => void }>();597598/**599* Sends a JSON-RPC request to a connected client and waits for the response.600* Used for reverse-RPC operations like reading client-side files.601* Rejects if the client disconnects or the server is disposed.602*/603private _sendReverseRequest<T>(clientId: string, method: string, params: unknown): Promise<T> {604const client = this._clients.get(clientId);605if (!client) {606return Promise.reject(new Error(`Client ${clientId} is not connected`));607}608const id = ++this._reverseRequestId;609return new Promise<T>((resolve, reject) => {610this._pendingReverseRequests.set(id, { clientId, resolve: resolve as (value: unknown) => void, reject });611const request: JsonRpcRequest = { jsonrpc: '2.0', id, method, params };612client.transport.send(request);613});614}615616/**617* Rejects and clears all pending reverse-RPC requests for a given client.618*/619private _rejectPendingReverseRequests(clientId: string): void {620for (const [id, pending] of this._pendingReverseRequests) {621if (pending.clientId === clientId) {622this._pendingReverseRequests.delete(id);623pending.reject(new Error(`Client ${clientId} disconnected`));624}625}626}627628private _handleRequest(client: IConnectedClient, method: string, params: unknown, id: number): void {629const handler = this._requestHandlers.hasOwnProperty(method) ? this._requestHandlers[method as RequestMethod] : undefined;630if (handler) {631(handler as (client: IConnectedClient, params: unknown) => Promise<unknown>)(client, params).then(result => {632this._logService.trace(`[ProtocolServer] Request '${method}' id=${id} succeeded`);633client.transport.send(jsonRpcSuccess(id, result ?? null));634}).catch(err => {635this._logService.error(`[ProtocolServer] Request '${method}' failed`, err);636client.transport.send(jsonRpcErrorFrom(id, err));637});638return;639}640641// VS Code extension methods (not in the typed protocol maps yet)642const extensionResult = this._handleExtensionRequest(method, params);643if (extensionResult) {644extensionResult.then(result => {645client.transport.send(jsonRpcSuccess(id, result ?? null));646}).catch(err => {647this._logService.error(`[ProtocolServer] Extension request '${method}' failed`, err);648client.transport.send(jsonRpcErrorFrom(id, err));649});650return;651}652653client.transport.send(jsonRpcError(id, JSON_RPC_INTERNAL_ERROR, `Unknown method: ${method}`));654}655656/**657* Handle VS Code extension methods that are not yet part of the typed658* protocol. Returns a Promise if the method was recognized, undefined659* otherwise.660*/661private _handleExtensionRequest(method: string, _params: unknown): Promise<unknown> | undefined {662switch (method) {663case 'shutdown':664return this._agentService.shutdown();665default:666return undefined;667}668}669670// ---- Broadcasting -------------------------------------------------------671672private _broadcastAction(envelope: ActionEnvelope): void {673this._logService.trace(`[ProtocolServer] Broadcasting action: ${envelope.action.type}`);674const msg: AhpServerNotification<'action'> = { jsonrpc: '2.0', method: 'action', params: envelope };675for (const client of this._clients.values()) {676if (this._isRelevantToClient(client, envelope)) {677client.transport.send(msg);678}679}680}681682private _broadcastNotification(notification: INotification): void {683const msg: AhpServerNotification<'notification'> = { jsonrpc: '2.0', method: 'notification', params: { notification } };684for (const client of this._clients.values()) {685client.transport.send(msg);686}687}688689private _isRelevantToClient(client: IConnectedClient, envelope: ActionEnvelope): boolean {690const action = envelope.action;691if (action.type.startsWith('root/')) {692return client.subscriptions.has(ROOT_STATE_URI);693}694if (isSessionAction(action)) {695return client.subscriptions.has(action.session);696}697if (isTerminalAction(action)) {698return client.subscriptions.has(action.terminal);699}700return false;701}702703override dispose(): void {704for (const client of this._clients.values()) {705client.disposables.dispose();706}707this._clients.clear();708for (const [, pending] of this._pendingReverseRequests) {709pending.reject(new Error('ProtocolServerHandler disposed'));710}711this._pendingReverseRequests.clear();712for (const timeout of this._clientToolCallDisconnectTimeouts.values()) {713clearTimeout(timeout);714}715this._clientToolCallDisconnectTimeouts.clear();716this._replayBuffer.length = 0;717super.dispose();718}719}720721722