Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/platform/networking/node/chatWebSocketManager.ts
13400 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import type { OpenAI } from 'openai';
7
import { CloseEvent, ErrorEvent } from 'undici';
8
import { createServiceIdentifier } from '../../../util/common/services';
9
import { CancellationToken } from '../../../util/vs/base/common/cancellation';
10
import { CancellationError } from '../../../util/vs/base/common/errors';
11
import { Emitter, Event } from '../../../util/vs/base/common/event';
12
import { Disposable, IDisposable } from '../../../util/vs/base/common/lifecycle';
13
import { QuotaSnapshots } from '../../chat/common/chatQuotaService';
14
import { ConfigKey, IConfigurationService } from '../../configuration/common/configurationService';
15
import { ICAPIClientService } from '../../endpoint/common/capiClient';
16
import { ILogService, collectSingleLineErrorMessage } from '../../log/common/logService';
17
import { ITelemetryService } from '../../telemetry/common/telemetry';
18
import { HeadersImpl, IHeaders, WebSocketConnection } from '../common/fetcherService';
19
import { IEndpointBody } from '../common/networking';
20
import { getResponsesApiCompactionThresholdFromBody } from '../../endpoint/node/responsesApi';
21
import { ChatWebSocketRequestOutcome, ChatWebSocketTelemetrySender } from './chatWebSocketTelemetry';
22
23
export const IChatWebSocketManager = createServiceIdentifier<IChatWebSocketManager>('IChatWebSocketManager');
24
25
export interface IChatWebSocketManager {
26
readonly _serviceBrand: undefined;
27
28
/**
29
* Gets or creates a WebSocket connection for the given conversation.
30
* The connection is shared across turns and tool call rounds within
31
* the same conversation, keeping server-side context alive.
32
*/
33
getOrCreateConnection(conversationId: string, headers: Record<string, string>, initiatingRequestId: string): IChatWebSocketConnection;
34
35
/**
36
* Returns true if there is an open WebSocket connection for the given
37
* conversation. Used to decide whether the server already has context
38
* from earlier requests in this conversation.
39
*/
40
hasActiveConnection(conversationId: string): boolean;
41
42
/**
43
* Returns the stateful marker (last completed response ID) for the given
44
* conversation's active WebSocket connection, or undefined if there is
45
* no active connection or no marker yet.
46
*/
47
getStatefulMarker(conversationId: string): string | undefined;
48
49
/**
50
* Returns the round ID at which the last client-side summarization
51
* occurred for this connection, or undefined if none.
52
*/
53
getSummarizedAtRoundId(conversationId: string): string | undefined;
54
55
/**
56
* Closes and removes the connection for a specific conversation.
57
*/
58
closeConnection(conversationId: string): void;
59
60
/**
61
* Closes all active connections.
62
*/
63
closeAll(): void;
64
}
65
66
/**
67
* No-op implementation for contexts where WebSocket is not available (web, tests, chat-lib).
68
*/
69
export class NullChatWebSocketManager implements IChatWebSocketManager {
70
declare readonly _serviceBrand: undefined;
71
getOrCreateConnection(_conversationId: string, _headers?: Record<string, string>, _initiatingRequestId?: string): IChatWebSocketConnection {
72
throw new Error('WebSocket not available');
73
}
74
hasActiveConnection(_conversationId: string): boolean { return false; }
75
getStatefulMarker(_conversationId: string): string | undefined { return undefined; }
76
getSummarizedAtRoundId(_conversationId: string): string | undefined { return undefined; }
77
closeConnection(_conversationId: string): void { }
78
closeAll(): void { }
79
}
80
81
export interface IChatWebSocketRequestOptions {
82
userInitiated: boolean;
83
turnId: string;
84
requestId: string;
85
model: string;
86
countTokens: () => Promise<number>;
87
tokenCountMax: number;
88
modelMaxPromptTokens: number;
89
summarizedAtRoundId?: string;
90
modeChanged?: boolean;
91
}
92
93
export interface IChatWebSocketConnection extends IDisposable {
94
/** Opens the WebSocket connection. Must be called before sendRequest. */
95
connect(): Promise<void>;
96
97
/** Sends a response.create request and returns an async iterable of response events. */
98
sendRequest(
99
body: IEndpointBody,
100
options: IChatWebSocketRequestOptions,
101
token: CancellationToken,
102
): IChatWebSocketRequestHandle;
103
104
/** Whether the connection is currently open and usable. */
105
readonly isOpen: boolean;
106
107
/** Response headers from the WebSocket connection handshake. */
108
readonly responseHeaders: IHeaders;
109
110
/** Response status code from the WebSocket connection handshake. */
111
readonly responseStatusCode: number | undefined;
112
113
/** Response status text from the WebSocket connection handshake. */
114
readonly responseStatusText: string | undefined;
115
116
/** The GitHub request ID from response headers. */
117
readonly gitHubRequestId: string;
118
119
/**
120
* The response.id from the last completed response on this connection.
121
* Used as `previous_response_id` on subsequent requests to avoid
122
* re-sending the full message history.
123
*/
124
readonly statefulMarker: string | undefined;
125
}
126
127
export interface IChatWebSocketRequestHandle {
128
/** Fires for each OpenAI stream event received from the server. */
129
readonly onEvent: Event<OpenAI.Responses.ResponseStreamEvent>;
130
/** Fires when a CAPI WebSocket error is received (nested error shape). */
131
readonly onCAPIError: Event<CAPIWebSocketErrorEvent>;
132
/** Fires when a transport-level error occurs (connection lost, etc.). */
133
readonly onError: Event<Error>;
134
/**
135
* Resolves with the first event received from the server, or rejects
136
* if the connection errors/closes before any event arrives.
137
* Consumers can inspect the event type to decide the response kind
138
* (success stream vs. CAPI error) before processing remaining events.
139
*/
140
readonly firstEvent: Promise<OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent>;
141
/** Resolves when the request has finished (completed or errored). */
142
readonly done: Promise<void>;
143
}
144
145
/**
146
* CAPI WebSocket error shape. Unlike the OpenAI SDK's flat `ResponseErrorEvent`
147
* (`{ type: "error", code, message }`), CAPI wraps the error details in a
148
* nested `error` object: `{ type: "error", error: { code, message } }`.
149
*
150
* Non-recoverable errors (rate limits, quota, upstream failures) also include
151
* `copilot_quota_snapshots` with per-model quota state.
152
*/
153
export interface CAPIWebSocketErrorEvent {
154
readonly type: 'error';
155
readonly error: {
156
readonly code: string;
157
readonly message: string;
158
};
159
readonly copilot_quota_snapshots?: QuotaSnapshots;
160
}
161
162
export function isCAPIWebSocketError(event: OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent): event is CAPIWebSocketErrorEvent {
163
return event.type === 'error' && 'error' in event && typeof (event as CAPIWebSocketErrorEvent).error?.code === 'string';
164
}
165
166
const streamTerminatingOutcomes: Readonly<Record<string, ChatWebSocketRequestOutcome>> = {
167
'response.completed': 'completed',
168
'response.failed': 'response_failed',
169
'response.incomplete': 'response_incomplete',
170
'response.cancelled': 'response_cancelled',
171
'error': 'upstream_error',
172
};
173
174
function getStreamTerminatingOutcome(event: OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent): ChatWebSocketRequestOutcome | undefined {
175
return streamTerminatingOutcomes[event.type];
176
}
177
178
export class ChatWebSocketManager extends Disposable implements IChatWebSocketManager {
179
declare readonly _serviceBrand: undefined;
180
181
private readonly _connections = new Map<string, ChatWebSocketConnection>();
182
183
constructor(
184
@ILogService private readonly _logService: ILogService,
185
@ICAPIClientService private readonly _capiClientService: ICAPIClientService,
186
@ITelemetryService private readonly _telemetryService: ITelemetryService,
187
@IConfigurationService private readonly _configurationService: IConfigurationService,
188
) {
189
super();
190
}
191
192
getOrCreateConnection(conversationId: string, headers: Record<string, string>, initiatingRequestId: string): IChatWebSocketConnection {
193
const existing = this._connections.get(conversationId);
194
195
// Reuse the connection if it's still open, even across turns.
196
if (existing?.isOpen) {
197
return existing;
198
}
199
200
if (existing) {
201
this._logService.debug(`[ChatWebSocketManager] Replacing closed connection for conversation ${conversationId}`);
202
existing.dispose();
203
this._connections.delete(conversationId);
204
}
205
206
const connection = new ChatWebSocketConnection(this._capiClientService, this._logService, this._telemetryService, this._configurationService, conversationId, headers, initiatingRequestId);
207
this._logService.debug(`[ChatWebSocketManager] Creating new connection for conversation ${conversationId}`);
208
this._connections.set(conversationId, connection);
209
210
// Remove from map when disposed externally
211
connection.onDidDispose(() => {
212
const entry = this._connections.get(conversationId);
213
if (entry === connection) {
214
this._connections.delete(conversationId);
215
}
216
});
217
218
return connection;
219
}
220
221
hasActiveConnection(conversationId: string): boolean {
222
const connection = this._connections.get(conversationId);
223
return !!connection?.isOpen;
224
}
225
226
getStatefulMarker(conversationId: string): string | undefined {
227
const connection = this._connections.get(conversationId);
228
return connection?.isOpen ? connection.statefulMarker : undefined;
229
}
230
231
getSummarizedAtRoundId(conversationId: string): string | undefined {
232
const connection = this._connections.get(conversationId);
233
return connection?.isOpen ? connection.summarizedAtRoundId : undefined;
234
}
235
236
closeConnection(conversationId: string): void {
237
const connection = this._connections.get(conversationId);
238
if (connection) {
239
if (connection.hasActiveRequest) {
240
this._logService.warn(`[ChatWebSocketManager] Closing connection for conversation ${conversationId} while turn ${connection.turnId} still has an active request`);
241
} else {
242
this._logService.debug(`[ChatWebSocketManager] Closing connection for conversation ${conversationId}`);
243
}
244
connection.dispose();
245
this._connections.delete(conversationId);
246
}
247
}
248
249
closeAll(): void {
250
for (const connection of this._connections.values()) {
251
connection.dispose();
252
}
253
this._connections.clear();
254
}
255
256
override dispose(): void {
257
this.closeAll();
258
super.dispose();
259
}
260
}
261
262
const enum ConnectionState {
263
Connecting,
264
Open,
265
Closed,
266
}
267
268
function wsCloseCodeToString(code: number): string {
269
switch (code) {
270
case 1000: return 'Normal Closure';
271
case 1001: return 'Going Away';
272
case 1002: return 'Protocol Error';
273
case 1003: return 'Unsupported Data';
274
case 1005: return 'No Status Received';
275
case 1006: return 'Abnormal Closure';
276
case 1007: return 'Invalid Payload';
277
case 1008: return 'Policy Violation';
278
case 1009: return 'Message Too Big';
279
case 1010: return 'Missing Extension';
280
case 1011: return 'Internal Error';
281
case 1012: return 'Service Restart';
282
case 1013: return 'Try Again Later';
283
case 1014: return 'Bad Gateway';
284
case 1015: return 'TLS Handshake Failed';
285
default: return 'Unknown';
286
}
287
}
288
289
class ChatWebSocketConnection extends Disposable implements IChatWebSocketConnection {
290
private _ws: WebSocket | undefined;
291
private _state: ConnectionState = ConnectionState.Closed;
292
private _activeRequest: ChatWebSocketActiveRequest | undefined;
293
private _statefulMarker: string | undefined;
294
private _summarizedAtRoundId: string | undefined;
295
296
private readonly _onDidDispose = this._register(new Emitter<void>());
297
readonly onDidDispose = this._onDidDispose.event;
298
299
private _connectStartTime: number | undefined;
300
private _connectedTime: number | undefined;
301
private _pendingErrorMessage: string | undefined;
302
private _totalSentMessageCount = 0;
303
private _totalReceivedMessageCount = 0;
304
private _totalSentCharacters = 0;
305
private _totalReceivedCharacters = 0;
306
private _responseHeaders: IHeaders = new HeadersImpl({});
307
private _responseStatusCode: number | undefined;
308
private _responseStatusText: string | undefined;
309
private _previousTurnId: string | undefined;
310
private _turnId: string | undefined;
311
private _hadActiveRequest = false;
312
313
constructor(
314
private readonly _capiClientService: ICAPIClientService,
315
private readonly _logService: ILogService,
316
private readonly _telemetryService: ITelemetryService,
317
private readonly _configurationService: IConfigurationService,
318
private readonly _conversationId: string,
319
private readonly _headers: Record<string, string>,
320
private readonly _initiatingRequestId: string,
321
) {
322
super();
323
}
324
325
get isOpen(): boolean {
326
return this._state === ConnectionState.Open && !!this._ws;
327
}
328
329
get hasActiveRequest(): boolean {
330
return !!this._activeRequest;
331
}
332
333
get turnId(): string | undefined {
334
return this._turnId;
335
}
336
337
get statefulMarker(): string | undefined {
338
return this._statefulMarker;
339
}
340
341
get summarizedAtRoundId(): string | undefined {
342
return this._summarizedAtRoundId;
343
}
344
345
get responseHeaders(): IHeaders {
346
return this._responseHeaders;
347
}
348
349
get responseStatusCode(): number | undefined {
350
return this._responseStatusCode;
351
}
352
353
get responseStatusText(): string | undefined {
354
return this._responseStatusText;
355
}
356
357
get gitHubRequestId(): string {
358
return this._responseHeaders.get('x-github-request-id') || '';
359
}
360
361
async connect(): Promise<void> {
362
if (this._state === ConnectionState.Open) {
363
return;
364
}
365
366
this._state = ConnectionState.Connecting;
367
this._connectStartTime = Date.now();
368
this._logService.debug(`[ChatWebSocketManager] Connecting WebSocket for conversation ${this._conversationId}`);
369
370
const connection: WebSocketConnection = await this._capiClientService.createResponsesWebSocket({
371
headers: this._headers,
372
});
373
374
return new Promise<void>((resolve, reject) => {
375
const ws = connection.webSocket;
376
377
const onOpen = () => {
378
cleanup();
379
this._state = ConnectionState.Open;
380
this._connectedTime = Date.now();
381
this._ws = ws;
382
this._responseHeaders = connection.responseHeaders;
383
this._responseStatusCode = connection.responseStatusCode;
384
this._responseStatusText = connection.responseStatusText;
385
this._setupMessageHandlers(ws);
386
const connectDurationMs = this._connectedTime - (this._connectStartTime ?? this._connectedTime);
387
this._logService.debug(`[ChatWebSocketManager] Connected for conversation ${this._conversationId}`);
388
ChatWebSocketTelemetrySender.sendConnectedTelemetry(this._telemetryService, {
389
conversationId: this._conversationId,
390
initiatingRequestId: this._initiatingRequestId,
391
gitHubRequestId: this.gitHubRequestId,
392
connectDurationMs,
393
});
394
resolve();
395
};
396
397
const onError = (event: ErrorEvent) => {
398
cleanup();
399
this._state = ConnectionState.Closed;
400
this._responseHeaders = connection.responseHeaders;
401
this._responseStatusCode = connection.responseStatusCode;
402
this._responseStatusText = connection.responseStatusText;
403
const errorMessage = event.error ? `${event.message}: ${collectSingleLineErrorMessage(event.error)}` : event.message || 'WebSocket error';
404
const networkError = event.error?.cause ?? connection.networkError;
405
const networkErrorMessage = networkError ? collectSingleLineErrorMessage(networkError) : undefined;
406
const connectDurationMs = Date.now() - (this._connectStartTime ?? Date.now());
407
this._logService.error(`[ChatWebSocketManager] Connection error for conversation ${this._conversationId}: ${errorMessage}${networkErrorMessage ? ` (cause: ${networkErrorMessage})` : ''}`);
408
ChatWebSocketTelemetrySender.sendConnectErrorTelemetry(this._telemetryService, {
409
conversationId: this._conversationId,
410
initiatingRequestId: this._initiatingRequestId,
411
gitHubRequestId: this.gitHubRequestId,
412
error: errorMessage,
413
connectDurationMs,
414
responseStatusCode: this._responseStatusCode,
415
responseStatusText: this._responseStatusText,
416
networkError: networkErrorMessage,
417
});
418
reject(new Error(errorMessage));
419
};
420
421
const onClose = (event: CloseEvent) => {
422
cleanup();
423
this._state = ConnectionState.Closed;
424
this._responseHeaders = connection.responseHeaders;
425
this._responseStatusCode = connection.responseStatusCode;
426
this._responseStatusText = connection.responseStatusText;
427
const connectDurationMs = Date.now() - (this._connectStartTime ?? Date.now());
428
const closeCodeDescription = wsCloseCodeToString(event.code);
429
this._logService.debug(`[ChatWebSocketManager] Connection closed during setup for conversation ${this._conversationId} (code: ${event.code} ${closeCodeDescription}, reason: ${event.reason || '<empty>'}, wasClean: ${event.wasClean})`);
430
ChatWebSocketTelemetrySender.sendCloseDuringSetupTelemetry(this._telemetryService, {
431
conversationId: this._conversationId,
432
initiatingRequestId: this._initiatingRequestId,
433
gitHubRequestId: this.gitHubRequestId,
434
closeCode: event.code,
435
closeReason: closeCodeDescription,
436
closeEventReason: event.reason,
437
closeEventWasClean: String(event.wasClean),
438
connectDurationMs,
439
});
440
reject(new Error('WebSocket closed during connection setup'));
441
};
442
443
const cleanup = () => {
444
ws.removeEventListener('open', onOpen);
445
ws.removeEventListener('error', onError);
446
ws.removeEventListener('close', onClose);
447
};
448
449
ws.addEventListener('open', onOpen);
450
ws.addEventListener('error', onError);
451
ws.addEventListener('close', onClose);
452
});
453
}
454
455
private _setupMessageHandlers(ws: WebSocket): void {
456
ws.addEventListener('message', (event) => {
457
if (typeof event.data !== 'string') {
458
return; // Only process text messages
459
}
460
461
const receivedMessageCharacters = event.data.length;
462
this._totalReceivedMessageCount += 1;
463
this._totalReceivedCharacters += receivedMessageCharacters;
464
const connectionDurationMs = Date.now() - (this._connectedTime ?? Date.now());
465
466
let parsed: OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent;
467
try {
468
parsed = JSON.parse(event.data);
469
} catch (error) {
470
const parseErrorMessage = collectSingleLineErrorMessage(error) || 'Failed to parse websocket message';
471
this._logService.error(`[ChatWebSocketManager] Failed to parse message for conversation ${this._conversationId} turn ${this._turnId}: ${parseErrorMessage}`);
472
ChatWebSocketTelemetrySender.sendMessageParseErrorTelemetry(this._telemetryService, {
473
conversationId: this._conversationId,
474
initiatingRequestId: this._initiatingRequestId,
475
turnId: this._turnId,
476
previousTurnId: this._previousTurnId,
477
hadActiveRequest: this._hadActiveRequest,
478
requestId: this._activeRequest?.requestId,
479
gitHubRequestId: this.gitHubRequestId,
480
modelId: this._activeRequest?.modelId,
481
error: parseErrorMessage,
482
connectionDurationMs,
483
totalSentMessageCount: this._totalSentMessageCount,
484
totalReceivedMessageCount: this._totalReceivedMessageCount,
485
receivedMessageCharacters,
486
totalSentCharacters: this._totalSentCharacters,
487
totalReceivedCharacters: this._totalReceivedCharacters,
488
});
489
return;
490
}
491
492
if (!isCAPIWebSocketError(parsed) && parsed.type === 'response.completed') {
493
this._statefulMarker = parsed.response.id;
494
this._summarizedAtRoundId = this._activeRequest?.summarizedAtRoundId;
495
}
496
497
this._activeRequest?.handleEvent(parsed);
498
});
499
500
ws.addEventListener('close', (event) => {
501
this._state = ConnectionState.Closed;
502
const connectionDurationMs = Date.now() - (this._connectedTime ?? Date.now());
503
const closeCodeDescription = wsCloseCodeToString(event.code);
504
this._logService.debug(`[ChatWebSocketManager] Connection closed for conversation ${this._conversationId} turn ${this._turnId} (code: ${event.code} ${closeCodeDescription}, reason: ${event.reason || '<empty>'}, wasClean: ${event.wasClean})`);
505
ChatWebSocketTelemetrySender.sendCloseTelemetry(this._telemetryService, {
506
conversationId: this._conversationId,
507
initiatingRequestId: this._initiatingRequestId,
508
turnId: this._turnId,
509
previousTurnId: this._previousTurnId,
510
hadActiveRequest: this._hadActiveRequest,
511
requestId: this._activeRequest?.requestId,
512
gitHubRequestId: this.gitHubRequestId,
513
modelId: this._activeRequest?.modelId,
514
closeCode: event.code,
515
closeReason: closeCodeDescription,
516
closeEventReason: event.reason,
517
closeEventWasClean: String(event.wasClean),
518
connectionDurationMs,
519
totalSentMessageCount: this._totalSentMessageCount,
520
totalReceivedMessageCount: this._totalReceivedMessageCount,
521
totalSentCharacters: this._totalSentCharacters,
522
totalReceivedCharacters: this._totalReceivedCharacters,
523
});
524
const errorMessage = this._pendingErrorMessage;
525
this._pendingErrorMessage = undefined;
526
this._activeRequest?.handleConnectionClose(event.code, event.reason, errorMessage);
527
this._activeRequest = undefined;
528
});
529
530
ws.addEventListener('error', (event) => {
531
const errorMessage = event.error ? `${event.message}: ${collectSingleLineErrorMessage(event.error)}` : event.message || 'WebSocket error';
532
const connectionDurationMs = Date.now() - (this._connectedTime ?? Date.now());
533
this._logService.error(`[ChatWebSocketManager] Error for conversation ${this._conversationId} turn ${this._turnId}: ${errorMessage}`);
534
ChatWebSocketTelemetrySender.sendErrorTelemetry(this._telemetryService, {
535
conversationId: this._conversationId,
536
initiatingRequestId: this._initiatingRequestId,
537
turnId: this._turnId,
538
previousTurnId: this._previousTurnId,
539
hadActiveRequest: this._hadActiveRequest,
540
requestId: this._activeRequest?.requestId,
541
gitHubRequestId: this.gitHubRequestId,
542
modelId: this._activeRequest?.modelId,
543
error: errorMessage,
544
connectionDurationMs,
545
totalSentMessageCount: this._totalSentMessageCount,
546
totalReceivedMessageCount: this._totalReceivedMessageCount,
547
totalSentCharacters: this._totalSentCharacters,
548
totalReceivedCharacters: this._totalReceivedCharacters,
549
});
550
this._pendingErrorMessage ??= errorMessage;
551
});
552
}
553
554
sendRequest(body: IEndpointBody, options: IChatWebSocketRequestOptions, token: CancellationToken): IChatWebSocketRequestHandle {
555
if (!this._ws || this._state !== ConnectionState.Open) {
556
throw new Error('WebSocket is not connected');
557
}
558
559
const statefulMarkerMatched = this._statefulMarker === body.previous_response_id;
560
const previousResponseIdUnset = body.previous_response_id === undefined;
561
const hasCompactionData = body.input?.some(item => item?.type === 'compaction') ?? false;
562
const summarizedAtRoundIdSet = options.summarizedAtRoundId !== undefined;
563
const summarizedAtRoundIdMatched = options.summarizedAtRoundId === this._summarizedAtRoundId;
564
const compactionThreshold = getResponsesApiCompactionThresholdFromBody(body);
565
const statefulMarkerPrefix = this._statefulMarker?.slice(0, 5).concat('...') ?? '<none>';
566
const previousResponsePrefix = body.previous_response_id?.slice(0, 5).concat('...') ?? '<none>';
567
if (statefulMarkerMatched) {
568
this._logService.trace(`[ChatWebSocketManager] WebSocket stateful marker matches previous_response_id (${previousResponsePrefix}), summarizedAtRoundIdMatched: ${summarizedAtRoundIdMatched}`);
569
} else {
570
this._logService.debug(`[ChatWebSocketManager] WebSocket stateful marker (${statefulMarkerPrefix}) does not match previous_response_id (${previousResponsePrefix}), summarizedAtRoundIdMatched: ${summarizedAtRoundIdMatched}`);
571
}
572
573
// Supersede any in-flight request before updating turn state
574
const hadActiveRequest = !!this._activeRequest;
575
if (hadActiveRequest) {
576
this._logService.warn(`[ChatWebSocketManager] New request for conversation ${this._conversationId} turn ${options.turnId} while turn ${this._turnId} still has an active request`);
577
this._activeRequest!.handleSuperseded();
578
} else {
579
this._logService.debug(`[ChatWebSocketManager] New request for conversation ${this._conversationId} turn ${options.turnId} (previous turn: ${this._turnId})`);
580
}
581
582
// Update turn state after superseding so the old request's settle
583
// callback (which fires synchronously from handleSuperseded) still
584
// sees its own turnId on `this`.
585
const previousTurnId = this._turnId;
586
const turnId = options.turnId;
587
this._previousTurnId = previousTurnId;
588
this._turnId = turnId;
589
this._hadActiveRequest = hadActiveRequest;
590
591
const requestId = options.requestId;
592
593
const requestStartTime = Date.now();
594
const requestStartSentMessageCount = this._totalSentMessageCount;
595
const requestStartReceivedMessageCount = this._totalReceivedMessageCount;
596
const requestStartSentCharacters = this._totalSentCharacters;
597
const requestStartReceivedCharacters = this._totalReceivedCharacters;
598
const promptTokenCountPromise = options.countTokens();
599
let promptTokenCount = -1;
600
promptTokenCountPromise.then(count => { promptTokenCount = count; }, () => { promptTokenCount = -2; });
601
const request = new ChatWebSocketActiveRequest(requestId, options.model, options.summarizedAtRoundId, this._configurationService, this._logService);
602
request.onDidSettle(({ outcome, closeCode, closeReason, serverErrorMessage, serverErrorCode }) => {
603
if (this._activeRequest === request) {
604
this._activeRequest = undefined;
605
}
606
const connectionDurationMs = Date.now() - (this._connectedTime ?? Date.now());
607
const requestDurationMs = Date.now() - requestStartTime;
608
const requestSentMessageCount = this._totalSentMessageCount - requestStartSentMessageCount;
609
const requestReceivedMessageCount = this._totalReceivedMessageCount - requestStartReceivedMessageCount;
610
const requestSentCharacters = this._totalSentCharacters - requestStartSentCharacters;
611
const requestReceivedCharacters = this._totalReceivedCharacters - requestStartReceivedCharacters;
612
ChatWebSocketTelemetrySender.sendRequestOutcomeTelemetry(this._telemetryService, {
613
conversationId: this._conversationId,
614
initiatingRequestId: this._initiatingRequestId,
615
turnId,
616
previousTurnId,
617
hadActiveRequest,
618
requestId,
619
gitHubRequestId: this.gitHubRequestId,
620
modelId: options.model,
621
requestOutcome: outcome,
622
statefulMarkerMatched,
623
previousResponseIdUnset,
624
hasCompactionData,
625
summarizedAtRoundIdSet,
626
summarizedAtRoundIdMatched,
627
modeChanged: options.modeChanged,
628
compactionThreshold,
629
promptTokenCount,
630
tokenCountMax: options.tokenCountMax,
631
modelMaxPromptTokens: options.modelMaxPromptTokens,
632
connectionDurationMs,
633
requestDurationMs,
634
totalSentMessageCount: this._totalSentMessageCount,
635
totalReceivedMessageCount: this._totalReceivedMessageCount,
636
totalSentCharacters: this._totalSentCharacters,
637
totalReceivedCharacters: this._totalReceivedCharacters,
638
requestSentMessageCount,
639
requestReceivedMessageCount,
640
requestSentCharacters,
641
requestReceivedCharacters,
642
closeCode,
643
closeReason,
644
serverErrorMessage,
645
serverErrorCode,
646
});
647
});
648
this._activeRequest = request;
649
650
// Handle cancellation
651
const cancelDisposable = token.onCancellationRequested(() => {
652
if (this._activeRequest === request) {
653
request.handleCancellation();
654
this._activeRequest = undefined;
655
}
656
});
657
request.done.finally(() => cancelDisposable.dispose()).catch(() => { });
658
659
const { stream: _, ...rest } = body;
660
const message = {
661
type: 'response.create' as const,
662
...rest,
663
initiator: options.userInitiated ? 'user' : 'agent',
664
};
665
const serializedMessage = JSON.stringify(message);
666
const sentMessageCharacters = serializedMessage.length;
667
this._totalSentMessageCount += 1;
668
this._totalSentCharacters += sentMessageCharacters;
669
670
const connectionDurationMs = Date.now() - (this._connectedTime ?? Date.now());
671
this._logService.debug(`[ChatWebSocketManager] Sending request for conversation ${this._conversationId} turn ${this._turnId} (totalSentMessageCount: ${this._totalSentMessageCount}, sentMessageCharacters: ${sentMessageCharacters})`);
672
ChatWebSocketTelemetrySender.sendRequestSentTelemetry(this._telemetryService, {
673
conversationId: this._conversationId,
674
initiatingRequestId: this._initiatingRequestId,
675
turnId,
676
previousTurnId,
677
hadActiveRequest,
678
requestId,
679
gitHubRequestId: this.gitHubRequestId,
680
modelId: options.model,
681
statefulMarkerMatched,
682
previousResponseIdUnset,
683
hasCompactionData,
684
summarizedAtRoundIdSet,
685
summarizedAtRoundIdMatched,
686
modeChanged: options.modeChanged,
687
compactionThreshold,
688
tokenCountMax: options.tokenCountMax,
689
modelMaxPromptTokens: options.modelMaxPromptTokens,
690
connectionDurationMs,
691
totalSentMessageCount: this._totalSentMessageCount,
692
totalReceivedMessageCount: this._totalReceivedMessageCount,
693
sentMessageCharacters,
694
totalSentCharacters: this._totalSentCharacters,
695
totalReceivedCharacters: this._totalReceivedCharacters,
696
});
697
this._ws.send(serializedMessage);
698
699
return request;
700
}
701
702
override dispose(): void {
703
this._activeRequest?.handleConnectionDisposed();
704
this._activeRequest = undefined;
705
706
if (this._ws) {
707
this._ws.close();
708
this._ws = undefined;
709
}
710
this._state = ConnectionState.Closed;
711
this._onDidDispose.fire();
712
super.dispose();
713
}
714
}
715
716
class ChatWebSocketActiveRequest implements IChatWebSocketRequestHandle {
717
private readonly _onEvent = new Emitter<OpenAI.Responses.ResponseStreamEvent>();
718
readonly onEvent = this._onEvent.event;
719
720
private readonly _onCAPIError = new Emitter<CAPIWebSocketErrorEvent>();
721
readonly onCAPIError = this._onCAPIError.event;
722
723
private readonly _onError = new Emitter<Error>();
724
readonly onError = this._onError.event;
725
726
private _resolveFirstEvent!: (event: OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent) => void;
727
private _rejectFirstEvent!: (err: Error) => void;
728
private _firstEventSettled = false;
729
readonly firstEvent: Promise<OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent>;
730
731
private _resolve!: () => void;
732
private _reject!: (err: Error) => void;
733
private _settled = false;
734
private _onDidSettle?: (result: { outcome: ChatWebSocketRequestOutcome; closeCode?: number; closeReason?: string; serverErrorMessage?: string; serverErrorCode?: string }) => void;
735
736
readonly done: Promise<void>;
737
738
constructor(
739
readonly requestId: string,
740
readonly modelId: string | undefined,
741
readonly summarizedAtRoundId: string | undefined,
742
private readonly _configurationService: IConfigurationService,
743
private readonly _logService: ILogService,
744
) {
745
this.done = new Promise<void>((resolve, reject) => {
746
this._resolve = resolve;
747
this._reject = reject;
748
});
749
this.firstEvent = new Promise<OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent>((resolve, reject) => {
750
this._resolveFirstEvent = resolve;
751
this._rejectFirstEvent = reject;
752
});
753
}
754
755
onDidSettle(callback: (result: { outcome: ChatWebSocketRequestOutcome; closeCode?: number; closeReason?: string; serverErrorMessage?: string; serverErrorCode?: string }) => void): void {
756
this._onDidSettle = callback;
757
}
758
759
handleEvent(event: OpenAI.Responses.ResponseStreamEvent | CAPIWebSocketErrorEvent): void {
760
if (this._settled) {
761
return;
762
}
763
764
// E.g.: "github.copilot.chat.advanced.debug.simulateWebSocketResponse": "{\"type\":\"error\",\"error\":{\"code\":\"user_global_rate_limited:enterprise\",\"message\":\"Rate limit exceeded\"}}"
765
// E.g.: "github.copilot.chat.advanced.debug.simulateWebSocketResponse": "{\"type\":\"error\",\"error\":{\"code\":\"service_unavailable\",\"message\":\"service temporarily unavailable, please retry\"}}"
766
const simulateResponse = this._configurationService.getConfig(ConfigKey.TeamInternal.DebugSimulateWebSocketResponse);
767
if (simulateResponse) {
768
try {
769
event = JSON.parse(simulateResponse);
770
this._logService.info(`[ChatWebSocketManager] Simulating WebSocket response event: ${simulateResponse}`);
771
} catch (e) {
772
this._logService.error(`[ChatWebSocketManager] Failed to parse simulated WebSocket response: ${collectSingleLineErrorMessage(e)}`);
773
}
774
}
775
776
if (!this._firstEventSettled) {
777
this._firstEventSettled = true;
778
this._resolveFirstEvent(event);
779
}
780
781
if (isCAPIWebSocketError(event)) {
782
this._finalizeCAPIError(event);
783
return;
784
}
785
786
this._onEvent.fire(event);
787
788
const outcome = getStreamTerminatingOutcome(event);
789
if (outcome) {
790
this._finalizeSuccess(outcome);
791
}
792
}
793
794
handleConnectionClose(code: number, reason: string, errorMessage?: string): void {
795
if (this._settled) {
796
return;
797
}
798
const error = errorMessage
799
? new Error(`${errorMessage} (close code: ${code} ${wsCloseCodeToString(code)}${reason ? `, reason: ${reason}` : ''})`)
800
: new Error(`WebSocket closed (code: ${code} ${wsCloseCodeToString(code)}${reason ? `, reason: ${reason}` : ''})`);
801
this._finalizeError('connection_closed', error, code, reason);
802
}
803
804
handleSuperseded(): void {
805
if (this._settled) {
806
return;
807
}
808
this._finalizeError('superseded', new Error('Request superseded by new request'));
809
}
810
811
handleCancellation(): void {
812
if (this._settled) {
813
return;
814
}
815
this._finalizeError('canceled', new CancellationError());
816
}
817
818
handleConnectionDisposed(): void {
819
if (this._settled) {
820
return;
821
}
822
this._finalizeError('connection_disposed', new Error('Connection disposed'));
823
}
824
825
private _finalizeSuccess(outcome: ChatWebSocketRequestOutcome): void {
826
this._settled = true;
827
this._onDidSettle?.({ outcome });
828
this._resolve();
829
this._dispose();
830
}
831
832
private _finalizeCAPIError(event: CAPIWebSocketErrorEvent): void {
833
const { code, message } = event.error;
834
this._onCAPIError.fire(event);
835
this._settled = true;
836
this._onDidSettle?.({ outcome: 'error_response', serverErrorMessage: message, serverErrorCode: code });
837
this._reject(new Error(`${message} (${code})`));
838
this._dispose();
839
}
840
841
private _finalizeError(outcome: ChatWebSocketRequestOutcome, error: Error, closeCode?: number, closeReason?: string, serverErrorMessage?: string, serverErrorCode?: string): void {
842
if (!this._firstEventSettled) {
843
this._firstEventSettled = true;
844
this._rejectFirstEvent(error);
845
}
846
this._onError.fire(error);
847
this._settled = true;
848
this._onDidSettle?.({ outcome, closeCode, closeReason, serverErrorMessage, serverErrorCode });
849
this._reject(error);
850
this._dispose();
851
}
852
853
private _dispose(): void {
854
this._onEvent.dispose();
855
this._onCAPIError.dispose();
856
this._onError.dispose();
857
}
858
}
859
860