Path: blob/main/extensions/copilot/src/platform/chunking/common/chunkingEndpointClientImpl.ts
13400 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { RequestType } from '@vscode/copilot-api';
import { CallTracker } from '../../../util/common/telemetryCorrelationId';
import { coalesce } from '../../../util/vs/base/common/arrays';
import { DeferredPromise, raceCancellationError, timeout } from '../../../util/vs/base/common/async';
import { CancellationToken } from '../../../util/vs/base/common/cancellation';
import { Disposable } from '../../../util/vs/base/common/lifecycle';
import { LinkedList } from '../../../util/vs/base/common/linkedList';
import { isFalsyOrWhitespace } from '../../../util/vs/base/common/strings';
import { Range } from '../../../util/vs/editor/common/core/range';
import { IInstantiationService } from '../../../util/vs/platform/instantiation/common/instantiation';
import { Embedding, EmbeddingType, EmbeddingVector } from '../../embeddings/common/embeddingsComputer';
import { IEnvService } from '../../env/common/envService';
import { getGithubMetadataHeaders } from '../../github/common/githubApiFetcherService';
import { logExecTime } from '../../log/common/logExecTime';
import { ILogService } from '../../log/common/logService';
import { Response } from '../../networking/common/fetcherService';
import { postRequest } from '../../networking/common/networking';
import { IExperimentationService } from '../../telemetry/common/nullExperimentationService';
import { ITelemetryService } from '../../telemetry/common/telemetry';
import { getWorkspaceFileDisplayPath, IWorkspaceService } from '../../workspace/common/workspaceService';
import { FileChunkWithEmbedding, FileChunkWithOptionalEmbedding } from './chunk';
import { ChunkableContent, ComputeBatchInfo, EmbeddingsComputeQos, IChunkingEndpointClient } from './chunkingEndpointClient';
import { stripChunkTextMetadata } from './chunkingStringUtils';

/** Produces the request for a given attempt number (0 for the first try). */
type RequestTask = (attempt: number) => Promise<Response>;

/**
 * Queues requests and sends them while throttling on several signals:
 * a minimum spacing between sends, a cap on in-flight requests, the
 * `x-ratelimit-*` headers, the `x-github-total-quota-used` header and any
 * retry delay requested by a failed response.
 *
 * Responses with status 429, 403 or 408 are re-queued at the front of the
 * queue until `_maxAttempts` is reached.
 */
class RequestRateLimiter extends Disposable {
	/** Minimum spacing between two sends, in milliseconds. */
	private static readonly _abuseLimit = 1000.0 / 40.0; // 40 requests per second. Actually more like 20 but that causes too much stalling

	/** Max number of requests that may be in flight at the same time. */
	private readonly _maxParallelChunksRequests: number;

	/** Max number of times to retry a request before failing. */
	private readonly _maxAttempts = 3;

	/**
	 * Target quota usage percentage that we want to maintain.
	 *
	 * Anything under this will be sent as fast as possible. Once we go over this, we start sending requests slower
	 * and slower as we approach 100% quota usage.
	 */
	private readonly targetQuota = 80; // %

	private readonly requestQueue = new LinkedList<{
		readonly task: RequestTask;
		readonly attempt: number;
		readonly deferred: DeferredPromise<Response>;
		readonly token: CancellationToken;
	}>();

	// State
	private _numberInFlightRequests = 0;
	private _lastSendTime = Date.now();

	/** Timeout for the Github rate limit headers */
	private _rateLimitTimeout?: Promise<void>;

	/** The most recent status for the Github rate limit headers */
	private _latestRateLimitHint?: {
		readonly timestamp: number;
		readonly remaining: number;
		/** Reset time in epoch milliseconds. */
		readonly resetAt: number;
	};

	/** The most recent status for the Github quota header */
	private _latestQuotaUsed?: {
		readonly timestamp: number;
		readonly quota: number;
	};

	/** Guards against running more than one pump loop at a time. */
	private _isPumping = false;

