Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/src/vs/base/common/async.ts
3291 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
import { CancellationToken, CancellationTokenSource } from './cancellation.js';
7
import { BugIndicatingError, CancellationError } from './errors.js';
8
import { Emitter, Event } from './event.js';
9
import { Disposable, DisposableMap, DisposableStore, IDisposable, isDisposable, MutableDisposable, toDisposable } from './lifecycle.js';
10
import { extUri as defaultExtUri, IExtUri } from './resources.js';
11
import { URI } from './uri.js';
12
import { setTimeout0 } from './platform.js';
13
import { MicrotaskDelay } from './symbols.js';
14
import { Lazy } from './lazy.js';
15
16
/**
 * Type guard reporting whether `obj` is truthy and exposes a callable `then` member.
 *
 * @param obj The value to inspect.
 * @returns `true` when `obj.then` is a function, `false` otherwise.
 */
export function isThenable<T>(obj: unknown): obj is Promise<T> {
	if (!obj) {
		return false;
	}

	const candidate = obj as unknown as Promise<T>;
	return typeof candidate.then === 'function';
}
19
20
/**
 * A promise that additionally exposes a `cancel()` method.
 */
export interface CancelablePromise<T> extends Promise<T> {
	/** Requests cancellation of the work backing this promise. */
	cancel(): void;
}
23
24
/**
 * Creates a promise whose work can be cancelled through the returned `cancel()` method.
 *
 * @remarks When cancellation is requested, the promise is rejected with a {@link CancellationError}.
 * If the callback's promise later resolves to a disposable object after cancellation was
 * requested, that object is disposed instead of being delivered.
 *
 * @param callback A function that accepts a cancellation token and returns a promise
 * @returns A promise that can be cancelled
 */
export function createCancelablePromise<T>(callback: (token: CancellationToken) => Promise<T>): CancelablePromise<T> {
	const source = new CancellationTokenSource();
	const thenable = callback(source.token);
	let isCancelled = false;

	const promise = new Promise<T>((resolve, reject) => {
		const subscription = source.token.onCancellationRequested(() => {
			isCancelled = true;
			subscription.dispose();
			reject(new CancellationError());
		});

		// Shared cleanup for both settlement paths of the callback's promise.
		const release = () => {
			subscription.dispose();
			source.dispose();
		};

		Promise.resolve(thenable).then(value => {
			release();

			if (isCancelled) {
				// The promise was already rejected; only clean up a disposable result.
				if (isDisposable(value)) {
					value.dispose();
				}
				return;
			}

			resolve(value);
		}, err => {
			release();
			reject(err);
		});
	});

	return <CancelablePromise<T>>new class {
		cancel() {
			source.cancel();
			source.dispose();
		}
		then<TResult1 = T, TResult2 = never>(onfulfilled?: ((value: T) => TResult1 | Promise<TResult1>) | undefined | null, onrejected?: ((reason: unknown) => TResult2 | Promise<TResult2>) | undefined | null): Promise<TResult1 | TResult2> {
			return promise.then(onfulfilled, onrejected);
		}
		catch<TResult = never>(onrejected?: ((reason: unknown) => TResult | Promise<TResult>) | undefined | null): Promise<T | TResult> {
			return this.then(undefined, onrejected);
		}
		finally(onfinally?: (() => void) | undefined | null): Promise<T> {
			return promise.finally(onfinally);
		}
	};
}
82
83
/**
 * Returns a promise that resolves with `undefined` as soon as the passed token is cancelled.
 * @see {@link raceCancellationError}
 */
export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken): Promise<T | undefined>;

/**
 * Returns a promise that resolves with `defaultValue` as soon as the passed token is cancelled.
 * @see {@link raceCancellationError}
 */
export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken, defaultValue: T): Promise<T>;

export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken, defaultValue?: T): Promise<T | undefined> {
	return new Promise((resolve, reject) => {
		const listener = token.onCancellationRequested(() => {
			listener.dispose();
			resolve(defaultValue);
		});

		// Whichever way `promise` settles, stop listening for cancellation afterwards.
		const stopListening = () => listener.dispose();
		promise.then(resolve, reject).finally(stopListening);
	});
}
104
105
/**
 * Returns a promise that rejects with a {@link CancellationError} as soon as the passed token is cancelled.
 * Otherwise it settles the same way as `promise`.
 * @see {@link raceCancellation}
 */
export function raceCancellationError<T>(promise: Promise<T>, token: CancellationToken): Promise<T> {
	return new Promise((resolve, reject) => {
		const listener = token.onCancellationRequested(() => {
			listener.dispose();
			reject(new CancellationError());
		});

		// Whichever way `promise` settles, stop listening for cancellation afterwards.
		const stopListening = () => listener.dispose();
		promise.then(resolve, reject).finally(stopListening);
	});
}
118
119
/**
 * Wraps a cancellable promise in a plain promise that has no `cancel()` method.
 * Can be used to avoid issues with shared promises that would normally be
 * returned as cancellable to consumers.
 */
export function notCancellablePromise<T>(promise: CancelablePromise<T>): Promise<T> {
	return new Promise<T>((forwardValue, forwardError) => {
		promise.then(forwardValue, forwardError);
	});
}
129
130
/**
 * Returns as soon as one of the promises resolves or rejects and cancels remaining promises.
 *
 * The returned promise carries a `cancel()` method that calls `cancel()` on every input
 * that has one, except the input most recently recorded as resolved.
 *
 * @param cancellablePromises The promises to race; entries without a `cancel` method are simply never cancelled.
 * @returns The raced promise, augmented with `cancel()`.
 */
export function raceCancellablePromises<T>(cancellablePromises: (CancelablePromise<T> | Promise<T>)[]): CancelablePromise<T> {
	let resolvedPromiseIndex = -1;
	const promises = cancellablePromises.map((promise, index) => promise.then(result => { resolvedPromiseIndex = index; return result; }));
	const promise = Promise.race(promises) as CancelablePromise<T>;
	promise.cancel = () => {
		cancellablePromises.forEach((cancellablePromise, index) => {
			if (index !== resolvedPromiseIndex && (cancellablePromise as CancelablePromise<T>).cancel) {
				(cancellablePromise as CancelablePromise<T>).cancel();
			}
		});
	};

	// Cancel the remaining promises once the race settles. Both handlers are attached with
	// `then` rather than `finally`: `finally` returns a derived promise that re-rejects when
	// the race rejects, and since nobody observes it that surfaces as an unhandled rejection.
	const cancelRemaining = () => promise.cancel();
	promise.then(cancelRemaining, cancelRemaining);

	return promise;
}
149
150
/**
 * Races `promise` against a timer.
 *
 * @param promise The promise to wait for.
 * @param timeout Time in milliseconds after which the result resolves with `undefined`.
 * @param onTimeout Optional callback invoked when the timer fires.
 * @returns The settlement of `promise`, or `undefined` if the timer fires first.
 */
export function raceTimeout<T>(promise: Promise<T>, timeout: number, onTimeout?: () => void): Promise<T | undefined> {
	let resolveOnTimeout: ((value: T | undefined) => void) | undefined = undefined;

	const timer = setTimeout(() => {
		resolveOnTimeout?.(undefined);
		onTimeout?.();
	}, timeout);

	// Clear the timer as soon as the original promise settles either way.
	const guarded = promise.finally(() => clearTimeout(timer));
	const timedOut = new Promise<T | undefined>(resolve => {
		resolveOnTimeout = resolve;
	});

	return Promise.race([guarded, timedOut]);
}
163
164
/**
 * Invokes `callback` and exposes its outcome as a promise.
 *
 * A thenable result is forwarded; any other result resolves the promise directly.
 * Because the callback runs inside the promise executor, a synchronous throw
 * becomes a rejection.
 */
export function asPromise<T>(callback: () => T | Thenable<T>): Promise<T> {
	return new Promise<T>((resolve, reject) => {
		const outcome = callback();

		if (!isThenable<T>(outcome)) {
			resolve(outcome);
			return;
		}

		outcome.then(resolve, reject);
	});
}
174
175
/**
 * Creates a new promise and returns it together with its `resolve` and `reject` callbacks.
 *
 * Replace with standardized [`Promise.withResolvers`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers) once it is supported
 */
export function promiseWithResolvers<T>(): { promise: Promise<T>; resolve: (value: T | PromiseLike<T>) => void; reject: (err?: any) => void } {
	// The executor runs synchronously, so both callbacks are captured before we return.
	const captured = {} as { resolve: (value: T | PromiseLike<T>) => void; reject: (reason?: any) => void };

	const promise = new Promise<T>((res, rej) => {
		captured.resolve = res;
		captured.reject = rej;
	});

	return { promise, resolve: captured.resolve, reject: captured.reject };
}
189
190
/**
 * A parameterless callable that produces a value of type `T`.
 */
export interface ITask<T> {
	/** Runs the task and returns its result. */
	(): T;
}
193
194
/**
 * A helper to prevent accumulation of sequential async tasks.
 *
 * Imagine a mail man with the sole task of delivering letters. As soon as
 * a letter is submitted for delivery, he drives to the destination, delivers it
 * and returns to his base. Imagine that during the trip, N more letters were submitted.
 * When the mail man returns, he picks those N letters and delivers them all in a
 * single trip. Even though N+1 submissions occurred, only 2 deliveries were made.
 *
 * The throttler implements this via the queue() method, by providing it a task
 * factory. Following the example:
 *
 * 		const throttler = new Throttler();
 * 		let letters = [];
 *
 * 		function deliver() {
 * 			const lettersToDeliver = letters;
 * 			letters = [];
 * 			return makeTheTrip(lettersToDeliver);
 * 		}
 *
 * 		function onLetterReceived(l) {
 * 			letters.push(l);
 * 			throttler.queue(deliver);
 * 		}
 */
export class Throttler implements IDisposable {

	/** Promise of the task that is currently running, if any. */
	private activePromise: Promise<any> | null;
	/** Promise shared by all callers that queued while a task was running. */
	private queuedPromise: Promise<any> | null;
	/** The most recently queued factory; earlier queued factories are replaced. */
	private queuedPromiseFactory: ITask<Promise<any>> | null;

	private isDisposed = false;

	constructor() {
		this.activePromise = null;
		this.queuedPromise = null;
		this.queuedPromiseFactory = null;
	}

	/**
	 * Runs `promiseFactory` right away when idle. While a task is running, the factory is
	 * remembered (replacing any previously remembered one) and run once the active task settles.
	 *
	 * @param promiseFactory Factory producing the task's promise.
	 * @returns A promise settling with the outcome of the task that eventually ran. It rejects
	 * with an `Error` if the throttler is already disposed when `queue` is called, and resolves
	 * with `undefined` for a queued task that is skipped because the throttler was disposed meanwhile.
	 */
	queue<T>(promiseFactory: ITask<Promise<T>>): Promise<T> {
		if (this.isDisposed) {
			return Promise.reject(new Error('Throttler is disposed'));
		}

		if (this.activePromise) {
			this.queuedPromiseFactory = promiseFactory;

			if (!this.queuedPromise) {
				const onComplete = () => {
					this.queuedPromise = null;

					if (this.isDisposed) {
						return;
					}

					const result = this.queue(this.queuedPromiseFactory!);
					this.queuedPromiseFactory = null;

					return result;
				};

				this.queuedPromise = new Promise((resolve, reject) => {
					// Forward rejections too: with only a fulfillment handler, a failing queued
					// task would leave every waiting caller pending forever and surface as an
					// unhandled rejection.
					this.activePromise!.then(onComplete, onComplete).then(resolve, reject);
				});
			}

			return new Promise((resolve, reject) => {
				this.queuedPromise!.then(resolve, reject);
			});
		}

		this.activePromise = promiseFactory();

		return new Promise((resolve, reject) => {
			this.activePromise!.then((result: T) => {
				this.activePromise = null;
				resolve(result);
			}, (err: unknown) => {
				this.activePromise = null;
				reject(err);
			});
		});
	}

