Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/platform/chunking/common/chunkingEndpointClientImpl.ts
13400 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { RequestType } from '@vscode/copilot-api';
7
import { CallTracker } from '../../../util/common/telemetryCorrelationId';
8
import { coalesce } from '../../../util/vs/base/common/arrays';
9
import { DeferredPromise, raceCancellationError, timeout } from '../../../util/vs/base/common/async';
10
import { CancellationToken } from '../../../util/vs/base/common/cancellation';
11
import { Disposable } from '../../../util/vs/base/common/lifecycle';
12
import { LinkedList } from '../../../util/vs/base/common/linkedList';
13
import { isFalsyOrWhitespace } from '../../../util/vs/base/common/strings';
14
import { Range } from '../../../util/vs/editor/common/core/range';
15
import { IInstantiationService } from '../../../util/vs/platform/instantiation/common/instantiation';
16
import { Embedding, EmbeddingType, EmbeddingVector } from '../../embeddings/common/embeddingsComputer';
17
import { IEnvService } from '../../env/common/envService';
18
import { getGithubMetadataHeaders } from '../../github/common/githubApiFetcherService';
19
import { logExecTime } from '../../log/common/logExecTime';
20
import { ILogService } from '../../log/common/logService';
21
import { Response } from '../../networking/common/fetcherService';
22
import { postRequest } from '../../networking/common/networking';
23
import { IExperimentationService } from '../../telemetry/common/nullExperimentationService';
24
import { ITelemetryService } from '../../telemetry/common/telemetry';
25
import { getWorkspaceFileDisplayPath, IWorkspaceService } from '../../workspace/common/workspaceService';
26
import { FileChunkWithEmbedding, FileChunkWithOptionalEmbedding } from './chunk';
27
import { ChunkableContent, ComputeBatchInfo, EmbeddingsComputeQos, IChunkingEndpointClient } from './chunkingEndpointClient';
28
import { stripChunkTextMetadata } from './chunkingStringUtils';
29
30
/** Issues the HTTP request for a given retry attempt (0 on first send, incremented on each retry). */
type RequestTask = (attempt: number) => Promise<Response>;
31
32
class RequestRateLimiter extends Disposable {
33
private static readonly _abuseLimit = 1000.0 / 40.0; // 40 requests per second. Actually more like 20 but that causes too much stalling
34
35
private readonly _maxParallelChunksRequests: number;
36
37
/** Max number of times to retry a request before failing. */
38
private readonly _maxAttempts = 3;
39
40
/**
41
* Target quota usage percentage that we want to maintain.
42
*
43
* Anything under this will be sent as fast as possible. Once we go over this, we start sending requests slower
44
* and slower as we approach 100% quota usage.
45
*/
46
private readonly targetQuota = 80; // %
47
48
private readonly requestQueue = new LinkedList<{
49
readonly task: RequestTask;
50
readonly attempt: number;
51
readonly deferred: DeferredPromise<Response>;
52
readonly token: CancellationToken;
53
}>();
54
55
// State
56
private _numberInFlightRequests = 0;
57
private _lastSendTime = Date.now();
58
59
/** Timeout for the Github rate limit headers */
60
private _rateLimitTimeout?: Promise<void>;
61
62
/** The most recent status for the Github rate limit headers */
63
private _latestRateLimitHint?: {
64
readonly timestamp: number;
65
readonly remaining: number;
66
readonly resetAt: number;
67
};
68
69
/** The most recent status for the Github quota header */
70
private _latestQuotaUsed?: {
71
readonly timestamp: number;
72
readonly quota: number;
73
};
74
75
constructor(
76
@IExperimentationService experimentationService: IExperimentationService,
77
) {
78
super();
79
80
this._maxParallelChunksRequests = experimentationService.getTreatmentVariable<number>('workspace.embeddingIndex.maxParallelChunksRequests') ?? 8;
81
}
82
83
public enqueue(task: RequestTask, token: CancellationToken): Promise<Response> {
84
const deferred = new DeferredPromise<Response>();
85
token.onCancellationRequested(() => deferred.cancel());
86
87
this.requestQueue.push({ task, attempt: 0, deferred, token });
88
this.pump();
89
return deferred.p;
90
}
91
92
private _isPumping = false;
93
94
private async pump(): Promise<void> {
95
if (this._isPumping) {
96
return;
97
}
98
99
try {
100
this._isPumping = true;
101
while (!this.requestQueue.isEmpty()) {
102
if (this._rateLimitTimeout) {
103
await this._rateLimitTimeout;
104
this._rateLimitTimeout = undefined;
105
}
106
107
const elapsedSinceLastSend = Date.now() - this._lastSendTime;
108
if (elapsedSinceLastSend < RequestRateLimiter._abuseLimit) {
109
await timeout(RequestRateLimiter._abuseLimit - elapsedSinceLastSend);
110
}
111
112
if (this._numberInFlightRequests >= this._maxParallelChunksRequests) {
113
await timeout(10);
114
continue; // Check again
115
}
116
117
// Check the global github rate limit
118
if (this._latestRateLimitHint) {
119
const currentTime = Date.now();
120
if (currentTime < this._latestRateLimitHint.resetAt) {
121
if (this._latestRateLimitHint.remaining - this._numberInFlightRequests <= 0) {
122
// There are no remaining requests, wait until reset
123
const resetTimeSpan = this._latestRateLimitHint.resetAt - currentTime;
124
await timeout(Math.min(resetTimeSpan, 2_000));
125
}
126
}
127
}
128
129
// Check the quota percent
130
if (this._latestQuotaUsed && this._latestQuotaUsed.quota > this.targetQuota) {
131
const currentTime = Date.now();
132
const quotaDelta = this._latestQuotaUsed.quota - this.targetQuota;
133
const quotaDeltaTime = currentTime - this._latestQuotaUsed.timestamp;
134
135
const decayTime = 2500; // Estimated time for quota to reset
136
const maxDelay = 1000;
137
138
let quotaAdjustment = (quotaDelta / (100 - this.targetQuota));
139
quotaAdjustment *= Math.max(1.0 - (quotaDeltaTime / decayTime), 0); // Adjust by time passed
140
141
const delay = quotaAdjustment * maxDelay;
142
if (delay > 0) {
143
await timeout(Math.min(delay, maxDelay));
144
}
145
}
146
147
const e = this.requestQueue.shift()!;
148
if (e.token.isCancellationRequested) {
149
e.deferred.cancel();
150
continue;
151
}
152
153
// Send the request
154
this._numberInFlightRequests++;
155
this._lastSendTime = Date.now();
156
157
const request = e.task(e.attempt);
158
request.then(response => {
159
this.updateQuotasFromResponse(response);
160
161
if (e.token.isCancellationRequested) {
162
e.deferred.cancel();
163
return;
164
}
165
166
if (response.ok) {
167
e.deferred.complete(response);
168
return;
169
}
170
171
// Request failed, see if we can retry
172
if (e.attempt < this._maxAttempts) {
173
if (response.status === 429 || response.status === 403 || response.status === 408) {
174
const retryAfter_seconds = this.getRequestRetryDelay(response);
175
if (retryAfter_seconds > 0) {
176
this._rateLimitTimeout = timeout(retryAfter_seconds * 1000);
177
}
178
179
// Add back into the queue
180
this.requestQueue.unshift({ task: e.task, attempt: e.attempt + 1, deferred: e.deferred, token: e.token });
181
this.pump();
182
return;
183
}
184
}
185
186
// Unknown failure or max attempts reached, complete the failed response
187
e.deferred.complete(response);
188
}).catch(err => {
189
e.deferred.error(err);
190
}).finally(() => {
191
this._numberInFlightRequests--;
192
});
193
}
194
} finally {
195
this._isPumping = false;
196
}
197
}
198
199
private updateQuotasFromResponse(response: Response) {
200
const timestamp = Date.now();
201
try {
202
const rateLimitRemaining = response.headers.get('x-ratelimit-remaining');
203
const rateLimitReset = response.headers.get('x-ratelimit-reset');
204
if (rateLimitRemaining && rateLimitReset) {
205
this._latestRateLimitHint = {
206
timestamp: timestamp,
207
remaining: parseFloat(rateLimitRemaining),
208
resetAt: parseFloat(rateLimitReset) * 1000, // convert to ms
209
};
210
}
211
212
const totalQuotaUsed = response.headers.get('x-github-total-quota-used');
213
if (totalQuotaUsed) {
214
if (this._latestQuotaUsed) {
215
this._latestQuotaUsed = {
216
timestamp: timestamp,
217
quota: parseFloat(totalQuotaUsed)
218
};
219
} else {
220
this._latestQuotaUsed = {
221
timestamp: timestamp,
222
quota: parseFloat(totalQuotaUsed),
223
};
224
}
225
}
226
} catch (e) {
227
console.error('Error parsing rate limit headers', e);
228
// Ignore errors
229
}
230
}
231
232
/**
233
* Get the retry delay for a request based on the response.
234
*
235
* @returns The retry delay in seconds.
236
*/
237
private getRequestRetryDelay(response: Response) {
238
// Check `retry-after` header
239
try {
240
const retryAfterHeader = response.headers.get('retry-after');
241
if (retryAfterHeader) {
242
const intValue = parseFloat(retryAfterHeader);
243
if (!isNaN(intValue)) {
244
return intValue;
245
}
246
}
247
} catch {
248
// Noop
249
}
250
251
// Fallback to `x-ratelimit-reset` header
252
try {
253
const resetHeader = response.headers.get('x-ratelimit-reset');
254
if (resetHeader) {
255
const intValue = parseFloat(resetHeader);
256
if (!isNaN(intValue)) {
257
const currentEpochSeconds = Math.floor(Date.now() / 1000);
258
return intValue - currentEpochSeconds;
259
}
260
}
261
} catch {
262
// Noop
263
}
264
265
// Seeing if the request timed out which lets us use a faster retry
266
if (response.status === 408) {
267
return 0.25;
268
}
269
270
// Otherwise use a generic timeout
271
return 2;
272
}
273
}
274
275
/** Shape of a successful JSON response from the chunks endpoint. */
type ChunksEndpointResponse = {
	readonly chunks: readonly {
		/** Content hash of the chunk; matched against the `local_hashes` sent in the request. */
		readonly hash: string;
		/** Offsets of the chunk — presumably character offsets; unused by this client — TODO confirm. */
		readonly range: { start: number; end: number };
		/** Line numbers of the chunk within the file (used to build the editor `Range`). */
		readonly line_range: { start: number; end: number };
		/** Chunk text. May be absent; the client then falls back to its local cache keyed by `hash`. */
		readonly text?: string;
		/** Present only when embeddings were requested (`embed: true` in the request body). */
		readonly embedding?: { model: string; embedding: EmbeddingVector };
	}[];

	/** Id of the embedding model that produced the embeddings above. */
	readonly embedding_model: string;
};
286
287
/**
 * Client for the GitHub chunks endpoint: splits file content into chunks and
 * optionally computes an embedding per chunk. All requests go through a shared
 * {@link RequestRateLimiter}.
 */
