// Source: GitHub repository microsoft/vscode
// Path: blob/main/src/vs/workbench/contrib/mcp/common/mcpGatewayToolBrokerChannel.ts
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/
6
import { CancellationToken } from '../../../../base/common/cancellation.js';
7
import { Emitter, Event } from '../../../../base/common/event.js';
8
import { Disposable } from '../../../../base/common/lifecycle.js';
9
import { autorun } from '../../../../base/common/observable.js';
10
import { IServerChannel } from '../../../../base/parts/ipc/common/ipc.js';
11
import { ILogService } from '../../../../platform/log/common/log.js';
12
import { IMcpGatewayServerDescriptor } from '../../../../platform/mcp/common/mcpGateway.js';
13
import { MCP } from '../../../../platform/mcp/common/modelContextProtocol.js';
14
import { URI } from '../../../../base/common/uri.js';
15
import { McpServer } from './mcpServer.js';
16
import { IMcpServer, IMcpService, McpCapability, McpServerCacheState, McpToolVisibility } from './mcpTypes.js';
17
import { startServerAndWaitForLiveTools } from './mcpTypesUtils.js';
18
19
/**
 * Argument payload of the `callToolForServer` IPC command.
 */
interface ICallToolForServerArgs {
	/** Identifier matched against `server.definition.id` to select the server. */
	serverId: string;
	/** Name of the tool to invoke, matched against the tool's `definition.name`. */
	name: string;
	/** Arguments forwarded to the tool call. */
	args: Record<string, unknown>;
	/** Optional chat session resource as a string; parsed with `URI.parse` before being passed to the tool. */
	chatSessionResource?: string;
}
25
26
/**
 * Argument payload of the `readResourceForServer` IPC command.
 */
interface IReadResourceForServerArgs {
	/** Identifier matched against `server.definition.id` to select the server. */
	serverId: string;
	/** URI of the resource to read, forwarded as a string to the server's `readResource` call. */
	uri: string;
}
30
31
/**
 * Argument payload of the IPC commands that only need to identify a server
 * (`listToolsForServer`, `listResourcesForServer`, `listResourceTemplatesForServer`).
 */
interface IServerIdArg {
	/** Identifier matched against `server.definition.id` to select the server. */
	serverId: string;
}
34
35
/**
 * IPC server channel that brokers per-server access to MCP tools and resources
 * held by an {@link IMcpService}.
 *
 * Events exposed through {@link listen}: `onDidChangeTools`,
 * `onDidChangeResources` and `onDidChangeServers`.
 *
 * Commands handled by {@link call}: `listServers`, `listToolsForServer`,
 * `callToolForServer`, `listResourcesForServer`, `readResourceForServer` and
 * `listResourceTemplatesForServer`.
 */
export class McpGatewayToolBrokerChannel extends Disposable implements IServerChannel<unknown> {
	private readonly _onDidChangeTools = this._register(new Emitter<void>());
	private readonly _onDidChangeResources = this._register(new Emitter<void>());
	private readonly _onDidChangeServers = this._register(new Emitter<readonly IMcpGatewayServerDescriptor[]>());

	/**
	 * Per-server promise that races server startup against the grace period timeout.
	 * Once set for a server, subsequent list calls await the already-settled promise
	 * and return immediately instead of waiting again.
	 *
	 * The `resolved` flag tracks whether the promise has settled (fulfilled or
	 * rejected). If a server's cacheState regresses to Unknown/Outdated after the
	 * promise settled (e.g. after a cache reset), `_waitForStartup` discards the
	 * stale entry and creates a fresh race so the server gets another chance to start.
	 */
	private readonly _startupGrace = new Map<string, { promise: Promise<boolean>; resolved: boolean }>();

