Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/base/common/async.ts
5240 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { CancellationToken, CancellationTokenSource } from './cancellation.js';
7
import { BugIndicatingError, CancellationError } from './errors.js';
8
import { Emitter, Event } from './event.js';
9
import { Disposable, DisposableMap, DisposableStore, IDisposable, isDisposable, MutableDisposable, toDisposable } from './lifecycle.js';
10
import { extUri as defaultExtUri, IExtUri } from './resources.js';
11
import { URI } from './uri.js';
12
import { setTimeout0 } from './platform.js';
13
import { MicrotaskDelay } from './symbols.js';
14
import { Lazy } from './lazy.js';
15
16
/**
 * Duck-type check: reports whether `obj` is truthy and has a callable `then` member.
 */
export function isThenable<T>(obj: unknown): obj is Promise<T> {
	if (!obj) {
		return false;
	}

	const candidate = obj as { then?: unknown };
	return typeof candidate.then === 'function';
}
19
20
/**
 * A promise that additionally exposes a `cancel()` method to its holder.
 */
export interface CancelablePromise<T> extends Promise<T> {
	/** Asks the producer of this promise to cancel the pending work. */
	cancel(): void;
}
23
24
/**
 * Invokes `callback` with a fresh cancellation token and wraps its outcome in a promise
 * that also exposes a `cancel()` method.
 *
 * @remarks Calling `cancel()` rejects the returned promise with a {@link CancellationError}.
 * If the callback's promise still resolves afterwards and the value is disposable, that
 * value is disposed instead of being handed out.
 *
 * @param callback A function that accepts a cancellation token and returns a promise
 * @returns A promise that can be cancelled
 */
export function createCancelablePromise<T>(callback: (token: CancellationToken) => Promise<T>): CancelablePromise<T> {
	const tokenSource = new CancellationTokenSource();
	const work = callback(tokenSource.token);

	let cancelled = false;

	const inner = new Promise<T>((resolve, reject) => {
		const listener = tokenSource.token.onCancellationRequested(() => {
			cancelled = true;
			listener.dispose();
			reject(new CancellationError());
		});

		// Teardown shared by the fulfilled and the rejected path.
		const release = () => {
			listener.dispose();
			tokenSource.dispose();
		};

		Promise.resolve(work).then(value => {
			release();

			if (cancelled) {
				// The consumer already received a CancellationError, so nobody
				// will take ownership of the value: clean it up if possible.
				if (isDisposable(value)) {
					value.dispose();
				}
				return;
			}

			resolve(value);
		}, err => {
			release();
			reject(err);
		});
	});

	return <CancelablePromise<T>>new class {
		cancel() {
			tokenSource.cancel();
			tokenSource.dispose();
		}
		then<TResult1 = T, TResult2 = never>(resolve?: ((value: T) => TResult1 | Promise<TResult1>) | undefined | null, reject?: ((reason: unknown) => TResult2 | Promise<TResult2>) | undefined | null): Promise<TResult1 | TResult2> {
			return inner.then(resolve, reject);
		}
		catch<TResult = never>(reject?: ((reason: unknown) => TResult | Promise<TResult>) | undefined | null): Promise<T | TResult> {
			return this.then(undefined, reject);
		}
		finally(onfinally?: (() => void) | undefined | null): Promise<T> {
			return inner.finally(onfinally);
		}
	};
}
82
83
/**
 * Settles like `promise`, unless the token is cancelled first: in that case the result
 * resolves with `undefined`.
 * @see {@link raceCancellationError}
 */
export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken): Promise<T | undefined>;

/**
 * Settles like `promise`, unless the token is cancelled first: in that case the result
 * resolves with `defaultValue`.
 * @see {@link raceCancellationError}
 */
export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken, defaultValue: T): Promise<T>;

export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken, defaultValue?: T): Promise<T | undefined> {
	return new Promise((resolve, reject) => {
		const listener = token.onCancellationRequested(() => {
			listener.dispose();
			resolve(defaultValue);
		});

		// Stop listening once the wrapped promise has settled.
		const stopListening = () => listener.dispose();
		promise.then(resolve, reject).finally(stopListening);
	});
}
104
105
/**
 * Settles like `promise`, unless the token is cancelled first: in that case the result
 * rejects with a {@link CancellationError}.
 * @see {@link raceCancellation}
 */
export function raceCancellationError<T>(promise: Promise<T>, token: CancellationToken): Promise<T> {
	return new Promise((resolve, reject) => {
		const listener = token.onCancellationRequested(() => {
			listener.dispose();
			reject(new CancellationError());
		});

		// Stop listening once the wrapped promise has settled.
		const stopListening = () => listener.dispose();
		promise.then(resolve, reject).finally(stopListening);
	});
}
118
119
/**
 * Wraps a cancellable promise in a plain promise that settles the same way but has no
 * `cancel()` method. Can be used to avoid issues with shared promises that would
 * otherwise be handed to consumers as cancellable.
 */
export function notCancellablePromise<T>(promise: CancelablePromise<T>): Promise<T> {
	const plain = new Promise<T>((onValue, onError) => {
		promise.then(onValue, onError);
	});

	return plain;
}
129
130
/**
 * Races the given promises: the returned promise settles like the first input that settles.
 *
 * Once the race has settled, `cancel()` is called on every input that has such a method,
 * except the input that fulfilled first. After a rejection no input is exempt. Calling
 * `cancel()` on the returned promise performs the same sweep on demand.
 *
 * @param cancellablePromises Promises to race; entries without a `cancel` method are allowed
 * @returns The race, extended with `cancel()`
 */
export function raceCancellablePromises<T>(cancellablePromises: (CancelablePromise<T> | Promise<T>)[]): CancelablePromise<T> {
	let resolvedPromiseIndex = -1;
	const promises = cancellablePromises.map((promise, index) => promise.then(result => { resolvedPromiseIndex = index; return result; }));
	const promise = Promise.race(promises) as CancelablePromise<T>;
	promise.cancel = () => {
		cancellablePromises.forEach((cancellablePromise, index) => {
			if (index !== resolvedPromiseIndex && (cancellablePromise as CancelablePromise<T>).cancel) {
				(cancellablePromise as CancelablePromise<T>).cancel();
			}
		});
	};

	// Register the cleanup for both outcomes via `then`. A bare `finally` would create a
	// derived promise that re-rejects with the race's reason and has no handler of its own,
	// which surfaces as an unhandled rejection even when the caller handles `promise`.
	const cancelRemaining = () => promise.cancel();
	promise.then(cancelRemaining, cancelRemaining);

	return promise;
}
149
150
/**
 * Races `promise` against a timer. If the timer elapses first, the result resolves with
 * `undefined` and `onTimeout` (if given) is invoked. The timer is cleared once `promise`
 * settles.
 *
 * @param promise The promise to wait for
 * @param timeout Timer duration in milliseconds
 * @param onTimeout Optional callback invoked when the timer elapses
 */
export function raceTimeout<T>(promise: Promise<T>, timeout: number, onTimeout?: () => void): Promise<T | undefined> {
	let settleWithUndefined: ((value: T | undefined) => void) | undefined;

	const handle = setTimeout(() => {
		settleWithUndefined?.(undefined);
		onTimeout?.();
	}, timeout);

	const timedOut = new Promise<T | undefined>(resolve => {
		settleWithUndefined = resolve;
	});
	const guarded = promise.finally(() => clearTimeout(handle));

	return Promise.race([guarded, timedOut]);
}
163
164
/**
 * Invokes `callback` and exposes its outcome as a promise: a thenable result is followed,
 * any other result becomes the resolved value. An exception thrown by `callback` rejects
 * the returned promise because the call happens inside the promise executor.
 */
export function asPromise<T>(callback: () => T | Thenable<T>): Promise<T> {
	return new Promise<T>((resolve, reject) => {
		const outcome = callback();

		if (!isThenable<T>(outcome)) {
			resolve(outcome);
			return;
		}

		outcome.then(resolve, reject);
	});
}
174
175
/**
 * Builds a new promise and hands it back together with the `resolve` and `reject`
 * functions that settle it.
 *
 * Replace with standardized [`Promise.withResolvers`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers) once it is supported
 */
export function promiseWithResolvers<T>(): { promise: Promise<T>; resolve: (value: T | PromiseLike<T>) => void; reject: (err?: any) => void } {
	// Both are assigned synchronously by the promise executor below.
	let settle!: (value: T | PromiseLike<T>) => void;
	let fail!: (reason?: any) => void;

	const promise = new Promise<T>((onValue, onError) => {
		settle = onValue;
		fail = onError;
	});

	return { promise, resolve: settle, reject: fail };
}
189
190
/**
 * A parameterless unit of work that produces a `T` when called.
 */
export interface ITask<T> {
	(): T;
}
193
194
/**
 * A unit of work that receives a cancellation token and produces a `T` when called.
 */
export interface ICancellableTask<T> {
	(token: CancellationToken): T;
}
197
198
/**
 * A helper to prevent accumulation of sequential async tasks.
 *
 * Imagine a mail man with the sole task of delivering letters. As soon as
 * a letter is submitted for delivery, he drives to the destination, delivers it
 * and returns to his base. Imagine that during the trip, N more letters were submitted.
 * When the mail man returns, he picks those N letters and delivers them all in a
 * single trip. Even though N+1 submissions occurred, only 2 deliveries were made.
 *
 * The throttler implements this via the queue() method, by providing it a task
 * factory. Following the example:
 *
 * 		const throttler = new Throttler();
 * 		let letters = [];
 *
 * 		function deliver() {
 * 			const lettersToDeliver = letters;
 * 			letters = [];
 * 			return makeTheTrip(lettersToDeliver);
 * 		}
 *
 * 		function onLetterReceived(l) {
 * 			letters.push(l);
 * 			throttler.queue(deliver);
 * 		}
 */
export class Throttler implements IDisposable {

	/** Promise of the task that is currently running, if any. */
	private activePromise: Promise<any> | null;
	/** Shared promise for the single follow-up run scheduled while a task is active. */
	private queuedPromise: Promise<any> | null;
	/** Most recently queued factory; it is the one used for the follow-up run. */
	private queuedPromiseFactory: ICancellableTask<Promise<any>> | null;
	/** Cancelled on dispose; its token is passed to every factory. */
	private cancellationTokenSource: CancellationTokenSource;

	constructor() {
		this.activePromise = null;
		this.queuedPromise = null;
		this.queuedPromiseFactory = null;

		this.cancellationTokenSource = new CancellationTokenSource();
	}

	/**
	 * Runs `promiseFactory` right away when nothing is active. While a task is active,
	 * only the latest factory is remembered and run once after the active task settles;
	 * every caller that queued in the meantime receives the outcome of that single run.
	 *
	 * @param promiseFactory Task factory; receives the throttler's cancellation token
	 * @returns A promise that settles with the outcome of the run that served this call.
	 * Rejects with `Error('Throttler is disposed')` when called after disposal. Callers
	 * still waiting for a follow-up run when the throttler is disposed resolve with
	 * `undefined`, because that run is skipped.
	 */
	queue<T>(promiseFactory: ICancellableTask<Promise<T>>): Promise<T> {
		if (this.cancellationTokenSource.token.isCancellationRequested) {
			return Promise.reject(new Error('Throttler is disposed'));
		}

		if (this.activePromise) {
			this.queuedPromiseFactory = promiseFactory;

			if (!this.queuedPromise) {
				const onComplete = () => {
					this.queuedPromise = null;

					if (this.cancellationTokenSource.token.isCancellationRequested) {
						return;
					}

					const result = this.queue(this.queuedPromiseFactory!);
					this.queuedPromiseFactory = null;

					return result;
				};

				this.queuedPromise = new Promise((resolve, reject) => {
					// Forward rejections as well. Without the second argument a failing
					// follow-up run would leave `queuedPromise` pending forever, so every
					// waiting caller would hang, and the failure would surface only as
					// an unhandled rejection.
					this.activePromise!.then(onComplete, onComplete).then(resolve, reject);
				});
			}

			return new Promise((resolve, reject) => {
				this.queuedPromise!.then(resolve, reject);
			});
		}

		this.activePromise = promiseFactory(this.cancellationTokenSource.token);

		return new Promise((resolve, reject) => {
			this.activePromise!.then((result: T) => {
				this.activePromise = null;
				resolve(result);
			}, (err: unknown) => {
				this.activePromise = null;
				reject(err);
			});
		});
	}

	/**
	 * Cancels the token handed to the factories; later `queue` calls are rejected.
	 */
	dispose(): void {
		this.cancellationTokenSource.cancel();
	}
}
288
289
/**
 * Runs queued tasks strictly one after another: each task starts only after the
 * previously queued one has settled, whether it fulfilled or rejected.
 */
