Path: blob/main/extensions/copilot/src/platform/networking/node/chatWebSocketManager.ts
13400 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import type { OpenAI } from 'openai';
import { CloseEvent, ErrorEvent } from 'undici';
import { createServiceIdentifier } from '../../../util/common/services';
import { CancellationToken } from '../../../util/vs/base/common/cancellation';
import { CancellationError } from '../../../util/vs/base/common/errors';
import { Emitter, Event } from '../../../util/vs/base/common/event';
import { Disposable, IDisposable } from '../../../util/vs/base/common/lifecycle';
import { QuotaSnapshots } from '../../chat/common/chatQuotaService';
import { ConfigKey, IConfigurationService } from '../../configuration/common/configurationService';
import { ICAPIClientService } from '../../endpoint/common/capiClient';
import { ILogService, collectSingleLineErrorMessage } from '../../log/common/logService';
import { ITelemetryService } from '../../telemetry/common/telemetry';
import { HeadersImpl, IHeaders, WebSocketConnection } from '../common/fetcherService';
import { IEndpointBody } from '../common/networking';
import { getResponsesApiCompactionThresholdFromBody } from '../../endpoint/node/responsesApi';
import { ChatWebSocketRequestOutcome, ChatWebSocketTelemetrySender } from './chatWebSocketTelemetry';

export const IChatWebSocketManager = createServiceIdentifier<IChatWebSocketManager>('IChatWebSocketManager');

export interface IChatWebSocketManager {
	readonly _serviceBrand: undefined;

	/**
	 * Gets or creates a WebSocket connection for the given conversation.
	 * The connection is shared across turns and tool call rounds within
	 * the same conversation, keeping server-side context alive.
	 */
	getOrCreateConnection(conversationId: string, headers: Record<string, string>, initiatingRequestId: string): IChatWebSocketConnection;

	/**
	 * Returns true if there is an open WebSocket connection for the given
	 * conversation. Used to decide whether the server already has context
	 * from earlier requests in this conversation.
	 */
	hasActiveConnection(conversationId: string): boolean;

	/**
	 * Returns the stateful marker (last completed response ID) for the given
	 * conversation's active WebSocket connection, or undefined if there is
	 * no active connection or no marker yet.
	 */
	getStatefulMarker(conversationId: string): string | undefined;

	/**
	 * Returns the round ID at which the last client-side summarization
	 * occurred for this connection, or undefined if none.
	 */
	getSummarizedAtRoundId(conversationId: string): string | undefined;

	/**
	 * Closes and removes the connection for a specific conversation.
	 */
	closeConnection(conversationId: string): void;

	/**
	 * Closes all active connections.
	 */
	closeAll(): void;
}

/**
 * No-op implementation for contexts where WebSocket is not available (web, tests, chat-lib).
 * Every query reports "no connection"; asking for a connection throws.
 */
export class NullChatWebSocketManager implements IChatWebSocketManager {
	declare readonly _serviceBrand: undefined;
	getOrCreateConnection(_conversationId: string, _headers?: Record<string, string>, _initiatingRequestId?: string): IChatWebSocketConnection {
		throw new Error('WebSocket not available');
	}
	hasActiveConnection(_conversationId: string): boolean { return false; }
	getStatefulMarker(_conversationId: string): string | undefined { return undefined; }
	getSummarizedAtRoundId(_conversationId: string): string | undefined { return undefined; }
	closeConnection(_conversationId: string): void { }
	closeAll(): void { }
}

/** Per-request metadata passed to {@link IChatWebSocketConnection.sendRequest}. */
export interface IChatWebSocketRequestOptions {
	userInitiated: boolean;
	turnId: string;
	requestId: string;
	model: string;
	countTokens: () => Promise<number>;
	tokenCountMax: number;
	modelMaxPromptTokens: number;
	summarizedAtRoundId?: string;
	modeChanged?: boolean;
}

export interface IChatWebSocketConnection extends IDisposable {
	/** Opens the WebSocket connection. Must be called before sendRequest. */
	connect(): Promise<void>;

	/**
	 * Sends a response.create request and returns a handle that exposes the
	 * response events, errors and completion of that request.
	 */
	sendRequest(
		body: IEndpointBody,
		options: IChatWebSocketRequestOptions,
		token: CancellationToken,
	): IChatWebSocketRequestHandle;

	/** Whether the connection is currently open and usable. */
	readonly isOpen: boolean;

	/** Response headers from the WebSocket connection handshake. */
	readonly responseHeaders: IHeaders;

	/** Response status code from the WebSocket connection handshake. */
	readonly responseStatusCode: number | undefined;

	/** Response status text from the WebSocket connection handshake. */
	readonly responseStatusText: string | undefined;

	/** The GitHub request ID from response headers. */
	readonly gitHubRequestId: string;

	/**
	 * The response.id from the last completed response on this connection.
	 * Used as `previous_response_id` on subsequent requests to avoid
	 * re-sending the full message history.
	 */
	readonly statefulMarker: string | undefined;
}

