Path: blob/main/extensions/copilot/src/util/vs/base/common/async.ts
13405 views
//!!! DO NOT modify, this file was COPIED from 'microsoft/vscode'

/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

import { CancellationToken, CancellationTokenSource } from './cancellation';
import { BugIndicatingError, CancellationError } from './errors';
import { Emitter, Event } from './event';
import { Disposable, DisposableMap, DisposableStore, IDisposable, isDisposable, MutableDisposable, toDisposable } from './lifecycle';
import { extUri as defaultExtUri, IExtUri } from './resources';
import { URI } from './uri';
import { setTimeout0 } from './platform';
import { MicrotaskDelay } from './symbols';
import { Lazy } from './lazy';

/**
 * Type guard: `true` when `obj` is truthy and exposes a callable `then`.
 */
export function isThenable<T>(obj: unknown): obj is Promise<T> {
	return !!obj && typeof (obj as unknown as Promise<T>).then === 'function';
}

/**
 * A promise that additionally exposes a `cancel()` method.
 */
export interface CancelablePromise<T> extends Promise<T> {
	cancel(): void;
}

/**
 * Returns a promise that can be cancelled using the provided cancellation token.
 *
 * @remarks When cancellation is requested, the promise will be rejected with a {@link CancellationError}.
 * If the promise resolves to a disposable object, it will be automatically disposed when cancellation
 * is requested.
 *
 * @param callback A function that accepts a cancellation token and returns a promise
 * @returns A promise that can be cancelled
 */
export function createCancelablePromise<T>(callback: (token: CancellationToken) => Promise<T>): CancelablePromise<T> {
	const source = new CancellationTokenSource();

	const thenable = callback(source.token);

	let isCancelled = false;

	const promise = new Promise<T>((resolve, reject) => {
		const subscription = source.token.onCancellationRequested(() => {
			isCancelled = true;
			subscription.dispose();
			reject(new CancellationError());
		});
		Promise.resolve(thenable).then(value => {
			subscription.dispose();
			source.dispose();

			if (!isCancelled) {
				resolve(value);

			} else if (isDisposable(value)) {
				// promise has been cancelled, result is disposable and will
				// be cleaned up
				value.dispose();
			}
		}, err => {
			subscription.dispose();
			source.dispose();
			reject(err);
		});
	});

	// Thin wrapper that delegates the promise protocol to `promise`
	// and adds `cancel()` on top of it.
	return <CancelablePromise<T>>new class {
		cancel() {
			source.cancel();
			source.dispose();
		}
		then<TResult1 = T, TResult2 = never>(resolve?: ((value: T) => TResult1 | Promise<TResult1>) | undefined | null, reject?: ((reason: unknown) => TResult2 | Promise<TResult2>) | undefined | null): Promise<TResult1 | TResult2> {
			return promise.then(resolve, reject);
		}
		catch<TResult = never>(reject?: ((reason: unknown) => TResult | Promise<TResult>) | undefined | null): Promise<T | TResult> {
			return this.then(undefined, reject);
		}
		finally(onfinally?: (() => void) | undefined | null): Promise<T> {
			return promise.finally(onfinally);
		}
	};
}

/**
 * Returns a promise that resolves with `undefined` as soon as the passed token is cancelled.
 * @see {@link raceCancellationError}
 */
export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken): Promise<T | undefined>;

/**
 * Returns a promise that resolves with `defaultValue` as soon as the passed token is cancelled.
 * @see {@link raceCancellationError}
 */
export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken, defaultValue: T): Promise<T>;

export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken, defaultValue?: T): Promise<T | undefined> {
	return new Promise((resolve, reject) => {
		const ref = token.onCancellationRequested(() => {
			ref.dispose();
			resolve(defaultValue);
		});
		// Whichever way `promise` settles, drop the cancellation listener.
		promise.then(resolve, reject).finally(() => ref.dispose());
	});
}

/**
 * Returns a promise that rejects with an
{@link CancellationError} as soon as the passed token is cancelled.
 * @see {@link raceCancellation}
 */
export function raceCancellationError<T>(promise: Promise<T>, token: CancellationToken): Promise<T> {
	return new Promise((resolve, reject) => {
		const ref = token.onCancellationRequested(() => {
			ref.dispose();
			reject(new CancellationError());
		});
		// Whichever way `promise` settles, drop the cancellation listener.
		promise.then(resolve, reject).finally(() => ref.dispose());
	});
}

/**
 * Wraps a cancellable promise such that it is no longer cancellable. Can be used to
 * avoid issues with shared promises that would normally be returned as
 * cancellable to consumers.
 */
export function notCancellablePromise<T>(promise: CancelablePromise<T>): Promise<T> {
	return new Promise<T>((resolve, reject) => {
		promise.then(resolve, reject);
	});
}

/**
 * Returns as soon as one of the promises resolves or rejects and cancels remaining promises
 */
export function raceCancellablePromises<T>(cancellablePromises: (CancelablePromise<T> | Promise<T>)[]): CancelablePromise<T> {
	// Index of the promise that fulfilled first; it is skipped when cancelling.
	let resolvedPromiseIndex = -1;
	const promises = cancellablePromises.map((promise, index) => promise.then(result => { resolvedPromiseIndex = index; return result; }));
	const promise = Promise.race(promises) as CancelablePromise<T>;
	promise.cancel = () => {
		cancellablePromises.forEach((cancellablePromise, index) => {
			if (index !== resolvedPromiseIndex && (cancellablePromise as CancelablePromise<T>).cancel) {
				(cancellablePromise as CancelablePromise<T>).cancel();
			}
		});
	};
	// Cancel the remaining promises once the race settles. `then(cb, cb)` is used
	// instead of `finally(cb)`: `finally` would create a derived promise that
	// re-rejects when the race rejects and, having no handler of its own, would
	// surface as an unhandled rejection even though the caller handles `promise`.
	const cancelRemaining = () => {
		promise.cancel();
	};
	promise.then(cancelRemaining, cancelRemaining);
	return promise;
}

/**
 * Races `promise` against a timer of `timeout` milliseconds.
 *
 * @param promise The promise to wait for.
 * @param timeout Time in milliseconds after which the result resolves with `undefined`.
 * @param onTimeout Optional callback invoked when the timer fires.
 * @returns The settlement of `promise`, or `undefined` if the timer fired first.
 */
export function raceTimeout<T>(promise: Promise<T>, timeout: number, onTimeout?: () => void): Promise<T | undefined> {
	let promiseResolve: ((value: T | undefined) => void) | undefined = undefined;

	const timer = setTimeout(() => {
		promiseResolve?.(undefined);
		onTimeout?.();
	}, timeout);

	return Promise.race([
		promise.finally(() => clearTimeout(timer)),
		new Promise<T | undefined>(resolve => promiseResolve = resolve)
	]);
}

/**
 * Invokes `callback` and wraps its result in a promise. A thenable result is
 * adopted, any other value resolves the promise directly. Because the callback
 * runs inside the promise executor, a synchronous throw becomes a rejection.
 */
export function asPromise<T>(callback: () => T | Thenable<T>): Promise<T> {
	return new Promise<T>((resolve, reject) => {
		const item = callback();
		if (isThenable<T>(item)) {
			item.then(resolve, reject);
		} else {
			resolve(item);
		}
	});
}

/**
 * Creates and returns a new promise, plus its `resolve` and `reject` callbacks.
 *
 * Replace with standardized [`Promise.withResolvers`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers) once it is supported
 */
export function promiseWithResolvers<T>(): { promise: Promise<T>; resolve: (value: T | PromiseLike<T>) => void; reject: (err?: any) => void } {
	let resolve: (value: T | PromiseLike<T>) => void;
	let reject: (reason?: any) => void;
	const promise = new Promise<T>((res, rej) => {
		resolve = res;
		reject = rej;
	});
	// The executor runs synchronously, so both callbacks are assigned by now.
	return { promise, resolve: resolve!, reject: reject! };
}

/** A parameterless function producing a `T`. */
export interface ITask<T> {
	(): T;
}

/** A function producing a `T` that receives a cancellation token. */
export interface ICancellableTask<T> {
	(token: CancellationToken): T;
}

/**
 * A helper to prevent accumulation of sequential async tasks.
 *
 * Imagine a mail man with the sole task of delivering letters. As soon as
 * a letter is submitted for delivery, he drives to the destination, delivers it
 * and returns to his base. Imagine that during the trip, N more letters were submitted.
 * When the mail man returns, he picks those N letters and delivers them all in a
 * single trip. Even though N+1 submissions occurred, only 2 deliveries were made.
 *
 * The throttler implements this via the queue() method, by providing it a task
 * factory.
Following the example:
 *
 * 		const throttler = new Throttler();
 * 		let letters = [];
 *
 * 		function deliver() {
 * 			const lettersToDeliver = letters;
 * 			letters = [];
 * 			return makeTheTrip(lettersToDeliver);
 * 		}
 *
 * 		function onLetterReceived(l) {
 * 			letters.push(l);
 * 			throttler.queue(deliver);
 * 		}
 */
export class Throttler implements IDisposable {

	private activePromise: Promise<any> | null;
	private queuedPromise: Promise<any> | null;
	private queuedPromiseFactory: ICancellableTask<Promise<any>> | null;
	private cancellationTokenSource: CancellationTokenSource;

	constructor() {
		this.activePromise = null;
		this.queuedPromise = null;
		this.queuedPromiseFactory = null;

		this.cancellationTokenSource = new CancellationTokenSource();
	}

	/**
	 * Runs `promiseFactory` right away when nothing is active. Otherwise remembers it
	 * as the (single) queued factory, replacing any previously queued one, and runs
	 * it once the active task has settled.
	 *
	 * @returns A promise for the result of the task that ends up running. Rejects
	 * with an `Error` if the throttler has been disposed.
	 */
	queue<T>(promiseFactory: ICancellableTask<Promise<T>>): Promise<T> {
		if (this.cancellationTokenSource.token.isCancellationRequested) {
			return Promise.reject(new Error('Throttler is disposed'));
		}

		if (this.activePromise) {
			this.queuedPromiseFactory = promiseFactory;

			if (!this.queuedPromise) {
				const onComplete = () => {
					this.queuedPromise = null;

					if (this.cancellationTokenSource.token.isCancellationRequested) {
						return;
					}

					const result = this.queue(this.queuedPromiseFactory!);
					this.queuedPromiseFactory = null;

					return result;
				};

				// Forward rejections as well: with only `.then(resolve)` a queued task
				// that rejects would leave `queuedPromise` (and every caller waiting on
				// it) pending forever and raise an unhandled rejection.
				this.queuedPromise = new Promise((resolve, reject) => {
					this.activePromise!.then(onComplete, onComplete).then(resolve, reject);
				});
			}

			return new Promise((resolve, reject) => {
				this.queuedPromise!.then(resolve, reject);
			});
		}

		this.activePromise = promiseFactory(this.cancellationTokenSource.token);

		return new Promise((resolve, reject) => {
			this.activePromise!.then((result: T) => {
				this.activePromise = null;
				resolve(result);
			}, (err: unknown) => {
				this.activePromise = null;
				reject(err);
			});
		});
	}

