Path: blob/main/src/vs/workbench/contrib/mcp/common/mcpServerRequestHandler.ts
5272 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { equals } from '../../../../base/common/arrays.js';
import { assertNever, softAssertNever } from '../../../../base/common/assert.js';
import { DeferredPromise, disposableTimeout, IntervalTimer } from '../../../../base/common/async.js';
import { CancellationToken, CancellationTokenSource } from '../../../../base/common/cancellation.js';
import { CancellationError } from '../../../../base/common/errors.js';
import { Emitter } from '../../../../base/common/event.js';
import { Iterable } from '../../../../base/common/iterator.js';
import { Disposable, DisposableStore, toDisposable } from '../../../../base/common/lifecycle.js';
import { autorun, ISettableObservable, ObservablePromise, observableValue, transaction } from '../../../../base/common/observable.js';
import { IInstantiationService } from '../../../../platform/instantiation/common/instantiation.js';
import { canLog, ILogger, log, LogLevel } from '../../../../platform/log/common/log.js';
import { IProductService } from '../../../../platform/product/common/productService.js';
import { IMcpMessageTransport } from './mcpRegistryTypes.js';
import { IMcpTaskInternal, McpTaskManager } from './mcpTaskManager.js';
import { IMcpClientMethods, McpConnectionState, McpError, MpcResponseError } from './mcpTypes.js';
import { isTaskResult, translateMcpLogMessage } from './mcpTypesUtils.js';
import { MCP } from './modelContextProtocol.js';

/**
 * Bookkeeping for a request that was sent to the server and has not been
 * answered yet. Stored in a map keyed by request ID.
 */
interface PendingRequest {
	promise: DeferredPromise<MCP.Result>;
}

export interface McpRoot {
	uri: string;
	name?: string;
}

export interface IMcpServerRequestHandlerOptions extends IMcpClientMethods {
	/** MCP message transport */
	launch: IMcpMessageTransport;
	/** Logger instance. */
	logger: ILogger;
	/** Log level MCP messages are logged at */
	requestLogLevel?: LogLevel;
	/** Task manager for server-side MCP tasks (shared across reconnections) */
	taskManager: McpTaskManager;
}

/**
 * Request handler for communicating with an MCP server.
 *
 * Handles sending requests and receiving responses, with automatic
 * handling of ping requests and typed client request methods.
 */
export class McpServerRequestHandler extends Disposable {
	private _nextRequestId = 1;
	private readonly _pendingRequests = new Map<MCP.RequestId, PendingRequest>();

	private _hasAnnouncedRoots = false;
	private _roots: MCP.Root[] = [];

	/**
	 * Updates the roots reported to the server. A `list_changed` notification
	 * is only sent if the server has fetched the roots (via `roots/list`)
	 * since the last notification; the flag is reset so that further changes
	 * stay silent until the server asks for the list again.
	 */
	public set roots(roots: MCP.Root[]) {
		if (!equals(this._roots, roots)) {
			this._roots = roots;
			if (this._hasAnnouncedRoots) {
				this.sendNotification({ method: 'notifications/roots/list_changed' });
				this._hasAnnouncedRoots = false;
			}
		}
	}

	// Assigned in `create()` once the `initialize` request has been answered.
	private _serverInit!: MCP.InitializeResult;
	public get capabilities(): MCP.ServerCapabilities {
		return this._serverInit.capabilities;
	}

	public get serverInfo(): MCP.Implementation {
		return this._serverInit.serverInfo;
	}

	public get serverInstructions(): string | undefined {
		return this._serverInit.instructions;
	}

	// Event emitters for server notifications
	private readonly _onDidReceiveCancelledNotification = this._register(new Emitter<MCP.CancelledNotification>());
	readonly onDidReceiveCancelledNotification = this._onDidReceiveCancelledNotification.event;

	private readonly _onDidReceiveProgressNotification = this._register(new Emitter<MCP.ProgressNotification>());
	readonly onDidReceiveProgressNotification = this._onDidReceiveProgressNotification.event;

	private readonly _onDidReceiveElicitationCompleteNotification = this._register(new Emitter<MCP.ElicitationCompleteNotification>());
	readonly onDidReceiveElicitationCompleteNotification = this._onDidReceiveElicitationCompleteNotification.event;