export interface IChatWebSocketRequestHandle {
	/** Fires for each OpenAI stream event received from the server. */
	readonly onEvent: Event<OpenAI.Responses.ResponseStreamEvent>;
	/** Fires when a CAPI WebSocket error is received (nested error shape). */
	readonly onCAPIError: Event<CAPIWebSocketErrorEvent>;
	/** Fires when a transport-level error occurs (connection lost, etc.). */
	readonly onError: Event<Error>;
	/**
	 * Resolves with the first event received from the server, or rejects
	 * if the connection errors/closes before any event arrives.
	 * Consumers can inspect the event type to decide the response kind
	 * (success stream vs. CAPI error) before processing remaining events.
	 */
	readonly firstEvent: Promise<OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent>;
	/** Resolves when the request has finished (completed or errored). */
	readonly done: Promise<void>;
}

/**
 * CAPI WebSocket error shape. Unlike the OpenAI SDK's flat `ResponseErrorEvent`
 * (`{ type: "error", code, message }`), CAPI wraps the error details in a
 * nested `error` object: `{ type: "error", error: { code, message } }`.
 *
 * Non-recoverable errors (rate limits, quota, upstream failures) also include
 * `copilot_quota_snapshots` with per-model quota state.
 */
export interface CAPIWebSocketErrorEvent {
	readonly type: 'error';
	readonly error: {
		readonly code: string;
		readonly message: string;
	};
	readonly copilot_quota_snapshots?: QuotaSnapshots;
}

/**
 * Type guard for the nested CAPI error shape: `type` is `'error'` and the
 * event carries an `error` object whose `code` is a string.
 */
export function isCAPIWebSocketError(event: OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent): event is CAPIWebSocketErrorEvent {
	return event.type === 'error' && 'error' in event && typeof (event as CAPIWebSocketErrorEvent).error?.code === 'string';
}

/** Event types that end a response stream, mapped to the outcome reported in telemetry. */
const streamTerminatingOutcomes: Readonly<Record<string, ChatWebSocketRequestOutcome>> = {
	'response.completed': 'completed',
	'response.failed': 'response_failed',
	'response.incomplete': 'response_incomplete',
	'response.cancelled': 'response_cancelled',
	'error': 'upstream_error',
};

/**
 * Returns the outcome for an event that terminates the stream, or undefined
 * when the event does not end the stream.
 */
function getStreamTerminatingOutcome(event: OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent): ChatWebSocketRequestOutcome | undefined {
	const eventType: string = event.type;
	// Own-property check: a plain index lookup would also resolve names inherited
	// from Object.prototype (e.g. "constructor"), which are truthy and would be
	// mistaken for a terminating outcome.
	return Object.prototype.hasOwnProperty.call(streamTerminatingOutcomes, eventType)
		? streamTerminatingOutcomes[eventType]
		: undefined;
}

/**
 * Keeps one {@link ChatWebSocketConnection} per conversation ID and hands it
 * out for reuse while it is open.
 */
export class ChatWebSocketManager extends Disposable implements IChatWebSocketManager {
	declare readonly _serviceBrand: undefined;

	private readonly _connections = new Map<string, ChatWebSocketConnection>();

	constructor(
		@ILogService private readonly _logService: ILogService,
		@ICAPIClientService private readonly _capiClientService: ICAPIClientService,
		@ITelemetryService private readonly _telemetryService: ITelemetryService,
		@IConfigurationService private readonly _configurationService: IConfigurationService,
	) {
		super();
	}

	/**
	 * Returns the open connection for the conversation, or replaces whatever is
	 * registered with a fresh (not yet connected) one. The caller is expected to
	 * call `connect()` on a newly created connection.
	 */
	getOrCreateConnection(conversationId: string, headers: Record<string, string>, initiatingRequestId: string): IChatWebSocketConnection {
		const existing = this._connections.get(conversationId);

		// Reuse the connection if it's still open, even across turns.
		if (existing?.isOpen) {
			return existing;
		}

		// NOTE(review): `isOpen` is also false while a connection is still
		// connecting, so a connection whose handshake is in flight is replaced
		// here as well — confirm that is intended.
		if (existing) {
			this._logService.debug(`[ChatWebSocketManager] Replacing closed connection for conversation ${conversationId}`);
			existing.dispose();
			this._connections.delete(conversationId);
		}

		const connection = new ChatWebSocketConnection(this._capiClientService, this._logService, this._telemetryService, this._configurationService, conversationId, headers, initiatingRequestId);
		this._logService.debug(`[ChatWebSocketManager] Creating new connection for conversation ${conversationId}`);
		this._connections.set(conversationId, connection);

		// Remove from map when disposed externally. The identity check keeps a
		// late dispose of an old connection from evicting its replacement.
		connection.onDidDispose(() => {
			const entry = this._connections.get(conversationId);
			if (entry === connection) {
				this._connections.delete(conversationId);
			}
		});

		return connection;
	}

	hasActiveConnection(conversationId: string): boolean {
		const connection = this._connections.get(conversationId);
		return !!connection?.isOpen;
	}