	/**
	 * Cancels the token handed to tasks; subsequent `queue` calls are rejected.
	 */
	dispose(): void {
		this.cancellationTokenSource.cancel();
	}
}

/**
 * Chains tasks one after another: each queued task starts once the previously
 * queued one has settled, regardless of whether it fulfilled or rejected.
 */
export class Sequencer {

	private current: Promise<unknown> = Promise.resolve(null);

	queue<T>(promiseTask: ITask<Promise<T>>): Promise<T> {
		return this.current = this.current.then(() => promiseTask(), () => promiseTask());
	}
}

/**
 * Like {@link Sequencer}, but keeps an independent chain per key. The entry of a
 * key is removed once its most recently queued task has settled.
 */
export class SequencerByKey<TKey> {

	private promiseMap = new Map<TKey, Promise<unknown>>();

	queue<T>(key: TKey, promiseTask: ITask<Promise<T>>): Promise<T> {
		const runningPromise = this.promiseMap.get(key) ?? Promise.resolve();
		const newPromise = runningPromise
			.catch(() => { })
			.then(promiseTask)
			.finally(() => {
				// Only clean up if no newer task was queued for this key meanwhile.
				if (this.promiseMap.get(key) === newPromise) {
					this.promiseMap.delete(key);
				}
			});
		this.promiseMap.set(key, newPromise);
		return newPromise;
	}

	peek(key: TKey): Promise<unknown> | undefined {
		return this.promiseMap.get(key) || undefined;
	}

	keys(): IterableIterator<TKey> {
		return this.promiseMap.keys();
	}
}

/** Handle for a deferred callback: can be queried and cancelled via `dispose`. */
interface IScheduledLater extends IDisposable {
	isTriggered(): boolean;
}

/** Schedules `fn` via `setTimeout`; `isTriggered` is `true` while it is still pending. */
const timeoutDeferred = (timeout: number, fn: () => void): IScheduledLater => {
	let scheduled = true;
	const handle = setTimeout(() => {
		scheduled = false;
		fn();
	}, timeout);
	return {
		isTriggered: () => scheduled,
		dispose: () => {
			clearTimeout(handle);
			scheduled = false;
		},
	};
};

/** Schedules `fn` as a microtask; disposing before it runs turns it into a no-op. */
const microtaskDeferred = (fn: () => void): IScheduledLater => {
	let scheduled = true;
	queueMicrotask(() => {
		if (scheduled) {
			scheduled = false;
			fn();
		}
	});

	return {
		isTriggered: () => scheduled,
		dispose: () => { scheduled = false; },
	};
};

/**
 * A helper to delay (debounce) execution of a task that is being requested often.
 *
 * Following the throttler, now imagine the mail man wants to optimize the number of
 * trips proactively. The trip itself can be long, so he decides not to make the trip
 * as soon as a letter is submitted. Instead he waits a while, in case more
 * letters are submitted.
After said waiting period, if no letters were submitted, he
 * decides to make the trip. Imagine that N more letters were submitted after the first
 * one, all within a short period of time between each other. Even though N+1
 * submissions occurred, only 1 delivery was made.
 *
 * The delayer offers this behavior via the trigger() method, into which both the task
 * to be executed and the waiting period (delay) must be passed in as arguments. Following
 * the example:
 *
 * 		const delayer = new Delayer(WAITING_PERIOD);
 * 		const letters = [];
 *
 * 		function letterReceived(l) {
 * 			letters.push(l);
 * 			delayer.trigger(() => { return makeTheTrip(); });
 * 		}
 */
export class Delayer<T> implements IDisposable {

	private deferred: IScheduledLater | null;
	private completionPromise: Promise<any> | null;
	private doResolve: ((value?: any | Promise<any>) => void) | null;
	private doReject: ((err: unknown) => void) | null;
	private task: ITask<T | Promise<T>> | null;

	constructor(public defaultDelay: number | typeof MicrotaskDelay) {
		this.deferred = null;
		this.completionPromise = null;
		this.doResolve = null;
		this.doReject = null;
		this.task = null;
	}

	/**
	 * Remembers `task` as the task to run and (re)starts the delay. All triggers
	 * issued before the delay elapses share one completion promise, which settles
	 * with the result of the most recently supplied task.
	 */
	trigger(task: ITask<T | Promise<T>>, delay = this.defaultDelay): Promise<T> {
		this.task = task;
		this.cancelTimeout();

		if (!this.completionPromise) {
			this.completionPromise = new Promise((resolve, reject) => {
				this.doResolve = resolve;
				this.doReject = reject;
			}).then(() => {
				this.completionPromise = null;
				this.doResolve = null;
				if (this.task) {
					const task = this.task;
					this.task = null;
					return task();
				}
				return undefined;
			});
		}

		const fn = () => {
			this.deferred = null;
			this.doResolve?.(null);
		};

		this.deferred = delay === MicrotaskDelay ? microtaskDeferred(fn) : timeoutDeferred(delay, fn);

		return this.completionPromise;
	}

	isTriggered(): boolean {
		return !!this.deferred?.isTriggered();
	}

	/**
	 * Stops the pending delay and rejects the outstanding completion promise
	 * (if any) with a {@link CancellationError}.
	 */
	cancel(): void {
		this.cancelTimeout();

		if (this.completionPromise) {
			this.doReject?.(new CancellationError());
			this.completionPromise = null;
		}
	}

	private cancelTimeout(): void {
		this.deferred?.dispose();
		this.deferred = null;
	}

	dispose(): void {
		this.cancel();
	}
}

/**
 * A helper to delay execution of a task that is being requested often, while
 * preventing accumulation of consecutive executions, while the task runs.
 *
 * The mail man is clever and waits for a certain amount of time, before going
 * out to deliver letters. While the mail man is going out, more letters arrive
 * and can only be delivered once he is back. Once he is back the mail man will
 * do one more trip to deliver the letters that have accumulated while he was out.
 */
export class ThrottledDelayer<T> {

	private delayer: Delayer<Promise<T>>;
	private throttler: Throttler;

	constructor(defaultDelay: number) {
		this.delayer = new Delayer(defaultDelay);
		this.throttler = new Throttler();
	}

	trigger(promiseFactory: ICancellableTask<Promise<T>>, delay?: number): Promise<T> {
		return this.delayer.trigger(() => this.throttler.queue(promiseFactory), delay) as unknown as Promise<T>;
	}

	isTriggered(): boolean {
		return this.delayer.isTriggered();
	}

	cancel(): void {
		this.delayer.cancel();
	}

	dispose(): void {
		this.delayer.dispose();
		this.throttler.dispose();
	}
}

/**
 * A barrier that is initially closed and then becomes opened permanently.
 */
export class Barrier {
	private _isOpen: boolean;
	private _promise: Promise<boolean>;
	private _completePromise!: (v: boolean) => void;

	constructor() {
		this._isOpen = false;
		this._promise = new Promise<boolean>(c => {
			this._completePromise = c;
		});
	}

	isOpen(): boolean {
		return this._isOpen;
	}

	open(): void {
		this._isOpen = true;
		this._completePromise(true);
	}

	/** Resolves with `true` once the barrier has been opened. */
	wait(): Promise<boolean> {
		return this._promise;
	}
}

/**
 * A barrier that is initially closed and then becomes opened permanently after a certain period of
 * time or when open is called explicitly
 */
export class AutoOpenBarrier extends Barrier {

	private readonly _timeout: Timeout;

	constructor(autoOpenTimeMs: number) {
		super();
		this._timeout = setTimeout(() => this.open(), autoOpenTimeMs);
	}

	override open(): void {
		clearTimeout(this._timeout);
		super.open();
	}
}

/**
 * Returns a promise that resolves after `millis` milliseconds. Without a token
 * the result is a {@link CancelablePromise}; with a token, the promise rejects
 * with a {@link CancellationError} when cancellation is requested before the
 * timer fires.
 */
export function timeout(millis: number): CancelablePromise<void>;
export function timeout(millis: number, token: CancellationToken): Promise<void>;
export function timeout(millis: number, token?: CancellationToken): CancelablePromise<void> | Promise<void> {
	if (!token) {
		return createCancelablePromise(token => timeout(millis, token));
	}

	return new Promise((resolve, reject) => {
		const handle = setTimeout(() => {
			disposable.dispose();
			resolve();
		}, millis);
		const disposable = token.onCancellationRequested(() => {
			clearTimeout(handle);
			disposable.dispose();
			reject(new CancellationError());
		});
	});
}

/**
 * Creates a timeout that can be disposed using its returned value.
 * @param handler The timeout handler.
 * @param timeout An optional timeout in milliseconds.
 * @param store An optional {@link DisposableStore} that will have the timeout disposable managed automatically.
 *
 * @example
 * const store = new DisposableStore;
 * // Call the timeout after 1000ms at which point it will be automatically
 * // evicted from the store.
 * const timeoutDisposable = disposableTimeout(() => {}, 1000, store);
 *
 * if (foo) {
 *   // Cancel the timeout and evict it from store.
 *   timeoutDisposable.dispose();
 * }
 */
export function disposableTimeout(handler: () => void, timeout = 0, store?: DisposableStore): IDisposable {
	const timer = setTimeout(() => {
		handler();
		if (store) {
			disposable.dispose();
		}
	}, timeout);
	const disposable = toDisposable(() => {
		clearTimeout(timer);
		store?.delete(disposable);
	});
	store?.add(disposable);
	return disposable;
}

/**
 * Runs the provided list of promise factories in sequential order. The returned
 * promise will complete to an array of results from each promise.
 *
 * Note: results that are `undefined` or `null` are not added to the array.
 */
export function sequence<T>(promiseFactories: ITask<Promise<T>>[]): Promise<T[]> {
	const results: T[] = [];
	let index = 0;
	const len = promiseFactories.length;

	function next(): Promise<T> | null {
		return index < len ? promiseFactories[index++]() : null;
	}

	function thenHandler(result: unknown): Promise<any> {
		if (result !== undefined && result !== null) {
			results.push(result as T);
		}

		const n = next();
		if (n) {
			return n.then(thenHandler);
		}

		return Promise.resolve(results);
	}

	return Promise.resolve(null).then(thenHandler);
}

