Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/platform/networking/node/stream.ts
13401 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import type { CancellationToken } from 'vscode';
7
import { ILogService, LogLevel } from '../../log/common/logService';
8
import { ITelemetryService } from '../../telemetry/common/telemetry';
9
import { TelemetryData } from '../../telemetry/common/telemetryData';
10
import { RawThinkingDelta, ThinkingDelta } from '../../thinking/common/thinking';
11
import { extractThinkingDeltaFromChoice, } from '../../thinking/common/thinkingUtils';
12
import { FinishedCallback, getRequestId, ICodeVulnerabilityAnnotation, ICopilotBeginToolCall, ICopilotConfirmation, ICopilotError, ICopilotFunctionCall, ICopilotReference, ICopilotToolCall, ICopilotToolCallStreamUpdate, IIPCodeCitation, isCodeCitationAnnotation, isCopilotAnnotation, RequestId } from '../common/fetch';
13
import { DestroyableStream, Response } from '../common/fetcherService';
14
import { APIErrorResponse, APIJsonData, APIUsage, ChoiceLogProbs, FilterReason, FinishedCompletionReason, isApiUsage, IToolCall } from '../common/openai';
15
16
/** Gathers together many chunks of a single completion choice. */
17
class APIJsonDataStreaming {
18
19
constructor(public readonly model: string) { }
20
21
get text(): readonly string[] {
22
return this._text;
23
}
24
25
private _text: string[] = [];
26
private _newText: string[] = [];
27
28
append(choice: ExtendedChoiceJSON) {
29
if (choice.text) {
30
const str = APIJsonDataStreaming._removeCR(choice.text);
31
this._text.push(str);
32
this._newText.push(str);
33
}
34
if (choice.delta?.content) {
35
const str = APIJsonDataStreaming._removeCR(choice.delta.content);
36
this._text.push(str);
37
this._newText.push(str);
38
}
39
if (choice.delta?.function_call && (choice.delta.function_call.name || choice.delta.function_call.arguments)) {
40
const str = APIJsonDataStreaming._removeCR(choice.delta.function_call.arguments);
41
this._text.push(str);
42
this._newText.push(str);
43
}
44
}
45
46
flush(): string {
47
const delta = this._newText.join('');
48
this._newText = [];
49
return delta;
50
}
51
52
private static _removeCR(text: string): string {
53
return text.replace(/\r$/g, '');
54
}
55
56
toJSON() {
57
return {
58
text: this._text,
59
newText: this._newText
60
};
61
}
62
}
63
64
/** Mutable accumulator for a single tool call streamed across multiple deltas. */
class StreamingToolCall {
	public id: string | undefined;
	public name: string | undefined;
	public arguments: string = '';

	constructor() { }

	/**
	 * Merges one streamed delta into this call. `id` and `name` are
	 * overwritten when present; argument text is concatenated.
	 * @returns true when this delta carried new argument text.
	 */
	update(toolCall: IToolCall): boolean {
		if (toolCall.id) {
			this.id = toolCall.id;
		}
		if (toolCall.function?.name) {
			this.name = toolCall.function.name;
		}
		const argsFragment = toolCall.function?.arguments;
		if (!argsFragment) {
			return false;
		}
		this.arguments += argsFragment;
		return true;
	}
}
90
91
class StreamingToolCalls {
92
private toolCalls: StreamingToolCall[] = [];
93
94
constructor() { }
95
96
getToolCalls(): ICopilotToolCall[] {
97
return this.toolCalls.map(call => {
98
return {
99
name: call.name!,
100
arguments: call.arguments,
101
id: call.id!,
102
};
103
});
104
}
105
106
hasToolCalls(): boolean {
107
return this.toolCalls.length > 0;
108
}
109
110
update(choice: ExtendedChoiceJSON): ICopilotToolCallStreamUpdate[] {
111
const updates: ICopilotToolCallStreamUpdate[] = [];
112
choice.delta?.tool_calls?.forEach(toolCall => {
113
let currentCall: StreamingToolCall | undefined;
114
if (toolCall.id) {
115
currentCall = this.toolCalls.find(call => call.id === toolCall.id);
116
}
117
if (!currentCall) {
118
currentCall = this.toolCalls.at(-1);
119
}
120
if (!currentCall || (toolCall.id && currentCall.id && currentCall.id !== toolCall.id)) {
121
currentCall = new StreamingToolCall();
122
this.toolCalls.push(currentCall);
123
}
124
125
const argumentsChanged = currentCall.update(toolCall);
126
if (argumentsChanged && currentCall.name) {
127
updates.push({
128
name: currentCall.name,
129
arguments: currentCall.arguments,
130
id: currentCall.id,
131
});
132
}
133
});
134
return updates;
135
}
136
}
137
138
// Given a string of lines separated by one or more newlines, returns complete
// lines and any remaining partial line data. Exported for test only.
export function splitChunk(chunk: string): [string[], string] {
	const pieces = chunk.split('\n');
	// Whatever follows the final newline is an incomplete line ('' when the
	// chunk ends exactly on a newline boundary).
	const partial = pieces.pop() as string;
	const completeLines = pieces.filter(line => line.length > 0);
	return [completeLines, partial];
}
145
146
/**
 * A single finished completion returned from the model or proxy, along with
 * some metadata.
 */