	/**
	 * Marks the throttler as disposed: later `queue` calls reject and a pending queued
	 * factory is not started.
	 */
	dispose(): void {
		this.isDisposed = true;
	}
}
283
284
/**
 * Runs queued tasks strictly one after another.
 */
export class Sequencer {

	/** Tail of the chain; every new task is appended behind it. */
	private current: Promise<unknown> = Promise.resolve(null);

	/**
	 * Appends `promiseTask` to the chain. It starts once the previous task has
	 * settled, regardless of whether that task fulfilled or rejected.
	 */
	queue<T>(promiseTask: ITask<Promise<T>>): Promise<T> {
		const runTask = () => promiseTask();
		const next = this.current.then(runTask, runTask);
		this.current = next;
		return next;
	}
}
292
293
/**
 * Runs tasks sequentially per key; tasks queued under different keys do not wait for each other.
 */
export class SequencerByKey<TKey> {

	/** Tail promise of each key that still has work in flight. */
	private promiseMap = new Map<TKey, Promise<unknown>>();

	/**
	 * Queues `promiseTask` behind the task currently registered for `key`, if any.
	 * A failure of the previous task does not prevent this one from running.
	 */
	queue<T>(key: TKey, promiseTask: ITask<Promise<T>>): Promise<T> {
		const previous = this.promiseMap.get(key) ?? Promise.resolve();

		// Drop the map entry once this task settles, unless a newer task took its place.
		const forgetIfLast = () => {
			if (this.promiseMap.get(key) === chained) {
				this.promiseMap.delete(key);
			}
		};

		const chained = previous
			.catch(() => { })
			.then(promiseTask)
			.finally(forgetIfLast);

		this.promiseMap.set(key, chained);
		return chained;
	}

	/** Returns the keys that currently have a registered task. */
	keys(): IterableIterator<TKey> {
		return this.promiseMap.keys();
	}
}
315
316
/**
 * Handle for a callback that has been scheduled to run later. Disposing it cancels the callback.
 */
interface IScheduledLater extends IDisposable {
	/** Whether the callback is still scheduled, i.e. has neither run nor been disposed. */
	isTriggered(): boolean;
}
319
320
/**
 * Schedules `fn` with `setTimeout` and returns a handle that reports whether the
 * call is still pending and can cancel it.
 */
const timeoutDeferred = (timeout: number, fn: () => void): IScheduledLater => {
	let pending = true;

	const timer = setTimeout(() => {
		pending = false;
		fn();
	}, timeout);

	return {
		isTriggered() {
			return pending;
		},
		dispose() {
			clearTimeout(timer);
			pending = false;
		},
	};
};
334
335
/**
 * Schedules `fn` with `queueMicrotask` and returns a handle that reports whether the
 * call is still pending and can cancel it.
 */
const microtaskDeferred = (fn: () => void): IScheduledLater => {
	let pending = true;

	queueMicrotask(() => {
		// A microtask cannot be unqueued, so disposal is honoured by checking the flag here.
		if (!pending) {
			return;
		}
		pending = false;
		fn();
	});

	return {
		isTriggered() {
			return pending;
		},
		dispose() {
			pending = false;
		},
	};
};
349
350
/**
 * A helper to delay (debounce) execution of a task that is being requested often.
 *
 * Following the throttler, now imagine the mail man wants to optimize the number of
 * trips proactively. The trip itself can be long, so he decides not to make the trip
 * as soon as a letter is submitted. Instead he waits a while, in case more
 * letters are submitted. After said waiting period, if no letters were submitted, he
 * decides to make the trip. Imagine that N more letters were submitted after the first
 * one, all within a short period of time between each other. Even though N+1
 * submissions occurred, only 1 delivery was made.
 *
 * The delayer offers this behavior via the trigger() method, into which both the task
 * to be executed and the waiting period (delay) must be passed in as arguments. Following
 * the example:
 *
 * 		const delayer = new Delayer(WAITING_PERIOD);
 * 		const letters = [];
 *
 * 		function letterReceived(l) {
 * 			letters.push(l);
 * 			delayer.trigger(() => { return makeTheTrip(); });
 * 		}
 */
export class Delayer<T> implements IDisposable {

	/** Handle of the currently scheduled wake-up, if any. */
	private deferred: IScheduledLater | null = null;
	/** Promise shared by all `trigger` calls of the current round. */
	private completionPromise: Promise<any> | null = null;
	private doResolve: ((value?: any | Promise<any>) => void) | null = null;
	private doReject: ((err: unknown) => void) | null = null;
	/** The most recently triggered task; it is the one that runs when the delay elapses. */
	private task: ITask<T | Promise<T>> | null = null;

	constructor(public defaultDelay: number | typeof MicrotaskDelay) { }

	/**
	 * Registers `task` as the task to run and (re)starts the waiting period.
	 *
	 * @param task The task to run once the delay elapses without another trigger.
	 * @param delay Waiting period; defaults to `defaultDelay`.
	 * @returns The promise of the current round, shared with earlier pending triggers.
	 */
	trigger(task: ITask<T | Promise<T>>, delay = this.defaultDelay): Promise<T> {
		this.task = task;
		this.cancelTimeout();

		if (!this.completionPromise) {
			const gate = new Promise((resolve, reject) => {
				this.doResolve = resolve;
				this.doReject = reject;
			});

			this.completionPromise = gate.then(() => {
				this.completionPromise = null;
				this.doResolve = null;

				const pendingTask = this.task;
				if (!pendingTask) {
					return undefined;
				}

				this.task = null;
				return pendingTask();
			});
		}

		const wakeUp = () => {
			this.deferred = null;
			this.doResolve?.(null);
		};

		if (delay === MicrotaskDelay) {
			this.deferred = microtaskDeferred(wakeUp);
		} else {
			this.deferred = timeoutDeferred(delay, wakeUp);
		}

		return this.completionPromise;
	}

	/** Whether a wake-up is currently scheduled. */
	isTriggered(): boolean {
		return !!this.deferred?.isTriggered();
	}

	/**
	 * Cancels the scheduled wake-up and rejects the current round's promise
	 * with a {@link CancellationError}.
	 */
	cancel(): void {
		this.cancelTimeout();

		if (!this.completionPromise) {
			return;
		}

		this.doReject?.(new CancellationError());
		this.completionPromise = null;
	}

	private cancelTimeout(): void {
		this.deferred?.dispose();
		this.deferred = null;
	}

	dispose(): void {
		this.cancel();
	}
}
441
442
/**
 * A helper to delay execution of a task that is being requested often, while
 * preventing accumulation of consecutive executions, while the task runs.
 *
 * The mail man is clever and waits for a certain amount of time, before going
 * out to deliver letters. While the mail man is going out, more letters arrive
 * and can only be delivered once he is back. Once he is back the mail man will
 * do one more trip to deliver the letters that have accumulated while he was out.
 */
export class ThrottledDelayer<T> {

	private delayer: Delayer<Promise<T>>;
	private throttler: Throttler;

	constructor(defaultDelay: number) {
		this.delayer = new Delayer(defaultDelay);
		this.throttler = new Throttler();
	}

	/**
	 * Debounces via the delayer; once the delay elapses, the factory is handed to the throttler.
	 */
	trigger(promiseFactory: ITask<Promise<T>>, delay?: number): Promise<T> {
		const throttledTask = () => this.throttler.queue(promiseFactory);
		const pending = this.delayer.trigger(throttledTask, delay);
		return pending as unknown as Promise<T>;
	}

	/** Whether the delayer currently has a wake-up scheduled. */
	isTriggered(): boolean {
		return this.delayer.isTriggered();
	}

	/** Cancels the delayer's pending round. */
	cancel(): void {
		this.delayer.cancel();
	}

	dispose(): void {
		this.delayer.dispose();
		this.throttler.dispose();
	}
}
478
479
/**
 * A barrier that is initially closed and then becomes opened permanently.
 */
export class Barrier {
	private _isOpen = false;
	private _promise: Promise<boolean>;
	private _completePromise!: (v: boolean) => void;

	constructor() {
		// Keep the resolver around so that `open()` can release all waiters.
		this._promise = new Promise<boolean>(resolve => {
			this._completePromise = resolve;
		});
	}

	/** Whether `open()` has been called. */
	isOpen(): boolean {
		return this._isOpen;
	}

	/** Opens the barrier and resolves the promise returned by `wait()` with `true`. */
	open(): void {
		this._isOpen = true;
		this._completePromise(true);
	}

	/** Returns the promise that resolves once the barrier is opened. */
	wait(): Promise<boolean> {
		return this._promise;
	}
}
507
508
/**
 * A barrier that is initially closed and then becomes opened permanently after a certain period of
 * time or when open is called explicitly
 */
export class AutoOpenBarrier extends Barrier {

	/** Timer that opens the barrier automatically. */
	private readonly _timeout: Timeout;

	/**
	 * @param autoOpenTimeMs Time in milliseconds after which the barrier opens by itself.
	 */
	constructor(autoOpenTimeMs: number) {
		super();
		const openAutomatically = () => this.open();
		this._timeout = setTimeout(openAutomatically, autoOpenTimeMs);
	}

	/** Opens the barrier and clears the auto-open timer. */
	override open(): void {
		clearTimeout(this._timeout);
		super.open();
	}
}
526
527
/**
 * Returns a promise that resolves after `millis` milliseconds.
 *
 * Without a token the result is a {@link CancelablePromise}. With a token, the promise
 * rejects with a {@link CancellationError} when the token is cancelled first.
 */
export function timeout(millis: number): CancelablePromise<void>;
export function timeout(millis: number, token: CancellationToken): Promise<void>;
export function timeout(millis: number, token?: CancellationToken): CancelablePromise<void> | Promise<void> {
	if (token) {
		return new Promise((resolve, reject) => {
			const timer = setTimeout(() => {
				cancellationListener.dispose();
				resolve();
			}, millis);

			const cancellationListener = token.onCancellationRequested(() => {
				clearTimeout(timer);
				cancellationListener.dispose();
				reject(new CancellationError());
			});
		});
	}

	// No token supplied: create one and expose cancellation through the returned promise.
	return createCancelablePromise(token => timeout(millis, token));
}
546
547
/**
 * Creates a timeout that can be disposed using its returned value.
 * @param handler The timeout handler.
 * @param timeout An optional timeout in milliseconds.
 * @param store An optional {@link DisposableStore} that will have the timeout disposable managed automatically.
 *
 * @example
 * const store = new DisposableStore;
 * // Call the timeout after 1000ms at which point it will be automatically
 * // evicted from the store.
 * const timeoutDisposable = disposableTimeout(() => {}, 1000, store);
 *
 * if (foo) {
 *   // Cancel the timeout and evict it from store.
 *   timeoutDisposable.dispose();
 * }
 */
export function disposableTimeout(handler: () => void, timeout = 0, store?: DisposableStore): IDisposable {
	const handle = setTimeout(() => {
		handler();

		// Once the handler has run, evict the disposable from the store it was added to.
		if (store) {
			disposable.dispose();
		}
	}, timeout);

	const disposable = toDisposable(() => {
		clearTimeout(handle);
		store?.delete(disposable);
	});

	store?.add(disposable);

	return disposable;
}
578
579
/**
 * Runs the provided list of promise factories in sequential order. The returned
 * promise will complete to an array of results from each promise; results that
 * are `null` or `undefined` are not added to the array.
 */
export function sequence<T>(promiseFactories: ITask<Promise<T>>[]): Promise<T[]> {
	const collected: T[] = [];
	const total = promiseFactories.length;
	let cursor = 0;

	// Records the previous result (if any), then starts the next factory or finishes.
	const advance = (previous: unknown): Promise<any> => {
		if (previous !== undefined && previous !== null) {
			collected.push(previous as T);
		}

		const upcoming = cursor < total ? promiseFactories[cursor++]() : null;
		return upcoming ? upcoming.then(advance) : Promise.resolve(collected);
	};

	return Promise.resolve(null).then(advance);
}
608
609
/**
 * Runs the factories one after another and resolves with the first result accepted by `shouldStop`.
 *
 * @param promiseFactories Factories to try, in order; later ones are not invoked once a result is accepted.
 * @param shouldStop Predicate deciding whether a result is accepted; defaults to truthiness.
 * @param defaultValue Value to resolve with when no result is accepted.
 */
