Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/extensions/copilot/src/util/vs/base/common/async.ts
13405 views
1
//!!! DO NOT modify, this file was COPIED from 'microsoft/vscode'
2
3
/*---------------------------------------------------------------------------------------------
4
* Copyright (c) Microsoft Corporation. All rights reserved.
5
* Licensed under the MIT License. See License.txt in the project root for license information.
6
*--------------------------------------------------------------------------------------------*/
7
8
import { CancellationToken, CancellationTokenSource } from './cancellation';
9
import { BugIndicatingError, CancellationError } from './errors';
10
import { Emitter, Event } from './event';
11
import { Disposable, DisposableMap, DisposableStore, IDisposable, isDisposable, MutableDisposable, toDisposable } from './lifecycle';
12
import { extUri as defaultExtUri, IExtUri } from './resources';
13
import { URI } from './uri';
14
import { setTimeout0 } from './platform';
15
import { MicrotaskDelay } from './symbols';
16
import { Lazy } from './lazy';
17
18
/**
 * Type guard reporting whether `obj` is a truthy value that exposes a callable `then` member.
 *
 * @param obj The value to inspect.
 * @returns `true` when `obj` is truthy and `typeof obj.then === 'function'`.
 */
export function isThenable<T>(obj: unknown): obj is Promise<T> {
	if (!obj) {
		return false;
	}

	const candidate = obj as { then?: unknown };
	return typeof candidate.then === 'function';
}
21
22
/**
 * A promise that additionally exposes a `cancel()` method.
 */
export interface CancelablePromise<T> extends Promise<T> {
	/** Requests cancellation of the work behind this promise. */
	cancel(): void;
}
25
26
/**
 * Runs `callback` with a fresh cancellation token and wraps its result in a promise-like
 * object that also has a `cancel()` method.
 *
 * @remarks Calling `cancel()` cancels the token and rejects the returned promise with a
 * {@link CancellationError}. If the callback's promise later fulfills with a disposable
 * value, that value is disposed because nobody will receive it.
 *
 * @param callback A function that accepts a cancellation token and returns a promise
 * @returns A promise that can be cancelled
 */
export function createCancelablePromise<T>(callback: (token: CancellationToken) => Promise<T>): CancelablePromise<T> {
	const tokenSource = new CancellationTokenSource();
	const pending = callback(tokenSource.token);

	let cancelled = false;

	const settled = new Promise<T>((resolve, reject) => {
		const listener = tokenSource.token.onCancellationRequested(() => {
			cancelled = true;
			listener.dispose();
			reject(new CancellationError());
		});

		// Shared cleanup for both outcomes of the callback's promise.
		const release = () => {
			listener.dispose();
			tokenSource.dispose();
		};

		Promise.resolve(pending).then(
			value => {
				release();

				if (cancelled) {
					// Already rejected with a CancellationError; clean up the orphaned result.
					if (isDisposable(value)) {
						value.dispose();
					}
					return;
				}

				resolve(value);
			},
			error => {
				release();
				reject(error);
			}
		);
	});

	return <CancelablePromise<T>>new class {
		cancel() {
			tokenSource.cancel();
			tokenSource.dispose();
		}
		then<TResult1 = T, TResult2 = never>(resolve?: ((value: T) => TResult1 | Promise<TResult1>) | undefined | null, reject?: ((reason: unknown) => TResult2 | Promise<TResult2>) | undefined | null): Promise<TResult1 | TResult2> {
			return settled.then(resolve, reject);
		}
		catch<TResult = never>(reject?: ((reason: unknown) => TResult | Promise<TResult>) | undefined | null): Promise<T | TResult> {
			return this.then(undefined, reject);
		}
		finally(onfinally?: (() => void) | undefined | null): Promise<T> {
			return settled.finally(onfinally);
		}
	};
}
84
85
/**
 * Races `promise` against cancellation of `token`; resolves with `undefined` as soon as the
 * token is cancelled.
 * @see {@link raceCancellationError}
 */
export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken): Promise<T | undefined>;

/**
 * Races `promise` against cancellation of `token`; resolves with `defaultValue` as soon as the
 * token is cancelled.
 * @see {@link raceCancellationError}
 */
export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken, defaultValue: T): Promise<T>;

export function raceCancellation<T>(promise: Promise<T>, token: CancellationToken, defaultValue?: T): Promise<T | undefined> {
	return new Promise<T | undefined>((resolve, reject) => {
		const listener = token.onCancellationRequested(() => {
			listener.dispose();
			resolve(defaultValue);
		});

		// Whatever the outcome of `promise`, stop listening to the token afterwards.
		const stopListening = () => listener.dispose();
		promise.then(resolve, reject).finally(stopListening);
	});
}
106
107
/**
 * Races `promise` against cancellation of `token`; rejects with a {@link CancellationError}
 * as soon as the token is cancelled.
 * @see {@link raceCancellation}
 */
export function raceCancellationError<T>(promise: Promise<T>, token: CancellationToken): Promise<T> {
	return new Promise<T>((resolve, reject) => {
		const listener = token.onCancellationRequested(() => {
			listener.dispose();
			reject(new CancellationError());
		});

		// Whatever the outcome of `promise`, stop listening to the token afterwards.
		const stopListening = () => listener.dispose();
		promise.then(resolve, reject).finally(stopListening);
	});
}
120
121
/**
 * Wraps a cancellable promise such that it is no longer cancellable. Can be used to
 * avoid issues with shared promises that would normally be returned as
 * cancellable to consumers.
 *
 * @param promise The cancellable promise whose outcome is forwarded.
 * @returns A new plain promise that settles the same way as `promise`.
 */
export function notCancellablePromise<T>(promise: CancelablePromise<T>): Promise<T> {
	const forward = (resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: unknown) => void): void => {
		promise.then(resolve, reject);
	};

	return new Promise<T>(forward);
}
131
132
/**
 * Returns as soon as one of the promises resolves or rejects and cancels remaining promises.
 *
 * @param cancellablePromises The promises to race. Entries that expose a `cancel()` method are
 * cancelled once the race settles, except the entry that fulfilled first.
 * @returns The raced promise, extended with a `cancel()` method that cancels the entries.
 */
export function raceCancellablePromises<T>(cancellablePromises: (CancelablePromise<T> | Promise<T>)[]): CancelablePromise<T> {
	// Index of the entry that fulfilled; stays -1 while pending or when the race rejects.
	let resolvedPromiseIndex = -1;
	const promises = cancellablePromises.map((promise, index) => promise.then(result => { resolvedPromiseIndex = index; return result; }));
	const promise = Promise.race(promises) as CancelablePromise<T>;
	promise.cancel = () => {
		cancellablePromises.forEach((cancellablePromise, index) => {
			if (index !== resolvedPromiseIndex && (cancellablePromise as CancelablePromise<T>).cancel) {
				(cancellablePromise as CancelablePromise<T>).cancel();
			}
		});
	};

	// Cancel the losers once the race settles. Both outcomes are handled here on purpose:
	// `promise.finally(...)` would return a derived promise that re-rejects with nobody
	// listening, which surfaces as an unhandled rejection whenever the race rejects.
	const cancelRemaining = () => promise.cancel();
	promise.then(cancelRemaining, cancelRemaining);

	return promise;
}
151
152
/**
 * Races `promise` against a timer.
 *
 * @param promise The promise to wait for.
 * @param timeout Delay in milliseconds after which the result resolves with `undefined`.
 * @param onTimeout Optional callback invoked when the timer fires.
 * @returns The outcome of `promise` if it settles first, otherwise `undefined`. The timer is
 * cleared once `promise` settles.
 */
export function raceTimeout<T>(promise: Promise<T>, timeout: number, onTimeout?: () => void): Promise<T | undefined> {
	let resolveOnTimeout: ((value: T | undefined) => void) | undefined = undefined;

	const handle = setTimeout(() => {
		resolveOnTimeout?.(undefined);
		onTimeout?.();
	}, timeout);

	const guarded = promise.finally(() => clearTimeout(handle));
	const timedOut = new Promise<T | undefined>(resolve => {
		resolveOnTimeout = resolve;
	});

	return Promise.race([guarded, timedOut]);
}
165
166
/**
 * Invokes `callback` and exposes its outcome as a promise.
 *
 * @param callback Function returning either a plain value or a thenable.
 * @returns A promise that follows the returned thenable, resolves with the returned plain
 * value, or rejects when `callback` throws (the call happens inside the promise executor).
 */
export function asPromise<T>(callback: () => T | Thenable<T>): Promise<T> {
	return new Promise<T>((resolve, reject) => {
		const outcome = callback();

		if (!isThenable<T>(outcome)) {
			resolve(outcome);
			return;
		}

		outcome.then(resolve, reject);
	});
}
176
177
/**
 * Creates and returns a new promise, plus its `resolve` and `reject` callbacks.
 *
 * Replace with standardized [`Promise.withResolvers`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers) once it is supported
 */
export function promiseWithResolvers<T>(): { promise: Promise<T>; resolve: (value: T | PromiseLike<T>) => void; reject: (err?: any) => void } {
	// The executor runs synchronously, so both callbacks are assigned before we return.
	let resolve!: (value: T | PromiseLike<T>) => void;
	let reject!: (reason?: any) => void;

	const promise = new Promise<T>((onResolve, onReject) => {
		resolve = onResolve;
		reject = onReject;
	});

	return { promise, resolve, reject };
}
191
192
/**
 * A parameterless function that produces a value of type `T` when invoked.
 */
export interface ITask<T> {
	/** Runs the task and returns its result. */
	(): T;
}
195
196
/**
 * A function that receives a cancellation token and produces a value of type `T`.
 */
export interface ICancellableTask<T> {
	/** Runs the task with the given cancellation token and returns its result. */
	(token: CancellationToken): T;
}
199
200
/**
 * A helper to prevent accumulation of sequential async tasks.
 *
 * Imagine a mail man with the sole task of delivering letters. As soon as
 * a letter is submitted for delivery, he drives to the destination, delivers it
 * and returns to his base. Imagine that during the trip, N more letters were submitted.
 * When the mail man returns, he picks those N letters and delivers them all in a
 * single trip. Even though N+1 submissions occurred, only 2 deliveries were made.
 *
 * The throttler implements this via the queue() method, by providing it a task
 * factory. Following the example:
 *
 * 		const throttler = new Throttler();
 * 		let letters = [];
 *
 * 		function deliver() {
 * 			const lettersToDeliver = letters;
 * 			letters = [];
 * 			return makeTheTrip(lettersToDeliver);
 * 		}
 *
 * 		function onLetterReceived(l) {
 * 			letters.push(l);
 * 			throttler.queue(deliver);
 * 		}
 */
export class Throttler implements IDisposable {

	/** Promise of the task that is currently running, if any. */
	private activePromise: Promise<any> | null;
	/** Shared promise for the single task that will run after the active one. */
	private queuedPromise: Promise<any> | null;
	/** Most recently queued factory; earlier factories queued meanwhile are replaced. */
	private queuedPromiseFactory: ICancellableTask<Promise<any>> | null;
	private cancellationTokenSource: CancellationTokenSource;

	constructor() {
		this.activePromise = null;
		this.queuedPromise = null;
		this.queuedPromiseFactory = null;

		this.cancellationTokenSource = new CancellationTokenSource();
	}

	/**
	 * Runs `promiseFactory` right away when idle. While a task is active, the factory is
	 * remembered (replacing any previously remembered one) and run once the active task
	 * has settled; all callers queued in the meantime share that run's outcome.
	 *
	 * @param promiseFactory Task factory; it receives this throttler's cancellation token.
	 * @returns A promise for the outcome of the task that eventually ran. It rejects with
	 * `Error('Throttler is disposed')` when called after `dispose()`.
	 */
	queue<T>(promiseFactory: ICancellableTask<Promise<T>>): Promise<T> {
		if (this.cancellationTokenSource.token.isCancellationRequested) {
			return Promise.reject(new Error('Throttler is disposed'));
		}

		if (this.activePromise) {
			this.queuedPromiseFactory = promiseFactory;

			if (!this.queuedPromise) {
				const onComplete = () => {
					this.queuedPromise = null;

					if (this.cancellationTokenSource.token.isCancellationRequested) {
						return;
					}

					const result = this.queue(this.queuedPromiseFactory!);
					this.queuedPromiseFactory = null;

					return result;
				};

				this.queuedPromise = new Promise((resolve, reject) => {
					// Forward rejections of the queued run as well. With `.then(resolve)` only,
					// a failing queued task would leave every waiting caller pending forever
					// and raise an unhandled rejection.
					this.activePromise!.then(onComplete, onComplete).then(resolve, reject);
				});
			}

			return new Promise((resolve, reject) => {
				this.queuedPromise!.then(resolve, reject);
			});
		}

		this.activePromise = promiseFactory(this.cancellationTokenSource.token);

		return new Promise((resolve, reject) => {
			this.activePromise!.then((result: T) => {
				this.activePromise = null;
				resolve(result);
			}, (err: unknown) => {
				this.activePromise = null;
				reject(err);
			});
		});
	}