export class ChunkingEndpointClientImpl extends Disposable implements IChunkingEndpointClient {
	declare readonly _serviceBrand: undefined;

	/**
	 * Limiter for request to the chunks endpoint.
	 */
	private readonly _requestLimiter: RequestRateLimiter;

	constructor(
		@IInstantiationService private readonly _instantiationService: IInstantiationService,
		@IEnvService private readonly _envService: IEnvService,
		@ILogService private readonly _logService: ILogService,
		@ITelemetryService private readonly _telemetryService: ITelemetryService,
		@IWorkspaceService private readonly _workspaceService: IWorkspaceService,
	) {
		super();

		this._requestLimiter = this._register(this._instantiationService.createInstance(RequestRateLimiter));
	}

	/**
	 * Chunks `content` without requesting embeddings.
	 *
	 * @returns The chunks, `[]` for empty/whitespace-only content, or `undefined` on request failure.
	 */
	public computeChunks(authToken: string, embeddingType: EmbeddingType, content: ChunkableContent, batchInfo: ComputeBatchInfo, qos: EmbeddingsComputeQos, cache: ReadonlyMap<string, FileChunkWithEmbedding> | undefined, telemetryInfo: CallTracker, token: CancellationToken): Promise<readonly FileChunkWithOptionalEmbedding[] | undefined> {
		return this.doComputeChunksAndEmbeddings(authToken, embeddingType, content, batchInfo, { qos, computeEmbeddings: false }, cache, telemetryInfo, token);
	}

