Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/workbench/contrib/mcp/common/mcpServerRequestHandler.ts
5272 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { equals } from '../../../../base/common/arrays.js';
7
import { assertNever, softAssertNever } from '../../../../base/common/assert.js';
8
import { DeferredPromise, disposableTimeout, IntervalTimer } from '../../../../base/common/async.js';
9
import { CancellationToken, CancellationTokenSource } from '../../../../base/common/cancellation.js';
10
import { CancellationError } from '../../../../base/common/errors.js';
11
import { Emitter } from '../../../../base/common/event.js';
12
import { Iterable } from '../../../../base/common/iterator.js';
13
import { Disposable, DisposableStore, toDisposable } from '../../../../base/common/lifecycle.js';
14
import { autorun, ISettableObservable, ObservablePromise, observableValue, transaction } from '../../../../base/common/observable.js';
15
import { IInstantiationService } from '../../../../platform/instantiation/common/instantiation.js';
16
import { canLog, ILogger, log, LogLevel } from '../../../../platform/log/common/log.js';
17
import { IProductService } from '../../../../platform/product/common/productService.js';
18
import { IMcpMessageTransport } from './mcpRegistryTypes.js';
19
import { IMcpTaskInternal, McpTaskManager } from './mcpTaskManager.js';
20
import { IMcpClientMethods, McpConnectionState, McpError, MpcResponseError } from './mcpTypes.js';
21
import { isTaskResult, translateMcpLogMessage } from './mcpTypesUtils.js';
22
import { MCP } from './modelContextProtocol.js';
23
24
/**
 * Bookkeeping entry for a request that has been sent to the server and is
 * still awaiting an answer. Entries are stored in the handler's
 * `_pendingRequests` map, keyed by request ID.
 */
interface PendingRequest {
	/** Deferred that is settled once the request is answered, fails, or is cancelled. */
	promise: DeferredPromise<MCP.Result>;
}
30
31
/** Describes a root by its URI, with an optional name. */
export interface McpRoot {
	/** URI identifying the root. */
	uri: string;
	/** Optional name for the root. */
	name?: string;
}
35
36
/**
 * Options used to construct a `McpServerRequestHandler`. Extends
 * `IMcpClientMethods`, from which the handler picks up the optional
 * `createMessageRequestHandler` and `elicitationRequestHandler` callbacks.
 */