	private readonly _onDidChangeResourceList = this._register(new Emitter<void>());
	readonly onDidChangeResourceList = this._onDidChangeResourceList.event;

	private readonly _onDidUpdateResource = this._register(new Emitter<MCP.ResourceUpdatedNotification>());
	readonly onDidUpdateResource = this._onDidUpdateResource.event;

	private readonly _onDidChangeToolList = this._register(new Emitter<void>());
	readonly onDidChangeToolList = this._onDidChangeToolList.event;

	private readonly _onDidChangePromptList = this._register(new Emitter<void>());
	readonly onDidChangePromptList = this._onDidChangePromptList.event;

	/**
	 * Connects to the MCP server and does the initialization handshake.
	 * @throws MpcResponseError if the server fails to initialize.
	 */
	public static async create(instaService: IInstantiationService, opts: IMcpServerRequestHandlerOptions, token?: CancellationToken) {
		const mcp = new McpServerRequestHandler(opts);
		const store = new DisposableStore();
		try {
			// Periodically tell the user we are still waiting on the handshake.
			const timer = store.add(new IntervalTimer());
			timer.cancelAndSet(() => {
				opts.logger.info('Waiting for server to respond to `initialize` request...');
			}, 5000);

			await instaService.invokeFunction(async accessor => {
				const productService = accessor.get(IProductService);
				const initialized = await mcp.sendRequest<MCP.InitializeRequest, MCP.InitializeResult>({
					method: 'initialize',
					params: {
						protocolVersion: MCP.LATEST_PROTOCOL_VERSION,
						capabilities: {
							roots: { listChanged: true },
							sampling: opts.createMessageRequestHandler ? {} : undefined,
							elicitation: opts.elicitationRequestHandler ? { form: {}, url: {} } : undefined,
							tasks: {
								list: {},
								cancel: {},
								requests: {
									sampling: opts.createMessageRequestHandler ? { createMessage: {} } : undefined,
									elicitation: opts.elicitationRequestHandler ? { create: {} } : undefined,
								},
							},
							extensions: {
								'io.modelcontextprotocol/ui': {
									mimeTypes: ['text/html;profile=mcp-app']
								}
							}
						},
						clientInfo: {
							name: productService.nameLong,
							version: productService.version,
						}
					}
				}, token);
				mcp._serverInit = initialized;
				mcp._sendLogLevelToServer(opts.logger.getLevel());

				mcp.sendNotification<MCP.InitializedNotification>({
					method: 'notifications/initialized'
				});
			});

			return mcp;
		} catch (e) {
			mcp.dispose();
			throw e;
		} finally {
			store.dispose();
		}
	}

	public readonly logger: ILogger;
	private readonly _launch: IMcpMessageTransport;
	private readonly _requestLogLevel: LogLevel;
	private readonly _createMessageRequestHandler: IMcpServerRequestHandlerOptions['createMessageRequestHandler'];
	private readonly _elicitationRequestHandler: IMcpServerRequestHandlerOptions['elicitationRequestHandler'];
	private readonly _taskManager: McpTaskManager;

	protected constructor({
		launch,
		logger,
		createMessageRequestHandler,
		elicitationRequestHandler,
		requestLogLevel = LogLevel.Debug,
		taskManager,
	}: IMcpServerRequestHandlerOptions) {
		super();
		this._launch = launch;
		this.logger = logger;
		this._requestLogLevel = requestLogLevel;
		this._createMessageRequestHandler = createMessageRequestHandler;
		this._elicitationRequestHandler = elicitationRequestHandler;
		this._taskManager = taskManager;

		// Attach this handler to the task manager
		this._taskManager.setHandler(this);
		this._register(this._taskManager.onDidUpdateTask(task => {
			this.send({
				jsonrpc: MCP.JSONRPC_VERSION,
				method: 'notifications/tasks/status',
				params: task
			} satisfies MCP.TaskStatusNotification);
		}));
		this._register(toDisposable(() => this._taskManager.setHandler(undefined)));

		this._register(launch.onDidReceiveMessage(message => this.handleMessage(message)));
		this._register(autorun(reader => {
			const state = launch.state.read(reader).state;
			// the handler will get disposed when the launch stops, but if we're still
			// create()'ing we need to make sure to cancel the initialize request.
			if (state === McpConnectionState.Kind.Error || state === McpConnectionState.Kind.Stopped) {
				this.cancelAllRequests();
			}
		}));

		// Listen for log level changes and forward them to the MCP server
		this._register(logger.onDidChangeLogLevel((logLevel) => {
			this._sendLogLevelToServer(logLevel);
		}));
	}