	getStatefulMarker(conversationId: string): string | undefined {
		const connection = this._connections.get(conversationId);
		return connection?.isOpen ? connection.statefulMarker : undefined;
	}

	getSummarizedAtRoundId(conversationId: string): string | undefined {
		const connection = this._connections.get(conversationId);
		return connection?.isOpen ? connection.summarizedAtRoundId : undefined;
	}

	closeConnection(conversationId: string): void {
		const connection = this._connections.get(conversationId);
		if (!connection) {
			return;
		}

		if (connection.hasActiveRequest) {
			this._logService.warn(`[ChatWebSocketManager] Closing connection for conversation ${conversationId} while turn ${connection.turnId} still has an active request`);
		} else {
			this._logService.debug(`[ChatWebSocketManager] Closing connection for conversation ${conversationId}`);
		}
		connection.dispose();
		this._connections.delete(conversationId);
	}

	closeAll(): void {
		// Iterate a snapshot: each dispose() re-enters the onDidDispose listener,
		// which deletes from `_connections`.
		const connections = [...this._connections.values()];
		for (const connection of connections) {
			connection.dispose();
		}
		this._connections.clear();
	}

	override dispose(): void {
		this.closeAll();
		super.dispose();
	}
}

/** Lifecycle state of a {@link ChatWebSocketConnection}. */
const enum ConnectionState {
	Connecting,
	Open,
	Closed,
}

/**
 * Maps a WebSocket close code to a short human-readable description.
 * Codes without an entry yield `'Unknown'`.
 */
function wsCloseCodeToString(code: number): string {
	switch (code) {
		case 1000: return 'Normal Closure';
		case 1001: return 'Going Away';
		case 1002: return 'Protocol Error';
		case 1003: return 'Unsupported Data';
		case 1005: return 'No Status Received';
		case 1006: return 'Abnormal Closure';
		case 1007: return 'Invalid Payload';
		case 1008: return 'Policy Violation';
		case 1009: return 'Message Too Big';
		case 1010: return 'Missing Extension';
		case 1011: return 'Internal Error';
		case 1012: return 'Service Restart';
		case 1013: return 'Try Again Later';
		case 1014: return 'Bad Gateway';
		case 1015: return 'TLS Handshake Failed';
		default: return 'Unknown';
	}
}

/**
 * A single WebSocket to the responses endpoint for one conversation.
 * At most one request is tracked as active at a time; sending a new request
 * supersedes the previous one.
 */
class ChatWebSocketConnection extends Disposable implements IChatWebSocketConnection {
	private _ws: WebSocket | undefined;
	private _state: ConnectionState = ConnectionState.Closed;
	private _activeRequest: ChatWebSocketActiveRequest | undefined;
	private _statefulMarker: string | undefined;
	private _summarizedAtRoundId: string | undefined;

	private readonly _onDidDispose = this._register(new Emitter<void>());
	readonly onDidDispose = this._onDidDispose.event;

	private _connectStartTime: number | undefined;
	private _connectedTime: number | undefined;
	// First error seen on the socket; consumed by the next close event.
	private _pendingErrorMessage: string | undefined;
	private _totalSentMessageCount = 0;
	private _totalReceivedMessageCount = 0;
	private _totalSentCharacters = 0;
	private _totalReceivedCharacters = 0;
	private _responseHeaders: IHeaders = new HeadersImpl({});
	private _responseStatusCode: number | undefined;
	private _responseStatusText: string | undefined;
	private _previousTurnId: string | undefined;
	private _turnId: string | undefined;
	private _hadActiveRequest = false;

	constructor(
		private readonly _capiClientService: ICAPIClientService,
		private readonly _logService: ILogService,
		private readonly _telemetryService: ITelemetryService,
		private readonly _configurationService: IConfigurationService,
		private readonly _conversationId: string,
		private readonly _headers: Record<string, string>,
		private readonly _initiatingRequestId: string,
	) {
		super();
	}

	get isOpen(): boolean {
		return this._state === ConnectionState.Open && !!this._ws;
	}

	get hasActiveRequest(): boolean {
		return !!this._activeRequest;
	}

	get turnId(): string | undefined {
		return this._turnId;
	}

	get statefulMarker(): string | undefined {
		return this._statefulMarker;
	}

	get summarizedAtRoundId(): string | undefined {
		return this._summarizedAtRoundId;
	}

	get responseHeaders(): IHeaders {
		return this._responseHeaders;
	}

	get responseStatusCode(): number | undefined {
		return this._responseStatusCode;
	}

	get responseStatusText(): string | undefined {
		return this._responseStatusText;
	}

	get gitHubRequestId(): string {
		return this._responseHeaders.get('x-github-request-id') || '';
	}