	/**
	 * Cancels the token handed to task factories; subsequent `queue()` calls are rejected
	 * and an already queued factory is no longer started.
	 */
	dispose(): void {
		this.cancellationTokenSource.cancel();
	}
}
290
291
/**
 * Runs queued tasks strictly one after another: each task starts once the previously queued
 * one has settled, whether it fulfilled or rejected.
 */
export class Sequencer {

	private current: Promise<unknown> = Promise.resolve(null);

	/**
	 * Appends `promiseTask` to the chain.
	 *
	 * @param promiseTask The task to run after everything queued before it.
	 * @returns The promise of this task's own outcome.
	 */
	queue<T>(promiseTask: ITask<Promise<T>>): Promise<T> {
		const runTask = () => promiseTask();
		const next = this.current.then(runTask, runTask);
		this.current = next;
		return next;
	}
}
299
300
/**
 * Like a sequencer, but with one independent chain per key: tasks queued under the same key
 * run one after another, and a key is forgotten once its last queued task has settled.
 */
export class SequencerByKey<TKey> {

	private promiseMap = new Map<TKey, Promise<unknown>>();

	/**
	 * Queues `promiseTask` behind whatever is currently pending for `key`.
	 *
	 * @returns The promise of this task's own outcome.
	 */
	queue<T>(key: TKey, promiseTask: ITask<Promise<T>>): Promise<T> {
		const previous = this.promiseMap.get(key) ?? Promise.resolve();

		// A failure of the previous task must not prevent this one from running.
		const afterPrevious = previous.catch(() => { });

		const next: Promise<T> = afterPrevious
			.then(promiseTask)
			.finally(() => {
				// Only clean up if nothing was queued behind this task in the meantime.
				if (this.promiseMap.get(key) === next) {
					this.promiseMap.delete(key);
				}
			});

		this.promiseMap.set(key, next);
		return next;
	}

	/** Returns the most recently queued promise for `key`, or `undefined` if there is none. */
	peek(key: TKey): Promise<unknown> | undefined {
		const pending = this.promiseMap.get(key);
		return pending ? pending : undefined;
	}

	/** Iterates the keys that currently have a pending chain. */
	keys(): IterableIterator<TKey> {
		return this.promiseMap.keys();
	}
}
326
327
/**
 * Handle for a callback that was scheduled to run later; disposing it cancels the callback.
 */
interface IScheduledLater extends IDisposable {
	/** Whether the callback is still scheduled, i.e. has neither run nor been disposed. */
	isTriggered(): boolean;
}
330
331
/**
 * Schedules `fn` via `setTimeout` and returns a handle that reports whether the call is still
 * pending and that cancels it when disposed.
 */
const timeoutDeferred = (timeout: number, fn: () => void): IScheduledLater => {
	let pending = true;

	const onElapsed = () => {
		pending = false;
		fn();
	};
	const timer = setTimeout(onElapsed, timeout);

	return {
		isTriggered() {
			return pending;
		},
		dispose() {
			clearTimeout(timer);
			pending = false;
		},
	};
};
345
346
/**
 * Schedules `fn` via `queueMicrotask` and returns a handle that reports whether the call is
 * still pending. A microtask cannot be unqueued, so disposing only clears the flag that the
 * microtask checks before calling `fn`.
 */
const microtaskDeferred = (fn: () => void): IScheduledLater => {
	let pending = true;

	queueMicrotask(() => {
		if (!pending) {
			return;
		}
		pending = false;
		fn();
	});

	return {
		isTriggered() {
			return pending;
		},
		dispose() {
			pending = false;
		},
	};
};
360
361
/**
 * A helper to delay (debounce) execution of a task that is being requested often.
 *
 * Following the throttler, now imagine the mail man wants to optimize the number of
 * trips proactively. The trip itself can be long, so he decides not to make the trip
 * as soon as a letter is submitted. Instead he waits a while, in case more
 * letters are submitted. After said waiting period, if no letters were submitted, he
 * decides to make the trip. Imagine that N more letters were submitted after the first
 * one, all within a short period of time between each other. Even though N+1
 * submissions occurred, only 1 delivery was made.
 *
 * The delayer offers this behavior via the trigger() method, into which both the task
 * to be executed and the waiting period (delay) must be passed in as arguments. Following
 * the example:
 *
 * 		const delayer = new Delayer(WAITING_PERIOD);
 * 		const letters = [];
 *
 * 		function letterReceived(l) {
 * 			letters.push(l);
 * 			delayer.trigger(() => { return makeTheTrip(); });
 * 		}
 */
export class Delayer<T> implements IDisposable {

	/** Handle of the currently scheduled wake-up, if any. */
	private deferred: IScheduledLater | null = null;
	/** Promise shared by all `trigger()` calls of the current waiting period. */
	private completionPromise: Promise<any> | null = null;
	private doResolve: ((value?: any | Promise<any>) => void) | null = null;
	private doReject: ((err: unknown) => void) | null = null;
	/** The most recently triggered task; this is the one that eventually runs. */
	private task: ITask<T | Promise<T>> | null = null;

	constructor(public defaultDelay: number | typeof MicrotaskDelay) { }

	/**
	 * Replaces the pending task with `task` and restarts the waiting period.
	 *
	 * @param task The task to run when the waiting period elapses.
	 * @param delay Waiting period in milliseconds, or `MicrotaskDelay`; defaults to `defaultDelay`.
	 * @returns The promise shared by all triggers of the current period. It settles with the
	 * outcome of the task that finally ran, or rejects with a `CancellationError` on `cancel()`.
	 */
	trigger(task: ITask<T | Promise<T>>, delay = this.defaultDelay): Promise<T> {
		this.task = task;
		this.cancelTimeout();

		if (!this.completionPromise) {
			const gate = new Promise((resolve, reject) => {
				this.doResolve = resolve;
				this.doReject = reject;
			});

			this.completionPromise = gate.then(() => {
				this.completionPromise = null;
				this.doResolve = null;

				const pendingTask = this.task;
				if (!pendingTask) {
					return undefined;
				}

				this.task = null;
				return pendingTask();
			});
		}

		// Opening the gate lets the continuation above pick up the latest task.
		const onElapsed = () => {
			this.deferred = null;
			this.doResolve?.(null);
		};

		this.deferred = delay === MicrotaskDelay ? microtaskDeferred(onElapsed) : timeoutDeferred(delay, onElapsed);

		return this.completionPromise;
	}

	/** Whether a wake-up is currently scheduled. */
	isTriggered(): boolean {
		return !!this.deferred?.isTriggered();
	}

	/** Cancels the scheduled wake-up and rejects the pending promise with a `CancellationError`. */
	cancel(): void {
		this.cancelTimeout();

		if (this.completionPromise) {
			this.doReject?.(new CancellationError());
			this.completionPromise = null;
		}
	}

	private cancelTimeout(): void {
		this.deferred?.dispose();
		this.deferred = null;
	}

	dispose(): void {
		this.cancel();
	}
}
452
453
/**
 * A helper to delay execution of a task that is being requested often, while
 * preventing accumulation of consecutive executions, while the task runs.
 *
 * The mail man is clever and waits for a certain amount of time, before going
 * out to deliver letters. While the mail man is going out, more letters arrive
 * and can only be delivered once he is back. Once he is back the mail man will
 * do one more trip to deliver the letters that have accumulated while he was out.
 */
export class ThrottledDelayer<T> {

	private delayer: Delayer<Promise<T>>;
	private throttler: Throttler;

	constructor(defaultDelay: number) {
		this.delayer = new Delayer(defaultDelay);
		this.throttler = new Throttler();
	}

	/**
	 * Debounces via the delayer; once the delay elapses the factory is handed to the throttler.
	 *
	 * @param promiseFactory The task factory to run.
	 * @param delay Optional delay in milliseconds overriding the default delay.
	 */
	trigger(promiseFactory: ICancellableTask<Promise<T>>, delay?: number): Promise<T> {
		const enqueue = () => this.throttler.queue(promiseFactory);
		return this.delayer.trigger(enqueue, delay) as unknown as Promise<T>;
	}

	/** Whether the delayer currently has a wake-up scheduled. */
	isTriggered(): boolean {
		return this.delayer.isTriggered();
	}

	/** Cancels the delayer; the throttler is left untouched. */
	cancel(): void {
		this.delayer.cancel();
	}

	/** Disposes the delayer first, then the throttler. */
	dispose(): void {
		this.delayer.dispose();
		this.throttler.dispose();
	}
}
489
490
/**
 * A barrier that is initially closed and then becomes opened permanently.
 */
export class Barrier {
	private _isOpen: boolean;
	private _promise: Promise<boolean>;
	private _completePromise!: (v: boolean) => void;

	constructor() {
		this._isOpen = false;
		// The executor runs synchronously, so `_completePromise` is set before the constructor returns.
		this._promise = new Promise<boolean>(complete => {
			this._completePromise = complete;
		});
	}

	/** Whether `open()` has been called. */
	isOpen(): boolean {
		return this._isOpen;
	}

	/** Opens the barrier, resolving the promise returned by `wait()` with `true`. */
	open(): void {
		this._isOpen = true;
		this._completePromise(true);
	}

	/** Returns the promise that resolves with `true` once the barrier is opened. */
	wait(): Promise<boolean> {
		return this._promise;
	}
}
518
519
/**
 * A barrier that is initially closed and then becomes opened permanently after a certain period of
 * time or when open is called explicitly
 */
export class AutoOpenBarrier extends Barrier {

	private readonly _timeout: Timeout;

	/**
	 * @param autoOpenTimeMs Delay in milliseconds after which the barrier opens by itself.
	 */
	constructor(autoOpenTimeMs: number) {
		super();
		const openAutomatically = () => this.open();
		this._timeout = setTimeout(openAutomatically, autoOpenTimeMs);
	}

	/** Opens the barrier and clears the auto-open timer. */
	override open(): void {
		clearTimeout(this._timeout);
		super.open();
	}
}
537
538
/**
 * Returns a promise that resolves after `millis` milliseconds.
 *
 * Without a token the result is a cancelable promise. With a token, the promise rejects with
 * a `CancellationError` (and the timer is cleared) when the token is cancelled first.
 */
export function timeout(millis: number): CancelablePromise<void>;
export function timeout(millis: number, token: CancellationToken): Promise<void>;
export function timeout(millis: number, token?: CancellationToken): CancelablePromise<void> | Promise<void> {
	if (token) {
		return new Promise<void>((resolve, reject) => {
			const timer = setTimeout(() => {
				listener.dispose();
				resolve();
			}, millis);

			const listener = token.onCancellationRequested(() => {
				clearTimeout(timer);
				listener.dispose();
				reject(new CancellationError());
			});
		});
	}

	// No token supplied: create one and let the caller cancel through the returned promise.
	return createCancelablePromise(ownToken => timeout(millis, ownToken));
}
557
558
/**
 * Creates a timeout that can be disposed using its returned value.
 * @param handler The timeout handler.
 * @param timeout An optional timeout in milliseconds.
 * @param store An optional {@link DisposableStore} that will have the timeout disposable managed automatically.
 *
 * @example
 * const store = new DisposableStore;
 * // Call the timeout after 1000ms at which point it will be automatically
 * // evicted from the store.
 * const timeoutDisposable = disposableTimeout(() => {}, 1000, store);
 *
 * if (foo) {
 *   // Cancel the timeout and evict it from store.
 *   timeoutDisposable.dispose();
 * }
 */
export function disposableTimeout(handler: () => void, timeout = 0, store?: DisposableStore): IDisposable {
	const onElapsed = () => {
		handler();
		// After firing, remove the now useless disposable from the store.
		if (store) {
			disposable.dispose();
		}
	};
	const timer = setTimeout(onElapsed, timeout);

	const disposable = toDisposable(() => {
		clearTimeout(timer);
		store?.delete(disposable);
	});

	store?.add(disposable);
	return disposable;
}
589
590
/**
 * Runs the provided list of promise factories in sequential order. The returned
 * promise will complete to an array of results from each promise. Results that are
 * `undefined` or `null` are not added to the array.
 */
export function sequence<T>(promiseFactories: ITask<Promise<T>>[]): Promise<T[]> {
	const collected: T[] = [];
	const total = promiseFactories.length;
	let cursor = 0;

	// Records the previous result (if any), then starts the next factory or finishes.
	const step = (previous: unknown): Promise<any> => {
		if (previous !== undefined && previous !== null) {
			collected.push(previous as T);
		}

		const pending: Promise<T> | null = cursor < total ? promiseFactories[cursor++]() : null;
		if (pending) {
			return pending.then(step);
		}

		return Promise.resolve(collected);
	};

	return Promise.resolve(null).then(step);
}
619
620
/**
 * Runs the promise factories one after another and resolves with the first result accepted
 * by `shouldStop`. Factories after the accepted one are not invoked.
 *
 * @param promiseFactories The factories to try, in order.
 * @param shouldStop Predicate deciding whether a result is accepted; defaults to truthiness.
 * @param defaultValue Value to resolve with when no result is accepted.
 */
