Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/workbench/contrib/mcp/common/mcpServerRequestHandler.ts
3296 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { equals } from '../../../../base/common/arrays.js';
7
import { assertNever } from '../../../../base/common/assert.js';
8
import { DeferredPromise, IntervalTimer } from '../../../../base/common/async.js';
9
import { CancellationToken } from '../../../../base/common/cancellation.js';
10
import { CancellationError } from '../../../../base/common/errors.js';
11
import { Emitter } from '../../../../base/common/event.js';
12
import { Iterable } from '../../../../base/common/iterator.js';
13
import { Disposable, DisposableStore } from '../../../../base/common/lifecycle.js';
14
import { autorun } from '../../../../base/common/observable.js';
15
import { IInstantiationService } from '../../../../platform/instantiation/common/instantiation.js';
16
import { canLog, ILogger, log, LogLevel } from '../../../../platform/log/common/log.js';
17
import { IProductService } from '../../../../platform/product/common/productService.js';
18
import { IMcpMessageTransport } from './mcpRegistryTypes.js';
19
import { IMcpClientMethods, McpConnectionState, McpError, MpcResponseError } from './mcpTypes.js';
20
import { MCP } from './modelContextProtocol.js';
21
22
/**
 * Bookkeeping entry for a single in-flight client request. Entries are
 * stored in a map keyed by the JSON-RPC request ID.
 */
interface PendingRequest {
	/** Deferred that is settled once the request is answered, fails, or is cancelled. */
	promise: DeferredPromise<MCP.Result>;
}
28
29
/**
 * A root entry described by a URI and an optional name.
 */
export interface McpRoot {
	/** URI of the root, in string form. */
	uri: string;
	/** Optional name for the root. */
	name?: string;
}
33
34
/**
 * Options used to construct a request handler. In addition to the members
 * declared here, the client method handlers from {@link IMcpClientMethods}
 * may be supplied.
 */
export interface IMcpServerRequestHandlerOptions extends IMcpClientMethods {
	/** Transport over which MCP messages are sent and received. */
	launch: IMcpMessageTransport;
	/** Logger that receives handler diagnostics and message traces. */
	logger: ILogger;
	/** Log level at which MCP message traffic is logged. */
	requestLogLevel?: LogLevel;
}
42
43
/**
 * Request handler for communicating with an MCP server.
 *
 * Handles sending requests and receiving responses, with automatic
 * handling of ping requests and typed client request methods.
 *
 * Instances are obtained through {@link McpServerRequestHandler.create},
 * which also performs the `initialize` handshake.
 */
export class McpServerRequestHandler extends Disposable {
	/** ID used for the next outgoing request; incremented for every request. */
	private _nextRequestId = 1;
	/** Outgoing requests that have not been settled yet, keyed by request ID. */
	private readonly _pendingRequests = new Map<MCP.RequestId, PendingRequest>();

	/**
	 * Set when the server has requested `roots/list`, and cleared again after a
	 * `list_changed` notification was sent, so that at most one notification is
	 * sent until the server lists the roots again.
	 */
	private _hasAnnouncedRoots = false;
	private _roots: MCP.Root[] = [];

	/**
	 * Updates the roots reported to the server. If the roots differ from the
	 * current ones and the server has already listed them, a
	 * `notifications/roots/list_changed` notification is sent.
	 */
	public set roots(roots: MCP.Root[]) {
		if (!equals(this._roots, roots)) {
			this._roots = roots;
			if (this._hasAnnouncedRoots) {
				this.sendNotification({ method: 'notifications/roots/list_changed' });
				this._hasAnnouncedRoots = false;
			}
		}
	}

	/** Result of the `initialize` request; assigned by `create()` once the server responds. */
	private _serverInit!: MCP.InitializeResult;

	/** Capabilities reported by the server in its `initialize` result. */
	public get capabilities(): MCP.ServerCapabilities {
		return this._serverInit.capabilities;
	}

	/** Implementation info reported by the server in its `initialize` result. */
	public get serverInfo(): MCP.Implementation {
		return this._serverInit.serverInfo;
	}

	/** Instructions reported by the server in its `initialize` result, if any. */
	public get serverInstructions(): string | undefined {
		return this._serverInit.instructions;
	}