export interface FinishedCompletion {
	/** The streamed chunks accumulated for this choice. */
	solution: APIJsonDataStreaming;
	/** An optional offset into `solution.text.join('')` where the completion finishes. */
	finishOffset: number | undefined;
	/** A copilot-specific human-readable reason for the completion finishing. */
	reason: FinishedCompletionReason;
	/** A copilot-specific reason for filtering the response. Only returns when reason === FinishedCompletionReason.ContentFilter */
	filterReason?: FilterReason;
	/** Populated when the stream reported a server-side error for this completion. */
	error?: APIErrorResponse;
	/** The token usage reported from CAPI */
	usage?: APIUsage;
	/** Identifiers of the network request this completion came from. */
	requestId: RequestId;
	/** The choice index this completion corresponds to. */
	index: number;
}
164
165
/** What comes back from the OpenAI API for a single choice in an SSE chunk. */
interface ChoiceJSON {
	/** Which of the n requested choices this chunk belongs to. */
	index: number;
	/**
	 * The text attribute as defined in completions streaming.
	 * See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
	 */
	text?: string;
	/**
	 * The delta attribute as defined in chat streaming.
	 * See https://github.com/openai/openai-cookbook/blob/main/examples/How_to_stream_completions.ipynb
	 */
	delta?: { content: string | null };
	/** Why the model stopped emitting for this choice; null/absent while streaming continues. */
	finish_reason?: FinishedCompletionReason.Stop | FinishedCompletionReason.Length | FinishedCompletionReason.FunctionCall | FinishedCompletionReason.ContentFilter | FinishedCompletionReason.ServerError | FinishedCompletionReason.ToolCalls | null;
	/** Per-token log probabilities, when requested. */
	logprobs?: ChoiceLogProbs;
}
181
182
/**
 * Extensions to the OpenAI stream format
 */
interface ExtendedChoiceJSON extends ChoiceJSON {
	/** Per-category filter outcomes; `Copyright` is reported via annotations instead. */
	content_filter_results?: Record<Exclude<FilterReason, FilterReason.Copyright>, { filtered: boolean; severity: string }>;
	/** Thinking/reasoning payload attached at the message level. */
	message?: RawThinkingDelta;
	// Widens the base `delta` with copilot-specific annotation and call fields.
	delta?: {
		content: string | null;
		// Safety/IP annotations attached to this delta. Boolean flags mark the
		// category as having fired for this chunk.
		copilot_annotations?: {
			CodeVulnerability: ICodeVulnerabilityAnnotation[];
			IPCodeCitations: IIPCodeCitation[];
			TextCopyright: boolean | undefined;
			Sexual: boolean | undefined;
			SexualPattern: boolean | undefined;
			Violence: boolean | undefined;
			HateSpeech: boolean | undefined;
			HateSpeechPattern: boolean | undefined;
			SelfHarm: boolean | undefined;
			// NOTE(review): field name appears as-is on the wire ("Prom") — do not "fix" the spelling.
			PromptPromBlockList: boolean | undefined;
		};
		/** Legacy single function call, streamed as name + argument fragments. */
		function_call?: { name: string; arguments: string };
		/** Modern tool calls, streamed incrementally. */
		tool_calls?: IToolCall[];
		/** Role of the delta; 'function' deltas carry reference payloads in `content`. */
		role?: string;
		name?: string;
	} & RawThinkingDelta;
}
208
209
/**
 * Processes an HTTP request containing what is assumed to be an SSE stream of
 * OpenAI API data. Yields a stream of `FinishedCompletion` objects, each as
 * soon as it's finished.
 */