export class Sequencer {

	private current: Promise<unknown> = Promise.resolve(null);

	/**
	 * Appends `promiseTask` to the chain and returns the promise for its outcome.
	 */
	queue<T>(promiseTask: ITask<Promise<T>>): Promise<T> {
		// Same continuation for both outcomes of the predecessor.
		const run = () => promiseTask();
		const chained = this.current.then(run, run);

		this.current = chained;
		return chained;
	}
}
297
298
/**
 * Keeps one sequential task chain per key: tasks queued under the same key run one
 * after another. A key's entry is removed once its latest task has settled.
 */
export class SequencerByKey<TKey> {

	private promiseMap = new Map<TKey, Promise<unknown>>();

	/**
	 * Queues `promiseTask` behind the task currently registered for `key` (if any).
	 * A failure of the predecessor does not prevent the task from running.
	 */
	queue<T>(key: TKey, promiseTask: ITask<Promise<T>>): Promise<T> {
		const predecessor = this.promiseMap.get(key) ?? Promise.resolve();
		const settledPredecessor = predecessor.catch(() => { });

		const tail = settledPredecessor
			.then(promiseTask)
			.finally(() => {
				// Only clean up if no later task has been queued for this key meanwhile.
				const latest = this.promiseMap.get(key);
				if (latest === tail) {
					this.promiseMap.delete(key);
				}
			});

		this.promiseMap.set(key, tail);
		return tail;
	}

	/**
	 * Returns the promise of the latest task queued for `key`, or `undefined` if none is registered.
	 */
	peek(key: TKey): Promise<unknown> | undefined {
		const registered = this.promiseMap.get(key);
		return registered ? registered : undefined;
	}

	/**
	 * Iterates the keys that currently have a registered task.
	 */
	keys(): IterableIterator<TKey> {
		return this.promiseMap.keys();
	}
}
324
325
/**
 * Handle for a deferred callback: disposing it prevents the callback from running.
 */
interface IScheduledLater extends IDisposable {
	/** Whether the callback is still scheduled, i.e. has neither run nor been disposed. */
	isTriggered(): boolean;
}
328
329
/**
 * Schedules `fn` through `setTimeout` and returns a handle that reports whether the
 * call is still pending and cancels it on dispose.
 */
const timeoutDeferred = (timeout: number, fn: () => void): IScheduledLater => {
	let pending = true;

	const timer = setTimeout(() => {
		pending = false;
		fn();
	}, timeout);

	return {
		isTriggered() {
			return pending;
		},
		dispose() {
			clearTimeout(timer);
			pending = false;
		},
	};
};
343
344
/**
 * Schedules `fn` through `queueMicrotask` and returns a handle that reports whether the
 * call is still pending. A queued microtask cannot be removed, so disposing only flips
 * the flag that makes the microtask skip `fn`.
 */
const microtaskDeferred = (fn: () => void): IScheduledLater => {
	let pending = true;

	queueMicrotask(() => {
		if (!pending) {
			return;
		}
		pending = false;
		fn();
	});

	return {
		isTriggered() {
			return pending;
		},
		dispose() {
			pending = false;
		},
	};
};
358
359
/**
 * A helper to delay (debounce) execution of a task that is being requested often.
 *
 * Following the throttler, now imagine the mail man wants to optimize the number of
 * trips proactively. The trip itself can be long, so he decides not to make the trip
 * as soon as a letter is submitted. Instead he waits a while, in case more
 * letters are submitted. After said waiting period, if no letters were submitted, he
 * decides to make the trip. Imagine that N more letters were submitted after the first
 * one, all within a short period of time between each other. Even though N+1
 * submissions occurred, only 1 delivery was made.
 *
 * The delayer offers this behavior via the trigger() method, into which the task to be
 * executed is passed; the waiting period (delay) can be given per call and otherwise
 * defaults to the value passed to the constructor. Following the example:
 *
 * 		const delayer = new Delayer(WAITING_PERIOD);
 * 		const letters = [];
 *
 * 		function letterReceived(l) {
 * 			letters.push(l);
 * 			delayer.trigger(() => { return makeTheTrip(); });
 * 		}
 */
export class Delayer<T> implements IDisposable {

	/** Handle of the currently scheduled wait, if any. */
	private deferred: IScheduledLater | null;
	/** Promise shared by all `trigger` calls of the current round. */
	private completionPromise: Promise<any> | null;
	private doResolve: ((value?: any | Promise<any>) => void) | null;
	private doReject: ((err: unknown) => void) | null;
	/** The task passed to the most recent `trigger` call. */
	private task: ITask<T | Promise<T>> | null;

	constructor(public defaultDelay: number | typeof MicrotaskDelay) {
		this.deferred = null;
		this.completionPromise = null;
		this.doResolve = null;
		this.doReject = null;
		this.task = null;
	}

	/**
	 * Replaces the pending task with `task` and restarts the wait.
	 *
	 * @param task The task to run once the wait elapses without another `trigger` call
	 * @param delay Wait in milliseconds, or `MicrotaskDelay` to wait for a microtask
	 * @returns The promise of the current round; it settles with the outcome of the task
	 * that eventually runs, or rejects with a {@link CancellationError} on `cancel()`
	 */
	trigger(task: ITask<T | Promise<T>>, delay = this.defaultDelay): Promise<T> {
		this.task = task;
		this.cancelTimeout();

		if (!this.completionPromise) {
			const gate = new Promise((resolve, reject) => {
				this.doResolve = resolve;
				this.doReject = reject;
			});

			this.completionPromise = gate.then(() => {
				this.completionPromise = null;
				this.doResolve = null;

				const pendingTask = this.task;
				if (!pendingTask) {
					return undefined;
				}

				this.task = null;
				return pendingTask();
			});
		}

		// Opens the gate once the wait is over.
		const onElapsed = () => {
			this.deferred = null;
			this.doResolve?.(null);
		};

		if (delay === MicrotaskDelay) {
			this.deferred = microtaskDeferred(onElapsed);
		} else {
			this.deferred = timeoutDeferred(delay, onElapsed);
		}

		return this.completionPromise;
	}

	/**
	 * Whether a wait is currently scheduled.
	 */
	isTriggered(): boolean {
		return !!this.deferred?.isTriggered();
	}

	/**
	 * Stops the scheduled wait and rejects the current round's promise with a
	 * {@link CancellationError}.
	 */
	cancel(): void {
		this.cancelTimeout();

		if (!this.completionPromise) {
			return;
		}

		this.doReject?.(new CancellationError());
		this.completionPromise = null;
	}

	private cancelTimeout(): void {
		this.deferred?.dispose();
		this.deferred = null;
	}

	dispose(): void {
		this.cancel();
	}
}
450
451
/**
 * A helper to delay execution of a task that is being requested often, while
 * preventing accumulation of consecutive executions, while the task runs.
 *
 * The mail man is clever and waits for a certain amount of time, before going
 * out to deliver letters. While the mail man is going out, more letters arrive
 * and can only be delivered once he is back. Once he is back the mail man will
 * do one more trip to deliver the letters that have accumulated while he was out.
 */
export class ThrottledDelayer<T> {

	private delayer: Delayer<Promise<T>>;
	private throttler: Throttler;

	constructor(defaultDelay: number) {
		this.delayer = new Delayer(defaultDelay);
		this.throttler = new Throttler();
	}

	/**
	 * Debounces through the delayer; once the wait elapses, the factory is handed to the throttler.
	 */
	trigger(promiseFactory: ICancellableTask<Promise<T>>, delay?: number): Promise<T> {
		const runThrottled = () => this.throttler.queue(promiseFactory);
		return this.delayer.trigger(runThrottled, delay) as unknown as Promise<T>;
	}

	/**
	 * Whether the delayer currently has a wait scheduled.
	 */
	isTriggered(): boolean {
		return this.delayer.isTriggered();
	}

	/**
	 * Cancels the delayer; see {@link Delayer.cancel}.
	 */
	cancel(): void {
		this.delayer.cancel();
	}

	dispose(): void {
		this.delayer.dispose();
		this.throttler.dispose();
	}
}
487
488
/**
 * A barrier that starts out closed and, once opened, stays open.
 */
export class Barrier {
	private _isOpen = false;
	private _promise: Promise<boolean>;
	private _completePromise!: (v: boolean) => void;

	constructor() {
		this._promise = new Promise<boolean>(resolve => {
			this._completePromise = resolve;
		});
	}

	/**
	 * Whether `open()` has been called.
	 */
	isOpen(): boolean {
		return this._isOpen;
	}

	/**
	 * Opens the barrier and resolves the promise returned by `wait()` with `true`.
	 */
	open(): void {
		this._isOpen = true;
		this._completePromise(true);
	}

	/**
	 * Returns the promise that resolves once the barrier is opened. Every call returns
	 * the same promise.
	 */
	wait(): Promise<boolean> {
		return this._promise;
	}
}
516
517
/**
 * A barrier that starts out closed and opens permanently either when `open()` is called
 * or after the given time has passed, whichever happens first.
 */
export class AutoOpenBarrier extends Barrier {

	private readonly _timeout: Timeout;

	constructor(autoOpenTimeMs: number) {
		super();

		const openSelf = () => this.open();
		this._timeout = setTimeout(openSelf, autoOpenTimeMs);
	}

	override open(): void {
		// An explicit open makes the pending auto-open timer unnecessary.
		clearTimeout(this._timeout);
		super.open();
	}
}
535
536
/**
 * Returns a promise that resolves after `millis` milliseconds.
 *
 * Without a token the result is a {@link CancelablePromise}. With a token, cancelling
 * it clears the timer and rejects the promise with a {@link CancellationError}.
 */
export function timeout(millis: number): CancelablePromise<void>;
export function timeout(millis: number, token: CancellationToken): Promise<void>;
export function timeout(millis: number, token?: CancellationToken): CancelablePromise<void> | Promise<void> {
	if (!token) {
		return createCancelablePromise(innerToken => timeout(millis, innerToken));
	}

	return new Promise((resolve, reject) => {
		const timer = setTimeout(() => {
			listener.dispose();
			resolve();
		}, millis);

		const listener = token.onCancellationRequested(() => {
			clearTimeout(timer);
			listener.dispose();
			reject(new CancellationError());
		});
	});
}
555
556
/**
 * Starts a timeout and returns a disposable that cancels it.
 * @param handler The timeout handler.
 * @param timeout An optional timeout in milliseconds.
 * @param store An optional {@link DisposableStore}. The returned disposable is added to it
 * and deleted from it again when the timeout has fired or has been disposed.
 *
 * @example
 * const store = new DisposableStore;
 * // Call the timeout after 1000ms at which point it will be automatically
 * // evicted from the store.
 * const timeoutDisposable = disposableTimeout(() => {}, 1000, store);
 *
 * if (foo) {
 *   // Cancel the timeout and evict it from store.
 *   timeoutDisposable.dispose();
 * }
 */
export function disposableTimeout(handler: () => void, timeout = 0, store?: DisposableStore): IDisposable {
	const onElapsed = () => {
		handler();
		if (store) {
			// Evicts the registration from the store now that the timeout has fired.
			registration.dispose();
		}
	};

	const handle = setTimeout(onElapsed, timeout);

	const registration = toDisposable(() => {
		clearTimeout(handle);
		store?.delete(registration);
	});

	store?.add(registration);
	return registration;
}
587
588
/**
 * Invokes the given promise factories one after another, waiting for each promise before
 * calling the next factory. The returned promise resolves to the collected results;
 * `null` and `undefined` results are left out.
 */
export async function sequence<T>(promiseFactories: ITask<Promise<T>>[]): Promise<T[]> {
	const collected: T[] = [];
	const count = promiseFactories.length;

	// Yield once so that no factory runs synchronously within the call to `sequence`.
	await null;

	for (let position = 0; position < count; position++) {
		const pending = promiseFactories[position]();
		if (!pending) {
			// A factory that yields nothing ends the sequence.
			break;
		}

		const value: unknown = await pending;
		if (value !== undefined && value !== null) {
			collected.push(value as T);
		}
	}

	return collected;
}
617
618
/**
 * Invokes the factories one at a time and resolves with the first result accepted by
 * `shouldStop`; later factories are then not invoked. Resolves with `defaultValue` when
 * no result is accepted.
 *
 * @param promiseFactories Factories to try, in order
 * @param shouldStop Acceptance test; defaults to truthiness of the result
 * @param defaultValue Value used when nothing is accepted; defaults to `null`
 */