	/**
	 * @param _mcpService Service whose `servers` observable is the source of all servers, tools and capabilities.
	 * @param _logService Logger used for debug/warn diagnostics.
	 * @param _startupGracePeriodMs Maximum time, in milliseconds, a list call waits for a server to start.
	 */
	constructor(
		private readonly _mcpService: IMcpService,
		private readonly _logService: ILogService,
		private readonly _startupGracePeriodMs = 5000,
	) {
		super();
		this._logService.debug('[McpGateway][ToolBroker] Initialized');

		// Each autorun below skips its first invocation (which only registers the
		// observables it reads) and fires the matching event on later invocations.

		// Tracks the server list and every server's tool list.
		let toolsInitialized = false;
		this._register(autorun(reader => {
			for (const server of this._mcpService.servers.read(reader)) {
				server.tools.read(reader);
			}

			if (toolsInitialized) {
				this._logService.debug('[McpGateway][ToolBroker] Tools changed, firing onDidChangeTools');
				this._onDidChangeTools.fire();
			} else {
				toolsInitialized = true;
			}
		}));

		// Tracks the server list and every server's capabilities.
		let resourcesInitialized = false;
		this._register(autorun(reader => {
			for (const server of this._mcpService.servers.read(reader)) {
				server.capabilities.read(reader);
			}

			if (resourcesInitialized) {
				this._logService.debug('[McpGateway][ToolBroker] Resources changed, firing onDidChangeResources');
				this._onDidChangeResources.fire();
			} else {
				resourcesInitialized = true;
			}
		}));

		// Tracks the server list itself and publishes the new descriptors.
		let serversInitialized = false;
		this._register(autorun(reader => {
			const servers = this._mcpService.servers.read(reader);

			if (serversInitialized) {
				this._logService.debug('[McpGateway][ToolBroker] Servers changed, firing onDidChangeServers');
				this._onDidChangeServers.fire(servers.map(s => ({ id: s.definition.id, label: s.definition.label })));
			} else {
				serversInitialized = true;
			}
		}));
	}

	/**
	 * Looks up a server by its definition id.
	 *
	 * @returns The matching server, or `undefined` when no server has that id.
	 */
	private _getServerById(serverId: string): IMcpServer | undefined {
		return this._mcpService.servers.get().find(server => server.definition.id === serverId);
	}

	/**
	 * Returns the (shared) promise that races startup of `server` against the
	 * grace period. The promise yields the result of `_ensureServerReady`, or
	 * `false` when the grace period elapses first.
	 */
	private _waitForStartup(server: IMcpServer): Promise<boolean> {
		const id = server.definition.id;
		const existing = this._startupGrace.get(id);
		if (existing) {
			// A race that is still pending is always reused.
			if (!existing.resolved) {
				return existing.promise;
			}
			// A settled race is reused unless the server is (again) Unknown/Outdated,
			// in which case the entry is stale (e.g. caches were reset) and a fresh
			// race is created below.
			const state = server.cacheState.get();
			if (state !== McpServerCacheState.Unknown && state !== McpServerCacheState.Outdated) {
				return existing.promise;
			}
			this._startupGrace.delete(id);
		}

		let graceTimer: ReturnType<typeof setTimeout> | undefined;
		const entry: { promise: Promise<boolean>; resolved: boolean } = {
			promise: Promise.race([
				this._ensureServerReady(server),
				new Promise<boolean>(resolve => {
					graceTimer = setTimeout(() => resolve(false), this._startupGracePeriodMs);
				}),
			]).finally(() => {
				// Do not leave the grace timer pending once startup has won the race.
				clearTimeout(graceTimer);
			}),
			resolved: false,
		};
		// Mark the entry as settled on both outcomes so that a failed race can be
		// discarded as stale later instead of being cached forever; handling the
		// rejection here also keeps this bookkeeping chain from going unhandled.
		const markSettled = () => { entry.resolved = true; };
		entry.promise.then(markSettled, markSettled);
		this._startupGrace.set(id, entry);
		return entry.promise;
	}

	/**
	 * Decides whether data of `server` may be served. If the server's cache state
	 * is Unknown or Outdated, startup is awaited (bounded by the grace period)
	 * and the state is re-read before deciding.
	 *
	 * @returns `true` when the state is Live, Cached or RefreshingFromCached.
	 */
	private async _shouldUseCachedData(server: IMcpServer): Promise<boolean> {
		let cacheState = server.cacheState.get();
		if (cacheState === McpServerCacheState.Unknown || cacheState === McpServerCacheState.Outdated) {
			await this._waitForStartup(server);
			cacheState = server.cacheState.get();
		}
		return cacheState === McpServerCacheState.Live
			|| cacheState === McpServerCacheState.Cached
			|| cacheState === McpServerCacheState.RefreshingFromCached;
	}

	/**
	 * Returns the event registered under `event`.
	 *
	 * @throws Error when `event` is not one of the supported event names.
	 */
	listen<T>(_ctx: unknown, event: string): Event<T> {
		switch (event) {
			case 'onDidChangeTools':
				return this._onDidChangeTools.event as Event<T>;
			case 'onDidChangeResources':
				return this._onDidChangeResources.event as Event<T>;
			case 'onDidChangeServers':
				return this._onDidChangeServers.event as Event<T>;
		}

		throw new Error(`Invalid listen: ${event}`);
	}