	constructor(
		@IExperimentationService experimentationService: IExperimentationService,
	) {
		super();

		this._maxParallelChunksRequests = experimentationService.getTreatmentVariable<number>('workspace.embeddingIndex.maxParallelChunksRequests') ?? 8;
	}

	/**
	 * Adds a request to the back of the queue and makes sure the queue is being drained.
	 *
	 * @param task Factory invoked with the attempt number each time the request is (re)sent.
	 * @param token Cancels the returned promise when cancellation is requested.
	 * @returns A promise for the final response. A non-ok response is returned when the
	 * failure is not retryable or the retry budget is exhausted.
	 */
	public enqueue(task: RequestTask, token: CancellationToken): Promise<Response> {
		const deferred = new DeferredPromise<Response>();

		// Drop the listener as soon as the request settles so that a long-lived token
		// shared by many requests does not accumulate listeners.
		const cancellationListener = token.onCancellationRequested(() => deferred.cancel());
		const disposeListener = () => cancellationListener.dispose();
		deferred.p.then(disposeListener, disposeListener);

		this.requestQueue.push({ task, attempt: 0, deferred, token });
		this.pump();
		return deferred.p;
	}

	/**
	 * Drains the request queue, applying every throttle before each send.
	 *
	 * Only one pump loop runs at a time; calls made while a loop is active return immediately.
	 */
	private async pump(): Promise<void> {
		if (this._isPumping) {
			return;
		}

		try {
			this._isPumping = true;
			while (!this.requestQueue.isEmpty()) {
				// Honor any retry delay requested by a previous response. A response that
				// completes while we are waiting may install a newer delay, so only clear
				// the one that was actually awaited and loop to pick up the replacement.
				while (this._rateLimitTimeout) {
					const pendingRateLimitTimeout = this._rateLimitTimeout;
					await pendingRateLimitTimeout;
					if (this._rateLimitTimeout === pendingRateLimitTimeout) {
						this._rateLimitTimeout = undefined;
					}
				}

				const elapsedSinceLastSend = Date.now() - this._lastSendTime;
				if (elapsedSinceLastSend < RequestRateLimiter._abuseLimit) {
					await timeout(RequestRateLimiter._abuseLimit - elapsedSinceLastSend);
				}

				if (this._numberInFlightRequests >= this._maxParallelChunksRequests) {
					await timeout(10);
					continue; // Check again
				}

				// Check the global github rate limit
				if (this._latestRateLimitHint) {
					const currentTime = Date.now();
					if (currentTime < this._latestRateLimitHint.resetAt) {
						if (this._latestRateLimitHint.remaining - this._numberInFlightRequests <= 0) {
							// There are no remaining requests, wait until reset (capped at 2s per request)
							const resetTimeSpan = this._latestRateLimitHint.resetAt - currentTime;
							await timeout(Math.min(resetTimeSpan, 2_000));
						}
					}
				}

				// Check the quota percent
				if (this._latestQuotaUsed && this._latestQuotaUsed.quota > this.targetQuota) {
					const currentTime = Date.now();
					const quotaDelta = this._latestQuotaUsed.quota - this.targetQuota;
					const quotaDeltaTime = currentTime - this._latestQuotaUsed.timestamp;

					const decayTime = 2500; // Estimated time for quota to reset
					const maxDelay = 1000;

					let quotaAdjustment = (quotaDelta / (100 - this.targetQuota));
					quotaAdjustment *= Math.max(1.0 - (quotaDeltaTime / decayTime), 0); // Adjust by time passed

					const delay = quotaAdjustment * maxDelay;
					if (delay > 0) {
						await timeout(Math.min(delay, maxDelay));
					}
				}

				const e = this.requestQueue.shift()!;
				if (e.token.isCancellationRequested) {
					e.deferred.cancel();
					continue;
				}

				// Send the request
				this._numberInFlightRequests++;
				this._lastSendTime = Date.now();

				// Deliberately not awaited so that several requests can be in flight at once
				const request = e.task(e.attempt);
				request.then(response => {
					this.updateQuotasFromResponse(response);

					if (e.token.isCancellationRequested) {
						e.deferred.cancel();
						return;
					}

					if (response.ok) {
						e.deferred.complete(response);
						return;
					}

					// Request failed, see if we can retry
					if (e.attempt < this._maxAttempts) {
						if (response.status === 429 || response.status === 403 || response.status === 408) {
							const retryAfterSeconds = this.getRequestRetryDelay(response);
							if (retryAfterSeconds > 0) {
								this._rateLimitTimeout = timeout(retryAfterSeconds * 1000);
							}

							// Add back into the queue
							this.requestQueue.unshift({ task: e.task, attempt: e.attempt + 1, deferred: e.deferred, token: e.token });
							this.pump();
							return;
						}
					}

					// Unknown failure or max attempts reached, complete the failed response
					e.deferred.complete(response);
				}).catch(err => {
					e.deferred.error(err);
				}).finally(() => {
					this._numberInFlightRequests--;
				});
			}
		} finally {
			this._isPumping = false;
		}
	}