/**
 * Runs the factories one after another and resolves with the first result for
 * which `shouldStop` returns `true`. Resolves with `defaultValue` when no
 * result matches. Factories after the matching one are not invoked.
 */
export function first<T>(promiseFactories: ITask<Promise<T>>[], shouldStop: (t: T) => boolean = t => !!t, defaultValue: T | null = null): Promise<T | null> {
	let index = 0;
	const len = promiseFactories.length;

	const loop: () => Promise<T | null> = () => {
		if (index >= len) {
			return Promise.resolve(defaultValue);
		}

		const factory = promiseFactories[index++];
		const promise = Promise.resolve(factory());

		return promise.then(result => {
			if (shouldStop(result)) {
				return Promise.resolve(result);
			}

			return loop();
		});
	};

	return loop();
}

/**
 * Returns the result of the first promise that matches the "shouldStop",
 * running all promises in parallel.
Supports cancelable promises.
 *
 * Resolves with `defaultValue` when every promise fulfilled without a match, and
 * rejects with the first error seen before a match. Once finished, `cancel()` is
 * invoked on every promise in the list that has such a method.
 */
export function firstParallel<T>(promiseList: Promise<T>[], shouldStop?: (t: T) => boolean, defaultValue?: T | null): Promise<T | null>;
export function firstParallel<T, R extends T>(promiseList: Promise<T>[], shouldStop: (t: T) => t is R, defaultValue?: R | null): Promise<R | null>;
export function firstParallel<T>(promiseList: Promise<T>[], shouldStop: (t: T) => boolean = t => !!t, defaultValue: T | null = null) {
	if (promiseList.length === 0) {
		return Promise.resolve(defaultValue);
	}

	// Number of promises still outstanding; forced negative once finished so
	// that late settlements are ignored.
	let todo = promiseList.length;
	const finish = () => {
		todo = -1;
		for (const promise of promiseList) {
			(promise as Partial<CancelablePromise<T>>).cancel?.();
		}
	};

	return new Promise<T | null>((resolve, reject) => {
		for (const promise of promiseList) {
			promise.then(result => {
				if (--todo >= 0 && shouldStop(result)) {
					finish();
					resolve(result);
				} else if (todo === 0) {
					resolve(defaultValue);
				}
			})
				.catch(err => {
					if (--todo >= 0) {
						finish();
						reject(err);
					}
				});
		}
	});
}

/** A queued factory together with the callbacks that settle its promise. */
interface ILimitedTaskFactory<T> {
	factory: ITask<Promise<T>>;
	c: (value: T | Promise<T>) => void;
	e: (error?: unknown) => void;
}

export interface ILimiter<T> {

	readonly size: number;

	queue(factory: ITask<Promise<T>>): Promise<T>;

	clear(): void;
}

/**
 * A helper to queue N promises and run them all with a max degree of parallelism.
The helper
 * ensures that at any time no more than M promises are running at the same time.
 */
export class Limiter<T> implements ILimiter<T> {

	private _size = 0;
	private _isDisposed = false;
	private runningPromises: number;
	private readonly maxDegreeOfParalellism: number;
	private readonly outstandingPromises: ILimitedTaskFactory<T>[];
	private readonly _onDrained: Emitter<void>;

	constructor(maxDegreeOfParalellism: number) {
		this.maxDegreeOfParalellism = maxDegreeOfParalellism;
		this.outstandingPromises = [];
		this.runningPromises = 0;
		this._onDrained = new Emitter<void>();
	}

	/**
	 *
	 * @returns A promise that resolves when all work is done (onDrained) or when
	 * there is nothing to do
	 */
	whenIdle(): Promise<void> {
		return this.size > 0
			? Event.toPromise(this.onDrained)
			: Promise.resolve();
	}

	get onDrained(): Event<void> {
		return this._onDrained.event;
	}

	/** Number of tasks that are queued or currently running. */
	get size(): number {
		return this._size;
	}

	/**
	 * Enqueues `factory` and starts it as soon as a slot is free.
	 *
	 * @throws Error if the limiter has been disposed.
	 */
	queue(factory: ITask<Promise<T>>): Promise<T> {
		if (this._isDisposed) {
			throw new Error('Object has been disposed');
		}
		this._size++;

		return new Promise<T>((c, e) => {
			this.outstandingPromises.push({ factory, c, e });
			this.consume();
		});
	}

	private consume(): void {
		while (this.outstandingPromises.length && this.runningPromises < this.maxDegreeOfParalellism) {
			const iLimitedTask = this.outstandingPromises.shift()!;
			this.runningPromises++;

			// A factory that throws synchronously is treated like one returning a
			// rejected promise. Otherwise the slot taken above would never be
			// released and the task's promise could stay pending forever.
			let promise: Promise<T>;
			try {
				promise = iLimitedTask.factory();
			} catch (err) {
				promise = Promise.reject(err);
			}
			promise.then(iLimitedTask.c, iLimitedTask.e);
			promise.then(() => this.consumed(), () => this.consumed());
		}
	}

	private consumed(): void {
		if (this._isDisposed) {
			return;
		}
		this.runningPromises--;
		if (--this._size === 0) {
			this._onDrained.fire();
		}

		if (this.outstandingPromises.length > 0) {
			this.consume();
		}
	}

	/**
	 * Drops all tasks that have not started yet. Running tasks are unaffected.
	 *
	 * @throws Error if the limiter has been disposed.
	 */
	clear(): void {
		if (this._isDisposed) {
			throw new Error('Object has been disposed');
		}
		this.outstandingPromises.length = 0;
		this._size = this.runningPromises;
	}

	dispose(): void {
		this._isDisposed = true;
		this.outstandingPromises.length = 0; // stop further processing
		this._size = 0;
		this._onDrained.dispose();
	}
}

/**
 * A queue handles one promise at a time and guarantees that at any time only one promise is executing.
 */
export class Queue<T> extends Limiter<T> {

	constructor() {
		super(1);
	}
}

/**
 * Same as `Queue`, ensures that only 1 task is executed at the same time. The difference to `Queue` is that
 * there is only 1 task about to be scheduled next. As such, calling `queue` while a task is executing will
 * replace the currently queued task until it executes.
 *
 * As such, the returned promise may not be from the factory that is passed in but from the next factory that
 * is running after having called `queue`.
 */
export class LimitedQueue {

	private readonly sequentializer = new TaskSequentializer();

	private tasks = 0;

	queue(factory: ITask<Promise<void>>): Promise<void> {
		if (!this.sequentializer.isRunning()) {
			return this.sequentializer.run(this.tasks++, factory());
		}

		return this.sequentializer.queue(() => {
			return this.sequentializer.run(this.tasks++, factory());
		});
	}
}

/**
 * A helper to organize queues per resource.
The ResourceQueue makes sure to manage queues per resource
 * by disposing them once the queue is empty.
 */
export class ResourceQueue implements IDisposable {

	private readonly queues = new Map<string, Queue<void>>();

	private readonly drainers = new Set<DeferredPromise<void>>();

	private drainListeners: DisposableMap<number> | undefined = undefined;
	private drainListenerCount = 0;

	/** Resolves once no queue has work left (immediately if that is already the case). */
	async whenDrained(): Promise<void> {
		if (this.isDrained()) {
			return;
		}

		const promise = new DeferredPromise<void>();
		this.drainers.add(promise);

		return promise.p;
	}

	private isDrained(): boolean {
		for (const [, queue] of this.queues) {
			if (queue.size > 0) {
				return false;
			}
		}

		return true;
	}

	queueSize(resource: URI, extUri: IExtUri = defaultExtUri): number {
		const key = extUri.getComparisonKey(resource);

		return this.queues.get(key)?.size ?? 0;
	}

	/**
	 * Queues `factory` on the queue belonging to `resource`, creating that queue
	 * on demand. A queue is disposed and removed the first time it drains.
	 */
	queueFor(resource: URI, factory: ITask<Promise<void>>, extUri: IExtUri = defaultExtUri): Promise<void> {
		const key = extUri.getComparisonKey(resource);

		let queue = this.queues.get(key);
		if (!queue) {
			queue = new Queue<void>();
			const drainListenerId = this.drainListenerCount++;
			const drainListener = Event.once(queue.onDrained)(() => {
				queue?.dispose();
				this.queues.delete(key);
				this.onDidQueueDrain();

				this.drainListeners?.deleteAndDispose(drainListenerId);

				if (this.drainListeners?.size === 0) {
					this.drainListeners.dispose();
					this.drainListeners = undefined;
				}
			});

			if (!this.drainListeners) {
				this.drainListeners = new DisposableMap();
			}
			this.drainListeners.set(drainListenerId, drainListener);

			this.queues.set(key, queue);
		}

		return queue.queue(factory);
	}

	private onDidQueueDrain(): void {
		if (!this.isDrained()) {
			return; // not done yet
		}

		this.releaseDrainers();
	}

	private releaseDrainers(): void {
		for (const drainer of this.drainers) {
			drainer.complete();
		}

		this.drainers.clear();
	}

	dispose(): void {
		for (const [, queue] of this.queues) {
			queue.dispose();
		}

		this.queues.clear();

		// Even though we might still have pending
		// tasks queued, after the queues have been
		// disposed, we can no longer track them, so
		// we release drainers to prevent hanging
		// promises when the resource queue is being
		// disposed.
		this.releaseDrainers();

		this.drainListeners?.dispose();
	}
}

export type Task<T = void> = () => (Promise<T> | T);

/**
 * Wrap a type in an optional promise. This can be useful to avoid the runtime
 * overhead of creating a promise.
 */
export type MaybePromise<T> = Promise<T> | T;

/**
 * Processes tasks in the order they were scheduled.
 */
export class TaskQueue {
	private _runningTask: Task<any> | undefined = undefined;
	private _pendingTasks: { task: Task<any>; deferred: DeferredPromise<any>; setUndefinedWhenCleared: boolean }[] = [];

	/**
	 * Waits for the current and pending tasks to finish, then runs and awaits the given task.
	 * If the task is skipped because of clearPending, the promise is rejected with a CancellationError.
	 */
	public schedule<T>(task: Task<T>): Promise<T> {
		const deferred = new DeferredPromise<T>();
		this._pendingTasks.push({ task, deferred, setUndefinedWhenCleared: false });
		this._runIfNotRunning();
		return deferred.p;
	}

	/**
	 * Waits for the current and pending tasks to finish, then runs and awaits the given task.
	 * If the task is skipped because of clearPending, the promise is resolved with undefined.
	 */
	public scheduleSkipIfCleared<T>(task: Task<T>): Promise<T | undefined> {
		const deferred = new DeferredPromise<T>();
		this._pendingTasks.push({ task, deferred, setUndefinedWhenCleared: true });
		this._runIfNotRunning();
		return deferred.p;
	}

