GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/test/base/throttlingChatMLFetcher.ts
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/
import type { CancellationToken } from 'vscode';
import { AbstractChatMLFetcher } from '../../src/extension/prompt/node/chatMLFetcher';
import { IChatMLFetcher, IFetchMLOptions } from '../../src/platform/chat/common/chatMLFetcher';
import { ChatFetchResponseType, ChatResponses } from '../../src/platform/chat/common/commonTypes';
import { IConversationOptions } from '../../src/platform/chat/common/conversationOptions';
import { IThrottledWorkerOptions } from '../../src/util/vs/base/common/async';
import { SyncDescriptor } from '../../src/util/vs/platform/instantiation/common/descriptors';
import { IInstantiationService } from '../../src/util/vs/platform/instantiation/common/instantiation';
import { PausableThrottledWorker } from './pausableThrottledWorker';

/**
 * Configures the maximum number of requests that may be started per second or per minute.
 *
 * **NOTE**: The number of requests running in parallel can be higher than this limit;
 * it only caps how many requests are started within the configured interval.
 */
export type ThrottlingLimits = Record<string, { limit: number; type: 'RPS' | 'RPM' }>;
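// Illustrative sketch only: the model names and numbers below are hypothetical and not part
// of this module; they just show the shape a ThrottlingLimits value takes.
//
//   const limits: ThrottlingLimits = {
//   	'model-a': { limit: 2, type: 'RPS' },   // start at most 2 requests per second
//   	'model-b': { limit: 30, type: 'RPM' },  // start at most 30 requests per minute
//   };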

export class ChatModelThrottlingTaskLaunchers {
	private _throttlers = new Map<string, PausableThrottledWorker<() => Promise<void>>>();
	private readonly _limits: ThrottlingLimits;
	private _rateLimitBackoff = new Map<string, Promise<void>>();
	private _inFlightRequests = new Map<string, Set<Promise<void>>>();

	constructor(limits: ThrottlingLimits) {
		this._limits = limits;
	}

	private getInFlightRequests(model: string): Set<Promise<void>> {
		if (!this._inFlightRequests.has(model)) {
			this._inFlightRequests.set(model, new Set());
		}
		return this._inFlightRequests.get(model)!;
	}

	getThrottler(model: string): PausableThrottledWorker<() => Promise<void>> {
		if (!this._throttlers.has(model)) {
			// If no limit is configured, the default limit is 1 RPS.
			if (!this._limits[model]) {
				this._limits[model] = { limit: 1, type: 'RPS' };
			}
			const limit = this._limits[model].type === 'RPM' ? this._limits[model].limit : (this._limits[model].limit * 60);
			const options: IThrottledWorkerOptions = {
				maxBufferedWork: undefined, // We want to hold as many requests as possible
				maxWorkChunkSize: 1,
				waitThrottleDelayBetweenWorkUnits: true,
				throttleDelay: Math.ceil(60000 / limit)
			};
			this._throttlers.set(model, new PausableThrottledWorker(options, async (tasks) => {
				for (const task of tasks) {
					await task();
				}
			}));
		}
		return this._throttlers.get(model)!;
	}
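	// Worked example of the delay computation above (hypothetical limits, for illustration):
	// { limit: 2, type: 'RPS' } is normalized to 120 requests per minute, giving
	// throttleDelay = ceil(60000 / 120) = 500 ms between work units; { limit: 30, type: 'RPM' }
	// stays at 30 per minute, giving throttleDelay = ceil(60000 / 30) = 2000 ms.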

	isPaused(model: string): boolean {
		return this._throttlers.get(model)?.isPaused() ?? false;
	}

	pauseProcessing(model: string): void {
		this.getThrottler(model).pause();
	}

	resumeProcessing(model: string): void {
		this.getThrottler(model).resume();
	}

	/**
	 * Handles rate limit responses by backing off before retrying.
	 * The backoff delay grows linearly with the retry count, and a shared "backoff chain"
	 * ensures that multiple in-flight requests for the same model do not all retry at the
	 * same time.
	 *
	 * @param model The chat model that was rate limited
	 * @param baseDelay The base delay in milliseconds (usually from the retryAfter value)
	 * @param retryCount The number of attempts made so far, starting at 1
	 * @returns Whether the request should be retried
	 */
	async handleRateLimit(model: string, baseDelay: number, retryCount: number): Promise<boolean> {
		this.pauseProcessing(model);
		if (retryCount > 3) {
			return false; // Do not retry after too many attempts.
		}

		// If any backoff is already in progress for this model, wait for it first.
		const ongoingBackoff = this._rateLimitBackoff.get(model);
		if (ongoingBackoff) {
			await ongoingBackoff;
		}

		// Back off linearly with the retry count: 1x, 2x, 3x the base delay.
		const delay = baseDelay * retryCount;

		// Create a new backoff promise and set it as active for this model.
		const backoffPromise = new Promise<void>(resolve => {
			setTimeout(resolve, delay);
		});
		this._rateLimitBackoff.set(model, backoffPromise);
		await backoffPromise;
		this._rateLimitBackoff.delete(model);

		return true; // Indicate we should retry.
	}
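	// Worked example of the backoff above (assuming a hypothetical retryAfter of 5 seconds,
	// i.e. baseDelay = 5000 ms): the first retry waits 5 s, the second 10 s, the third 15 s,
	// and once retryCount exceeds 3 the request is not retried again.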

	/**
	 * Execute a request with retry logic for rate limits.
	 * @param model The chat model to use
	 * @param requestFn The function that performs the actual request
	 * @returns The result from the request function
	 */
	async executeWithRateLimitHandling(
		model: string,
		requestFn: () => Promise<ChatResponses>
	): Promise<ChatResponses> {
		let result!: ChatResponses;
		let continueRetrying = true;
		const inFlightRequests = this.getInFlightRequests(model);

		const cleanup = () => {
			inFlightRequests.delete(promise);
			// Only resume processing if there are no more in-flight requests
			if (inFlightRequests.size === 0) {
				this.resumeProcessing(model);
			}
		};

		const promise = (async () => {
			let retryCount = 1;
			try {
				while (continueRetrying) {
					result = await requestFn();
					if (result.type === ChatFetchResponseType.RateLimited) {
						// The minimum wait should be 5 seconds.
						result.retryAfter = Math.max(5, result.retryAfter ?? 0);
						// Convert the retryAfter value in seconds to milliseconds.
						const retryAfterMs = result.retryAfter * 1000;
						const shouldRetry = await this.handleRateLimit(model, retryAfterMs, retryCount);
						if (shouldRetry) {
							retryCount++;
							continueRetrying = true;
							continue;
						}
					}
					// On successful (or non-rate-limited) responses, stop retrying.
					continueRetrying = false;
				}
			} finally {
				cleanup();
			}
		})();

		inFlightRequests.add(promise);
		await promise;
		return result;
	}
}
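
// Minimal usage sketch (hypothetical model name and request function doFetch, not part of the
// production code path): a launcher built this way paces request starts and transparently
// retries rate-limited responses.
//
//   const launchers = new ChatModelThrottlingTaskLaunchers({ 'model-a': { limit: 1, type: 'RPS' } });
//   launchers.getThrottler('model-a').work([async () => {
//   	const responses = await launchers.executeWithRateLimitHandling('model-a', () => doFetch());
//   	// ... consume responses
//   }]);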

export class ThrottlingChatMLFetcher extends AbstractChatMLFetcher {

	private readonly _fetcher: IChatMLFetcher;

	constructor(
		fetcherDescriptor: SyncDescriptor<IChatMLFetcher>,
		private readonly _modelTaskLaunchers: ChatModelThrottlingTaskLaunchers,
		@IInstantiationService instantiationService: IInstantiationService,
		@IConversationOptions options: IConversationOptions,
	) {
		super(options);
		this._fetcher = instantiationService.createInstance(fetcherDescriptor);
	}

	override async fetchMany(opts: IFetchMLOptions, token: CancellationToken): Promise<ChatResponses> {
		const taskLauncher = this._modelTaskLaunchers.getThrottler(opts.endpoint.model);

		return new Promise<ChatResponses>((resolve, reject) => {
			taskLauncher.work([async () => {
				try {
					const result = await this._modelTaskLaunchers.executeWithRateLimitHandling(opts.endpoint.model, () =>
						this._fetcher.fetchMany(opts, token)
					);
					resolve(result);
				} catch (error) {
					reject(error);
				}
			}]);
		});
	}
}
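
// Minimal wiring sketch (hypothetical: assumes an IInstantiationService and a concrete
// IChatMLFetcher implementation, here called SomeChatMLFetcher, are available from the
// surrounding test setup):
//
//   const throttledFetcher = instantiationService.createInstance(
//   	ThrottlingChatMLFetcher,
//   	new SyncDescriptor(SomeChatMLFetcher),
//   	new ChatModelThrottlingTaskLaunchers({ 'model-a': { limit: 1, type: 'RPS' } })
//   );
//   // fetchMany(opts, token) is then throttled per model and retried on rate limits.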