	/**
	 * Send a client request to the server and return the response.
	 *
	 * @param request The request to send
	 * @param token Cancellation token
	 * @returns A promise that resolves with the response
	 */
	private async sendRequest<T extends MCP.ClientRequest, R extends MCP.ServerResult>(
		request: Pick<T, 'params' | 'method'>,
		token: CancellationToken = CancellationToken.None
	): Promise<R> {
		if (this._store.isDisposed) {
			return Promise.reject(new CancellationError());
		}

		const id = this._nextRequestId++;

		// Create the full JSON-RPC request
		const jsonRpcRequest: MCP.JSONRPCRequest = {
			jsonrpc: MCP.JSONRPC_VERSION,
			id,
			...request
		};

		const promise = new DeferredPromise<MCP.ServerResult>();
		// Store the pending request
		this._pendingRequests.set(id, { promise });
		// Set up cancellation
		const cancelListener = token.onCancellationRequested(() => {
			if (!promise.isSettled) {
				this._pendingRequests.delete(id);
				this.sendNotification({ method: 'notifications/cancelled', params: { requestId: id } });
				promise.cancel();
			}
			cancelListener.dispose();
		});

		// Send the request
		try {
			this.send(jsonRpcRequest);
		} catch (e) {
			// If sending throws synchronously the request never went out, so
			// release its bookkeeping instead of leaking the map entry and the
			// cancellation listener.
			cancelListener.dispose();
			this._pendingRequests.delete(id);
			throw e;
		}

		const ret = promise.p.finally(() => {
			cancelListener.dispose();
			this._pendingRequests.delete(id);
		});

		return ret as Promise<R>;
	}

	/**
	 * Logs the outgoing message (if the request log level is enabled) and
	 * hands it to the transport.
	 */
	private send(mcp: MCP.JSONRPCMessage) {
		if (canLog(this.logger.getLevel(), this._requestLogLevel)) { // avoid building the string if we don't need to
			log(this.logger, this._requestLogLevel, `[editor -> server] ${JSON.stringify(mcp)}`);
		}

		this._launch.send(mcp);
	}

	/**
	 * Handles paginated requests by making multiple requests until all items are retrieved.
	 *
	 * @param method The method name to call
	 * @param getItems Function to extract the array of items from a result
	 * @param initialParams Initial parameters
	 * @param token Cancellation token
	 * @returns Async iterable yielding one array of items per page
	 */
	private async *sendRequestPaginated<T extends MCP.PaginatedRequest & MCP.ClientRequest, R extends MCP.PaginatedResult, I>(method: T['method'], getItems: (result: R) => I[], initialParams?: Omit<T['params'], 'jsonrpc' | 'id'>, token: CancellationToken = CancellationToken.None): AsyncIterable<I[]> {
		let nextCursor: MCP.Cursor | undefined = undefined;

		do {
			const params: T['params'] = {
				...initialParams,
				cursor: nextCursor
			};

			const result: R = await this.sendRequest<T, R>({ method, params }, token);
			yield getItems(result);
			nextCursor = result.nextCursor;
		} while (nextCursor !== undefined && !token.isCancellationRequested);
	}

	private sendNotification<N extends MCP.ClientNotification>(notification: Omit<N, 'jsonrpc'>): void {
		this.send({ ...notification, jsonrpc: MCP.JSONRPC_VERSION });
	}