	private _runIfNotRunning(): void {
		if (this._runningTask === undefined) {
			this._processQueue();
		}
	}

	private async _processQueue(): Promise<void> {
		if (this._pendingTasks.length === 0) {
			return;
		}

		const next = this._pendingTasks.shift();
		if (!next) {
			return;
		}

		if (this._runningTask) {
			throw new BugIndicatingError();
		}

		this._runningTask = next.task;

		try {
			const result = await next.task();
			next.deferred.complete(result);
		} catch (e) {
			next.deferred.error(e);
		} finally {
			// Continue with the next pending task, if any.
			this._runningTask = undefined;
			this._processQueue();
		}
	}

	/**
	 * Clears all pending tasks. Does not cancel the currently running task.
	 */
	public clearPending(): void {
		const tasks = this._pendingTasks;
		this._pendingTasks = [];
		for (const task of tasks) {
			if (task.setUndefinedWhenCleared) {
				task.deferred.complete(undefined);
			} else {
				task.deferred.error(new CancellationError());
			}
		}
	}
}

/**
 * A disposable wrapper around a single `setTimeout`. Setting a timer on a
 * disposed instance throws a `BugIndicatingError`.
 */
export class TimeoutTimer implements IDisposable {
	private _token: Timeout | undefined;
	private _isDisposed = false;

	constructor();
	constructor(runner: () => void, timeout: number);
	constructor(runner?: () => void, timeout?: number) {
		this._token = undefined;

		if (typeof runner === 'function' && typeof timeout === 'number') {
			this.setIfNotSet(runner, timeout);
		}
	}

	dispose(): void {
		this.cancel();
		this._isDisposed = true;
	}

	cancel(): void {
		if (this._token !== undefined) {
			clearTimeout(this._token);
			this._token = undefined;
		}
	}

	/** Cancels a pending timer (if any) and schedules `runner` after `timeout` ms. */
	cancelAndSet(runner: () => void, timeout: number): void {
		if (this._isDisposed) {
			throw new BugIndicatingError(`Calling 'cancelAndSet' on a disposed TimeoutTimer`);
		}

		this.cancel();
		this._token = setTimeout(() => {
			this._token = undefined;
			runner();
		}, timeout);
	}

	/** Schedules `runner` after `timeout` ms unless a timer is already pending. */
	setIfNotSet(runner: () => void, timeout: number): void {
		if (this._isDisposed) {
			throw new BugIndicatingError(`Calling 'setIfNotSet' on a disposed TimeoutTimer`);
		}

		if (this._token !== undefined) {
			// timer is already set
			return;
		}
		this._token = setTimeout(() => {
			this._token = undefined;
			runner();
		}, timeout);
	}
}

/**
 * A disposable wrapper around a single `setInterval`. Setting an interval on a
 * disposed instance throws a `BugIndicatingError`.
 */
export class IntervalTimer implements IDisposable {

	private disposable: IDisposable | undefined = undefined;
	private isDisposed = false;

	cancel(): void {
		this.disposable?.dispose();
		this.disposable = undefined;
	}

	/**
	 * Cancels a running interval (if any) and starts a new one.
	 *
	 * @param context Object providing `setInterval`/`clearInterval`; defaults to `globalThis`.
	 */
	cancelAndSet(runner: () => void, interval: number, context = globalThis): void {
		if (this.isDisposed) {
			throw new BugIndicatingError(`Calling 'cancelAndSet' on a disposed IntervalTimer`);
		}

		this.cancel();
		const handle = context.setInterval(() => {
			runner();
		}, interval);

		this.disposable = toDisposable(() => {
			context.clearInterval(handle);
			this.disposable = undefined;
		});
	}

	dispose(): void {
		this.cancel();
		this.isDisposed = true;
	}
}

/**
 * Runs `runner` once after a delay; (re)scheduling cancels a pending run.
 */
export class RunOnceScheduler implements IDisposable {

	protected runner: ((...args: unknown[]) => void) | null;

	private timeoutToken: Timeout | undefined;
	private timeout: number;
	private timeoutHandler: () => void;

	constructor(runner: (...args: any[]) => void, delay: number) {
		this.timeoutToken = undefined;
		this.runner = runner;
		this.timeout = delay;
		this.timeoutHandler = this.onTimeout.bind(this);
	}

	/**
	 * Dispose RunOnceScheduler
	 */
	dispose(): void {
		this.cancel();
		this.runner = null;
	}

	/**
	 * Cancel current scheduled runner (if any).
	 */
	cancel(): void {
		if (this.isScheduled()) {
			clearTimeout(this.timeoutToken);
			this.timeoutToken = undefined;
		}
	}

	/**
	 * Cancel previous runner (if any) & schedule a new runner.
	 */
	schedule(delay = this.timeout): void {
		this.cancel();
		this.timeoutToken = setTimeout(this.timeoutHandler, delay);
	}

	get delay(): number {
		return this.timeout;
	}

	set delay(value: number) {
		this.timeout = value;
	}

	/**
	 * Returns true if scheduled.
	 */
	isScheduled(): boolean {
		return this.timeoutToken !== undefined;
	}

	/**
	 * If a run is scheduled, cancels the timer and runs immediately.
	 */
	flush(): void {
		if (this.isScheduled()) {
			this.cancel();
			this.doRun();
		}
	}

	private onTimeout() {
		this.timeoutToken = undefined;
		if (this.runner) {
			this.doRun();
		}
	}

	protected doRun(): void {
		this.runner?.();
	}
}

/**
 * Same as `RunOnceScheduler`, but doesn't count the time spent in sleep mode.
 * > **NOTE**: Only offers 1s resolution.
 *
 * When calling `setTimeout` with 3hrs, and putting the computer immediately to sleep
 * for 8hrs, `setTimeout` will fire **as soon as the computer wakes from sleep**. But
 * this scheduler will execute 3hrs **after waking the computer from sleep**.
 */
export class ProcessTimeRunOnceScheduler {

	private runner: (() => void) | null;
	private timeout: number;

	private counter: number;
	private intervalToken: Timeout | undefined;
	private intervalHandler: () => void;

	constructor(runner: () => void, delay: number) {
		if (delay % 1000 !== 0) {
			console.warn(`ProcessTimeRunOnceScheduler resolution is 1s, ${delay}ms is not a multiple of 1000ms.`);
		}
		this.runner = runner;
		this.timeout = delay;
		this.counter = 0;
		this.intervalToken = undefined;
		this.intervalHandler = this.onInterval.bind(this);
	}

	dispose(): void {
		this.cancel();
		this.runner = null;
	}

	cancel(): void {
		if (this.isScheduled()) {
			clearInterval(this.intervalToken);
			this.intervalToken = undefined;
		}
	}

	/**
	 * Cancel previous runner (if any) & schedule a new runner.
	 */
	schedule(delay = this.timeout): void {
		if (delay % 1000 !== 0) {
			console.warn(`ProcessTimeRunOnceScheduler resolution is 1s, ${delay}ms is not a multiple of 1000ms.`);
		}
		this.cancel();
		// Count whole seconds; the 1s interval below decrements the counter.
		this.counter = Math.ceil(delay / 1000);
		this.intervalToken = setInterval(this.intervalHandler, 1000);
	}

	/**
	 * Returns true if scheduled.
	 */
	isScheduled(): boolean {
		return this.intervalToken !== undefined;
	}

	private onInterval() {
		this.counter--;
		if (this.counter > 0) {
			// still need to wait
			return;
		}

		// time elapsed
		clearInterval(this.intervalToken);
		this.intervalToken = undefined;
		this.runner?.();
	}
}

/**
 * A {@link RunOnceScheduler} that collects units of work and hands all
 * collected units to the runner in one call when the timer fires.
 */
export class RunOnceWorker<T> extends RunOnceScheduler {

	private units: T[] = [];

	constructor(runner: (units: T[]) => void, timeout: number) {
		super(runner, timeout);
	}

	/** Adds a unit and schedules a run unless one is already scheduled. */
	work(unit: T): void {
		this.units.push(unit);

		if (!this.isScheduled()) {
			this.schedule();
		}
	}

	protected override doRun(): void {
		const units = this.units;
		this.units = [];

		this.runner?.(units);
	}

	override dispose(): void {
		this.units = [];

		super.dispose();
	}
}

export interface IThrottledWorkerOptions {

	/**
	 * maximum of units the worker will pass onto handler at once
	 */
	maxWorkChunkSize: number;

	/**
	 * maximum of units the worker will keep in memory for processing
	 */
	maxBufferedWork: number | undefined;

	/**
	 * delay before processing the next round of chunks when chunk size exceeds limits
	 */
	throttleDelay: number;

	/**
	 * When enabled will guarantee that two distinct calls to `work()` are not executed
	 * without throttle delay between them.
	 * Otherwise if the worker isn't currently throttling it will execute work immediately.
	 */
	waitThrottleDelayBetweenWorkUnits?: boolean;
}

/**
 * The `ThrottledWorker` will accept units of work `T`
 * to handle.
The contract is:
 * * there is a maximum of units the worker can handle at once (via `maxWorkChunkSize`)
 * * there is a maximum of units the worker will keep in memory for processing (via `maxBufferedWork`)
 * * after having handled `maxWorkChunkSize` units, the worker needs to rest (via `throttleDelay`)
 */
export class ThrottledWorker<T> extends Disposable {

	// Units accepted by `work()` that were not yet passed to the handler.
	private readonly pendingWork: T[] = [];

	// Holds the scheduler for the next delayed chunk. A set value means we are throttling.
	private readonly throttler = this._register(new MutableDisposable<RunOnceScheduler>());
	private disposed = false;
	private lastExecutionTime = 0;

	/**
	 * @param options Chunk size, buffer limit and delay settings.
	 * @param handler Receives each chunk of units to process.
	 */
	constructor(
		private options: IThrottledWorkerOptions,
		private readonly handler: (units: T[]) => void
	) {
		super();
	}

	/**
	 * The number of work units that are pending to be processed.
	 */
	get pending(): number { return this.pendingWork.length; }