export function first<T>(promiseFactories: ITask<Promise<T>>[], shouldStop: (t: T) => boolean = t => !!t, defaultValue: T | null = null): Promise<T | null> {
	const total = promiseFactories.length;
	let cursor = 0;

	const tryNext = (): Promise<T | null> => {
		if (cursor >= total) {
			return Promise.resolve(defaultValue);
		}

		const factory = promiseFactories[cursor++];

		return Promise.resolve(factory()).then(result => {
			return shouldStop(result) ? Promise.resolve(result) : tryNext();
		});
	};

	return tryNext();
}
643
644
/**
 * Returns the result of the first promise that matches the "shouldStop",
 * running all promises in parallel. Supports cancelable promises.
 *
 * Once a result is accepted or a promise rejects, `cancel()` is called on every promise in
 * the list that has such a method. If no result is accepted, `defaultValue` is returned.
 */
export function firstParallel<T>(promiseList: Promise<T>[], shouldStop?: (t: T) => boolean, defaultValue?: T | null): Promise<T | null>;
export function firstParallel<T, R extends T>(promiseList: Promise<T>[], shouldStop: (t: T) => t is R, defaultValue?: R | null): Promise<R | null>;
export function firstParallel<T>(promiseList: Promise<T>[], shouldStop: (t: T) => boolean = t => !!t, defaultValue: T | null = null) {
	if (promiseList.length === 0) {
		return Promise.resolve(defaultValue);
	}

	// Number of promises still outstanding; forced to -1 once the outcome is decided so
	// that later settlements are ignored.
	let remaining = promiseList.length;

	const decideAndCancel = () => {
		remaining = -1;
		for (const candidate of promiseList) {
			(candidate as Partial<CancelablePromise<T>>).cancel?.();
		}
	};

	return new Promise<T | null>((resolve, reject) => {
		for (const candidate of promiseList) {
			candidate
				.then(result => {
					if (--remaining >= 0 && shouldStop(result)) {
						decideAndCancel();
						resolve(result);
					} else if (remaining === 0) {
						resolve(defaultValue);
					}
				})
				.catch(err => {
					if (--remaining >= 0) {
						decideAndCancel();
						reject(err);
					}
				});
		}
	});
}
682
683
/**
 * A task waiting in a limiter together with the callbacks that settle the promise
 * handed out for it.
 */
interface ILimitedTaskFactory<T> {
	/** Produces the promise for the task. */
	factory: ITask<Promise<T>>;
	/** Completes the promise handed out for the task. */
	c: (value: T | Promise<T>) => void;
	/** Fails the promise handed out for the task. */
	e: (error?: unknown) => void;
}
688
689
/**
 * Contract of a helper that accepts task factories through `queue()` and reports how many
 * tasks it currently tracks through `size`.
 */
export interface ILimiter<T> {

	/** Number of tasks currently tracked. */
	readonly size: number;

	/** Submits a task factory and returns a promise for that task's outcome. */
	queue(factory: ITask<Promise<T>>): Promise<T>;

	/** Removes the tasks that are still waiting. */
	clear(): void;
}
697
698
/**
 * A helper to queue N promises and run them all with a max degree of parallelism. The helper
 * ensures that at any time no more than M promises are running at the same time.
 */
export class Limiter<T> implements ILimiter<T> {

	/** Number of tasks that are either waiting or running. */
	private _size = 0;
	private _isDisposed = false;
	/** Number of tasks whose factory has been invoked and whose promise has not settled yet. */
	private runningPromises: number;
	private readonly maxDegreeOfParalellism: number;
	/** Tasks waiting for a free slot, in submission order. */
	private readonly outstandingPromises: ILimitedTaskFactory<T>[];
	private readonly _onDrained: Emitter<void>;

	/**
	 * @param maxDegreeOfParalellism Maximum number of tasks allowed to run at the same time.
	 */
	constructor(maxDegreeOfParalellism: number) {
		this.maxDegreeOfParalellism = maxDegreeOfParalellism;
		this.outstandingPromises = [];
		this.runningPromises = 0;
		this._onDrained = new Emitter<void>();
	}

	/**
	 *
	 * @returns A promise that resolved when all work is done (onDrained) or when
	 * there is nothing to do
	 */
	whenIdle(): Promise<void> {
		return this.size > 0
			? Event.toPromise(this.onDrained)
			: Promise.resolve();
	}

	/** Fires whenever the number of tracked tasks drops to zero. */
	get onDrained(): Event<void> {
		return this._onDrained.event;
	}

	get size(): number {
		return this._size;
	}

	/**
	 * Submits a task. It starts immediately if a slot is free, otherwise as soon as one frees up.
	 *
	 * @param factory Produces the promise for the task; invoked when the task starts.
	 * @returns A promise that settles like the promise returned by `factory`. If `factory`
	 * throws synchronously, the returned promise rejects with the thrown error.
	 * @throws Error if the limiter has been disposed.
	 */
	queue(factory: ITask<Promise<T>>): Promise<T> {
		if (this._isDisposed) {
			throw new Error('Object has been disposed');
		}
		this._size++;

		return new Promise<T>((c, e) => {
			this.outstandingPromises.push({ factory, c, e });
			this.consume();
		});
	}

	private consume(): void {
		while (this.outstandingPromises.length && this.runningPromises < this.maxDegreeOfParalellism) {
			const iLimitedTask = this.outstandingPromises.shift()!;
			this.runningPromises++;

			// A factory that throws synchronously is treated like one returning a rejected
			// promise. Without this, the slot taken above would never be released (a limiter
			// with a single slot would stall for good) and, when started from `consumed()`,
			// the task's promise would never settle.
			let promise: Promise<T>;
			try {
				promise = iLimitedTask.factory();
			} catch (err) {
				promise = Promise.reject(err);
			}

			promise.then(iLimitedTask.c, iLimitedTask.e);
			promise.then(() => this.consumed(), () => this.consumed());
		}
	}

	private consumed(): void {
		if (this._isDisposed) {
			return;
		}
		this.runningPromises--;
		if (--this._size === 0) {
			this._onDrained.fire();
		}

		if (this.outstandingPromises.length > 0) {
			this.consume();
		}
	}

	/**
	 * Drops all tasks that have not started yet; running tasks are unaffected.
	 *
	 * @throws Error if the limiter has been disposed.
	 */
	clear(): void {
		if (this._isDisposed) {
			throw new Error('Object has been disposed');
		}
		this.outstandingPromises.length = 0;
		this._size = this.runningPromises;
	}

	dispose(): void {
		this._isDisposed = true;
		this.outstandingPromises.length = 0; // stop further processing
		this._size = 0;
		this._onDrained.dispose();
	}
}
789
790
/**
 * A queue handles one promise at a time and guarantees that at any time only one promise is executing.
 */
export class Queue<T> extends Limiter<T> {

	constructor() {
		// A limiter restricted to a single running task.
		super(1);
	}
}
799
800
/**
 * Same as `Queue`, ensures that only 1 task is executed at the same time. The difference to `Queue` is that
 * there is only 1 task about to be scheduled next. As such, calling `queue` while a task is executing will
 * replace the currently queued task until it executes.
 *
 * As such, the returned promise may not be from the factory that is passed in but from the next factory that
 * is running after having called `queue`.
 */
export class LimitedQueue {

	private readonly sequentializer = new TaskSequentializer();

	/** Counter used as the task identifier passed to the sequentializer. */
	private tasks = 0;

	/**
	 * Runs `factory` immediately when nothing is running, otherwise hands it to the
	 * sequentializer as the task to run next.
	 */
	queue(factory: ITask<Promise<void>>): Promise<void> {
		const start = () => this.sequentializer.run(this.tasks++, factory());

		if (this.sequentializer.isRunning()) {
			return this.sequentializer.queue(() => {
				return start();
			});
		}

		return start();
	}
}
824
825
/**
 * A helper to organize queues per resource. The ResourceQueue makes sure to manage queues per resource
 * by disposing them once the queue is empty.
 */
export class ResourceQueue implements IDisposable {

	/** One queue per resource, keyed by the resource's comparison key. */
	private readonly queues = new Map<string, Queue<void>>();

	/** Deferred promises handed out by `whenDrained()` that are still waiting. */
	private readonly drainers = new Set<DeferredPromise<void>>();

	private drainListeners: DisposableMap<number> | undefined = undefined;
	private drainListenerCount = 0;

	/** Resolves once no queue has tasks left; immediately if that is already the case. */
	async whenDrained(): Promise<void> {
		if (this.isDrained()) {
			return;
		}

		const drainer = new DeferredPromise<void>();
		this.drainers.add(drainer);

		return drainer.p;
	}

	private isDrained(): boolean {
		for (const queue of this.queues.values()) {
			if (queue.size > 0) {
				return false;
			}
		}

		return true;
	}

	/** Number of tasks tracked by the queue of `resource`, or 0 if there is no such queue. */
	queueSize(resource: URI, extUri: IExtUri = defaultExtUri): number {
		const key = extUri.getComparisonKey(resource);
		const queue = this.queues.get(key);

		return queue?.size ?? 0;
	}

	/** Queues `factory` in the queue of `resource`, creating that queue on first use. */
	queueFor(resource: URI, factory: ITask<Promise<void>>, extUri: IExtUri = defaultExtUri): Promise<void> {
		const key = extUri.getComparisonKey(resource);
		const queue = this.queues.get(key) ?? this.createQueue(key);

		return queue.queue(factory);
	}

	/**
	 * Creates and registers the queue for `key`, together with a one-time drain listener that
	 * disposes and unregisters the queue again.
	 */
	private createQueue(key: string): Queue<void> {
		const queue = new Queue<void>();
		const drainListenerId = this.drainListenerCount++;

		const drainListener = Event.once(queue.onDrained)(() => {
			queue.dispose();
			this.queues.delete(key);
			this.onDidQueueDrain();

			this.drainListeners?.deleteAndDispose(drainListenerId);

			if (this.drainListeners?.size === 0) {
				this.drainListeners.dispose();
				this.drainListeners = undefined;
			}
		});

		if (!this.drainListeners) {
			this.drainListeners = new DisposableMap();
		}
		this.drainListeners.set(drainListenerId, drainListener);

		this.queues.set(key, queue);

		return queue;
	}

	private onDidQueueDrain(): void {
		if (this.isDrained()) {
			this.releaseDrainers();
		}
		// otherwise: not done yet
	}

	private releaseDrainers(): void {
		for (const drainer of this.drainers) {
			drainer.complete();
		}

		this.drainers.clear();
	}

	dispose(): void {
		for (const queue of this.queues.values()) {
			queue.dispose();
		}

		this.queues.clear();

		// Even though we might still have pending
		// tasks queued, after the queues have been
		// disposed, we can no longer track them, so
		// we release drainers to prevent hanging
		// promises when the resource queue is being
		// disposed.
		this.releaseDrainers();

		this.drainListeners?.dispose();
	}
}
930
931
/**
 * A parameterless function returning either a value or a promise of a value.
 */
export type Task<T = void> = () => T | Promise<T>;
932
933
/**
 * Wrap a type in an optional promise. This can be useful to avoid the runtime
 * overhead of creating a promise.
 */
export type MaybePromise<T> = T | Promise<T>;
938
939
/**
 * Processes tasks in the order they were scheduled.
 */
export class TaskQueue {
	private _runningTask: Task<any> | undefined = undefined;
	private _pendingTasks: { task: Task<any>; deferred: DeferredPromise<any>; setUndefinedWhenCleared: boolean }[] = [];

	/**
	 * Waits for the current and pending tasks to finish, then runs and awaits the given task.
	 * If the task is skipped because of clearPending, the promise is rejected with a CancellationError.
	 */
	public schedule<T>(task: Task<T>): Promise<T> {
		return this._enqueue(task, false);
	}

	/**
	 * Waits for the current and pending tasks to finish, then runs and awaits the given task.
	 * If the task is skipped because of clearPending, the promise is resolved with undefined.
	 */
	public scheduleSkipIfCleared<T>(task: Task<T>): Promise<T | undefined> {
		return this._enqueue(task, true);
	}

	/** Appends the task to the pending list and starts processing if nothing is running. */
	private _enqueue<T>(task: Task<T>, setUndefinedWhenCleared: boolean): Promise<T> {
		const deferred = new DeferredPromise<T>();
		this._pendingTasks.push({ task, deferred, setUndefinedWhenCleared });
		this._runIfNotRunning();
		return deferred.p;
	}

	private _runIfNotRunning(): void {
		if (this._runningTask === undefined) {
			this._processQueue();
		}
	}