export interface IMcpServerRequestHandlerOptions extends IMcpClientMethods {
	/** MCP message transport used to exchange messages with the server. */
	launch: IMcpMessageTransport;
	/** Logger instance. */
	logger: ILogger;
	/** Log level at which MCP messages are logged. The handler uses `LogLevel.Debug` when omitted. */
	requestLogLevel?: LogLevel;
	/** Task manager for server-side MCP tasks (shared across reconnections). */
	taskManager: McpTaskManager;
}
46
47
/**
48
* Request handler for communicating with an MCP server.
49
*
50
* Handles sending requests and receiving responses, with automatic
51
* handling of ping requests and typed client request methods.
52
*/
53
export class McpServerRequestHandler extends Disposable {
54
/** ID handed to the next outgoing request; post-incremented by `sendRequest`. */
private _nextRequestId = 1;
/** Requests that were sent and are not settled yet, keyed by their request ID. */
private readonly _pendingRequests = new Map<MCP.RequestId, PendingRequest>();
56
57
/**
 * Set when the server has asked for the roots via `roots/list`; cleared
 * again after a `notifications/roots/list_changed` has been sent.
 */
private _hasAnnouncedRoots = false;
/** Roots returned to the server in response to `roots/list`. */
private _roots: MCP.Root[] = [];

/**
 * Replaces the roots. When the new array differs from the current one and
 * the server has already listed the roots, a
 * `notifications/roots/list_changed` notification is sent.
 */
public set roots(roots: MCP.Root[]) {
	if (equals(this._roots, roots)) {
		return; // nothing changed
	}

	this._roots = roots;

	if (!this._hasAnnouncedRoots) {
		return; // the server has not listed the roots since the last notification
	}

	this.sendNotification({ method: 'notifications/roots/list_changed' });
	this._hasAnnouncedRoots = false;
}
69
70
/** Result of the `initialize` request; assigned by `create()` once the server has answered. */
private _serverInit!: MCP.InitializeResult;

/** Capabilities taken from the server's `initialize` result. */
public get capabilities(): MCP.ServerCapabilities {
	const { capabilities } = this._serverInit;
	return capabilities;
}

/** Server implementation info taken from the server's `initialize` result. */
public get serverInfo(): MCP.Implementation {
	const { serverInfo } = this._serverInit;
	return serverInfo;
}

/** Instructions taken from the server's `initialize` result, if it provided any. */
public get serverInstructions(): string | undefined {
	const { instructions } = this._serverInit;
	return instructions;
}
82
83
// Event emitters for server notifications. Each emitter is registered with
// this handler so that it is disposed together with it.

/** Fires for every `notifications/cancelled` received from the server. */
private readonly _onDidReceiveCancelledNotification = this._register(new Emitter<MCP.CancelledNotification>());
readonly onDidReceiveCancelledNotification = this._onDidReceiveCancelledNotification.event;

/** Fires for every `notifications/progress` received from the server. */
private readonly _onDidReceiveProgressNotification = this._register(new Emitter<MCP.ProgressNotification>());
readonly onDidReceiveProgressNotification = this._onDidReceiveProgressNotification.event;

/** Fires for every `notifications/elicitation/complete` received from the server. */
private readonly _onDidReceiveElicitationCompleteNotification = this._register(new Emitter<MCP.ElicitationCompleteNotification>());
readonly onDidReceiveElicitationCompleteNotification = this._onDidReceiveElicitationCompleteNotification.event;

/** Fires when the server sends `notifications/resources/list_changed`. */
private readonly _onDidChangeResourceList = this._register(new Emitter<void>());
readonly onDidChangeResourceList = this._onDidChangeResourceList.event;

/** Fires for every `notifications/resources/updated` received from the server. */
private readonly _onDidUpdateResource = this._register(new Emitter<MCP.ResourceUpdatedNotification>());
readonly onDidUpdateResource = this._onDidUpdateResource.event;

/** Fires when the server sends `notifications/tools/list_changed`. */
private readonly _onDidChangeToolList = this._register(new Emitter<void>());
readonly onDidChangeToolList = this._onDidChangeToolList.event;

/** Fires when the server sends `notifications/prompts/list_changed`. */
private readonly _onDidChangePromptList = this._register(new Emitter<void>());
readonly onDidChangePromptList = this._onDidChangePromptList.event;
104
105
/**
 * Connects to the MCP server and does the initialization handshake: sends
 * `initialize`, stores the result, forwards the current log level and then
 * sends `notifications/initialized`.
 *
 * While waiting for the `initialize` response, an informational message is
 * logged every 5 seconds.
 *
 * @param instaService Used to look up the product name and version reported as `clientInfo`
 * @param opts Handler options
 * @param token Optional cancellation token for the `initialize` request
 * @returns The initialized handler. If anything fails, the handler is disposed and the error is rethrown.
 * @throws MpcResponseError if the server fails to initialize.
 */
public static async create(instaService: IInstantiationService, opts: IMcpServerRequestHandlerOptions, token?: CancellationToken) {
	const mcp = new McpServerRequestHandler(opts);
	const store = new DisposableStore();
	try {
		const waitingTimer = store.add(new IntervalTimer());
		const announceWaiting = () => {
			opts.logger.info('Waiting for server to respond to `initialize` request...');
		};
		waitingTimer.cancelAndSet(announceWaiting, 5000);

		await instaService.invokeFunction(async accessor => {
			const productService = accessor.get(IProductService);

			// Sampling and elicitation are only advertised when a handler for them was provided.
			const canSample = !!opts.createMessageRequestHandler;
			const canElicit = !!opts.elicitationRequestHandler;

			const capabilities: MCP.InitializeRequest['params']['capabilities'] = {
				roots: { listChanged: true },
				sampling: canSample ? {} : undefined,
				elicitation: canElicit ? { form: {}, url: {} } : undefined,
				tasks: {
					list: {},
					cancel: {},
					requests: {
						sampling: canSample ? { createMessage: {} } : undefined,
						elicitation: canElicit ? { create: {} } : undefined,
					},
				},
				extensions: {
					'io.modelcontextprotocol/ui': {
						mimeTypes: ['text/html;profile=mcp-app']
					}
				}
			};

			mcp._serverInit = await mcp.sendRequest<MCP.InitializeRequest, MCP.InitializeResult>({
				method: 'initialize',
				params: {
					protocolVersion: MCP.LATEST_PROTOCOL_VERSION,
					capabilities,
					clientInfo: {
						name: productService.nameLong,
						version: productService.version,
					}
				}
			}, token);

			// Not awaited: the method handles its own failures.
			mcp._sendLogLevelToServer(opts.logger.getLevel());

			mcp.sendNotification<MCP.InitializedNotification>({
				method: 'notifications/initialized'
			});
		});

		return mcp;
	} catch (e) {
		mcp.dispose();
		throw e;
	} finally {
		// Stops the "waiting" log timer on both success and failure.
		store.dispose();
	}
}
164
165
/** Logger used for message traffic and diagnostics. */
public readonly logger: ILogger;
/** Transport messages are sent over and received from. */
private readonly _launch: IMcpMessageTransport;
/** Level at which sent and received messages are logged. */
private readonly _requestLogLevel: LogLevel;
/** Optional handler for `sampling/createMessage` requests from the server. */
private readonly _createMessageRequestHandler: IMcpServerRequestHandlerOptions['createMessageRequestHandler'];
/** Optional handler for `elicitation/create` requests from the server. */
private readonly _elicitationRequestHandler: IMcpServerRequestHandlerOptions['elicitationRequestHandler'];
/** Task manager this handler attaches itself to for its lifetime. */
private readonly _taskManager: McpTaskManager;

/**
 * Wires the handler to the transport, the task manager and the logger.
 * Instances are obtained through `create()`, which also performs the
 * initialization handshake.
 */
protected constructor({
	launch,
	logger,
	createMessageRequestHandler,
	elicitationRequestHandler,
	requestLogLevel = LogLevel.Debug,
	taskManager,
}: IMcpServerRequestHandlerOptions) {
	super();
	this.logger = logger;
	this._launch = launch;
	this._taskManager = taskManager;
	this._requestLogLevel = requestLogLevel;
	this._createMessageRequestHandler = createMessageRequestHandler;
	this._elicitationRequestHandler = elicitationRequestHandler;

	// Attach this handler to the task manager; detach again on dispose.
	this._taskManager.setHandler(this);
	const forwardTaskStatus = this._taskManager.onDidUpdateTask(task => {
		// Task updates are pushed to the server as status notifications.
		this.send({
			jsonrpc: MCP.JSONRPC_VERSION,
			method: 'notifications/tasks/status',
			params: task
		} satisfies MCP.TaskStatusNotification);
	});
	this._register(forwardTaskStatus);
	this._register(toDisposable(() => this._taskManager.setHandler(undefined)));

	this._register(launch.onDidReceiveMessage(message => this.handleMessage(message)));
	this._register(autorun(reader => {
		const { state } = launch.state.read(reader);
		// the handler will get disposed when the launch stops, but if we're still
		// create()'ing we need to make sure to cancel the initialize request.
		const hasEnded = state === McpConnectionState.Kind.Error || state === McpConnectionState.Kind.Stopped;
		if (hasEnded) {
			this.cancelAllRequests();
		}
	}));

	// Listen for log level changes and forward them to the MCP server
	this._register(logger.onDidChangeLogLevel(newLevel => {
		this._sendLogLevelToServer(newLevel);
	}));
}
214
215
/**
 * Send a client request to the server and return the response.
 *
 * The request gets a freshly allocated ID and is tracked in
 * `_pendingRequests` until it is answered, fails, or is cancelled.
 *
 * @param request The `method` and `params` of the request to send
 * @param token Cancellation token. If it fires before the request settles,
 * a `notifications/cancelled` is sent to the server and the returned promise
 * is cancelled.
 * @returns A promise that resolves with the response. It rejects with a
 * `CancellationError` if the handler is already disposed, and with whatever
 * `send` throws if the request could not be sent.
 */
private async sendRequest<T extends MCP.ClientRequest, R extends MCP.ServerResult>(
	request: Pick<T, 'params' | 'method'>,
	token: CancellationToken = CancellationToken.None
): Promise<R> {
	if (this._store.isDisposed) {
		return Promise.reject(new CancellationError());
	}

	const id = this._nextRequestId++;

	// Create the full JSON-RPC request
	const jsonRpcRequest: MCP.JSONRPCRequest = {
		jsonrpc: MCP.JSONRPC_VERSION,
		id,
		...request
	};

	const promise = new DeferredPromise<MCP.ServerResult>();
	// Store the pending request
	this._pendingRequests.set(id, { promise });
	// Set up cancellation
	const cancelListener = token.onCancellationRequested(() => {
		if (!promise.isSettled) {
			this._pendingRequests.delete(id);
			this.sendNotification({ method: 'notifications/cancelled', params: { requestId: id } });
			promise.cancel();
		}
		cancelListener.dispose();
	});

	// Send the request
	try {
		this.send(jsonRpcRequest);
	} catch (e) {
		// The request never left: drop its bookkeeping so the map entry and
		// the cancellation listener do not leak, and so that a later
		// cancellation does not notify the server about an unknown request.
		cancelListener.dispose();
		this._pendingRequests.delete(id);
		throw e;
	}

	const ret = promise.p.finally(() => {
		cancelListener.dispose();
		this._pendingRequests.delete(id);
	});

	return ret as Promise<R>;
}
262
263
/**
 * Logs the outgoing message at the configured request log level and hands it
 * to the transport.
 *
 * @param mcp The JSON-RPC message to send
 */
private send(mcp: MCP.JSONRPCMessage) {
	const shouldLog = canLog(this.logger.getLevel(), this._requestLogLevel);
	if (shouldLog) {
		// Only serialize the message when the line will actually be logged.
		log(this.logger, this._requestLogLevel, `[editor -> server] ${JSON.stringify(mcp)}`);
	}

	this._launch.send(mcp);
}
270
271
/**
 * Handles paginated requests by issuing one request per page and yielding
 * each page's items, following `nextCursor` until the server stops returning
 * one or the token is cancelled.
 *
 * @param method The method name to call
 * @param getItems Function to extract the array of items from a result
 * @param initialParams Parameters sent with every page request, alongside the cursor
 * @param token Cancellation token
 * @returns Async iterable yielding the items of one page at a time
 */
private async *sendRequestPaginated<T extends MCP.PaginatedRequest & MCP.ClientRequest, R extends MCP.PaginatedResult, I>(method: T['method'], getItems: (result: R) => I[], initialParams?: Omit<T['params'], 'jsonrpc' | 'id'>, token: CancellationToken = CancellationToken.None): AsyncIterable<I[]> {
	let cursor: MCP.Cursor | undefined = undefined;

	while (true) {
		// The first request carries no cursor; later ones carry the previous page's `nextCursor`.
		const params: T['params'] = {
			...initialParams,
			cursor
		};

		const page: R = await this.sendRequest<T, R>({ method, params }, token);
		yield getItems(page);

		cursor = page.nextCursor;
		if (cursor === undefined || token.isCancellationRequested) {
			break;
		}
	}
}
294
295
/**
 * Sends a notification to the server, adding the JSON-RPC version field.
 *
 * @param notification The notification without its `jsonrpc` field
 */
private sendNotification<N extends MCP.ClientNotification>(notification: Omit<N, 'jsonrpc'>): void {
	this.send({
		...notification,
		jsonrpc: MCP.JSONRPC_VERSION,
	});
}
298
299
/**
 * Handle incoming messages from the server: logs the message, then routes
 * it to the response, server-request or server-notification handling based
 * on which of `id`, `result`, `error` and `method` it carries.
 */
private handleMessage(message: MCP.JSONRPCMessage): void {
	const shouldLog = canLog(this.logger.getLevel(), this._requestLogLevel);
	if (shouldLog) {
		// Only serialize the message when the line will actually be logged.
		log(this.logger, this._requestLogLevel, `[server -> editor] ${JSON.stringify(message)}`);
	}

	// Responses to our own requests carry an id plus either a result or an error.
	if ('id' in message && 'result' in message) {
		this.handleResult(message);
	} else if ('id' in message && 'error' in message) {
		this.handleError(message);
	}

	if (!('method' in message)) {
		return;
	}

	// Messages with a method come from the server: requests have an id, notifications do not.
	if ('id' in message) {
		this.handleServerRequest(message as MCP.JSONRPCRequest & MCP.ServerRequest);
	} else {
		this.handleServerNotification(message as MCP.JSONRPCNotification & MCP.ServerNotification);
	}
}
325
326
/**
 * Handle successful responses: completes the matching pending request with
 * the response's result. Responses without an id, or for an id that is not
 * pending, are ignored.
 */
private handleResult(response: MCP.JSONRPCResultResponse): void {
	const { id, result } = response;
	if (id === undefined) {
		return;
	}

	const pending = this._pendingRequests.get(id);
	if (!pending) {
		return;
	}

	this._pendingRequests.delete(id);
	pending.promise.complete(result);
}
338
339
/**
 * Handle error responses: fails the matching pending request with an
 * `MpcResponseError` built from the error's message, code and data.
 * Responses without an id, or for an id that is not pending, are ignored.
 */
private handleError(response: MCP.JSONRPCErrorResponse): void {
	const { id, error } = response;
	if (id === undefined) {
		return;
	}

	const pending = this._pendingRequests.get(id);
	if (!pending) {
		return;
	}

	this._pendingRequests.delete(id);
	pending.promise.error(new MpcResponseError(error.message, error.code, error.data));
}
351
352
/**
 * Handle incoming server requests. Dispatches on the request method, sends
 * the result back as a response and turns any failure into a JSON-RPC error
 * response. Methods without a handler are answered with
 * `McpError.methodNotFound`; this includes sampling and elicitation
 * requests when no corresponding handler was configured.
 */
private async handleServerRequest(request: MCP.JSONRPCRequest & MCP.ServerRequest): Promise<void> {
	// Captured up front so it stays usable where `request` has been narrowed away.
	const method: string = request.method;

	try {
		let response: MCP.Result | undefined;

		switch (request.method) {
			case 'ping':
				response = this.handlePing(request);
				break;

			case 'roots/list':
				response = this.handleRootsList(request);
				break;

			case 'sampling/createMessage': {
				const createMessage = this._createMessageRequestHandler;
				if (!createMessage) {
					throw McpError.methodNotFound(method);
				}

				if (request.params.task) {
					// Task-augmented request: run the handler as a task and answer with the task right away.
					const taskResult = this._taskManager.createTask(
						request.params.task.ttl ?? null,
						token => createMessage(request.params, token)
					);
					taskResult._meta ??= {};
					taskResult._meta['io.modelcontextprotocol/related-task'] = { taskId: taskResult.task.taskId };
					response = taskResult;
				} else {
					response = await createMessage(request.params);
				}
				break;
			}

			case 'elicitation/create': {
				const elicit = this._elicitationRequestHandler;
				if (!elicit) {
					throw McpError.methodNotFound(method);
				}

				if (request.params.task) {
					// Task-augmented request: run the handler as a task and answer with the task right away.
					const taskResult = this._taskManager.createTask(
						request.params.task.ttl ?? null,
						token => elicit(request.params, token)
					);
					taskResult._meta ??= {};
					taskResult._meta['io.modelcontextprotocol/related-task'] = { taskId: taskResult.task.taskId };
					response = taskResult;
				} else {
					response = await elicit(request.params);
				}
				break;
			}

			case 'tasks/get':
				response = this._taskManager.getTask(request.params.taskId);
				break;

			case 'tasks/result':
				response = await this._taskManager.getTaskResult(request.params.taskId);
				break;

			case 'tasks/cancel':
				response = this._taskManager.cancelTask(request.params.taskId);
				break;

			case 'tasks/list':
				response = this._taskManager.listTasks();
				break;

			default:
				throw McpError.methodNotFound(method);
		}

		this.respondToRequest(request, response);
	} catch (e) {
		// Anything that is not already an McpError is logged and wrapped.
		let failure = e;
		if (!(failure instanceof McpError)) {
			this.logger.error(`Error handling request ${method}:`, failure);
			failure = McpError.unknown(failure);
		}

		const errorResponse: MCP.JSONRPCErrorResponse = {
			jsonrpc: MCP.JSONRPC_VERSION,
			id: request.id,
			error: {
				code: failure.code,
				message: failure.message,
				data: failure.data,
			}
		};

		this.send(errorResponse);
	}
}
419
/**
 * Handle incoming server notifications by forwarding them to the matching
 * event emitter, the logger, the pending-request bookkeeping or the task
 * manager.
 */
private handleServerNotification(request: MCP.JSONRPCNotification & MCP.ServerNotification): void {
	switch (request.method) {
		case 'notifications/message':
			this.handleLoggingNotification(request);
			break;
		case 'notifications/cancelled':
			// Listeners are notified first, then the pending request (if any) is cancelled.
			this._onDidReceiveCancelledNotification.fire(request);
			this.handleCancelledNotification(request);
			break;
		case 'notifications/progress':
			this._onDidReceiveProgressNotification.fire(request);
			break;
		case 'notifications/resources/list_changed':
			this._onDidChangeResourceList.fire();
			break;
		case 'notifications/resources/updated':
			this._onDidUpdateResource.fire(request);
			break;
		case 'notifications/tools/list_changed':
			this._onDidChangeToolList.fire();
			break;
		case 'notifications/prompts/list_changed':
			this._onDidChangePromptList.fire();
			break;
		case 'notifications/elicitation/complete':
			this._onDidReceiveElicitationCompleteNotification.fire(request);
			break;
		case 'notifications/tasks/status':
			// Status updates are only delivered to tasks the task manager knows about.
			this._taskManager.getClientTask(request.params.taskId)?.onDidUpdateState(request.params);
			break;
		default:
			softAssertNever(request);
	}
}
454
455
/**
 * Cancels the pending request named by a `notifications/cancelled`
 * notification. Does nothing if the notification carries no (truthy)
 * request ID or the request is not pending.
 */
private handleCancelledNotification(request: MCP.CancelledNotification): void {
	const { requestId } = request.params;
	if (!requestId) {
		return;
	}

	const pending = this._pendingRequests.get(requestId);
	if (!pending) {
		return;
	}

	this._pendingRequests.delete(requestId);
	pending.promise.cancel();
}
464
465
/**
 * Passes the parameters of a `notifications/message` notification, together
 * with this handler's logger, to `translateMcpLogMessage`.
 */
private handleLoggingNotification(request: MCP.LoggingMessageNotification): void {
	const { params } = request;
	translateMcpLogMessage(this.logger, params);
}
468
469
/**
 * Send a generic response to a request.
 *
 * @param request The request being answered; its `id` is echoed in the response
 * @param result The result payload of the response
 */
private respondToRequest(request: MCP.JSONRPCRequest, result: MCP.Result): void {
	this.send({
		jsonrpc: MCP.JSONRPC_VERSION,
		id: request.id,
		result
	} satisfies MCP.JSONRPCResponse);
}
480
481
/**
 * Produces the result for a ping request, which is always an empty object.
 *
 * @param _request The ping request (unused)
 * @returns An empty result
 */
private handlePing(_request: MCP.PingRequest): {} {
	const emptyResult = {};
	return emptyResult;
}
487
488
/**
 * Produces the result for a roots/list request and records that the server
 * has now seen the roots, so that later changes are announced to it.
 *
 * @param _request The roots/list request (unused)
 * @returns The current roots
 */
private handleRootsList(_request: MCP.ListRootsRequest): MCP.ListRootsResult {
	this._hasAnnouncedRoots = true;
	const roots = this._roots;
	return { roots };
}
495
496
/** Cancels every pending request and empties the pending-request map. */
private cancelAllRequests() {
	for (const { promise } of this._pendingRequests.values()) {
		promise.cancel();
	}
	this._pendingRequests.clear();
}
500
501
/**
 * Cancels all pending requests before disposing everything registered with
 * this handler.
 */
public override dispose(): void {
	// Cancel first so that callers still waiting on a response are released.
	this.cancelAllRequests();
	super.dispose();
}
505
506
/**
 * Forwards log level changes to the MCP server if it supports logging.
 *
 * Never rejects: failures are logged instead, so callers may invoke this
 * without awaiting it.
 *
 * @param logLevel The editor-side log level to translate and send
 */
private async _sendLogLevelToServer(logLevel: LogLevel): Promise<void> {
	try {
		// The log-level listener is registered in the constructor, but
		// `_serverInit` is only assigned once `create()` has received the
		// `initialize` result. A level change during the handshake must be
		// skipped quietly rather than failing on an undefined `_serverInit`;
		// `create()` sends the then-current level after initialization.
		// Also only send if the server supports logging capabilities.
		if (!this._serverInit?.capabilities.logging) {
			return;
		}

		await this.setLevel({ level: mapLogLevelToMcp(logLevel) });
	} catch (error) {
		this.logger.error(`Failed to set MCP server log level: ${error}`);
	}
}
521
522
/**
 * Send an initialize request.
 *
 * @param params Parameters of the `initialize` request
 * @param token Optional cancellation token
 * @returns The server's response to `initialize`
 */
initialize(params: MCP.InitializeRequest['params'], token?: CancellationToken): Promise<MCP.InitializeResult> {
	const request: Pick<MCP.InitializeRequest, 'method' | 'params'> = { method: 'initialize', params };
	return this.sendRequest<MCP.InitializeRequest, MCP.InitializeResult>(request, token);
}
528
529
/**
 * List available resources, collecting all pages into a single array.
 *
 * @param params Optional parameters of the `resources/list` request
 * @param token Optional cancellation token
 */
listResources(params?: MCP.ListResourcesRequest['params'], token?: CancellationToken): Promise<MCP.Resource[]> {
	const pages = this.listResourcesIterable(params, token);
	return Iterable.asyncToArrayFlat(pages);
}
535
536
/**
 * List available resources (iterable), yielding one page of resources at a
 * time.
 *
 * @param params Optional parameters of the `resources/list` request
 * @param token Optional cancellation token
 */
listResourcesIterable(params?: MCP.ListResourcesRequest['params'], token?: CancellationToken): AsyncIterable<MCP.Resource[]> {
	return this.sendRequestPaginated<MCP.ListResourcesRequest, MCP.ListResourcesResult, MCP.Resource>(
		'resources/list',
		page => page.resources,
		params,
		token
	);
}
542
543
/**
 * Read a specific resource.
 *
 * @param params Parameters of the `resources/read` request
 * @param token Optional cancellation token
 */
readResource(params: MCP.ReadResourceRequest['params'], token?: CancellationToken): Promise<MCP.ReadResourceResult> {
	const request: Pick<MCP.ReadResourceRequest, 'method' | 'params'> = { method: 'resources/read', params };
	return this.sendRequest<MCP.ReadResourceRequest, MCP.ReadResourceResult>(request, token);
}
549
550
/**
 * List available resource templates, collecting all pages into a single array.
 *
 * @param params Optional parameters of the `resources/templates/list` request
 * @param token Optional cancellation token
 */
listResourceTemplates(params?: MCP.ListResourceTemplatesRequest['params'], token?: CancellationToken): Promise<MCP.ResourceTemplate[]> {
	const pages = this.sendRequestPaginated<MCP.ListResourceTemplatesRequest, MCP.ListResourceTemplatesResult, MCP.ResourceTemplate>(
		'resources/templates/list',
		page => page.resourceTemplates,
		params,
		token
	);
	return Iterable.asyncToArrayFlat(pages);
}
556
557
/**
 * Subscribe to resource updates.
 *
 * @param params Parameters of the `resources/subscribe` request
 * @param token Optional cancellation token
 */
subscribe(params: MCP.SubscribeRequest['params'], token?: CancellationToken): Promise<MCP.EmptyResult> {
	const request: Pick<MCP.SubscribeRequest, 'method' | 'params'> = { method: 'resources/subscribe', params };
	return this.sendRequest<MCP.SubscribeRequest, MCP.EmptyResult>(request, token);
}
563
564
/**
 * Unsubscribe from resource updates.
 *
 * @param params Parameters of the `resources/unsubscribe` request
 * @param token Optional cancellation token
 */
unsubscribe(params: MCP.UnsubscribeRequest['params'], token?: CancellationToken): Promise<MCP.EmptyResult> {
	const request: Pick<MCP.UnsubscribeRequest, 'method' | 'params'> = { method: 'resources/unsubscribe', params };
	return this.sendRequest<MCP.UnsubscribeRequest, MCP.EmptyResult>(request, token);
}
570
571
/**
 * List available prompts, collecting all pages into a single array.
 *
 * @param params Optional parameters of the `prompts/list` request
 * @param token Optional cancellation token
 */
listPrompts(params?: MCP.ListPromptsRequest['params'], token?: CancellationToken): Promise<MCP.Prompt[]> {
	const pages = this.sendRequestPaginated<MCP.ListPromptsRequest, MCP.ListPromptsResult, MCP.Prompt>(
		'prompts/list',
		page => page.prompts,
		params,
		token
	);
	return Iterable.asyncToArrayFlat(pages);
}
577
578
/**
 * Get a specific prompt.
 *
 * @param params Parameters of the `prompts/get` request
 * @param token Optional cancellation token
 */
getPrompt(params: MCP.GetPromptRequest['params'], token?: CancellationToken): Promise<MCP.GetPromptResult> {
	const request: Pick<MCP.GetPromptRequest, 'method' | 'params'> = { method: 'prompts/get', params };
	return this.sendRequest<MCP.GetPromptRequest, MCP.GetPromptResult>(request, token);
}
584
585
/**
 * List available tools, collecting all pages into a single array.
 *
 * @param params Optional parameters of the `tools/list` request
 * @param token Optional cancellation token
 */
listTools(params?: MCP.ListToolsRequest['params'], token?: CancellationToken): Promise<MCP.Tool[]> {
	const pages = this.sendRequestPaginated<MCP.ListToolsRequest, MCP.ListToolsResult, MCP.Tool>(
		'tools/list',
		page => page.tools,
		params,
		token
	);
	return Iterable.asyncToArrayFlat(pages);
}
591
592
/**
 * Call a specific tool. Supports tasks automatically if `task` is set on the request.
 *
 * When the server answers with a task instead of a tool result, the task is
 * handed to the task manager and bound to this handler, and the returned
 * promise settles with the task's result. The task is abandoned again once
 * that promise settles.
 *
 * @param params Parameters of the `tools/call` request
 * @param token Optional cancellation token, also passed on to a created task
 */
async callTool(params: MCP.CallToolRequest['params'] & MCP.Request['params'], token?: CancellationToken): Promise<MCP.CallToolResult> {
	const response = await this.sendRequest<MCP.CallToolRequest, MCP.CallToolResult | MCP.CreateTaskResult>({ method: 'tools/call', params }, token);

	if (!isTaskResult(response)) {
		// Plain tool result, nothing more to do.
		return response;
	}

	const task = new McpTask<MCP.CallToolResult>(response.task, token);
	this._taskManager.adoptClientTask(task);
	task.setHandler(this);

	const releaseTask = () => {
		this._taskManager.abandonClientTask(task.id);
	};
	return task.result.finally(releaseTask);
}
610
611
/**
 * Set the logging level.
 *
 * @param params Parameters of the `logging/setLevel` request
 * @param token Optional cancellation token
 */
setLevel(params: MCP.SetLevelRequest['params'], token?: CancellationToken): Promise<MCP.EmptyResult> {
	const request: Pick<MCP.SetLevelRequest, 'method' | 'params'> = { method: 'logging/setLevel', params };
	return this.sendRequest<MCP.SetLevelRequest, MCP.EmptyResult>(request, token);
}
617
618
/**
 * Find completions for an argument.
 *
 * @param params Parameters of the `completion/complete` request
 * @param token Optional cancellation token
 */
complete(params: MCP.CompleteRequest['params'], token?: CancellationToken): Promise<MCP.CompleteResult> {
	const request: Pick<MCP.CompleteRequest, 'method' | 'params'> = { method: 'completion/complete', params };
	return this.sendRequest<MCP.CompleteRequest, MCP.CompleteResult>(request, token);
}
624
625
/**
 * Get task status.
 *
 * @param params Identifies the task by its `taskId`
 * @param token Optional cancellation token
 */
getTask(params: { taskId: string }, token?: CancellationToken): Promise<MCP.GetTaskResult> {
	const request: Pick<MCP.GetTaskRequest, 'method' | 'params'> = { method: 'tasks/get', params };
	return this.sendRequest<MCP.GetTaskRequest, MCP.GetTaskResult>(request, token);
}
631
632
/**
 * Get task result.
 *
 * @param params Identifies the task by its `taskId`
 * @param token Optional cancellation token
 */
getTaskResult(params: { taskId: string }, token?: CancellationToken): Promise<MCP.GetTaskPayloadResult> {
	const request: Pick<MCP.GetTaskPayloadRequest, 'method' | 'params'> = { method: 'tasks/result', params };
	return this.sendRequest<MCP.GetTaskPayloadRequest, MCP.GetTaskPayloadResult>(request, token);
}
638
639
/**
 * Cancel a task.
 *
 * @param params Identifies the task by its `taskId`
 * @param token Optional cancellation token
 */
cancelTask(params: { taskId: string }, token?: CancellationToken): Promise<MCP.CancelTaskResult> {
	const request: Pick<MCP.CancelTaskRequest, 'method' | 'params'> = { method: 'tasks/cancel', params };
	return this.sendRequest<MCP.CancelTaskRequest, MCP.CancelTaskResult>(request, token);
}
645
646
/**
 * List all tasks, collecting all pages into a single array.
 *
 * @param params Optional parameters of the `tasks/list` request
 * @param token Optional cancellation token
 */
listTasks(params?: MCP.ListTasksRequest['params'], token?: CancellationToken): Promise<MCP.Task[]> {
	const pages = this.sendRequestPaginated<MCP.ListTasksRequest, MCP.ListTasksResult, MCP.Task>(
		'tasks/list',
		page => page.tasks,
		params,
		token
	);
	return Iterable.asyncToArrayFlat(pages);
}
656
}
657
658
/**
 * Tells whether a task has reached a final status (`completed`, `failed` or
 * `cancelled`).
 */