export function first<T>(promiseFactories: ITask<Promise<T>>[], shouldStop: (t: T) => boolean = t => !!t, defaultValue: T | null = null): Promise<T | null> {
	const total = promiseFactories.length;
	let cursor = 0;

	function step(): Promise<T | null> {
		if (cursor >= total) {
			return Promise.resolve(defaultValue);
		}

		const produce = promiseFactories[cursor];
		cursor += 1;

		return Promise.resolve(produce()).then(value => {
			return shouldStop(value) ? Promise.resolve(value) : step();
		});
	}

	return step();
}
641
642
/**
 * Waits on all given promises at once and resolves with the first result accepted by
 * `shouldStop`, or with `defaultValue` when every promise resolved without an accepted
 * result. The first rejection rejects the returned promise. In both early-exit cases
 * `cancel()` is called on every promise in the list that has such a method.
 */
export function firstParallel<T>(promiseList: Promise<T>[], shouldStop?: (t: T) => boolean, defaultValue?: T | null): Promise<T | null>;
export function firstParallel<T, R extends T>(promiseList: Promise<T>[], shouldStop: (t: T) => t is R, defaultValue?: R | null): Promise<R | null>;
export function firstParallel<T>(promiseList: Promise<T>[], shouldStop: (t: T) => boolean = t => !!t, defaultValue: T | null = null) {
	if (promiseList.length === 0) {
		return Promise.resolve(defaultValue);
	}

	// Number of promises that have not reported yet; -1 once a decision has been made.
	let outstanding = promiseList.length;

	const settleEarly = () => {
		outstanding = -1;
		promiseList.forEach(candidate => {
			(candidate as Partial<CancelablePromise<T>>).cancel?.();
		});
	};

	return new Promise<T | null>((resolve, reject) => {
		promiseList.forEach(candidate => {
			candidate.then(value => {
				outstanding -= 1;
				if (outstanding >= 0 && shouldStop(value)) {
					settleEarly();
					resolve(value);
				} else if (outstanding === 0) {
					resolve(defaultValue);
				}
			})
				.catch(err => {
					outstanding -= 1;
					if (outstanding >= 0) {
						settleEarly();
						reject(err);
					}
				});
		});
	});
}
680
681
/**
 * A queued task together with the callbacks that settle the promise handed to its submitter.
 */
interface ILimitedTaskFactory<T> {
	/** Creates the promise for the task. */
	factory: ITask<Promise<T>>;
	/** Completes the submitter's promise. */
	c: (value: T | Promise<T>) => void;
	/** Fails the submitter's promise. */
	e: (error?: unknown) => void;
}
686
687
/**
 * Contract of a task limiter: tasks are submitted as promise factories.
 */
export interface ILimiter<T> {

	/** Size reported by the limiter. */
	readonly size: number;

	/** Submits a task and returns the promise for its outcome. */
	queue(factory: ITask<Promise<T>>): Promise<T>;

	/** Clears the limiter. */
	clear(): void;
}
695
696
/**
 * A helper to queue N promises and run them all with a max degree of parallelism. The helper
 * ensures that at any time no more than M promises are running at the same time.
 */
export class Limiter<T> implements ILimiter<T> {

	/** Number of tasks that are queued or running. */
	private _size = 0;
	private _isDisposed = false;
	/** Number of tasks whose factory has been invoked and whose promise has not settled yet. */
	private runningPromises: number;
	private readonly maxDegreeOfParalellism: number;
	/** Tasks waiting for a free slot, in submission order. */
	private readonly outstandingPromises: ILimitedTaskFactory<T>[];
	private readonly _onDrained: Emitter<void>;

	/**
	 * @param maxDegreeOfParalellism Maximum number of tasks allowed to run at the same time
	 */
	constructor(maxDegreeOfParalellism: number) {
		this.maxDegreeOfParalellism = maxDegreeOfParalellism;
		this.outstandingPromises = [];
		this.runningPromises = 0;
		this._onDrained = new Emitter<void>();
	}

	/**
	 *
	 * @returns A promise that resolved when all work is done (onDrained) or when
	 * there is nothing to do
	 */
	whenIdle(): Promise<void> {
		return this.size > 0
			? Event.toPromise(this.onDrained)
			: Promise.resolve();
	}

	/**
	 * Fires each time the number of queued and running tasks drops to zero.
	 */
	get onDrained(): Event<void> {
		return this._onDrained.event;
	}

	/**
	 * Number of tasks that are queued or running.
	 */
	get size(): number {
		return this._size;
	}

	/**
	 * Submits a task. It starts immediately if a slot is free, otherwise once earlier
	 * tasks have settled.
	 *
	 * @param factory Creates the promise for the task; called when the task starts
	 * @returns A promise that settles like the promise created by `factory`
	 * @throws Error if the limiter has been disposed
	 */
	queue(factory: ITask<Promise<T>>): Promise<T> {
		if (this._isDisposed) {
			throw new Error('Object has been disposed');
		}
		this._size++;

		return new Promise<T>((c, e) => {
			this.outstandingPromises.push({ factory, c, e });
			this.consume();
		});
	}

	/**
	 * Starts waiting tasks for as long as free slots are available.
	 */
	private consume(): void {
		while (this.outstandingPromises.length && this.runningPromises < this.maxDegreeOfParalellism) {
			const iLimitedTask = this.outstandingPromises.shift()!;
			this.runningPromises++;

			let promise: Promise<T>;
			try {
				promise = iLimitedTask.factory();
			} catch (err) {
				// A factory that throws synchronously is treated like one that returned a
				// rejected promise. Otherwise the slot taken above would never be released
				// (stalling the limiter once all slots are lost) and the submitter's
				// promise could stay pending forever.
				promise = Promise.reject(err);
			}

			promise.then(iLimitedTask.c, iLimitedTask.e);
			promise.then(() => this.consumed(), () => this.consumed());
		}
	}

	/**
	 * Bookkeeping after a task has settled: frees its slot, fires `onDrained` when
	 * nothing is left, and starts waiting tasks.
	 */
	private consumed(): void {
		if (this._isDisposed) {
			return;
		}
		this.runningPromises--;
		if (--this._size === 0) {
			this._onDrained.fire();
		}

		if (this.outstandingPromises.length > 0) {
			this.consume();
		}
	}

	/**
	 * Drops all tasks that have not started yet; running tasks are not affected. The
	 * promises of dropped tasks are not settled.
	 *
	 * @throws Error if the limiter has been disposed
	 */
	clear(): void {
		if (this._isDisposed) {
			throw new Error('Object has been disposed');
		}
		this.outstandingPromises.length = 0;
		this._size = this.runningPromises;
	}

	/**
	 * Drops all waiting tasks and disposes the drain event. `queue` and `clear` throw afterwards.
	 */
	dispose(): void {
		this._isDisposed = true;
		this.outstandingPromises.length = 0; // stop further processing
		this._size = 0;
		this._onDrained.dispose();
	}
}
787
788
/**
 * A queue handles one promise at a time: it is a {@link Limiter} with a maximum
 * parallelism of 1, so at any time at most one queued task is executing.
 */
export class Queue<T> extends Limiter<T> {

	constructor() {
		const oneAtATime = 1;
		super(oneAtATime);
	}
}
797
798
/**
 * Same as `Queue`, ensures that only 1 task is executed at the same time. The difference to `Queue` is that
 * there is only 1 task about to be scheduled next. As such, calling `queue` while a task is executing will
 * replace the currently queued task until it executes.
 *
 * As such, the returned promise may not be from the factory that is passed in but from the next factory that
 * is running after having called `queue`.
 */
export class LimitedQueue {

	private readonly sequentializer = new TaskSequentializer();

	/** Counter used to give every started task its own id. */
	private tasks = 0;

	queue(factory: ITask<Promise<void>>): Promise<void> {
		// Takes the next task id, invokes the factory and registers the result as the running task.
		const start = () => {
			const taskId = this.tasks++;
			return this.sequentializer.run(taskId, factory());
		};

		if (this.sequentializer.isRunning()) {
			return this.sequentializer.queue(start);
		}

		return start();
	}
}
822
823
/**
 * A helper to organize queues per resource. The ResourceQueue creates one {@link Queue}
 * per resource key on demand and disposes it again once that queue has drained.
 */
export class ResourceQueue implements IDisposable {

	/** Live queues, keyed by the comparison key of their resource. */
	private readonly queues = new Map<string, Queue<void>>();

	/** Pending `whenDrained()` requests. */
	private readonly drainers = new Set<DeferredPromise<void>>();

	/** Drain listeners of the live queues, keyed by a running id. */
	private drainListeners: DisposableMap<number> | undefined = undefined;
	private drainListenerCount = 0;

	/**
	 * Resolves once no queue has any work left; resolves right away when that is already the case.
	 */
	async whenDrained(): Promise<void> {
		if (this.isDrained()) {
			return;
		}

		const drainer = new DeferredPromise<void>();
		this.drainers.add(drainer);

		return drainer.p;
	}

	private isDrained(): boolean {
		for (const queue of this.queues.values()) {
			if (queue.size > 0) {
				return false;
			}
		}

		return true;
	}

	/**
	 * Size of the queue for `resource`, or 0 when no queue exists for it.
	 */
	queueSize(resource: URI, extUri: IExtUri = defaultExtUri): number {
		const key = extUri.getComparisonKey(resource);
		const queue = this.queues.get(key);

		return queue?.size ?? 0;
	}

	/**
	 * Queues `factory` on the queue of `resource`, creating that queue if needed.
	 */
	queueFor(resource: URI, factory: ITask<Promise<void>>, extUri: IExtUri = defaultExtUri): Promise<void> {
		const key = extUri.getComparisonKey(resource);
		const queue = this.queues.get(key) ?? this.createQueue(key);

		return queue.queue(factory);
	}

	/**
	 * Creates and registers the queue for `key` along with a one-time drain listener
	 * that tears the queue down again.
	 */
	private createQueue(key: string): Queue<void> {
		const queue = new Queue<void>();
		const listenerId = this.drainListenerCount++;

		const listener = Event.once(queue.onDrained)(() => {
			queue.dispose();
			this.queues.delete(key);
			this.onDidQueueDrain();

			this.drainListeners?.deleteAndDispose(listenerId);

			if (this.drainListeners?.size === 0) {
				this.drainListeners.dispose();
				this.drainListeners = undefined;
			}
		});

		if (!this.drainListeners) {
			this.drainListeners = new DisposableMap();
		}
		this.drainListeners.set(listenerId, listener);

		this.queues.set(key, queue);

		return queue;
	}

	private onDidQueueDrain(): void {
		if (this.isDrained()) {
			this.releaseDrainers();
		}
		// otherwise: not done yet
	}

	private releaseDrainers(): void {
		this.drainers.forEach(drainer => drainer.complete());
		this.drainers.clear();
	}

	dispose(): void {
		for (const queue of this.queues.values()) {
			queue.dispose();
		}

		this.queues.clear();

		// Even though we might still have pending
		// tasks queued, after the queues have been
		// disposed, we can no longer track them, so
		// we release drainers to prevent hanging
		// promises when the resource queue is being
		// disposed.
		this.releaseDrainers();

		this.drainListeners?.dispose();
	}
}
928
929
/**
 * A parameterless unit of work that yields its result either directly or through a promise.
 */
export type Task<T = void> = () => Promise<T> | T;
930
931
/**
 * A value that is either available directly or through a promise. This can be useful
 * to avoid the runtime overhead of creating a promise.
 */
export type MaybePromise<T> = T | Promise<T>;
936
937
/**
 * Processes tasks in the order they were scheduled, one at a time.
 */
export class TaskQueue {
	private _runningTask: Task<any> | undefined = undefined;
	private _pendingTasks: { task: Task<any>; deferred: DeferredPromise<any>; setUndefinedWhenCleared: boolean }[] = [];

	/**
	 * Waits for the current and pending tasks to finish, then runs and awaits the given task.
	 * If the task is skipped because of clearPending, the promise is rejected with a CancellationError.
	 */
	public schedule<T>(task: Task<T>): Promise<T> {
		return this._enqueue<T>(task, false);
	}

	/**
	 * Waits for the current and pending tasks to finish, then runs and awaits the given task.
	 * If the task is skipped because of clearPending, the promise is resolved with undefined.
	 */
	public scheduleSkipIfCleared<T>(task: Task<T>): Promise<T | undefined> {
		return this._enqueue<T>(task, true);
	}

	/**
	 * Appends the task to the pending list, kicks off processing if nothing is running,
	 * and returns the promise for the task's outcome.
	 */
	private _enqueue<T>(task: Task<T>, setUndefinedWhenCleared: boolean): Promise<T> {
		const deferred = new DeferredPromise<T>();
		this._pendingTasks.push({ task, deferred, setUndefinedWhenCleared });
		this._runIfNotRunning();
		return deferred.p;
	}

	private _runIfNotRunning(): void {
		if (this._runningTask !== undefined) {
			return;
		}
		this._processQueue();
	}