	/**
	 * Handle incoming messages from the server
	 */
	private handleMessage(message: MCP.JSONRPCMessage): void {
		if (canLog(this.logger.getLevel(), this._requestLogLevel)) { // avoid building the string if we don't need to
			log(this.logger, this._requestLogLevel, `[server -> editor] ${JSON.stringify(message)}`);
		}

		// Handle responses to our requests
		if ('id' in message) {
			if ('result' in message) {
				this.handleResult(message);
			} else if ('error' in message) {
				this.handleError(message);
			}
		}

		// Handle requests from the server
		if ('method' in message) {
			if ('id' in message) {
				this.handleServerRequest(message as MCP.JSONRPCRequest & MCP.ServerRequest);
			} else {
				this.handleServerNotification(message as MCP.JSONRPCNotification & MCP.ServerNotification);
			}
		}
	}

	/**
	 * Handle successful responses
	 */
	private handleResult(response: MCP.JSONRPCResultResponse): void {
		if (response.id !== undefined) {
			const request = this._pendingRequests.get(response.id);
			if (request) {
				this._pendingRequests.delete(response.id);
				request.promise.complete(response.result);
			}
		}
	}

	/**
	 * Handle error responses
	 */
	private handleError(response: MCP.JSONRPCErrorResponse): void {
		if (response.id !== undefined) {
			const request = this._pendingRequests.get(response.id);
			if (request) {
				this._pendingRequests.delete(response.id);
				request.promise.error(new MpcResponseError(response.error.message, response.error.code, response.error.data));
			}
		}
	}

	/**
	 * Handle incoming server requests. Dispatches on the request method and
	 * replies with either the result or a JSON-RPC error response.
	 */
	private async handleServerRequest(request: MCP.JSONRPCRequest & MCP.ServerRequest): Promise<void> {
		try {
			let response: MCP.Result | undefined;
			if (request.method === 'ping') {
				response = this.handlePing(request);
			} else if (request.method === 'roots/list') {
				response = this.handleRootsList(request);
			} else if (request.method === 'sampling/createMessage' && this._createMessageRequestHandler) {
				// Check if this is a task-augmented request
				if (request.params.task) {
					const taskResult = this._taskManager.createTask(
						request.params.task.ttl ?? null,
						(token) => this._createMessageRequestHandler!(request.params, token)
					);
					taskResult._meta ??= {};
					taskResult._meta['io.modelcontextprotocol/related-task'] = { taskId: taskResult.task.taskId };
					response = taskResult;
				} else {
					response = await this._createMessageRequestHandler(request.params);
				}
			} else if (request.method === 'elicitation/create' && this._elicitationRequestHandler) {
				// Check if this is a task-augmented request
				if (request.params.task) {
					const taskResult = this._taskManager.createTask(
						request.params.task.ttl ?? null,
						(token) => this._elicitationRequestHandler!(request.params, token)
					);
					taskResult._meta ??= {};
					taskResult._meta['io.modelcontextprotocol/related-task'] = { taskId: taskResult.task.taskId };
					response = taskResult;
				} else {
					response = await this._elicitationRequestHandler(request.params);
				}
			} else if (request.method === 'tasks/get') {
				response = this._taskManager.getTask(request.params.taskId);
			} else if (request.method === 'tasks/result') {
				response = await this._taskManager.getTaskResult(request.params.taskId);
			} else if (request.method === 'tasks/cancel') {
				response = this._taskManager.cancelTask(request.params.taskId);
			} else if (request.method === 'tasks/list') {
				response = this._taskManager.listTasks();
			} else {
				throw McpError.methodNotFound(request.method);
			}
			this.respondToRequest(request, response);
		} catch (e) {
			// Anything that is not already an McpError is logged and wrapped so
			// that the server still receives a well-formed error response.
			if (!(e instanceof McpError)) {
				this.logger.error(`Error handling request ${request.method}:`, e);
				e = McpError.unknown(e);
			}

			const errorResponse: MCP.JSONRPCErrorResponse = {
				jsonrpc: MCP.JSONRPC_VERSION,
				id: request.id,
				error: {
					code: e.code,
					message: e.message,
					data: e.data,
				}
			};

			this.send(errorResponse);
		}
	}