export function first<T>(promiseFactories: ITask<Promise<T>>[], shouldStop: (t: T) => boolean = t => !!t, defaultValue: T | null = null): Promise<T | null> {
	const total = promiseFactories.length;
	let position = 0;

	function attempt(): Promise<T | null> {
		if (position < total) {
			const produce = promiseFactories[position++];

			return Promise.resolve(produce()).then(value => {
				return shouldStop(value) ? Promise.resolve(value) : attempt();
			});
		}

		return Promise.resolve(defaultValue);
	}

	return attempt();
}
632
633
/**
 * Returns the result of the first promise that matches the "shouldStop",
 * running all promises in parallel. Supports cancelable promises.
 */
export function firstParallel<T>(promiseList: Promise<T>[], shouldStop?: (t: T) => boolean, defaultValue?: T | null): Promise<T | null>;
export function firstParallel<T, R extends T>(promiseList: Promise<T>[], shouldStop: (t: T) => t is R, defaultValue?: R | null): Promise<R | null>;
export function firstParallel<T>(promiseList: Promise<T>[], shouldStop: (t: T) => boolean = t => !!t, defaultValue: T | null = null) {
	if (promiseList.length === 0) {
		return Promise.resolve(defaultValue);
	}

	// Number of promises that have not reported yet; forced negative once a winner is decided.
	let remaining = promiseList.length;

	const finish = () => {
		remaining = -1;
		promiseList.forEach(promise => {
			(promise as Partial<CancelablePromise<T>>).cancel?.();
		});
	};

	return new Promise<T | null>((resolve, reject) => {
		promiseList.forEach(promise => {
			promise.then(result => {
				remaining--;

				if (remaining >= 0 && shouldStop(result)) {
					finish();
					resolve(result);
					return;
				}

				if (remaining === 0) {
					// Every promise reported and none matched.
					resolve(defaultValue);
				}
			})
				.catch(err => {
					remaining--;

					if (remaining >= 0) {
						finish();
						reject(err);
					}
				});
		});
	});
}
671
672
/**
 * A queued task together with the callbacks used to settle the promise handed to its caller.
 */
interface ILimitedTaskFactory<T> {
	/** Starts the task. */
	factory: ITask<Promise<T>>;
	/** Completes the caller's promise. */
	c: (value: T | Promise<T>) => void;
	/** Fails the caller's promise. */
	e: (error?: unknown) => void;
}
677
678
/**
 * Contract of a task limiter: tasks can be queued, the current size inspected and the queue cleared.
 */
export interface ILimiter<T> {

	/** Current size as reported by the limiter. */
	readonly size: number;

	/** Queues a task and returns a promise for its result. */
	queue(factory: ITask<Promise<T>>): Promise<T>;

	/** Clears the limiter's queue. */
	clear(): void;
}
686
687
/**
 * A helper to queue N promises and run them all with a max degree of parallelism. The helper
 * ensures that at any time no more than M promises are running at the same time.
 */
export class Limiter<T> implements ILimiter<T> {

	/** Number of tasks that are queued or running. */
	private _size = 0;
	private _isDisposed = false;
	/** Number of tasks that are currently running. */
	private runningPromises = 0;
	private readonly maxDegreeOfParalellism: number;
	/** Tasks waiting for a free slot, in submission order. */
	private readonly outstandingPromises: ILimitedTaskFactory<T>[] = [];
	private readonly _onDrained = new Emitter<void>();

	/**
	 * @param maxDegreeOfParalellism Maximum number of tasks that may run at the same time.
	 */
	constructor(maxDegreeOfParalellism: number) {
		this.maxDegreeOfParalellism = maxDegreeOfParalellism;
	}

	/**
	 *
	 * @returns A promise that resolved when all work is done (onDrained) or when
	 * there is nothing to do
	 */
	whenIdle(): Promise<void> {
		if (this.size > 0) {
			return Event.toPromise(this.onDrained);
		}

		return Promise.resolve();
	}

	/** Event fired when the size drops to zero after a task finished. */
	get onDrained(): Event<void> {
		return this._onDrained.event;
	}

	get size(): number {
		return this._size;
	}

	/**
	 * Queues a task; it starts immediately if a slot is free.
	 * Throws if the limiter has been disposed.
	 */
	queue(factory: ITask<Promise<T>>): Promise<T> {
		if (this._isDisposed) {
			throw new Error('Object has been disposed');
		}
		this._size++;

		return new Promise<T>((c, e) => {
			this.outstandingPromises.push({ factory, c, e });
			this.consume();
		});
	}

	/** Starts waiting tasks for as long as free slots are available. */
	private consume(): void {
		while (this.outstandingPromises.length && this.runningPromises < this.maxDegreeOfParalellism) {
			const entry = this.outstandingPromises.shift()!;
			this.runningPromises++;

			const running = entry.factory();
			running.then(entry.c, entry.e);

			const onSettled = () => this.consumed();
			running.then(onSettled, onSettled);
		}
	}

	/** Bookkeeping after a task settled: frees its slot and starts the next waiting task. */
	private consumed(): void {
		if (this._isDisposed) {
			return;
		}

		this.runningPromises--;
		this._size--;
		if (this._size === 0) {
			this._onDrained.fire();
		}

		if (this.outstandingPromises.length > 0) {
			this.consume();
		}
	}

	/**
	 * Drops all tasks that have not started yet; running tasks are unaffected.
	 * Throws if the limiter has been disposed.
	 */
	clear(): void {
		if (this._isDisposed) {
			throw new Error('Object has been disposed');
		}
		this.outstandingPromises.length = 0;
		this._size = this.runningPromises;
	}

	dispose(): void {
		this._isDisposed = true;
		this.outstandingPromises.length = 0; // stop further processing
		this._size = 0;
		this._onDrained.dispose();
	}
}
778
779
/**
 * A queue that handles one promise at a time and guarantees that at any time only one promise is executing.
 */
export class Queue<T> extends Limiter<T> {

	constructor() {
		// A limiter with a parallelism of one.
		super(1);
	}
}
788
789
/**
 * Same as `Queue`, ensures that only 1 task is executed at the same time. The difference to `Queue` is that
 * there is only 1 task about to be scheduled next. As such, calling `queue` while a task is executing will
 * replace the currently queued task until it executes.
 *
 * As such, the returned promise may not be from the factory that is passed in but from the next factory that
 * is running after having called `queue`.
 */
export class LimitedQueue {

	private readonly sequentializer = new TaskSequentializer();

	/** Counter supplying the id passed to the sequentializer for each started task. */
	private tasks = 0;

	queue(factory: ITask<Promise<void>>): Promise<void> {
		const start = () => this.sequentializer.run(this.tasks++, factory());

		if (this.sequentializer.isRunning()) {
			return this.sequentializer.queue(start);
		}

		return start();
	}
}
813
814
/**
 * A helper to organize queues per resource. The ResourceQueue makes sure to manage queues per resource
 * by disposing them once the queue is empty.
 */
export class ResourceQueue implements IDisposable {

	/** One queue per resource, keyed by the resource's comparison key. */
	private readonly queues = new Map<string, Queue<void>>();

	/** Promises handed out by `whenDrained` that are still waiting. */
	private readonly drainers = new Set<DeferredPromise<void>>();

	private drainListeners: DisposableMap<number> | undefined = undefined;
	private drainListenerCount = 0;

	/** Resolves once no queue has work left; resolves right away when that is already the case. */
	async whenDrained(): Promise<void> {
		if (this.isDrained()) {
			return;
		}

		const drainer = new DeferredPromise<void>();
		this.drainers.add(drainer);

		return drainer.p;
	}

	private isDrained(): boolean {
		for (const queue of this.queues.values()) {
			if (queue.size > 0) {
				return false;
			}
		}

		return true;
	}

	/** Returns the size of the queue for `resource`, or `0` when there is none. */
	queueSize(resource: URI, extUri: IExtUri = defaultExtUri): number {
		const key = extUri.getComparisonKey(resource);
		const queue = this.queues.get(key);

		return queue?.size ?? 0;
	}

	/**
	 * Queues `factory` on the queue of `resource`, creating that queue on demand.
	 * A created queue is disposed and removed again as soon as it drains.
	 */
	queueFor(resource: URI, factory: ITask<Promise<void>>, extUri: IExtUri = defaultExtUri): Promise<void> {
		const key = extUri.getComparisonKey(resource);

		const existing = this.queues.get(key);
		if (existing) {
			return existing.queue(factory);
		}

		const created = new Queue<void>();
		const drainListenerId = this.drainListenerCount++;
		const drainListener = Event.once(created.onDrained)(() => {
			created.dispose();
			this.queues.delete(key);
			this.onDidQueueDrain();

			this.drainListeners?.deleteAndDispose(drainListenerId);

			if (this.drainListeners?.size === 0) {
				this.drainListeners.dispose();
				this.drainListeners = undefined;
			}
		});

		if (!this.drainListeners) {
			this.drainListeners = new DisposableMap();
		}
		this.drainListeners.set(drainListenerId, drainListener);

		this.queues.set(key, created);

		return created.queue(factory);
	}

	private onDidQueueDrain(): void {
		if (this.isDrained()) {
			this.releaseDrainers();
		}
	}

	private releaseDrainers(): void {
		this.drainers.forEach(drainer => drainer.complete());
		this.drainers.clear();
	}

	dispose(): void {
		for (const queue of this.queues.values()) {
			queue.dispose();
		}

		this.queues.clear();

		// Even though we might still have pending
		// tasks queued, after the queues have been
		// disposed, we can no longer track them, so
		// we release drainers to prevent hanging
		// promises when the resource queue is being
		// disposed.
		this.releaseDrainers();

		this.drainListeners?.dispose();
	}
}
919
920
/**
 * A parameterless function producing a value of type `T`, either directly or through a promise.
 */
export type Task<T = void> = () => T | Promise<T>;
921
922
/**
 * Processes tasks in the order they were scheduled.
 */
export class TaskQueue {
	private _runningTask: Task<any> | undefined = undefined;
	private _pendingTasks: { task: Task<any>; deferred: DeferredPromise<any>; setUndefinedWhenCleared: boolean }[] = [];

	/**
	 * Waits for the current and pending tasks to finish, then runs and awaits the given task.
	 * If the task is skipped because of clearPending, the promise is rejected with a CancellationError.
	 */
	public schedule<T>(task: Task<T>): Promise<T> {
		return this._enqueue(task, false);
	}

	/**
	 * Waits for the current and pending tasks to finish, then runs and awaits the given task.
	 * If the task is skipped because of clearPending, the promise is resolved with undefined.
	 */
	public scheduleSkipIfCleared<T>(task: Task<T>): Promise<T | undefined> {
		return this._enqueue(task, true);
	}

	/** Appends the task to the pending list and starts processing when nothing is running. */
	private _enqueue<T>(task: Task<T>, setUndefinedWhenCleared: boolean): Promise<T> {
		const deferred = new DeferredPromise<T>();
		this._pendingTasks.push({ task, deferred, setUndefinedWhenCleared });
		this._runIfNotRunning();
		return deferred.p;
	}

	private _runIfNotRunning(): void {
		if (this._runningTask === undefined) {
			this._processQueue();
		}
	}

	/** Runs the next pending task and, once it settles, continues with the one after it. */
	private async _processQueue(): Promise<void> {
		const next = this._pendingTasks.shift();
		if (!next) {
			return;
		}

		if (this._runningTask) {
			throw new BugIndicatingError();
		}

		this._runningTask = next.task;

		try {
			const result = await next.task();
			next.deferred.complete(result);
		} catch (e) {
			next.deferred.error(e);
		} finally {
			this._runningTask = undefined;
			this._processQueue();
		}
	}

	/**
	 * Clears all pending tasks. Does not cancel the currently running task.
	 */
	public clearPending(): void {
		const cleared = this._pendingTasks;
		this._pendingTasks = [];

		cleared.forEach(entry => {
			if (entry.setUndefinedWhenCleared) {
				entry.deferred.complete(undefined);
			} else {
				entry.deferred.error(new CancellationError());
			}
		});
	}
}
999
1000
/**
 * A disposable wrapper around a single `setTimeout` timer.
 */