function isTaskInTerminalState(task: MCP.Task): boolean {
	switch (task.status) {
		case 'completed':
		case 'failed':
		case 'cancelled':
			return true;
		default:
			return false;
	}
}
661
662
/**
 * Implementation of a task that handles polling, status notifications, and
 * handler reconnections. It runs the task polling loop internally and can
 * also be updated externally via `onDidUpdateState`, for example when
 * status notifications are received.
 * @internal
 */
export class McpTask<T extends MCP.Result> extends Disposable implements IMcpTaskInternal {
	/** Settled with the task's payload, an error, or a cancellation. */
	private readonly promise = new DeferredPromise<T>();

	/** Promise for the task's final result. */
	public get result(): Promise<T> {
		return this.promise.p;
	}

	/** ID of the task this object was created for. */
	public get id() {
		return this._task.taskId;
	}

	/** Most recent task state known to the client. */
	private _lastTaskState: ISettableObservable<MCP.Task>;
	/** Handler currently used to talk to the server; polling pauses while it is unset. */
	private _handler = observableValue<McpServerRequestHandler | undefined>('mcpTaskHandler', undefined);

	/**
	 * @param _task Task as initially reported by the server
	 * @param _token Cancelling this token moves a non-terminal task to `cancelled`
	 */
	constructor(
		private readonly _task: MCP.Task,
		_token: CancellationToken = CancellationToken.None
	) {
		super();

		const expiresAt = _task.ttl ? (Date.now() + _task.ttl) : undefined;
		this._lastTaskState = observableValue('lastTaskState', this._task);

		const store = this._register(new DisposableStore());

		// Moves the task to `cancelled` unless it already reached a terminal state.
		const cancelUnlessTerminal = (patch: { statusMessage?: string }) => {
			const latest = this._lastTaskState.get();
			if (!isTaskInTerminalState(latest)) {
				this._lastTaskState.set({ ...latest, status: 'cancelled', ...patch }, undefined);
			}
		};

		// Handle external cancellation token
		if (_token.isCancellationRequested) {
			this._lastTaskState.set({ ...this._task, status: 'cancelled' }, undefined);
		} else {
			store.add(_token.onCancellationRequested(() => cancelUnlessTerminal({})));
		}

		// Handle TTL expiration with an explicit timeout
		if (expiresAt) {
			const remainingMs = expiresAt - Date.now();
			if (remainingMs <= 0) {
				this._lastTaskState.set({ ...this._task, status: 'cancelled', statusMessage: 'Task timed out.' }, undefined);
			} else {
				store.add(disposableTimeout(() => cancelUnlessTerminal({ statusMessage: 'Task timed out.' }), remainingMs));
			}
		}

		// Lookup started when the task enters the input_required state.
		const inputRequiredLookup = observableValue<ObservablePromise<MCP.Task> | undefined>('activeResultLookup', undefined);

		// 1. Poll for task updates when the task isn't in a terminal state
		store.add(autorun(reader => {
			const current = this._lastTaskState.read(reader);
			if (isTaskInTerminalState(current)) {
				return;
			}

			// While a lookup triggered by input_required is in flight there is
			// no need to poll. Once it settles, the lookup is cleared and the
			// task state is updated from its outcome.
			const lookup = inputRequiredLookup.read(reader);
			if (lookup) {
				const outcome = lookup.promiseResult.read(reader);
				transaction(tx => {
					if (!outcome) {
						return; // still ongoing
					}

					inputRequiredLookup.set(undefined, tx);
					if (outcome.data) {
						this._lastTaskState.set(outcome.data, tx);
						return;
					}

					// INVALID_PARAMS cancels the task; any other failure may be a
					// connection error, so go back to polling.
					const isInvalidParams = outcome.error instanceof McpError && outcome.error.code === MCP.INVALID_PARAMS;
					this._lastTaskState.set({ ...current, status: isInvalidParams ? 'cancelled' : 'working' }, undefined);
				});
				return;
			}

			const handler = this._handler.read(reader);
			if (!handler) {
				return;
			}

			const pollInterval = _task.pollInterval ?? 2000;
			const pollCts = new CancellationTokenSource(_token);
			reader.store.add(toDisposable(() => pollCts.dispose(true)));

			const stateAfterFailure = (e: unknown): MCP.Task | undefined => {
				if (e instanceof McpError && e.code === MCP.INVALID_PARAMS) {
					return { ...current, status: 'cancelled' };
				}
				return { ...current }; // errors are already logged, keep in current state
			};
			const applyPolledState = (polled: MCP.Task | undefined) => {
				if (polled && !pollCts.token.isCancellationRequested) {
					this._lastTaskState.set(polled, undefined);
				}
			};

			reader.store.add(disposableTimeout(() => {
				handler.getTask({ taskId: current.taskId }, pollCts.token)
					.catch(stateAfterFailure)
					.then(applyPolledState);
			}, pollInterval));
		}));

		// 2. Get the result once it's available (or propagate errors). Trigger
		// input_required handling as needed. Only react when the status itself changes.
		const lastStatus = this._lastTaskState.map(task => task.status);
		store.add(autorun(reader => {
			const status = lastStatus.read(reader);
			switch (status) {
				case 'failed': {
					const failed = this._lastTaskState.read(undefined);
					this.promise.error(new Error(`Task ${failed.taskId} failed: ${failed.statusMessage ?? 'unknown error'}`));
					store.dispose();
					break;
				}
				case 'cancelled':
					this.promise.cancel();
					store.dispose();
					break;
				case 'input_required': {
					const handler = this._handler.read(reader);
					if (handler) {
						const current = this._lastTaskState.read(undefined);
						const lookupCts = new CancellationTokenSource(_token);
						reader.store.add(toDisposable(() => lookupCts.dispose(true)));
						inputRequiredLookup.set(new ObservablePromise<MCP.Task>(handler.getTask({ taskId: current.taskId }, lookupCts.token)), undefined);
					}
					break;
				}
				case 'completed': {
					const handler = this._handler.read(reader);
					if (handler) {
						this.promise.settleWith(handler.getTaskResult({ taskId: _task.taskId }, _token) as Promise<T>);
						store.dispose();
					}
					break;
				}
				case 'working':
					// no-op
					break;
				default:
					softAssertNever(status);
			}
		}));
	}

	/** Replaces the last known task state, e.g. when a status notification arrives. */
	onDidUpdateState(task: MCP.Task) {
		this._lastTaskState.set(task, undefined);
	}

	/** Sets or clears the handler used to reach the server. */
	setHandler(handler: McpServerRequestHandler | undefined): void {
		this._handler.set(handler, undefined);
	}
}
822
823
/**
 * Maps VSCode LogLevel to MCP LoggingLevel.
 *
 * MCP has no counterpart for `Trace` or `Off`; they are mapped to `debug`
 * and `emergency` respectively.
 */
function mapLogLevelToMcp(logLevel: LogLevel): MCP.LoggingLevel {
	switch (logLevel) {
		case LogLevel.Trace: // MCP doesn't have trace, use debug
		case LogLevel.Debug:
			return 'debug';
		case LogLevel.Info:
			return 'info';
		case LogLevel.Warning:
			return 'warning';
		case LogLevel.Error:
			return 'error';
		case LogLevel.Off:
			return 'emergency'; // MCP doesn't have off, use emergency
		default:
			// Every known level is handled above; any other value is rejected.
			return assertNever(logLevel);
	}
}
844
845