	/**
	 * Handle incoming server notifications
	 */
	private handleServerNotification(request: MCP.JSONRPCNotification & MCP.ServerNotification): void {
		switch (request.method) {
			case 'notifications/message':
				return this.handleLoggingNotification(request);
			case 'notifications/cancelled':
				this._onDidReceiveCancelledNotification.fire(request);
				return this.handleCancelledNotification(request);
			case 'notifications/progress':
				this._onDidReceiveProgressNotification.fire(request);
				return;
			case 'notifications/resources/list_changed':
				this._onDidChangeResourceList.fire();
				return;
			case 'notifications/resources/updated':
				this._onDidUpdateResource.fire(request);
				return;
			case 'notifications/tools/list_changed':
				this._onDidChangeToolList.fire();
				return;
			case 'notifications/prompts/list_changed':
				this._onDidChangePromptList.fire();
				return;
			case 'notifications/elicitation/complete':
				this._onDidReceiveElicitationCompleteNotification.fire(request);
				return;
			case 'notifications/tasks/status':
				this._taskManager.getClientTask(request.params.taskId)?.onDidUpdateState(request.params);
				return;
			default:
				softAssertNever(request);
		}
	}

	/**
	 * Cancels the pending request named by a `notifications/cancelled`
	 * message, if there is one.
	 */
	private handleCancelledNotification(request: MCP.CancelledNotification): void {
		const requestId = request.params.requestId;
		// Compare against undefined rather than relying on truthiness so that
		// falsy-but-valid IDs (0, '') are not silently ignored.
		if (requestId !== undefined) {
			const pendingRequest = this._pendingRequests.get(requestId);
			if (pendingRequest) {
				this._pendingRequests.delete(requestId);
				pendingRequest.promise.cancel();
			}
		}
	}

	private handleLoggingNotification(request: MCP.LoggingMessageNotification): void {
		translateMcpLogMessage(this.logger, request.params);
	}

	/**
	 * Send a generic response to a request
	 */
	private respondToRequest(request: MCP.JSONRPCRequest, result: MCP.Result): void {
		const response: MCP.JSONRPCResponse = {
			jsonrpc: MCP.JSONRPC_VERSION,
			id: request.id,
			result
		};
		this.send(response);
	}

	/**
	 * Builds the (empty) result for a ping request
	 */
	private handlePing(_request: MCP.PingRequest): {} {
		return {};
	}

	/**
	 * Builds the result for a roots/list request and records that the server
	 * has seen the current roots, which re-arms the `list_changed`
	 * notification in the `roots` setter.
	 */
	private handleRootsList(_request: MCP.ListRootsRequest): MCP.ListRootsResult {
		this._hasAnnouncedRoots = true;
		return { roots: this._roots };
	}

	private cancelAllRequests() {
		this._pendingRequests.forEach(pending => pending.promise.cancel());
		this._pendingRequests.clear();
	}

	public override dispose(): void {
		this.cancelAllRequests();
		super.dispose();
	}

	/**
	 * Forwards log level changes to the MCP server if it supports logging
	 */
	private async _sendLogLevelToServer(logLevel: LogLevel): Promise<void> {
		try {
			// Only send if the server supports logging capabilities. The log
			// level listener is registered in the constructor, but `_serverInit`
			// is only assigned once `create()` receives the `initialize` result,
			// so tolerate it being unset instead of throwing and logging a
			// misleading error.
			if (!this._serverInit?.capabilities.logging) {
				return;
			}

			await this.setLevel({ level: mapLogLevelToMcp(logLevel) });
		} catch (error) {
			this.logger.error(`Failed to set MCP server log level: ${error}`);
		}
	}

	/**
	 * Send an initialize request
	 */
	initialize(params: MCP.InitializeRequest['params'], token?: CancellationToken): Promise<MCP.InitializeResult> {
		return this.sendRequest<MCP.InitializeRequest, MCP.InitializeResult>({ method: 'initialize', params }, token);
	}

	/**
	 * List available resources
	 */
	listResources(params?: MCP.ListResourcesRequest['params'], token?: CancellationToken): Promise<MCP.Resource[]> {
		return Iterable.asyncToArrayFlat(this.listResourcesIterable(params, token));
	}