	/**
	 * Takes the next pending task, runs it, settles its deferred promise and then
	 * continues with the task after it.
	 */
	private async _processQueue(): Promise<void> {
		const next = this._pendingTasks.shift();
		if (!next) {
			return;
		}

		if (this._runningTask) {
			throw new BugIndicatingError();
		}

		this._runningTask = next.task;

		try {
			next.deferred.complete(await next.task());
		} catch (e) {
			next.deferred.error(e);
		} finally {
			this._runningTask = undefined;
			this._processQueue();
		}
	}

	/**
	 * Clears all pending tasks. Does not cancel the currently running task.
	 */
	public clearPending(): void {
		const dropped = this._pendingTasks;
		this._pendingTasks = [];

		dropped.forEach(entry => {
			if (entry.setUndefinedWhenCleared) {
				entry.deferred.complete(undefined);
			} else {
				entry.deferred.error(new CancellationError());
			}
		});
	}
}
1014
1015
/**
 * A single reusable `setTimeout` slot: at most one timer is pending at a time.
 */
export class TimeoutTimer implements IDisposable {
	private _token: Timeout | undefined;
	private _isDisposed = false;

	constructor();
	constructor(runner: () => void, timeout: number);
	constructor(runner?: () => void, timeout?: number) {
		this._token = undefined;

		const hasInitialTimer = typeof runner === 'function' && typeof timeout === 'number';
		if (hasInitialTimer) {
			this.setIfNotSet(runner!, timeout!);
		}
	}

	/**
	 * Cancels the pending timer; `cancelAndSet` and `setIfNotSet` throw afterwards.
	 */
	dispose(): void {
		this.cancel();
		this._isDisposed = true;
	}

	/**
	 * Clears the pending timer, if there is one.
	 */
	cancel(): void {
		if (this._token === undefined) {
			return;
		}
		clearTimeout(this._token);
		this._token = undefined;
	}

	/**
	 * Replaces the pending timer (if any) with a new one.
	 * @throws BugIndicatingError if this timer has been disposed
	 */
	cancelAndSet(runner: () => void, timeout: number): void {
		if (this._isDisposed) {
			throw new BugIndicatingError(`Calling 'cancelAndSet' on a disposed TimeoutTimer`);
		}

		this.cancel();
		this._arm(runner, timeout);
	}

	/**
	 * Starts a timer unless one is already pending.
	 * @throws BugIndicatingError if this timer has been disposed
	 */
	setIfNotSet(runner: () => void, timeout: number): void {
		if (this._isDisposed) {
			throw new BugIndicatingError(`Calling 'setIfNotSet' on a disposed TimeoutTimer`);
		}

		if (this._token !== undefined) {
			// timer is already set
			return;
		}

		this._arm(runner, timeout);
	}

	/**
	 * Starts the timer; the slot is marked free right before `runner` is invoked.
	 */
	private _arm(runner: () => void, timeout: number): void {
		this._token = setTimeout(() => {
			this._token = undefined;
			runner();
		}, timeout);
	}
}
1068
1069
/**
 * Owns at most one running interval at a time. Once disposed, starting a new
 * interval throws.
 */
export class IntervalTimer implements IDisposable {

	private disposable: IDisposable | undefined = undefined;
	private isDisposed = false;

	/**
	 * Stops the running interval, if any. The timer stays usable.
	 */
	cancel(): void {
		this.disposable?.dispose();
		this.disposable = undefined;
	}

	/**
	 * Stops the running interval (if any) and starts a new one.
	 *
	 * @param runner Callback invoked on every tick.
	 * @param interval Interval passed to `setInterval`.
	 * @param context Object whose `setInterval`/`clearInterval` are used; defaults to `globalThis`.
	 * @throws BugIndicatingError when the timer has been disposed.
	 */
	cancelAndSet(runner: () => void, interval: number, context = globalThis): void {
		if (this.isDisposed) {
			throw new BugIndicatingError(`Calling 'cancelAndSet' on a disposed IntervalTimer`);
		}

		this.cancel();

		// Wrap the runner so that it is always called without arguments.
		const handle = context.setInterval(() => {
			runner();
		}, interval);

		const stop = () => {
			context.clearInterval(handle);
			this.disposable = undefined;
		};
		this.disposable = toDisposable(stop);
	}

	/**
	 * Stops the running interval (if any) and marks the timer as disposed.
	 */
	dispose(): void {
		this.cancel();
		this.isDisposed = true;
	}
}
1100
1101
/**
 * Runs a callback once after a delay. Scheduling again before the delay has
 * elapsed replaces the pending run.
 */
export class RunOnceScheduler implements IDisposable {

	protected runner: ((...args: unknown[]) => void) | null;

	private timeoutToken: Timeout | undefined = undefined;
	private timeout: number;
	private timeoutHandler: () => void;

	/**
	 * @param runner Callback to run when the delay elapses.
	 * @param delay Default delay used by {@link schedule}.
	 */
	constructor(runner: (...args: any[]) => void, delay: number) {
		this.runner = runner;
		this.timeout = delay;
		// Bind once so the same function object is handed to every `setTimeout`.
		this.timeoutHandler = this.onTimeout.bind(this);
	}

	/**
	 * Cancels a pending run and drops the runner.
	 */
	dispose(): void {
		this.cancel();
		this.runner = null;
	}

	/**
	 * Cancels the pending run, if there is one.
	 */
	cancel(): void {
		if (!this.isScheduled()) {
			return;
		}
		clearTimeout(this.timeoutToken);
		this.timeoutToken = undefined;
	}

	/**
	 * Cancels the pending run (if any) and schedules a new one.
	 *
	 * @param delay Delay for this run; defaults to the scheduler's current delay.
	 */
	schedule(delay = this.timeout): void {
		this.cancel();
		this.timeoutToken = setTimeout(this.timeoutHandler, delay);
	}

	/** The default delay used when `schedule()` is called without argument. */
	get delay(): number {
		return this.timeout;
	}

	set delay(value: number) {
		this.timeout = value;
	}

	/**
	 * Whether a run is currently pending.
	 */
	isScheduled(): boolean {
		return this.timeoutToken !== undefined;
	}

	/**
	 * If a run is pending, cancels the timer and runs immediately instead.
	 * Does nothing when nothing is scheduled.
	 */
	flush(): void {
		if (!this.isScheduled()) {
			return;
		}
		this.cancel();
		this.doRun();
	}

	private onTimeout() {
		this.timeoutToken = undefined;
		// The runner is null once the scheduler has been disposed.
		if (this.runner) {
			this.doRun();
		}
	}

	/**
	 * Invokes the runner. Subclasses override this to pass arguments.
	 */
	protected doRun(): void {
		this.runner?.();
	}
}
1175
1176
/**
 * Same as `RunOnceScheduler`, but doesn't count the time spent in sleep mode.
 * > **NOTE**: Only offers 1s resolution.
 *
 * When calling `setTimeout` with 3hrs, and putting the computer immediately to sleep
 * for 8hrs, `setTimeout` will fire **as soon as the computer wakes from sleep**. But
 * this scheduler will execute 3hrs **after waking the computer from sleep**.
 *
 * It is implemented as a 1s interval that counts down the requested number of
 * seconds.
 */
export class ProcessTimeRunOnceScheduler {

	private runner: (() => void) | null;
	private timeout: number;

	private counter = 0;
	private intervalToken: Timeout | undefined = undefined;
	private intervalHandler: () => void;

	/**
	 * @param runner Callback to run once the delay has been counted down.
	 * @param delay Default delay in milliseconds; should be a multiple of 1000.
	 */
	constructor(runner: () => void, delay: number) {
		ProcessTimeRunOnceScheduler.warnIfNotWholeSeconds(delay);
		this.runner = runner;
		this.timeout = delay;
		this.intervalHandler = this.onInterval.bind(this);
	}

	/**
	 * Cancels a pending run and drops the runner.
	 */
	dispose(): void {
		this.cancel();
		this.runner = null;
	}

	/**
	 * Cancels the pending run, if there is one.
	 */
	cancel(): void {
		if (!this.isScheduled()) {
			return;
		}
		clearInterval(this.intervalToken);
		this.intervalToken = undefined;
	}

	/**
	 * Cancel previous runner (if any) & schedule a new runner.
	 *
	 * @param delay Delay in milliseconds, rounded up to whole seconds.
	 */
	schedule(delay = this.timeout): void {
		ProcessTimeRunOnceScheduler.warnIfNotWholeSeconds(delay);
		this.cancel();
		this.counter = Math.ceil(delay / 1000);
		this.intervalToken = setInterval(this.intervalHandler, 1000);
	}

	/**
	 * Returns true if scheduled.
	 */
	isScheduled(): boolean {
		return this.intervalToken !== undefined;
	}

	private onInterval() {
		this.counter--;
		if (this.counter > 0) {
			// more seconds left to count down
			return;
		}

		// counted down completely: stop ticking and run
		clearInterval(this.intervalToken);
		this.intervalToken = undefined;
		this.runner?.();
	}

	/**
	 * Logs a warning when `delay` cannot be represented with 1s resolution.
	 */
	private static warnIfNotWholeSeconds(delay: number): void {
		if (delay % 1000 !== 0) {
			console.warn(`ProcessTimeRunOnceScheduler resolution is 1s, ${delay}ms is not a multiple of 1000ms.`);
		}
	}
}
1248
1249
/**
 * A `RunOnceScheduler` that collects units of work and hands all of them to
 * the runner in one batch when the scheduler fires.
 */
export class RunOnceWorker<T> extends RunOnceScheduler {

	private units: T[] = [];

	/**
	 * @param runner Receives the units collected since the previous run.
	 * @param timeout Delay between the first collected unit and the run.
	 */
	constructor(runner: (units: T[]) => void, timeout: number) {
		super(runner, timeout);
	}

	/**
	 * Adds a unit to the current batch and schedules a run unless one is
	 * already pending.
	 */
	work(unit: T): void {
		this.units.push(unit);

		if (this.isScheduled()) {
			return;
		}
		this.schedule();
	}

	protected override doRun(): void {
		// Start a fresh batch before invoking the runner with the collected one.
		const batch = this.units;
		this.units = [];

		this.runner?.(batch);
	}

	override dispose(): void {
		this.units = [];

		super.dispose();
	}
}
1278
1279
/**
 * Options that control how a `ThrottledWorker` chunks, buffers and delays work.
 */
export interface IThrottledWorkerOptions {

	/**
	 * Maximum number of units the worker passes to the handler at once.
	 */
	maxWorkChunkSize: number;

	/**
	 * Maximum number of units the worker keeps in memory for processing.
	 * `undefined` disables the limit.
	 */
	maxBufferedWork: number | undefined;

	/**
	 * Delay before processing the next chunk when more work is pending
	 * than fits into a single chunk.
	 */
	throttleDelay: number;

	/**
	 * When enabled, guarantees that two distinct calls to `work()` are not
	 * executed without the throttle delay between them.
	 * Otherwise, if the worker isn't currently throttling, it will execute
	 * work immediately.
	 */
	waitThrottleDelayBetweenWorkUnits?: boolean;
}
1303
1304
/**
 * The `ThrottledWorker` will accept units of work `T`
 * to handle. The contract is:
 * * there is a maximum of units the worker can handle at once (via `maxWorkChunkSize`)
 * * there is a maximum of units the worker will keep in memory for processing (via `maxBufferedWork`)
 * * after having handled `maxWorkChunkSize` units, the worker needs to rest (via `throttleDelay`)
 */
export class ThrottledWorker<T> extends Disposable {

	private readonly pendingWork: T[] = [];

	private readonly throttler = this._register(new MutableDisposable<RunOnceScheduler>());
	private disposed = false;
	private lastExecutionTime = 0;

	/**
	 * @param options Chunk size, buffer limit and delays to apply.
	 * @param handler Receives each chunk of units to process.
	 */
	constructor(
		private options: IThrottledWorkerOptions,
		private readonly handler: (units: T[]) => void
	) {
		super();
	}

	/**
	 * The number of work units that are pending to be processed.
	 */
	get pending(): number { return this.pendingWork.length; }