export class TimeoutTimer implements IDisposable {
	/** Handle of the pending timer, or `undefined` when none is set. */
	private _token: Timeout | undefined = undefined;
	private _isDisposed = false;

	constructor();
	constructor(runner: () => void, timeout: number);
	constructor(runner?: () => void, timeout?: number) {
		if (typeof runner === 'function' && typeof timeout === 'number') {
			this.setIfNotSet(runner, timeout);
		}
	}

	/** Cancels a pending timer and marks this object as disposed. */
	dispose(): void {
		this.cancel();
		this._isDisposed = true;
	}

	/** Clears the pending timer, if any. */
	cancel(): void {
		if (this._token === undefined) {
			return;
		}

		clearTimeout(this._token);
		this._token = undefined;
	}

	/**
	 * Replaces a pending timer (if any) with a new one.
	 * Throws a `BugIndicatingError` when called after disposal.
	 */
	cancelAndSet(runner: () => void, timeout: number): void {
		if (this._isDisposed) {
			throw new BugIndicatingError(`Calling 'cancelAndSet' on a disposed TimeoutTimer`);
		}

		this.cancel();
		this._start(runner, timeout);
	}

	/**
	 * Starts a timer unless one is already pending.
	 * Throws a `BugIndicatingError` when called after disposal.
	 */
	setIfNotSet(runner: () => void, timeout: number): void {
		if (this._isDisposed) {
			throw new BugIndicatingError(`Calling 'setIfNotSet' on a disposed TimeoutTimer`);
		}

		if (this._token === undefined) {
			this._start(runner, timeout);
		}
	}

	/** Starts the timer; the handle is cleared right before the runner is invoked. */
	private _start(runner: () => void, timeout: number): void {
		this._token = setTimeout(() => {
			this._token = undefined;
			runner();
		}, timeout);
	}
}
1053
1054
/**
 * A disposable wrapper around a single `setInterval` timer.
 */
export class IntervalTimer implements IDisposable {

	/** Clears the active interval when disposed; `undefined` when no interval is set. */
	private disposable: IDisposable | undefined = undefined;
	private isDisposed = false;

	/** Stops the active interval, if any. */
	cancel(): void {
		this.disposable?.dispose();
		this.disposable = undefined;
	}

	/**
	 * Stops the active interval (if any) and starts a new one.
	 * Throws a `BugIndicatingError` when called after disposal.
	 *
	 * @param runner Callback invoked on every tick.
	 * @param interval Interval in milliseconds.
	 * @param context Object providing `setInterval`/`clearInterval`; defaults to `globalThis`.
	 */
	cancelAndSet(runner: () => void, interval: number, context = globalThis): void {
		if (this.isDisposed) {
			throw new BugIndicatingError(`Calling 'cancelAndSet' on a disposed IntervalTimer`);
		}

		this.cancel();

		const intervalHandle = context.setInterval(() => {
			runner();
		}, interval);

		const stop = () => {
			context.clearInterval(intervalHandle);
			this.disposable = undefined;
		};
		this.disposable = toDisposable(stop);
	}

	/** Stops the active interval and marks this object as disposed. */
	dispose(): void {
		this.cancel();
		this.isDisposed = true;
	}
}
1085
1086
/**
 * Schedules a runner to be invoked once after a delay. Scheduling again
 * before the delay elapsed restarts the wait.
 */
export class RunOnceScheduler implements IDisposable {

	protected runner: ((...args: unknown[]) => void) | null;

	private timeoutToken: Timeout | undefined = undefined;
	private timeout: number;
	private timeoutHandler: () => void;

	/**
	 * @param runner The callback to invoke once the delay elapsed
	 * @param delay The default delay in milliseconds used by `schedule()`
	 */
	constructor(runner: (...args: any[]) => void, delay: number) {
		this.runner = runner;
		this.timeout = delay;
		// Bound once so that every `schedule()` reuses the same callback
		this.timeoutHandler = this.onTimeout.bind(this);
	}

	/**
	 * Dispose RunOnceScheduler: cancels a pending run and drops the runner.
	 */
	dispose(): void {
		this.cancel();
		this.runner = null;
	}

	/**
	 * Cancel current scheduled runner (if any).
	 */
	cancel(): void {
		if (!this.isScheduled()) {
			return;
		}

		clearTimeout(this.timeoutToken);
		this.timeoutToken = undefined;
	}

	/**
	 * Cancel previous runner (if any) & schedule a new runner.
	 *
	 * @param delay The delay in milliseconds, defaults to the configured delay
	 */
	schedule(delay = this.timeout): void {
		this.cancel();
		this.timeoutToken = setTimeout(this.timeoutHandler, delay);
	}

	/**
	 * The default delay in milliseconds used by `schedule()`.
	 */
	get delay(): number {
		return this.timeout;
	}

	set delay(value: number) {
		this.timeout = value;
	}

	/**
	 * Returns true if scheduled.
	 */
	isScheduled(): boolean {
		return this.timeoutToken !== undefined;
	}

	/**
	 * Runs a scheduled runner right away instead of waiting for the
	 * delay. Does nothing when no run is scheduled.
	 */
	flush(): void {
		if (!this.isScheduled()) {
			return;
		}

		this.cancel();
		this.doRun();
	}

	private onTimeout() {
		this.timeoutToken = undefined;

		if (this.runner) {
			this.doRun();
		}
	}

	/**
	 * Invokes the runner (if not disposed). Subclasses override this
	 * to control the arguments the runner receives.
	 */
	protected doRun(): void {
		this.runner?.();
	}
}
1160
1161
/**
 * Same as `RunOnceScheduler`, but doesn't count the time spent in sleep mode.
 * > **NOTE**: Only offers 1s resolution.
 *
 * When calling `setTimeout` with 3hrs, and putting the computer immediately to sleep
 * for 8hrs, `setTimeout` will fire **as soon as the computer wakes from sleep**. But
 * this scheduler will execute 3hrs **after waking the computer from sleep**.
 *
 * The delay is measured by counting ticks of a 1s interval rather
 * than by a single long timeout.
 */
export class ProcessTimeRunOnceScheduler {

	private runner: (() => void) | null;
	private timeout: number;

	// Number of 1s ticks that still have to pass before the runner is invoked
	private counter = 0;
	private intervalToken: Timeout | undefined = undefined;
	private intervalHandler: () => void;

	/**
	 * @param runner The callback to invoke once the delay elapsed
	 * @param delay The default delay in milliseconds, expected to be a multiple of 1000
	 */
	constructor(runner: () => void, delay: number) {
		ProcessTimeRunOnceScheduler.warnIfNotWholeSeconds(delay);

		this.runner = runner;
		this.timeout = delay;
		this.intervalHandler = this.onInterval.bind(this);
	}

	/**
	 * Cancels a pending run and drops the runner.
	 */
	dispose(): void {
		this.cancel();
		this.runner = null;
	}

	/**
	 * Cancel current scheduled runner (if any).
	 */
	cancel(): void {
		if (!this.isScheduled()) {
			return;
		}

		clearInterval(this.intervalToken);
		this.intervalToken = undefined;
	}

	/**
	 * Cancel previous runner (if any) & schedule a new runner.
	 *
	 * @param delay The delay in milliseconds, defaults to the configured delay
	 */
	schedule(delay = this.timeout): void {
		ProcessTimeRunOnceScheduler.warnIfNotWholeSeconds(delay);

		this.cancel();

		// Partial seconds are rounded up to the next full tick
		this.counter = Math.ceil(delay / 1000);
		this.intervalToken = setInterval(this.intervalHandler, 1000);
	}

	/**
	 * Returns true if scheduled.
	 */
	isScheduled(): boolean {
		return this.intervalToken !== undefined;
	}

	private onInterval() {
		if (--this.counter > 0) {
			return; // still need to wait
		}

		// time elapsed
		clearInterval(this.intervalToken);
		this.intervalToken = undefined;
		this.runner?.();
	}

	/**
	 * Logs a warning when the delay cannot be represented with 1s resolution.
	 */
	private static warnIfNotWholeSeconds(delay: number): void {
		if (delay % 1000 !== 0) {
			console.warn(`ProcessTimeRunOnceScheduler resolution is 1s, ${delay}ms is not a multiple of 1000ms.`);
		}
	}
}
1233
1234
/**
 * A `RunOnceScheduler` that collects units of work and hands all units
 * collected so far to the runner in one batch when the scheduler runs.
 */
export class RunOnceWorker<T> extends RunOnceScheduler {

	private units: T[] = [];

	/**
	 * @param runner Receives the batch of collected units
	 * @param timeout The delay in milliseconds before a batch is handed over
	 */
	constructor(runner: (units: T[]) => void, timeout: number) {
		super(runner, timeout);
	}

	/**
	 * Adds a unit to the current batch and schedules a run unless
	 * one is already scheduled.
	 */
	work(unit: T): void {
		this.units.push(unit);

		if (this.isScheduled()) {
			return;
		}

		this.schedule();
	}

	protected override doRun(): void {
		// Start a fresh batch before invoking the runner so that units
		// added by the runner itself end up in the next batch
		const batch = this.units;
		this.units = [];

		this.runner?.(batch);
	}

	override dispose(): void {
		this.units = [];

		super.dispose();
	}
}
1263
1264
/**
 * Options that control how a `ThrottledWorker` chunks, buffers and
 * delays the units of work it is given.
 */
export interface IThrottledWorkerOptions {

	/**
	 * The maximum number of units the worker passes to the handler at once.
	 */
	maxWorkChunkSize: number;

	/**
	 * The maximum number of units the worker keeps in memory for
	 * processing, or `undefined` for no limit.
	 */
	maxBufferedWork: number | undefined;

	/**
	 * The delay in milliseconds before the next chunk is processed
	 * when more units are pending than fit into a single chunk.
	 */
	throttleDelay: number;

	/**
	 * When enabled, guarantees that two distinct calls to `work()` are
	 * never executed without the throttle delay between them. Otherwise
	 * work is executed immediately whenever the worker is not currently
	 * throttling.
	 */
	waitThrottleDelayBetweenWorkUnits?: boolean;
}
1288
1289
/**
 * The `ThrottledWorker` will accept units of work `T`
 * to handle. The contract is:
 * * there is a maximum of units the worker can handle at once (via `maxWorkChunkSize`)
 * * there is a maximum of units the worker will keep in memory for processing (via `maxBufferedWork`)
 * * after having handled `maxWorkChunkSize` units, the worker needs to rest (via `throttleDelay`)
 */
export class ThrottledWorker<T> extends Disposable {

	private readonly pendingWork: T[] = [];

	// Holds the scheduler while the worker is throttling, empty otherwise
	private readonly throttler = this._register(new MutableDisposable<RunOnceScheduler>());
	private disposed = false;
	private lastExecutionTime = 0;

	constructor(
		private options: IThrottledWorkerOptions,
		private readonly handler: (units: T[]) => void
	) {
		super();
	}

	/**
	 * The number of work units that are pending to be processed.
	 */
	get pending(): number {
		return this.pendingWork.length;
	}

