Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/util/async-utils.ts
5614 views
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Some async utils.
8
9
(Obviously should be moved somewhere else when the dust settles!)
10
11
The two helpful async/await libraries I found are:
12
13
- https://github.com/hunterloftis/awaiting
14
- https://github.com/masotime/async-await-utils
15
16
*/
17
18
import * as awaiting from "awaiting";
19
import { reuseInFlight } from "./reuse-in-flight";
20
21
interface RetryOptions {
22
start?: number;
23
decay?: number;
24
max?: number;
25
min?: number;
26
timeout?: number;
27
log?: (...args) => void;
28
}
29
30
// loop calling the async function f until it returns true.
31
// It optionally can take a timeout, which if hit it will
32
// throw Error('timeout'). retry_until_success below is an
33
// a variant of this pattern keeps retrying until f doesn't throw.
34
// The input function f must always return true or false,
35
// which helps a lot to avoid bugs.
36
export async function until(
37
f: (() => Promise<boolean>) | (() => boolean),
38
{
39
start = 500,
40
decay = 1.3,
41
max = 15000,
42
min = 50,
43
timeout = 0,
44
log,
45
}: RetryOptions = {},
46
) {
47
const end = timeout ? Date.now() + timeout : undefined;
48
let d = Math.max(min, start);
49
while (end === undefined || Date.now() < end) {
50
const x = await f();
51
if (x) {
52
return;
53
}
54
if (end) {
55
d = Math.max(min, Math.min(end - Date.now(), Math.min(max, d * decay)));
56
} else {
57
d = Math.max(min, Math.min(max, d * decay));
58
}
59
log?.(`will retry in ${Math.round(d / 1000)} seconds`);
60
await awaiting.delay(d);
61
}
62
log?.(`FAILED: timeout -- ${timeout} ms`);
63
throw Error(`timeout -- ${timeout} ms`);
64
}
65
66
export { asyncDebounce, asyncThrottle } from "./async-debounce-throttle";
67
68
// turns a function of opts, which has a cb input into
69
// an async function that takes an opts with no cb as input; this is just like
70
// awaiting.callback, but for our functions that take opts.
71
// WARNING: this is different than callback from awaiting, which
72
// on which you do: callback(f, args...)
73
// With callback_opts, you do: callback_opts(f)(opts)
74
// TODO: maybe change this everwhere to callback_opts(f, opts) for consistency!
75
export function callback_opts(f: Function) {
76
return async function (opts?: any): Promise<any> {
77
if (opts === undefined) {
78
opts = {};
79
}
80
function g(cb: Function) {
81
opts.cb = cb;
82
f(opts);
83
}
84
return await awaiting.callback(g);
85
};
86
}
87
88
/* retry_until_success keeps calling an async function f with
89
exponential backoff until f does NOT raise an exception.
90
Then retry_until_success returns whatever f returned.
91
*/
92
93
interface RetryUntilSuccess<T> {
94
f: () => Promise<T>; // an async function that takes no input.
95
start_delay?: number; // milliseconds -- delay before calling second time.
96
max_delay?: number; // milliseconds -- delay at most this amount between calls
97
max_tries?: number; // maximum number of times to call f
98
max_time?: number; // milliseconds -- don't call f again if the call would start after this much time from first call
99
factor?: number; // multiply delay by this each time
100
log?: Function; // optional verbose logging function
101
desc?: string; // useful for making error messages better.
102
}
103
104
export async function retry_until_success<T>(
105
opts: RetryUntilSuccess<T>,
106
): Promise<T> {
107
if (!opts.start_delay) opts.start_delay = 100;
108
if (!opts.max_delay) opts.max_delay = 20000;
109
if (!opts.factor) opts.factor = 1.4;
110
111
let next_delay: number = opts.start_delay;
112
let tries: number = 0;
113
const start_time: number = Date.now();
114
let last_exc: Error | undefined;
115
116
// Return nonempty string if time or tries exceeded.
117
function check_done(): string {
118
if (opts.max_time && next_delay + Date.now() - start_time > opts.max_time) {
119
return "maximum time exceeded";
120
}
121
if (opts.max_tries && tries >= opts.max_tries) {
122
return "maximum tries exceeded";
123
}
124
return "";
125
}
126
127
while (true) {
128
try {
129
return await opts.f();
130
} catch (exc) {
131
//console.warn('retry_until_success', exc);
132
if (opts.log !== undefined) {
133
opts.log("failed ", exc);
134
}
135
// might try again -- update state...
136
tries += 1;
137
next_delay = Math.min(opts.max_delay, opts.factor * next_delay);
138
// check if too long or too many tries
139
const err = check_done();
140
if (err) {
141
// yep -- game over, throw an error
142
let e;
143
if (last_exc) {
144
e = Error(`${err} -- last error was '${last_exc}' -- ${opts.desc}`);
145
} else {
146
e = Error(`${err} -- ${opts.desc}`);
147
}
148
//console.warn(e);
149
throw e;
150
}
151
// record exception so can use it later.
152
last_exc = exc;
153
154
// wait before trying again
155
await awaiting.delay(next_delay);
156
}
157
}
158
}
159
160
import { EventEmitter } from "events";
161
import { CB } from "./types/database";
162
163
export class TimeoutError extends Error {
164
code: number;
165
constructor(mesg: string) {
166
super(mesg);
167
this.code = 408;
168
}
169
}
170
171
/* Wait for an event emitter to emit any event at all once.
172
Returns array of args emitted by that event.
173
If timeout_ms is 0 (the default) this can wait an unbounded
174
amount of time. That's intentional and does make sense
175
in our applications.
176
If timeout_ms is nonzero and event doesn't happen an
177
exception is thrown.
178
If the obj throws 'closed' before the event is emitted,
179
then this throws an error, since clearly event can never be emitted.
180
*/
181
export async function once(
182
obj: EventEmitter,
183
event: string,
184
timeout_ms: number | undefined = 0,
185
): Promise<any> {
186
if (obj == null) throw Error("once -- obj is undefined");
187
if (timeout_ms == null) {
188
// clients might explicitly pass in undefined, but below we expect 0 to mean "no timeout"
189
timeout_ms = 0;
190
}
191
if (typeof obj.once != "function")
192
throw Error("once -- obj.once must be a function");
193
194
return new Promise((resolve, reject) => {
195
let timer: NodeJS.Timeout | undefined;
196
197
function cleanup() {
198
obj.removeListener(event, onEvent);
199
obj.removeListener("closed", onClosed);
200
if (timer) clearTimeout(timer);
201
}
202
203
function onEvent(...args: any[]) {
204
cleanup();
205
resolve(args);
206
}
207
208
function onClosed() {
209
cleanup();
210
reject(new TimeoutError(`once: "${event}" not emitted before "closed"`));
211
}
212
213
function onTimeout() {
214
cleanup();
215
reject(
216
new TimeoutError(
217
`once: timeout of ${timeout_ms}ms waiting for "${event}"`,
218
),
219
);
220
}
221
222
obj.once(event, onEvent);
223
obj.once("closed", onClosed);
224
225
if (timeout_ms > 0) {
226
timer = setTimeout(onTimeout, timeout_ms);
227
}
228
});
229
}
230
231
// Alternative to callback_opts that behaves like the callback defined in awaiting.
232
// Pass in the type of the returned value, and it will be inferred.
233
export async function callback2<R = any>(
234
f: (opts) => void,
235
opts?: object,
236
): Promise<R> {
237
const optsCB = (opts ?? {}) as typeof opts & { cb: CB<R> };
238
function g(cb: CB<R>): void {
239
optsCB.cb = cb;
240
f(optsCB);
241
}
242
return await awaiting.callback(g);
243
}
244
245
export function reuse_in_flight_methods(
246
obj: any,
247
method_names: string[],
248
): void {
249
for (const method_name of method_names) {
250
obj[method_name] = reuseInFlight(obj[method_name].bind(obj));
251
}
252
}
253
254
// Cancel pending throttle or debounce, where f is the
255
// output of underscore.throttle (or debounce). Safe to call
256
// with f null or a normal function.
257
export function cancel_scheduled(f: any): void {
258
if (f != null && f.cancel != null) {
259
f.cancel();
260
}
261
}
262
263
// WARNING -- not tested
264
export async function async_as_callback(
265
f: Function,
266
cb: Function,
267
...args
268
): Promise<void> {
269
try {
270
await f(...args);
271
cb();
272
} catch (err) {
273
cb(err);
274
}
275
}
276
277
// From https://stackoverflow.com/questions/70470728/how-can-i-execute-some-async-tasks-in-parallel-with-limit-in-generator-function
278
export async function mapParallelLimit(values, fn, max = 10) {
279
const promises = new Set();
280
281
for (const i in values) {
282
while (promises.size >= max) {
283
await Promise.race(promises.values());
284
}
285
286
let promise = fn(values[i], i).finally(() => promises.delete(promise));
287
promises.add(promise);
288
}
289
290
return Promise.all(promises.values());
291
}
292
293
export async function parallelHandler({
294
iterable,
295
limit,
296
handle,
297
}: {
298
iterable: AsyncIterable<any>;
299
limit: number;
300
handle: (any) => Promise<void>;
301
}) {
302
const promiseQueue: Promise<void>[] = [];
303
for await (const mesg of iterable) {
304
const promise = handle(mesg).then(() => {
305
// Remove the promise from the promiseQueue once done
306
promiseQueue.splice(promiseQueue.indexOf(promise), 1);
307
});
308
promiseQueue.push(promise);
309
// If we reached the PARALLEL limit, wait for one of the
310
// promises to resolve
311
if (promiseQueue.length >= limit) {
312
await Promise.race(promiseQueue);
313
}
314
}
315
// Wait for all remaining promises to finish
316
await Promise.all(promiseQueue);
317
}
318
319
// use it like this:
320
// resp = await withTimeout(promise, 3000);
321
// and if will throw a timeout if promise takes more than 3s to resolve,
322
// though of course whatever code is running in promise doesn't actually
323
// get interrupted.
324
export async function withTimeout(p: Promise<any>, ms: number) {
325
let afterFired = false;
326
p.catch((err) => {
327
if (afterFired) {
328
console.warn("WARNING: withTimeout promise rejected", err);
329
}
330
});
331
let to;
332
return Promise.race([
333
p,
334
new Promise(
335
(_, reject) =>
336
(to = setTimeout(() => {
337
afterFired = true;
338
reject(new Error("timeout"));
339
}, ms)),
340
),
341
]).finally(() => clearTimeout(to));
342
}
343
344