	/**
	 * Chunks `content` and requests an embedding for each chunk.
	 *
	 * The cast is safe because with `computeEmbeddings: true`, doComputeChunksAndEmbeddings
	 * drops any chunk that came back without an embedding.
	 */
	public async computeChunksAndEmbeddings(authToken: string, embeddingType: EmbeddingType, content: ChunkableContent, batchInfo: ComputeBatchInfo, qos: EmbeddingsComputeQos, cache: ReadonlyMap<string, FileChunkWithEmbedding> | undefined, telemetryInfo: CallTracker, token: CancellationToken): Promise<readonly FileChunkWithEmbedding[] | undefined> {
		const result = await this.doComputeChunksAndEmbeddings(authToken, embeddingType, content, batchInfo, { qos, computeEmbeddings: true }, cache, telemetryInfo, token);
		return result as FileChunkWithEmbedding[] | undefined;
	}

	/**
	 * Shared implementation: posts the file content to the chunks endpoint (via the rate
	 * limiter) and maps the response into chunks, reusing cached chunks/embeddings when the
	 * server echoes a hash present in `cache`.
	 *
	 * @param cache Previously computed chunks keyed by chunk hash; sent as `local_hashes`.
	 * @returns Chunks (with embeddings when `options.computeEmbeddings`), `[]` for blank
	 *   content, or `undefined` on failure (non-ok response or thrown error).
	 */
	private async doComputeChunksAndEmbeddings(
		authToken: string,
		embeddingType: EmbeddingType,
		content: ChunkableContent,
		batchInfo: ComputeBatchInfo,
		options: {
			qos: EmbeddingsComputeQos;
			computeEmbeddings: boolean;
		},
		cache: ReadonlyMap<string, FileChunkWithEmbedding> | undefined,
		telemetryInfo: CallTracker,
		token: CancellationToken
	): Promise<readonly FileChunkWithOptionalEmbedding[] | undefined> {
		const text = await raceCancellationError(content.getText(), token);
		if (isFalsyOrWhitespace(text)) {
			return [];
		}

		try {
			const makeRequest = async (attempt: number) => {
				return logExecTime(this._logService, `ChunksEndpointEmbeddingComputer.fetchChunksRequest(${content.uri}, attempt=${attempt})`, () => this._instantiationService.invokeFunction(postRequest, {
					endpointOrUrl: { type: RequestType.Chunks },
					secretKey: authToken,
					intent: 'copilot-panel',
					requestId: '',
					body: {
						embed: options.computeEmbeddings,
						// QoS hint — presumably only set to 'online' during the re-ranking step; TODO confirm
						qos: options.qos,
						content: text,
						path: getWorkspaceFileDisplayPath(this._workspaceService, content.uri),
						// Hashes of chunks we already have; presumably lets the server skip re-sending their text — TODO confirm
						local_hashes: cache ? Array.from(cache.keys()) : [],
						language_id: content.githubLanguageId,
						embedding_model: embeddingType.id,
					} satisfies {
						embed: boolean;
						qos: string;
						content: string;
						path: string;
						local_hashes: string[];
						language_id: number | undefined;
						embedding_model: string;
					// NOTE(review): `as any` defeats checking of the body against postRequest's
					// parameter type; the `satisfies` above still validates the literal's shape.
					} as any,
					additionalHeaders: getGithubMetadataHeaders(telemetryInfo, this._envService),
					cancelToken: token,
				}));
			};

			batchInfo.recomputedFileCount++;
			batchInfo.sentContentTextLength += text.length;

			const response = await raceCancellationError(this._requestLimiter.enqueue(makeRequest, token), token);
			if (!response.ok) {
				this._logService.debug(`Error chunking '${content.uri}'. Status: ${response.status}. Status Text: ${response.statusText}.`);

				/* __GDPR__
					"workspaceChunkEmbeddingsIndex.computeChunksAndEmbeddings.error" : {
						"owner": "mjbvz",
						"comment": "Tracks errors from the chunks service",
						"source": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "Caller of computeChunksAndEmbeddings" },
						"responseStatus": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Status code" }
					}
				*/
				this._telemetryService.sendMSFTTelemetryEvent('workspaceChunkEmbeddingsIndex.computeChunksAndEmbeddings.error', {
					source: telemetryInfo.toString(),
				}, {
					responseStatus: response.status,
				});

				return undefined;
			}

			const body: ChunksEndpointResponse = await response.json();
			if (!body.chunks.length) {
				return [];
			}

			return coalesce(body.chunks.map((chunk): FileChunkWithOptionalEmbedding | undefined => {
				const range = new Range(chunk.line_range.start, 0, chunk.line_range.end, 0);
				// Server echoed a hash we already have: reuse the cached text and embedding.
				const cached = cache?.get(chunk.hash);
				if (cached) {
					return {
						chunk: {
							file: content.uri,
							text: stripChunkTextMetadata(cached.chunk.text),
							rawText: undefined,
							range,
							isFullFile: cached.chunk.isFullFile, // TODO: get from endpoint
						},
						chunkHash: chunk.hash,
						embedding: cached.embedding,
					};
				}

				// Not cached, so the server must have sent the chunk text.
				if (typeof chunk.text !== 'string') {
					// Invalid chunk
					return undefined;
				}

				let embedding: Embedding | undefined;
				if (chunk.embedding?.embedding) {
					const returnedEmbeddingsType = new EmbeddingType(body.embedding_model);
					if (!returnedEmbeddingsType.equals(embeddingType)) {
						throw new Error(`Unexpected embedding model. Got: ${returnedEmbeddingsType}. Expected: ${embeddingType}`);
					}

					embedding = { type: returnedEmbeddingsType, value: chunk.embedding.embedding };
				}

				// When embeddings were requested, a chunk without one is dropped (see the
				// cast in computeChunksAndEmbeddings, which relies on this).
				if (options.computeEmbeddings && !embedding) {
					// Invalid chunk
					return undefined;
				}

				return {
					chunk: {
						file: content.uri,
						text: stripChunkTextMetadata(chunk.text),
						rawText: undefined,
						range,
						isFullFile: false, // TODO: get from endpoint
					},
					chunkHash: chunk.hash,
					embedding: embedding
				};
			}));

		} catch (e) {
			this._logService.error(e);
			return undefined;
		}
	}
}
449
450