Path: blob/main/extensions/copilot/src/util/common/async.ts
13397 views
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { DeferredPromise } from '../vs/base/common/async';
import { BugIndicatingError, CancellationError } from '../vs/base/common/errors';

/** A unit of work that produces its result either synchronously or through a promise. */
export type Task<T = void> = () => (Promise<T> | T);

/**
 * Processes tasks in the order they were scheduled.
 * At most one task runs at a time; the next one starts once the previous one has settled.
 */
export class TaskQueue {
	/** The task that is currently executing, or `undefined` while the queue is idle. */
	private _runningTask: Task<any> | undefined = undefined;

	/** Tasks waiting for their turn, oldest first, together with the deferred handed to their caller. */
	private _pendingTasks: { task: Task<any>; deferred: DeferredPromise<any>; setUndefinedWhenCleared: boolean }[] = [];

	/**
	 * Waits for the current and pending tasks to finish, then runs and awaits the given task.
	 * If the task is skipped because of clearPending, the promise is rejected with a CancellationError.
	 *
	 * @param task The work to run once all previously scheduled tasks have settled.
	 * @returns A promise that settles with the outcome of `task`.
	 */
	public schedule<T>(task: Task<T>): Promise<T> {
		const deferred = new DeferredPromise<T>();
		this._pendingTasks.push({ task, deferred, setUndefinedWhenCleared: false });
		this._runIfNotRunning();
		return deferred.p;
	}

	/**
	 * Waits for the current and pending tasks to finish, then runs and awaits the given task.
	 * If the task is skipped because of clearPending, the promise is resolved with undefined.
	 *
	 * @param task The work to run once all previously scheduled tasks have settled.
	 * @returns A promise that settles with the outcome of `task`, or resolves with `undefined` when skipped.
	 */
	public scheduleSkipIfCleared<T>(task: Task<T>): Promise<T | undefined> {
		// The deferred may be completed with `undefined` by clearPending, so that is part of its type.
		const deferred = new DeferredPromise<T | undefined>();
		this._pendingTasks.push({ task, deferred, setUndefinedWhenCleared: true });
		this._runIfNotRunning();
		return deferred.p;
	}

	/** Starts draining the queue unless a task is already executing. */
	private _runIfNotRunning(): void {
		if (this._runningTask === undefined) {
			void this._processQueue();
		}
	}

	/**
	 * Runs the oldest pending task, forwards its outcome to the caller's deferred,
	 * and then continues with the next pending task (if any).
	 */
	private async _processQueue(): Promise<void> {
		const next = this._pendingTasks[0];
		if (!next) {
			return;
		}

		// Check the invariant before dequeuing so that a violation does not drop the task.
		if (this._runningTask) {
			throw new BugIndicatingError();
		}

		this._pendingTasks.shift();
		this._runningTask = next.task;

		try {
			const result = await next.task();
			next.deferred.complete(result);
		} catch (e) {
			next.deferred.error(e);
		} finally {
			this._runningTask = undefined;
			// Intentionally not awaited: each task reports its outcome through its own deferred.
			void this._processQueue();
		}
	}

	/**
	 * Clears all pending tasks. Does not cancel the currently running task.
	 * Tasks added via scheduleSkipIfCleared resolve with undefined;
	 * tasks added via schedule reject with a CancellationError.
	 */
	public clearPending(): void {
		const cleared = this._pendingTasks;
		this._pendingTasks = [];
		for (const entry of cleared) {
			if (entry.setUndefinedWhenCleared) {
				entry.deferred.complete(undefined);
			} else {
				entry.deferred.error(new CancellationError());
			}
		}
	}
}

/**
 * Collects individual requests and hands them to a batch function in one call.
 * The first request of a batch starts a timer; when it fires, all requests collected
 * so far are passed to the batch function together.
 */
export class BatchedProcessor<TArg, TResult> {
	/** Requests collected since the last flush, in arrival order. */
	private _queue: { arg: TArg; promise: DeferredPromise<TResult> }[] = [];

	/** Handle of the pending flush timer, or `null` when no flush is scheduled. */
	private _timeout: ReturnType<typeof setTimeout> | null = null;

	/**
	 * @param _fn Batch function. The i-th element of its result is delivered to the i-th request of the batch.
	 * @param _waitingTimeMs Delay in milliseconds between the first request of a batch and the flush.
	 */
	constructor(
		private readonly _fn: (args: TArg[]) => Promise<TResult[]>,
		private readonly _waitingTimeMs: number
	) { }

	/**
	 * Adds `arg` to the current batch.
	 *
	 * @returns A promise that resolves with the result for `arg`, or rejects if the batch function
	 * fails or does not return a result for this request.
	 */
	request(arg: TArg): Promise<TResult> {
		if (this._timeout === null) {
			this._timeout = setTimeout(() => this._flush(), this._waitingTimeMs);
		}

		const deferred = new DeferredPromise<TResult>();
		this._queue.push({ arg, promise: deferred });
		return deferred.p;
	}

	/** Sends the collected requests to the batch function and settles every request's promise. */
	private async _flush(): Promise<void> {
		// Detach the batch first so that requests arriving while `_fn` is running start a new batch.
		const batch = this._queue;
		this._queue = [];
		this._timeout = null;

		let results: TResult[];
		try {
			results = await this._fn(batch.map(entry => entry.arg));
		} catch (e) {
			for (const entry of batch) {
				entry.promise.error(e);
			}
			return;
		}

		// Iterate over the batch rather than the results: every request must be settled, even when
		// the batch function returns too few results, and surplus results must not cause a throw.
		batch.forEach((entry, i) => {
			if (i < results.length) {
				entry.promise.complete(results[i]);
			} else {
				entry.promise.error(new BugIndicatingError(
					`BatchedProcessor: batch function returned ${results.length} results for ${batch.length} requests`
				));
			}
		});
	}
}

/**
 * Resolves with the first fulfilled value for which `filter` returns true.
 *
 * - Resolves with `undefined` when `promises` is empty, or when all promises fulfilled and none passed the filter.
 * - Rejects when one of the promises rejects or `filter` throws before a result has been chosen.
 *
 * @param promises The promises to race.
 * @param filter Predicate deciding whether a fulfilled value is accepted as the result.
 */
export function raceFilter<T>(promises: Promise<T>[], filter: (result: T) => boolean): Promise<T | undefined> {
	return new Promise((resolve, reject) => {
		if (promises.length === 0) {
			resolve(undefined);
			return;
		}

		let resolved = false;
		let unresolvedCount = promises.length;
		for (const promise of promises) {
			promise.then(result => {
				unresolvedCount--;
				if (resolved) {
					return;
				}
				if (filter(result)) {
					resolved = true;
					resolve(result);
				} else if (unresolvedCount === 0) {
					// Last one has to resolve the promise
					resolve(undefined);
				}
			}).catch(reject);
		}
	});
}