export class SSEProcessor {
	// NOTE(review): this initializer reads `this.response`, a constructor
	// parameter property — it relies on TS assigning parameter properties
	// before field initializers run; confirm this still holds if the build
	// moves to `useDefineForClassFields` semantics.
	private requestId: RequestId = getRequestId(this.response.headers);
	/**
	 * A key & value being here means at least one chunk with that choice index
	 * has been received. A null value means we've already finished the given
	 * solution and should not process incoming tokens further.
	 */
	private readonly solutions: Record<number, APIJsonDataStreaming | null> = {};

	// Choice indexes whose function/tool call has completed, so finishSolutions
	// can report the right finish reason for them.
	private readonly completedFunctionCallIdxs: Map<number /* index */, 'function' | 'tool'> = new Map();
	// Legacy function-call argument accumulators, keyed by function name.
	private readonly functionCalls: Record<string, APIJsonDataStreaming | null> = {};
	// Accumulator for modern (tool_calls) streaming.
	private readonly toolCalls = new StreamingToolCalls();
	// Name of the legacy function call currently being streamed, if any.
	private functionCallName: string | undefined = undefined;

	// Private: use the static `create` factory, which wires up the decoded body stream.
	private constructor(
		private readonly logService: ILogService,
		private readonly telemetryService: ITelemetryService,
		private readonly expectedNumChoices: number,
		private readonly response: Response,
		private readonly body: DestroyableStream<string>,
		private readonly cancellationToken?: CancellationToken
	) { }

	/** Builds a processor over `response`, decoding its byte stream to text. */
	static async create(
		logService: ILogService,
		telemetryService: ITelemetryService,
		expectedNumChoices: number,
		response: Response,
		cancellationToken?: CancellationToken
	) {
		const body = response.body.pipeThrough(new TextDecoderStream());
		return new SSEProcessor(
			logService,
			telemetryService,
			expectedNumChoices,
			response,
			body,
			cancellationToken
		);
	}

	/**
	 * Yields finished completions as soon as they are available. The finishedCb
	 * is used to determine when a completion is done and should be truncated.
	 * It is called on the whole of the received solution text, once at the end
	 * of the completion (if it stops by itself) and also on any chunk that has
	 * a newline in it.
	 *
	 * Closes the server request stream when all choices are finished/truncated
	 * (as long as fastCancellation is true).
	 *
	 * Note that for this to work, the caller must consume the entire stream.
	 * This happens automatically when using a `for await` loop, but when
	 * iterating manually this needs to be done by calling `.next()` until it
	 * returns an item with done = true (or calling `.return()`).
	 */
	async *processSSE(finishedCb: FinishedCallback = async () => undefined): AsyncIterable<FinishedCompletion> {
		try {
			// If it's n > 1 we don't handle usage as the usage is global for the stream and all our code assumes per choice
			// Therefore we will just skip over the usage and yield the completions
			if (this.expectedNumChoices > 1) {
				for await (const usageOrCompletions of this.processSSEInner(finishedCb)) {
					if (!isApiUsage(usageOrCompletions)) {
						yield usageOrCompletions;
					}
				}
			} else {
				let completion: FinishedCompletion | undefined;
				let usage: APIUsage | undefined;

				// Process both the usage and the completions, then yield one combined completions
				for await (const usageOrCompletions of this.processSSEInner(finishedCb)) {
					if (isApiUsage(usageOrCompletions)) {
						usage = usageOrCompletions;
					} else {
						completion = usageOrCompletions;
					}
				}

				if (await this.maybeCancel('after receiving the completion, but maybe before we got the usage')) {
					return;
				}

				// Attach the stream-global usage to the single completion before yielding it.
				if (completion) {
					completion.usage = usage;
					yield completion;
				}
			}
		} finally {
			// Always close the response body, even if the consumer bailed early.
			await this.cancel();
			this.logService.info(
				`request done: requestId: [${this.requestId.headerRequestId}] model deployment ID: [${this.requestId.deploymentId}]`
			);
		}
	}