	/**
	 * List available resources (iterable)
	 */
	listResourcesIterable(params?: MCP.ListResourcesRequest['params'], token?: CancellationToken): AsyncIterable<MCP.Resource[]> {
		return this.sendRequestPaginated<MCP.ListResourcesRequest, MCP.ListResourcesResult, MCP.Resource>('resources/list', result => result.resources, params, token);
	}

	/**
	 * Read a specific resource
	 */
	readResource(params: MCP.ReadResourceRequest['params'], token?: CancellationToken): Promise<MCP.ReadResourceResult> {
		return this.sendRequest<MCP.ReadResourceRequest, MCP.ReadResourceResult>({ method: 'resources/read', params }, token);
	}

	/**
	 * List available resource templates
	 */
	listResourceTemplates(params?: MCP.ListResourceTemplatesRequest['params'], token?: CancellationToken): Promise<MCP.ResourceTemplate[]> {
		return Iterable.asyncToArrayFlat(this.sendRequestPaginated<MCP.ListResourceTemplatesRequest, MCP.ListResourceTemplatesResult, MCP.ResourceTemplate>('resources/templates/list', result => result.resourceTemplates, params, token));
	}

	/**
	 * Subscribe to resource updates
	 */
	subscribe(params: MCP.SubscribeRequest['params'], token?: CancellationToken): Promise<MCP.EmptyResult> {
		return this.sendRequest<MCP.SubscribeRequest, MCP.EmptyResult>({ method: 'resources/subscribe', params }, token);
	}

	/**
	 * Unsubscribe from resource updates
	 */
	unsubscribe(params: MCP.UnsubscribeRequest['params'], token?: CancellationToken): Promise<MCP.EmptyResult> {
		return this.sendRequest<MCP.UnsubscribeRequest, MCP.EmptyResult>({ method: 'resources/unsubscribe', params }, token);
	}

	/**
	 * List available prompts
	 */
	listPrompts(params?: MCP.ListPromptsRequest['params'], token?: CancellationToken): Promise<MCP.Prompt[]> {
		return Iterable.asyncToArrayFlat(this.sendRequestPaginated<MCP.ListPromptsRequest, MCP.ListPromptsResult, MCP.Prompt>('prompts/list', result => result.prompts, params, token));
	}

	/**
	 * Get a specific prompt
	 */
	getPrompt(params: MCP.GetPromptRequest['params'], token?: CancellationToken): Promise<MCP.GetPromptResult> {
		return this.sendRequest<MCP.GetPromptRequest, MCP.GetPromptResult>({ method: 'prompts/get', params }, token);
	}

	/**
	 * List available tools
	 */
	listTools(params?: MCP.ListToolsRequest['params'], token?: CancellationToken): Promise<MCP.Tool[]> {
		return Iterable.asyncToArrayFlat(this.sendRequestPaginated<MCP.ListToolsRequest, MCP.ListToolsResult, MCP.Tool>('tools/list', result => result.tools, params, token));
	}

	/**
	 * Call a specific tool. Supports tasks automatically if `task` is set on the request.
	 */
	async callTool(params: MCP.CallToolRequest['params'] & MCP.Request['params'], token?: CancellationToken): Promise<MCP.CallToolResult> {
		const response = await this.sendRequest<MCP.CallToolRequest, MCP.CallToolResult | MCP.CreateTaskResult>({ method: 'tools/call', params }, token);

		if (isTaskResult(response)) {
			// The server answered with a task handle instead of a result:
			// track it until the task settles.
			const task = new McpTask<MCP.CallToolResult>(response.task, token);
			this._taskManager.adoptClientTask(task);
			task.setHandler(this);
			return task.result.finally(() => {
				this._taskManager.abandonClientTask(task.id);
			});
		}

		return response;
	}

	/**
	 * Set the logging level
	 */
	setLevel(params: MCP.SetLevelRequest['params'], token?: CancellationToken): Promise<MCP.EmptyResult> {
		return this.sendRequest<MCP.SetLevelRequest, MCP.EmptyResult>({ method: 'logging/setLevel', params }, token);
	}

	/**
	 * Find completions for an argument
	 */
	complete(params: MCP.CompleteRequest['params'], token?: CancellationToken): Promise<MCP.CompleteResult> {
		return this.sendRequest<MCP.CompleteRequest, MCP.CompleteResult>({ method: 'completion/complete', params }, token);
	}