	/** Runs the next pending task (if any) and continues with the following one afterwards. */
	private async _processQueue(): Promise<void> {
		const next = this._pendingTasks.shift();
		if (!next) {
			// nothing is pending
			return;
		}

		if (this._runningTask) {
			throw new BugIndicatingError();
		}

		this._runningTask = next.task;

		try {
			const result = await next.task();
			next.deferred.complete(result);
		} catch (e) {
			next.deferred.error(e);
		} finally {
			this._runningTask = undefined;
			this._processQueue();
		}
	}

	/**
	 * Clears all pending tasks. Does not cancel the currently running task.
	 */
	public clearPending(): void {
		const cleared = this._pendingTasks;
		this._pendingTasks = [];

		for (const entry of cleared) {
			if (entry.setUndefinedWhenCleared) {
				entry.deferred.complete(undefined);
			} else {
				entry.deferred.error(new CancellationError());
			}
		}
	}
}
1016
1017
/**
 * A disposable wrapper around a single `setTimeout` that can be re-armed or cancelled.
 */
export class TimeoutTimer implements IDisposable {
	/** Handle of the armed timer; `undefined` while no timer is armed. */
	private _token: Timeout | undefined;
	private _isDisposed = false;

	constructor();
	constructor(runner: () => void, timeout: number);
	constructor(runner?: () => void, timeout?: number) {
		this._token = undefined;

		if (typeof runner === 'function' && typeof timeout === 'number') {
			this.setIfNotSet(runner, timeout);
		}
	}

	/** Cancels the armed timer (if any) and marks this instance as disposed. */
	dispose(): void {
		this.cancel();
		this._isDisposed = true;
	}

	/** Cancels the armed timer, if any. */
	cancel(): void {
		if (this._token === undefined) {
			return;
		}

		clearTimeout(this._token);
		this._token = undefined;
	}

	/**
	 * Cancels the armed timer (if any) and arms a new one.
	 *
	 * @throws BugIndicatingError when called after `dispose()`.
	 */
	cancelAndSet(runner: () => void, timeout: number): void {
		if (this._isDisposed) {
			throw new BugIndicatingError(`Calling 'cancelAndSet' on a disposed TimeoutTimer`);
		}

		this.cancel();
		this._arm(runner, timeout);
	}

	/**
	 * Arms a timer unless one is already armed.
	 *
	 * @throws BugIndicatingError when called after `dispose()`.
	 */
	setIfNotSet(runner: () => void, timeout: number): void {
		if (this._isDisposed) {
			throw new BugIndicatingError(`Calling 'setIfNotSet' on a disposed TimeoutTimer`);
		}

		if (this._token === undefined) {
			this._arm(runner, timeout);
		}
		// otherwise the timer is already set
	}

	/** Starts the timer; the handle is cleared right before `runner` is invoked. */
	private _arm(runner: () => void, timeout: number): void {
		this._token = setTimeout(() => {
			this._token = undefined;
			runner();
		}, timeout);
	}
}
1070
1071
/**
 * A disposable wrapper around a single `setInterval` registration.
 *
 * At most one interval is active at any time. Once disposed, `cancelAndSet`
 * throws a `BugIndicatingError`.
 */
export class IntervalTimer implements IDisposable {

	/** Clears the active interval when disposed; `undefined` while idle. */
	private disposable: IDisposable | undefined = undefined;
	private isDisposed = false;

	/** Stops the active interval, if any. The timer stays usable. */
	cancel(): void {
		this.disposable?.dispose();
		this.disposable = undefined;
	}

	/**
	 * Stops any active interval and starts a new one.
	 *
	 * @param runner Callback to invoke on every tick (called without arguments).
	 * @param interval Tick period in milliseconds.
	 * @param context Object providing `setInterval` / `clearInterval`;
	 * defaults to `globalThis`.
	 * @throws BugIndicatingError when the timer has been disposed.
	 */
	cancelAndSet(runner: () => void, interval: number, context = globalThis): void {
		if (this.isDisposed) {
			throw new BugIndicatingError(`Calling 'cancelAndSet' on a disposed IntervalTimer`);
		}

		this.cancel();

		const handle = context.setInterval(() => {
			runner();
		}, interval);

		// Clear the interval on the same context that created it.
		this.disposable = toDisposable(() => {
			context.clearInterval(handle);
			this.disposable = undefined;
		});
	}

	/** Stops the active interval (if any) and marks the timer as disposed. */
	dispose(): void {
		this.cancel();
		this.isDisposed = true;
	}
}
1102
1103
/**
 * Runs a callback once after a delay. Calling `schedule()` again before the
 * delay has elapsed cancels the previous request, so at most one run is
 * pending at any time.
 */
export class RunOnceScheduler implements IDisposable {

	/** The callback to run; set to `null` when the scheduler is disposed. */
	protected runner: ((...args: unknown[]) => void) | null;

	/** Handle of the pending timeout, `undefined` while nothing is scheduled. */
	private timeoutToken: Timeout | undefined = undefined;
	private timeout: number;
	/** `onTimeout` bound once so that every `schedule()` call reuses it. */
	private timeoutHandler: () => void;

	/**
	 * @param runner Callback to run when the delay elapses.
	 * @param delay Default delay in milliseconds used by `schedule()`.
	 */
	constructor(runner: (...args: any[]) => void, delay: number) {
		this.runner = runner;
		this.timeout = delay;
		this.timeoutHandler = this.onTimeout.bind(this);
	}

	/**
	 * Dispose RunOnceScheduler: cancels a pending run and drops the runner.
	 */
	dispose(): void {
		this.cancel();
		this.runner = null;
	}

	/**
	 * Cancel current scheduled runner (if any).
	 */
	cancel(): void {
		if (this.isScheduled()) {
			clearTimeout(this.timeoutToken);
			this.timeoutToken = undefined;
		}
	}

	/**
	 * Cancel previous runner (if any) & schedule a new runner.
	 *
	 * @param delay Delay in milliseconds; defaults to the configured delay.
	 */
	schedule(delay = this.timeout): void {
		this.cancel();
		this.timeoutToken = setTimeout(this.timeoutHandler, delay);
	}

	/** The default delay in milliseconds used by `schedule()`. */
	get delay(): number {
		return this.timeout;
	}

	set delay(value: number) {
		this.timeout = value;
	}

	/**
	 * Returns true if scheduled.
	 */
	isScheduled(): boolean {
		return this.timeoutToken !== undefined;
	}

	/** Runs a pending request right now instead of waiting for the delay. No-op when nothing is scheduled. */
	flush(): void {
		if (this.isScheduled()) {
			this.cancel();
			this.doRun();
		}
	}

	private onTimeout() {
		// The timeout has fired, so nothing is scheduled any more.
		this.timeoutToken = undefined;
		if (this.runner) {
			this.doRun();
		}
	}

	/** Invokes the runner; subclasses override this to pass arguments. */
	protected doRun(): void {
		this.runner?.();
	}
}
1177
1178
/**
 * Same as `RunOnceScheduler`, but doesn't count the time spent in sleep mode.
 * > **NOTE**: Only offers 1s resolution.
 *
 * When calling `setTimeout` with 3hrs, and putting the computer immediately to sleep
 * for 8hrs, `setTimeout` will fire **as soon as the computer wakes from sleep**. But
 * this scheduler will execute 3hrs **after waking the computer from sleep**.
 *
 * The delay is counted down in whole seconds by a 1000ms interval.
 */
export class ProcessTimeRunOnceScheduler {

	/** The callback to run; set to `null` when the scheduler is disposed. */
	private runner: (() => void) | null;
	private timeout: number;

	/** Number of one-second ticks left before the runner is invoked. */
	private counter: number;
	private intervalToken: Timeout | undefined;
	/** `onInterval` bound once so that every `schedule()` call reuses it. */
	private intervalHandler: () => void;

	/**
	 * @param runner Callback to run when the delay has elapsed.
	 * @param delay Default delay in milliseconds; should be a multiple of 1000.
	 */
	constructor(runner: () => void, delay: number) {
		ProcessTimeRunOnceScheduler.warnIfNotWholeSeconds(delay);
		this.runner = runner;
		this.timeout = delay;
		this.counter = 0;
		this.intervalToken = undefined;
		this.intervalHandler = this.onInterval.bind(this);
	}

	/** Logs a warning for delays the 1s resolution cannot represent exactly. */
	private static warnIfNotWholeSeconds(delay: number): void {
		if (delay % 1000 !== 0) {
			console.warn(`ProcessTimeRunOnceScheduler resolution is 1s, ${delay}ms is not a multiple of 1000ms.`);
		}
	}

	/** Cancels a pending run and drops the runner. */
	dispose(): void {
		this.cancel();
		this.runner = null;
	}

	/** Cancels the pending run, if any. */
	cancel(): void {
		if (this.isScheduled()) {
			clearInterval(this.intervalToken);
			this.intervalToken = undefined;
		}
	}

	/**
	 * Cancel previous runner (if any) & schedule a new runner.
	 *
	 * @param delay Delay in milliseconds, rounded up to whole seconds;
	 * defaults to the configured delay.
	 */
	schedule(delay = this.timeout): void {
		ProcessTimeRunOnceScheduler.warnIfNotWholeSeconds(delay);
		this.cancel();
		this.counter = Math.ceil(delay / 1000);
		this.intervalToken = setInterval(this.intervalHandler, 1000);
	}

	/**
	 * Returns true if scheduled.
	 */
	isScheduled(): boolean {
		return this.intervalToken !== undefined;
	}

	private onInterval() {
		this.counter--;
		if (this.counter > 0) {
			// still need to wait
			return;
		}

		// time elapsed
		clearInterval(this.intervalToken);
		this.intervalToken = undefined;
		this.runner?.();
	}
}
1250
1251
/**
 * A `RunOnceScheduler` that collects units of work and hands all of them to
 * the runner in one batch when the scheduled run happens.
 */
export class RunOnceWorker<T> extends RunOnceScheduler {

	/** Units collected since the last run. */
	private units: T[] = [];

	/**
	 * @param runner Callback receiving the batch of collected units.
	 * @param timeout Delay in milliseconds between the first unit of a batch
	 * and the run that processes the batch.
	 */
	constructor(runner: (units: T[]) => void, timeout: number) {
		super(runner, timeout);
	}

	/**
	 * Adds a unit to the current batch and schedules a run unless one is
	 * already pending.
	 */
	work(unit: T): void {
		this.units.push(unit);

		if (!this.isScheduled()) {
			this.schedule();
		}
	}

	protected override doRun(): void {
		// Start a fresh batch before calling the runner, so the runner
		// receives exactly the units collected up to this point.
		const batch = this.units;
		this.units = [];

		this.runner?.(batch);
	}

	/** Drops any units that were not processed yet and disposes the scheduler. */
	override dispose(): void {
		this.units = [];

		super.dispose();
	}
}
1280
1281
/**
 * Options that control how a `ThrottledWorker` chunks, buffers and delays
 * its work.
 */
export interface IThrottledWorkerOptions {

	/**
	 * Maximum number of units the worker will pass on to the handler at once.
	 */
	maxWorkChunkSize: number;

	/**
	 * Maximum number of units the worker will keep in memory for processing.
	 * `undefined` disables the limit.
	 */
	maxBufferedWork: number | undefined;

	/**
	 * Delay in milliseconds before processing the next chunk when more units
	 * are pending than fit into a single chunk.
	 */
	throttleDelay: number;

	/**
	 * When enabled will guarantee that two distinct calls to `work()` are not executed
	 * without throttle delay between them.
	 * Otherwise if the worker isn't currently throttling it will execute work immediately.
	 */
	waitThrottleDelayBetweenWorkUnits?: boolean;
}
1305
1306
/**
 * The `ThrottledWorker` will accept units of work `T`
 * to handle. The contract is:
 * * there is a maximum of units the worker can handle at once (via `maxWorkChunkSize`)
 * * there is a maximum of units the worker will keep in memory for processing (via `maxBufferedWork`)
 * * after having handled `maxWorkChunkSize` units, the worker needs to rest (via `throttleDelay`)
 */
export class ThrottledWorker<T> extends Disposable {

	/** Units accepted by `work()` that were not yet passed to the handler. */
	private readonly pendingWork: T[] = [];

	/** Holds the scheduler for the next chunk; has a value only while throttling. */
	private readonly throttler = this._register(new MutableDisposable<RunOnceScheduler>());
	private disposed = false;
	/** Timestamp (`Date.now()`) of the last call into the handler, `0` before the first. */
	private lastExecutionTime = 0;

	/**
	 * @param options Chunk size, buffer limit and delay configuration.
	 * @param handler Callback receiving each chunk of units.
	 */
	constructor(
		private options: IThrottledWorkerOptions,
		private readonly handler: (units: T[]) => void
	) {
		super();
	}

	/**
	 * The number of work units that are pending to be processed.
	 */
	get pending(): number { return this.pendingWork.length; }