	// Core state machine: parses SSE lines into JSON chunks, feeds per-choice
	// accumulators, and yields usage objects and finished completions.
	private async *processSSEInner(finishedCb: FinishedCallback): AsyncIterable<FinishedCompletion | APIUsage> {
		// Collects pieces of the SSE stream that haven't been fully processed yet.
		let extraData = '';
		// This flag is set when at least for one solution we finished early (via `finishedCb`).
		let hadEarlyFinishedSolution = false;

		// The platform agent can return a 'function_call' finish_reason, which isn't a real function call
		// but is echoing internal function call messages back to us. So don't treat them as real function calls
		// if we received more data after that
		let allowCompletingSolution = true;
		// Set once any thinking text or id has been observed in this stream.
		let thinkingFound = false;

		// Iterate over arbitrarily sized chunks coming in from the network.
		for await (const chunk of this.body) {
			if (await this.maybeCancel('after awaiting body chunk')) {
				return;
			}

			// this.logService.debug(chunk.toString());
			// Prepend any partial line left over from the previous chunk.
			const [dataLines, remainder] = splitChunk(extraData + chunk.toString());
			extraData = remainder;

			// Each dataLine is complete since we've seen at least one \n after it

			for (const dataLine of dataLines) {
				// Lines which start with a `:` are SSE Comments per the spec and can be ignored
				if (dataLine.startsWith(':')) {
					continue;
				}
				const lineWithoutData = dataLine.slice('data:'.length).trim();
				if (lineWithoutData === '[DONE]') {
					// End-of-stream sentinel: flush everything still open and stop.
					yield* this.finishSolutions();
					return;
				}

				// TODO @lramos15 - This should not be an ugly inlined type like this
				let json: {
					choices: ExtendedChoiceJSON[] | undefined | null;
					model: string;
					error?: APIErrorResponse;
					copilot_references?: any;
					copilot_confirmation?: any;
					copilot_errors: any;
					usage: APIUsage | undefined;
				};
				try {
					json = JSON.parse(lineWithoutData);
				} catch (e) {
					// Malformed line: log + telemetry, then keep consuming the stream.
					this.logService.error(`Error parsing JSON stream data for request id ${this.requestId.headerRequestId}:${dataLine}`);
					sendCommunicationErrorTelemetry(this.telemetryService, `Error parsing JSON stream data for request id ${this.requestId.headerRequestId}:`, dataLine);
					continue;
				}

				// Track usage data for this stream. Usage is global and not per choice. Therefore it's emitted as its own chunk
				if (json.usage) {
					yield json.usage;
				}

				// A message with a confirmation may or may not have 'choices'
				if (json.copilot_confirmation && isCopilotConfirmation(json.copilot_confirmation)) {
					await finishedCb('', 0, { text: '', copilotConfirmation: json.copilot_confirmation });
				}

				if (!json.choices) {
					// Currently there are messages with a null 'choices' that include copilot_references- ignore these
					if (!json.copilot_references && !json.copilot_confirmation) {
						if (json.error !== undefined) {
							this.logService.error(`Error in response for request id ${this.requestId.headerRequestId}:${json.error.message}`);
							sendCommunicationErrorTelemetry(this.telemetryService, `Error in response for request id ${this.requestId.headerRequestId}:`, json.error.message);
							// Encountered an error mid stream we immediately yield as the response is not usable.
							yield {
								index: 0,
								finishOffset: undefined,
								solution: new APIJsonDataStreaming(json.model || ''),
								reason: FinishedCompletionReason.ServerError,
								error: json.error,
								requestId: this.requestId,
							};
						} else {
							this.logService.error(`Unexpected response with no choices or error for request id ${this.requestId.headerRequestId}`);
							sendCommunicationErrorTelemetry(this.telemetryService, `Unexpected response with no choices or error for request id ${this.requestId.headerRequestId}`);
						}
					}

					// There are also messages with a null 'choices' that include copilot_errors- report these
					if (json.copilot_errors) {
						await finishedCb('', 0, { text: '', copilotErrors: json.copilot_errors });
					}

					if (json.copilot_references) {
						await finishedCb('', 0, { text: '', copilotReferences: json.copilot_references });
					}

					continue;
				}

				if (this.requestId.created === 0) {
					// Would only be 0 if we're the first actual response chunk
					this.requestId = getRequestId(this.response.headers, json);
					if (this.requestId.created === 0 && json.choices?.length) { // An initial chunk is sent with an empty choices array and no id, to hold `prompt_filter_results`
						this.requestId.created = Math.floor(Date.now() / 1000);
					}
				}

				for (let i = 0; i < json.choices.length; i++) {
					const choice = json.choices[i];

					this.logChoice(choice);

					const thinkingDelta = extractThinkingDeltaFromChoice(choice);

					// Once we observe any thinking text or an id in this batch, keep the flag true
					thinkingFound ||= !!(thinkingDelta?.text || thinkingDelta?.id);

					// Lazily create the accumulator on the first chunk for this choice index.
					if (!(choice.index in this.solutions)) {
						this.solutions[choice.index] = new APIJsonDataStreaming(json.model);
					}

					const solution = this.solutions[choice.index];
					if (solution === null) {
						// Solution already yielded; still forward thinking deltas to the callback.
						if (thinkingDelta) {
							await finishedCb('', choice.index, { text: '', thinking: thinkingDelta });
						}
						continue; // already finished
					}

					// Set by emitSolution when finishedCb decides to truncate this solution.
					let finishOffset: number | undefined;

					// Invokes finishedCb with the full text so far plus the pending delta and
					// any extras; records the early-finish offset and checks for cancellation.
					const emitSolution = async (delta?: { vulnAnnotations?: ICodeVulnerabilityAnnotation[]; ipCodeCitations?: IIPCodeCitation[]; references?: ICopilotReference[]; toolCalls?: ICopilotToolCall[]; toolCallStreamUpdates?: ICopilotToolCallStreamUpdate[]; functionCalls?: ICopilotFunctionCall[]; errors?: ICopilotError[]; beginToolCalls?: ICopilotBeginToolCall[]; thinking?: ThinkingDelta }) => {
						// Drop vulnerability annotations that fail shape validation.
						if (delta?.vulnAnnotations && (!Array.isArray(delta.vulnAnnotations) || !delta.vulnAnnotations.every(a => isCopilotAnnotation(a)))) {
							delta.vulnAnnotations = undefined;
						}

						// Validate code citation annotations carefully, because the API is a work in progress
						if (delta?.ipCodeCitations && (!Array.isArray(delta.ipCodeCitations) || !delta.ipCodeCitations.every(isCodeCitationAnnotation))) {
							delta.ipCodeCitations = undefined;
						}

						finishOffset = await finishedCb(solution.text.join(''), choice.index, {
							text: solution.flush(),
							logprobs: choice.logprobs,
							codeVulnAnnotations: delta?.vulnAnnotations,
							ipCitations: delta?.ipCodeCitations,
							copilotReferences: delta?.references,
							copilotToolCalls: delta?.toolCalls,
							copilotToolCallStreamUpdates: delta?.toolCallStreamUpdates,
							_deprecatedCopilotFunctionCalls: delta?.functionCalls,
							beginToolCalls: delta?.beginToolCalls,
							copilotErrors: delta?.errors,
							thinking: thinkingDelta ?? delta?.thinking,
						});
						if (finishOffset !== undefined) {
							hadEarlyFinishedSolution = true;
						}
						return await this.maybeCancel('after awaiting finishedCb');
					};

					// Dispatch on the kind of payload this choice delta carries.
					let handled = true;
					if (choice.delta?.tool_calls) {
						const hadExistingToolCalls = this.toolCalls.hasToolCalls();
						if (!hadExistingToolCalls) {
							// First tool call of the stream: announce it via beginToolCalls.
							const firstToolCall = choice.delta.tool_calls.at(0);
							const firstToolName = firstToolCall?.function?.name;
							if (firstToolName) {
								if (solution.text.length) {
									// Flush the linkifier stream. See #16465
									solution.append({ index: 0, delta: { content: ' ' } });
								}
								if (await emitSolution({ beginToolCalls: [{ name: firstToolName, id: firstToolCall?.id }] })) {
									continue;
								}
							}
						}
						const toolCallStreamUpdates = this.toolCalls.update(choice);
						if (toolCallStreamUpdates.length) {
							if (await emitSolution({ toolCallStreamUpdates })) {
								continue;
							}
						}
					} else if (choice.delta?.copilot_annotations?.CodeVulnerability || choice.delta?.copilot_annotations?.IPCodeCitations) {
						// First flush any pending text without the annotations...
						if (await emitSolution()) {
							continue;
						}

						// ...then append this chunk and emit with the annotations attached.
						if (!hadEarlyFinishedSolution) {
							solution.append(choice);
							if (await emitSolution({ vulnAnnotations: choice.delta?.copilot_annotations?.CodeVulnerability, ipCodeCitations: choice.delta?.copilot_annotations?.IPCodeCitations })) {
								continue;
							}
						}
					} else if (choice.delta?.role === 'function') {
						// A 'function' role delta carries a JSON array of references in content.
						if (choice.delta.content) {
							try {
								const references = JSON.parse(choice.delta.content);
								if (Array.isArray(references)) {
									if (await emitSolution({ references: references })) {
										continue;
									}
								}
							} catch (ex) {
								this.logService.error(`Error parsing function references: ${JSON.stringify(ex)}`);
							}
						}
					} else if (choice.delta?.function_call && (choice.delta.function_call.name || choice.delta.function_call.arguments)) {
						// Legacy function call fragment: buffer it and suppress completion
						// until we know it is a real call (see comment at top of method).
						allowCompletingSolution = false;
						this.functionCallName ??= choice.delta.function_call.name;
						this.functionCalls[this.functionCallName] ??= new APIJsonDataStreaming(json.model);
						const functionCall = this.functionCalls[this.functionCallName];
						functionCall!.append(choice);
					} else if ((choice.finish_reason === FinishedCompletionReason.FunctionCall || choice.finish_reason === FinishedCompletionReason.Stop) && this.functionCallName) {
						// We don't want to yield the function call until we have all the data
						const functionCallStreamObj = this.functionCalls[this.functionCallName];
						const functionCall = { name: this.functionCallName, arguments: functionCallStreamObj!.flush() };
						this.completedFunctionCallIdxs.set(choice.index, 'function');
						try {
							if (await emitSolution({ functionCalls: [functionCall] })) {
								continue;
							}
						} catch (error) {
							this.logService.error(error);
						}

						// Reset legacy function-call state for any subsequent call.
						this.functionCalls[this.functionCallName] = null;
						this.functionCallName = undefined;
						if (choice.finish_reason === FinishedCompletionReason.FunctionCall) {
							// See note about the 'function_call' finish_reason below
							continue;
						}
					} else {
						handled = false;
					}

					// Completed tool calls are emitted once the finish_reason arrives.
					if ((choice.finish_reason === FinishedCompletionReason.ToolCalls || choice.finish_reason === FinishedCompletionReason.Stop) && this.toolCalls.hasToolCalls()) {
						handled = true;
						const toolCalls = this.toolCalls.getToolCalls();
						this.completedFunctionCallIdxs.set(choice.index, 'tool');
						const toolId = toolCalls.length > 0 ? toolCalls[0].id : undefined;
						try {
							if (await emitSolution({ toolCalls: toolCalls, thinking: (toolId && thinkingFound) ? { metadata: { toolId } } : undefined })) {
								continue;
							}
						} catch (error) {
							this.logService.error(error);
						}
					}

					// Plain text chunk: accumulate and let finishedCb inspect it.
					if (!handled) {
						solution.append(choice);

						// Call finishedCb to determine if the solution is now complete.
						if (await emitSolution()) {
							continue;
						}
					}

					const solutionDone = Boolean(choice.finish_reason) || finishOffset !== undefined;
					if (!solutionDone) {
						continue;
					}
					// NOTE: When there is a finish_reason the text of subsequent chunks is always '',
					// (current chunk might still have useful text, that is why we add it above).
					// So we know that we already got all the text to be displayed for the user.
					// TODO: This might contain additional logprobs for excluded next tokens. We should
					// filter out indices that correspond to excluded tokens. It will not affect the
					// text though.
					yield {
						solution,
						finishOffset,
						reason: choice.finish_reason ?? FinishedCompletionReason.ClientTrimmed,
						filterReason: choiceToFilterReason(choice),
						requestId: this.requestId,
						index: choice.index,
					};

					if (await this.maybeCancel('after yielding finished choice')) {
						return;
					}

					// Null marks the choice as finished so later chunks are ignored —
					// unless we're suppressing completion due to an echoed function_call.
					if (allowCompletingSolution) {
						this.solutions[choice.index] = null;
					}
				}
			}
		}

		// Yield whatever solutions remain incomplete in case no [DONE] was received.
		// This shouldn't happen in practice unless there was an error somewhere.
		for (const [index, solution] of Object.entries(this.solutions)) {
			const solutionIndex = Number(index); // Convert `index` from string to number
			if (solution === null) {
				continue; // already finished
			}
			yield {
				solution,
				finishOffset: undefined,
				reason: FinishedCompletionReason.ClientIterationDone,
				requestId: this.requestId,
				index: solutionIndex,
			};

			if (await this.maybeCancel('after yielding after iteration done')) {
				return;
			}
		}

		// Error message can be present in `extraData`
		//
		// When `finishedCb` decides to finish a solution early, it is possible that
		// we will have unfinished or partial JSON data in `extraData` because we
		// break out of the above for loop as soon as all solutions are finished.
		//
		// We don't want to alarm ourselves with such partial JSON data.
		if (extraData.length > 0 && !hadEarlyFinishedSolution) {
			try {
				const extraDataJson = JSON.parse(extraData);
				if (extraDataJson.error !== undefined) {
					this.logService.error(extraDataJson.error, `Error in response: ${extraDataJson.error.message}`);
					sendCommunicationErrorTelemetry(this.telemetryService, `Error in response: ${extraDataJson.error.message}`, extraDataJson.error);
				}
			} catch (e) {
				this.logService.error(`Error parsing extraData for request id ${this.requestId.headerRequestId}: ${extraData}`);
				sendCommunicationErrorTelemetry(this.telemetryService, `Error parsing extraData for request id ${this.requestId.headerRequestId}: ${extraData}`);
			}
		}
	}