	/**
	 * Get task status
	 */
	getTask(params: { taskId: string }, token?: CancellationToken): Promise<MCP.GetTaskResult> {
		return this.sendRequest<MCP.GetTaskRequest, MCP.GetTaskResult>({ method: 'tasks/get', params }, token);
	}

	/**
	 * Get task result
	 */
	getTaskResult(params: { taskId: string }, token?: CancellationToken): Promise<MCP.GetTaskPayloadResult> {
		return this.sendRequest<MCP.GetTaskPayloadRequest, MCP.GetTaskPayloadResult>({ method: 'tasks/result', params }, token);
	}

	/**
	 * Cancel a task
	 */
	cancelTask(params: { taskId: string }, token?: CancellationToken): Promise<MCP.CancelTaskResult> {
		return this.sendRequest<MCP.CancelTaskRequest, MCP.CancelTaskResult>({ method: 'tasks/cancel', params }, token);
	}

	/**
	 * List all tasks
	 */
	listTasks(params?: MCP.ListTasksRequest['params'], token?: CancellationToken): Promise<MCP.Task[]> {
		return Iterable.asyncToArrayFlat(
			this.sendRequestPaginated<MCP.ListTasksRequest, MCP.ListTasksResult, MCP.Task>(
				'tasks/list', result => result.tasks, params, token
			)
		);
	}
}

/** Whether the task has reached a status it will not leave again. */
function isTaskInTerminalState(task: MCP.Task): boolean {
	return task.status === 'completed' || task.status === 'failed' || task.status === 'cancelled';
}

/**
 * Implementation of a task that handles polling, status notifications, and
 * handler reconnections. It implements the task polling loop internally and
 * can also be updated externally via `onDidUpdateState`, when notifications
 * are received for example.
 * @internal
 */
export class McpTask<T extends MCP.Result> extends Disposable implements IMcpTaskInternal {
	private readonly promise = new DeferredPromise<T>();

	public get result(): Promise<T> {
		return this.promise.p;
	}

	public get id() {
		return this._task.taskId;
	}

	private _lastTaskState: ISettableObservable<MCP.Task>;
	private _handler = observableValue<McpServerRequestHandler | undefined>('mcpTaskHandler', undefined);