	/**
	 * Add units to be worked on. Use `pending` to figure out
	 * how many units are not yet processed after this method
	 * was called.
	 *
	 * @returns whether the work was accepted or not. If the
	 * worker is disposed, it will not accept any more work.
	 * If the number of pending units would become larger
	 * than `maxBufferedWork`, more work will also not be accepted.
	 */
	work(units: readonly T[]): boolean {
		if (this.disposed) {
			return false; // work not accepted: disposed
		}

		const isThrottling = !!this.throttler.value;

		// Enforce the buffer limit. While throttling, everything new is
		// buffered. Otherwise one chunk is handled right away and therefore
		// does not count towards the buffer.
		const maxBufferedWork = this.options.maxBufferedWork;
		if (typeof maxBufferedWork === 'number') {
			const handledRightAway = isThrottling ? 0 : this.options.maxWorkChunkSize;
			if (this.pending + units.length - handledRightAway > maxBufferedWork) {
				return false; // work not accepted: too much pending work
			}
		}

		// Buffer the new units
		for (const unit of units) {
			this.pendingWork.push(unit);
		}

		const elapsed = Date.now() - this.lastExecutionTime;

		if (!isThrottling) {
			const mayRunNow = !this.options.waitThrottleDelayBetweenWorkUnits || elapsed >= this.options.throttleDelay;
			if (mayRunNow) {
				// Not throttling and not required to wait between `work()` calls
				this.doWork();
			} else {
				// Wait for the remainder of the throttle delay
				this.scheduleThrottler(Math.max(this.options.throttleDelay - elapsed, 0));
			}
		}
		// else: the running throttler picks up the buffered units

		return true; // work accepted
	}

	private doWork(): void {
		this.lastExecutionTime = Date.now();

		// Take one chunk off the front of the buffer and handle it
		const chunk = this.pendingWork.splice(0, this.options.maxWorkChunkSize);
		this.handler(chunk);

		// Anything left is handled after the throttle delay
		if (this.pendingWork.length > 0) {
			this.scheduleThrottler();
		}
	}

	private scheduleThrottler(delay = this.options.throttleDelay): void {
		this.throttler.value = new RunOnceScheduler(() => {
			this.throttler.clear();

			this.doWork();
		}, delay);
		this.throttler.value.schedule();
	}

	override dispose(): void {
		super.dispose();

		this.pendingWork.length = 0;
		this.disposed = true;
	}
}
1414
1415
//#region -- run on idle tricks ------------
1416
1417
/**
 * The deadline object handed to idle callbacks.
 */
export interface IdleDeadline {
	/** Whether the callback is running because its timeout elapsed. */
	readonly didTimeout: boolean;
	/** Remaining time for the callback, in milliseconds. */
	timeRemaining(): number;
}
1421
1422
/** The two idle-callback functions a target (such as a window or `globalThis`) must provide. */
type IdleApi = Pick<typeof globalThis, 'requestIdleCallback' | 'cancelIdleCallback'>;
1423
1424
1425
/**
 * Execute the callback the next time the browser is idle, returning an
 * {@link IDisposable} that will cancel the callback when disposed. This wraps
 * [requestIdleCallback] so it will fall back to [setTimeout] if the environment
 * doesn't support it.
 *
 * @param callback The callback to run when idle, this includes an
 * [IdleDeadline] that provides the time allotted for the idle callback by the
 * browser. Not respecting this deadline will result in a degraded user
 * experience.
 * @param timeout A timeout after which the callback is no longer held back
 * for an idle period but is queued on the regular event loop (like
 * setTimeout). Typically this should not be used.
 *
 * [IdleDeadline]: https://developer.mozilla.org/en-US/docs/Web/API/IdleDeadline
 * [requestIdleCallback]: https://developer.mozilla.org/en-US/docs/Web/API/Window/requestIdleCallback
 * [setTimeout]: https://developer.mozilla.org/en-US/docs/Web/API/Window/setTimeout
 *
 * **Note** that there is `dom.ts#runWhenWindowIdle` which is better suited when running inside a browser
 * context
 */
export let runWhenGlobalIdle: (callback: (idle: IdleDeadline) => void, timeout?: number) => IDisposable;

/**
 * Same as {@link runWhenGlobalIdle} but uses the idle API of the given target.
 * In the fallback implementation the target and the timeout are ignored.
 */
export let _runWhenIdle: (targetWindow: IdleApi, callback: (idle: IdleDeadline) => void, timeout?: number) => IDisposable;

(function () {
	const safeGlobal: any = globalThis;
	const hasIdleApi = typeof safeGlobal.requestIdleCallback === 'function' && typeof safeGlobal.cancelIdleCallback === 'function';

	if (!hasIdleApi) {
		// Fallback: no idle API available, run via `setTimeout0` instead
		_runWhenIdle = (_targetWindow, runner, timeout?) => {
			// NOTE(review): `disposed` is declared below this call, so this relies on
			// `setTimeout0` not invoking the callback synchronously.
			setTimeout0(() => {
				if (disposed) {
					return;
				}
				const end = Date.now() + 15; // one frame at 64fps
				const deadline: IdleDeadline = {
					didTimeout: true,
					timeRemaining() {
						return Math.max(0, end - Date.now());
					}
				};
				runner(Object.freeze(deadline));
			});
			let disposed = false;
			return {
				dispose() {
					if (!disposed) {
						disposed = true;
					}
				}
			};
		};
	} else {
		_runWhenIdle = (targetWindow: typeof safeGlobal, runner, timeout?) => {
			const options = typeof timeout === 'number' ? { timeout } : undefined;
			const handle: number = targetWindow.requestIdleCallback(runner, options);
			let disposed = false;
			return {
				dispose() {
					if (!disposed) {
						disposed = true;
						targetWindow.cancelIdleCallback(handle);
					}
				}
			};
		};
	}

	runWhenGlobalIdle = (runner, timeout) => _runWhenIdle(globalThis, runner, timeout);
})();
1494
1495
/**
 * Computes a value when the target becomes idle, or synchronously on first
 * access of {@link value} if that happens earlier. The executor runs at most
 * once through these two paths; its result or the error it threw is cached.
 */
export abstract class AbstractIdleValue<T> {

	private readonly _executor: () => void;
	private readonly _handle: IDisposable;

	private _didRun: boolean = false;
	// Tracked separately from `_error` because any value can be thrown,
	// including falsy ones such as `0` or `undefined`.
	private _didThrow: boolean = false;
	private _value?: T;
	private _error: unknown;

	/**
	 * @param targetWindow The idle API used to schedule the computation.
	 * @param executor Produces the value.
	 */
	constructor(targetWindow: IdleApi, executor: () => T) {
		this._executor = () => {
			try {
				this._value = executor();
			} catch (err) {
				this._didThrow = true;
				this._error = err;
			} finally {
				this._didRun = true;
			}
		};
		this._handle = _runWhenIdle(targetWindow, () => this._executor());
	}

	/**
	 * Cancels the scheduled idle callback. A later access of {@link value}
	 * still computes the value synchronously if it has not been computed yet.
	 */
	dispose(): void {
		this._handle.dispose();
	}

	/**
	 * The computed value. Computes it right away if the idle callback has not
	 * run yet.
	 *
	 * @throws whatever the executor threw, on every access.
	 */
	get value(): T {
		if (!this._didRun) {
			// Do not wait for idle: cancel the callback and compute now
			this._handle.dispose();
			this._executor();
		}
		if (this._didThrow) {
			throw this._error;
		}
		return this._value!;
	}

	/**
	 * Whether the executor has run, successfully or not.
	 */
	get isInitialized(): boolean {
		return this._didRun;
	}
}
1536
1537
/**
 * An `IdleValue` that always uses the current window (which might be throttled or inactive).
 * It passes `globalThis` as the idle API target.
 *
 * **Note** that there is `dom.ts#WindowIdleValue` which is better suited when running inside a browser
 * context
 */
export class GlobalIdleValue<T> extends AbstractIdleValue<T> {

	/**
	 * @param executor Produces the value.
	 */
	constructor(executor: () => T) {
		super(globalThis, executor);
	}
}
1549
1550
//#endregion
1551
1552
/**
 * Runs `task` until it succeeds, making at most `retries` attempts and
 * waiting `delay` between two attempts.
 *
 * @param task The task to run.
 * @param delay Time to wait between a failed attempt and the next one.
 * @param retries Total number of attempts to make.
 * @returns The result of the first successful attempt.
 * @throws The error of the last attempt when all attempts failed, or an
 * `Error` when `retries` does not allow for a single attempt.
 */
export async function retry<T>(task: ITask<Promise<T>>, delay: number, retries: number): Promise<T> {
	let lastError: unknown;
	let attempts = 0;

	for (let i = 0; i < retries; i++) {
		attempts++;
		try {
			return await task();
		} catch (error) {
			lastError = error;

			// Only wait when another attempt follows; sleeping after the
			// final failure would merely delay the rejection.
			if (i + 1 < retries) {
				await timeout(delay);
			}
		}
	}

	if (attempts === 0) {
		// Never reject with `undefined` when no attempt was made.
		throw new Error(`retry: task was not run because retries is ${retries}`);
	}

	throw lastError;
}
1567
1568
//#region Task Sequentializer
1569
1570
/**
 * Bookkeeping for the task a `TaskSequentializer` is currently running.
 */
interface IRunningTask {
	/** Identifier the task was started with. */
	readonly taskId: number;
	/** Invoked when cancellation of the running task is requested. */
	readonly cancel: () => void;
	/** The promise of the running task. */
	readonly promise: Promise<void>;
}
1575
1576
/**
 * Bookkeeping for the single task a `TaskSequentializer` has queued to run next.
 */
interface IQueuedTask {
	/** Promise handed out to callers of `queue()`. */
	readonly promise: Promise<void>;
	/** Resolves `promise`. */
	readonly promiseResolve: () => void;
	/** Rejects `promise`. */
	readonly promiseReject: (error: Error) => void;
	/** The task to run; replaced when another task is queued. */
	run: ITask<Promise<void>>;
}
1582
1583
/**
 * Narrowed view of a `TaskSequentializer` that is known to have a running task.
 */
export interface ITaskSequentializerWithRunningTask {
	/** The promise of the running task. */
	readonly running: Promise<void>;
}
1586
1587
/**
 * Narrowed view of a `TaskSequentializer` that is known to have a queued task.
 */
export interface ITaskSequentializerWithQueuedTask {
	/** The task queued to run next. */
	readonly queued: IQueuedTask;
}
1590
1591
/**
 * Tracks one running task and at most one queued task that is started as
 * soon as the running one finishes.
 *
 * @deprecated use `LimitedQueue` instead for an easier to use API
 */
export class TaskSequentializer {

	private _running?: IRunningTask;
	private _queued?: IQueuedTask;

	/**
	 * @param taskId When given, only returns `true` if the running task has this id.
	 * @returns Whether a task (or the task with the given id) is running.
	 */
	isRunning(taskId?: number): this is ITaskSequentializerWithRunningTask {
		return typeof taskId === 'number'
			? this._running?.taskId === taskId
			: !!this._running;
	}

	/** The promise of the running task, if any. */
	get running(): Promise<void> | undefined {
		return this._running?.promise;
	}

	/**
	 * Invokes the cancel callback of the running task, if there is one.
	 */
	cancelRunning(): void {
		this._running?.cancel();
	}

	/**
	 * Registers `promise` as the running task. When it settles, the queued
	 * task (if any) is started.
	 *
	 * @param taskId Identifier for the task.
	 * @param promise The promise of the already started task.
	 * @param onCancel Called when `cancelRunning()` is invoked while this task is running.
	 * @returns The given `promise`.
	 */
	run(taskId: number, promise: Promise<void>, onCancel?: () => void,): Promise<void> {
		this._running = { taskId, cancel: () => onCancel?.(), promise };

		const onSettled = () => this.doneRunning(taskId);
		promise.then(onSettled, onSettled);

		return promise;
	}

	private doneRunning(taskId: number): void {
		// Ignore tasks that have been superseded by a newer `run()` call
		if (!this._running || this._running.taskId !== taskId) {
			return;
		}

		this._running = undefined;

		// We are free now, so start the queued task if there is one
		this.runQueued();
	}

	private runQueued(): void {
		const queued = this._queued;
		if (!queued) {
			return;
		}
		this._queued = undefined;

		// Run the queued task and settle the promise handed out by `queue()`
		queued.run().then(queued.promiseResolve, queued.promiseReject);
	}

	/**
	 * Note: the promise to schedule as next run MUST itself call `run`.
	 * Otherwise, this sequentializer will report `false` for `isRunning`
	 * even when this task is running. Missing this detail means that
	 * suddenly multiple tasks will run in parallel.
	 *
	 * @returns A promise that settles with the queued task. Queuing again
	 * before it ran replaces the task but returns the same promise.
	 */
	queue(run: ITask<Promise<void>>): Promise<void> {
		if (this._queued) {
			// a task is queued already: only replace what will run
			this._queued.run = run;
		} else {
			// first queued task: create the promise that callers wait on
			const { promise, resolve: promiseResolve, reject: promiseReject } = promiseWithResolvers<void>();
			this._queued = {
				run,
				promise,
				promiseResolve,
				promiseReject
			};
		}

		return this._queued.promise;
	}

	/**
	 * Whether a task is queued to run next.
	 */
	hasQueued(): this is ITaskSequentializerWithQueuedTask {
		return !!this._queued;
	}

