Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/workbench/contrib/mcp/common/mcpTaskManager.ts
5252 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { disposableTimeout } from '../../../../base/common/async.js';
7
import { CancellationToken, CancellationTokenSource } from '../../../../base/common/cancellation.js';
8
import { CancellationError } from '../../../../base/common/errors.js';
9
import { Emitter } from '../../../../base/common/event.js';
10
import { Disposable, DisposableMap, DisposableStore, IDisposable, toDisposable } from '../../../../base/common/lifecycle.js';
11
import { generateUuid } from '../../../../base/common/uuid.js';
12
import type { McpServerRequestHandler } from './mcpServerRequestHandler.js';
13
import { McpError } from './mcpTypes.js';
14
import { MCP } from './modelContextProtocol.js';
15
16
export interface IMcpTaskInternal extends IDisposable {
17
readonly id: string;
18
onDidUpdateState(task: MCP.Task): void;
19
setHandler(handler: McpServerRequestHandler | undefined): void;
20
}
21
22
interface TaskEntry extends IDisposable {
23
task: MCP.Task;
24
result?: MCP.Result;
25
error?: MCP.Error;
26
cts: CancellationTokenSource;
27
/** Time when the task was created (client time), used to calculate TTL expiration */
28
createdAtTime: number;
29
/** Promise that resolves when the task execution completes */
30
executionPromise: Promise<void>;
31
}
32
33
/**
34
* Manages in-memory task state for server-side MCP tasks (sampling and elicitation).
35
* Also tracks client-side tasks to survive handler reconnections.
36
* Lifecycle is tied to the McpServer instance.
37
*/
38
export class McpTaskManager extends Disposable {
39
private readonly _serverTasks = this._register(new DisposableMap<string, TaskEntry>());
40
private readonly _clientTasks = this._register(new DisposableMap<string, IMcpTaskInternal>());
41
private readonly _onDidUpdateTask = this._register(new Emitter<MCP.Task>());
42
public readonly onDidUpdateTask = this._onDidUpdateTask.event;
43
44
/**
45
* Attach a new handler to this task manager.
46
* Updates all client tasks to use the new handler.
47
*/
48
setHandler(handler: McpServerRequestHandler | undefined): void {
49
for (const task of this._clientTasks.values()) {
50
task.setHandler(handler);
51
}
52
}
53
54
/**
55
* Get a client task by ID for status notification handling.
56
*/
57
getClientTask(taskId: string): IMcpTaskInternal | undefined {
58
return this._clientTasks.get(taskId);
59
}
60
61
/**
62
* Track a new client task.
63
*/
64
adoptClientTask(task: IMcpTaskInternal): void {
65
this._clientTasks.set(task.id, task);
66
}
67
68
/**
69
* Untracks a client task.
70
*/
71
abandonClientTask(taskId: string): void {
72
this._clientTasks.deleteAndDispose(taskId);
73
}
74
75
/**
76
* Create a new task and execute it asynchronously.
77
* Returns the task immediately while execution continues in the background.
78
*/
79
public createTask<TResult extends MCP.Result>(
80
ttl: number | null,
81
executor: (token: CancellationToken) => Promise<TResult>
82
): MCP.CreateTaskResult {
83
const taskId = generateUuid();
84
const createdAt = new Date().toISOString();
85
const createdAtTime = Date.now();
86
87
const task: MCP.Task = {
88
taskId,
89
status: 'working',
90
createdAt,
91
ttl,
92
lastUpdatedAt: new Date().toISOString(),
93
pollInterval: 1000, // Suggest 1 second polling interval
94
};
95
96
const store = new DisposableStore();
97
const cts = new CancellationTokenSource();
98
store.add(toDisposable(() => cts.dispose(true)));
99
100
const executionPromise = this._executeTask(taskId, executor, cts.token);
101
102
// Delete the task after its TTL. Or, if no TTL is given, delete it shortly after the task completes.
103
if (ttl) {
104
store.add(disposableTimeout(() => this._serverTasks.deleteAndDispose(taskId), ttl));
105
} else {
106
executionPromise.finally(() => {
107
const timeout = this._register(disposableTimeout(() => {
108
this._serverTasks.deleteAndDispose(taskId);
109
this._store.delete(timeout);
110
}, 60_000));
111
});
112
}
113
114
this._serverTasks.set(taskId, {
115
task,
116
cts,
117
dispose: () => store.dispose(),
118
createdAtTime,
119
executionPromise,
120
});
121
122
return { task };
123
}
124
125
/**
126
* Execute a task asynchronously and update its state.
127
*/
128
private async _executeTask<TResult extends MCP.Result>(
129
taskId: string,
130
executor: (token: CancellationToken) => Promise<TResult>,
131
token: CancellationToken
132
): Promise<void> {
133
try {
134
const result = await executor(token);
135
this._updateTaskStatus(taskId, 'completed', undefined, result);
136
} catch (error) {
137
if (error instanceof CancellationError) {
138
this._updateTaskStatus(taskId, 'cancelled', 'Task was cancelled by the client');
139
} else if (error instanceof McpError) {
140
this._updateTaskStatus(taskId, 'failed', error.message, undefined, {
141
code: error.code,
142
message: error.message,
143
data: error.data,
144
});
145
} else if (error instanceof Error) {
146
this._updateTaskStatus(taskId, 'failed', error.message, undefined, {
147
code: MCP.INTERNAL_ERROR,
148
message: error.message,
149
});
150
} else {
151
this._updateTaskStatus(taskId, 'failed', 'Unknown error', undefined, {
152
code: MCP.INTERNAL_ERROR,
153
message: 'Unknown error',
154
});
155
}
156
}
157
}
158
159
/**
160
* Update task status and optionally store result or error.
161
*/
162
private _updateTaskStatus(
163
taskId: string,
164
status: MCP.TaskStatus,
165
statusMessage?: string,
166
result?: MCP.Result,
167
error?: MCP.Error
168
): void {
169
const entry = this._serverTasks.get(taskId);
170
if (!entry) {
171
return;
172
}
173
174
entry.task.status = status;
175
entry.task.lastUpdatedAt = new Date().toISOString();
176
177
if (statusMessage !== undefined) {
178
entry.task.statusMessage = statusMessage;
179
}
180
if (result !== undefined) {
181
entry.result = result;
182
}
183
if (error !== undefined) {
184
entry.error = error;
185
}
186
187
this._onDidUpdateTask.fire({ ...entry.task });
188
}
189
190
/**
191
* Get the current state of a task.
192
* Returns an error if the task doesn't exist or has expired.
193
*/
194
public getTask(taskId: string): MCP.GetTaskResult {
195
const entry = this._serverTasks.get(taskId);
196
if (!entry) {
197
throw new McpError(MCP.INVALID_PARAMS, `Task not found: ${taskId}`);
198
}
199
200
return { ...entry.task };
201
}
202
203
/**
204
* Get the result of a completed task.
205
* Blocks until the task completes if it's still in progress.
206
*/
207
public async getTaskResult(taskId: string): Promise<MCP.GetTaskPayloadResult> {
208
const entry = this._serverTasks.get(taskId);
209
if (!entry) {
210
throw new McpError(MCP.INVALID_PARAMS, `Task not found: ${taskId}`);
211
}
212
213
if (entry.task.status === 'working' || entry.task.status === 'input_required') {
214
await entry.executionPromise;
215
}
216
217
// Refresh entry after waiting
218
const updatedEntry = this._serverTasks.get(taskId);
219
if (!updatedEntry) {
220
throw new McpError(MCP.INVALID_PARAMS, `Task not found: ${taskId}`);
221
}
222
223
if (updatedEntry.error) {
224
throw new McpError(updatedEntry.error.code, updatedEntry.error.message, updatedEntry.error.data);
225
}
226
227
if (!updatedEntry.result) {
228
throw new McpError(MCP.INTERNAL_ERROR, 'Task completed but no result available');
229
}
230
231
return updatedEntry.result;
232
}
233
234
/**
235
* Cancel a task.
236
*/
237
public cancelTask(taskId: string): MCP.CancelTaskResult {
238
const entry = this._serverTasks.get(taskId);
239
if (!entry) {
240
throw new McpError(MCP.INVALID_PARAMS, `Task not found: ${taskId}`);
241
}
242
243
// Check if already in terminal status
244
if (entry.task.status === 'completed' || entry.task.status === 'failed' || entry.task.status === 'cancelled') {
245
throw new McpError(MCP.INVALID_PARAMS, `Cannot cancel task in ${entry.task.status} status`);
246
}
247
248
entry.task.status = 'cancelled';
249
entry.task.statusMessage = 'Task was cancelled by the client';
250
entry.cts.cancel();
251
252
return { ...entry.task };
253
}
254
255
/**
256
* List all tasks.
257
*/
258
public listTasks(): MCP.ListTasksResult {
259
const tasks: MCP.Task[] = [];
260
261
for (const entry of this._serverTasks.values()) {
262
tasks.push({ ...entry.task });
263
}
264
265
return { tasks };
266
}
267
}
268
269