	/**
	 * Records the rate limit and quota headers of a response.
	 *
	 * Headers that are missing or that do not parse to a finite number are ignored,
	 * leaving the previously recorded values in place.
	 */
	private updateQuotasFromResponse(response: Response): void {
		const timestamp = Date.now();
		try {
			const rateLimitRemaining = response.headers.get('x-ratelimit-remaining');
			const rateLimitReset = response.headers.get('x-ratelimit-reset');
			if (rateLimitRemaining && rateLimitReset) {
				const remaining = parseFloat(rateLimitRemaining);
				const resetAtSeconds = parseFloat(rateLimitReset);
				if (Number.isFinite(remaining) && Number.isFinite(resetAtSeconds)) {
					this._latestRateLimitHint = {
						timestamp,
						remaining,
						resetAt: resetAtSeconds * 1000, // convert to ms
					};
				}
			}

			const totalQuotaUsed = response.headers.get('x-github-total-quota-used');
			if (totalQuotaUsed) {
				const quota = parseFloat(totalQuotaUsed);
				if (Number.isFinite(quota)) {
					this._latestQuotaUsed = { timestamp, quota };
				}
			}
		} catch (e) {
			console.error('Error parsing rate limit headers', e);
			// Ignore errors
		}
	}

	/**
	 * Get the retry delay for a request based on the response.
	 *
	 * Checks, in order: the `retry-after` header, the `x-ratelimit-reset` header
	 * (epoch seconds, converted to a delay relative to now), and finally a fixed
	 * fallback that is shorter for 408 responses.
	 *
	 * @returns The retry delay in seconds. May be zero or negative when the reset time
	 * has already passed; callers treat that as "no delay".
	 */
	private getRequestRetryDelay(response: Response): number {
		// Check `retry-after` header
		try {
			const retryAfterHeader = response.headers.get('retry-after');
			if (retryAfterHeader) {
				const retryAfterSeconds = parseFloat(retryAfterHeader);
				if (!isNaN(retryAfterSeconds)) {
					return retryAfterSeconds;
				}
			}
		} catch {
			// Noop
		}

		// Fallback to `x-ratelimit-reset` header
		try {
			const resetHeader = response.headers.get('x-ratelimit-reset');
			if (resetHeader) {
				const resetEpochSeconds = parseFloat(resetHeader);
				if (!isNaN(resetEpochSeconds)) {
					const currentEpochSeconds = Math.floor(Date.now() / 1000);
					return resetEpochSeconds - currentEpochSeconds;
				}
			}
		} catch {
			// Noop
		}

		// Seeing if the request timed out which lets us use a faster retry
		if (response.status === 408) {
			return 0.25;
		}

		// Otherwise use a generic timeout
		return 2;
	}
}

/** Shape of the JSON body returned by the chunks endpoint, as consumed by this file. */
type ChunksEndpointResponse = {
	readonly chunks: readonly {
		readonly hash: string;
		readonly range: { start: number; end: number };
		readonly line_range: { start: number; end: number };
		/** May be absent, in which case the chunk is looked up in the local cache by hash. */
		readonly text?: string;
		readonly embedding?: { model: string; embedding: EmbeddingVector };
	}[];

	readonly embedding_model: string;
};