	/**
	 * Add units to be worked on. Use `pending` to figure out
	 * how many units are not yet processed after this method
	 * was called.
	 *
	 * @returns whether the work was accepted or not. If the
	 * worker is disposed, it will not accept any more work.
	 * If the number of pending units would become larger
	 * than `maxBufferedWork`, more work will also not be accepted.
	 */
	work(units: readonly T[]): boolean {
		if (this.disposed) {
			return false; // work not accepted: disposed
		}

		const isThrottling = !!this.throttler.value;

		// Check for reaching maximum of pending work
		if (typeof this.options.maxBufferedWork === 'number') {

			// Throttled: everything that is added stays pending.
			// Unthrottled: one chunk is worked on directly without ever
			// being buffered, so it does not count against the limit.
			const workedOnDirectly = isThrottling ? 0 : this.options.maxWorkChunkSize;
			if (this.pending + units.length - workedOnDirectly > this.options.maxBufferedWork) {
				return false; // work not accepted: too much pending work
			}
		}

		// Add to pending units first
		for (const unit of units) {
			this.pendingWork.push(unit);
		}

		const timeSinceLastExecution = Date.now() - this.lastExecutionTime;

		if (!isThrottling) {
			if (!this.options.waitThrottleDelayBetweenWorkUnits || timeSinceLastExecution >= this.options.throttleDelay) {
				// Work directly if we are not throttling and we are not
				// enforced to throttle between `work()` calls.
				this.doWork();
			} else {
				// Otherwise, schedule the throttler to work once the rest
				// of the throttle delay has passed.
				this.scheduleThrottler(Math.max(this.options.throttleDelay - timeSinceLastExecution, 0));
			}
		}
		// else: our work will be picked up by the running throttler

		return true; // work accepted
	}

	/** Hands the next chunk to the handler and schedules the remainder, if any. */
	private doWork(): void {
		this.lastExecutionTime = Date.now();

		// Extract chunk to handle and handle it
		this.handler(this.pendingWork.splice(0, this.options.maxWorkChunkSize));

		// If we have remaining work, schedule it after a delay
		if (this.pendingWork.length > 0) {
			this.scheduleThrottler();
		}
	}

	/**
	 * Enters the throttling state: processes the next chunk after `delay`
	 * milliseconds (defaults to `throttleDelay`).
	 */
	private scheduleThrottler(delay = this.options.throttleDelay): void {
		this.throttler.value = new RunOnceScheduler(() => {
			// Leave the throttling state before working; `doWork` schedules
			// again if units remain.
			this.throttler.clear();

			this.doWork();
		}, delay);
		this.throttler.value.schedule();
	}

	/** Disposes the throttler, drops all pending units and rejects further work. */
	override dispose(): void {
		super.dispose();

		this.pendingWork.length = 0;
		this.disposed = true;
	}
}
1416
1417
//#region -- run on idle tricks ------------
1418
1419
/**
 * The argument passed to an idle callback.
 */
export interface IdleDeadline {
	/** Whether the callback runs because its timeout expired rather than because of idle time. */
	readonly didTimeout: boolean;
	/** Milliseconds left before the callback should yield. */
	timeRemaining(): number;
}
1423
1424
/**
 * The part of the global object needed for idle scheduling: only
 * `requestIdleCallback` and `cancelIdleCallback`.
 */
type IdleApi = Pick<typeof globalThis, 'requestIdleCallback' | 'cancelIdleCallback'>;
1425
1426
1427
/**
 * Execute the callback the next time the browser is idle, returning an
 * {@link IDisposable} that will cancel the callback when disposed. This wraps
 * [requestIdleCallback] so it will fallback to [setTimeout] if the environment
 * doesn't support it.
 *
 * @param callback The callback to run when idle, this includes an
 * [IdleDeadline] that provides the time allotted for the idle callback by the
 * browser. Not respecting this deadline will result in a degraded user
 * experience.
 * @param timeout A timeout after which to no longer wait for an idle period
 * but to queue the callback on the regular event loop (like setTimeout).
 * Typically this should not be used.
 *
 * [IdleDeadline]: https://developer.mozilla.org/en-US/docs/Web/API/IdleDeadline
 * [requestIdleCallback]: https://developer.mozilla.org/en-US/docs/Web/API/Window/requestIdleCallback
 * [setTimeout]: https://developer.mozilla.org/en-US/docs/Web/API/Window/setTimeout
 *
 * **Note** that there is `dom.ts#runWhenWindowIdle` which is better suited when running inside a browser
 * context
 */
export let runWhenGlobalIdle: (callback: (idle: IdleDeadline) => void, timeout?: number) => IDisposable;

/**
 * Same as `runWhenGlobalIdle`, but schedules the callback on the given
 * `targetWindow` instead of on `globalThis`.
 */
export let _runWhenIdle: (targetWindow: IdleApi, callback: (idle: IdleDeadline) => void, timeout?: number) => IDisposable;

(function () {
	const safeGlobal: any = globalThis;
	if (typeof safeGlobal.requestIdleCallback !== 'function' || typeof safeGlobal.cancelIdleCallback !== 'function') {
		// No idle API available: emulate it with a zero-delay timeout. The
		// target window and the timeout are ignored in this mode.
		_runWhenIdle = (_targetWindow, runner, timeout?) => {
			// Declared before the callback below so that the closure never
			// reads the flag ahead of its initialisation.
			let disposed = false;
			setTimeout0(() => {
				if (disposed) {
					return;
				}
				const end = Date.now() + 15; // one frame at 64fps
				const deadline: IdleDeadline = {
					didTimeout: true,
					timeRemaining() {
						return Math.max(0, end - Date.now());
					}
				};
				runner(Object.freeze(deadline));
			});
			return {
				dispose() {
					// The queued callback checks this flag and bails out.
					disposed = true;
				}
			};
		};
	} else {
		_runWhenIdle = (targetWindow: typeof safeGlobal, runner, timeout?) => {
			const handle: number = targetWindow.requestIdleCallback(runner, typeof timeout === 'number' ? { timeout } : undefined);
			let disposed = false;
			return {
				dispose() {
					if (disposed) {
						return; // cancel the idle callback only once
					}
					disposed = true;
					targetWindow.cancelIdleCallback(handle);
				}
			};
		};
	}
	runWhenGlobalIdle = (runner, timeout) => _runWhenIdle(globalThis, runner, timeout);
})();
1496
1497
/**
 * A value that is computed when idle time becomes available, or on first
 * access of `value`, whichever happens first. The executor runs at most once
 * through this class; its result or its thrown error is remembered.
 */
export abstract class AbstractIdleValue<T> {

	private readonly _executor: () => void;
	/** Cancels the pending idle callback when disposed. */
	private readonly _handle: IDisposable;

	private _didRun: boolean = false;
	private _value?: T;
	private _error: unknown;
	/**
	 * Tracks failure separately from `_error` so that an executor throwing a
	 * falsy value (e.g. `throw 0`) is still reported as a failure.
	 */
	private _didThrow: boolean = false;

	/**
	 * @param targetWindow The idle API on which the computation is scheduled.
	 * @param executor Computes the value.
	 */
	constructor(targetWindow: IdleApi, executor: () => T) {
		this._executor = () => {
			try {
				this._value = executor();
			} catch (err) {
				this._error = err;
				this._didThrow = true;
			} finally {
				this._didRun = true;
			}
		};
		this._handle = _runWhenIdle(targetWindow, () => this._executor());
	}

	/** Cancels the pending idle callback. */
	dispose(): void {
		this._handle.dispose();
	}

	/**
	 * The computed value. When the idle callback has not run yet, it is
	 * cancelled and the executor runs synchronously instead.
	 *
	 * @throws whatever the executor threw.
	 */
	get value(): T {
		if (!this._didRun) {
			this._handle.dispose();
			this._executor();
		}
		if (this._didThrow) {
			throw this._error;
		}
		return this._value!;
	}

	/** Whether the executor has run (successfully or not). */
	get isInitialized(): boolean {
		return this._didRun;
	}
}
1538
1539
/**
 * An `IdleValue` that always uses the current window (which might be throttled or inactive)
 *
 * **Note** that there is `dom.ts#WindowIdleValue` which is better suited when running inside a browser
 * context
 */
export class GlobalIdleValue<T> extends AbstractIdleValue<T> {

	/**
	 * @param executor Computes the value; scheduled against `globalThis`.
	 */
	constructor(executor: () => T) {
		super(globalThis, executor);
	}
}
1551
1552
//#endregion
1553
1554
/**
 * Runs `task` until it succeeds, for at most `retries` attempts, waiting
 * `delay` milliseconds between two attempts.
 *
 * @param task The task to run.
 * @param delay Milliseconds to wait after a failed attempt before the next one.
 * @param retries Maximum number of attempts.
 * @returns The result of the first successful attempt.
 * @throws The error of the last failed attempt. When `retries` is not
 * positive no attempt is made and an `Error` saying so is thrown.
 */
export async function retry<T>(task: ITask<Promise<T>>, delay: number, retries: number): Promise<T> {
	// Never reject with `undefined`: this is what gets thrown when the loop
	// below does not run at all.
	let lastError: unknown = new Error(`retry: no attempt was made (retries: ${retries})`);

	for (let attempt = 0; attempt < retries; attempt++) {
		try {
			return await task();
		} catch (error) {
			lastError = error;

			// Only wait when another attempt follows; waiting after the
			// final failure would merely postpone the rejection.
			if (attempt < retries - 1) {
				await timeout(delay);
			}
		}
	}

	throw lastError;
}
1569
1570
//#region Task Sequentializer
1571
1572
/** The task a `TaskSequentializer` currently considers running. */
interface IRunningTask {
	/** Caller-chosen identifier of the task. */
	readonly taskId: number;
	/** Invoked when cancellation of the running task is requested. */
	readonly cancel: () => void;
	/** The promise of the running task. */
	readonly promise: Promise<void>;
}

/** The task a `TaskSequentializer` will start once the running task has finished. */
interface IQueuedTask {
	/** Settles with the outcome of `run` once the queued task was executed. */
	readonly promise: Promise<void>;
	readonly promiseResolve: () => void;
	readonly promiseReject: (error: Error) => void;
	/** The task to execute; replaced when another task is queued. */
	run: ITask<Promise<void>>;
}

/** Shape of a `TaskSequentializer` that is known to have a running task. */
export interface ITaskSequentializerWithRunningTask {
	readonly running: Promise<void>;
}

/** Shape of a `TaskSequentializer` that is known to have a queued task. */
export interface ITaskSequentializerWithQueuedTask {
	readonly queued: IQueuedTask;
}
1592
1593
/**
 * Tracks one running task and at most one queued task. The queued task is
 * started when the running task finishes; queueing again replaces the task
 * that is waiting.
 *
 * @deprecated use `LimitedQueue` instead for an easier to use API
 */
export class TaskSequentializer {

	private _running?: IRunningTask;
	private _queued?: IQueuedTask;

	/**
	 * @param taskId When given, checks whether the task with that id is the
	 * running one; otherwise checks whether any task is running.
	 */
	isRunning(taskId?: number): this is ITaskSequentializerWithRunningTask {
		if (typeof taskId === 'number') {
			return this._running?.taskId === taskId;
		}

		return !!this._running;
	}

	/** The promise of the running task, if any. */
	get running(): Promise<void> | undefined {
		return this._running?.promise;
	}

	/** Invokes the cancel callback of the running task, if any. */
	cancelRunning(): void {
		this._running?.cancel();
	}

	/**
	 * Registers `promise` as the running task.
	 *
	 * @param taskId Identifier of the task.
	 * @param promise The promise of the already started task.
	 * @param onCancel Called when `cancelRunning()` is invoked while this
	 * task is the running one.
	 * @returns The given `promise`.
	 */
	run(taskId: number, promise: Promise<void>, onCancel?: () => void): Promise<void> {
		this._running = { taskId, cancel: () => onCancel?.(), promise };

		// Success and failure both end the running state.
		const onSettled = () => this.doneRunning(taskId);
		promise.then(onSettled, onSettled);

		return promise;
	}

	private doneRunning(taskId: number): void {
		if (this._running && taskId === this._running.taskId) {

			// only set running to done if the promise finished that is associated with that taskId
			this._running = undefined;

			// schedule the queued task now that we are free if we have any
			this.runQueued();
		}
	}

	private runQueued(): void {
		if (this._queued) {
			const queued = this._queued;
			this._queued = undefined;

			// Run queued task and complete on the associated promise
			queued.run().then(queued.promiseResolve, queued.promiseReject);
		}
	}

	/**
	 * Queues `run` to be started when the running task has finished. A task
	 * that is already waiting is replaced, and the promise returned earlier
	 * then reports the outcome of the replacement.
	 *
	 * Note: the promise to schedule as next run MUST itself call `run`.
	 * Otherwise, this sequentializer will report `false` for `isRunning`
	 * even when this task is running. Missing this detail means that
	 * suddenly multiple tasks will run in parallel.
	 */
	queue(run: ITask<Promise<void>>): Promise<void> {

		// this is our first queued task, so we create associated promise with it
		// so that we can return a promise that completes when the task has
		// completed.
		if (!this._queued) {
			const { promise, resolve: promiseResolve, reject: promiseReject } = promiseWithResolvers<void>();
			this._queued = {
				run,
				promise,
				promiseResolve,
				promiseReject
			};
		}

		// we have a previous queued task, just overwrite it
		else {
			this._queued.run = run;
		}

		return this._queued.promise;
	}