	constructor(
		private readonly _task: MCP.Task,
		_token: CancellationToken = CancellationToken.None
	) {
		super();

		const expiresAt = _task.ttl ? (Date.now() + _task.ttl) : undefined;
		this._lastTaskState = observableValue('lastTaskState', this._task);

		const store = this._register(new DisposableStore());

		// Handle external cancellation token
		if (_token.isCancellationRequested) {
			this._lastTaskState.set({ ...this._task, status: 'cancelled' }, undefined);
		} else {
			store.add(_token.onCancellationRequested(() => {
				const current = this._lastTaskState.get();
				if (!isTaskInTerminalState(current)) {
					this._lastTaskState.set({ ...current, status: 'cancelled' }, undefined);
				}
			}));
		}

		// Handle TTL expiration with an explicit timeout
		if (expiresAt) {
			const ttlTimeout = expiresAt - Date.now();
			if (ttlTimeout <= 0) {
				this._lastTaskState.set({ ...this._task, status: 'cancelled', statusMessage: 'Task timed out.' }, undefined);
			} else {
				store.add(disposableTimeout(() => {
					const current = this._lastTaskState.get();
					if (!isTaskInTerminalState(current)) {
						this._lastTaskState.set({ ...current, status: 'cancelled', statusMessage: 'Task timed out.' }, undefined);
					}
				}, ttlTimeout));
			}
		}

		// The lookup started when the task entered the input_required state.
		const inputRequiredLookup = observableValue<ObservablePromise<MCP.Task> | undefined>('activeResultLookup', undefined);

		// 1. Poll for task updates when the task isn't in a terminal state
		store.add(autorun(reader => {
			const current = this._lastTaskState.read(reader);
			if (isTaskInTerminalState(current)) {
				return;
			}

			// When a task goes into the input_required state, by spec we should call
			// `tasks/result` which can return an SSE stream of task updates. No need
			// to poll while such a lookup is going on, but once it resolves we should
			// clear and update our state.
			const lookup = inputRequiredLookup.read(reader);
			if (lookup) {
				const result = lookup.promiseResult.read(reader);
				return transaction(tx => {
					if (!result) {
						// still ongoing
					} else if (result.data) {
						inputRequiredLookup.set(undefined, tx);
						this._lastTaskState.set(result.data, tx);
					} else {
						inputRequiredLookup.set(undefined, tx);
						// Update the task state within the same transaction as the
						// lookup reset so observers see both changes together.
						if (result.error instanceof McpError && result.error.code === MCP.INVALID_PARAMS) {
							this._lastTaskState.set({ ...current, status: 'cancelled' }, tx);
						} else {
							// Maybe a connection error -- start polling again
							this._lastTaskState.set({ ...current, status: 'working' }, tx);
						}
					}
				});
			}

			const handler = this._handler.read(reader);
			if (!handler) {
				return;
			}

			const pollInterval = _task.pollInterval ?? 2000;
			const cts = new CancellationTokenSource(_token);
			// Cancel the in-flight poll when this autorun re-runs or is disposed.
			reader.store.add(toDisposable(() => cts.dispose(true)));
			reader.store.add(disposableTimeout(() => {
				handler.getTask({ taskId: current.taskId }, cts.token)
					.catch((e): MCP.Task | undefined => {
						if (e instanceof McpError && e.code === MCP.INVALID_PARAMS) {
							return { ...current, status: 'cancelled' };
						} else {
							return { ...current }; // errors are already logged, keep in current state
						}
					})
					.then(r => {
						if (r && !cts.token.isCancellationRequested) {
							this._lastTaskState.set(r, undefined);
						}
					});
			}, pollInterval));
		}));

		// 2. Get the result once it's available (or propagate errors). Trigger
		// input_required handling as needed. Only react when the status itself changes.
		const lastStatus = this._lastTaskState.map(task => task.status);
		store.add(autorun(reader => {
			const status = lastStatus.read(reader);
			if (status === 'failed') {
				const current = this._lastTaskState.read(undefined);
				this.promise.error(new Error(`Task ${current.taskId} failed: ${current.statusMessage ?? 'unknown error'}`));
				store.dispose();
			} else if (status === 'cancelled') {
				this.promise.cancel();
				store.dispose();
			} else if (status === 'input_required') {
				const handler = this._handler.read(reader);
				if (handler) {
					const current = this._lastTaskState.read(undefined);
					const cts = new CancellationTokenSource(_token);
					reader.store.add(toDisposable(() => cts.dispose(true)));
					inputRequiredLookup.set(new ObservablePromise<MCP.Task>(handler.getTask({ taskId: current.taskId }, cts.token)), undefined);
				}
			} else if (status === 'completed') {
				const handler = this._handler.read(reader);
				if (handler) {
					this.promise.settleWith(handler.getTaskResult({ taskId: _task.taskId }, _token) as Promise<T>);
					store.dispose();
				}
			} else if (status === 'working') {
				// no-op
			} else {
				softAssertNever(status);
			}
		}));
	}

	/** Replaces the last known task state, e.g. from a status notification. */
	onDidUpdateState(task: MCP.Task) {
		this._lastTaskState.set(task, undefined);
	}

	/** Sets (or clears) the request handler used for polling and fetching results. */
	setHandler(handler: McpServerRequestHandler | undefined): void {
		this._handler.set(handler, undefined);
	}
}

/**
 * Maps VSCode LogLevel to MCP LoggingLevel
 */
function mapLogLevelToMcp(logLevel: LogLevel): MCP.LoggingLevel {
	switch (logLevel) {
		case LogLevel.Trace:
			return 'debug'; // MCP doesn't have trace, use debug
		case LogLevel.Debug:
			return 'debug';
		case LogLevel.Info:
			return 'info';
		case LogLevel.Warning:
			return 'warning';
		case LogLevel.Error:
			return 'error';
		case LogLevel.Off:
			return 'emergency'; // MCP doesn't have off, use emergency
		default:
			return assertNever(logLevel); // exhaustiveness check: every LogLevel is handled above
	}
}