	/** Yields the solutions that weren't yet finished, with a 'DONE' reason. */
	private async *finishSolutions(): AsyncIterable<FinishedCompletion> {
		for (const [index, solution] of Object.entries(this.solutions)) {
			const solutionIndex = Number(index); // Convert `index` from string to number
			if (solution === null) {
				continue; // already finished
			}
			// Choices that ended in a function/tool call keep their call-specific reason.
			if (this.completedFunctionCallIdxs.has(solutionIndex)) {
				yield {
					solution,
					finishOffset: undefined,
					reason: this.completedFunctionCallIdxs.get(solutionIndex) === 'function' ? FinishedCompletionReason.FunctionCall : FinishedCompletionReason.ToolCalls,
					requestId: this.requestId,
					index: solutionIndex,
				};
				continue;
			}
			yield {
				solution,
				finishOffset: undefined,
				reason: FinishedCompletionReason.ClientDone,
				requestId: this.requestId,
				index: solutionIndex,
			};

			if (await this.maybeCancel('after yielding on DONE')) {
				return;
			}
		}
	}

	/**
	 * Returns whether the cancellation token was cancelled and closes the
	 * stream if it was.
	 */
	private async maybeCancel(description: string) {
		if (this.cancellationToken?.isCancellationRequested) {
			this.logService.debug('Cancelled: ' + description);
			await this.cancel();
			return true;
		}
		return false;
	}