	// Event emitters for server notifications
	private readonly _onDidReceiveCancelledNotification = this._register(new Emitter<MCP.CancelledNotification>());
	readonly onDidReceiveCancelledNotification = this._onDidReceiveCancelledNotification.event;

	private readonly _onDidReceiveProgressNotification = this._register(new Emitter<MCP.ProgressNotification>());
	readonly onDidReceiveProgressNotification = this._onDidReceiveProgressNotification.event;

	private readonly _onDidChangeResourceList = this._register(new Emitter<void>());
	readonly onDidChangeResourceList = this._onDidChangeResourceList.event;

	private readonly _onDidUpdateResource = this._register(new Emitter<MCP.ResourceUpdatedNotification>());
	readonly onDidUpdateResource = this._onDidUpdateResource.event;

	private readonly _onDidChangeToolList = this._register(new Emitter<void>());
	readonly onDidChangeToolList = this._onDidChangeToolList.event;

	private readonly _onDidChangePromptList = this._register(new Emitter<void>());
	readonly onDidChangePromptList = this._onDidChangePromptList.event;

	/**
	 * Connects to the MCP server and does the initialization handshake.
	 *
	 * @param instaService Used to resolve the product service for the client info
	 * @param opts Transport, logger and optional client method handlers
	 * @param token Cancellation token for the `initialize` request
	 * @returns The initialized handler. On failure the handler is disposed before the error is rethrown.
	 * @throws MpcResponseError if the server fails to initialize.
	 */
	public static async create(instaService: IInstantiationService, opts: IMcpServerRequestHandlerOptions, token?: CancellationToken) {
		const mcp = new McpServerRequestHandler(opts);
		const store = new DisposableStore();
		try {
			// Log a reminder every 5 seconds while the handshake is still pending.
			const timer = store.add(new IntervalTimer());
			timer.cancelAndSet(() => {
				opts.logger.info('Waiting for server to respond to `initialize` request...');
			}, 5000);

			await instaService.invokeFunction(async accessor => {
				const productService = accessor.get(IProductService);
				const initialized = await mcp.sendRequest<MCP.InitializeRequest, MCP.InitializeResult>({
					method: 'initialize',
					params: {
						protocolVersion: MCP.LATEST_PROTOCOL_VERSION,
						capabilities: {
							roots: { listChanged: true },
							// Only advertise capabilities for which a handler was provided.
							sampling: opts.createMessageRequestHandler ? {} : undefined,
							elicitation: opts.elicitationRequestHandler ? {} : undefined,
						},
						clientInfo: {
							name: productService.nameLong,
							version: productService.version,
						}
					}
				}, token);

				mcp._serverInit = initialized;
				mcp._sendLogLevelToServer(opts.logger.getLevel());

				mcp.sendNotification<MCP.InitializedNotification>({
					method: 'notifications/initialized'
				});
			});

			return mcp;
		} catch (e) {
			mcp.dispose();
			throw e;
		} finally {
			store.dispose();
		}
	}

	public readonly logger: ILogger;
	private readonly _launch: IMcpMessageTransport;
	private readonly _requestLogLevel: LogLevel;
	private readonly _createMessageRequestHandler: IMcpServerRequestHandlerOptions['createMessageRequestHandler'];
	private readonly _elicitationRequestHandler: IMcpServerRequestHandlerOptions['elicitationRequestHandler'];

	/**
	 * Wires the handler to the transport and logger. Use `create()` to obtain
	 * an instance that has completed the handshake.
	 */
	protected constructor({
		launch,
		logger,
		createMessageRequestHandler,
		elicitationRequestHandler,
		requestLogLevel = LogLevel.Debug,
	}: IMcpServerRequestHandlerOptions) {
		super();
		this._launch = launch;
		this.logger = logger;
		this._requestLogLevel = requestLogLevel;
		this._createMessageRequestHandler = createMessageRequestHandler;
		this._elicitationRequestHandler = elicitationRequestHandler;

		this._register(launch.onDidReceiveMessage(message => this.handleMessage(message)));
		this._register(autorun(reader => {
			const state = launch.state.read(reader).state;
			// the handler will get disposed when the launch stops, but if we're still
			// create()'ing we need to make sure to cancel the initialize request.
			if (state === McpConnectionState.Kind.Error || state === McpConnectionState.Kind.Stopped) {
				this.cancelAllRequests();
			}
		}));

		// Listen for log level changes and forward them to the MCP server
		this._register(logger.onDidChangeLogLevel((logLevel) => {
			this._sendLogLevelToServer(logLevel);
		}));
	}

