Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/platform/nesFetch/common/responseStream.ts
13401 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { ErrorUtils } from '../../../util/common/errors';
7
import { Result } from '../../../util/common/result';
8
import { DeferredPromise } from '../../../util/vs/base/common/async';
9
import { assertType } from '../../../util/vs/base/common/types';
10
import { RequestId } from '../../networking/common/fetch';
11
import { IHeaders, Response } from '../../networking/common/fetcherService';
12
import { Completion } from './completionsAPI';
13
14
export class ResponseStream {
	/**
	 * A promise that resolves to the array of completions that were emitted by the stream.
	 *
	 * It is settled when iteration of {@link stream} finishes (normally or with an error),
	 * so it stays pending until {@link stream} is consumed.
	 *
	 * (it's expected to not throw — failures are reported as an error `Result`)
	 */
	public readonly aggregatedStream: Promise<Result<Completion[], Error>>;

	/**
	 * A single completion that aggregates every completion emitted by the stream.
	 *
	 * Derived from {@link aggregatedStream}: an error result is passed through as-is, and any
	 * exception raised while aggregating is converted into an error result.
	 *
	 * (it's expected to not throw)
	 */
	public readonly response: Promise<Result<Completion, Error>>;

	/**
	 * The stream of completions that were emitted by the response.
	 *
	 * @remarks This stream is single-use — it can only be iterated once.
	 *
	 * @throws {Error} if the response stream throws an error.
	 */
	public readonly stream: AsyncIterable<Completion>;

	/**
	 * @param fetcherResponse the underlying response; only used by {@link destroy}
	 * @param stream the raw completions stream; it is wrapped so that every emitted completion
	 *   is also collected for {@link aggregatedStream} / {@link response}
	 * @param requestId identifier of the request that produced this response
	 * @param headers headers associated with the response
	 */
	constructor(private readonly fetcherResponse: Response, stream: AsyncIterable<Completion>, public readonly requestId: RequestId, public readonly headers: IHeaders) {
		const tokensDeferredPromise = new DeferredPromise<Result<Completion[], Error>>();
		this.aggregatedStream = tokensDeferredPromise.p;

		this.response = this.aggregatedStream.then((completions) => {
			if (completions.isError()) {
				return completions;
			}
			try {
				return Result.ok(ResponseStream.aggregateCompletionsStream(completions.val));
			} catch (err) {
				// aggregation failures must surface as an error result, never as a rejection
				return Result.error(err);
			}
		});

		this.stream = streamWithAggregation(stream, tokensDeferredPromise);
	}

	/**
	 * Destroys the body of the underlying fetcher response.
	 *
	 * @throws client of the method should handle the error
	 */
	public async destroy(): Promise<void> {
		await this.fetcherResponse.body.destroy();
	}