	/** Whether a task is waiting to be started. */
	hasQueued(): this is ITaskSequentializerWithQueuedTask {
		return !!this._queued;
	}

	/**
	 * Waits for the queued task if there is one, otherwise for the running
	 * task; resolves right away when there is neither.
	 */
	async join(): Promise<void> {
		return this._queued?.promise ?? this._running?.promise;
	}
}
1683
1684
//#endregion
1685
1686
//#region
1687
1688
/**
 * The `IntervalCounter` allows to count the number
 * of calls to `increment()` over a duration of
 * `interval`. This utility can be used to conditionally
 * throttle a frequent task when a certain threshold
 * is reached.
 */
export class IntervalCounter {

	/** Time at which the current counting window started. */
	private lastIncrementTime = 0;

	/** Number of increments within the current counting window. */
	private value = 0;

	/**
	 * @param interval Length of a counting window, in the unit of `nowFn`.
	 * @param nowFn Source of the current time; defaults to `Date.now()`.
	 */
	constructor(private readonly interval: number, private readonly nowFn = () => Date.now()) { }

	/**
	 * Counts one call.
	 *
	 * @returns The number of calls within the current window, including this one.
	 */
	increment(): number {
		const now = this.nowFn();

		// We are outside of the range of `interval` and as such
		// start counting from 0 and remember the time
		if (now - this.lastIncrementTime > this.interval) {
			this.lastIncrementTime = now;
			this.value = 0;
		}

		this.value++;

		return this.value;
	}
}
1718
1719
//#endregion
1720
1721
//#region
1722
1723
/**
 * A callback that accepts either a value of type `T` or a promise for one.
 */
export type ValueCallback<T = unknown> = (value: T | Promise<T>) => void;
1724
1725
/** How a `DeferredPromise` was settled. */
const enum DeferredOutcome {
	Resolved,
	Rejected
}
1729
1730
/**
 * Creates a promise whose resolution or rejection can be controlled imperatively.
 *
 * Only the first call to `complete()` or `error()` has an effect; later
 * calls are ignored.
 */
export class DeferredPromise<T> {

	/**
	 * Creates a deferred promise that settles the same way `promise` does.
	 */
	public static fromPromise<T>(promise: Promise<T>): DeferredPromise<T> {
		const deferred = new DeferredPromise<T>();
		deferred.settleWith(promise);
		return deferred;
	}

	private completeCallback!: ValueCallback<T>;
	private errorCallback!: (err: unknown) => void;
	/** Set once the promise was settled through `complete()` or `error()`. */
	private outcome?: { outcome: DeferredOutcome.Rejected; value: unknown } | { outcome: DeferredOutcome.Resolved; value: T };

	public get isRejected() {
		return this.outcome?.outcome === DeferredOutcome.Rejected;
	}

	public get isResolved() {
		return this.outcome?.outcome === DeferredOutcome.Resolved;
	}

	public get isSettled() {
		return !!this.outcome;
	}

	/** The value passed to `complete()`, or `undefined` when not resolved. */
	public get value() {
		return this.outcome?.outcome === DeferredOutcome.Resolved ? this.outcome.value : undefined;
	}

	/** The promise controlled by this object. */
	public readonly p: Promise<T>;

	constructor() {
		// The executor runs synchronously, so both callbacks are assigned
		// by the time the constructor returns.
		this.p = new Promise<T>((c, e) => {
			this.completeCallback = c;
			this.errorCallback = e;
		});
	}

	/**
	 * Resolves `p` with `value`. Ignored when already settled.
	 *
	 * @returns A resolved promise.
	 */
	public complete(value: T): Promise<void> {
		if (!this.isSettled) {
			this.completeCallback(value);
			this.outcome = { outcome: DeferredOutcome.Resolved, value };
		}

		return Promise.resolve();
	}

	/**
	 * Rejects `p` with `err`. Ignored when already settled.
	 *
	 * @returns A resolved promise.
	 */
	public error(err: unknown): Promise<void> {
		if (!this.isSettled) {
			this.errorCallback(err);
			this.outcome = { outcome: DeferredOutcome.Rejected, value: err };
		}

		return Promise.resolve();
	}

	/**
	 * Settles this deferred promise with the outcome of `promise`.
	 */
	public settleWith(promise: Promise<T>): Promise<void> {
		return promise.then(
			value => this.complete(value),
			error => this.error(error)
		);
	}

	/** Rejects `p` with a `CancellationError`. Ignored when already settled. */
	public cancel() {
		return this.error(new CancellationError());
	}
}
1805
1806
//#endregion
1807
1808
//#region Promises
1809
1810
export namespace Promises {

	/**
	 * A drop-in replacement for `Promise.all` with the only difference
	 * that the method awaits every promise to either fulfill or reject.
	 *
	 * Similar to `Promise.all`, only the first error will be returned
	 * if any. "First" refers to the promise that rejected first in time.
	 */
	export async function settled<T>(promises: Promise<T>[]): Promise<T[]> {
		// Failure is tracked with a flag rather than by inspecting the
		// reason, so that a promise rejected with `undefined` (or any other
		// falsy reason) is still treated as an error.
		const failure: { occurred: boolean; reason: unknown } = { occurred: false, reason: undefined };

		const result = await Promise.all(promises.map(promise => promise.then(value => value, error => {
			if (!failure.occurred) {
				failure.occurred = true;
				failure.reason = error;
			}

			return undefined; // do not rethrow so that other promises can settle
		})));

		if (failure.occurred) {
			throw failure.reason;
		}

		return result as unknown as T[]; // cast is needed and protected by the `throw` above
	}

	/**
	 * A helper to create a new `Promise<T>` with a body that is a promise
	 * itself. By default, an error that raises from the async body will
	 * end up as a unhandled rejection, so this utility properly awaits the
	 * body and rejects the promise as a normal promise does without async
	 * body.
	 *
	 * This method should only be used in rare cases where otherwise `async`
	 * cannot be used (e.g. when callbacks are involved that require this).
	 */
	export function withAsyncBody<T, E = Error>(bodyFn: (resolve: (value: T) => unknown, reject: (error: E) => unknown) => Promise<unknown>): Promise<T> {
		// eslint-disable-next-line no-async-promise-executor
		return new Promise<T>(async (resolve, reject) => {
			try {
				await bodyFn(resolve, reject);
			} catch (error) {
				reject(error);
			}
		});
	}
}
1858
1859
/**
 * Wraps a promise and records how it settled, so that the outcome can be
 * read synchronously afterwards.
 */
export class StatefulPromise<T> {
	private _value: T | undefined = undefined;
	/** The fulfillment value, or `undefined` while pending or when rejected. */
	get value(): T | undefined { return this._value; }

	private _error: unknown = undefined;
	/** The rejection reason, or `undefined` while pending or when fulfilled. */
	get error(): unknown { return this._error; }

	/**
	 * Tracks rejection separately from `_error` so that a falsy rejection
	 * reason (e.g. `0` or `undefined`) is still reported by `requireValue()`.
	 */
	private _isRejected = false;

	private _isResolved = false;
	/** `true` once the wrapped promise has settled, be it fulfilled or rejected. */
	get isResolved() { return this._isResolved; }

	/** Settles like the wrapped promise, after the state has been recorded. */
	public readonly promise: Promise<T>;

	constructor(promise: Promise<T>) {
		this.promise = promise.then(
			value => {
				this._value = value;
				this._isResolved = true;
				return value;
			},
			error => {
				this._error = error;
				this._isRejected = true;
				this._isResolved = true;
				throw error;
			}
		);
	}

	/**
	 * Returns the resolved value.
	 * Throws if the promise is not resolved yet.
	 *
	 * @throws BugIndicatingError while the promise is still pending.
	 * @throws the rejection reason when the promise was rejected.
	 */
	public requireValue(): T {
		if (!this._isResolved) {
			throw new BugIndicatingError('Promise is not resolved yet');
		}
		if (this._isRejected) {
			throw this._error;
		}
		return this._value!;
	}
}
1900
1901
/**
 * A `StatefulPromise` whose underlying computation is only started when it
 * is first needed.
 */
export class LazyStatefulPromise<T> {
	// `_compute` is only read inside the callback, i.e. after construction.
	private readonly _promise = new Lazy(() => new StatefulPromise(this._compute()));

	/**
	 * @param _compute Starts the computation and returns its promise.
	 */
	constructor(
		private readonly _compute: () => Promise<T>,
	) { }

	/**
	 * Returns the resolved value.
	 * Throws if the promise is not resolved yet.
	 */
	public requireValue(): T {
		return this._promise.value.requireValue();
	}

	/**
	 * Returns the promise (and triggers a computation of the promise if not yet done so).
	 */
	public getPromise(): Promise<T> {
		return this._promise.value.promise;
	}

	/**
	 * Reads the current value without triggering a computation of the promise.
	 */
	public get currentValue(): T | undefined {
		return this._promise.rawValue?.value;
	}
}
1930
1931
//#endregion
1932
1933
//#region
1934
1935
/** Lifecycle state of an `AsyncIterableObject`. */
const enum AsyncIterableSourceState {
	/** Values may still be emitted. */
	Initial,
	/** Finished successfully; no further values will be emitted. */
	DoneOK,
	/** Finished with an error. */
	DoneError,
}
1940
1941
/**
 * An object that allows to emit async values asynchronously or bring the iterable to an error state using `reject()`.
 * This emitter is valid only for the duration of the executor (until the promise returned by the executor settles).
 */
export interface AsyncIterableEmitter<T> {
	/**
	 * The value will be appended at the end.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	emitOne(value: T): void;
	/**
	 * The values will be appended at the end.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	emitMany(values: T[]): void;
	/**
	 * Writing an error will permanently invalidate this iterable.
	 * The current users will receive an error thrown, as will all future users.
	 *
	 * **NOTE** If `reject()` has already been called, this method has no effect.
	 */
	reject(error: Error): void;
}
1966
1967
/**
 * An executor for the `AsyncIterableObject` that has access to an emitter.
 */
export interface AsyncIterableExecutor<T> {
	/**
	 * @param emitter An object that allows to emit async values valid only for the duration of the executor.
	 * @returns Optionally a promise; the emitter stays valid until it settles.
	 */
	(emitter: AsyncIterableEmitter<T>): unknown | Promise<unknown>;
}
1976
1977
/**
 * A rich implementation for an `AsyncIterable<T>`.
 *
 * All emitted values are kept, and every iterator starts reading at the
 * first one, so the object can be iterated more than once.
 */
export class AsyncIterableObject<T> implements AsyncIterable<T> {