	/**
	 * Send a client request to the server and return the response.
	 *
	 * @param request The request to send
	 * @param token Cancellation token; on cancellation a `notifications/cancelled`
	 * notification is sent to the server and the returned promise is cancelled
	 * @returns A promise that resolves with the response. It rejects with a
	 * CancellationError if the handler is already disposed.
	 */
	private async sendRequest<T extends MCP.ClientRequest, R extends MCP.ServerResult>(
		request: Pick<T, 'params' | 'method'>,
		token: CancellationToken = CancellationToken.None
	): Promise<R> {
		if (this._store.isDisposed) {
			return Promise.reject(new CancellationError());
		}

		const id = this._nextRequestId++;

		// Create the full JSON-RPC request
		const jsonRpcRequest: MCP.JSONRPCRequest = {
			jsonrpc: MCP.JSONRPC_VERSION,
			id,
			...request
		};

		const promise = new DeferredPromise<MCP.ServerResult>();
		// Store the pending request
		this._pendingRequests.set(id, { promise });
		// Set up cancellation
		const cancelListener = token.onCancellationRequested(() => {
			if (!promise.isSettled) {
				this._pendingRequests.delete(id);
				this.sendNotification({ method: 'notifications/cancelled', params: { requestId: id } });
				promise.cancel();
			}
			cancelListener.dispose();
		});

		// Send the request
		try {
			this.send(jsonRpcRequest);
		} catch (e) {
			// The request never reached the transport, so nothing will ever settle
			// it: drop the bookkeeping instead of leaving a dangling entry that a
			// later cancelAllRequests() would reject without any handler attached.
			cancelListener.dispose();
			this._pendingRequests.delete(id);
			throw e;
		}

		const ret = promise.p.finally(() => {
			cancelListener.dispose();
			this._pendingRequests.delete(id);
		});

		return ret as Promise<R>;
	}

	/**
	 * Logs the outgoing message at the configured request log level (if
	 * enabled) and hands it to the transport.
	 */
	private send(mcp: MCP.JSONRPCMessage) {
		if (canLog(this.logger.getLevel(), this._requestLogLevel)) { // avoid building the string if we don't need to
			log(this.logger, this._requestLogLevel, `[editor -> server] ${JSON.stringify(mcp)}`);
		}

		this._launch.send(mcp);
	}

	/**
	 * Handles paginated requests by making multiple requests until the server
	 * no longer returns a cursor or the token is cancelled.
	 *
	 * @param method The method name to call
	 * @param getItems Function to extract the array of items from a result
	 * @param initialParams Initial parameters
	 * @param token Cancellation token
	 * @returns Async iterable that yields the items of one page per iteration
	 */
	private async *sendRequestPaginated<T extends MCP.PaginatedRequest & MCP.ClientRequest, R extends MCP.PaginatedResult, I>(
		method: T['method'],
		getItems: (result: R) => I[],
		initialParams?: Omit<T['params'], 'jsonrpc' | 'id'>,
		token: CancellationToken = CancellationToken.None
	): AsyncIterable<I[]> {
		let nextCursor: MCP.Cursor | undefined = undefined;

		do {
			const params: T['params'] = {
				...initialParams,
				cursor: nextCursor
			};

			const result: R = await this.sendRequest<T, R>({ method, params }, token);
			yield getItems(result);
			nextCursor = result.nextCursor;
		} while (nextCursor !== undefined && !token.isCancellationRequested);
	}

	/** Sends a notification (a message without an ID) to the server. */
	private sendNotification<N extends MCP.ClientNotification>(notification: N): void {
		this.send({ ...notification, jsonrpc: MCP.JSONRPC_VERSION });
	}