	/**
	 * Waits for the queued task if there is one, otherwise for the running
	 * task, otherwise resolves right away.
	 */
	async join(): Promise<void> {
		return this._queued?.promise ?? this._running?.promise;
	}
}
1681
1682
//#endregion
1683
1684
//#region
1685
1686
/**
 * The `IntervalCounter` allows to count the number
 * of calls to `increment()` over a duration of
 * `interval`. This utility can be used to conditionally
 * throttle a frequent task when a certain threshold
 * is reached.
 */
export class IntervalCounter {

	private lastIncrementTime = 0;

	private value = 0;

	/**
	 * @param interval Length of the counting window, in the unit returned by `nowFn`.
	 * @param nowFn Time source; defaults to `Date.now()`.
	 */
	constructor(private readonly interval: number, private readonly nowFn = () => Date.now()) { }

	/**
	 * Counts one call.
	 *
	 * @returns The number of calls in the current window, including this one.
	 */
	increment(): number {
		const now = this.nowFn();

		// The window started by the first counted call is over:
		// start a new window at `now` and count from 0 again
		const windowExpired = now - this.lastIncrementTime > this.interval;
		if (windowExpired) {
			this.lastIncrementTime = now;
			this.value = 0;
		}

		this.value += 1;

		return this.value;
	}
}
1716
1717
//#endregion
1718
1719
//#region
1720
1721
/** A callback that accepts either a value or a promise for that value. */
export type ValueCallback<T = unknown> = (value: T | Promise<T>) => void;
1722
1723
/** How a `DeferredPromise` was settled. */
const enum DeferredOutcome {
	Resolved,
	Rejected
}

/**
 * Creates a promise whose resolution or rejection can be controlled imperatively.
 *
 * The promise is exposed as {@link p}. Only the first call to
 * {@link complete}, {@link error} or {@link cancel} has an effect.
 */
export class DeferredPromise<T> {

	/**
	 * Creates a deferred promise that settles the same way `promise` does.
	 */
	public static fromPromise<T>(promise: Promise<T>): DeferredPromise<T> {
		const deferred = new DeferredPromise<T>();
		deferred.settleWith(promise);
		return deferred;
	}

	private completeCallback!: ValueCallback<T>;
	private errorCallback!: (err: unknown) => void;
	private outcome?: { outcome: DeferredOutcome.Rejected; value: unknown } | { outcome: DeferredOutcome.Resolved; value: T };

	/** Whether `error()` or `cancel()` settled this promise. */
	public get isRejected() {
		return this.outcome?.outcome === DeferredOutcome.Rejected;
	}

	/** Whether `complete()` settled this promise. */
	public get isResolved() {
		return this.outcome?.outcome === DeferredOutcome.Resolved;
	}

	/** Whether this promise has been settled either way. */
	public get isSettled() {
		return !!this.outcome;
	}

	/** The value passed to `complete()`, or `undefined` if not resolved. */
	public get value() {
		return this.outcome?.outcome === DeferredOutcome.Resolved ? this.outcome?.value : undefined;
	}

	/** The promise controlled by this object. */
	public readonly p: Promise<T>;

	constructor() {
		// Capture the settle functions so they can be invoked later
		this.p = new Promise<T>((resolve, reject) => {
			this.completeCallback = resolve;
			this.errorCallback = reject;
		});
	}

	/**
	 * Resolves the promise with `value`. Does nothing if already settled.
	 *
	 * @returns A resolved promise.
	 */
	public complete(value: T) {
		if (!this.isSettled) {
			this.completeCallback(value);
			this.outcome = { outcome: DeferredOutcome.Resolved, value };
		}
		return Promise.resolve();
	}

	/**
	 * Rejects the promise with `err`. Does nothing if already settled.
	 *
	 * @returns A resolved promise.
	 */
	public error(err: unknown) {
		if (!this.isSettled) {
			this.errorCallback(err);
			this.outcome = { outcome: DeferredOutcome.Rejected, value: err };
		}
		return Promise.resolve();
	}

	/**
	 * Settles this promise with the outcome of `promise`.
	 *
	 * @returns A promise that resolves once the outcome has been applied.
	 */
	public settleWith(promise: Promise<T>): Promise<void> {
		const onFulfilled = (value: T) => this.complete(value);
		const onRejected = (error: unknown) => this.error(error);
		return promise.then(onFulfilled, onRejected);
	}

	/**
	 * Rejects the promise with a {@link CancellationError}.
	 */
	public cancel() {
		return this.error(new CancellationError());
	}
}
1803
1804
//#endregion
1805
1806
//#region Promises
1807
1808
export namespace Promises {

	/**
	 * A drop-in replacement for `Promise.all` with the only difference
	 * that the method awaits every promise to either fulfill or reject.
	 *
	 * Similar to `Promise.all`, only the first error will be returned
	 * if any. "First" refers to the rejection that was observed first.
	 *
	 * @param promises The promises to wait for.
	 * @returns The fulfilled values in the order of `promises`.
	 * @throws The first observed rejection reason, whatever its value.
	 */
	export async function settled<T>(promises: Promise<T>[]): Promise<T[]> {
		// Rejection reasons in the order they were observed. Kept in a list
		// because a reason can be any value, including `undefined` or `0`,
		// so the reason itself cannot tell us whether a rejection happened.
		const failures: unknown[] = [];

		const result = await Promise.all(promises.map(promise => promise.then(value => value, error => {
			failures.push(error);

			return undefined; // do not rethrow so that other promises can settle
		})));

		if (failures.length > 0) {
			throw failures[0];
		}

		return result as unknown as T[]; // cast is needed and protected by the `throw` above
	}

	/**
	 * A helper to create a new `Promise<T>` with a body that is a promise
	 * itself. By default, an error that raises from the async body will
	 * end up as a unhandled rejection, so this utility properly awaits the
	 * body and rejects the promise as a normal promise does without async
	 * body.
	 *
	 * This method should only be used in rare cases where otherwise `async`
	 * cannot be used (e.g. when callbacks are involved that require this).
	 *
	 * @param bodyFn Async body that receives the `resolve` and `reject` functions of the new promise.
	 */
	export function withAsyncBody<T, E = Error>(bodyFn: (resolve: (value: T) => unknown, reject: (error: E) => unknown) => Promise<unknown>): Promise<T> {
		// eslint-disable-next-line no-async-promise-executor
		return new Promise<T>(async (resolve, reject) => {
			try {
				await bodyFn(resolve, reject);
			} catch (error) {
				reject(error);
			}
		});
	}
}
1856
1857
/**
 * Wraps a promise and records its outcome so that it can be inspected
 * synchronously once the promise has settled.
 */
export class StatefulPromise<T> {
	private _value: T | undefined = undefined;
	/** The fulfilled value, or `undefined` if not (yet) fulfilled. */
	get value(): T | undefined { return this._value; }

	private _error: unknown = undefined;
	/** The rejection reason, or `undefined` if not (yet) rejected. */
	get error(): unknown { return this._error; }

	// Tracked separately from `_error` because a rejection reason can be
	// any value, including falsy ones such as `undefined` or `0`.
	private _isRejected = false;

	private _isResolved = false;
	/** Whether the wrapped promise has settled, by fulfilling or by rejecting. */
	get isResolved() { return this._isResolved; }

	/** Settles the same way as the wrapped promise, after the state has been recorded. */
	public readonly promise: Promise<T>;

	/**
	 * @param promise The promise to observe.
	 */
	constructor(promise: Promise<T>) {
		this.promise = promise.then(
			value => {
				this._value = value;
				this._isResolved = true;
				return value;
			},
			error => {
				this._error = error;
				this._isRejected = true;
				this._isResolved = true;
				throw error;
			}
		);
	}

	/**
	 * Returns the resolved value.
	 * Throws if the promise is not resolved yet.
	 *
	 * @throws BugIndicatingError when the promise has not settled yet.
	 * @throws The rejection reason when the promise was rejected.
	 */
	public requireValue(): T {
		if (!this._isResolved) {
			throw new BugIndicatingError('Promise is not resolved yet');
		}
		if (this._isRejected) {
			throw this._error;
		}
		return this._value!;
	}
}
1898
1899
/**
 * A `StatefulPromise` whose underlying promise is only created when it is
 * first needed.
 */
export class LazyStatefulPromise<T> {
	private readonly _promise: Lazy<StatefulPromise<T>>;

	/**
	 * @param _compute Creates the promise; called when the promise is first needed.
	 */
	constructor(
		private readonly _compute: () => Promise<T>,
	) {
		this._promise = new Lazy(() => new StatefulPromise(this._compute()));
	}

	/**
	 * Returns the resolved value.
	 * Throws if the promise is not resolved yet.
	 */
	public requireValue(): T {
		const stateful = this._promise.value;
		return stateful.requireValue();
	}

	/**
	 * Returns the promise (and triggers a computation of the promise if not yet done so).
	 */
	public getPromise(): Promise<T> {
		const stateful = this._promise.value;
		return stateful.promise;
	}

	/**
	 * Reads the current value without triggering a computation of the promise.
	 */
	public get currentValue(): T | undefined {
		return this._promise.rawValue?.value;
	}
}
1928
1929
//#endregion
1930
1931
//#region
1932
1933
/** Lifecycle of the producing side of an async iterable. */
const enum AsyncIterableSourceState {
	/** Still producing: more values may follow. */
	Initial,
	/** Finished successfully: no more values will follow. */
	DoneOK,
	/** Finished with an error. */
	DoneError,
}
1938
1939
/**
 * An object that allows to emit values asynchronously or bring the iterable to an error state using `reject()`.
 * This emitter is valid only for the duration of the executor (until the promise returned by the executor settles).
 */
export interface AsyncIterableEmitter<T> {
	/**
	 * The value will be appended at the end.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	emitOne(value: T): void;
	/**
	 * The values will be appended at the end.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	emitMany(values: T[]): void;
	/**
	 * Writing an error will permanently invalidate this iterable.
	 * The current users will receive an error thrown, as will all future users.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	reject(error: Error): void;
}
1964
1965
/**
 * An executor for the `AsyncIterableObject` that has access to an emitter.
 * The iterable is complete once the executor (or the promise it returns) has finished.
 */
export interface AsyncIterableExecutor<T> {
	/**
	 * @param emitter An object that allows to emit async values valid only for the duration of the executor.
	 */
	(emitter: AsyncIterableEmitter<T>): unknown | Promise<unknown>;
}
1974
1975
/**
 * A rich implementation for an `AsyncIterable<T>`.
 *
 * Values emitted by the executor are buffered, so every iterator obtained
 * from this object sees all values from the beginning.
 */
export class AsyncIterableObject<T> implements AsyncIterable<T> {

	/**
	 * Creates an iterable that yields the given items.
	 */
	public static fromArray<T>(items: T[]): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(writer => {
			writer.emitMany(items);
		});
	}