	/** Creates an iterable that emits the given items. */
	public static fromArray<T>(items: T[]): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>((writer) => {
			writer.emitMany(items);
		});
	}

	/** Creates an iterable that emits the items the promise resolves to. */
	public static fromPromise<T>(promise: Promise<T[]>): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async (emitter) => {
			emitter.emitMany(await promise);
		});
	}

	/** Creates an iterable that emits each promise's value in the order in which the promises resolve. */
	public static fromPromisesResolveOrder<T>(promises: Promise<T>[]): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async (emitter) => {
			await Promise.all(promises.map(async (p) => emitter.emitOne(await p)));
		});
	}

	/** Creates an iterable that emits the items of all given iterables as they become available. */
	public static merge<T>(iterables: AsyncIterable<T>[]): AsyncIterableObject<T> {
		return new AsyncIterableObject(async (emitter) => {
			await Promise.all(iterables.map(async (iterable) => {
				for await (const item of iterable) {
					emitter.emitOne(item);
				}
			}));
		});
	}

	public static EMPTY = AsyncIterableObject.fromArray<any>([]);

	private _state: AsyncIterableSourceState;
	/** All values emitted so far; iterators index into this array. */
	private _results: T[];
	private _error: Error | null;
	private readonly _onReturn?: () => void | Promise<void>;
	/** Fires whenever values were added or the state changed. */
	private readonly _onStateChanged: Emitter<void>;

	/**
	 * @param executor Produces the values through the emitter it receives.
	 * It is started in a microtask, not synchronously.
	 * @param onReturn Called when an iterator's `return()` is invoked.
	 */
	constructor(executor: AsyncIterableExecutor<T>, onReturn?: () => void | Promise<void>) {
		this._state = AsyncIterableSourceState.Initial;
		this._results = [];
		this._error = null;
		this._onReturn = onReturn;
		this._onStateChanged = new Emitter<void>();

		queueMicrotask(async () => {
			const writer: AsyncIterableEmitter<T> = {
				emitOne: (item) => this.emitOne(item),
				emitMany: (items) => this.emitMany(items),
				reject: (error) => this.reject(error)
			};
			try {
				await Promise.resolve(executor(writer));
				this.resolve();
			} catch (err) {
				this.reject(err);
			} finally {
				// The emitter is only valid for the duration of the executor.
				writer.emitOne = undefined!;
				writer.emitMany = undefined!;
				writer.reject = undefined!;
			}
		});
	}

	[Symbol.asyncIterator](): AsyncIterator<T, undefined, undefined> {
		// Position of this iterator within `_results`.
		let i = 0;
		return {
			next: async () => {
				do {
					if (this._state === AsyncIterableSourceState.DoneError) {
						throw this._error;
					}
					if (i < this._results.length) {
						return { done: false, value: this._results[i++] };
					}
					if (this._state === AsyncIterableSourceState.DoneOK) {
						return { done: true, value: undefined };
					}
					// Nothing to hand out yet: wait for the next change.
					await Event.toPromise(this._onStateChanged.event);
				} while (true);
			},
			return: async () => {
				this._onReturn?.();
				return { done: true, value: undefined };
			}
		};
	}

	/** Creates an iterable that emits `mapFn(item)` for every item of `iterable`. */
	public static map<T, R>(iterable: AsyncIterable<T>, mapFn: (item: T) => R): AsyncIterableObject<R> {
		return new AsyncIterableObject<R>(async (emitter) => {
			for await (const item of iterable) {
				emitter.emitOne(mapFn(item));
			}
		});
	}

	public map<R>(mapFn: (item: T) => R): AsyncIterableObject<R> {
		return AsyncIterableObject.map(this, mapFn);
	}

	/** Creates an iterable that emits only the items of `iterable` for which `filterFn` returns `true`. */
	public static filter<T>(iterable: AsyncIterable<T>, filterFn: (item: T) => boolean): AsyncIterableObject<T> {
		return new AsyncIterableObject<T>(async (emitter) => {
			for await (const item of iterable) {
				if (filterFn(item)) {
					emitter.emitOne(item);
				}
			}
		});
	}

	public filter<T2 extends T>(filterFn: (item: T) => item is T2): AsyncIterableObject<T2>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableObject<T>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableObject<T> {
		return AsyncIterableObject.filter(this, filterFn);
	}

	/** Creates an iterable that skips all falsy items of `iterable`. */
	public static coalesce<T>(iterable: AsyncIterable<T | undefined | null>): AsyncIterableObject<T> {
		return AsyncIterableObject.filter(iterable, item => !!item) as AsyncIterableObject<T>;
	}

	public coalesce(): AsyncIterableObject<NonNullable<T>> {
		return AsyncIterableObject.coalesce(this) as AsyncIterableObject<NonNullable<T>>;
	}

	/** Collects all items of `iterable` into an array. */
	public static async toPromise<T>(iterable: AsyncIterable<T>): Promise<T[]> {
		const result: T[] = [];
		for await (const item of iterable) {
			result.push(item);
		}
		return result;
	}

	public toPromise(): Promise<T[]> {
		return AsyncIterableObject.toPromise(this);
	}

	/**
	 * The value will be appended at the end.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private emitOne(value: T): void {
		if (this._state !== AsyncIterableSourceState.Initial) {
			return;
		}
		// it is important to add new values at the end,
		// as we may have iterators already running on the array
		this._results.push(value);
		this._onStateChanged.fire();
	}

	/**
	 * The values will be appended at the end.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private emitMany(values: T[]): void {
		if (this._state !== AsyncIterableSourceState.Initial) {
			return;
		}
		// it is important to add new values at the end,
		// as we may have iterators already running on the array
		this._results = this._results.concat(values);
		this._onStateChanged.fire();
	}

	/**
	 * Calling `resolve()` will mark the result array as complete.
	 *
	 * **NOTE** `resolve()` must be called, otherwise all consumers of this iterable will hang indefinitely, similar to a non-resolved promise.
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private resolve(): void {
		if (this._state !== AsyncIterableSourceState.Initial) {
			return;
		}
		this._state = AsyncIterableSourceState.DoneOK;
		this._onStateChanged.fire();
	}

	/**
	 * Writing an error will permanently invalidate this iterable.
	 * The current users will receive an error thrown, as will all future users.
	 *
	 * **NOTE** If `resolve()` or `reject()` have already been called, this method has no effect.
	 */
	private reject(error: Error) {
		if (this._state !== AsyncIterableSourceState.Initial) {
			return;
		}
		this._state = AsyncIterableSourceState.DoneError;
		this._error = error;
		this._onStateChanged.fire();
	}
}
2175
2176
2177
/**
 * Wraps the async iterable returned by `callback` in a
 * {@link CancelableAsyncIterableProducer}.
 *
 * `callback` is invoked immediately with the token of a fresh
 * `CancellationTokenSource`. Items of the resulting iterable are forwarded to
 * the producer until the iterable ends, throws, or cancellation is requested.
 * On cancellation the producer is rejected with a `CancellationError`.
 *
 * @param callback Receives the cancellation token and returns the iterable to forward.
 * @returns A producer that is backed by the token source handed to `callback`.
 */
export function createCancelableAsyncIterableProducer<T>(callback: (token: CancellationToken) => AsyncIterable<T>): CancelableAsyncIterableProducer<T> {
	const source = new CancellationTokenSource();
	const innerIterable = callback(source.token);

	return new CancelableAsyncIterableProducer<T>(source, async (emitter) => {
		// Drops the cancellation listener first, then the token source.
		const release = (): void => {
			subscription.dispose();
			source.dispose();
		};

		const subscription = source.token.onCancellationRequested(() => {
			release();
			emitter.reject(new CancellationError());
		});

		try {
			for await (const item of innerIterable) {
				if (source.token.isCancellationRequested) {
					// canceled in the meantime; nothing is released on this path
					return;
				}
				emitter.emitOne(item);
			}
			release();
		} catch (err) {
			release();
			emitter.reject(err);
		}
	});
}
2204
2205
/**
 * Push-style owner of an {@link AsyncIterableObject}: the owner calls
 * `emitOne`/`emitMany`/`reject`/`resolve`, consumers iterate `asyncIterable`.
 *
 * Items and errors pushed before the iterable's executor has run are kept in
 * local buffers and handed to the emitter once it does run.
 */
export class AsyncIterableSource<T> {

	private readonly _deferred = new DeferredPromise<void>();
	private readonly _asyncIterable: AsyncIterableObject<T>;

	private _errorFn: (error: Error) => void;
	private _emitOneFn: (item: T) => void;
	private _emitManyFn: (item: T[]) => void;

	/**
	 * @param onReturn A function that will be called when the consumer has finished
	 * consuming the async iterable, e.g. the for-await loop was exited early (break, return).
	 * This is NOT called when this source is resolved by its owner.
	 */
	constructor(onReturn?: () => Promise<void> | void) {
		// Holders for anything pushed before the executor below has been invoked.
		let earlyError: Error | undefined;
		let earlyItems: T[] | undefined;

		this._asyncIterable = new AsyncIterableObject(emitter => {

			if (earlyError) {
				// An early error wins; buffered items are not forwarded.
				emitter.reject(earlyError);
				return;
			}
			if (earlyItems) {
				emitter.emitMany(earlyItems);
			}

			// From here on, forward straight to the emitter.
			this._errorFn = (error: Error) => emitter.reject(error);
			this._emitOneFn = (item: T) => emitter.emitOne(item);
			this._emitManyFn = (items: T[]) => emitter.emitMany(items);

			// The executor stays pending until the owner resolves or rejects.
			return this._deferred.p;
		}, onReturn);

		// Buffering handlers, in effect until the executor swaps them out.
		this._errorFn = (error: Error) => {
			// Only the first early error is remembered.
			earlyError = earlyError ? earlyError : error;
		};
		this._emitOneFn = (item: T) => {
			earlyItems = earlyItems ?? [];
			earlyItems.push(item);
		};
		this._emitManyFn = (items: T[]) => {
			if (earlyItems) {
				const buffered = earlyItems;
				items.forEach(item => buffered.push(item));
			} else {
				// Copy, so later changes to the caller's array do not leak in.
				earlyItems = items.slice();
			}
		};
	}

	/** The iterable that consumers read from. */
	get asyncIterable(): AsyncIterableObject<T> {
		return this._asyncIterable;
	}

	/** Completes the deferred promise that the executor returns. */
	resolve(): void {
		this._deferred.complete();
	}

	/** Reports `error` through the current error handler, then completes the deferred promise. */
	reject(error: Error): void {
		this._errorFn(error);
		this._deferred.complete();
	}

	/** Pushes a single item through the current emit handler. */
	emitOne(item: T): void {
		this._emitOneFn(item);
	}

	/** Pushes several items through the current emit handler. */
	emitMany(items: T[]) {
		this._emitManyFn(items);
	}
}
2281
2282
/**
 * Wraps an async iterator (or iterable) so that iteration ends once `token`
 * is cancelled.
 *
 * `next()` reports `done` without touching the underlying iterator when the
 * token is already cancelled. Otherwise it races the underlying `next()`
 * against the token via `raceCancellation` and reports `done` when that race
 * yields a falsy result (presumably because cancellation won — confirm against
 * `raceCancellation`). `throw` and `return` are forwarded to the underlying
 * iterator when it provides them.
 *
 * @param iterableOrIterator The source to wrap.
 * @param token The token that ends iteration.
 */
export function cancellableIterable<T>(iterableOrIterator: AsyncIterator<T> | AsyncIterable<T>, token: CancellationToken): AsyncIterableIterator<T> {
	let iterator: AsyncIterator<T>;
	if (Symbol.asyncIterator in iterableOrIterator) {
		iterator = iterableOrIterator[Symbol.asyncIterator]();
	} else {
		iterator = iterableOrIterator;
	}

	// A fresh "done" result for every call.
	const finished = (): IteratorResult<T> => ({ done: true, value: undefined });

	return {
		async next(): Promise<IteratorResult<T>> {
			if (token.isCancellationRequested) {
				return finished();
			}
			const outcome = await raceCancellation(iterator.next(), token);
			return outcome ? outcome : finished();
		},
		throw: iterator.throw?.bind(iterator),
		return: iterator.return?.bind(iterator),
		[Symbol.asyncIterator]() {
			return this;
		}
	};
}
2300
2301
/**
 * Outcome handed from a producer to a consumer: either a value (`ok: true`)
 * or an error (`ok: false`).
 */
type ProducerConsumerValue<T> =
	| { ok: true; value: T }
	| { ok: false; error: Error };
2308
2309
/**
 * Matches produced values with consumers in FIFO order.
 *
 * Values produced while nobody waits are queued; consumers arriving while
 * nothing is queued wait on a deferred promise. Once a final value is set, it
 * is handed to every waiting consumer and to every later consumer after the
 * queue has drained.
 */
class ProducerConsumer<T> {
	private readonly _unsatisfiedConsumers: DeferredPromise<T>[] = [];
	private readonly _unconsumedValues: ProducerConsumerValue<T>[] = [];
	private _finalValue: ProducerConsumerValue<T> | undefined;

	/** Whether `produceFinal` has been called. */
	public get hasFinalValue(): boolean {
		return this._finalValue !== undefined;
	}

	/**
	 * Hands `value` to the longest-waiting consumer, or queues it when nobody waits.
	 *
	 * @throws BugIndicatingError when a final value has already been set.
	 */
	produce(value: ProducerConsumerValue<T>): void {
		this._ensureNoFinalValue();
		const waiting = this._unsatisfiedConsumers.shift();
		if (waiting) {
			this._resolveOrRejectDeferred(waiting, value);
			return;
		}
		this._unconsumedValues.push(value);
	}

	/**
	 * Sets the final value and settles every currently waiting consumer with it.
	 *
	 * @throws BugIndicatingError when a final value has already been set.
	 */
	produceFinal(value: ProducerConsumerValue<T>): void {
		this._ensureNoFinalValue();
		this._finalValue = value;
		// Take all waiters out of the list, then settle each of them.
		const waiters = this._unsatisfiedConsumers.splice(0);
		for (const waiter of waiters) {
			this._resolveOrRejectDeferred(waiter, value);
		}
	}

	/** Guards against producing after the final value. */
	private _ensureNoFinalValue(): void {
		if (this._finalValue) {
			throw new BugIndicatingError('ProducerConsumer: cannot produce after final value has been set');
		}
	}

	/** Completes `deferred` for an `ok` value, fails it otherwise. */
	private _resolveOrRejectDeferred(deferred: DeferredPromise<T>, value: ProducerConsumerValue<T>): void {
		if (!value.ok) {
			deferred.error(value.error);
			return;
		}
		deferred.complete(value.value);
	}

	/**
	 * Returns the next queued value, else the final value, else a promise
	 * that settles with whatever is produced next.
	 */
	consume(): Promise<T> {
		const next = this._unconsumedValues.shift() ?? this._finalValue;
		if (!next) {
			const deferred = new DeferredPromise<T>();
			this._unsatisfiedConsumers.push(deferred);
			return deferred.p;
		}
		return next.ok ? Promise.resolve(next.value) : Promise.reject(next.error);
	}
}
2366
2367
/**
 * An async iterable whose items are pushed by an executor and handed to
 * consumers through a single shared queue.
 *
 * Important difference to AsyncIterableObject:
 * If it is iterated two times, the second iterator will not see the values emitted by the first iterator.
 */
