Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/util/common/async.ts
13397 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { DeferredPromise } from '../vs/base/common/async';
7
import { BugIndicatingError, CancellationError } from '../vs/base/common/errors';
8
9
export type Task<T = void> = () => (Promise<T> | T);
10
11
/**
12
* Processes tasks in the order they were scheduled.
13
*/
14
export class TaskQueue {
15
private _runningTask: Task<any> | undefined = undefined;
16
private _pendingTasks: { task: Task<any>; deferred: DeferredPromise<any>; setUndefinedWhenCleared: boolean }[] = [];
17
18
/**
19
* Waits for the current and pending tasks to finish, then runs and awaits the given task.
20
* If the task is skipped because of clearPending, the promise is rejected with a CancellationError.
21
*/
22
public schedule<T>(task: Task<T>): Promise<T> {
23
const deferred = new DeferredPromise<T>();
24
this._pendingTasks.push({ task, deferred, setUndefinedWhenCleared: false });
25
this._runIfNotRunning();
26
return deferred.p;
27
}
28
29
/**
30
* Waits for the current and pending tasks to finish, then runs and awaits the given task.
31
* If the task is skipped because of clearPending, the promise is resolved with undefined.
32
*/
33
public scheduleSkipIfCleared<T>(task: Task<T>): Promise<T | undefined> {
34
const deferred = new DeferredPromise<T>();
35
this._pendingTasks.push({ task, deferred, setUndefinedWhenCleared: true });
36
this._runIfNotRunning();
37
return deferred.p;
38
}
39
40
private _runIfNotRunning(): void {
41
if (this._runningTask === undefined) {
42
this._processQueue();
43
}
44
}
45
46
private async _processQueue(): Promise<void> {
47
if (this._pendingTasks.length === 0) {
48
return;
49
}
50
51
const next = this._pendingTasks.shift();
52
if (!next) {
53
return;
54
}
55
56
if (this._runningTask) {
57
throw new BugIndicatingError();
58
}
59
60
this._runningTask = next.task;
61
62
try {
63
const result = await next.task();
64
next.deferred.complete(result);
65
} catch (e) {
66
next.deferred.error(e);
67
} finally {
68
this._runningTask = undefined;
69
this._processQueue();
70
}
71
}
72
73
/**
74
* Clears all pending tasks. Does not cancel the currently running task.
75
*/
76
public clearPending(): void {
77
const tasks = this._pendingTasks;
78
this._pendingTasks = [];
79
for (const task of tasks) {
80
if (task.setUndefinedWhenCleared) {
81
task.deferred.complete(undefined);
82
} else {
83
task.deferred.error(new CancellationError());
84
}
85
}
86
}
87
}
88
89
export class BatchedProcessor<TArg, TResult> {
90
private _queue: { arg: TArg; promise: DeferredPromise<TResult> }[] = [];
91
private _timeout: any | null = null;
92
93
constructor(
94
private readonly _fn: (args: TArg[]) => Promise<TResult[]>,
95
private readonly _waitingTimeMs: number
96
) { }
97
98
request(arg: TArg): Promise<TResult> {
99
if (this._timeout === null) {
100
this._timeout = setTimeout(() => this._flush(), this._waitingTimeMs);
101
}
102
103
const p = new DeferredPromise<TResult>();
104
this._queue.push({ arg, promise: p });
105
return p.p;
106
}
107
108
private async _flush() {
109
const queue = this._queue;
110
this._queue = [];
111
this._timeout = null;
112
113
const args = queue.map(e => e.arg);
114
115
let results: TResult[];
116
try {
117
results = await this._fn(args);
118
} catch (e) {
119
for (const entry of queue) {
120
entry.promise.error(e);
121
}
122
return;
123
}
124
125
for (const [i, result] of results.entries()) {
126
queue[i].promise.complete(result);
127
}
128
}
129
}
130
131
export function raceFilter<T>(promises: Promise<T>[], filter: (result: T) => boolean): Promise<T | undefined> {
132
return new Promise((resolve, reject) => {
133
if (promises.length === 0) {
134
resolve(undefined);
135
return;
136
}
137
138
let resolved = false;
139
let unresolvedCount = promises.length;
140
for (const promise of promises) {
141
promise.then(result => {
142
unresolvedCount--;
143
if (!resolved) {
144
if (filter(result)) {
145
resolved = true;
146
resolve(result);
147
} else if (unresolvedCount === 0) {
148
// Last one has to resolve the promise
149
resolve(undefined);
150
}
151
}
152
}).catch(reject);
153
}
154
});
155
}
156
157