Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
ulixee
GitHub Repository: ulixee/secret-agent
Path: blob/main/commons/Queue.ts
1028 views
1
import IResolvablePromise from '@secret-agent/interfaces/IResolvablePromise';
2
import { createPromise } from './utils';
3
import { CanceledPromiseError } from './interfaces/IPendingWaitEvent';
4
import Resolvable from './Resolvable';
5
import getPrototypeOf = Reflect.getPrototypeOf;
6
7
/**
 * Shape of a unit of work accepted by `Queue.run`: a function returning a
 * promise of the result. The signature allows an optional argument, but the
 * queue itself invokes the callback with no arguments.
 */
type AsyncCallback<T> = (value?: any) => Promise<T>;
8
9
/**
 * Runs async callbacks in FIFO order with a bounded number running at once.
 *
 * Every callback passed to `run` gets its own promise, which settles with the
 * callback's result or rejects with its error. Rejections carry a stack that
 * also includes where the work was queued, separated by a marker line built
 * from `stacktraceMarker`.
 */
export default class Queue {
  /** Maximum number of callbacks allowed to be in flight at the same time. */
  public concurrency = 1;

  /** How long (ms) the queue must sit empty with nothing running before `idlePromise` resolves. */
  public idletimeMillis = 500;

  /**
   * Resolved once the queue has been idle for `idletimeMillis`. After it has
   * resolved, a fresh promise is installed the next time work starts.
   */
  public idlePromise = createPromise();

  /** True while work is queued or running and dequeuing has not been halted via `willStop`. */
  public get isActive(): boolean {
    return (this.activeCount > 0 || this.queue.length > 0) && !this.stopDequeuing;
  }

  // Resolved by `stop()`; raced against every running callback so in-flight work can be abandoned.
  private readonly abortPromise = new Resolvable<CanceledPromiseError>();
  private idleTimeout: NodeJS.Timeout;
  private activeCount = 0;

  private stopDequeuing = false;

  private queue: IQueueEntry[] = [];

  /**
   * @param stacktraceMarker label embedded in the separator line added to rejection stacks
   * @param concurrency optional maximum parallelism; falsy values keep the default of 1
   */
  constructor(readonly stacktraceMarker = 'QUEUE', concurrency?: number) {
    if (concurrency) this.concurrency = concurrency;
  }

  /**
   * Queues `cb` and returns a promise for its outcome.
   *
   * @param cb work to execute once a concurrency slot is free
   * @param timeoutMillis forwarded to `createPromise` (presumably a timeout for the
   *   returned promise - confirm against `createPromise`)
   * @returns a promise that settles with the callback's result, or rejects with its
   *   error or with the cancellation error given to `stop`
   */
  public run<T>(cb: AsyncCallback<T>, timeoutMillis?: number): Promise<T> {
    const promise = createPromise<T>(timeoutMillis);

    // Keep only the frames of the capture stack. The header line is dropped by
    // locating the first newline rather than assuming a fixed length: V8 prints
    // "Error" (no colon) for an empty message, so a hard-coded offset would eat
    // part of the first frame's indentation.
    const capturedStack = new Error('').stack ?? '';
    const headerEnd = capturedStack.indexOf('\n');
    const startStack = headerEnd === -1 ? '' : capturedStack.slice(headerEnd + 1);

    this.queue.push({
      promise,
      cb,
      startStack,
    });

    this.next().catch(() => null);
    return promise.promise;
  }

  /**
   * Halts dequeuing: callbacks already running continue, but nothing further is
   * started. Entries still queued are left pending.
   */
  public willStop(): void {
    this.stopDequeuing = true;
  }

  /**
   * Cancels the queue: resolves the abort signal raced against in-flight
   * callbacks and rejects every entry still waiting.
   *
   * @param error cancellation error to reject with; defaults to a
   *   `CanceledPromiseError('Canceling Queue Item')`
   */
  public stop(error?: CanceledPromiseError): void {
    const canceledError = error ?? new CanceledPromiseError('Canceling Queue Item');
    this.abortPromise.resolve(canceledError);
    while (this.queue.length) {
      const next = this.queue.shift();
      if (!next) continue;

      this.reject(next, canceledError);
    }
  }