	/**
	 * Add units to be worked on. Use `pending` to figure out
	 * how many units are not yet processed after this method
	 * was called.
	 *
	 * @returns whether the work was accepted or not. If the
	 * worker is disposed, it will not accept any more work.
	 * If the number of pending units would become larger
	 * than `maxBufferedWork`, more work will also not be accepted.
	 */
	work(units: readonly T[]): boolean {
		if (this.disposed) {
			return false; // work not accepted: disposed
		}

		const isThrottling = !!this.throttler.value;
		const { maxBufferedWork, maxWorkChunkSize, throttleDelay, waitThrottleDelayBetweenWorkUnits } = this.options;

		// Check for reaching maximum of pending work
		if (typeof maxBufferedWork === 'number') {

			// Throttled: every unit has to be buffered.
			// Unthrottled: one max chunk is worked on directly without being pending.
			const handledDirectly = isThrottling ? 0 : maxWorkChunkSize;
			if (this.pending + units.length - handledDirectly > maxBufferedWork) {
				return false; // work not accepted: too much pending work
			}
		}

		// Add to pending units first
		for (const unit of units) {
			this.pendingWork.push(unit);
		}

		const timeSinceLastExecution = Date.now() - this.lastExecutionTime;

		if (!isThrottling) {
			if (!waitThrottleDelayBetweenWorkUnits || timeSinceLastExecution >= throttleDelay) {
				// Work directly: we are not throttling and are not
				// enforced to throttle between `work()` calls.
				this.doWork();
			} else {
				// Wait for the remainder of the throttle delay before working.
				this.scheduleThrottler(Math.max(throttleDelay - timeSinceLastExecution, 0));
			}
		}

		// Otherwise, our work will be picked up by the running throttler

		return true; // work accepted
	}

	/**
	 * Passes the next chunk to the handler and schedules another round
	 * when units remain.
	 */
	private doWork(): void {
		this.lastExecutionTime = Date.now();

		// Extract chunk to handle and handle it
		const chunk = this.pendingWork.splice(0, this.options.maxWorkChunkSize);
		this.handler(chunk);

		// If we have remaining work, schedule it after a delay
		if (this.pendingWork.length > 0) {
			this.scheduleThrottler();
		}
	}

	/**
	 * Installs a scheduler that clears itself and calls `doWork()` after `delay`.
	 */
	private scheduleThrottler(delay = this.options.throttleDelay): void {
		this.throttler.value = new RunOnceScheduler(() => {
			this.throttler.clear();

			this.doWork();
		}, delay);
		this.throttler.value.schedule();
	}

	override dispose(): void {
		super.dispose();

		// drop whatever was still buffered and refuse further work
		this.pendingWork.length = 0;
		this.disposed = true;
	}
}

//#region -- run on idle tricks ------------

export interface IdleDeadline {
	readonly didTimeout: boolean;
	timeRemaining(): number;
}

type IdleApi = Pick<typeof globalThis, 'requestIdleCallback' | 'cancelIdleCallback'>;


/**
 * Execute the callback the next time the browser is idle, returning an
 * {@link IDisposable} that will cancel the callback when disposed. This wraps
 * [requestIdleCallback] so it will fallback to [setTimeout] if the environment
 * doesn't support it.
 *
 * @param callback The callback to run when idle, this includes an
 * [IdleDeadline] that provides the time allotted for the idle callback by the
 * browser. Not respecting this deadline will result in a degraded user
 * experience.
 * @param timeout A timeout after which the callback no longer waits for an idle
 * period but is queued on the regular event loop (like setTimeout).
Typically
 * this should not be used.
 *
 * [IdleDeadline]: https://developer.mozilla.org/en-US/docs/Web/API/IdleDeadline
 * [requestIdleCallback]: https://developer.mozilla.org/en-US/docs/Web/API/Window/requestIdleCallback
 * [setTimeout]: https://developer.mozilla.org/en-US/docs/Web/API/Window/setTimeout
 *
 * **Note** that there is `dom.ts#runWhenWindowIdle` which is better suited when running inside a browser
 * context
 */
export let runWhenGlobalIdle: (callback: (idle: IdleDeadline) => void, timeout?: number) => IDisposable;

/**
 * Same as `runWhenGlobalIdle`, but with the object that provides
 * `requestIdleCallback`/`cancelIdleCallback` passed in explicitly.
 */
export let _runWhenIdle: (targetWindow: IdleApi, callback: (idle: IdleDeadline) => void, timeout?: number) => IDisposable;

// Picks the implementation once, depending on whether the global object offers the idle callback API.
(function () {
	const safeGlobal: any = globalThis;
	if (typeof safeGlobal.requestIdleCallback !== 'function' || typeof safeGlobal.cancelIdleCallback !== 'function') {
		// Fallback: no idle API, run from `setTimeout0` and hand out a synthetic deadline.
		_runWhenIdle = (_targetWindow, runner, timeout?) => {
			// declared before the callback that reads it so the closure never observes
			// an uninitialized binding
			let disposed = false;
			setTimeout0(() => {
				if (disposed) {
					return;
				}
				const end = Date.now() + 15; // one frame at 64fps
				const deadline: IdleDeadline = {
					didTimeout: true,
					timeRemaining() {
						return Math.max(0, end - Date.now());
					}
				};
				runner(Object.freeze(deadline));
			});
			return {
				dispose() {
					if (disposed) {
						return;
					}
					disposed = true;
				}
			};
		};
	} else {
		_runWhenIdle = (targetWindow: typeof safeGlobal, runner, timeout?) => {
			const handle: number = targetWindow.requestIdleCallback(runner, typeof timeout === 'number' ? { timeout } : undefined);
			let disposed = false;
			return {
				dispose() {
					if (disposed) {
						return;
					}
					disposed = true;
					targetWindow.cancelIdleCallback(handle);
				}
			};
		};
	}
	runWhenGlobalIdle = (runner, timeout) => _runWhenIdle(globalThis, runner, timeout);
})();

/**
 * A value that is computed by `executor` when the target becomes idle, or
 * synchronously on first access of `value`, whichever happens first.
 */
export abstract class AbstractIdleValue<T> {

	private readonly _executor: () => void;
	private readonly _handle: IDisposable;

	private _didRun: boolean = false;
	private _value?: T;
	private _error: unknown;
	// Tracked separately from `_error` so that a thrown falsy value
	// (e.g. `undefined` or `0`) is still rethrown from `value`.
	private _didThrow: boolean = false;

	/**
	 * @param targetWindow Provides the idle callback API used for scheduling.
	 * @param executor Computes the value; anything it throws is rethrown from `value`.
	 */
	constructor(targetWindow: IdleApi, executor: () => T) {
		this._executor = () => {
			try {
				this._value = executor();
			} catch (err) {
				this._error = err;
				this._didThrow = true;
			} finally {
				this._didRun = true;
			}
		};
		this._handle = _runWhenIdle(targetWindow, () => this._executor());
	}

	/**
	 * Cancels the scheduled idle computation.
	 */
	dispose(): void {
		this._handle.dispose();
	}

	/**
	 * The computed value. Runs the executor right away when it did not run yet.
	 *
	 * @throws whatever the executor threw.
	 */
	get value(): T {
		if (!this._didRun) {
			// compute now and make sure the idle callback does not compute again
			this._handle.dispose();
			this._executor();
		}
		if (this._didThrow) {
			throw this._error;
		}
		return this._value!;
	}

	/** Whether the executor has run (successfully or not). */
	get isInitialized(): boolean {
		return this._didRun;
	}
}

/**
 * An `IdleValue` that always uses the current window (which might be throttled or inactive)
 *
 * **Note** that there is `dom.ts#WindowIdleValue` which is better suited when running inside a browser
 * context
 */
export class GlobalIdleValue<T> extends AbstractIdleValue<T> {

	constructor(executor: () => T) {
		super(globalThis, executor);
	}
}

//#endregion

/**
 * Runs `task` up to `retries` times and resolves with the first successful result.
 *
 * @param task The task to attempt.
 * @param delay Time to wait between two attempts.
 * @param retries Maximum number of attempts.
 * @returns The result of the first attempt that succeeded.
 * @throws The error of the last attempt when all attempts failed. When
 * `retries` is not positive the task is never run and `undefined` is thrown.
 */
export async function retry<T>(task: ITask<Promise<T>>, delay: number, retries: number): Promise<T> {
	let lastError: Error | undefined;

	for (let attempt = 0; attempt < retries; attempt++) {
		try {
			return await task();
		} catch (error) {
			lastError = error;

			// only wait when another attempt follows; waiting after the
			// final failure would merely postpone the rejection
			if (attempt < retries - 1) {
				await timeout(delay);
			}
		}
	}

	throw lastError;
}

//#region Task Sequentializer

interface IRunningTask {
	readonly taskId: number;
	readonly cancel: () => void;
	readonly promise: Promise<void>;
}

interface IQueuedTask {
	readonly promise: Promise<void>;
	readonly promiseResolve: () => void;
	readonly promiseReject: (error: Error) => void;
	run: ITask<Promise<void>>;
}

export interface ITaskSequentializerWithRunningTask {
	readonly running: Promise<void>;
}

export interface ITaskSequentializerWithQueuedTask {
	readonly queued: IQueuedTask;
}

/**
 * @deprecated use `LimitedQueue` instead for an easier to use API
 */
export class TaskSequentializer {

	private _running?: IRunningTask;
	private _queued?: IQueuedTask;

	/**
	 * Whether a task is running. With a `taskId`, whether the running task has that id.
	 */
	isRunning(taskId?: number): this is ITaskSequentializerWithRunningTask {
		if (typeof taskId === 'number') {
			return this._running?.taskId === taskId;
		}

		return !!this._running;
	}

	/** The promise of the running task, if any. */
	get running(): Promise<void> | undefined {
		return this._running?.promise;
	}

	/**
	 * Invokes the `onCancel` callback that was passed to `run` for the running task (if any).
	 */
	cancelRunning(): void {
		this._running?.cancel();
	}

	/**
	 * Registers `promise` as the running task. Once it settles, the queued task (if any) is started.
	 *
	 * @param taskId Identifies the task, see `isRunning`.
	 * @param promise The already started task.
	 * @param onCancel Called from `cancelRunning()`.
	 * @returns The `promise` that was passed in.
	 */
	run(taskId: number, promise: Promise<void>, onCancel?: () => void,): Promise<void> {
		this._running = { taskId, cancel: () => onCancel?.(), promise };

		const onSettled = () => this.doneRunning(taskId);
		promise.then(onSettled, onSettled);

		return promise;
	}

	private doneRunning(taskId: number): void {
		if (this._running && taskId === this._running.taskId) {

			// only set running to done if the promise finished that is associated with that taskId
			this._running = undefined;

			// schedule the queued task now that we are free if we have any
			this.runQueued();
		}
	}

	private runQueued(): void {
		if (this._queued) {
			const queued = this._queued;
			this._queued = undefined;

			// Run queued task and complete on the associated promise
			queued.run().then(queued.promiseResolve, queued.promiseReject);
		}
	}