	/**
	 * Dispatches an IPC command to the matching private handler.
	 *
	 * @param command Name of the command to execute.
	 * @param arg Command payload; its expected shape depends on `command`.
	 * @param cancellationToken Forwarded to the commands that support cancellation.
	 * @throws Error when `command` is not a supported command name.
	 */
	async call<T>(_ctx: unknown, command: string, arg?: unknown, cancellationToken?: CancellationToken): Promise<T> {
		this._logService.debug(`[McpGateway][ToolBroker] IPC call: ${command}`);

		switch (command) {
			case 'listServers': {
				const servers = this._listServers();
				return servers as T;
			}
			case 'listToolsForServer': {
				const { serverId } = arg as IServerIdArg;
				const tools = await this._listToolsForServer(serverId);
				return tools as T;
			}
			case 'callToolForServer': {
				const { serverId, name, args, chatSessionResource } = arg as ICallToolForServerArgs;
				const result = await this._callToolForServer(serverId, name, args || {}, chatSessionResource, cancellationToken);
				return result as T;
			}
			case 'listResourcesForServer': {
				const { serverId } = arg as IServerIdArg;
				const resources = await this._listResourcesForServer(serverId);
				return resources as T;
			}
			case 'readResourceForServer': {
				const { serverId, uri } = arg as IReadResourceForServerArgs;
				const result = await this._readResourceForServer(serverId, uri, cancellationToken);
				return result as T;
			}
			case 'listResourceTemplatesForServer': {
				const { serverId } = arg as IServerIdArg;
				const templates = await this._listResourceTemplatesForServer(serverId);
				return templates as T;
			}
		}

		throw new Error(`Invalid call: ${command}`);
	}

	/**
	 * Returns an id/label descriptor for every server currently known to the MCP service.
	 */
	private _listServers(): readonly IMcpGatewayServerDescriptor[] {
		const result: IMcpGatewayServerDescriptor[] = this._mcpService.servers.get()
			.map(server => ({ id: server.definition.id, label: server.definition.label }));
		this._logService.debug(`[McpGateway][ToolBroker] listServers result: ${result.length} server(s): [${result.map(s => s.label).join(', ')}]`);
		return result;
	}

	/**
	 * Lists the definitions of the tools of a server whose visibility includes
	 * the `McpToolVisibility.Model` flag.
	 *
	 * @returns An empty list when the server is unknown or not ready.
	 */
	private async _listToolsForServer(serverId: string): Promise<readonly MCP.Tool[]> {
		const server = this._getServerById(serverId);
		if (!server) {
			this._logService.warn(`[McpGateway][ToolBroker] listToolsForServer: unknown server '${serverId}'`);
			return [];
		}
		if (!await this._shouldUseCachedData(server)) {
			this._logService.debug(`[McpGateway][ToolBroker] Server '${serverId}' not ready, skipping tool listing`);
			return [];
		}
		const tools = server.tools.get()
			.filter(t => t.visibility & McpToolVisibility.Model)
			.map(t => t.definition);
		this._logService.debug(`[McpGateway][ToolBroker] listToolsForServer '${serverId}': ${tools.length} tool(s)`);
		return tools;
	}

	/**
	 * Invokes a model-visible tool on a server.
	 *
	 * @param chatSessionResource Optional URI string; when set it is parsed and passed to the tool as call context.
	 * @throws Error when the server is unknown or has no model-visible tool named `name`.
	 */
	private async _callToolForServer(serverId: string, name: string, args: Record<string, unknown>, chatSessionResource?: string, token: CancellationToken = CancellationToken.None): Promise<MCP.CallToolResult> {
		this._logService.debug(`[McpGateway][ToolBroker] callToolForServer '${serverId}' tool '${name}' with args: ${JSON.stringify(args)}`);

		const server = this._getServerById(serverId);
		if (!server) {
			throw new Error(`Unknown server: ${serverId}`);
		}

		const tool = server.tools.get().find(t =>
			t.definition.name === name && (t.visibility & McpToolVisibility.Model)
		);
		if (!tool) {
			throw new Error(`Unknown tool '${name}' on server '${serverId}'`);
		}

		const context = chatSessionResource ? { chatSessionResource: URI.parse(chatSessionResource) } : undefined;
		const result = await tool.call(args, context, token);
		this._logService.debug(`[McpGateway][ToolBroker] Tool '${name}' on '${serverId}' completed (isError=${result.isError ?? false}, content blocks=${result.content.length})`);
		return result;
	}