	/**
	 * Opens the socket. Resolves once the `open` event fires; rejects if the
	 * socket cannot be created, or if it errors or closes before opening.
	 * A no-op when the connection is already open.
	 */
	async connect(): Promise<void> {
		if (this._state === ConnectionState.Open) {
			return;
		}

		this._state = ConnectionState.Connecting;
		this._connectStartTime = Date.now();
		this._logService.debug(`[ChatWebSocketManager] Connecting WebSocket for conversation ${this._conversationId}`);

		let connection: WebSocketConnection;
		try {
			connection = await this._capiClientService.createResponsesWebSocket({
				headers: this._headers,
			});
		} catch (error) {
			// No socket was created, so no error/close event will ever reset the
			// state; do it here instead of staying in Connecting forever.
			this._state = ConnectionState.Closed;
			throw error;
		}

		return new Promise<void>((resolve, reject) => {
			const ws = connection.webSocket;

			// NOTE(review): if dispose() ran while the handshake was in flight,
			// onOpen below still adopts the socket and nothing closes it until
			// dispose() is called again — confirm whether that can happen.
			const onOpen = () => {
				cleanup();
				this._state = ConnectionState.Open;
				this._connectedTime = Date.now();
				this._ws = ws;
				this._responseHeaders = connection.responseHeaders;
				this._responseStatusCode = connection.responseStatusCode;
				this._responseStatusText = connection.responseStatusText;
				this._setupMessageHandlers(ws);
				const connectDurationMs = this._connectedTime - (this._connectStartTime ?? this._connectedTime);
				this._logService.debug(`[ChatWebSocketManager] Connected for conversation ${this._conversationId}`);
				ChatWebSocketTelemetrySender.sendConnectedTelemetry(this._telemetryService, {
					conversationId: this._conversationId,
					initiatingRequestId: this._initiatingRequestId,
					gitHubRequestId: this.gitHubRequestId,
					connectDurationMs,
				});
				resolve();
			};

			const onError = (event: ErrorEvent) => {
				cleanup();
				this._state = ConnectionState.Closed;
				this._responseHeaders = connection.responseHeaders;
				this._responseStatusCode = connection.responseStatusCode;
				this._responseStatusText = connection.responseStatusText;
				const errorMessage = event.error ? `${event.message}: ${collectSingleLineErrorMessage(event.error)}` : event.message || 'WebSocket error';
				const networkError = event.error?.cause ?? connection.networkError;
				const networkErrorMessage = networkError ? collectSingleLineErrorMessage(networkError) : undefined;
				const connectDurationMs = Date.now() - (this._connectStartTime ?? Date.now());
				this._logService.error(`[ChatWebSocketManager] Connection error for conversation ${this._conversationId}: ${errorMessage}${networkErrorMessage ? ` (cause: ${networkErrorMessage})` : ''}`);
				ChatWebSocketTelemetrySender.sendConnectErrorTelemetry(this._telemetryService, {
					conversationId: this._conversationId,
					initiatingRequestId: this._initiatingRequestId,
					gitHubRequestId: this.gitHubRequestId,
					error: errorMessage,
					connectDurationMs,
					responseStatusCode: this._responseStatusCode,
					responseStatusText: this._responseStatusText,
					networkError: networkErrorMessage,
				});
				reject(new Error(errorMessage));
			};

			const onClose = (event: CloseEvent) => {
				cleanup();
				this._state = ConnectionState.Closed;
				this._responseHeaders = connection.responseHeaders;
				this._responseStatusCode = connection.responseStatusCode;
				this._responseStatusText = connection.responseStatusText;
				const connectDurationMs = Date.now() - (this._connectStartTime ?? Date.now());
				const closeCodeDescription = wsCloseCodeToString(event.code);
				this._logService.debug(`[ChatWebSocketManager] Connection closed during setup for conversation ${this._conversationId} (code: ${event.code} ${closeCodeDescription}, reason: ${event.reason || '<empty>'}, wasClean: ${event.wasClean})`);
				ChatWebSocketTelemetrySender.sendCloseDuringSetupTelemetry(this._telemetryService, {
					conversationId: this._conversationId,
					initiatingRequestId: this._initiatingRequestId,
					gitHubRequestId: this.gitHubRequestId,
					closeCode: event.code,
					closeReason: closeCodeDescription,
					closeEventReason: event.reason,
					closeEventWasClean: String(event.wasClean),
					connectDurationMs,
				});
				reject(new Error('WebSocket closed during connection setup'));
			};

			// The setup listeners are one-shot: whichever fires first removes all three.
			const cleanup = () => {
				ws.removeEventListener('open', onOpen);
				ws.removeEventListener('error', onError);
				ws.removeEventListener('close', onClose);
			};

			ws.addEventListener('open', onOpen);
			ws.addEventListener('error', onError);
			ws.addEventListener('close', onClose);
		});
	}