	/**
	 * Add units to be worked on. Use `pending` to figure out
	 * how many units are not yet processed after this method
	 * was called.
	 *
	 * @returns whether the work was accepted or not. If the
	 * worker is disposed, it will not accept any more work.
	 * If the number of pending units would become larger
	 * than `maxBufferedWork`, more work will also not be accepted.
	 */
	work(units: readonly T[]): boolean {
		if (this.disposed) {
			return false; // work not accepted: disposed
		}

		// Check for reaching maximum of pending work
		if (typeof this.options.maxBufferedWork === 'number') {

			// While throttling, every new unit stays pending. While not
			// throttling, one chunk is worked on directly and therefore
			// does not count towards the buffer.
			const handledDirectly = this.throttler.value ? 0 : this.options.maxWorkChunkSize;
			if (this.pending + units.length - handledDirectly > this.options.maxBufferedWork) {
				return false; // work not accepted: too much pending work
			}
		}

		// Add to pending units first
		for (let i = 0; i < units.length; i++) {
			this.pendingWork.push(units[i]);
		}

		const timeSinceLastExecution = Date.now() - this.lastExecutionTime;

		if (!this.throttler.value) {
			if (!this.options.waitThrottleDelayBetweenWorkUnits || timeSinceLastExecution >= this.options.throttleDelay) {
				// Work directly: we are not throttling and we are not
				// enforced to throttle between `work()` calls.
				this.doWork();
			} else {
				// Wait for the remainder of the throttle delay first
				this.scheduleThrottler(Math.max(this.options.throttleDelay - timeSinceLastExecution, 0));
			}
		}

		// Otherwise, our work will be picked up by the running throttler

		return true; // work accepted
	}

	private doWork(): void {
		this.lastExecutionTime = Date.now();

		// Extract chunk to handle and handle it
		const chunk = this.pendingWork.splice(0, this.options.maxWorkChunkSize);
		this.handler(chunk);

		// If we have remaining work, schedule it after a delay
		if (this.pendingWork.length > 0) {
			this.scheduleThrottler();
		}
	}

	private scheduleThrottler(delay = this.options.throttleDelay): void {
		this.throttler.value = new RunOnceScheduler(() => {
			// Leave throttling mode before working so that `doWork()`
			// can schedule the next round if needed
			this.throttler.clear();

			this.doWork();
		}, delay);
		this.throttler.value.schedule();
	}

	override dispose(): void {
		super.dispose();

		this.pendingWork.length = 0;
		this.disposed = true;
	}
}
1399
1400
//#region -- run on idle tricks ------------
1401
1402
/**
 * The deadline information handed to an idle callback.
 */
export interface IdleDeadline {
	/** Whether the callback is being run because the wait timed out. */
	readonly didTimeout: boolean;
	/** The time in milliseconds that remains for the callback to run. */
	timeRemaining(): number;
}

/**
 * The subset of the global object that is needed to schedule and
 * cancel idle callbacks.
 */
type IdleApi = Pick<typeof globalThis, 'requestIdleCallback' | 'cancelIdleCallback'>;
1408
1409
1410
/**
 * Execute the callback the next time the browser is idle, returning an
 * {@link IDisposable} that will cancel the callback when disposed. This wraps
 * [requestIdleCallback] so it will fallback to [setTimeout] if the environment
 * doesn't support it.
 *
 * @param callback The callback to run when idle, this includes an
 * [IdleDeadline] that provides the time allotted for the idle callback by the
 * browser. Not respecting this deadline will result in a degraded user
 * experience.
 * @param timeout A timeout after which the callback is no longer held back
 * for an idle period but queued on the regular event loop (like setTimeout).
 * Typically this should not be used.
 *
 * [IdleDeadline]: https://developer.mozilla.org/en-US/docs/Web/API/IdleDeadline
 * [requestIdleCallback]: https://developer.mozilla.org/en-US/docs/Web/API/Window/requestIdleCallback
 * [setTimeout]: https://developer.mozilla.org/en-US/docs/Web/API/Window/setTimeout
 *
 * **Note** that there is `dom.ts#runWhenWindowIdle` which is better suited when running inside a browser
 * context
 */
export let runWhenGlobalIdle: (callback: (idle: IdleDeadline) => void, timeout?: number) => IDisposable;

/**
 * Same as `runWhenGlobalIdle`, but schedules the idle callback on the
 * provided `targetWindow`.
 */
export let _runWhenIdle: (targetWindow: IdleApi, callback: (idle: IdleDeadline) => void, timeout?: number) => IDisposable;

(function () {
	const safeGlobal: any = globalThis;
	const hasNativeIdleApi = typeof safeGlobal.requestIdleCallback === 'function' && typeof safeGlobal.cancelIdleCallback === 'function';

	if (hasNativeIdleApi) {
		_runWhenIdle = (targetWindow: typeof safeGlobal, runner, timeout?) => {
			const options = typeof timeout === 'number' ? { timeout } : undefined;
			const handle: number = targetWindow.requestIdleCallback(runner, options);
			let disposed = false;
			return {
				dispose() {
					// Cancel the idle callback at most once
					if (!disposed) {
						disposed = true;
						targetWindow.cancelIdleCallback(handle);
					}
				}
			};
		};
	} else {
		// Fallback: no idle API available, so run the callback via
		// `setTimeout0` with a fixed time budget
		_runWhenIdle = (_targetWindow, runner, timeout?) => {
			let disposed = false;
			setTimeout0(() => {
				if (disposed) {
					return;
				}
				const end = Date.now() + 15; // one frame at 64fps
				const deadline: IdleDeadline = {
					didTimeout: true,
					timeRemaining() {
						return Math.max(0, end - Date.now());
					}
				};
				runner(Object.freeze(deadline));
			});
			return {
				dispose() {
					disposed = true;
				}
			};
		};
	}

	runWhenGlobalIdle = (runner, timeout) => _runWhenIdle(globalThis, runner, timeout);
})();
1479
1480
/**
 * A value that is computed when the given window is idle, or on first
 * access of `value`, whichever happens first. An error thrown by the
 * executor is remembered and rethrown when `value` is read.
 */
export abstract class AbstractIdleValue<T> {

	private readonly _executor: () => void;
	private readonly _handle: IDisposable;

	private _didRun: boolean = false;
	private _value?: T;
	private _error: unknown;

	/**
	 * @param targetWindow The object whose idle API is used for scheduling
	 * @param executor Computes the value
	 */
	constructor(targetWindow: IdleApi, executor: () => T) {
		this._executor = () => {
			try {
				this._value = executor();
			} catch (err) {
				this._error = err;
			}
			// Reached for both outcomes since the error is captured above
			this._didRun = true;
		};
		this._handle = _runWhenIdle(targetWindow, () => this._executor());
	}

	/**
	 * Cancels the scheduled idle computation.
	 */
	dispose(): void {
		this._handle.dispose();
	}

	/**
	 * The computed value. Forces the computation to run synchronously
	 * if it did not run yet.
	 */
	get value(): T {
		if (!this._didRun) {
			// Not computed yet: cancel the idle callback and compute now
			this._handle.dispose();
			this._executor();
		}

		if (this._error) {
			throw this._error;
		}

		return this._value!;
	}

	/**
	 * Whether the executor has run, successfully or not.
	 */
	get isInitialized(): boolean {
		return this._didRun;
	}
}
1521
1522
/**
 * An `IdleValue` that always uses the current window (which might be throttled or inactive)
 *
 * **Note** that there is `dom.ts#WindowIdleValue` which is better suited when running inside a browser
 * context
 */
export class GlobalIdleValue<T> extends AbstractIdleValue<T> {

	/**
	 * @param executor Computes the value; scheduled against `globalThis`
	 */
	constructor(executor: () => T) {
		super(globalThis, executor);
	}
}
1534
1535
//#endregion
1536
1537
/**
 * Runs `task` until it succeeds, for at most `retries` attempts, pausing
 * `delay` milliseconds between two attempts.
 *
 * @param task The task to run; a rejection triggers another attempt
 * @param delay The pause in milliseconds between two attempts
 * @param retries The maximum number of attempts
 * @returns The result of the first successful attempt
 * @throws The error of the last attempt when all attempts failed, or an
 * `Error` when `retries` does not allow for a single attempt
 */
export async function retry<T>(task: ITask<Promise<T>>, delay: number, retries: number): Promise<T> {
	let lastError: unknown;
	let attempted = false;

	for (let attempt = 0; attempt < retries; attempt++) {
		attempted = true;

		try {
			return await task();
		} catch (error) {
			lastError = error;
		}

		// Only pause when another attempt follows: there is no point in
		// delaying the final failure
		if (attempt < retries - 1) {
			await timeout(delay);
		}
	}

	if (!attempted) {
		// The task never ran, so there is no task error to report
		throw new Error(`retry: the task was not run because 'retries' is ${retries}`);
	}

	throw lastError;
}
1552
1553
//#region Task Sequentializer
1554
1555
/**
 * Bookkeeping for the task that is currently tracked as running.
 */
interface IRunningTask {
	readonly taskId: number;
	readonly cancel: () => void;
	readonly promise: Promise<void>;
}

/**
 * Bookkeeping for the task that is waiting to run next. `run` is
 * mutable because queueing again replaces the task to run while the
 * promise handed out to callers stays the same.
 */
interface IQueuedTask {
	readonly promise: Promise<void>;
	readonly promiseResolve: () => void;
	readonly promiseReject: (error: Error) => void;
	run: ITask<Promise<void>>;
}

/**
 * Shape of a `TaskSequentializer` that is known to have a running task.
 */
export interface ITaskSequentializerWithRunningTask {
	readonly running: Promise<void>;
}

/**
 * Shape of a `TaskSequentializer` that is known to have a queued task.
 */
export interface ITaskSequentializerWithQueuedTask {
	readonly queued: IQueuedTask;
}
1575
1576
/**
 * Tracks one running task and at most one queued task that is started
 * as soon as the running task settles.
 *
 * @deprecated use `LimitedQueue` instead for an easier to use API
 */
export class TaskSequentializer {

	private _running?: IRunningTask;
	private _queued?: IQueuedTask;

	/**
	 * Whether a task is running. When `taskId` is provided, whether
	 * the running task is the one with that id.
	 */
	isRunning(taskId?: number): this is ITaskSequentializerWithRunningTask {
		return typeof taskId === 'number'
			? this._running?.taskId === taskId
			: !!this._running;
	}

	/**
	 * The promise of the running task, if any.
	 */
	get running(): Promise<void> | undefined {
		return this._running?.promise;
	}

	/**
	 * Invokes the cancel callback of the running task, if any.
	 */
	cancelRunning(): void {
		this._running?.cancel();
	}

	/**
	 * Tracks `promise` as the running task until it settles.
	 *
	 * @param taskId Identifies the task
	 * @param promise The promise of the already started task
	 * @param onCancel Invoked when `cancelRunning()` is called
	 * @returns The given `promise`
	 */
	run(taskId: number, promise: Promise<void>, onCancel?: () => void): Promise<void> {
		this._running = { taskId, cancel: () => onCancel?.(), promise };

		// Success or failure: either way the task is no longer running
		const onSettled = () => this.doneRunning(taskId);
		promise.then(onSettled, onSettled);

		return promise;
	}

	private doneRunning(taskId: number): void {
		// only set running to done if the promise finished that is associated with that taskId
		if (this._running?.taskId !== taskId) {
			return;
		}

		this._running = undefined;

		// schedule the queued task now that we are free if we have any
		this.runQueued();
	}

	private runQueued(): void {
		const next = this._queued;
		if (!next) {
			return;
		}

		this._queued = undefined;

		// Run queued task and complete on the associated promise
		next.run().then(next.promiseResolve, next.promiseReject);
	}

	/**
	 * Note: the promise to schedule as next run MUST itself call `run`.
	 * Otherwise, this sequentializer will report `false` for `isRunning`
	 * even when this task is running. Missing this detail means that
	 * suddenly multiple tasks will run in parallel.
	 */
	queue(run: ITask<Promise<void>>): Promise<void> {
		if (this._queued) {
			// we have a previous queued task, just overwrite it
			this._queued.run = run;
		} else {
			// this is our first queued task, so we create associated promise with it
			// so that we can return a promise that completes when the task has
			// completed.
			const { promise, resolve, reject } = promiseWithResolvers<void>();
			this._queued = {
				run,
				promise,
				promiseResolve: resolve!,
				promiseReject: reject!
			};
		}

		return this._queued.promise;
	}

	/**
	 * Whether a task is queued to run next.
	 */
	hasQueued(): this is ITaskSequentializerWithQueuedTask {
		return !!this._queued;
	}