	// Tears down the underlying response body stream.
	private async cancel() {
		await this.response.body.destroy();
	}

	// Trace-logs a choice with noisy fields stripped out.
	private logChoice(choice: ExtendedChoiceJSON) {
		const choiceCopy: any = { ...choice };
		delete choiceCopy.index;
		delete choiceCopy.content_filter_results;
		delete choiceCopy.content_filter_offsets;
		this.logService.trace(`choice ${JSON.stringify(choiceCopy)}`);
	}
}
693
694
// data: {"choices":null,"copilot_confirmation":{"type":"action","title":"Are you sure you want to proceed?","message":"This action is irreversible.","confirmation":{"id":"123"}},"id":null}
/**
 * Runtime type guard for `copilot_confirmation` payloads coming off the wire.
 *
 * Fix: the original dereferenced `obj` directly, so a `null`/`undefined`
 * input threw a TypeError instead of returning false; guard for a non-null
 * object first so the predicate is safe on any `unknown` input.
 */
function isCopilotConfirmation(obj: unknown): obj is ICopilotConfirmation {
	if (typeof obj !== 'object' || obj === null) {
		return false;
	}
	const candidate = obj as ICopilotConfirmation;
	return typeof candidate.title === 'string' &&
		typeof candidate.message === 'string' &&
		!!candidate.confirmation;
}
700
701
// Function to convert from APIJsonDataStreaming to APIJsonData format
702
export function convertToAPIJsonData(streamingData: APIJsonDataStreaming): APIJsonData {
703
const joinedText = streamingData.text.join('');
704
const out: APIJsonData = {
705
text: joinedText,
706
tokens: streamingData.text,
707
};
708
return out;
709
}
710
711
/**
 * Given a choice from the API call, returns the reason for filtering out the choice, or undefined if the choice should not be filtered out.
 * @param choice The choice from the API call
 * @returns The reason for filtering out the choice, or undefined if the choice should not be filtered out.
 */
