Path: blob/main/src/vs/workbench/contrib/mcp/common/mcpGatewayToolBrokerChannel.ts
13401 views
/*---------------------------------------------------------------------------------------------1* Copyright (c) Microsoft Corporation. All rights reserved.2* Licensed under the MIT License. See License.txt in the project root for license information.3*--------------------------------------------------------------------------------------------*/45import { CancellationToken } from '../../../../base/common/cancellation.js';6import { Emitter, Event } from '../../../../base/common/event.js';7import { Disposable } from '../../../../base/common/lifecycle.js';8import { autorun } from '../../../../base/common/observable.js';9import { IServerChannel } from '../../../../base/parts/ipc/common/ipc.js';10import { ILogService } from '../../../../platform/log/common/log.js';11import { IMcpGatewayServerDescriptor } from '../../../../platform/mcp/common/mcpGateway.js';12import { MCP } from '../../../../platform/mcp/common/modelContextProtocol.js';13import { URI } from '../../../../base/common/uri.js';14import { McpServer } from './mcpServer.js';15import { IMcpServer, IMcpService, McpCapability, McpServerCacheState, McpToolVisibility } from './mcpTypes.js';16import { startServerAndWaitForLiveTools } from './mcpTypesUtils.js';1718interface ICallToolForServerArgs {19serverId: string;20name: string;21args: Record<string, unknown>;22chatSessionResource?: string;23}2425interface IReadResourceForServerArgs {26serverId: string;27uri: string;28}2930interface IServerIdArg {31serverId: string;32}3334export class McpGatewayToolBrokerChannel extends Disposable implements IServerChannel<unknown> {35private readonly _onDidChangeTools = this._register(new Emitter<void>());36private readonly _onDidChangeResources = this._register(new Emitter<void>());37private readonly _onDidChangeServers = this._register(new Emitter<readonly IMcpGatewayServerDescriptor[]>());3839/**40* Per-server promise that races server startup against the grace period timeout.41* Once set for a server, subsequent list calls await the already-resolved promise42* and return immediately instead of waiting again.43*44* The `resolved` flag tracks whether the promise has settled. If a server's45* cacheState regresses to Unknown/Outdated after the promise resolved (e.g.46* after a cache reset), `_waitForStartup` discards the stale entry and creates47* a fresh race so the server gets another chance to start.48*/49private readonly _startupGrace = new Map<string, { promise: Promise<boolean>; resolved: boolean }>();5051constructor(52private readonly _mcpService: IMcpService,53private readonly _logService: ILogService,54private readonly _startupGracePeriodMs = 5000,55) {56super();57this._logService.debug('[McpGateway][ToolBroker] Initialized');5859let toolsInitialized = false;60this._register(autorun(reader => {61for (const server of this._mcpService.servers.read(reader)) {62server.tools.read(reader);63}6465if (toolsInitialized) {66this._logService.debug('[McpGateway][ToolBroker] Tools changed, firing onDidChangeTools');67this._onDidChangeTools.fire();68} else {69toolsInitialized = true;70}71}));7273let resourcesInitialized = false;74this._register(autorun(reader => {75for (const server of this._mcpService.servers.read(reader)) {76server.capabilities.read(reader);77}7879if (resourcesInitialized) {80this._logService.debug('[McpGateway][ToolBroker] Resources changed, firing onDidChangeResources');81this._onDidChangeResources.fire();82} else {83resourcesInitialized = true;84}85}));8687let serversInitialized = false;88this._register(autorun(reader => {89const servers = this._mcpService.servers.read(reader);9091if (serversInitialized) {92this._logService.debug('[McpGateway][ToolBroker] Servers changed, firing onDidChangeServers');93this._onDidChangeServers.fire(servers.map(s => ({ id: s.definition.id, label: s.definition.label })));94} else {95serversInitialized = true;96}97}));98}99100private _getServerById(serverId: string): IMcpServer | undefined {101for (const server of this._mcpService.servers.get()) {102if (server.definition.id === serverId) {103return server;104}105}106return undefined;107}108109private _waitForStartup(server: IMcpServer): Promise<boolean> {110const id = server.definition.id;111const existing = this._startupGrace.get(id);112// If the previous grace promise already resolved but the server is still113// Unknown/Outdated, the entry is stale (e.g. caches were reset). Discard114// it so we create a fresh race below.115if (existing?.resolved) {116const state = server.cacheState.get();117if (state === McpServerCacheState.Unknown || state === McpServerCacheState.Outdated) {118this._startupGrace.delete(id);119}120}121if (!this._startupGrace.has(id)) {122const entry: { promise: Promise<boolean>; resolved: boolean } = {123promise: Promise.race([124this._ensureServerReady(server),125new Promise<boolean>(resolve => setTimeout(() => resolve(false), this._startupGracePeriodMs)),126]),127resolved: false,128};129entry.promise.then(() => { entry.resolved = true; });130this._startupGrace.set(id, entry);131}132return this._startupGrace.get(id)!.promise;133}134135private async _shouldUseCachedData(server: IMcpServer): Promise<boolean> {136const cacheState = server.cacheState.get();137if (cacheState === McpServerCacheState.Unknown || cacheState === McpServerCacheState.Outdated) {138await this._waitForStartup(server);139const newState = server.cacheState.get();140return newState === McpServerCacheState.Live141|| newState === McpServerCacheState.Cached142|| newState === McpServerCacheState.RefreshingFromCached;143}144return cacheState === McpServerCacheState.Live145|| cacheState === McpServerCacheState.Cached146|| cacheState === McpServerCacheState.RefreshingFromCached;147}148149listen<T>(_ctx: unknown, event: string): Event<T> {150switch (event) {151case 'onDidChangeTools':152return this._onDidChangeTools.event as Event<T>;153case 'onDidChangeResources':154return this._onDidChangeResources.event as Event<T>;155case 'onDidChangeServers':156return this._onDidChangeServers.event as Event<T>;157}158159throw new Error(`Invalid listen: ${event}`);160}161162async call<T>(_ctx: unknown, command: string, arg?: unknown, cancellationToken?: CancellationToken): Promise<T> {163this._logService.debug(`[McpGateway][ToolBroker] IPC call: ${command}`);164165switch (command) {166case 'listServers': {167const servers = this._listServers();168return servers as T;169}170case 'listToolsForServer': {171const { serverId } = arg as IServerIdArg;172const tools = await this._listToolsForServer(serverId);173return tools as T;174}175case 'callToolForServer': {176const { serverId, name, args, chatSessionResource } = arg as ICallToolForServerArgs;177const result = await this._callToolForServer(serverId, name, args || {}, chatSessionResource, cancellationToken);178return result as T;179}180case 'listResourcesForServer': {181const { serverId } = arg as IServerIdArg;182const resources = await this._listResourcesForServer(serverId);183return resources as T;184}185case 'readResourceForServer': {186const { serverId, uri } = arg as IReadResourceForServerArgs;187const result = await this._readResourceForServer(serverId, uri, cancellationToken);188return result as T;189}190case 'listResourceTemplatesForServer': {191const { serverId } = arg as IServerIdArg;192const templates = await this._listResourceTemplatesForServer(serverId);193return templates as T;194}195}196197throw new Error(`Invalid call: ${command}`);198}199200private _listServers(): readonly IMcpGatewayServerDescriptor[] {201const servers = this._mcpService.servers.get();202const result: IMcpGatewayServerDescriptor[] = [];203for (const server of servers) {204result.push({ id: server.definition.id, label: server.definition.label });205}206this._logService.debug(`[McpGateway][ToolBroker] listServers result: ${result.length} server(s): [${result.map(s => s.label).join(', ')}]`);207return result;208}209210private async _listToolsForServer(serverId: string): Promise<readonly MCP.Tool[]> {211const server = this._getServerById(serverId);212if (!server) {213this._logService.warn(`[McpGateway][ToolBroker] listToolsForServer: unknown server '${serverId}'`);214return [];215}216if (!await this._shouldUseCachedData(server)) {217this._logService.debug(`[McpGateway][ToolBroker] Server '${serverId}' not ready, skipping tool listing`);218return [];219}220const tools = server.tools.get()221.filter(t => t.visibility & McpToolVisibility.Model)222.map(t => t.definition);223this._logService.debug(`[McpGateway][ToolBroker] listToolsForServer '${serverId}': ${tools.length} tool(s)`);224return tools;225}226227private async _callToolForServer(serverId: string, name: string, args: Record<string, unknown>, chatSessionResource?: string, token: CancellationToken = CancellationToken.None): Promise<MCP.CallToolResult> {228this._logService.debug(`[McpGateway][ToolBroker] callToolForServer '${serverId}' tool '${name}' with args: ${JSON.stringify(args)}`);229230const server = this._getServerById(serverId);231if (!server) {232throw new Error(`Unknown server: ${serverId}`);233}234235const tool = server.tools.get().find(t =>236t.definition.name === name && (t.visibility & McpToolVisibility.Model)237);238if (!tool) {239throw new Error(`Unknown tool '${name}' on server '${serverId}'`);240}241242const context = chatSessionResource ? { chatSessionResource: URI.parse(chatSessionResource) } : undefined;243const result = await tool.call(args, context, token);244this._logService.debug(`[McpGateway][ToolBroker] Tool '${name}' on '${serverId}' completed (isError=${result.isError ?? false}, content blocks=${result.content.length})`);245return result;246}247248private async _listResourcesForServer(serverId: string): Promise<readonly MCP.Resource[]> {249const server = this._getServerById(serverId);250if (!server) {251this._logService.warn(`[McpGateway][ToolBroker] listResourcesForServer: unknown server '${serverId}'`);252return [];253}254if (!await this._shouldUseCachedData(server)) {255return [];256}257258const capabilities = server.capabilities.get();259if (!capabilities || !(capabilities & McpCapability.Resources)) {260this._logService.debug(`[McpGateway][ToolBroker] Server '${serverId}' has no resource capability`);261return [];262}263264try {265const resources = await McpServer.callOn(server, h => h.listResources());266this._logService.debug(`[McpGateway][ToolBroker] Server '${serverId}' listed ${resources.length} resource(s)`);267return resources;268} catch (error) {269this._logService.warn(`[McpGateway][ToolBroker] Server '${serverId}' failed to list resources`, error);270return [];271}272}273274private async _readResourceForServer(serverId: string, uri: string, token: CancellationToken = CancellationToken.None): Promise<MCP.ReadResourceResult> {275const server = this._getServerById(serverId);276if (!server) {277throw new Error(`Unknown server: ${serverId}`);278}279280this._logService.debug(`[McpGateway][ToolBroker] readResourceForServer '${uri}' from server '${serverId}'`);281const result = await McpServer.callOn(server, h => h.readResource({ uri }, token), token);282this._logService.debug(`[McpGateway][ToolBroker] readResourceForServer returned ${result.contents.length} content(s)`);283return result;284}285286private async _listResourceTemplatesForServer(serverId: string): Promise<readonly MCP.ResourceTemplate[]> {287const server = this._getServerById(serverId);288if (!server) {289this._logService.warn(`[McpGateway][ToolBroker] listResourceTemplatesForServer: unknown server '${serverId}'`);290return [];291}292if (!await this._shouldUseCachedData(server)) {293return [];294}295296const capabilities = server.capabilities.get();297if (!capabilities || !(capabilities & McpCapability.Resources)) {298return [];299}300301try {302const resourceTemplates = await McpServer.callOn(server, h => h.listResourceTemplates());303this._logService.debug(`[McpGateway][ToolBroker] Server '${serverId}' listed ${resourceTemplates.length} resource template(s)`);304return resourceTemplates;305} catch (error) {306this._logService.warn(`[McpGateway][ToolBroker] Server '${serverId}' failed to list resource templates`, error);307return [];308}309}310311private async _ensureServerReady(server: IMcpServer): Promise<boolean> {312const cacheState = server.cacheState.get();313if (cacheState !== McpServerCacheState.Unknown && cacheState !== McpServerCacheState.Outdated) {314return true;315}316317this._logService.debug(`[McpGateway][ToolBroker] Server '${server.definition.id}' not ready (cacheState=${cacheState}), starting...`);318try {319const ready = await startServerAndWaitForLiveTools(server, {320promptType: 'all-untrusted',321errorOnUserInteraction: true,322});323this._logService.debug(`[McpGateway][ToolBroker] Server '${server.definition.id}' ready=${ready}`);324return ready;325} catch (error) {326this._logService.warn(`[McpGateway][ToolBroker] Server '${server.definition.id}' failed to start`, error);327return false;328}329}330}331332333