CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/util/async-utils.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
Some async utils.
8
9
(Obviously should be moved somewhere else when the dust settles!)
10
11
The two helpful async/await libraries I found are:
12
13
- https://github.com/hunterloftis/awaiting
14
- https://github.com/masotime/async-await-utils
15
16
*/
17
18
import * as awaiting from "awaiting";
19
20
import { reuseInFlight } from "./reuse-in-flight";
21
22
// turns a function of opts, which has a cb input into
23
// an async function that takes an opts with no cb as input; this is just like
24
// awaiting.callback, but for our functions that take opts.
25
// WARNING: this is different than callback from awaiting, which
26
// you call as: callback(f, args...)
27
// With callback_opts, you do: callback_opts(f)(opts)
28
// TODO: maybe change this everywhere to callback_opts(f, opts) for consistency!
29
/**
 * Adapt a callback-style function that takes a single options object
 * (with a `cb` field) into an async function that takes the options
 * without `cb`.
 *
 * @param f - function invoked as `f(opts)` once `opts.cb` has been set.
 * @returns an async function; call it as `callback_opts(f)(opts)`.
 *   Note that `cb` is assigned onto the options object that is passed in
 *   (a fresh object is used only when no options are given).
 */
export function callback_opts(f: Function) {
  return async function (opts?: any): Promise<any> {
    // Fall back to an empty options object only when nothing was passed.
    const options = opts === undefined ? {} : opts;
    return await awaiting.callback((cb: Function) => {
      options.cb = cb;
      f(options);
    });
  };
}
41
42
/**
 * Convert the given error to a human-readable string.
 *
 * - strings are returned unchanged;
 * - Error instances are rendered via their toString (e.g. "Error: boom"),
 *   because JSON.stringify of an Error yields "{}" and loses the message;
 * - everything else is JSON-serialized when possible, falling back to
 *   String(err) for values JSON cannot represent (undefined, functions,
 *   circular structures, ...), so the result is always a string.
 */
function err2str(err: unknown): string {
  if (typeof err === "string") {
    return err;
  }
  if (err instanceof Error) {
    return String(err);
  }
  try {
    const serialized = JSON.stringify(err);
    // JSON.stringify returns undefined (not a string) for undefined/functions.
    if (typeof serialized === "string") {
      return serialized;
    }
  } catch {
    // circular structure, BigInt, throwing toJSON, ... -- fall through
  }
  return String(err);
}
52
53
/* retry_until_success keeps calling an async function f with
54
exponential backoff until f does NOT raise an exception.
55
Then retry_until_success returns whatever f returned.
56
*/
57
58
/** Options accepted by retry_until_success. */
interface RetryUntilSuccess<T> {
  /** The async function to call; it takes no input. */
  f: () => Promise<T>;

  // --- backoff schedule (all in milliseconds, except factor) ---

  /** Initial backoff delay in milliseconds. */
  start_delay?: number;
  /** Multiply the delay by this after each failed call. */
  factor?: number;
  /** Upper bound, in milliseconds, on the delay between calls. */
  max_delay?: number;

  // --- give-up conditions ---

  /** Maximum number of times to call f. */
  max_tries?: number;
  /**
   * Milliseconds; don't call f again if that call would start later than
   * this much time after the first call.
   */
  max_time?: number;

  // --- diagnostics ---

  /** Optional verbose logging function. */
  log?: Function;
  /** Description used to make error messages more useful. */
  desc?: string;
}
68
69
/**
 * Call the async function `opts.f` repeatedly, backing off exponentially
 * between attempts, until it does NOT throw; then resolve to its result.
 *
 * Backoff: the delay starts at `start_delay` (default 100ms) and is
 * multiplied by `factor` (default 1.4) after every failure, capped at
 * `max_delay` (default 20000ms). The multiplication happens before the
 * wait, so the first wait is `factor * start_delay`. Falsy option values
 * (undefined, 0) select the defaults. The caller's `opts` object is not
 * modified.
 *
 * @param opts - see RetryUntilSuccess.
 * @returns whatever the first successful call of `opts.f` returned.
 * @throws Error when `max_tries` calls have failed, or when the next call
 *   would start more than `max_time` ms after the first one. The message
 *   includes the exception from the final failed attempt and, when given,
 *   `opts.desc`.
 */