function choiceToFilterReason(choice: ExtendedChoiceJSON): FilterReason | undefined {
	// Only content-filtered choices carry a filter reason.
	if (choice.finish_reason !== FinishedCompletionReason.ContentFilter) {
		return undefined;
	}

	// Check copilot annotation flags in a fixed priority order; first match wins.
	const annotations = choice.delta?.copilot_annotations;
	if (annotations) {
		if (annotations.TextCopyright) {
			return FilterReason.Copyright;
		}
		if (annotations.Sexual || annotations.SexualPattern) {
			return FilterReason.Sexual;
		}
		if (annotations.Violence) {
			return FilterReason.Violence;
		}
		if (annotations.HateSpeech || annotations.HateSpeechPattern) {
			return FilterReason.Hate;
		}
		if (annotations.SelfHarm) {
			return FilterReason.SelfHarm;
		}
		if (annotations.PromptPromBlockList) {
			return FilterReason.Prompt;
		}
	}

	// Fall back to the per-category content filter results, if present.
	const filterResults = choice.content_filter_results;
	if (!filterResults) {
		return undefined;
	}
	for (const filter of Object.keys(filterResults) as Exclude<FilterReason, FilterReason.Copyright>[]) {
		if (filterResults[filter]?.filtered) {
			return filter;
		}
	}
	return undefined;
}
755
756
export function sendCommunicationErrorTelemetry(telemetryService: ITelemetryService, message: string, extra?: any) {
757
const args = [message, extra];
758
const secureMessage = (args.length > 0 ? JSON.stringify(args) : 'no msg');
759
760
const enhancedData = TelemetryData.createAndMarkAsIssued({
761
context: 'fetch',
762
level: LogLevel[LogLevel.Error],
763
message: secureMessage,
764
});
765
766
// send full content to secure telemetry
767
telemetryService.sendEnhancedGHTelemetryErrorEvent('log', enhancedData.properties, enhancedData.measurements);
768
769
const data = TelemetryData.createAndMarkAsIssued({
770
context: 'fetch',
771
level: LogLevel[LogLevel.Error],
772
message: '[redacted]',
773
});
774
775
// send content that excludes customer data to standard telemetry
776
telemetryService.sendGHTelemetryErrorEvent(
777
'log',
778
data.properties,
779
data.measurements
780
);
781
}
782
783