	/**
	 * Handle incoming messages from the server
	 */
	private handleMessage(message: MCP.JSONRPCMessage): void {
		if (canLog(this.logger.getLevel(), this._requestLogLevel)) { // avoid building the string if we don't need to
			log(this.logger, this._requestLogLevel, `[server -> editor] ${JSON.stringify(message)}`);
		}

		// Handle responses to our requests
		if ('id' in message) {
			if ('result' in message) {
				this.handleResult(message);
			} else if ('error' in message) {
				this.handleError(message);
			}
		}

		// Handle requests from the server
		if ('method' in message) {
			if ('id' in message) {
				this.handleServerRequest(message as MCP.JSONRPCRequest & MCP.ServerRequest);
			} else {
				this.handleServerNotification(message as MCP.JSONRPCNotification & MCP.ServerNotification);
			}
		}
	}

	/**
	 * Handle successful responses
	 */
	private handleResult(response: MCP.JSONRPCResponse): void {
		const request = this._pendingRequests.get(response.id);
		if (request) {
			this._pendingRequests.delete(response.id);
			request.promise.complete(response.result);
		}
	}

	/**
	 * Handle error responses
	 */
	private handleError(response: MCP.JSONRPCError): void {
		const request = this._pendingRequests.get(response.id);
		if (request) {
			this._pendingRequests.delete(response.id);
			request.promise.error(new MpcResponseError(response.error.message, response.error.code, response.error.data));
		}
	}

	/**
	 * Handle incoming server requests. Unsupported methods, and methods whose
	 * optional handler was not provided, are answered with a "method not
	 * found" error; any failure is reported to the server as an error response.
	 */
	private async handleServerRequest(request: MCP.JSONRPCRequest & MCP.ServerRequest): Promise<void> {
		try {
			let response: MCP.Result | undefined;
			if (request.method === 'ping') {
				response = this.handlePing(request);
			} else if (request.method === 'roots/list') {
				response = this.handleRootsList(request);
			} else if (request.method === 'sampling/createMessage' && this._createMessageRequestHandler) {
				response = await this._createMessageRequestHandler(request.params as MCP.CreateMessageRequest['params']);
			} else if (request.method === 'elicitation/create' && this._elicitationRequestHandler) {
				response = await this._elicitationRequestHandler(request.params as MCP.ElicitRequest['params']);
			} else {
				throw McpError.methodNotFound(request.method);
			}
			this.respondToRequest(request, response);
		} catch (e) {
			// Errors that are not already MCP errors are logged and wrapped.
			if (!(e instanceof McpError)) {
				this.logger.error(`Error handling request ${request.method}:`, e);
				e = McpError.unknown(e);
			}

			const errorResponse: MCP.JSONRPCError = {
				jsonrpc: MCP.JSONRPC_VERSION,
				id: request.id,
				error: {
					code: e.code,
					message: e.message,
					data: e.data,
				}
			};

			this.send(errorResponse);
		}
	}

	/**
	 * Handle incoming server notifications. Notification methods not listed
	 * here are ignored.
	 */
	private handleServerNotification(request: MCP.JSONRPCNotification & MCP.ServerNotification): void {
		switch (request.method) {
			case 'notifications/message':
				return this.handleLoggingNotification(request);
			case 'notifications/cancelled':
				this._onDidReceiveCancelledNotification.fire(request);
				return this.handleCancelledNotification(request);
			case 'notifications/progress':
				this._onDidReceiveProgressNotification.fire(request);
				return;
			case 'notifications/resources/list_changed':
				this._onDidChangeResourceList.fire();
				return;
			case 'notifications/resources/updated':
				this._onDidUpdateResource.fire(request);
				return;
			case 'notifications/tools/list_changed':
				this._onDidChangeToolList.fire();
				return;
			case 'notifications/prompts/list_changed':
				this._onDidChangePromptList.fire();
				return;
		}
	}

	/**
	 * Cancels the pending client request whose ID matches the notification.
	 *
	 * NOTE(review): a cancellation sent by the server presumably refers to a
	 * request the server issued, whereas this looks the ID up among the
	 * requests issued by this client - confirm this is intended.
	 */
	private handleCancelledNotification(request: MCP.CancelledNotification): void {
		const pendingRequest = this._pendingRequests.get(request.params.requestId);
		if (pendingRequest) {
			this._pendingRequests.delete(request.params.requestId);
			pendingRequest.promise.cancel();
		}
	}