	/**
	 * Installs the long-lived message/close/error listeners on an opened socket.
	 * Messages are parsed and forwarded to the active request; a close settles
	 * the active request; an error is remembered for the following close.
	 */
	private _setupMessageHandlers(ws: WebSocket): void {
		ws.addEventListener('message', (event) => {
			if (typeof event.data !== 'string') {
				return; // Only process text messages
			}

			const receivedMessageCharacters = event.data.length;
			this._totalReceivedMessageCount += 1;
			this._totalReceivedCharacters += receivedMessageCharacters;
			const connectionDurationMs = Date.now() - (this._connectedTime ?? Date.now());

			let parsed: OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent;
			try {
				parsed = JSON.parse(event.data);
			} catch (error) {
				const parseErrorMessage = collectSingleLineErrorMessage(error) || 'Failed to parse websocket message';
				this._logService.error(`[ChatWebSocketManager] Failed to parse message for conversation ${this._conversationId} turn ${this._turnId}: ${parseErrorMessage}`);
				ChatWebSocketTelemetrySender.sendMessageParseErrorTelemetry(this._telemetryService, {
					conversationId: this._conversationId,
					initiatingRequestId: this._initiatingRequestId,
					turnId: this._turnId,
					previousTurnId: this._previousTurnId,
					hadActiveRequest: this._hadActiveRequest,
					requestId: this._activeRequest?.requestId,
					gitHubRequestId: this.gitHubRequestId,
					modelId: this._activeRequest?.modelId,
					error: parseErrorMessage,
					connectionDurationMs,
					totalSentMessageCount: this._totalSentMessageCount,
					totalReceivedMessageCount: this._totalReceivedMessageCount,
					receivedMessageCharacters,
					totalSentCharacters: this._totalSentCharacters,
					totalReceivedCharacters: this._totalReceivedCharacters,
				});
				return;
			}

			// Record the marker before dispatching: handling the completed event
			// settles the request, which clears `_activeRequest`.
			if (!isCAPIWebSocketError(parsed) && parsed.type === 'response.completed') {
				this._statefulMarker = parsed.response.id;
				this._summarizedAtRoundId = this._activeRequest?.summarizedAtRoundId;
			}

			this._activeRequest?.handleEvent(parsed);
		});

		ws.addEventListener('close', (event) => {
			this._state = ConnectionState.Closed;
			const connectionDurationMs = Date.now() - (this._connectedTime ?? Date.now());
			const closeCodeDescription = wsCloseCodeToString(event.code);
			this._logService.debug(`[ChatWebSocketManager] Connection closed for conversation ${this._conversationId} turn ${this._turnId} (code: ${event.code} ${closeCodeDescription}, reason: ${event.reason || '<empty>'}, wasClean: ${event.wasClean})`);
			ChatWebSocketTelemetrySender.sendCloseTelemetry(this._telemetryService, {
				conversationId: this._conversationId,
				initiatingRequestId: this._initiatingRequestId,
				turnId: this._turnId,
				previousTurnId: this._previousTurnId,
				hadActiveRequest: this._hadActiveRequest,
				requestId: this._activeRequest?.requestId,
				gitHubRequestId: this.gitHubRequestId,
				modelId: this._activeRequest?.modelId,
				closeCode: event.code,
				closeReason: closeCodeDescription,
				closeEventReason: event.reason,
				closeEventWasClean: String(event.wasClean),
				connectionDurationMs,
				totalSentMessageCount: this._totalSentMessageCount,
				totalReceivedMessageCount: this._totalReceivedMessageCount,
				totalSentCharacters: this._totalSentCharacters,
				totalReceivedCharacters: this._totalReceivedCharacters,
			});
			const errorMessage = this._pendingErrorMessage;
			this._pendingErrorMessage = undefined;
			this._activeRequest?.handleConnectionClose(event.code, event.reason, errorMessage);
			this._activeRequest = undefined;
		});

		ws.addEventListener('error', (event) => {
			const errorMessage = event.error ? `${event.message}: ${collectSingleLineErrorMessage(event.error)}` : event.message || 'WebSocket error';
			const connectionDurationMs = Date.now() - (this._connectedTime ?? Date.now());
			this._logService.error(`[ChatWebSocketManager] Error for conversation ${this._conversationId} turn ${this._turnId}: ${errorMessage}`);
			ChatWebSocketTelemetrySender.sendErrorTelemetry(this._telemetryService, {
				conversationId: this._conversationId,
				initiatingRequestId: this._initiatingRequestId,
				turnId: this._turnId,
				previousTurnId: this._previousTurnId,
				hadActiveRequest: this._hadActiveRequest,
				requestId: this._activeRequest?.requestId,
				gitHubRequestId: this.gitHubRequestId,
				modelId: this._activeRequest?.modelId,
				error: errorMessage,
				connectionDurationMs,
				totalSentMessageCount: this._totalSentMessageCount,
				totalReceivedMessageCount: this._totalReceivedMessageCount,
				totalSentCharacters: this._totalSentCharacters,
				totalReceivedCharacters: this._totalReceivedCharacters,
			});
			// Keep only the first error; the close handler attaches it to the
			// error that settles the active request.
			this._pendingErrorMessage ??= errorMessage;
		});
	}