export async function retry_until_success<T>(
  opts: RetryUntilSuccess<T>,
): Promise<T> {
  // Resolve defaults into locals rather than writing them back into opts.
  const start_delay = opts.start_delay || 100;
  const max_delay = opts.max_delay || 20000;
  const factor = opts.factor || 1.4;

  let next_delay = start_delay;
  let tries = 0;
  const start_time = Date.now();

  // Return nonempty string if time or tries exceeded.
  function check_done(): string {
    if (
      opts.max_time &&
      next_delay + Date.now() - start_time > opts.max_time
    ) {
      return "maximum time exceeded";
    }
    if (opts.max_tries && tries >= opts.max_tries) {
      return "maximum tries exceeded";
    }
    return "";
  }

  while (true) {
    try {
      return await opts.f();
    } catch (exc) {
      if (opts.log !== undefined) {
        opts.log("failed ", exc);
      }
      // might try again -- update state...
      tries += 1;
      next_delay = Math.min(max_delay, factor * next_delay);

      // check if too long or too many tries
      const err = check_done();
      if (err) {
        // Game over. Report the exception from THIS attempt (the actual last
        // error), and only mention desc when one was provided.
        const desc = opts.desc ? ` -- ${opts.desc}` : "";
        throw Error(`${err} -- last error was ${err2str(exc)}${desc}`);
      }

      // wait before trying again
      await awaiting.delay(next_delay);
    }
  }
}
129
130
import { EventEmitter } from "events";
131
import { CB } from "./types/database";
132
133
/**
 * Wait for an event emitter to emit the given event once.
 *
 * Resolves to the array of arguments emitted with that event.
 *
 * If timeout_ms is 0 (the default) this can wait an unbounded amount of
 * time; that is intentional. If timeout_ms is positive, the wait is
 * delegated to once_with_timeout, and an exception is thrown if the event
 * does not happen in time.
 *
 * @throws Error if obj is not an EventEmitter.
 */
export async function once(
  obj: EventEmitter,
  event: string,
  timeout_ms: number = 0,
): Promise<any> {
  // Runtime guard, just in case typescript doesn't catch something.
  if (!(obj instanceof EventEmitter)) {
    throw Error("obj must be an EventEmitter");
  }

  // The timed variant lives in its own function to keep both readable.
  if (timeout_ms > 0) {
    return once_with_timeout(obj, event, timeout_ms);
  }

  let emitted: any[] = [];
  await awaiting.callback((done: Function): void => {
    obj.once(event, (...args): void => {
      emitted = args;
      done();
    });
  });
  return emitted;
}
164
165
/**
 * Wait for `event` on `obj`, giving up after `timeout_ms` milliseconds.
 *
 * Resolves to the array of arguments emitted with the event. On timeout
 * the listener is removed and the callback is invoked with "timeout"
 * (which awaiting.callback is expected to surface as a rejection).
 */
async function once_with_timeout(
  obj: EventEmitter,
  event: string,
  timeout_ms: number,
): Promise<any> {
  let emitted: any[] = [];

  await awaiting.callback((done: Function): void => {
    // Event arrived first: cancel the timer and capture the arguments.
    const onEvent = (...args): void => {
      clearTimeout(timer);
      emitted = args;
      done();
    };

    // Timer fired first: stop listening and report the timeout.
    const timer = setTimeout((): void => {
      obj.removeListener(event, onEvent);
      done("timeout");
    }, timeout_ms);

    obj.once(event, onEvent);
  });

  return emitted;
}
187
188
/**
 * Alternative to callback_opts that behaves like the callback defined in
 * awaiting: call it as `callback2(f, opts)`.
 *
 * Pass in the type of the returned value as R, and it will be inferred.
 *
 * @param f - function invoked as `f(opts)` after `opts.cb` has been set.
 * @param opts - options for f; `cb` is assigned onto this object (a fresh
 *   object is used when opts is null or undefined).
 */