	/**
	 * Writes a server log message to the logger, prefixed with the server's
	 * logger name when given. Non-string data is JSON-serialized. Unknown
	 * levels are logged as info.
	 */
	private handleLoggingNotification(request: MCP.LoggingMessageNotification): void {
		let contents = typeof request.params.data === 'string' ? request.params.data : JSON.stringify(request.params.data);
		if (request.params.logger) {
			contents = `${request.params.logger}: ${contents}`;
		}

		switch (request.params.level) {
			case 'debug':
				this.logger.debug(contents);
				break;
			case 'info':
			case 'notice':
				this.logger.info(contents);
				break;
			case 'warning':
				this.logger.warn(contents);
				break;
			case 'error':
			case 'critical':
			case 'alert':
			case 'emergency':
				this.logger.error(contents);
				break;
			default:
				this.logger.info(contents);
				break;
		}
	}

	/**
	 * Send a generic response to a request
	 */
	private respondToRequest(request: MCP.JSONRPCRequest, result: MCP.Result): void {
		const response: MCP.JSONRPCResponse = {
			jsonrpc: MCP.JSONRPC_VERSION,
			id: request.id,
			result
		};
		this.send(response);
	}

	/**
	 * Builds the (empty) result for a ping request
	 */
	private handlePing(_request: MCP.PingRequest): {} {
		return {};
	}

	/**
	 * Builds the result for a roots/list request and remembers that the
	 * server has seen the roots, so later changes are announced.
	 */
	private handleRootsList(_request: MCP.ListRootsRequest): MCP.ListRootsResult {
		this._hasAnnouncedRoots = true;
		return { roots: this._roots };
	}

	/** Cancels every pending request and clears the bookkeeping. */
	private cancelAllRequests() {
		this._pendingRequests.forEach(pending => pending.promise.cancel());
		this._pendingRequests.clear();
	}

	/** Cancels all pending requests before disposing registered resources. */
	public override dispose(): void {
		this.cancelAllRequests();
		super.dispose();
	}

	/**
	 * Forwards log level changes to the MCP server if it supports logging.
	 * Failures are logged and never thrown.
	 */
	private async _sendLogLevelToServer(logLevel: LogLevel): Promise<void> {
		try {
			// The log level listener is registered in the constructor, so this can
			// run before the `initialize` response was stored. There is nothing to
			// forward to yet; create() sends the current level after the handshake.
			if (!this._serverInit) {
				return;
			}

			// Only send if the server supports logging capabilities
			if (!this.capabilities.logging) {
				return;
			}

			await this.setLevel({ level: mapLogLevelToMcp(logLevel) });
		} catch (error) {
			this.logger.error(`Failed to set MCP server log level: ${error}`);
		}
	}

	/**
	 * Send an initialize request
	 */
	initialize(params: MCP.InitializeRequest['params'], token?: CancellationToken): Promise<MCP.InitializeResult> {
		return this.sendRequest<MCP.InitializeRequest, MCP.InitializeResult>({ method: 'initialize', params }, token);
	}

	/**
	 * List available resources, collecting all pages into one array
	 */
	listResources(params?: MCP.ListResourcesRequest['params'], token?: CancellationToken): Promise<MCP.Resource[]> {
		return Iterable.asyncToArrayFlat(this.listResourcesIterable(params, token));
	}

	/**
	 * List available resources (iterable, one page per iteration)
	 */
	listResourcesIterable(params?: MCP.ListResourcesRequest['params'], token?: CancellationToken): AsyncIterable<MCP.Resource[]> {
		return this.sendRequestPaginated<MCP.ListResourcesRequest, MCP.ListResourcesResult, MCP.Resource>('resources/list', result => result.resources, params, token);
	}

	/**
	 * Read a specific resource
	 */
	readResource(params: MCP.ReadResourceRequest['params'], token?: CancellationToken): Promise<MCP.ReadResourceResult> {
		return this.sendRequest<MCP.ReadResourceRequest, MCP.ReadResourceResult>({ method: 'resources/read', params }, token);
	}