	/**
	 * Lists the resources of a server.
	 *
	 * @returns An empty list when the server is unknown, not ready, lacks the
	 * `McpCapability.Resources` capability, or the listing fails (failures are logged).
	 */
	private async _listResourcesForServer(serverId: string): Promise<readonly MCP.Resource[]> {
		const server = this._getServerById(serverId);
		if (!server) {
			this._logService.warn(`[McpGateway][ToolBroker] listResourcesForServer: unknown server '${serverId}'`);
			return [];
		}
		if (!await this._shouldUseCachedData(server)) {
			return [];
		}

		const capabilities = server.capabilities.get();
		if (!capabilities || !(capabilities & McpCapability.Resources)) {
			this._logService.debug(`[McpGateway][ToolBroker] Server '${serverId}' has no resource capability`);
			return [];
		}

		try {
			const resources = await McpServer.callOn(server, h => h.listResources());
			this._logService.debug(`[McpGateway][ToolBroker] Server '${serverId}' listed ${resources.length} resource(s)`);
			return resources;
		} catch (error) {
			// Listing is best-effort: report the failure and expose no resources.
			this._logService.warn(`[McpGateway][ToolBroker] Server '${serverId}' failed to list resources`, error);
			return [];
		}
	}

	/**
	 * Reads a single resource from a server.
	 *
	 * @throws Error when the server is unknown; errors from the read itself are not caught here.
	 */
	private async _readResourceForServer(serverId: string, uri: string, token: CancellationToken = CancellationToken.None): Promise<MCP.ReadResourceResult> {
		const server = this._getServerById(serverId);
		if (!server) {
			throw new Error(`Unknown server: ${serverId}`);
		}

		this._logService.debug(`[McpGateway][ToolBroker] readResourceForServer '${uri}' from server '${serverId}'`);
		const result = await McpServer.callOn(server, h => h.readResource({ uri }, token), token);
		this._logService.debug(`[McpGateway][ToolBroker] readResourceForServer returned ${result.contents.length} content(s)`);
		return result;
	}

	/**
	 * Lists the resource templates of a server.
	 *
	 * @returns An empty list when the server is unknown, not ready, lacks the
	 * `McpCapability.Resources` capability, or the listing fails (failures are logged).
	 */
	private async _listResourceTemplatesForServer(serverId: string): Promise<readonly MCP.ResourceTemplate[]> {
		const server = this._getServerById(serverId);
		if (!server) {
			this._logService.warn(`[McpGateway][ToolBroker] listResourceTemplatesForServer: unknown server '${serverId}'`);
			return [];
		}
		if (!await this._shouldUseCachedData(server)) {
			return [];
		}

		const capabilities = server.capabilities.get();
		if (!capabilities || !(capabilities & McpCapability.Resources)) {
			return [];
		}

		try {
			const resourceTemplates = await McpServer.callOn(server, h => h.listResourceTemplates());
			this._logService.debug(`[McpGateway][ToolBroker] Server '${serverId}' listed ${resourceTemplates.length} resource template(s)`);
			return resourceTemplates;
		} catch (error) {
			// Listing is best-effort: report the failure and expose no templates.
			this._logService.warn(`[McpGateway][ToolBroker] Server '${serverId}' failed to list resource templates`, error);
			return [];
		}
	}

	/**
	 * Starts `server` if its cache state is Unknown or Outdated and waits for
	 * `startServerAndWaitForLiveTools` to complete.
	 *
	 * @returns `true` when the server did not need starting, otherwise the result
	 * of the start helper; `false` when starting threw (the error is logged).
	 */
	private async _ensureServerReady(server: IMcpServer): Promise<boolean> {
		const cacheState = server.cacheState.get();
		if (cacheState !== McpServerCacheState.Unknown && cacheState !== McpServerCacheState.Outdated) {
			return true;
		}

		this._logService.debug(`[McpGateway][ToolBroker] Server '${server.definition.id}' not ready (cacheState=${cacheState}), starting...`);
		try {
			const ready = await startServerAndWaitForLiveTools(server, {
				promptType: 'all-untrusted',
				errorOnUserInteraction: true,
			});
			this._logService.debug(`[McpGateway][ToolBroker] Server '${server.definition.id}' ready=${ready}`);
			return ready;
		} catch (error) {
			this._logService.warn(`[McpGateway][ToolBroker] Server '${server.definition.id}' failed to start`, error);
			return false;
		}
	}
}
332
333