/**
 * Client for the chunks endpoint. Sends file content to be chunked (and optionally
 * embedded) and converts the response into file chunks. All requests go through a
 * shared {@link RequestRateLimiter}.
 */
export class ChunkingEndpointClientImpl extends Disposable implements IChunkingEndpointClient {
	declare readonly _serviceBrand: undefined;

	/**
	 * Limiter for request to the chunks endpoint.
	 */
	private readonly _requestLimiter: RequestRateLimiter;

	constructor(
		@IInstantiationService private readonly _instantiationService: IInstantiationService,
		@IEnvService private readonly _envService: IEnvService,
		@ILogService private readonly _logService: ILogService,
		@ITelemetryService private readonly _telemetryService: ITelemetryService,
		@IWorkspaceService private readonly _workspaceService: IWorkspaceService,
	) {
		super();

		this._requestLimiter = this._register(this._instantiationService.createInstance(RequestRateLimiter));
	}

	/**
	 * Chunks the content without requesting embeddings.
	 *
	 * @returns The chunks, an empty array for blank content, or `undefined` if the request failed.
	 */
	public computeChunks(authToken: string, embeddingType: EmbeddingType, content: ChunkableContent, batchInfo: ComputeBatchInfo, qos: EmbeddingsComputeQos, cache: ReadonlyMap<string, FileChunkWithEmbedding> | undefined, telemetryInfo: CallTracker, token: CancellationToken): Promise<readonly FileChunkWithOptionalEmbedding[] | undefined> {
		return this.doComputeChunksAndEmbeddings(authToken, embeddingType, content, batchInfo, { qos, computeEmbeddings: false }, cache, telemetryInfo, token);
	}

	/**
	 * Chunks the content and requests an embedding for every chunk.
	 *
	 * @returns The chunks with embeddings, an empty array for blank content, or `undefined` if the request failed.
	 */
	public async computeChunksAndEmbeddings(authToken: string, embeddingType: EmbeddingType, content: ChunkableContent, batchInfo: ComputeBatchInfo, qos: EmbeddingsComputeQos, cache: ReadonlyMap<string, FileChunkWithEmbedding> | undefined, telemetryInfo: CallTracker, token: CancellationToken): Promise<readonly FileChunkWithEmbedding[] | undefined> {
		const result = await this.doComputeChunksAndEmbeddings(authToken, embeddingType, content, batchInfo, { qos, computeEmbeddings: true }, cache, telemetryInfo, token);
		// Safe because chunks without an embedding are dropped when `computeEmbeddings` is set
		return result as FileChunkWithEmbedding[] | undefined;
	}