	/**
	 * Note: the promise to schedule as next run MUST itself call `run`.
	 * Otherwise, this sequentializer will report `false` for `isRunning`
	 * even when this task is running. Missing this detail means that
	 * suddenly multiple tasks will run in parallel.
	 */
	queue(run: ITask<Promise<void>>): Promise<void> {

		// this is our first queued task, so we create associated promise with it
		// so that we can return a promise that completes when the task has
		// completed.
		if (!this._queued) {
			const { promise, resolve: promiseResolve, reject: promiseReject } = promiseWithResolvers<void>();
			this._queued = {
				run,
				promise,
				promiseResolve,
				promiseReject
			};
		}

		// we have a previous queued task, just overwrite it
		else {
			this._queued.run = run;
		}

		return this._queued.promise;
	}

	/** Whether a task is waiting for the running task to finish. */
	hasQueued(): this is ITaskSequentializerWithQueuedTask {
		return !!this._queued;
	}

	/**
	 * Waits for the queued task, or else the running task, or resolves when there is neither.
	 */
	async join(): Promise<void> {
		return this._queued?.promise ?? this._running?.promise;
	}
}

//#endregion

//#region

/**
 * The `IntervalCounter` allows to count the number
 * of calls to `increment()` over a duration of
 * `interval`.
This utility can be used to conditionally
 * throttle a frequent task when a certain threshold
 * is reached.
 */
export class IntervalCounter {

	private lastIncrementTime = 0;

	private value = 0;

	/**
	 * @param interval Length of the counting window, in the unit returned by `nowFn`.
	 * @param nowFn Clock to use, defaults to `Date.now()`.
	 */
	constructor(private readonly interval: number, private readonly nowFn = () => Date.now()) { }

	/**
	 * Counts one call.
	 *
	 * @returns The number of calls counted in the current window, including this one.
	 */
	increment(): number {
		const now = this.nowFn();

		// We are outside of the range of `interval` and as such
		// start counting from 0 and remember the time
		if (now - this.lastIncrementTime > this.interval) {
			this.lastIncrementTime = now;
			this.value = 0;
		}

		this.value++;

		return this.value;
	}
}

//#endregion

//#region

export type ValueCallback<T = unknown> = (value: T | Promise<T>) => void;

const enum DeferredOutcome {
	Resolved,
	Rejected
}

/**
 * Creates a promise whose resolution or rejection can be controlled imperatively.
 */
export class DeferredPromise<T> {

	/**
	 * Creates a deferred promise that settles the same way `promise` does.
	 */
	public static fromPromise<T>(promise: Promise<T>): DeferredPromise<T> {
		const deferred = new DeferredPromise<T>();
		deferred.settleWith(promise);
		return deferred;
	}

	private completeCallback!: ValueCallback<T>;
	private errorCallback!: (err: unknown) => void;
	// Stays `undefined` until `complete()` or `error()` was called for the first time.
	private outcome?: { outcome: DeferredOutcome.Rejected; value: unknown } | { outcome: DeferredOutcome.Resolved; value: T };

	public get isRejected() {
		return this.outcome?.outcome === DeferredOutcome.Rejected;
	}

	public get isResolved() {
		return this.outcome?.outcome === DeferredOutcome.Resolved;
	}

	public get isSettled() {
		return !!this.outcome;
	}

	/** The value passed to `complete()`, or `undefined` when not resolved. */
	public get value() {
		return this.outcome?.outcome === DeferredOutcome.Resolved ? this.outcome?.value : undefined;
	}

	/** The promise controlled by this object. */
	public readonly p: Promise<T>;

	constructor() {
		this.p = new Promise<T>((c, e) => {
			this.completeCallback = c;
			this.errorCallback = e;
		});
	}

	/**
	 * Resolves `p` with `value`. Has no effect when already settled.
	 */
	public complete(value: T) {
		if (this.isSettled) {
			return Promise.resolve();
		}

		return new Promise<void>(resolve => {
			this.completeCallback(value);
			this.outcome = { outcome: DeferredOutcome.Resolved, value };
			resolve();
		});
	}

	/**
	 * Rejects `p` with `err`. Has no effect when already settled.
	 */
	public error(err: unknown) {
		if (this.isSettled) {
			return Promise.resolve();
		}

		return new Promise<void>(resolve => {
			this.errorCallback(err);
			this.outcome = { outcome: DeferredOutcome.Rejected, value: err };
			resolve();
		});
	}

	/**
	 * Completes or errors this deferred promise once `promise` settles.
	 */
	public settleWith(promise: Promise<T>): Promise<void> {
		return promise.then(
			value => this.complete(value),
			error => this.error(error)
		);
	}

	/**
	 * Rejects `p` with a `CancellationError`.
	 */
	public cancel() {
		return this.error(new CancellationError());
	}
}

//#endregion

//#region Promises

export namespace Promises {

	/**
	 * A drop-in replacement for `Promise.all` with the only difference
	 * that the method awaits every promise to either fulfill or reject.
	 *
	 * Similar to `Promise.all`, only the first error will be returned
	 * if any.
	 */
	export async function settled<T>(promises: Promise<T>[]): Promise<T[]> {
		// A separate flag (instead of testing `firstError` itself) makes sure that
		// rejections with a falsy reason such as `undefined` are rethrown as well
		// and are not overwritten by a later rejection.
		let hasError = false;
		let firstError: unknown = undefined;

		const result = await Promise.all(promises.map(promise => promise.then(value => value, error => {
			if (!hasError) {
				hasError = true;
				firstError = error;
			}

			return undefined; // do not rethrow so that other promises can settle
		})));

		if (hasError) {
			throw firstError;
		}

		return result as unknown as T[]; // cast is needed and protected by the `throw` above
	}

	/**
	 * A helper to create a new `Promise<T>` with a body that is a promise
	 * itself. By default, an error that raises from the async body will
	 * end up as a unhandled rejection, so this utility properly awaits the
	 * body and rejects the promise as a normal promise does without async
	 * body.
	 *
	 * This method should only be used in rare cases where otherwise `async`
	 * cannot be used (e.g. when callbacks are involved that require this).
	 */
	export function withAsyncBody<T, E = Error>(bodyFn: (resolve: (value: T) => unknown, reject: (error: E) => unknown) => Promise<unknown>): Promise<T> {
		// eslint-disable-next-line no-async-promise-executor
		return new Promise<T>(async (resolve, reject) => {
			try {
				await bodyFn(resolve, reject);
			} catch (error) {
				reject(error);
			}
		});
	}
}

/**
 * Wraps a promise and records its outcome so that it can be read synchronously
 * once the promise has settled.
 */
export class StatefulPromise<T> {
	private _value: T | undefined = undefined;
	get value(): T | undefined { return this._value; }

	private _error: unknown = undefined;
	get error(): unknown { return this._error; }

	// Note: becomes `true` when the promise settles, no matter if it fulfilled or rejected.
	private _isResolved = false;
	get isResolved() { return this._isResolved; }

	// Tracked separately from `_error` so that a rejection with a falsy
	// reason is still rethrown from `requireValue()`.
	private _isRejected = false;

	public readonly promise: Promise<T>;

	constructor(promise: Promise<T>) {
		this.promise = promise.then(
			value => {
				this._value = value;
				this._isResolved = true;
				return value;
			},
			error => {
				this._error = error;
				this._isRejected = true;
				this._isResolved = true;
				throw error;
			}
		);
	}

	/**
	 * Returns the resolved value.
	 * Throws if the promise is not resolved yet.
	 * Rethrows the rejection reason if the promise was rejected.
	 */
	public requireValue(): T {
		if (!this._isResolved) {
			throw new BugIndicatingError('Promise is not resolved yet');
		}
		if (this._isRejected) {
			throw this._error;
		}
		return this._value!;
	}
}

/**
 * A `StatefulPromise` whose underlying promise is only created on first use.
 */
export class LazyStatefulPromise<T> {
	private readonly _promise = new Lazy(() => new StatefulPromise(this._compute()));

	constructor(
		private readonly _compute: () => Promise<T>,
	) { }

	/**
	 * Returns the resolved value.
	 * Throws if the promise is not resolved yet.
	 */
requireValue(): T {
		return this._promise.value.requireValue();
	}

	/**
	 * Returns the promise (and triggers a computation of the promise if not yet done so).
	 */
	public getPromise(): Promise<T> {
		return this._promise.value.promise;
	}

	/**
	 * Reads the current value without triggering a computation of the promise.
	 */
	public get currentValue(): T | undefined {
		return this._promise.rawValue?.value;
	}
}

//#endregion

//#region

const enum AsyncIterableSourceState {
	Initial,
	DoneOK,
	DoneError,
}

/**
 * An object that allows to emit async values asynchronously or bring the iterable to an error state using `reject()`.
 * This emitter is valid only for the duration of the executor (until the promise returned by the executor settles).
 */
export interface AsyncIterableEmitter<T> {
	/**
	 * Appends a single value at the end.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	emitOne(value: T): void;
	/**
	 * Appends all given values at the end.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	emitMany(values: T[]): void;
	/**
	 * Writing an error will permanently invalidate this iterable.
	 * The current users will receive an error thrown, as will all future users.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	reject(error: Error): void;
}

/**
 * An executor for the `AsyncIterableObject` that has access to an emitter.
 */
export interface AsyncIterableExecutor<T> {
	/**
	 * @param emitter An object that allows to emit async values valid only for the duration of the executor.
	 */
	(emitter: AsyncIterableEmitter<T>): unknown | Promise<unknown>;
}

/**
 * A rich implementation for an `AsyncIterable<T>`.
 */
export class AsyncIterableObject<T> implements AsyncIterable<T> {

	/** An iterable that emits all `items` and then completes. */
	public static fromArray<T>(items: T[]): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>((emitter) => {
			emitter.emitMany(items);
		});
	}