	/**
	 * Folds a list of streamed completion chunks into a single completion.
	 *
	 * - `text`, `logprobs` and `finish_reason` are taken from the first choice of each chunk
	 *   (only `choice.index = 0` is supported).
	 * - `usage` values are summed over all chunks that carry usage.
	 * - `system_fingerprint` and `object` are taken from the first chunk.
	 *
	 * Chunks without any choice (e.g. a trailing usage-only chunk) contribute only their usage.
	 *
	 * @param stream the chunks in the order they were received; not mutated
	 * @returns the aggregated completion with exactly one choice
	 * @throws {Error} if `stream` is empty, or if the `assertType` check fails because more than
	 *   one chunk carries a `finish_reason`
	 */
	private static aggregateCompletionsStream(stream: Completion[]): Completion {
		if (stream.length === 0) {
			throw new Error(`Response is empty!`);
		}

		let text = '';
		let finishReason: Completion.FinishReason | null = null;
		let aggregatedLogsProbs: Completion.LogProbs | null = null;
		let aggregatedUsage: Completion.Usage | undefined = undefined;

		for (const completion of stream) {
			// TODO@ulugbekna: we only support choice.index=0
			const choice: Completion.Choice | undefined = completion.choices[0];

			// A chunk may legitimately carry no choices (usage-only chunk); skip choice handling
			// for it instead of failing on `undefined.text`.
			if (choice !== undefined) {
				text += choice.text ?? '';

				if (choice.logprobs) {
					if (aggregatedLogsProbs === null) {
						// copy the arrays so that later pushes do not mutate the input chunk
						aggregatedLogsProbs = {
							tokens: [...choice.logprobs.tokens],
							token_logprobs: [...choice.logprobs.token_logprobs],
							text_offset: [...choice.logprobs.text_offset],
							top_logprobs: [...choice.logprobs.top_logprobs],
						};
					} else {
						aggregatedLogsProbs.tokens.push(...choice.logprobs.tokens);
						aggregatedLogsProbs.token_logprobs.push(...choice.logprobs.token_logprobs);
						aggregatedLogsProbs.text_offset.push(...choice.logprobs.text_offset);
						aggregatedLogsProbs.top_logprobs.push(...choice.logprobs.top_logprobs);
					}
				}
			}

			if (completion.usage) {
				if (aggregatedUsage === undefined) {
					// build a fresh object so that later `+=` does not mutate the input chunk
					aggregatedUsage = {
						completion_tokens: completion.usage.completion_tokens,
						prompt_tokens: completion.usage.prompt_tokens,
						total_tokens: completion.usage.total_tokens,
						completion_tokens_details: {
							audio_tokens: completion.usage.completion_tokens_details.audio_tokens,
							reasoning_tokens: completion.usage.completion_tokens_details.reasoning_tokens,
						},
						prompt_tokens_details: {
							audio_tokens: completion.usage.prompt_tokens_details.audio_tokens,
							reasoning_tokens: completion.usage.prompt_tokens_details.reasoning_tokens,
						}
					};
				} else {
					aggregatedUsage.completion_tokens += completion.usage.completion_tokens;
					aggregatedUsage.prompt_tokens += completion.usage.prompt_tokens;
					aggregatedUsage.total_tokens += completion.usage.total_tokens;
					aggregatedUsage.completion_tokens_details.audio_tokens += completion.usage.completion_tokens_details.audio_tokens;
					aggregatedUsage.completion_tokens_details.reasoning_tokens += completion.usage.completion_tokens_details.reasoning_tokens;
					aggregatedUsage.prompt_tokens_details.audio_tokens += completion.usage.prompt_tokens_details.audio_tokens;
					aggregatedUsage.prompt_tokens_details.reasoning_tokens += completion.usage.prompt_tokens_details.reasoning_tokens;
				}
			}

			if (choice !== undefined && choice.finish_reason) {
				assertType(
					finishReason === null,
					'cannot already have finishReason if just seeing choice.finish_reason'
				);
				finishReason = choice.finish_reason;
			}
		}

		// metadata of the aggregated completion comes from the first chunk
		const completion = stream[0];

		const choice: Completion.Choice = {
			index: 0,
			finish_reason: finishReason,
			logprobs: aggregatedLogsProbs,
			text,
		};

		const aggregatedCompletion: Completion = {
			choices: [choice],
			system_fingerprint: completion.system_fingerprint,
			object: completion.object,
			usage: aggregatedUsage,
		};

		return aggregatedCompletion;
	}
}
143
144
/**
 * Re-emits every completion of `stream` while recording it, and settles `deferredPromise`
 * once iteration is over:
 * - with an ok result holding the recorded completions when no error was caught, or
 * - with an error result when the source stream threw (the error is also rethrown to the consumer).
 *
 * @param stream the source completions stream
 * @param deferredPromise completed exactly once, from the `finally` block
 */
async function* streamWithAggregation(
	stream: AsyncIterable<Completion>,
	deferredPromise: DeferredPromise<Result<Completion[], Error>>
): AsyncGenerator<Completion> {
	const seen: Completion[] = [];
	let failure: Error | undefined;
	try {
		for await (const chunk of stream) {
			seen.push(chunk);
			yield chunk;
		}
	} catch (thrown: unknown) {
		// normalize whatever was thrown into an Error before recording and rethrowing it
		failure = ErrorUtils.fromUnknown(thrown);
		throw failure;
	} finally {
		// runs on normal completion and on failure alike, so the promise is always settled
		// once the generator body has started and then terminates
		if (failure) {
			deferredPromise.complete(Result.error(failure));
		} else {
			deferredPromise.complete(Result.ok(seen));
		}
	}
}
168
169