	/**
	 * Resolves when the queued task has completed or, if nothing is
	 * queued, when the running task has completed.
	 */
	async join(): Promise<void> {
		const pending = this._queued ? this._queued.promise : this._running?.promise;
		return pending;
	}
}
1666
1667
//#endregion
1668
1669
//#region
1670
1671
/**
 * The `IntervalCounter` allows to count the number
 * of calls to `increment()` over a duration of
 * `interval`. This utility can be used to conditionally
 * throttle a frequent task when a certain threshold
 * is reached.
 */
export class IntervalCounter {

	// Start of the current counting window
	private lastIncrementTime = 0;

	private value = 0;

	/**
	 * @param interval The length of a counting window, in the unit returned by `nowFn`
	 * @param nowFn Provides the current time, defaults to `Date.now()`
	 */
	constructor(private readonly interval: number, private readonly nowFn = () => Date.now()) { }

	/**
	 * Counts one call.
	 *
	 * @returns The number of calls in the current window, including this one
	 */
	increment(): number {
		const now = this.nowFn();

		// We are outside of the range of `interval` and as such
		// start counting from 0 and remember the time
		const windowElapsed = now - this.lastIncrementTime > this.interval;
		if (windowElapsed) {
			this.lastIncrementTime = now;
			this.value = 0;
		}

		return ++this.value;
	}
}
1701
1702
//#endregion
1703
1704
//#region
1705
1706
/**
 * The shape of a promise `resolve` callback.
 */
export type ValueCallback<T = unknown> = (value: T | Promise<T>) => void;

const enum DeferredOutcome {
	Resolved,
	Rejected
}

/**
 * Creates a promise whose resolution or rejection can be controlled imperatively.
 *
 * Only the first call to `complete()`, `error()` or `cancel()` has an
 * effect. Later calls are ignored so that `isResolved`, `isRejected`
 * and `value` always agree with the actual state of `p`.
 */
export class DeferredPromise<T> {

	private completeCallback!: ValueCallback<T>;
	private errorCallback!: (err: unknown) => void;
	private outcome?: { outcome: DeferredOutcome.Rejected; value: unknown } | { outcome: DeferredOutcome.Resolved; value: T };

	/** Whether `error()` or `cancel()` settled this promise. */
	public get isRejected() {
		return this.outcome?.outcome === DeferredOutcome.Rejected;
	}

	/** Whether `complete()` settled this promise. */
	public get isResolved() {
		return this.outcome?.outcome === DeferredOutcome.Resolved;
	}

	/** Whether this promise was completed, errored or cancelled. */
	public get isSettled() {
		return !!this.outcome;
	}

	/** The value passed to `complete()`, or `undefined` when not resolved. */
	public get value() {
		return this.outcome?.outcome === DeferredOutcome.Resolved ? this.outcome?.value : undefined;
	}

	/** The promise that is controlled by this object. */
	public readonly p: Promise<T>;

	constructor() {
		this.p = new Promise<T>((c, e) => {
			this.completeCallback = c;
			this.errorCallback = e;
		});
	}

	/**
	 * Resolves `p` with `value`. Has no effect when already settled.
	 */
	public complete(value: T) {
		if (this.isSettled) {
			// A settled promise cannot change anymore, so the recorded
			// outcome must not change either
			return Promise.resolve();
		}

		return new Promise<void>(resolve => {
			this.completeCallback(value);
			this.outcome = { outcome: DeferredOutcome.Resolved, value };
			resolve();
		});
	}

	/**
	 * Rejects `p` with `err`. Has no effect when already settled.
	 */
	public error(err: unknown) {
		if (this.isSettled) {
			return Promise.resolve();
		}

		return new Promise<void>(resolve => {
			this.errorCallback(err);
			this.outcome = { outcome: DeferredOutcome.Rejected, value: err };
			resolve();
		});
	}

	/**
	 * Settles `p` with the outcome of `promise`.
	 */
	public settleWith(promise: Promise<T>): Promise<void> {
		return promise.then(
			value => this.complete(value),
			error => this.error(error)
		);
	}

	/**
	 * Rejects `p` with a `CancellationError`.
	 */
	public cancel() {
		return this.error(new CancellationError());
	}
}
1774
1775
//#endregion
1776
1777
//#region Promises
1778
1779
export namespace Promises {

	/**
	 * A drop-in replacement for `Promise.all` with the only difference
	 * that the method awaits every promise to either fulfill or reject.
	 *
	 * Similar to `Promise.all`, only the first error will be returned
	 * if any.
	 *
	 * @param promises The promises to await
	 * @returns The fulfillment values in the order of `promises`
	 * @throws The reason of the promise that rejected first, once all
	 * promises have settled. This includes falsy reasons such as `undefined`.
	 */
	export async function settled<T>(promises: Promise<T>[]): Promise<T[]> {
		// Collected in the order in which the promises reject. Using a
		// list (and not a "first error" variable) also detects rejections
		// with a falsy reason such as `undefined`.
		const errors: unknown[] = [];

		const result = await Promise.all(promises.map(promise => promise.then(value => value, error => {
			errors.push(error);

			return undefined; // do not rethrow so that other promises can settle
		})));

		if (errors.length > 0) {
			throw errors[0];
		}

		return result as unknown as T[]; // cast is needed and protected by the `throw` above
	}

	/**
	 * A helper to create a new `Promise<T>` with a body that is a promise
	 * itself. By default, an error that raises from the async body will
	 * end up as a unhandled rejection, so this utility properly awaits the
	 * body and rejects the promise as a normal promise does without async
	 * body.
	 *
	 * This method should only be used in rare cases where otherwise `async`
	 * cannot be used (e.g. when callbacks are involved that require this).
	 *
	 * @param bodyFn The async body; receives the `resolve` and `reject`
	 * callbacks of the created promise
	 * @returns A promise that settles via `resolve`/`reject` or rejects
	 * with the error raised by `bodyFn`
	 */
	export function withAsyncBody<T, E = Error>(bodyFn: (resolve: (value: T) => unknown, reject: (error: E) => unknown) => Promise<unknown>): Promise<T> {
		// eslint-disable-next-line no-async-promise-executor
		return new Promise<T>(async (resolve, reject) => {
			try {
				await bodyFn(resolve, reject);
			} catch (error) {
				reject(error);
			}
		});
	}
}
1827
1828
/**
 * Wraps a promise and records its outcome so that the value or the
 * error can be read synchronously once the promise has settled.
 */
export class StatefulPromise<T> {
	private _value: T | undefined = undefined;
	private _error: unknown = undefined;
	// Set once the wrapped promise has settled, be it with a value or an error
	private _isResolved = false;

	/** Settles like the wrapped promise, after the outcome was recorded. */
	public readonly promise: Promise<T>;

	constructor(promise: Promise<T>) {
		const onFulfilled = (result: T): T => {
			this._value = result;
			this._isResolved = true;
			return result;
		};

		const onRejected = (reason: unknown): never => {
			this._error = reason;
			this._isResolved = true;
			throw reason; // keep the returned promise rejected
		};

		this.promise = promise.then(onFulfilled, onRejected);
	}

	/** The fulfillment value, `undefined` unless fulfilled. */
	get value(): T | undefined {
		return this._value;
	}

	/** The rejection reason, `undefined` unless rejected. */
	get error(): unknown {
		return this._error;
	}

	/** Whether the wrapped promise has settled. */
	get isResolved() {
		return this._isResolved;
	}

	/**
	 * Returns the resolved value.
	 * Throws if the promise is not resolved yet.
	 * Rethrows the recorded error if there is one.
	 */
	public requireValue(): T {
		if (this._isResolved) {
			if (this._error) {
				throw this._error;
			}
			return this._value!;
		}

		throw new BugIndicatingError('Promise is not resolved yet');
	}
}
1869
1870
/**
 * A `StatefulPromise` whose underlying computation is only created
 * when the promise is first needed.
 */
export class LazyStatefulPromise<T> {
	private readonly _promise: Lazy<StatefulPromise<T>>;

	/**
	 * @param _compute Creates the promise; invoked when the lazy value is first read
	 */
	constructor(
		private readonly _compute: () => Promise<T>,
	) {
		this._promise = new Lazy(() => new StatefulPromise(this._compute()));
	}

	/**
	 * Returns the resolved value.
	 * Throws if the promise is not resolved yet.
	 */
	public requireValue(): T {
		const stateful = this._promise.value;
		return stateful.requireValue();
	}

	/**
	 * Returns the promise (and triggers a computation of the promise if not yet done so).
	 */
	public getPromise(): Promise<T> {
		const stateful = this._promise.value;
		return stateful.promise;
	}

	/**
	 * Reads the current value without triggering a computation of the promise.
	 */
	public get currentValue(): T | undefined {
		return this._promise.rawValue?.value;
	}
}
1899
1900
//#endregion
1901
1902
//#region
1903
1904
/**
 * The lifecycle of an async iterable source: still producing
 * (`Initial`), completed (`DoneOK`) or failed (`DoneError`).
 */
const enum AsyncIterableSourceState {
	Initial,
	DoneOK,
	DoneError,
}

/**
 * An object that allows to emit async values asynchronously or bring the iterable to an error state using `reject()`.
 * This emitter is valid only for the duration of the executor (until the promise returned by the executor settles).
 */
export interface AsyncIterableEmitter<T> {
	/**
	 * The value will be appended at the end.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	emitOne(value: T): void;
	/**
	 * The values will be appended at the end.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	emitMany(values: T[]): void;
	/**
	 * Writing an error will permanently invalidate this iterable.
	 * The current users will receive an error thrown, as will all future users.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	reject(error: Error): void;
}

/**
 * An executor for the `AsyncIterableObject` that has access to an emitter.
 */
export interface AsyncIterableExecutor<T> {
	/**
	 * @param emitter An object that allows to emit async values valid only for the duration of the executor.
	 */
	(emitter: AsyncIterableEmitter<T>): unknown | Promise<unknown>;
}
1945
1946
/**
 * A rich implementation for an `AsyncIterable<T>`.
 *
 * All emitted values are kept in a results array, and every iterator
 * reads that array from its start. Consumers therefore receive all
 * values, including those emitted before they started iterating.
 */
export class AsyncIterableObject<T> implements AsyncIterable<T> {

	/**
	 * Creates an iterable that emits the given items and then completes.
	 */
	public static fromArray<T>(items: T[]): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>((emitter) => {
			emitter.emitMany(items);
		});
	}