	/**
	 * Sends `body` as a `response.create` message and returns the handle that
	 * tracks it. Any request still in flight on this connection is superseded.
	 *
	 * @throws Error if the socket is not open.
	 */
	sendRequest(body: IEndpointBody, options: IChatWebSocketRequestOptions, token: CancellationToken): IChatWebSocketRequestHandle {
		if (!this._ws || this._state !== ConnectionState.Open) {
			throw new Error('WebSocket is not connected');
		}

		const statefulMarkerMatched = this._statefulMarker === body.previous_response_id;
		const previousResponseIdUnset = body.previous_response_id === undefined;
		const hasCompactionData = body.input?.some(item => item?.type === 'compaction') ?? false;
		const summarizedAtRoundIdSet = options.summarizedAtRoundId !== undefined;
		const summarizedAtRoundIdMatched = options.summarizedAtRoundId === this._summarizedAtRoundId;
		const compactionThreshold = getResponsesApiCompactionThresholdFromBody(body);
		// Only a short prefix of the response IDs is written to the log.
		const statefulMarkerPrefix = this._statefulMarker?.slice(0, 5).concat('...') ?? '<none>';
		const previousResponsePrefix = body.previous_response_id?.slice(0, 5).concat('...') ?? '<none>';
		if (statefulMarkerMatched) {
			this._logService.trace(`[ChatWebSocketManager] WebSocket stateful marker matches previous_response_id (${previousResponsePrefix}), summarizedAtRoundIdMatched: ${summarizedAtRoundIdMatched}`);
		} else {
			this._logService.debug(`[ChatWebSocketManager] WebSocket stateful marker (${statefulMarkerPrefix}) does not match previous_response_id (${previousResponsePrefix}), summarizedAtRoundIdMatched: ${summarizedAtRoundIdMatched}`);
		}

		// Supersede any in-flight request before updating turn state
		const supersededRequest = this._activeRequest;
		const hadActiveRequest = !!supersededRequest;
		if (supersededRequest) {
			this._logService.warn(`[ChatWebSocketManager] New request for conversation ${this._conversationId} turn ${options.turnId} while turn ${this._turnId} still has an active request`);
			supersededRequest.handleSuperseded();
		} else {
			this._logService.debug(`[ChatWebSocketManager] New request for conversation ${this._conversationId} turn ${options.turnId} (previous turn: ${this._turnId})`);
		}

		// Update turn state after superseding so the old request's settle
		// callback (which fires synchronously from handleSuperseded) still
		// sees its own turnId on `this`.
		const previousTurnId = this._turnId;
		const turnId = options.turnId;
		this._previousTurnId = previousTurnId;
		this._turnId = turnId;
		this._hadActiveRequest = hadActiveRequest;

		const requestId = options.requestId;

		// Snapshot the connection-wide counters so per-request deltas can be
		// computed when the request settles.
		const requestStartTime = Date.now();
		const requestStartSentMessageCount = this._totalSentMessageCount;
		const requestStartReceivedMessageCount = this._totalReceivedMessageCount;
		const requestStartSentCharacters = this._totalSentCharacters;
		const requestStartReceivedCharacters = this._totalReceivedCharacters;
		// Token counting runs in the background and is never awaited:
		// -1 means "not finished when the request settled", -2 means "counting failed".
		const promptTokenCountPromise = options.countTokens();
		let promptTokenCount = -1;
		promptTokenCountPromise.then(count => { promptTokenCount = count; }, () => { promptTokenCount = -2; });
		const request = new ChatWebSocketActiveRequest(requestId, options.model, options.summarizedAtRoundId, this._configurationService, this._logService);
		request.onDidSettle(({ outcome, closeCode, closeReason, serverErrorMessage, serverErrorCode }) => {
			if (this._activeRequest === request) {
				this._activeRequest = undefined;
			}
			const connectionDurationMs = Date.now() - (this._connectedTime ?? Date.now());
			const requestDurationMs = Date.now() - requestStartTime;
			const requestSentMessageCount = this._totalSentMessageCount - requestStartSentMessageCount;
			const requestReceivedMessageCount = this._totalReceivedMessageCount - requestStartReceivedMessageCount;
			const requestSentCharacters = this._totalSentCharacters - requestStartSentCharacters;
			const requestReceivedCharacters = this._totalReceivedCharacters - requestStartReceivedCharacters;
			ChatWebSocketTelemetrySender.sendRequestOutcomeTelemetry(this._telemetryService, {
				conversationId: this._conversationId,
				initiatingRequestId: this._initiatingRequestId,
				turnId,
				previousTurnId,
				hadActiveRequest,
				requestId,
				gitHubRequestId: this.gitHubRequestId,
				modelId: options.model,
				requestOutcome: outcome,
				statefulMarkerMatched,
				previousResponseIdUnset,
				hasCompactionData,
				summarizedAtRoundIdSet,
				summarizedAtRoundIdMatched,
				modeChanged: options.modeChanged,
				compactionThreshold,
				promptTokenCount,
				tokenCountMax: options.tokenCountMax,
				modelMaxPromptTokens: options.modelMaxPromptTokens,
				connectionDurationMs,
				requestDurationMs,
				totalSentMessageCount: this._totalSentMessageCount,
				totalReceivedMessageCount: this._totalReceivedMessageCount,
				totalSentCharacters: this._totalSentCharacters,
				totalReceivedCharacters: this._totalReceivedCharacters,
				requestSentMessageCount,
				requestReceivedMessageCount,
				requestSentCharacters,
				requestReceivedCharacters,
				closeCode,
				closeReason,
				serverErrorMessage,
				serverErrorCode,
			});
		});
		this._activeRequest = request;

		// Handle cancellation
		const cancelDisposable = token.onCancellationRequested(() => {
			if (this._activeRequest === request) {
				request.handleCancellation();
				this._activeRequest = undefined;
			}
		});
		// The trailing catch only silences the derived promise; callers still
		// observe a rejection through `request.done` itself.
		request.done.finally(() => cancelDisposable.dispose()).catch(() => { });

		// `stream` is dropped from the payload; everything else is forwarded as-is.
		const { stream: _, ...rest } = body;
		const message = {
			type: 'response.create' as const,
			...rest,
			initiator: options.userInitiated ? 'user' : 'agent',
		};
		const serializedMessage = JSON.stringify(message);
		const sentMessageCharacters = serializedMessage.length;
		this._totalSentMessageCount += 1;
		this._totalSentCharacters += sentMessageCharacters;

		const connectionDurationMs = Date.now() - (this._connectedTime ?? Date.now());
		this._logService.debug(`[ChatWebSocketManager] Sending request for conversation ${this._conversationId} turn ${this._turnId} (totalSentMessageCount: ${this._totalSentMessageCount}, sentMessageCharacters: ${sentMessageCharacters})`);
		ChatWebSocketTelemetrySender.sendRequestSentTelemetry(this._telemetryService, {
			conversationId: this._conversationId,
			initiatingRequestId: this._initiatingRequestId,
			turnId,
			previousTurnId,
			hadActiveRequest,
			requestId,
			gitHubRequestId: this.gitHubRequestId,
			modelId: options.model,
			statefulMarkerMatched,
			previousResponseIdUnset,
			hasCompactionData,
			summarizedAtRoundIdSet,
			summarizedAtRoundIdMatched,
			modeChanged: options.modeChanged,
			compactionThreshold,
			tokenCountMax: options.tokenCountMax,
			modelMaxPromptTokens: options.modelMaxPromptTokens,
			connectionDurationMs,
			totalSentMessageCount: this._totalSentMessageCount,
			totalReceivedMessageCount: this._totalReceivedMessageCount,
			sentMessageCharacters,
			totalSentCharacters: this._totalSentCharacters,
			totalReceivedCharacters: this._totalReceivedCharacters,
		});
		// NOTE(review): if send() throws, the request stays registered as active
		// and unsettled until the socket closes or the connection is disposed.
		this._ws.send(serializedMessage);

		return request;
	}