	/**
	 * Shared implementation of {@link computeChunks} and {@link computeChunksAndEmbeddings}.
	 *
	 * @param cache Locally known chunks keyed by hash. The hashes are sent to the endpoint and
	 * returned chunks whose hash is in the cache are resolved from it.
	 * @param batchInfo Updated in place with the number of files and amount of text sent.
	 * @returns The chunks, an empty array when the content is blank or the endpoint returned no
	 * chunks, or `undefined` when the request failed or an error was thrown while processing it.
	 * @throws A cancellation error if `token` is cancelled while reading the content text.
	 */
	private async doComputeChunksAndEmbeddings(
		authToken: string,
		embeddingType: EmbeddingType,
		content: ChunkableContent,
		batchInfo: ComputeBatchInfo,
		options: {
			qos: EmbeddingsComputeQos;
			computeEmbeddings: boolean;
		},
		cache: ReadonlyMap<string, FileChunkWithEmbedding> | undefined,
		telemetryInfo: CallTracker,
		token: CancellationToken
	): Promise<readonly FileChunkWithOptionalEmbedding[] | undefined> {
		const text = await raceCancellationError(content.getText(), token);
		if (isFalsyOrWhitespace(text)) {
			return [];
		}

		try {
			const makeRequest = async (attempt: number) => {
				return logExecTime(this._logService, `ChunksEndpointEmbeddingComputer.fetchChunksRequest(${content.uri}, attempt=${attempt})`, () => this._instantiationService.invokeFunction(postRequest, {
					endpointOrUrl: { type: RequestType.Chunks },
					secretKey: authToken,
					intent: 'copilot-panel',
					requestId: '',
					body: {
						embed: options.computeEmbeddings,
						// Only set to online during the re-ranking step
						qos: options.qos,
						content: text,
						path: getWorkspaceFileDisplayPath(this._workspaceService, content.uri),
						local_hashes: cache ? Array.from(cache.keys()) : [],
						language_id: content.githubLanguageId,
						embedding_model: embeddingType.id,
					} satisfies {
						embed: boolean;
						qos: string;
						content: string;
						path: string;
						local_hashes: string[];
						language_id: number | undefined;
						embedding_model: string;
					} as any,
					additionalHeaders: getGithubMetadataHeaders(telemetryInfo, this._envService),
					cancelToken: token,
				}));
			};

			batchInfo.recomputedFileCount++;
			batchInfo.sentContentTextLength += text.length;

			const response = await raceCancellationError(this._requestLimiter.enqueue(makeRequest, token), token);
			if (!response.ok) {
				this._logService.debug(`Error chunking '${content.uri}'. Status: ${response.status}. Status Text: ${response.statusText}.`);

				/* __GDPR__
					"workspaceChunkEmbeddingsIndex.computeChunksAndEmbeddings.error" : {
						"owner": "mjbvz",
						"comment": "Tracks errors from the chunks service",
						"source": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "comment": "Caller of computeChunksAndEmbeddings" },
						"responseStatus": { "classification": "SystemMetaData", "purpose": "FeatureInsight", "isMeasurement": true, "comment": "Status code" }
					}
				*/
				this._telemetryService.sendMSFTTelemetryEvent('workspaceChunkEmbeddingsIndex.computeChunksAndEmbeddings.error', {
					source: telemetryInfo.toString(),
				}, {
					responseStatus: response.status,
				});

				return undefined;
			}

			const body: ChunksEndpointResponse = await response.json();
			if (!body.chunks.length) {
				return [];
			}

			return coalesce(body.chunks.map((chunk): FileChunkWithOptionalEmbedding | undefined => {
				const range = new Range(chunk.line_range.start, 0, chunk.line_range.end, 0);

				// Prefer the locally cached text and embedding when the hash is already known
				const cached = cache?.get(chunk.hash);
				if (cached) {
					return {
						chunk: {
							file: content.uri,
							text: stripChunkTextMetadata(cached.chunk.text),
							rawText: undefined,
							range,
							isFullFile: cached.chunk.isFullFile, // TODO: get from endpoint
						},
						chunkHash: chunk.hash,
						embedding: cached.embedding,
					};
				}

				if (typeof chunk.text !== 'string') {
					// Invalid chunk
					return undefined;
				}

				let embedding: Embedding | undefined;
				if (chunk.embedding?.embedding) {
					const returnedEmbeddingsType = new EmbeddingType(body.embedding_model);
					if (!returnedEmbeddingsType.equals(embeddingType)) {
						// Caught by the surrounding try/catch, which fails the whole file
						throw new Error(`Unexpected embedding model. Got: ${returnedEmbeddingsType}. Expected: ${embeddingType}`);
					}

					embedding = { type: returnedEmbeddingsType, value: chunk.embedding.embedding };
				}

				if (options.computeEmbeddings && !embedding) {
					// Invalid chunk
					return undefined;
				}

				return {
					chunk: {
						file: content.uri,
						text: stripChunkTextMetadata(chunk.text),
						rawText: undefined,
						range,
						isFullFile: false, // TODO: get from endpoint
					},
					chunkHash: chunk.hash,
					embedding: embedding
				};
			}));

		} catch (e) {
			this._logService.error(e);
			return undefined;
		}
	}
}