	/**
	 * List available resource templates, collecting all pages into one array
	 */
	listResourceTemplates(params?: MCP.ListResourceTemplatesRequest['params'], token?: CancellationToken): Promise<MCP.ResourceTemplate[]> {
		return Iterable.asyncToArrayFlat(this.sendRequestPaginated<MCP.ListResourceTemplatesRequest, MCP.ListResourceTemplatesResult, MCP.ResourceTemplate>('resources/templates/list', result => result.resourceTemplates, params, token));
	}

	/**
	 * Subscribe to resource updates
	 */
	subscribe(params: MCP.SubscribeRequest['params'], token?: CancellationToken): Promise<MCP.EmptyResult> {
		return this.sendRequest<MCP.SubscribeRequest, MCP.EmptyResult>({ method: 'resources/subscribe', params }, token);
	}

	/**
	 * Unsubscribe from resource updates
	 */
	unsubscribe(params: MCP.UnsubscribeRequest['params'], token?: CancellationToken): Promise<MCP.EmptyResult> {
		return this.sendRequest<MCP.UnsubscribeRequest, MCP.EmptyResult>({ method: 'resources/unsubscribe', params }, token);
	}

	/**
	 * List available prompts, collecting all pages into one array
	 */
	listPrompts(params?: MCP.ListPromptsRequest['params'], token?: CancellationToken): Promise<MCP.Prompt[]> {
		return Iterable.asyncToArrayFlat(this.sendRequestPaginated<MCP.ListPromptsRequest, MCP.ListPromptsResult, MCP.Prompt>('prompts/list', result => result.prompts, params, token));
	}

	/**
	 * Get a specific prompt
	 */
	getPrompt(params: MCP.GetPromptRequest['params'], token?: CancellationToken): Promise<MCP.GetPromptResult> {
		return this.sendRequest<MCP.GetPromptRequest, MCP.GetPromptResult>({ method: 'prompts/get', params }, token);
	}

	/**
	 * List available tools, collecting all pages into one array
	 */
	listTools(params?: MCP.ListToolsRequest['params'], token?: CancellationToken): Promise<MCP.Tool[]> {
		return Iterable.asyncToArrayFlat(this.sendRequestPaginated<MCP.ListToolsRequest, MCP.ListToolsResult, MCP.Tool>('tools/list', result => result.tools, params, token));
	}

	/**
	 * Call a specific tool
	 */
	callTool(params: MCP.CallToolRequest['params'] & MCP.Request['params'], token?: CancellationToken): Promise<MCP.CallToolResult> {
		return this.sendRequest<MCP.CallToolRequest, MCP.CallToolResult>({ method: 'tools/call', params }, token);
	}

	/**
	 * Set the logging level
	 */
	setLevel(params: MCP.SetLevelRequest['params'], token?: CancellationToken): Promise<MCP.EmptyResult> {
		return this.sendRequest<MCP.SetLevelRequest, MCP.EmptyResult>({ method: 'logging/setLevel', params }, token);
	}

	/**
	 * Find completions for an argument
	 */
	complete(params: MCP.CompleteRequest['params'], token?: CancellationToken): Promise<MCP.CompleteResult> {
		return this.sendRequest<MCP.CompleteRequest, MCP.CompleteResult>({ method: 'completion/complete', params }, token);
	}
}
561
562
563
/**
 * Translates an editor log level into the MCP logging level sent to the server.
 *
 * @param logLevel The editor log level
 * @returns The corresponding MCP logging level
 */
function mapLogLevelToMcp(logLevel: LogLevel): MCP.LoggingLevel {
	switch (logLevel) {
		// MCP doesn't have trace, use debug
		case LogLevel.Trace:
		case LogLevel.Debug:
			return 'debug';
		case LogLevel.Info:
			return 'info';
		case LogLevel.Warning:
			return 'warning';
		case LogLevel.Error:
			return 'error';
		// MCP doesn't have off, use emergency
		case LogLevel.Off:
			return 'emergency';
		default:
			// Every known level is handled above; this is the exhaustiveness check.
			return assertNever(logLevel);
	}
}
584
585