	/** An iterable that emits the items `promise` resolves to. */
	public static fromPromise<T>(promise: Promise<T[]>): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async (emitter) => {
			const items = await promise;
			emitter.emitMany(items);
		});
	}

	/** An iterable that emits the value of each promise as soon as that promise resolves. */
	public static fromPromisesResolveOrder<T>(promises: Promise<T>[]): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async (emitter) => {
			await Promise.all(promises.map(async (promise) => emitter.emitOne(await promise)));
		});
	}

	/** An iterable that emits the items of all `iterables` as they arrive. */
	public static merge<T>(iterables: AsyncIterable<T>[]): AsyncIterableObject<T> {
		return new AsyncIterableObject(async (emitter) => {
			const drain = async (iterable: AsyncIterable<T>) => {
				for await (const item of iterable) {
					emitter.emitOne(item);
				}
			};
			await Promise.all(iterables.map(drain));
		});
	}

	public static EMPTY = AsyncIterableObject.fromArray<any>([]);

	private _state: AsyncIterableSourceState;
	// All values emitted so far; every iterator reads them from the start.
	private _results: T[];
	private _error: Error | null;
	private readonly _onReturn?: () => void | Promise<void>;
	// Fires on every emitted value and on completion/error to wake up waiting iterators.
	private readonly _onStateChanged: Emitter<void>;

	/**
	 * @param executor Produces the values; it is started in a microtask.
	 * @param onReturn Called when an iterator's `return()` is invoked.
	 */
	constructor(executor: AsyncIterableExecutor<T>, onReturn?: () => void | Promise<void>) {
		this._state = AsyncIterableSourceState.Initial;
		this._results = [];
		this._error = null;
		this._onReturn = onReturn;
		this._onStateChanged = new Emitter<void>();

		queueMicrotask(async () => {
			const writer: AsyncIterableEmitter<T> = {
				emitOne: (item) => this.emitOne(item),
				emitMany: (items) => this.emitMany(items),
				reject: (error) => this.reject(error)
			};
			try {
				await Promise.resolve(executor(writer));
				this.resolve();
			} catch (err) {
				this.reject(err);
			} finally {
				// the emitter is only valid for the duration of the executor
				writer.emitOne = undefined!;
				writer.emitMany = undefined!;
				writer.reject = undefined!;
			}
		});
	}

	[Symbol.asyncIterator](): AsyncIterator<T, undefined, undefined> {
		let cursor = 0;
		return {
			next: async () => {
				while (true) {
					if (this._state === AsyncIterableSourceState.DoneError) {
						throw this._error;
					}
					if (cursor < this._results.length) {
						return { done: false, value: this._results[cursor++] };
					}
					if (this._state === AsyncIterableSourceState.DoneOK) {
						return { done: true, value: undefined };
					}
					// nothing new yet: wait for the next value or state change
					await Event.toPromise(this._onStateChanged.event);
				}
			},
			return: async () => {
				this._onReturn?.();
				return { done: true, value: undefined };
			}
		};
	}

	/** An iterable that emits `mapFn(item)` for every item of `iterable`. */
	public static map<T, R>(iterable: AsyncIterable<T>, mapFn: (item: T) => R): AsyncIterableObject<R> {
		return new AsyncIterableObject<R>(async (emitter) => {
			for await (const item of iterable) {
				emitter.emitOne(mapFn(item));
			}
		});
	}

	public map<R>(mapFn: (item: T) => R): AsyncIterableObject<R> {
		return AsyncIterableObject.map(this, mapFn);
	}

	/** An iterable that emits only the items of `iterable` for which `filterFn` returns `true`. */
	public static filter<T>(iterable: AsyncIterable<T>, filterFn: (item: T) => boolean): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async (emitter) => {
			for await (const item of iterable) {
				if (filterFn(item)) {
					emitter.emitOne(item);
				}
			}
		});
	}

	public filter<T2 extends T>(filterFn: (item: T) => item is T2): AsyncIterableObject<T2>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableObject<T>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableObject<T> {
		return AsyncIterableObject.filter(this, filterFn);
	}

	/** An iterable that drops all falsy items of `iterable`. */
	public static coalesce<T>(iterable: AsyncIterable<T | undefined | null>): AsyncIterableObject<T> {
		return <AsyncIterableObject<T>>AsyncIterableObject.filter(iterable, item => !!item);
	}

	public coalesce(): AsyncIterableObject<NonNullable<T>> {
		return AsyncIterableObject.coalesce(this) as AsyncIterableObject<NonNullable<T>>;
	}

	/** Collects all items of `iterable` into an array. */
	public static async toPromise<T>(iterable: AsyncIterable<T>): Promise<T[]> {
		const collected: T[] = [];
		for await (const item of iterable) {
			collected.push(item);
		}
		return collected;
	}

	public toPromise(): Promise<T[]> {
		return AsyncIterableObject.toPromise(this);
	}

	/**
	 * The value will be appended at the end.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private emitOne(value: T): void {
		if (this._state !== AsyncIterableSourceState.Initial) {
			return;
		}
		// it is important to add new values at the end,
		// as we may have iterators already running on the array
		this._results.push(value);
		this._onStateChanged.fire();
	}

	/**
	 * The values will be appended at the end.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private emitMany(values: T[]): void {
		if (this._state !== AsyncIterableSourceState.Initial) {
			return;
		}
		// it is important to add new values at the end,
		// as we may have iterators already running on the array
		this._results = this._results.concat(values);
		this._onStateChanged.fire();
	}

	/**
	 * Calling `resolve()` will mark the result array as complete.
	 *
	 * **NOTE** `resolve()` must be called, otherwise all consumers of this iterable will hang indefinitely, similar to a non-resolved promise.
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private resolve(): void {
		if (this._state !== AsyncIterableSourceState.Initial) {
			return;
		}
		this._state = AsyncIterableSourceState.DoneOK;
		this._onStateChanged.fire();
	}

	/**
	 * Writing an error will permanently invalidate this iterable.
	 * The current users will receive an error thrown, as will all future users.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private reject(error: Error) {
		if (this._state !== AsyncIterableSourceState.Initial) {
			return;
		}
		this._state = AsyncIterableSourceState.DoneError;
		this._error = error;
		this._onStateChanged.fire();
	}
}


/**
 * Creates an async iterable from `callback` that can be cancelled.
 *
 * @param callback Receives the cancellation token and returns the iterable whose items are forwarded.
 * @returns A producer that rejects with a `CancellationError` when `cancel()` is called.
 */
export function createCancelableAsyncIterableProducer<T>(callback: (token: CancellationToken) => AsyncIterable<T>): CancelableAsyncIterableProducer<T> {
	const source = new CancellationTokenSource();
	const innerIterable = callback(source.token);

	return new CancelableAsyncIterableProducer<T>(source, async (emitter) => {
		const cancellationListener = source.token.onCancellationRequested(() => {
			cancellationListener.dispose();
			source.dispose();
			emitter.reject(new CancellationError());
		});
		try {
			for await (const item of innerIterable) {
				if (source.token.isCancellationRequested) {
					// canceled in the meantime
					return;
				}
				emitter.emitOne(item);
			}
			cancellationListener.dispose();
			source.dispose();
		} catch (err) {
			cancellationListener.dispose();
			source.dispose();
			emitter.reject(err);
		}
	});
}

/**
 * An `AsyncIterableObject` that is fed from the outside through
 * `emitOne`/`emitMany` and finished through `resolve`/`reject`.
 */
export class AsyncIterableSource<T> {

	// Completing this promise ends the executor of the underlying iterable.
	private readonly _deferred = new DeferredPromise<void>();
	private readonly _asyncIterable: AsyncIterableObject<T>;

	private _errorFn: (error: Error) => void;
	private _emitOneFn: (item: T) => void;
	private _emitManyFn: (item: T[]) => void;

	/**
	 *
	 * @param onReturn A function that will be called when consuming the async iterable
	 * has finished by the consumer, e.g. the for-await-loop has been exited (break, return) early.
	 * This is NOT called when resolving this source by its owner.
	 */
	constructor(onReturn?: () => Promise<void> | void) {
		this._asyncIterable = new AsyncIterableObject(emitter => {

			// replay whatever arrived before the executor got started
			if (earlyError) {
				emitter.reject(earlyError);
				return;
			}
			if (earlyItems) {
				emitter.emitMany(earlyItems);
			}

			// from now on forward directly to the emitter
			this._errorFn = (error: Error) => emitter.reject(error);
			this._emitOneFn = (item: T) => emitter.emitOne(item);
			this._emitManyFn = (items: T[]) => emitter.emitMany(items);
			return this._deferred.p;
		}, onReturn);

		let earlyError: Error | undefined;
		let earlyItems: T[] | undefined;


		// until the executor runs, buffer items and remember the first error
		this._errorFn = (error: Error) => {
			if (!earlyError) {
				earlyError = error;
			}
		};
		this._emitOneFn = (item: T) => {
			if (!earlyItems) {
				earlyItems = [];
			}
			earlyItems.push(item);
		};
		this._emitManyFn = (items: T[]) => {
			if (!earlyItems) {
				earlyItems = items.slice();
			} else {
				items.forEach(item => earlyItems!.push(item));
			}
		};
	}

	get asyncIterable(): AsyncIterableObject<T> {
		return this._asyncIterable;
	}

	/** Marks the iterable as complete. */
	resolve(): void {
		this._deferred.complete();
	}

	/** Brings the iterable into the error state. */
	reject(error: Error): void {
		this._errorFn(error);
		this._deferred.complete();
	}

	emitOne(item: T): void {
		this._emitOneFn(item);
	}

	emitMany(items: T[]) {
		this._emitManyFn(items);
	}
}

/**
 * Wraps an async iterable/iterator so that iteration reports `done` once `token` is cancelled.
 */
export function cancellableIterable<T>(iterableOrIterator: AsyncIterator<T> | AsyncIterable<T>, token: CancellationToken): AsyncIterableIterator<T> {
	const iterator = Symbol.asyncIterator in iterableOrIterator ? iterableOrIterator[Symbol.asyncIterator]() : iterableOrIterator;

	return {
		async next(): Promise<IteratorResult<T>> {
			if (token.isCancellationRequested) {
				return { done: true, value: undefined };
			}
			const result = await raceCancellation(iterator.next(), token);
			// a missing result is reported as the end of the iteration
			return result || { done: true, value: undefined };
		},
		throw: iterator.throw?.bind(iterator),
		return: iterator.return?.bind(iterator),
		[Symbol.asyncIterator]() {
			return this;
		}
	};
}

type ProducerConsumerValue<T> = {
	ok: true;
	value: T;
} | {
	ok: false;
	error: Error;
};

/**
 * Matches produced values with consumers: values wait for a consumer, and
 * consumers wait for a value, in order of arrival.
 */
class ProducerConsumer<T> {
	private readonly _unsatisfiedConsumers: DeferredPromise<T>[] = [];
	private readonly _unconsumedValues: ProducerConsumerValue<T>[] = [];
	private _finalValue: ProducerConsumerValue<T> | undefined;

	public get hasFinalValue(): boolean {
		return !!this._finalValue;
	}

	/**
	 * Hands `value` to the longest waiting consumer or buffers it when nobody waits.
	 */
	produce(value: ProducerConsumerValue<T>): void {
		this._ensureNoFinalValue();

		const waitingConsumer = this._unsatisfiedConsumers.shift();
		if (waitingConsumer) {
			this._resolveOrRejectDeferred(waitingConsumer, value);
		} else {
			this._unconsumedValues.push(value);
		}
	}