	/**
	 * Creates an iterable that yields the items the promise resolves to.
	 */
	public static fromPromise<T>(promise: Promise<T[]>): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async emitter => {
			emitter.emitMany(await promise);
		});
	}

	/**
	 * Creates an iterable that yields the value of each promise as soon as
	 * that promise resolves.
	 */
	public static fromPromisesResolveOrder<T>(promises: Promise<T>[]): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async emitter => {
			const forwarding = promises.map(async p => emitter.emitOne(await p));
			await Promise.all(forwarding);
		});
	}

	/**
	 * Creates an iterable that yields the items of all given iterables,
	 * consuming them concurrently.
	 */
	public static merge<T>(iterables: AsyncIterable<T>[]): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async emitter => {
			const forwarding = iterables.map(async iterable => {
				for await (const item of iterable) {
					emitter.emitOne(item);
				}
			});
			await Promise.all(forwarding);
		});
	}

	public static EMPTY = AsyncIterableObject.fromArray<any>([]);

	private _state: AsyncIterableSourceState = AsyncIterableSourceState.Initial;
	private _results: T[] = [];
	private _error: Error | null = null;
	private readonly _onReturn?: () => void | Promise<void>;
	private readonly _onStateChanged: Emitter<void> = new Emitter<void>();

	/**
	 * @param executor Produces the values. It is started in a microtask.
	 * @param onReturn Called when an iterator of this object is closed through `return()`.
	 */
	constructor(executor: AsyncIterableExecutor<T>, onReturn?: () => void | Promise<void>) {
		this._onReturn = onReturn;

		queueMicrotask(async () => {
			const writer: AsyncIterableEmitter<T> = {
				emitOne: (item) => this.emitOne(item),
				emitMany: (items) => this.emitMany(items),
				reject: (error) => this.reject(error)
			};
			try {
				await Promise.resolve(executor(writer));
				this.resolve();
			} catch (err) {
				this.reject(err);
			} finally {
				// the emitter is only valid while the executor runs
				writer.emitOne = undefined!;
				writer.emitMany = undefined!;
				writer.reject = undefined!;
			}
		});
	}

	[Symbol.asyncIterator](): AsyncIterator<T, undefined, undefined> {
		// every iterator keeps its own position in the shared results
		let position = 0;

		const next = async (): Promise<IteratorResult<T, undefined>> => {
			for (; ;) {
				if (this._state === AsyncIterableSourceState.DoneError) {
					throw this._error;
				}
				if (position < this._results.length) {
					return { done: false, value: this._results[position++] };
				}
				if (this._state === AsyncIterableSourceState.DoneOK) {
					return { done: true, value: undefined };
				}
				// nothing to hand out yet: wait for the next value or state change
				await Event.toPromise(this._onStateChanged.event);
			}
		};

		const finish = async (): Promise<IteratorResult<T, undefined>> => {
			this._onReturn?.();
			return { done: true, value: undefined };
		};

		return { next, return: finish };
	}

	/**
	 * Creates an iterable that yields `mapFn(item)` for every item of `iterable`.
	 */
	public static map<T, R>(iterable: AsyncIterable<T>, mapFn: (item: T) => R): AsyncIterableObject<R> {
		return new AsyncIterableObject<R>(async emitter => {
			for await (const item of iterable) {
				const mapped = mapFn(item);
				emitter.emitOne(mapped);
			}
		});
	}

	public map<R>(mapFn: (item: T) => R): AsyncIterableObject<R> {
		return AsyncIterableObject.map(this, mapFn);
	}

	/**
	 * Creates an iterable that yields the items of `iterable` for which `filterFn` returns `true`.
	 */
	public static filter<T>(iterable: AsyncIterable<T>, filterFn: (item: T) => boolean): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async emitter => {
			for await (const item of iterable) {
				if (!filterFn(item)) {
					continue;
				}
				emitter.emitOne(item);
			}
		});
	}

	public filter<T2 extends T>(filterFn: (item: T) => item is T2): AsyncIterableObject<T2>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableObject<T>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableObject<T> {
		return AsyncIterableObject.filter(this, filterFn);
	}

	/**
	 * Creates an iterable that skips all falsy items of `iterable`.
	 */
	public static coalesce<T>(iterable: AsyncIterable<T | undefined | null>): AsyncIterableObject<T> {
		return AsyncIterableObject.filter(iterable, item => !!item) as AsyncIterableObject<T>;
	}

	public coalesce(): AsyncIterableObject<NonNullable<T>> {
		return AsyncIterableObject.coalesce(this) as AsyncIterableObject<NonNullable<T>>;
	}

	/**
	 * Collects all items of `iterable` into an array.
	 */
	public static async toPromise<T>(iterable: AsyncIterable<T>): Promise<T[]> {
		const collected: T[] = [];
		for await (const item of iterable) {
			collected.push(item);
		}
		return collected;
	}

	public toPromise(): Promise<T[]> {
		return AsyncIterableObject.toPromise(this);
	}

	/**
	 * The value will be appended at the end.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private emitOne(value: T): void {
		if (this._state === AsyncIterableSourceState.Initial) {
			// it is important to add new values at the end,
			// as we may have iterators already running on the array
			this._results.push(value);
			this._onStateChanged.fire();
		}
	}

	/**
	 * The values will be appended at the end.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private emitMany(values: T[]): void {
		if (this._state === AsyncIterableSourceState.Initial) {
			// it is important to add new values at the end,
			// as we may have iterators already running on the array
			this._results = this._results.concat(values);
			this._onStateChanged.fire();
		}
	}

	/**
	 * Calling `resolve()` will mark the result array as complete.
	 *
	 * **NOTE** `resolve()` must be called, otherwise all consumers of this iterable will hang indefinitely, similar to a non-resolved promise.
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private resolve(): void {
		if (this._state === AsyncIterableSourceState.Initial) {
			this._state = AsyncIterableSourceState.DoneOK;
			this._onStateChanged.fire();
		}
	}

	/**
	 * Writing an error will permanently invalidate this iterable.
	 * The current users will receive an error thrown, as will all future users.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private reject(error: Error) {
		if (this._state === AsyncIterableSourceState.Initial) {
			this._state = AsyncIterableSourceState.DoneError;
			this._error = error;
			this._onStateChanged.fire();
		}
	}
}
2173
2174
2175
/**
 * Wraps the async iterable returned by `callback` into a {@link CancelableAsyncIterableProducer}.
 *
 * The callback receives the token of a freshly created `CancellationTokenSource`.
 * When that token is cancelled, the producer is rejected with a `CancellationError`
 * and no further items of the inner iterable are forwarded.
 *
 * @param callback Creates the inner iterable; it is invoked immediately with the cancellation token.
 * @returns A producer that forwards the inner iterable's items and can be cancelled.
 */
export function createCancelableAsyncIterableProducer<T>(callback: (token: CancellationToken) => AsyncIterable<T>): CancelableAsyncIterableProducer<T> {
	const source = new CancellationTokenSource();
	const innerIterable = callback(source.token);

	return new CancelableAsyncIterableProducer<T>(source, async (emitter) => {
		// Disposes the cancellation listener first, then the token source.
		const releaseResources = (): void => {
			subscription.dispose();
			source.dispose();
		};

		const subscription = source.token.onCancellationRequested(() => {
			releaseResources();
			emitter.reject(new CancellationError());
		});

		try {
			for await (const item of innerIterable) {
				if (source.token.isCancellationRequested) {
					// canceled in the meantime; the cancellation listener has
					// already cleaned up and rejected the emitter
					return;
				}
				emitter.emitOne(item);
			}
			releaseResources();
		} catch (err) {
			releaseResources();
			emitter.reject(err);
		}
	});
}
2202
2203
/**
 * Owner-side handle for an {@link AsyncIterableObject}: values, errors and completion
 * are pushed in through this object and read through {@link asyncIterable}.
 *
 * Anything emitted or rejected before the iterable's executor has run is buffered
 * and replayed once the executor starts.
 */
export class AsyncIterableSource<T> {

	private readonly _deferred = new DeferredPromise<void>();
	private readonly _asyncIterable: AsyncIterableObject<T>;

	// These delegates start out as "buffering" implementations and are swapped
	// for direct emitter calls once the executor of the iterable has run.
	private _errorFn: (error: Error) => void;
	private _emitOneFn: (item: T) => void;
	private _emitManyFn: (item: T[]) => void;

	/**
	 *
	 * @param onReturn A function that will be called when consuming the async iterable
	 * has been finished by the consumer, e.g. the for-await-loop has been exited (break, return) early.
	 * This is NOT called when resolving this source by its owner.
	 */
	constructor(onReturn?: () => Promise<void> | void) {
		// Buffers for whatever arrives before the executor below has run.
		let earlyError: Error | undefined;
		let earlyItems: T[] | undefined;

		this._asyncIterable = new AsyncIterableObject(emitter => {

			if (earlyError) {
				// an early error wins; buffered items are not replayed
				emitter.reject(earlyError);
				return;
			}
			if (earlyItems) {
				emitter.emitMany(earlyItems);
			}

			// from now on, talk to the emitter directly
			this._errorFn = (error: Error) => emitter.reject(error);
			this._emitOneFn = (item: T) => emitter.emitOne(item);
			this._emitManyFn = (items: T[]) => emitter.emitMany(items);

			// keeps the iterable open until resolve()/reject() completes the deferred
			return this._deferred.p;
		}, onReturn);

		this._errorFn = (error: Error) => {
			// only the first early error is kept
			earlyError = earlyError || error;
		};

		this._emitOneFn = (item: T) => {
			const buffered = earlyItems ?? (earlyItems = []);
			buffered.push(item);
		};

		this._emitManyFn = (items: T[]) => {
			if (earlyItems) {
				const buffered = earlyItems;
				items.forEach(item => buffered.push(item));
			} else {
				// copy, so later changes to the caller's array are not observed
				earlyItems = items.slice();
			}
		};
	}

	/** The iterable that receives everything pushed into this source. */
	get asyncIterable(): AsyncIterableObject<T> {
		return this._asyncIterable;
	}

	/** Completes the internal deferred that the iterable's executor returns. */
	resolve(): void {
		this._deferred.complete();
	}

	/** Forwards (or buffers) the error and then completes the internal deferred. */
	reject(error: Error): void {
		this._errorFn(error);
		this._deferred.complete();
	}

	/** Forwards (or buffers) a single item. */
	emitOne(item: T): void {
		this._emitOneFn(item);
	}

	/** Forwards (or buffers) several items. */
	emitMany(items: T[]) {
		this._emitManyFn(items);
	}
}
2279
2280
/**
 * Wraps an async iterator (or iterable) so that iteration ends once `token` is cancelled.
 *
 * `next()` reports `done` when cancellation was already requested, or when the race
 * between the underlying `next()` and the token yields no result. `throw` and `return`
 * are forwarded to the underlying iterator when it provides them.
 *
 * @param iterableOrIterator The source; an iterable is asked for its iterator once, up front.
 * @param token The cancellation token that ends the iteration.
 */
export function cancellableIterable<T>(iterableOrIterator: AsyncIterator<T> | AsyncIterable<T>, token: CancellationToken): AsyncIterableIterator<T> {
	const iterator = Symbol.asyncIterator in iterableOrIterator ? iterableOrIterator[Symbol.asyncIterator]() : iterableOrIterator;

	const finished = (): IteratorResult<T> => ({ done: true, value: undefined });

	return {
		async next(): Promise<IteratorResult<T>> {
			if (token.isCancellationRequested) {
				return finished();
			}

			const raced = await raceCancellation(iterator.next(), token);
			if (raced) {
				return raced;
			}
			return finished();
		},
		throw: iterator.throw?.bind(iterator),
		return: iterator.return?.bind(iterator),
		[Symbol.asyncIterator]() {
			return this;
		}
	};
}
2298
2299
/**
 * A produced outcome: either a value (`ok: true`) or an error (`ok: false`).
 * The `ok` flag discriminates the two shapes.
 */
type ProducerConsumerValue<T> =
	| { ok: true; value: T }
	| { ok: false; error: Error };
2306
2307
/**
 * Matches produced values with consumers in FIFO order.
 *
 * Values produced while nobody is waiting are queued; consumers that arrive while
 * nothing is queued get a pending promise. A final value, once set, settles all
 * waiting consumers and is handed to every later `consume()` call after the queue
 * has been drained.
 */
class ProducerConsumer<T> {
	private readonly _unsatisfiedConsumers: DeferredPromise<T>[] = [];
	private readonly _unconsumedValues: ProducerConsumerValue<T>[] = [];
	private _finalValue: ProducerConsumerValue<T> | undefined;

	/** Whether `produceFinal` has been called. */
	public get hasFinalValue(): boolean {
		return Boolean(this._finalValue);
	}

	/**
	 * Hands `value` to the longest-waiting consumer, or queues it when nobody waits.
	 * Throws a `BugIndicatingError` if the final value has already been set.
	 */
	produce(value: ProducerConsumerValue<T>): void {
		this._ensureNoFinalValue();

		const waiting = this._unsatisfiedConsumers.shift();
		if (waiting) {
			this._resolveOrRejectDeferred(waiting, value);
			return;
		}
		this._unconsumedValues.push(value);
	}

	/**
	 * Sets the final value and settles every currently waiting consumer with it.
	 * Throws a `BugIndicatingError` if the final value has already been set.
	 */
	produceFinal(value: ProducerConsumerValue<T>): void {
		this._ensureNoFinalValue();
		this._finalValue = value;

		// take all waiting consumers out of the list, then settle them in order
		const waiting = this._unsatisfiedConsumers.splice(0);
		for (const deferred of waiting) {
			this._resolveOrRejectDeferred(deferred, value);
		}
	}

	private _ensureNoFinalValue(): void {
		if (this._finalValue) {
			throw new BugIndicatingError('ProducerConsumer: cannot produce after final value has been set');
		}
	}

	/** Completes the deferred for an ok-value, errors it otherwise. */
	private _resolveOrRejectDeferred(deferred: DeferredPromise<T>, value: ProducerConsumerValue<T>): void {
		if (!value.ok) {
			deferred.error(value.error);
			return;
		}
		deferred.complete(value.value);
	}

