Path: blob/main/extensions/copilot/test/base/throttlingChatMLFetcher.ts
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import type { CancellationToken } from 'vscode';
import { AbstractChatMLFetcher } from '../../src/extension/prompt/node/chatMLFetcher';
import { IChatMLFetcher, IFetchMLOptions } from '../../src/platform/chat/common/chatMLFetcher';
import { ChatFetchResponseType, ChatResponses } from '../../src/platform/chat/common/commonTypes';
import { IConversationOptions } from '../../src/platform/chat/common/conversationOptions';
import { IThrottledWorkerOptions } from '../../src/util/vs/base/common/async';
import { SyncDescriptor } from '../../src/util/vs/platform/instantiation/common/descriptors';
import { IInstantiationService } from '../../src/util/vs/platform/instantiation/common/instantiation';
import { PausableThrottledWorker } from './pausableThrottledWorker';

/**
 * Configures a maximum number of requests to start, either per second or per minute.
 *
 * **NOTE**: The number of requests running in parallel could be higher than this;
 * it only enforces the maximum number of requests that may start.
 */
export type ThrottlingLimits = Record<string, { limit: number; type: 'RPS' | 'RPM' }>;

export class ChatModelThrottlingTaskLaunchers {
	private _throttlers = new Map<string, PausableThrottledWorker<() => Promise<void>>>();
	private readonly _limits: ThrottlingLimits;
	private _rateLimitBackoff = new Map<string, Promise<void>>();
	private _inFlightRequests = new Map<string, Set<Promise<void>>>();

	constructor(limits: ThrottlingLimits) {
		this._limits = limits;
	}

	private getInFlightRequests(model: string): Set<Promise<void>> {
		if (!this._inFlightRequests.has(model)) {
			this._inFlightRequests.set(model, new Set());
		}
		return this._inFlightRequests.get(model)!;
	}

	getThrottler(model: string): PausableThrottledWorker<() => Promise<void>> {
		if (!this._throttlers.has(model)) {
			// If no limit is configured, the default limit is 1 RPS.
			if (!this._limits[model]) {
				this._limits[model] = { limit: 1, type: 'RPS' };
			}
			// Normalize the configured limit to requests per minute.
			const limit = this._limits[model].type === 'RPM' ? this._limits[model].limit : (this._limits[model].limit * 60);
			const options: IThrottledWorkerOptions = {
				maxBufferedWork: undefined, // We want to hold as many requests as possible
				maxWorkChunkSize: 1,
				waitThrottleDelayBetweenWorkUnits: true,
				throttleDelay: Math.ceil(60000 / limit)
			};
			this._throttlers.set(model, new PausableThrottledWorker(options, async (tasks) => {
				for (const task of tasks) {
					await task();
				}
			}));
		}
		return this._throttlers.get(model)!;
	}
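
	// Worked example (illustrative; these entries are hypothetical and not part of
	// the original file): a limit of { limit: 2, type: 'RPS' } normalizes to
	// 120 requests per minute, so throttleDelay = Math.ceil(60000 / 120) = 500ms
	// between task starts; { limit: 30, type: 'RPM' } yields a 2000ms delay.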

	isPaused(model: string): boolean {
		return this._throttlers.get(model)?.isPaused() ?? false;
	}

	pauseProcessing(model: string): void {
		this.getThrottler(model).pause();
	}

	resumeProcessing(model: string): void {
		this.getThrottler(model).resume();
	}

	/**
	 * Handles rate limit responses by applying a backoff delay that grows with the
	 * retry count. It uses a shared "backoff chain" to ensure that multiple in-flight
	 * requests for the same model do not all retry at the same time.
	 *
	 * @param model The chat model that was rate limited
	 * @param baseDelay The base delay in milliseconds (usually from the retryAfter value)
	 * @param retryCount The number of the current retry attempt, starting at 1
	 * @returns Whether the request should be retried
	 */
	async handleRateLimit(model: string, baseDelay: number, retryCount: number): Promise<boolean> {
		this.pauseProcessing(model);
		if (retryCount > 3) {
			return false; // Do not retry after too many attempts.
		}

		// If any backoff is already in progress for this model, wait for it first.
		const ongoingBackoff = this._rateLimitBackoff.get(model);
		if (ongoingBackoff) {
			await ongoingBackoff;
		}

		// Scale the backoff delay linearly with the retry count: 1x, 2x, 3x…
		const delay = baseDelay * retryCount;

		// Create a new backoff promise and set it as active for this model.
		const backoffPromise = new Promise<void>(resolve => {
			setTimeout(resolve, delay);
		});
		this._rateLimitBackoff.set(model, backoffPromise);
		await backoffPromise;
		this._rateLimitBackoff.delete(model);

		return true; // Indicate we should retry.
	}

	/**
	 * Execute a request with retry logic for rate limits.
	 * @param model The chat model to use
	 * @param requestFn The function that performs the actual request
	 * @returns The result from the request function
	 */
	async executeWithRateLimitHandling(
		model: string,
		requestFn: () => Promise<ChatResponses>
	): Promise<ChatResponses> {
		let result!: ChatResponses;
		let continueRetrying = true;
		const inFlightRequests = this.getInFlightRequests(model);

		const cleanup = () => {
			inFlightRequests.delete(promise);
			// Only resume processing if there are no more in-flight requests
			if (inFlightRequests.size === 0) {
				this.resumeProcessing(model);
			}
		};

		const promise = (async () => {
			let retryCount = 1;
			try {
				while (continueRetrying) {
					result = await requestFn();
					if (result.type === ChatFetchResponseType.RateLimited) {
						// The minimum wait should be 5 seconds.
						result.retryAfter = Math.max(5, result.retryAfter ?? 0);
						// Convert the retryAfter value in seconds to milliseconds.
						const retryAfterMs = result.retryAfter * 1000;
						const shouldRetry = await this.handleRateLimit(model, retryAfterMs, retryCount);
						if (shouldRetry) {
							retryCount++;
							continue;
						}
					}
					// On successful (or non-rate-limited) responses, stop retrying.
					continueRetrying = false;
				}
			} finally {
				cleanup();
			}
		})();

		inFlightRequests.add(promise);
		await promise;
		return result;
	}
}
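
// Illustrative usage sketch (not part of the original file; the model name and
// `fetchFromEndpoint` are hypothetical). A single launcher instance holds the
// per-model throttlers, backoff chains, and in-flight bookkeeping:
//
//   const launchers = new ChatModelThrottlingTaskLaunchers({
//       'some-model': { limit: 2, type: 'RPS' },
//   });
//   const responses = await launchers.executeWithRateLimitHandling(
//       'some-model', () => fetchFromEndpoint());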

export class ThrottlingChatMLFetcher extends AbstractChatMLFetcher {

	private readonly _fetcher: IChatMLFetcher;

	constructor(
		fetcherDescriptor: SyncDescriptor<IChatMLFetcher>,
		private readonly _modelTaskLaunchers: ChatModelThrottlingTaskLaunchers,
		@IInstantiationService instantiationService: IInstantiationService,
		@IConversationOptions options: IConversationOptions,
	) {
		super(options);
		this._fetcher = instantiationService.createInstance(fetcherDescriptor);
	}

	override async fetchMany(opts: IFetchMLOptions, token: CancellationToken): Promise<ChatResponses> {
		const taskLauncher = this._modelTaskLaunchers.getThrottler(opts.endpoint.model);

		return new Promise<ChatResponses>((resolve, reject) => {
			// Queue the fetch on the model's throttler; the task itself runs through
			// the rate-limit handling so a rate-limited response is retried with backoff.
			taskLauncher.work([async () => {
				try {
					const result = await this._modelTaskLaunchers.executeWithRateLimitHandling(opts.endpoint.model, () =>
						this._fetcher.fetchMany(opts, token)
					);
					resolve(result);
				} catch (error) {
					reject(error);
				}
			}]);
		});
	}
}
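
// Illustrative wiring sketch (not part of the original file; `SomeChatMLFetcher`
// and `instantiationService` are hypothetical stand-ins for a concrete fetcher and
// a test instantiation service). Fetchers constructed with the same
// ChatModelThrottlingTaskLaunchers share throttling and backoff state:
//
//   const fetcher = instantiationService.createInstance(
//       ThrottlingChatMLFetcher,
//       new SyncDescriptor(SomeChatMLFetcher),
//       new ChatModelThrottlingTaskLaunchers({ 'some-model': { limit: 1, type: 'RPS' } })
//   );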