  /** @returns true when fewer than `concurrency` callbacks are currently running */
  public canRunMoreConcurrently(): boolean {
    return this.activeCount < this.concurrency;
  }

  /**
   * Starts the next queued entry if a slot is free, and re-arms or clears the
   * idle timer. Called after every `run` and after every completed callback.
   */
  private async next(): Promise<void> {
    // Any call means the queue was just touched, so a pending idle countdown is void.
    clearTimeout(this.idleTimeout);

    if (!this.canRunMoreConcurrently() || this.stopDequeuing === true) return;

    const next = this.queue.shift();
    if (!next) {
      // Nothing queued and nothing running: begin the idle countdown.
      // `unref` keeps this timer from holding the process open.
      if (this.activeCount === 0) {
        this.idleTimeout = setTimeout(
          () => this.idlePromise.resolve(),
          this.idletimeMillis,
        ).unref();
      }
      return;
    }

    // Work is resuming after idleness was already announced: install a fresh idle promise.
    if (this.activeCount === 0 && this.idlePromise.isResolved) {
      const newPromise = createPromise();
      // NOTE(review): idlePromise is already resolved in this branch, so this call
      // looks like a no-op - kept as-is; confirm against createPromise.
      this.idlePromise?.resolve(newPromise.promise);
      this.idlePromise = newPromise;
    }

    this.activeCount += 1;
    try {
      const res = await Promise.race([next.cb(), this.abortPromise.promise]);
      if (this.abortPromise.isResolved) {
        // `stop()` was called: discard the result and surface the cancellation.
        return this.reject(next, await this.abortPromise.promise);
      }

      next.promise.resolve(res);
    } catch (error) {
      this.reject(next, error);
    } finally {
      this.activeCount -= 1;
    }

    // Pump the queue again on a fresh tick so the call stack does not grow per entry.
    process.nextTick(() => this.next().catch(() => null));
  }

  /**
   * Rejects an entry's promise with a copy of `sourceError` whose stack also
   * contains the stack captured when the entry was queued.
   *
   * This must never throw: it runs inside the `catch` of `next()`, and an
   * exception here would leave the entry's promise unsettled and stop the
   * queue from advancing.
   */
  private reject(entry: IQueueEntry, sourceError: Error): void {
    entry.promise.reject(this.withQueueStack(entry, sourceError));
  }

  /**
   * Builds the error handed to the caller. The source error is cloned (same
   * prototype, same own enumerable properties) so the original object's stack
   * is left untouched.
   */
  private withQueueStack(entry: IQueueEntry, sourceError: Error): Error {
    // Callbacks may reject with anything (`throw 'x'`, `Promise.reject()`).
    // Non-objects have no prototype to clone and no stack to extend, so they
    // are passed through unchanged.
    const isObjectLike =
      sourceError !== null &&
      (typeof sourceError === 'object' || typeof sourceError === 'function');
    if (!isObjectLike) return sourceError;

    try {
      const error: Error = Object.create(getPrototypeOf(sourceError));
      error.message = sourceError.message;
      Object.assign(error, sourceError);

      const marker = `------${this.stacktraceMarker}`.padEnd(50, '-');
      error.stack = `${sourceError.stack}\n${marker}\n${entry.startStack}`;
      return error;
    } catch {
      // Cloning can fail on exotic objects (throwing getters, read-only
      // properties); settling with the original beats never settling.
      return sourceError;
    }
  }
}
107
108
/**
 * One pending unit of work held by the queue.
 */
interface IQueueEntry {
  /** The work to execute once the entry is dequeued. */
  cb: AsyncCallback<any>;
  /** Settled with the callback's outcome; its `promise` is what `run` hands back to the caller. */
  promise: IResolvablePromise;
  /** Stack frames captured when the entry was queued; appended to the stack of any rejection. */
  startStack: string;
}
113
114