	/**
	 * Sets the value that all waiting and all future consumers receive.
	 */
	produceFinal(value: ProducerConsumerValue<T>): void {
		this._ensureNoFinalValue();
		this._finalValue = value;
		for (const waitingConsumer of this._unsatisfiedConsumers) {
			this._resolveOrRejectDeferred(waitingConsumer, value);
		}
		this._unsatisfiedConsumers.length = 0;
	}

	private _ensureNoFinalValue(): void {
		if (this._finalValue) {
			throw new BugIndicatingError('ProducerConsumer: cannot produce after final value has been set');
		}
	}

	private _resolveOrRejectDeferred(deferred: DeferredPromise<T>, value: ProducerConsumerValue<T>): void {
		if (value.ok) {
			deferred.complete(value.value);
		} else {
			deferred.error(value.error);
		}
	}

	/**
	 * Returns the next buffered value (or else the final value), or waits for the next produced value.
	 */
	consume(): Promise<T> {
		if (this._unconsumedValues.length > 0 || this._finalValue)
{
			const value = this._unconsumedValues.length > 0 ? this._unconsumedValues.shift()! : this._finalValue!;
			if (value.ok) {
				return Promise.resolve(value.value);
			} else {
				return Promise.reject(value.error);
			}
		} else {
			const deferred = new DeferredPromise<T>();
			this._unsatisfiedConsumers.push(deferred);
			return deferred.p;
		}
	}
}

/**
 * Important difference to AsyncIterableObject:
 * If it is iterated two times, the second iterator will not see the values emitted by the first iterator.
 */
export class AsyncIterableProducer<T> implements AsyncIterable<T> {
	private readonly _producerConsumer = new ProducerConsumer<IteratorResult<T>>();

	/**
	 * @param executor Produces the values; it is started in a microtask.
	 * @param _onReturn Called when the iterator's `return()` is invoked.
	 */
	constructor(executor: AsyncIterableExecutor<T>, private readonly _onReturn?: () => void) {
		queueMicrotask(async () => {
			let p: unknown;
			try {
				p = executor({
					emitOne: value => this._producerConsumer.produce({ ok: true, value: { done: false, value: value } }),
					emitMany: values => {
						for (const value of values) {
							this._producerConsumer.produce({ ok: true, value: { done: false, value: value } });
						}
					},
					reject: error => this._finishError(error),
				});
			} catch (error) {
				// An executor that throws synchronously must end the iteration with
				// that error. Otherwise consumers would wait forever and the error
				// would surface as an unhandled rejection of this microtask.
				this._finishError(error);
				return;
			}

			if (!this._producerConsumer.hasFinalValue) {
				try {
					await p;
					this._finishOk();
				} catch (error) {
					this._finishError(error);
				}
			}
		});
	}

	/** A producer that emits all `items` and then completes. */
	public static fromArray<T>(items: T[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>((writer) => {
			writer.emitMany(items);
		});
	}

	/** A producer that emits the items `promise` resolves to. */
	public static fromPromise<T>(promise: Promise<T[]>): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			emitter.emitMany(await promise);
		});
	}

	/** A producer that emits the value of each promise as soon as that promise resolves. */
	public static fromPromisesResolveOrder<T>(promises: Promise<T>[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			await Promise.all(promises.map(async (p) => emitter.emitOne(await p)));
		});
	}

	/** A producer that emits the items of all `iterables` as they arrive. */
	public static merge<T>(iterables: AsyncIterable<T>[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer(async (emitter) => {
			await Promise.all(iterables.map(async (iterable) => {
				for await (const item of iterable) {
					emitter.emitOne(item);
				}
			}));
		});
	}

	public static EMPTY = AsyncIterableProducer.fromArray<any>([]);

	/** A producer that emits `mapFn(item)` for every item of `iterable`. */
	public static map<T, R>(iterable: AsyncIterable<T>, mapFn: (item: T) => R): AsyncIterableProducer<R> {
		return new AsyncIterableProducer<R>(async (emitter) => {
			for await (const item of iterable) {
				emitter.emitOne(mapFn(item));
			}
		});
	}

	/**
	 * Splits `iterable` into two producers that both receive every item.
	 * `iterable` is only iterated once the executors of both producers have started.
	 */
	public static tee<T>(iterable: AsyncIterable<T>): [AsyncIterableProducer<T>, AsyncIterableProducer<T>] {
		let emitter1: AsyncIterableEmitter<T> | undefined;
		let emitter2: AsyncIterableEmitter<T> | undefined;

		// completed when the source iteration has ended, which ends both executors
		const defer = new DeferredPromise<void>();

		const start = async () => {
			if (!emitter1 || !emitter2) {
				return; // not yet ready
			}
			try {
				for await (const item of iterable) {
					emitter1.emitOne(item);
					emitter2.emitOne(item);
				}
			} catch (err) {
				emitter1.reject(err);
				emitter2.reject(err);
			} finally {
				defer.complete();
			}
		};

		const p1 = new AsyncIterableProducer<T>(async (emitter) => {
			emitter1 = emitter;
			start();
			return defer.p;
		});
		const p2 = new AsyncIterableProducer<T>(async (emitter) => {
			emitter2 = emitter;
			start();
			return defer.p;
		});
		return [p1, p2];
	}

	public map<R>(mapFn: (item: T) => R): AsyncIterableProducer<R> {
		return AsyncIterableProducer.map(this, mapFn);
	}

	/** A producer that drops all falsy items of `iterable`. */
	public static coalesce<T>(iterable: AsyncIterable<T | undefined | null>): AsyncIterableProducer<T> {
		return <AsyncIterableProducer<T>>AsyncIterableProducer.filter(iterable, item => !!item);
	}

	public coalesce(): AsyncIterableProducer<NonNullable<T>> {
		return AsyncIterableProducer.coalesce(this) as AsyncIterableProducer<NonNullable<T>>;
	}

	/** A producer that emits only the items of `iterable` for which `filterFn` returns `true`. */
	public static filter<T>(iterable: AsyncIterable<T>, filterFn: (item: T) => boolean): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			for await (const item of iterable) {
				if (filterFn(item)) {
					emitter.emitOne(item);
				}
			}
		});
	}

	public filter<T2 extends T>(filterFn: (item: T) => item is T2): AsyncIterableProducer<T2>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableProducer<T>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableProducer<T> {
		return AsyncIterableProducer.filter(this, filterFn);
	}

	// Ends the iteration normally unless it has already ended.
	private _finishOk(): void {
		if (!this._producerConsumer.hasFinalValue) {
			this._producerConsumer.produceFinal({ ok: true, value: { done: true, value: undefined } });
		}
	}

	// Ends the iteration with `error` unless it has already ended.
	private _finishError(error: Error): void {
		if (!this._producerConsumer.hasFinalValue) {
			this._producerConsumer.produceFinal({ ok: false, error: error });
		}
		// Warning: this can cause to dropped errors.
	}

	// A single iterator object is shared by all calls to `[Symbol.asyncIterator]()`.
	private readonly _iterator: AsyncIterator<T, void, void> = {
		next: () => this._producerConsumer.consume(),
		return: () => {
			this._onReturn?.();
			return Promise.resolve({ done: true, value: undefined });
		},
		throw: async (e) => {
			this._finishError(e);
			return { done: true, value: undefined };
		},
	};

	[Symbol.asyncIterator](): AsyncIterator<T, void, void> {
		return this._iterator;
	}
}

/**
 * An `AsyncIterableProducer` that can be cancelled through the
 * cancellation token source it was created with.
 */
export class CancelableAsyncIterableProducer<T> extends AsyncIterableProducer<T> {
	constructor(
		private readonly _source: CancellationTokenSource,
		executor: AsyncIterableExecutor<T>
	) {
		super(executor);
	}

	/** Requests cancellation on the token source. */
	cancel(): void {
		this._source.cancel();
	}
}

//#endregion

export const AsyncReaderEndOfStream = Symbol('AsyncReaderEndOfStream');

export class AsyncReader<T> {
	private _buffer: T[] = [];
	private _atEnd = false;

	public get endOfStream(): boolean { return this._buffer.length === 0 && this._atEnd; }
	private _extendBufferPromise:
Promise<void> | undefined;25512552constructor(2553private readonly _source: AsyncIterator<T>2554) {2555}25562557public async read(): Promise<T | typeof AsyncReaderEndOfStream> {2558if (this._buffer.length === 0 && !this._atEnd) {2559await this._extendBuffer();2560}2561if (this._buffer.length === 0) {2562return AsyncReaderEndOfStream;2563}2564return this._buffer.shift()!;2565}25662567public async readWhile(predicate: (value: T) => boolean, callback: (element: T) => unknown): Promise<void> {2568do {2569const piece = await this.peek();2570if (piece === AsyncReaderEndOfStream) {2571break;2572}2573if (!predicate(piece)) {2574break;2575}2576await this.read(); // consume2577await callback(piece);2578} while (true);2579}25802581public readBufferedOrThrow(): T | typeof AsyncReaderEndOfStream {2582const value = this.peekBufferedOrThrow();2583this._buffer.shift();2584return value;2585}25862587public async consumeToEnd(): Promise<void> {2588while (!this.endOfStream) {2589await this.read();2590}2591}25922593public async peek(): Promise<T | typeof AsyncReaderEndOfStream> {2594if (this._buffer.length === 0 && !this._atEnd) {2595await this._extendBuffer();2596}2597if (this._buffer.length === 0) {2598return AsyncReaderEndOfStream;2599}2600return this._buffer[0];2601}26022603public peekBufferedOrThrow(): T | typeof AsyncReaderEndOfStream {2604if (this._buffer.length === 0) {2605if (this._atEnd) {2606return AsyncReaderEndOfStream;2607}2608throw new BugIndicatingError('No buffered elements');2609}26102611return this._buffer[0];2612}26132614public async peekTimeout(timeoutMs: number): Promise<T | typeof AsyncReaderEndOfStream | undefined> {2615if (this._buffer.length === 0 && !this._atEnd) {2616await raceTimeout(this._extendBuffer(), timeoutMs);2617}2618if (this._atEnd) {2619return AsyncReaderEndOfStream;2620}2621if (this._buffer.length === 0) {2622return undefined;2623}2624return this._buffer[0];2625}26262627private _extendBuffer(): Promise<void> {2628if (this._atEnd) {2629return 
Promise.resolve();2630}26312632if (!this._extendBufferPromise) {2633this._extendBufferPromise = (async () => {2634const { value, done } = await this._source.next();2635this._extendBufferPromise = undefined;2636if (done) {2637this._atEnd = true;2638} else {2639this._buffer.push(value);2640}2641})();2642}26432644return this._extendBufferPromise;2645}2646}264726482649