	/**
	 * Creates an iterable that emits the items the promise resolves to.
	 */
	public static fromPromise<T>(promise: Promise<T[]>): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async (emitter) => {
			const items = await promise;
			emitter.emitMany(items);
		});
	}

	/**
	 * Creates an iterable that emits the value of each promise as soon
	 * as that promise resolves.
	 */
	public static fromPromisesResolveOrder<T>(promises: Promise<T>[]): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async (emitter) => {
			await Promise.all(promises.map(async (pending) => emitter.emitOne(await pending)));
		});
	}

	/**
	 * Creates an iterable that emits the items of all given iterables,
	 * which are consumed concurrently.
	 */
	public static merge<T>(iterables: AsyncIterable<T>[]): AsyncIterableObject<T> {
		return new AsyncIterableObject(async (emitter) => {
			const drain = async (source: AsyncIterable<T>): Promise<void> => {
				for await (const item of source) {
					emitter.emitOne(item);
				}
			};
			await Promise.all(iterables.map(source => drain(source)));
		});
	}

	public static EMPTY = AsyncIterableObject.fromArray<any>([]);

	private _state: AsyncIterableSourceState;
	private _results: T[];
	private _error: Error | null;
	private readonly _onReturn?: () => void | Promise<void>;
	private readonly _onStateChanged: Emitter<void>;

	/**
	 * @param executor Produces the values; it is started in a microtask
	 * and the iterable completes once the executor has finished
	 * @param onReturn Invoked when an iterator's `return()` is called
	 */
	constructor(executor: AsyncIterableExecutor<T>, onReturn?: () => void | Promise<void>) {
		this._state = AsyncIterableSourceState.Initial;
		this._results = [];
		this._error = null;
		this._onReturn = onReturn;
		this._onStateChanged = new Emitter<void>();

		queueMicrotask(async () => {
			const writer: AsyncIterableEmitter<T> = {
				emitOne: (item) => this.emitOne(item),
				emitMany: (items) => this.emitMany(items),
				reject: (error) => this.reject(error)
			};
			try {
				await Promise.resolve(executor(writer));
				this.resolve();
			} catch (err) {
				this.reject(err);
			} finally {
				// The emitter is only valid for the duration of the executor
				writer.emitOne = undefined!;
				writer.emitMany = undefined!;
				writer.reject = undefined!;
			}
		});
	}

	[Symbol.asyncIterator](): AsyncIterator<T, undefined, undefined> {
		// Position of this iterator within the results
		let cursor = 0;

		const next = async (): Promise<IteratorResult<T, undefined>> => {
			while (true) {
				if (this._state === AsyncIterableSourceState.DoneError) {
					throw this._error;
				}
				if (cursor < this._results.length) {
					return { done: false, value: this._results[cursor++] };
				}
				if (this._state === AsyncIterableSourceState.DoneOK) {
					return { done: true, value: undefined };
				}
				// Nothing to hand out yet: wait for a new value or the end
				await Event.toPromise(this._onStateChanged.event);
			}
		};

		const onReturn = async (): Promise<IteratorResult<T, undefined>> => {
			this._onReturn?.();
			return { done: true, value: undefined };
		};

		return { next, return: onReturn };
	}

	/**
	 * Creates an iterable that emits `mapFn(item)` for every item of `iterable`.
	 */
	public static map<T, R>(iterable: AsyncIterable<T>, mapFn: (item: T) => R): AsyncIterableObject<R> {
		return new AsyncIterableObject<R>(async (emitter) => {
			for await (const item of iterable) {
				const mapped = mapFn(item);
				emitter.emitOne(mapped);
			}
		});
	}

	public map<R>(mapFn: (item: T) => R): AsyncIterableObject<R> {
		return AsyncIterableObject.map(this, mapFn);
	}

	/**
	 * Creates an iterable that emits those items of `iterable` for
	 * which `filterFn` returns `true`.
	 */
	public static filter<T>(iterable: AsyncIterable<T>, filterFn: (item: T) => boolean): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async (emitter) => {
			for await (const item of iterable) {
				if (!filterFn(item)) {
					continue;
				}
				emitter.emitOne(item);
			}
		});
	}

	public filter<T2 extends T>(filterFn: (item: T) => item is T2): AsyncIterableObject<T2>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableObject<T>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableObject<T> {
		return AsyncIterableObject.filter(this, filterFn);
	}

	/**
	 * Creates an iterable that emits only the truthy items of `iterable`.
	 */
	public static coalesce<T>(iterable: AsyncIterable<T | undefined | null>): AsyncIterableObject<T> {
		return AsyncIterableObject.filter(iterable, item => !!item) as AsyncIterableObject<T>;
	}

	public coalesce(): AsyncIterableObject<NonNullable<T>> {
		return AsyncIterableObject.coalesce(this) as AsyncIterableObject<NonNullable<T>>;
	}

	/**
	 * Collects all items of `iterable` into an array.
	 */
	public static async toPromise<T>(iterable: AsyncIterable<T>): Promise<T[]> {
		const collected: T[] = [];
		for await (const item of iterable) {
			collected.push(item);
		}
		return collected;
	}

	public toPromise(): Promise<T[]> {
		return AsyncIterableObject.toPromise(this);
	}

	/**
	 * The value will be appended at the end.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private emitOne(value: T): void {
		if (this._state === AsyncIterableSourceState.Initial) {
			// it is important to add new values at the end,
			// as we may have iterators already running on the array
			this._results.push(value);
			this._onStateChanged.fire();
		}
	}

	/**
	 * The values will be appended at the end.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private emitMany(values: T[]): void {
		if (this._state === AsyncIterableSourceState.Initial) {
			// it is important to add new values at the end,
			// as we may have iterators already running on the array
			this._results = this._results.concat(values);
			this._onStateChanged.fire();
		}
	}

	/**
	 * Calling `resolve()` will mark the result array as complete.
	 *
	 * **NOTE** `resolve()` must be called, otherwise all consumers of this iterable will hang indefinitely, similar to a non-resolved promise.
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private resolve(): void {
		if (this._state === AsyncIterableSourceState.Initial) {
			this._state = AsyncIterableSourceState.DoneOK;
			this._onStateChanged.fire();
		}
	}

	/**
	 * Writing an error will permanently invalidate this iterable.
	 * The current users will receive an error thrown, as will all future users.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private reject(error: Error) {
		if (this._state === AsyncIterableSourceState.Initial) {
			this._state = AsyncIterableSourceState.DoneError;
			this._error = error;
			this._onStateChanged.fire();
		}
	}
}
2144
2145
2146
/**
 * Creates a `CancelableAsyncIterableProducer` that re-emits the items of
 * the async iterable returned by `callback`.
 *
 * When cancellation is requested on the token, the producer is rejected
 * with a {@link CancellationError} and items that arrive afterwards are
 * no longer emitted.
 *
 * @param callback Receives the cancellation token and returns the iterable to consume
 */
export function createCancelableAsyncIterableProducer<T>(callback: (token: CancellationToken) => AsyncIterable<T>): CancelableAsyncIterableProducer<T> {
	const source = new CancellationTokenSource();
	const innerIterable = callback(source.token);

	return new CancelableAsyncIterableProducer<T>(source, async (emitter) => {
		// Releases the cancellation listener and the token source
		const release = () => {
			subscription.dispose();
			source.dispose();
		};

		const subscription = source.token.onCancellationRequested(() => {
			release();
			emitter.reject(new CancellationError());
		});

		try {
			for await (const item of innerIterable) {
				if (source.token.isCancellationRequested) {
					// canceled in the meantime
					return;
				}
				emitter.emitOne(item);
			}
			release();
		} catch (err) {
			release();
			emitter.reject(err);
		}
	});
}
2173
2174
/**
 * A push-style source for an {@link AsyncIterableObject}.
 *
 * The owner pushes items with `emitOne()` / `emitMany()` and finishes with
 * `resolve()` or `reject()`. Anything pushed before the iterable's executor
 * has run is buffered and handed over once the executor runs.
 */
export class AsyncIterableSource<T> {

	private readonly _deferred = new DeferredPromise<void>();
	private readonly _asyncIterable: AsyncIterableObject<T>;

	private _errorFn: (error: Error) => void;
	private _emitOneFn: (item: T) => void;
	private _emitManyFn: (item: T[]) => void;

	/**
	 * @param onReturn A function that will be called when the consumer stops
	 * consuming the async iterable, e.g. the for-await-loop has been exited
	 * early (break, return).
	 * This is NOT called when this source is resolved by its owner.
	 */
	constructor(onReturn?: () => Promise<void> | void) {
		// Buffers for whatever gets pushed before the executor below runs.
		let earlyError: Error | undefined;
		let earlyItems: T[] | undefined;

		this._asyncIterable = new AsyncIterableObject(emitter => {
			// An early error wins: report it and do not wire up the emitter.
			if (earlyError) {
				emitter.reject(earlyError);
				return;
			}

			// Hand over buffered items, then switch to direct forwarding.
			if (earlyItems) {
				emitter.emitMany(earlyItems);
			}
			this._errorFn = (error: Error) => emitter.reject(error);
			this._emitOneFn = (item: T) => emitter.emitOne(item);
			this._emitManyFn = (items: T[]) => emitter.emitMany(items);

			// The iterable stays open until `resolve()` / `reject()` completes the deferred.
			return this._deferred.p;
		}, onReturn);

		// Until the executor has run, only record what is being pushed.
		this._errorFn = (error: Error) => {
			// Only the first early error is kept.
			earlyError = earlyError || error;
		};
		this._emitOneFn = (item: T) => {
			earlyItems = earlyItems || [];
			earlyItems.push(item);
		};
		this._emitManyFn = (items: T[]) => {
			const buffered = earlyItems;
			if (buffered) {
				items.forEach(item => buffered.push(item));
			} else {
				// Copy, so later changes to the caller's array do not affect the buffer.
				earlyItems = items.slice();
			}
		};
	}

	/** The async iterable that consumers iterate over. */
	get asyncIterable(): AsyncIterableObject<T> {
		return this._asyncIterable;
	}

	/** Completes the deferred whose promise the iterable's executor returns. */
	resolve(): void {
		this._deferred.complete();
	}

	/** Forwards (or buffers) the error and then completes the deferred. */
	reject(error: Error): void {
		this._errorFn(error);
		this._deferred.complete();
	}

	/** Forwards (or buffers) a single item. */
	emitOne(item: T): void {
		this._emitOneFn(item);
	}

	/** Forwards (or buffers) a batch of items. */
	emitMany(items: T[]) {
		this._emitManyFn(items);
	}
}
2250
2251
/**
 * Wraps an async iterator (or iterable) so that iteration ends as soon as
 * `token` is cancelled.
 *
 * `next()` reports `done` when the token is already cancelled, or when the
 * race between the underlying `next()` and the token yields no result.
 * `throw` and `return` are passed through to the underlying iterator when it
 * defines them.
 *
 * @param iterableOrIterator The iterator to wrap, or an iterable to obtain one from
 * @param token The cancellation token that ends the iteration
 * @returns An iterable iterator over the same items
 */
export function cancellableIterable<T>(iterableOrIterator: AsyncIterator<T> | AsyncIterable<T>, token: CancellationToken): AsyncIterableIterator<T> {
	let iterator: AsyncIterator<T>;
	if (Symbol.asyncIterator in iterableOrIterator) {
		iterator = iterableOrIterator[Symbol.asyncIterator]();
	} else {
		iterator = iterableOrIterator;
	}

	// Each call builds a fresh "iteration is over" result.
	const finished = (): IteratorResult<T> => ({ done: true, value: undefined });

	return {
		async next(): Promise<IteratorResult<T>> {
			if (token.isCancellationRequested) {
				return finished();
			}
			const outcome = await raceCancellation(iterator.next(), token);
			return outcome || finished();
		},
		throw: iterator.throw?.bind(iterator),
		return: iterator.return?.bind(iterator),
		[Symbol.asyncIterator]() {
			return this;
		}
	};
}
2269
2270
/** A produced entry: either a value, or an error to reject the consumer with. */
type ProducerConsumerValue<T> =
	| { ok: true; value: T }
	| { ok: false; error: Error };

/**
 * A queue that pairs produced values with consumers.
 *
 * Values produced while nobody is waiting are kept until consumed; consumers
 * arriving while nothing is available wait until a value is produced. Once a
 * final value is set, it is handed to every waiting and every later consumer
 * after the queued values are used up.
 */
class ProducerConsumer<T> {
	private readonly _unsatisfiedConsumers: DeferredPromise<T>[] = [];
	private readonly _unconsumedValues: ProducerConsumerValue<T>[] = [];
	private _finalValue: ProducerConsumerValue<T> | undefined;

	/** Whether `produceFinal()` has been called. */
	public get hasFinalValue(): boolean {
		return Boolean(this._finalValue);
	}

	/**
	 * Hands `value` to the longest-waiting consumer, or queues it when no
	 * consumer is waiting. Throws if the final value has already been set.
	 */
	produce(value: ProducerConsumerValue<T>): void {
		this._ensureNoFinalValue();
		const waiting = this._unsatisfiedConsumers.shift();
		if (waiting) {
			this._resolveOrRejectDeferred(waiting, value);
		} else {
			this._unconsumedValues.push(value);
		}
	}

	/**
	 * Sets the final value and settles all currently waiting consumers with
	 * it. Throws if the final value has already been set.
	 */
	produceFinal(value: ProducerConsumerValue<T>): void {
		this._ensureNoFinalValue();
		this._finalValue = value;
		const waiting = this._unsatisfiedConsumers.splice(0);
		waiting.forEach(deferred => this._resolveOrRejectDeferred(deferred, value));
	}

	private _ensureNoFinalValue(): void {
		if (this._finalValue) {
			throw new BugIndicatingError('ProducerConsumer: cannot produce after final value has been set');
		}
	}