export class AsyncIterableProducer<T> implements AsyncIterable<T> {
	private readonly _producerConsumer = new ProducerConsumer<IteratorResult<T>>();

	/**
	 * @param executor Invoked in a microtask with an emitter (`emitOne`, `emitMany`, `reject`).
	 * The iterable finishes when the executor's result settles, or when `reject` is called.
	 * @param _onReturn Optional callback invoked when a consumer calls `return()` on the iterator.
	 */
	constructor(executor: AsyncIterableExecutor<T>, private readonly _onReturn?: () => void) {
		queueMicrotask(async () => {
			try {
				// The call itself is inside the try block: an executor that throws
				// synchronously must finish the iterable with that error. Otherwise
				// consumers would wait forever and the throw would surface only as an
				// unhandled rejection of this microtask callback.
				const p = executor({
					emitOne: value => this._producerConsumer.produce({ ok: true, value: { done: false, value: value } }),
					emitMany: values => {
						for (const value of values) {
							this._producerConsumer.produce({ ok: true, value: { done: false, value: value } });
						}
					},
					reject: error => this._finishError(error),
				});

				// If the executor already rejected via the emitter, its result is not awaited.
				if (!this._producerConsumer.hasFinalValue) {
					await p;
					this._finishOk();
				}
			} catch (error) {
				this._finishError(error);
			}
		});
	}

	/** Creates a producer that emits the given items. */
	public static fromArray<T>(items: T[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>((writer) => {
			writer.emitMany(items);
		});
	}

	/** Creates a producer that emits the items of the array `promise` resolves to. */
	public static fromPromise<T>(promise: Promise<T[]>): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			emitter.emitMany(await promise);
		});
	}

	/** Creates a producer that emits each promise's value as soon as that promise resolves. */
	public static fromPromisesResolveOrder<T>(promises: Promise<T>[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			await Promise.all(promises.map(async (p) => emitter.emitOne(await p)));
		});
	}

	/** Creates a producer that iterates all given iterables concurrently and emits their items as they arrive. */
	public static merge<T>(iterables: AsyncIterable<T>[]): AsyncIterableProducer<T> {
		return new AsyncIterableProducer(async (emitter) => {
			await Promise.all(iterables.map(async (iterable) => {
				for await (const item of iterable) {
					emitter.emitOne(item);
				}
			}));
		});
	}

	/** A producer created from an empty array. */
	public static EMPTY = AsyncIterableProducer.fromArray<any>([]);

	/** Creates a producer that emits `mapFn(item)` for every item of `iterable`. */
	public static map<T, R>(iterable: AsyncIterable<T>, mapFn: (item: T) => R): AsyncIterableProducer<R> {
		return new AsyncIterableProducer<R>(async (emitter) => {
			for await (const item of iterable) {
				emitter.emitOne(mapFn(item));
			}
		});
	}

	/**
	 * Splits `iterable` into two producers that both receive every item.
	 *
	 * The source is only iterated once both producers' executors have run.
	 * Both producers finish when the source ends or fails.
	 */
	public static tee<T>(iterable: AsyncIterable<T>): [AsyncIterableProducer<T>, AsyncIterableProducer<T>] {
		let emitter1: AsyncIterableEmitter<T> | undefined;
		let emitter2: AsyncIterableEmitter<T> | undefined;

		const defer = new DeferredPromise<void>();

		const start = async () => {
			if (!emitter1 || !emitter2) {
				return; // not yet ready
			}
			try {
				for await (const item of iterable) {
					emitter1.emitOne(item);
					emitter2.emitOne(item);
				}
			} catch (err) {
				emitter1.reject(err);
				emitter2.reject(err);
			} finally {
				defer.complete();
			}
		};

		const p1 = new AsyncIterableProducer<T>(async (emitter) => {
			emitter1 = emitter;
			start();
			return defer.p;
		});
		const p2 = new AsyncIterableProducer<T>(async (emitter) => {
			emitter2 = emitter;
			start();
			return defer.p;
		});
		return [p1, p2];
	}

	/** Instance form of {@link AsyncIterableProducer.map}. */
	public map<R>(mapFn: (item: T) => R): AsyncIterableProducer<R> {
		return AsyncIterableProducer.map(this, mapFn);
	}

	/**
	 * Creates a producer that skips falsy items of `iterable`.
	 * Note that the `!!item` test drops every falsy value, not only `null` and `undefined`.
	 */
	public static coalesce<T>(iterable: AsyncIterable<T | undefined | null>): AsyncIterableProducer<T> {
		return <AsyncIterableProducer<T>>AsyncIterableProducer.filter(iterable, item => !!item);
	}

	/** Instance form of {@link AsyncIterableProducer.coalesce}. */
	public coalesce(): AsyncIterableProducer<NonNullable<T>> {
		return AsyncIterableProducer.coalesce(this) as AsyncIterableProducer<NonNullable<T>>;
	}

	/** Creates a producer that emits only the items of `iterable` for which `filterFn` returns `true`. */
	public static filter<T>(iterable: AsyncIterable<T>, filterFn: (item: T) => boolean): AsyncIterableProducer<T> {
		return new AsyncIterableProducer<T>(async (emitter) => {
			for await (const item of iterable) {
				if (filterFn(item)) {
					emitter.emitOne(item);
				}
			}
		});
	}

	/** Instance form of {@link AsyncIterableProducer.filter}. */
	public filter<T2 extends T>(filterFn: (item: T) => item is T2): AsyncIterableProducer<T2>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableProducer<T>;
	public filter(filterFn: (item: T) => boolean): AsyncIterableProducer<T> {
		return AsyncIterableProducer.filter(this, filterFn);
	}

	/** Finishes the iterable with a `done` result, unless it is already finished. */
	private _finishOk(): void {
		if (!this._producerConsumer.hasFinalValue) {
			this._producerConsumer.produceFinal({ ok: true, value: { done: true, value: undefined } });
		}
	}

	/** Finishes the iterable with `error`, unless it is already finished. */
	private _finishError(error: Error): void {
		if (!this._producerConsumer.hasFinalValue) {
			this._producerConsumer.produceFinal({ ok: false, error: error });
		}
		// Warning: this can cause to dropped errors.
	}

	// The one iterator instance shared by all consumers of this producer.
	private readonly _iterator: AsyncIterator<T, void, void> = {
		next: () => this._producerConsumer.consume(),
		return: () => {
			this._onReturn?.();
			return Promise.resolve({ done: true, value: undefined });
		},
		throw: async (e) => {
			this._finishError(e);
			return { done: true, value: undefined };
		},
	};

	[Symbol.asyncIterator](): AsyncIterator<T, void, void> {
		return this._iterator;
	}
}
2528
2529
/**
 * An {@link AsyncIterableProducer} paired with the `CancellationTokenSource`
 * that `cancel()` triggers.
 */
export class CancelableAsyncIterableProducer<T> extends AsyncIterableProducer<T> {
	private readonly _source: CancellationTokenSource;

	/**
	 * @param source The token source cancelled by {@link cancel}.
	 * @param executor Passed through unchanged to the base producer.
	 */
	constructor(source: CancellationTokenSource, executor: AsyncIterableExecutor<T>) {
		super(executor);
		this._source = source;
	}

	/** Requests cancellation on the underlying token source. */
	cancel(): void {
		this._source.cancel();
	}
}
2541
2542
//#endregion
2543
2544
/**
 * Sentinel returned by {@link AsyncReader} in place of an element when the
 * underlying source has no more elements.
 */
export const AsyncReaderEndOfStream = Symbol('AsyncReaderEndOfStream');

/**
 * Buffered reader over an `AsyncIterator` that supports peeking at the next
 * element without consuming it.
 *
 * At most one `next()` call on the source is in flight at a time; concurrent
 * callers share it.
 */
export class AsyncReader<T> {
	private _buffer: T[] = [];
	private _atEnd = false;

	/** `true` once the source reported `done` and every buffered element has been consumed. */
	public get endOfStream(): boolean { return this._buffer.length === 0 && this._atEnd; }
	private _extendBufferPromise: Promise<void> | undefined;

	/**
	 * @param _source The iterator to read from.
	 */
	constructor(
		private readonly _source: AsyncIterator<T>
	) {
	}

	/**
	 * Consumes and returns the next element, waiting for the source if needed.
	 *
	 * @returns The next element, or {@link AsyncReaderEndOfStream} when the source is exhausted.
	 */
	public async read(): Promise<T | typeof AsyncReaderEndOfStream> {
		// Loop instead of checking once: a concurrent caller that awaited the same
		// fetch may already have taken the element, and an empty buffer must not
		// be mistaken for the end of the stream.
		while (this._buffer.length === 0 && !this._atEnd) {
			await this._extendBuffer();
		}
		if (this._buffer.length === 0) {
			return AsyncReaderEndOfStream;
		}
		return this._buffer.shift()!;
	}

	/**
	 * Consumes elements for as long as `predicate` accepts the next one.
	 * `callback` is awaited for every consumed element.
	 *
	 * @param predicate Decides, on the peeked element, whether to consume it.
	 * @param callback Invoked with each consumed element.
	 */
	public async readWhile(predicate: (value: T) => boolean, callback: (element: T) => unknown): Promise<void> {
		while (true) {
			const piece = await this.peek();
			if (piece === AsyncReaderEndOfStream) {
				break;
			}
			if (!predicate(piece)) {
				break;
			}
			await this.read(); // consume
			await callback(piece);
		}
	}

	/**
	 * Synchronously consumes the next buffered element.
	 *
	 * @returns The element, or {@link AsyncReaderEndOfStream} at the end of the stream.
	 * @throws BugIndicatingError when nothing is buffered and the end has not been reached.
	 */
	public readBufferedOrThrow(): T | typeof AsyncReaderEndOfStream {
		const value = this.peekBufferedOrThrow();
		this._buffer.shift();
		return value;
	}

	/** Reads and discards elements until {@link endOfStream} is `true`. */
	public async consumeToEnd(): Promise<void> {
		while (!this.endOfStream) {
			await this.read();
		}
	}

	/**
	 * Returns the next element without consuming it, waiting for the source if needed.
	 *
	 * @returns The next element, or {@link AsyncReaderEndOfStream} when the source is exhausted.
	 */
	public async peek(): Promise<T | typeof AsyncReaderEndOfStream> {
		// Same loop as in read(): the awaited element may have been consumed by
		// a concurrent reader before this caller resumed.
		while (this._buffer.length === 0 && !this._atEnd) {
			await this._extendBuffer();
		}
		if (this._buffer.length === 0) {
			return AsyncReaderEndOfStream;
		}
		return this._buffer[0];
	}

	/**
	 * Synchronously returns the next buffered element without consuming it.
	 *
	 * @returns The element, or {@link AsyncReaderEndOfStream} at the end of the stream.
	 * @throws BugIndicatingError when nothing is buffered and the end has not been reached.
	 */
	public peekBufferedOrThrow(): T | typeof AsyncReaderEndOfStream {
		if (this._buffer.length === 0) {
			if (this._atEnd) {
				return AsyncReaderEndOfStream;
			}
			throw new BugIndicatingError('No buffered elements');
		}

		return this._buffer[0];
	}

	/**
	 * Like {@link peek}, but makes a single fetch attempt passed through `raceTimeout`.
	 *
	 * @param timeoutMs Passed to `raceTimeout` as the time limit.
	 * @returns The next element, {@link AsyncReaderEndOfStream} at the end of the stream,
	 * or `undefined` when no element is buffered after the attempt.
	 */
	public async peekTimeout(timeoutMs: number): Promise<T | typeof AsyncReaderEndOfStream | undefined> {
		if (this._buffer.length === 0 && !this._atEnd) {
			await raceTimeout(this._extendBuffer(), timeoutMs);
		}
		if (this._atEnd) {
			return AsyncReaderEndOfStream;
		}
		if (this._buffer.length === 0) {
			return undefined;
		}
		return this._buffer[0];
	}

	/**
	 * Fetches one more element from the source into the buffer, or records
	 * that the source is done. Concurrent calls share one in-flight fetch.
	 */
	private _extendBuffer(): Promise<void> {
		if (this._atEnd) {
			return Promise.resolve();
		}

		if (!this._extendBufferPromise) {
			this._extendBufferPromise = (async () => {
				const { value, done } = await this._source.next();
				// Not reached when `next()` rejects: the rejected promise then stays
				// cached and is returned to every later caller.
				this._extendBufferPromise = undefined;
				if (done) {
					this._atEnd = true;
				} else {
					this._buffer.push(value);
				}
			})();
		}

		return this._extendBufferPromise;
	}
}
2648
2649