	/**
	 * Rejects the active request (if any), closes the socket, and notifies
	 * `onDidDispose` listeners before the emitter itself is disposed.
	 */
	override dispose(): void {
		this._activeRequest?.handleConnectionDisposed();
		this._activeRequest = undefined;

		if (this._ws) {
			this._ws.close();
			this._ws = undefined;
		}
		this._state = ConnectionState.Closed;
		this._onDidDispose.fire();
		super.dispose();
	}
}

/** Payload handed to the settle callback of a {@link ChatWebSocketActiveRequest}. */
interface IChatWebSocketRequestSettleResult {
	outcome: ChatWebSocketRequestOutcome;
	closeCode?: number;
	closeReason?: string;
	serverErrorMessage?: string;
	serverErrorCode?: string;
}

/**
 * Tracks one in-flight request: forwards server events to listeners and
 * settles `firstEvent` / `done` exactly once.
 *
 * Every path marks the request as settled *before* notifying listeners, so a
 * listener that synchronously disposes the connection or starts a new request
 * cannot re-enter and settle it a second time.
 */
class ChatWebSocketActiveRequest implements IChatWebSocketRequestHandle {
	private readonly _onEvent = new Emitter<OpenAI.Responses.ResponseStreamEvent>();
	readonly onEvent = this._onEvent.event;

	private readonly _onCAPIError = new Emitter<CAPIWebSocketErrorEvent>();
	readonly onCAPIError = this._onCAPIError.event;

	private readonly _onError = new Emitter<Error>();
	readonly onError = this._onError.event;

	private _resolveFirstEvent!: (event: OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent) => void;
	private _rejectFirstEvent!: (err: Error) => void;
	private _firstEventSettled = false;
	readonly firstEvent: Promise<OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent>;

	private _resolve!: () => void;
	private _reject!: (err: Error) => void;
	private _settled = false;
	private _onDidSettle?: (result: IChatWebSocketRequestSettleResult) => void;

	readonly done: Promise<void>;

	constructor(
		readonly requestId: string,
		readonly modelId: string | undefined,
		readonly summarizedAtRoundId: string | undefined,
		private readonly _configurationService: IConfigurationService,
		private readonly _logService: ILogService,
	) {
		this.done = new Promise<void>((resolve, reject) => {
			this._resolve = resolve;
			this._reject = reject;
		});
		this.firstEvent = new Promise<OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent>((resolve, reject) => {
			this._resolveFirstEvent = resolve;
			this._rejectFirstEvent = reject;
		});
	}

	/** Registers the single callback invoked when the request settles; a later call replaces it. */
	onDidSettle(callback: (result: IChatWebSocketRequestSettleResult) => void): void {
		this._onDidSettle = callback;
	}