	/** Completes the deferred with the value, or fails it with the error. */
	private _resolveOrRejectDeferred(deferred: DeferredPromise<T>, value: ProducerConsumerValue<T>): void {
		if (!value.ok) {
			deferred.error(value.error);
			return;
		}
		deferred.complete(value.value);
	}

	/**
	 * Takes the oldest queued value, or the final value when the queue is
	 * empty. When neither is available, the returned promise settles once a
	 * value is produced.
	 */
	consume(): Promise<T> {
		const next = this._unconsumedValues.shift() ?? this._finalValue;
		if (!next) {
			const deferred = new DeferredPromise<T>();
			this._unsatisfiedConsumers.push(deferred);
			return deferred.p;
		}
		return next.ok ? Promise.resolve(next.value) : Promise.reject(next.error);
	}
}
2335
2336
/**
 * An async iterable whose items are pushed by an executor function.
 *
 * The executor is started in a microtask after construction and receives an
 * emitter with `emitOne`, `emitMany` and `reject`. The iteration ends when
 * the value returned by the executor settles, when the executor throws, or
 * when `reject` is called.
 *
 * Important difference to AsyncIterableObject:
 * If it is iterated two times, the second iterator will not see the values
 * already consumed by the first iterator (all iterations share one queue).
 */
export class AsyncIterableProducer<T> implements AsyncIterable<T> {
	private readonly _producerConsumer = new ProducerConsumer<IteratorResult<T>>();

	/**
	 * @param executor Produces the items through the emitter it is given; its
	 * returned value is awaited to determine the end of the iteration.
	 * @param _onReturn Called when the consumer invokes `return()` on the
	 * iterator (e.g. by leaving a for-await loop early).
	 */
	constructor(executor: AsyncIterableExecutor<T>, private readonly _onReturn?: () => void) {
		queueMicrotask(async () => {
			try {
				// The executor call itself is inside the `try`: a synchronous throw
				// must end the iteration with that error instead of escaping as an
				// unhandled rejection and leaving consumers waiting forever.
				const p = executor({
					emitOne: value => this._producerConsumer.produce({ ok: true, value: { done: false, value: value } }),
					emitMany: values => {
						for (const value of values) {
							this._producerConsumer.produce({ ok: true, value: { done: false, value: value } });
						}
					},
					reject: error => this._finishError(error),
				});

				// The executor already rejected synchronously; nothing left to finish.
				if (this._producerConsumer.hasFinalValue) {
					return;
				}

				await p;
				this._finishOk();
			} catch (error) {
				this._finishError(error);
			}
		});
	}

	/** Creates a producer that emits all `items` and then completes. */
	public static fromArray<T>(items: T[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>((writer) => {
			writer.emitMany(items);
		});
	}

	/** Creates a producer that emits the items `promise` resolves to and then completes. */
	public static fromPromise<T>(promise: Promise<T[]>): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			emitter.emitMany(await promise);
		});
	}

	/** Creates a producer that emits each promise's value as soon as that promise resolves. */
	public static fromPromisesResolveOrder<T>(promises: Promise<T>[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			await Promise.all(promises.map(async (p) => emitter.emitOne(await p)));
		});
	}

	/** Iterates all `iterables` concurrently and emits their items as they arrive. */
	public static merge<T>(iterables: AsyncIterable<T>[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer(async (emitter) => {
			await Promise.all(iterables.map(async (iterable) => {
				for await (const item of iterable) {
					emitter.emitOne(item);
				}
			}));
		});
	}

	/** A producer without items. */
	public static EMPTY = AsyncIterableProducer.fromArray<any>([]);

	/** Creates a producer emitting `mapFn(item)` for every item of `iterable`. */
	public static map<T, R>(iterable: AsyncIterable<T>, mapFn: (item: T) => R): AsyncIterableProducer<R> {
		return new AsyncIterableProducer<R>(async (emitter) => {
			for await (const item of iterable) {
				emitter.emitOne(mapFn(item));
			}
		});
	}

	/** Instance shorthand for {@link AsyncIterableProducer.map}. */
	public map<R>(mapFn: (item: T) => R): AsyncIterableProducer<R> {
		return AsyncIterableProducer.map(this, mapFn);
	}

	/** Creates a producer that drops all falsy items of `iterable`. */
	public static coalesce<T>(iterable: AsyncIterable<T | undefined | null>): AsyncIterableProducer<T> {
		return <AsyncIterableProducer<T>>AsyncIterableProducer.filter(iterable, item => !!item);
	}

	/** Instance shorthand for {@link AsyncIterableProducer.coalesce}. */
	public coalesce(): AsyncIterableProducer<NonNullable<T>> {
		return AsyncIterableProducer.coalesce(this) as AsyncIterableProducer<NonNullable<T>>;
	}

	/** Creates a producer emitting only the items of `iterable` accepted by `filterFn`. */
	public static filter<T>(iterable: AsyncIterable<T>, filterFn: (item: T) => boolean): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			for await (const item of iterable) {
				if (filterFn(item)) {
					emitter.emitOne(item);
				}
			}
		});
	}

	/** Instance shorthand for {@link AsyncIterableProducer.filter}. */
	public filter<T2 extends T>(filterFn: (item: T) => item is T2): AsyncIterableProducer<T2>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableProducer<T>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableProducer<T> {
		return AsyncIterableProducer.filter(this, filterFn);
	}

	/** Ends the iteration normally, unless it has already ended. */
	private _finishOk(): void {
		if (!this._producerConsumer.hasFinalValue) {
			this._producerConsumer.produceFinal({ ok: true, value: { done: true, value: undefined } });
		}
	}

	/** Ends the iteration with `error`, unless it has already ended. */
	private _finishError(error: Error): void {
		if (!this._producerConsumer.hasFinalValue) {
			this._producerConsumer.produceFinal({ ok: false, error: error });
		}
		// Warning: this can cause errors to be dropped (when the iteration has already ended).
	}

	// A single iterator object is shared by all iterations of this producer.
	private readonly _iterator: AsyncIterator<T, void, void> = {
		next: () => this._producerConsumer.consume(),
		return: () => {
			this._onReturn?.();
			return Promise.resolve({ done: true, value: undefined });
		},
		throw: async (e) => {
			this._finishError(e);
			return { done: true, value: undefined };
		},
	};

	[Symbol.asyncIterator](): AsyncIterator<T, void, void> {
		return this._iterator;
	}
}
2461
2462
/**
 * An {@link AsyncIterableProducer} that keeps the `CancellationTokenSource`
 * it was created with, so that the production can be cancelled from outside.
 */
export class CancelableAsyncIterableProducer<T> extends AsyncIterableProducer<T> {
	constructor(private readonly _source: CancellationTokenSource, executor: AsyncIterableExecutor<T>) {
		super(executor);
	}

	/** Requests cancellation on the token source passed to the constructor. */
	cancel(): void {
		this._source.cancel();
	}
}
2474
2475
//#endregion
2476
2477
/**
 * Marker returned by {@link AsyncReader} instead of an element when the
 * source is exhausted and no buffered elements are left.
 */
export const AsyncReaderEndOfStream = Symbol('AsyncReaderEndOfStream');

/**
 * Reads elements from an `AsyncIterator` one at a time, with the ability to
 * look at the next element without consuming it.
 */
export class AsyncReader<T> {
	// Elements pulled from the source that have not been consumed yet.
	private _buffer: T[] = [];
	// Set once the source has reported `done`.
	private _atEnd = false;

	/** `true` once the source is exhausted and every buffered element has been consumed. */
	public get endOfStream(): boolean { return this._buffer.length === 0 && this._atEnd; }
	// The pending request to the source, if any; shared by concurrent callers.
	private _extendBufferPromise: Promise<void> | undefined;

	constructor(
		private readonly _source: AsyncIterator<T>
	) {
	}

	/**
	 * Consumes and returns the next element, waiting for the source if
	 * nothing is buffered.
	 *
	 * @returns The next element, or `AsyncReaderEndOfStream` when the source is exhausted.
	 */
	public async read(): Promise<T | typeof AsyncReaderEndOfStream> {
		// Loop instead of extending once: overlapping calls share a single
		// `_extendBuffer()` request that buffers one element, so another caller
		// may have taken that element before this one resumes.
		while (this._buffer.length === 0 && !this._atEnd) {
			await this._extendBuffer();
		}
		if (this._buffer.length === 0) {
			return AsyncReaderEndOfStream;
		}
		return this._buffer.shift()!;
	}

	/**
	 * Consumes elements for as long as `predicate` accepts the next element,
	 * passing each consumed element to `callback` and awaiting its result.
	 * Stops at the first rejected element (which stays unconsumed) or at the
	 * end of the stream.
	 */
	public async readWhile(predicate: (value: T) => boolean, callback: (element: T) => unknown): Promise<void> {
		do {
			const piece = await this.peek();
			if (piece === AsyncReaderEndOfStream) {
				break;
			}
			if (!predicate(piece)) {
				break;
			}
			await this.read(); // consume
			await callback(piece);
		} while (true);
	}

	/**
	 * Synchronously consumes and returns the next buffered element.
	 *
	 * @returns The element, or `AsyncReaderEndOfStream` when the source is exhausted.
	 * @throws BugIndicatingError when nothing is buffered and the source is not at its end.
	 */
	public readBufferedOrThrow(): T | typeof AsyncReaderEndOfStream {
		const value = this.peekBufferedOrThrow();
		this._buffer.shift();
		return value;
	}

	/** Reads and discards elements until the end of the stream is reached. */
	public async consumeToEnd(): Promise<void> {
		while (!this.endOfStream) {
			await this.read();
		}
	}

	/**
	 * Returns the next element without consuming it, waiting for the source
	 * if nothing is buffered.
	 *
	 * @returns The next element, or `AsyncReaderEndOfStream` when the source is exhausted.
	 */
	public async peek(): Promise<T | typeof AsyncReaderEndOfStream> {
		// Same reasoning as in `read()`: the element buffered by a shared
		// request may already have been consumed by another caller.
		while (this._buffer.length === 0 && !this._atEnd) {
			await this._extendBuffer();
		}
		if (this._buffer.length === 0) {
			return AsyncReaderEndOfStream;
		}
		return this._buffer[0];
	}

	/**
	 * Synchronously returns the next buffered element without consuming it.
	 *
	 * @returns The element, or `AsyncReaderEndOfStream` when the source is exhausted.
	 * @throws BugIndicatingError when nothing is buffered and the source is not at its end.
	 */
	public peekBufferedOrThrow(): T | typeof AsyncReaderEndOfStream {
		if (this._buffer.length === 0) {
			if (this._atEnd) {
				return AsyncReaderEndOfStream;
			}
			throw new BugIndicatingError('No buffered elements');
		}

		return this._buffer[0];
	}

	/**
	 * Like `peek()`, but races the wait for the source against `timeoutMs`
	 * using `raceTimeout`.
	 *
	 * @returns The next element, `AsyncReaderEndOfStream` when the source is
	 * exhausted, or `undefined` when nothing is buffered after the race.
	 */
	public async peekTimeout(timeoutMs: number): Promise<T | typeof AsyncReaderEndOfStream | undefined> {
		if (this._buffer.length === 0 && !this._atEnd) {
			await raceTimeout(this._extendBuffer(), timeoutMs);
		}
		if (this._atEnd) {
			return AsyncReaderEndOfStream;
		}
		if (this._buffer.length === 0) {
			return undefined;
		}
		return this._buffer[0];
	}

	/**
	 * Requests one more element from the source and buffers it, or records
	 * that the source is done. At most one request is in flight: callers
	 * arriving while a request is pending get the same promise.
	 */
	private _extendBuffer(): Promise<void> {
		if (this._atEnd) {
			return Promise.resolve();
		}

		if (!this._extendBufferPromise) {
			this._extendBufferPromise = (async () => {
				const { value, done } = await this._source.next();
				// Clear the pending request before publishing the result, so
				// that the next call starts a new request.
				this._extendBufferPromise = undefined;
				if (done) {
					this._atEnd = true;
				} else {
					this._buffer.push(value);
				}
			})();
		}

		return this._extendBufferPromise;
	}
}
2581
2582