	/**
	 * Returns the next outcome: queued values first, then the final value;
	 * if neither is available, a promise that settles on the next `produce`/`produceFinal`.
	 */
	consume(): Promise<T> {
		const next = this._unconsumedValues.length > 0 ? this._unconsumedValues.shift() : this._finalValue;

		if (next === undefined) {
			// nothing available yet: register as waiting consumer
			const deferred = new DeferredPromise<T>();
			this._unsatisfiedConsumers.push(deferred);
			return deferred.p;
		}

		return next.ok ? Promise.resolve(next.value) : Promise.reject(next.error);
	}
}
2364
2365
/**
 * An async iterable whose items are pushed by an executor through an emitter.
 *
 * The executor is started in a microtask after construction. The iteration ends
 * successfully when the executor (or the promise it returns) completes, and ends
 * with an error when the executor throws, its promise rejects, or `reject` is called
 * on the emitter. Only the first of these outcomes is kept.
 *
 * Important difference to AsyncIterableObject:
 * If it is iterated two times, the second iterator will not see the values emitted by the first iterator.
 */
export class AsyncIterableProducer<T> implements AsyncIterable<T> {
	private readonly _producerConsumer = new ProducerConsumer<IteratorResult<T>>();

	/**
	 * @param executor Receives the emitter used to push items. May be synchronous or return a promise.
	 * @param _onReturn Invoked when a consumer calls `return()` on the iterator (e.g. leaves a for-await loop early).
	 */
	constructor(executor: AsyncIterableExecutor<T>, private readonly _onReturn?: () => void) {
		queueMicrotask(async () => {
			// The executor call itself is inside the try block: a synchronously throwing
			// executor must fail the iteration instead of leaving consumers pending forever
			// and surfacing as an unhandled rejection of this callback.
			// The result is always awaited so that a late rejection is observed here;
			// `_finishOk`/`_finishError` do nothing if a final value was already produced.
			try {
				await executor({
					emitOne: value => this._producerConsumer.produce({ ok: true, value: { done: false, value: value } }),
					emitMany: values => {
						for (const value of values) {
							this._producerConsumer.produce({ ok: true, value: { done: false, value: value } });
						}
					},
					reject: error => this._finishError(error),
				});
				this._finishOk();
			} catch (error) {
				this._finishError(error);
			}
		});
	}

	/** Creates a producer that emits all `items` and then completes. */
	public static fromArray<T>(items: T[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>((writer) => {
			writer.emitMany(items);
		});
	}

	/** Creates a producer that emits the items of the array `promise` resolves to. */
	public static fromPromise<T>(promise: Promise<T[]>): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			emitter.emitMany(await promise);
		});
	}

	/** Creates a producer that emits each promise's value as soon as that promise resolves. */
	public static fromPromisesResolveOrder<T>(promises: Promise<T>[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			await Promise.all(promises.map(async (p) => emitter.emitOne(await p)));
		});
	}

	/** Iterates all `iterables` concurrently and emits their items as they arrive. */
	public static merge<T>(iterables: AsyncIterable<T>[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer(async (emitter) => {
			await Promise.all(iterables.map(async (iterable) => {
				for await (const item of iterable) {
					emitter.emitOne(item);
				}
			}));
		});
	}

	public static EMPTY = AsyncIterableProducer.fromArray<any>([]);

	/** Emits `mapFn(item)` for every item of `iterable`. */
	public static map<T, R>(iterable: AsyncIterable<T>, mapFn: (item: T) => R): AsyncIterableProducer<R> {
		return new AsyncIterableProducer<R>(async (emitter) => {
			for await (const item of iterable) {
				emitter.emitOne(mapFn(item));
			}
		});
	}

	/**
	 * Splits `iterable` into two producers that both receive every item.
	 * The source is iterated only once, and only after the executors of both producers have started.
	 */
	public static tee<T>(iterable: AsyncIterable<T>): [AsyncIterableProducer<T>, AsyncIterableProducer<T>] {
		let emitter1: AsyncIterableEmitter<T> | undefined;
		let emitter2: AsyncIterableEmitter<T> | undefined;

		// completed once the source iteration has ended; keeps both producers open until then
		const defer = new DeferredPromise<void>();

		const start = async () => {
			if (!emitter1 || !emitter2) {
				return; // not yet ready
			}
			try {
				for await (const item of iterable) {
					emitter1.emitOne(item);
					emitter2.emitOne(item);
				}
			} catch (err) {
				emitter1.reject(err);
				emitter2.reject(err);
			} finally {
				defer.complete();
			}
		};

		const p1 = new AsyncIterableProducer<T>(async (emitter) => {
			emitter1 = emitter;
			start();
			return defer.p;
		});
		const p2 = new AsyncIterableProducer<T>(async (emitter) => {
			emitter2 = emitter;
			start();
			return defer.p;
		});
		return [p1, p2];
	}

	/** Instance form of {@link AsyncIterableProducer.map}. */
	public map<R>(mapFn: (item: T) => R): AsyncIterableProducer<R> {
		return AsyncIterableProducer.map(this, mapFn);
	}

	/** Drops all falsy items of `iterable` (filters with `!!item`). */
	public static coalesce<T>(iterable: AsyncIterable<T | undefined | null>): AsyncIterableProducer<T> {
		return <AsyncIterableProducer<T>>AsyncIterableProducer.filter(iterable, item => !!item);
	}

	/** Instance form of {@link AsyncIterableProducer.coalesce}. */
	public coalesce(): AsyncIterableProducer<NonNullable<T>> {
		return AsyncIterableProducer.coalesce(this) as AsyncIterableProducer<NonNullable<T>>;
	}

	/** Emits only those items of `iterable` for which `filterFn` returns true. */
	public static filter<T>(iterable: AsyncIterable<T>, filterFn: (item: T) => boolean): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			for await (const item of iterable) {
				if (filterFn(item)) {
					emitter.emitOne(item);
				}
			}
		});
	}

	/** Instance form of {@link AsyncIterableProducer.filter}. */
	public filter<T2 extends T>(filterFn: (item: T) => item is T2): AsyncIterableProducer<T2>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableProducer<T>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableProducer<T> {
		return AsyncIterableProducer.filter(this, filterFn);
	}

	/** Ends the iteration successfully, unless it has already ended. */
	private _finishOk(): void {
		if (!this._producerConsumer.hasFinalValue) {
			this._producerConsumer.produceFinal({ ok: true, value: { done: true, value: undefined } });
		}
	}

	/** Ends the iteration with `error`, unless it has already ended. */
	private _finishError(error: Error): void {
		if (!this._producerConsumer.hasFinalValue) {
			this._producerConsumer.produceFinal({ ok: false, error: error });
		}
		// Warning: this can cause errors to be dropped (when a final value already exists).
	}

	// A single iterator instance is shared by all iterations of this producer.
	private readonly _iterator: AsyncIterator<T, void, void> = {
		next: () => this._producerConsumer.consume(),
		return: () => {
			this._onReturn?.();
			return Promise.resolve({ done: true, value: undefined });
		},
		throw: async (e) => {
			this._finishError(e);
			return { done: true, value: undefined };
		},
	};

	[Symbol.asyncIterator](): AsyncIterator<T, void, void> {
		return this._iterator;
	}
}
2526
2527
/**
 * An {@link AsyncIterableProducer} paired with the `CancellationTokenSource`
 * that {@link cancel} triggers.
 */
export class CancelableAsyncIterableProducer<T> extends AsyncIterableProducer<T> {

	/**
	 * @param _source The token source that `cancel()` cancels.
	 * @param executor Passed through unchanged to the base producer.
	 */
	constructor(private readonly _source: CancellationTokenSource, executor: AsyncIterableExecutor<T>) {
		super(executor);
	}

	/** Requests cancellation on the underlying token source. */
	public cancel(): void {
		this._source.cancel();
	}
}
2539
2540
//#endregion
2541
2542
/**
 * Sentinel returned by the read/peek methods of `AsyncReader` when no element is
 * left to deliver, i.e. the buffer is empty and the source is exhausted.
 */
export const AsyncReaderEndOfStream = Symbol('AsyncReaderEndOfStream');
2543
2544
/**
 * Buffered reader on top of an `AsyncIterator` that supports peeking at the next
 * element without consuming it.
 *
 * Elements are pulled from the source one at a time and only on demand.
 * If the source's `next()` rejects, that rejection is propagated to the caller and
 * the failed extension promise stays cached, so later reads reject the same way.
 */
export class AsyncReader<T> {
	private _buffer: T[] = [];
	private _atEnd = false;
	// The in-flight request for the next source element, shared by overlapping callers.
	private _extendBufferPromise: Promise<void> | undefined;

	/** True once the source is exhausted and every buffered element has been consumed. */
	public get endOfStream(): boolean { return this._buffer.length === 0 && this._atEnd; }

	/**
	 * @param _source The iterator to read from.
	 */
	constructor(
		private readonly _source: AsyncIterator<T>
	) {
	}

	/**
	 * Consumes and returns the next element, waiting for the source if necessary.
	 * @returns The element, or {@link AsyncReaderEndOfStream} when the source is exhausted.
	 */
	public async read(): Promise<T | typeof AsyncReaderEndOfStream> {
		await this._waitForElementOrEnd();
		if (this._buffer.length === 0) {
			return AsyncReaderEndOfStream;
		}
		return this._buffer.shift()!;
	}

	/**
	 * Consumes elements for as long as `predicate` accepts the next one.
	 * Each accepted element is consumed first and then passed to `callback`, which is awaited.
	 */
	public async readWhile(predicate: (value: T) => boolean, callback: (element: T) => unknown): Promise<void> {
		while (true) {
			const piece = await this.peek();
			if (piece === AsyncReaderEndOfStream || !predicate(piece)) {
				return;
			}
			await this.read(); // consume
			await callback(piece);
		}
	}

	/**
	 * Synchronously consumes the next buffered element.
	 * @returns The element, or {@link AsyncReaderEndOfStream} when the source is exhausted.
	 * @throws BugIndicatingError if nothing is buffered and the end has not been reached.
	 */
	public readBufferedOrThrow(): T | typeof AsyncReaderEndOfStream {
		const value = this.peekBufferedOrThrow();
		this._buffer.shift();
		return value;
	}

	/** Reads and discards elements until {@link endOfStream} is true. */
	public async consumeToEnd(): Promise<void> {
		while (!this.endOfStream) {
			await this.read();
		}
	}

	/**
	 * Returns the next element without consuming it, waiting for the source if necessary.
	 * @returns The element, or {@link AsyncReaderEndOfStream} when the source is exhausted.
	 */
	public async peek(): Promise<T | typeof AsyncReaderEndOfStream> {
		await this._waitForElementOrEnd();
		if (this._buffer.length === 0) {
			return AsyncReaderEndOfStream;
		}
		return this._buffer[0];
	}

	/**
	 * Synchronously returns the next buffered element without consuming it.
	 * @returns The element, or {@link AsyncReaderEndOfStream} when the source is exhausted.
	 * @throws BugIndicatingError if nothing is buffered and the end has not been reached.
	 */
	public peekBufferedOrThrow(): T | typeof AsyncReaderEndOfStream {
		if (this._buffer.length === 0) {
			if (this._atEnd) {
				return AsyncReaderEndOfStream;
			}
			throw new BugIndicatingError('No buffered elements');
		}

		return this._buffer[0];
	}

	/**
	 * Like {@link peek}, but makes a single attempt to fetch an element, bounded by `timeoutMs`.
	 * @returns The next element, {@link AsyncReaderEndOfStream} when the source is exhausted,
	 * or `undefined` when no element is buffered after the attempt.
	 */
	public async peekTimeout(timeoutMs: number): Promise<T | typeof AsyncReaderEndOfStream | undefined> {
		if (this._buffer.length === 0 && !this._atEnd) {
			await raceTimeout(this._extendBuffer(), timeoutMs);
		}
		if (this._atEnd) {
			return AsyncReaderEndOfStream;
		}
		if (this._buffer.length === 0) {
			return undefined;
		}
		return this._buffer[0];
	}

	/**
	 * Waits until an element is buffered or the source is exhausted.
	 *
	 * This loops rather than waiting once: overlapping callers share one in-flight
	 * extension, so the element it delivers may already have been consumed by another
	 * caller when this one resumes. An empty buffer then only means "fetch again",
	 * not "end of stream".
	 */
	private async _waitForElementOrEnd(): Promise<void> {
		while (this._buffer.length === 0 && !this._atEnd) {
			await this._extendBuffer();
		}
	}

	/**
	 * Requests one more element from the source, unless a request is already in flight,
	 * in which case the pending promise is returned. Resolves immediately at the end.
	 */
	private _extendBuffer(): Promise<void> {
		if (this._atEnd) {
			return Promise.resolve();
		}

		if (!this._extendBufferPromise) {
			this._extendBufferPromise = (async () => {
				const { value, done } = await this._source.next();
				// cleared only after a successful `next()`, so the next call starts a new request
				this._extendBufferPromise = undefined;
				if (done) {
					this._atEnd = true;
				} else {
					this._buffer.push(value);
				}
			})();
		}

		return this._extendBufferPromise;
	}
}
2646
2647