	/**
	 * Processes one parsed server message: resolves `firstEvent` on the first
	 * one, routes CAPI errors to the error path, fires `onEvent` for everything
	 * else, and settles the request on a stream-terminating event.
	 */
	handleEvent(event: OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent): void {
		if (this._settled) {
			return;
		}

		// E.g.: "github.copilot.chat.advanced.debug.simulateWebSocketResponse": "{\"type\":\"error\",\"error\":{\"code\":\"user_global_rate_limited:enterprise\",\"message\":\"Rate limit exceeded\"}}"
		// E.g.: "github.copilot.chat.advanced.debug.simulateWebSocketResponse": "{\"type\":\"error\",\"error\":{\"code\":\"service_unavailable\",\"message\":\"service temporarily unavailable, please retry\"}}"
		const simulateResponse = this._configurationService.getConfig(ConfigKey.TeamInternal.DebugSimulateWebSocketResponse);
		if (simulateResponse) {
			try {
				event = JSON.parse(simulateResponse);
				this._logService.info(`[ChatWebSocketManager] Simulating WebSocket response event: ${simulateResponse}`);
			} catch (e) {
				// A malformed debug setting falls back to the real event.
				this._logService.error(`[ChatWebSocketManager] Failed to parse simulated WebSocket response: ${collectSingleLineErrorMessage(e)}`);
			}
		}

		if (!this._firstEventSettled) {
			this._firstEventSettled = true;
			this._resolveFirstEvent(event);
		}

		if (isCAPIWebSocketError(event)) {
			this._finalizeCAPIError(event);
			return;
		}

		const outcome = getStreamTerminatingOutcome(event);
		if (outcome) {
			// Settle before firing: a listener reacting to the terminal event
			// (e.g. by disposing the connection) must not turn a finished
			// request into a rejected one or report a second outcome.
			this._settled = true;
		}

		this._onEvent.fire(event);

		if (outcome) {
			this._finalizeSuccess(outcome);
		}
	}

	/** Rejects the request because the socket closed; `errorMessage` is a preceding socket error, if any. */
	handleConnectionClose(code: number, reason: string, errorMessage?: string): void {
		if (this._settled) {
			return;
		}
		const error = errorMessage
			? new Error(`${errorMessage} (close code: ${code} ${wsCloseCodeToString(code)}${reason ? `, reason: ${reason}` : ''})`)
			: new Error(`WebSocket closed (code: ${code} ${wsCloseCodeToString(code)}${reason ? `, reason: ${reason}` : ''})`);
		this._finalizeError('connection_closed', error, code, reason);
	}

	/** Rejects the request because a newer request was sent on the same connection. */
	handleSuperseded(): void {
		if (this._settled) {
			return;
		}
		this._finalizeError('superseded', new Error('Request superseded by new request'));
	}

	/** Rejects the request with a `CancellationError`. */
	handleCancellation(): void {
		if (this._settled) {
			return;
		}
		this._finalizeError('canceled', new CancellationError());
	}

	/** Rejects the request because its connection was disposed. */
	handleConnectionDisposed(): void {
		if (this._settled) {
			return;
		}
		this._finalizeError('connection_disposed', new Error('Connection disposed'));
	}

	/**
	 * Settles after a stream-terminating event. `done` resolves for every such
	 * outcome, including the failed/incomplete/cancelled ones.
	 */
	private _finalizeSuccess(outcome: ChatWebSocketRequestOutcome): void {
		this._settled = true;
		this._onDidSettle?.({ outcome });
		this._resolve();
		this._dispose();
	}

	/** Settles after a nested CAPI error: fires `onCAPIError`, then rejects `done`. */
	private _finalizeCAPIError(event: CAPIWebSocketErrorEvent): void {
		// Settle first so listeners cannot re-enter and settle again.
		this._settled = true;
		const { code, message } = event.error;
		this._onCAPIError.fire(event);
		this._onDidSettle?.({ outcome: 'error_response', serverErrorMessage: message, serverErrorCode: code });
		this._reject(new Error(`${message} (${code})`));
		this._dispose();
	}

	/**
	 * Settles after a transport-level or lifecycle failure: rejects `firstEvent`
	 * if it is still pending, fires `onError`, then rejects `done`.
	 */
	private _finalizeError(outcome: ChatWebSocketRequestOutcome, error: Error, closeCode?: number, closeReason?: string, serverErrorMessage?: string, serverErrorCode?: string): void {
		// Settle first so listeners cannot re-enter and settle again.
		this._settled = true;
		if (!this._firstEventSettled) {
			this._firstEventSettled = true;
			this._rejectFirstEvent(error);
		}
		this._onError.fire(error);
		this._onDidSettle?.({ outcome, closeCode, closeReason, serverErrorMessage, serverErrorCode });
		this._reject(error);
		this._dispose();
	}

	/** Releases the emitters once the request has settled. */
	private _dispose(): void {
		this._onEvent.dispose();
		this._onCAPIError.dispose();
		this._onError.dispose();
	}
}