export async function callback2<R = any>(
  f: (opts) => void,
  opts?: object,
): Promise<R> {
  const withCb = (opts ?? {}) as typeof opts & { cb: CB<R> };
  return await awaiting.callback((cb: CB<R>): void => {
    withCb.cb = cb;
    f(withCb);
  });
}
201
202
/**
 * Replace each named method of obj, in place, by the result of passing the
 * method (bound to obj) through reuseInFlight.
 *
 * @param obj - object whose methods are replaced.
 * @param method_names - names of the methods on obj to wrap.
 */
export function reuse_in_flight_methods(
  obj: any,
  method_names: string[],
): void {
  for (let k = 0; k < method_names.length; k++) {
    const name = method_names[k];
    // Bind first so the wrapped function keeps obj as its receiver.
    const bound = obj[name].bind(obj);
    obj[name] = reuseInFlight(bound);
  }
}
210
211
/**
 * Cancel a pending throttle or debounce, where f is the output of
 * underscore.throttle (or debounce).
 *
 * Safe to call with f null/undefined or with a normal function: nothing
 * happens unless f has a non-null `cancel` member, which is then called.
 */
export function cancel_scheduled(f: any): void {
  if (f == null || f.cancel == null) {
    // nothing scheduled that we know how to cancel
    return;
  }
  f.cancel();
}
219
220
/**
 * Run the async function `f(...args)` and report the outcome through a
 * node-style callback: `cb()` on success (the result of f is not passed
 * along), `cb(err)` if f throws or rejects.
 *
 * WARNING -- not tested in production use.
 *
 * The callback is invoked exactly once. In particular, the success call
 * `cb()` is made outside the try/catch, so an exception thrown by cb itself
 * propagates (rejecting the returned promise) instead of being caught and
 * causing cb to be called a second time with that exception.
 *
 * @param f - async function to run.
 * @param cb - callback receiving an error as its first argument, or nothing.
 * @param args - arguments forwarded to f.
 */
export async function async_as_callback(
  f: Function,
  cb: Function,
  ...args
): Promise<void> {
  try {
    await f(...args);
  } catch (err) {
    cb(err);
    return;
  }
  cb();
}
233
234
// Adapted from https://stackoverflow.com/questions/70470728/how-can-i-execute-some-async-tasks-in-parallel-with-limit-in-generator-function
/**
 * Apply `fn` to every entry of `values`, running at most `max` calls
 * concurrently, and resolve to the array of all results in iteration order.
 *
 * Entries are enumerated with for..in, so `values` may be an array or a
 * plain object, and the key passed to `fn` is a string (e.g. "0", "1", ...
 * for arrays).
 *
 * @param values - array or object whose entries are mapped.
 * @param fn - called as `fn(value, key)`; may return a promise or a plain value.
 * @param max - maximum number of calls in flight at once (default 10);
 *   values below 1 are treated as 1 so the loop cannot stall.
 * @returns results of every call, ordered like the iteration over `values`.
 * @throws the first rejection raised by any call. No further calls are
 *   started once a failure has been observed, and calls already in flight
 *   are allowed to settle first.
 */
export async function mapParallelLimit(
  values: any,
  fn: (value: any, key: any) => any,
  max: number = 10,
): Promise<any[]> {
  // With max <= 0 the wait loop below would race an empty set forever.
  const limit = max >= 1 ? max : 1;
  const inFlight = new Set<Promise<void>>();
  const results: any[] = [];
  let failed = false;
  let failure: unknown;
  let next = 0;

  for (const key in values) {
    while (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }
    if (failed) {
      break;
    }

    const slot = next++;
    // Tasks record their outcome themselves and never reject, so every
    // result is kept (not only those still pending at the end) and no
    // failure can be dropped or left unhandled.
    const task: Promise<void> = Promise.resolve(fn(values[key], key))
      .then(
        (result) => {
          results[slot] = result;
        },
        (err) => {
          if (!failed) {
            failed = true;
            failure = err;
          }
        },
      )
      .finally(() => {
        inFlight.delete(task);
      });
    inFlight.add(task);
  }

  await Promise.all(inFlight);
  if (failed) {
    throw failure;
  }
  return results;
}
249
250