Path: blob/main/src/vs/workbench/contrib/mcp/common/mcpTaskManager.ts
5252 views
/*---------------------------------------------------------------------------------------------1* Copyright (c) Microsoft Corporation. All rights reserved.2* Licensed under the MIT License. See License.txt in the project root for license information.3*--------------------------------------------------------------------------------------------*/45import { disposableTimeout } from '../../../../base/common/async.js';6import { CancellationToken, CancellationTokenSource } from '../../../../base/common/cancellation.js';7import { CancellationError } from '../../../../base/common/errors.js';8import { Emitter } from '../../../../base/common/event.js';9import { Disposable, DisposableMap, DisposableStore, IDisposable, toDisposable } from '../../../../base/common/lifecycle.js';10import { generateUuid } from '../../../../base/common/uuid.js';11import type { McpServerRequestHandler } from './mcpServerRequestHandler.js';12import { McpError } from './mcpTypes.js';13import { MCP } from './modelContextProtocol.js';1415export interface IMcpTaskInternal extends IDisposable {16readonly id: string;17onDidUpdateState(task: MCP.Task): void;18setHandler(handler: McpServerRequestHandler | undefined): void;19}2021interface TaskEntry extends IDisposable {22task: MCP.Task;23result?: MCP.Result;24error?: MCP.Error;25cts: CancellationTokenSource;26/** Time when the task was created (client time), used to calculate TTL expiration */27createdAtTime: number;28/** Promise that resolves when the task execution completes */29executionPromise: Promise<void>;30}3132/**33* Manages in-memory task state for server-side MCP tasks (sampling and elicitation).34* Also tracks client-side tasks to survive handler reconnections.35* Lifecycle is tied to the McpServer instance.36*/37export class McpTaskManager extends Disposable {38private readonly _serverTasks = this._register(new DisposableMap<string, TaskEntry>());39private readonly _clientTasks = this._register(new DisposableMap<string, IMcpTaskInternal>());40private readonly _onDidUpdateTask = this._register(new Emitter<MCP.Task>());41public readonly onDidUpdateTask = this._onDidUpdateTask.event;4243/**44* Attach a new handler to this task manager.45* Updates all client tasks to use the new handler.46*/47setHandler(handler: McpServerRequestHandler | undefined): void {48for (const task of this._clientTasks.values()) {49task.setHandler(handler);50}51}5253/**54* Get a client task by ID for status notification handling.55*/56getClientTask(taskId: string): IMcpTaskInternal | undefined {57return this._clientTasks.get(taskId);58}5960/**61* Track a new client task.62*/63adoptClientTask(task: IMcpTaskInternal): void {64this._clientTasks.set(task.id, task);65}6667/**68* Untracks a client task.69*/70abandonClientTask(taskId: string): void {71this._clientTasks.deleteAndDispose(taskId);72}7374/**75* Create a new task and execute it asynchronously.76* Returns the task immediately while execution continues in the background.77*/78public createTask<TResult extends MCP.Result>(79ttl: number | null,80executor: (token: CancellationToken) => Promise<TResult>81): MCP.CreateTaskResult {82const taskId = generateUuid();83const createdAt = new Date().toISOString();84const createdAtTime = Date.now();8586const task: MCP.Task = {87taskId,88status: 'working',89createdAt,90ttl,91lastUpdatedAt: new Date().toISOString(),92pollInterval: 1000, // Suggest 1 second polling interval93};9495const store = new DisposableStore();96const cts = new CancellationTokenSource();97store.add(toDisposable(() => cts.dispose(true)));9899const executionPromise = this._executeTask(taskId, executor, cts.token);100101// Delete the task after its TTL. Or, if no TTL is given, delete it shortly after the task completes.102if (ttl) {103store.add(disposableTimeout(() => this._serverTasks.deleteAndDispose(taskId), ttl));104} else {105executionPromise.finally(() => {106const timeout = this._register(disposableTimeout(() => {107this._serverTasks.deleteAndDispose(taskId);108this._store.delete(timeout);109}, 60_000));110});111}112113this._serverTasks.set(taskId, {114task,115cts,116dispose: () => store.dispose(),117createdAtTime,118executionPromise,119});120121return { task };122}123124/**125* Execute a task asynchronously and update its state.126*/127private async _executeTask<TResult extends MCP.Result>(128taskId: string,129executor: (token: CancellationToken) => Promise<TResult>,130token: CancellationToken131): Promise<void> {132try {133const result = await executor(token);134this._updateTaskStatus(taskId, 'completed', undefined, result);135} catch (error) {136if (error instanceof CancellationError) {137this._updateTaskStatus(taskId, 'cancelled', 'Task was cancelled by the client');138} else if (error instanceof McpError) {139this._updateTaskStatus(taskId, 'failed', error.message, undefined, {140code: error.code,141message: error.message,142data: error.data,143});144} else if (error instanceof Error) {145this._updateTaskStatus(taskId, 'failed', error.message, undefined, {146code: MCP.INTERNAL_ERROR,147message: error.message,148});149} else {150this._updateTaskStatus(taskId, 'failed', 'Unknown error', undefined, {151code: MCP.INTERNAL_ERROR,152message: 'Unknown error',153});154}155}156}157158/**159* Update task status and optionally store result or error.160*/161private _updateTaskStatus(162taskId: string,163status: MCP.TaskStatus,164statusMessage?: string,165result?: MCP.Result,166error?: MCP.Error167): void {168const entry = this._serverTasks.get(taskId);169if (!entry) {170return;171}172173entry.task.status = status;174entry.task.lastUpdatedAt = new Date().toISOString();175176if (statusMessage !== undefined) {177entry.task.statusMessage = statusMessage;178}179if (result !== undefined) {180entry.result = result;181}182if (error !== undefined) {183entry.error = error;184}185186this._onDidUpdateTask.fire({ ...entry.task });187}188189/**190* Get the current state of a task.191* Returns an error if the task doesn't exist or has expired.192*/193public getTask(taskId: string): MCP.GetTaskResult {194const entry = this._serverTasks.get(taskId);195if (!entry) {196throw new McpError(MCP.INVALID_PARAMS, `Task not found: ${taskId}`);197}198199return { ...entry.task };200}201202/**203* Get the result of a completed task.204* Blocks until the task completes if it's still in progress.205*/206public async getTaskResult(taskId: string): Promise<MCP.GetTaskPayloadResult> {207const entry = this._serverTasks.get(taskId);208if (!entry) {209throw new McpError(MCP.INVALID_PARAMS, `Task not found: ${taskId}`);210}211212if (entry.task.status === 'working' || entry.task.status === 'input_required') {213await entry.executionPromise;214}215216// Refresh entry after waiting217const updatedEntry = this._serverTasks.get(taskId);218if (!updatedEntry) {219throw new McpError(MCP.INVALID_PARAMS, `Task not found: ${taskId}`);220}221222if (updatedEntry.error) {223throw new McpError(updatedEntry.error.code, updatedEntry.error.message, updatedEntry.error.data);224}225226if (!updatedEntry.result) {227throw new McpError(MCP.INTERNAL_ERROR, 'Task completed but no result available');228}229230return updatedEntry.result;231}232233/**234* Cancel a task.235*/236public cancelTask(taskId: string): MCP.CancelTaskResult {237const entry = this._serverTasks.get(taskId);238if (!entry) {239throw new McpError(MCP.INVALID_PARAMS, `Task not found: ${taskId}`);240}241242// Check if already in terminal status243if (entry.task.status === 'completed' || entry.task.status === 'failed' || entry.task.status === 'cancelled') {244throw new McpError(MCP.INVALID_PARAMS, `Cannot cancel task in ${entry.task.status} status`);245}246247entry.task.status = 'cancelled';248entry.task.statusMessage = 'Task was cancelled by the client';249entry.cts.cancel();250251return { ...entry.task };252}253254/**255* List all tasks.256*/257public listTasks(): MCP.ListTasksResult {258const tasks: MCP.Task[] = [];259260for (const entry of this._serverTasks.values()) {261tasks.push({ ...entry.task });262}263264return { tasks };265}266}267268269