Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/util/async-utils.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
Some async utils.

(Obviously should be moved somewhere else when the dust settles!)

The two helpful async/await libraries I found are:

   - https://github.com/hunterloftis/awaiting
   - https://github.com/masotime/async-await-utils

*/

import * as awaiting from "awaiting";
import { EventEmitter } from "events";

import { reuseInFlight } from "./reuse-in-flight";
import { CB } from "./types/database";

/**
 * Turn a function `f(opts)` that reports completion through `opts.cb` into
 * an async function that takes the same opts *without* a cb.
 *
 * This is the opts-flavoured counterpart of `awaiting.callback`.
 * WARNING: the calling convention differs from `awaiting.callback`, where
 * you write `callback(f, args...)`. Here you write `callback_opts(f)(opts)`.
 * TODO: maybe change this everywhere to callback_opts(f, opts) for consistency!
 *
 * Note that the `cb` property is written onto the opts object that the
 * caller passed in (a fresh object is used only when opts is undefined).
 */
export function callback_opts(f: Function) {
  return async function (opts?: any): Promise<any> {
    if (opts === undefined) {
      opts = {};
    }
    function g(cb: Function) {
      opts.cb = cb;
      f(opts);
    }
    return await awaiting.callback(g);
  };
}

/**
 * Convert the given error to a human-readable string.
 *
 * - strings are returned unchanged;
 * - Error instances use their own string form ("Name: message"), because
 *   JSON.stringify(new Error("x")) is just "{}" (message and stack are not
 *   enumerable);
 * - everything else is JSON-serialized, falling back to String() when the
 *   value has no JSON form (e.g. undefined) or cannot be serialized
 *   (e.g. circular references).
 */
function err2str(err: any): string {
  if (typeof err === "string") {
    return err;
  }
  if (err instanceof Error) {
    return `${err}`;
  }
  try {
    return JSON.stringify(err) ?? String(err);
  } catch {
    return String(err);
  }
}

/* retry_until_success keeps calling an async function f with
   exponential backoff until f does NOT raise an exception.
   Then retry_until_success returns whatever f returned.
*/

interface RetryUntilSuccess<T> {
  f: () => Promise<T>; // an async function that takes no input.
  start_delay?: number; // milliseconds -- base delay (default 100); it is multiplied by factor before the first wait.
  max_delay?: number; // milliseconds -- delay at most this amount between calls (default 20000)
  max_tries?: number; // maximum number of times to call f
  max_time?: number; // milliseconds -- don't call f again if the call would start after this much time from first call
  factor?: number; // multiply delay by this each time (default 1.4)
  log?: Function; // optional verbose logging function
  desc?: string; // useful for making error messages better.
}

/**
 * Call `opts.f` repeatedly, waiting longer after each failure, until it
 * resolves; the resolved value is returned.
 *
 * @throws Error whose message starts with "maximum time exceeded" or
 *   "maximum tries exceeded" once the corresponding limit is hit. The
 *   message also contains the error from the most recent failed call and,
 *   when provided, `opts.desc`.
 */
export async function retry_until_success<T>(
  opts: RetryUntilSuccess<T>,
): Promise<T> {
  // Defaults are kept in locals so the caller's opts object is not modified.
  // `||` (not `??`) is deliberate: a delay or factor of 0 is treated as
  // "not set", exactly like undefined.
  const max_delay = opts.max_delay || 20000;
  const factor = opts.factor || 1.4;
  let next_delay: number = opts.start_delay || 100;

  let tries = 0;
  const start_time = Date.now();
  // Only mention the description when there is one, so that messages
  // do not end in " -- undefined".
  const desc_suffix = opts.desc ? ` -- ${opts.desc}` : "";

  // Return nonempty string if time or tries exceeded.
  function check_done(): string {
    if (
      opts.max_time &&
      next_delay + Date.now() - start_time > opts.max_time
    ) {
      // the next call would start later than max_time after the first call
      return "maximum time exceeded";
    }
    if (opts.max_tries && tries >= opts.max_tries) {
      return "maximum tries exceeded";
    }
    return "";
  }

  while (true) {
    try {
      return await opts.f();
    } catch (exc) {
      if (opts.log !== undefined) {
        opts.log("failed ", exc);
      }
      // might try again -- update state...
      tries += 1;
      next_delay = Math.min(max_delay, factor * next_delay);
      // check if too long or too many tries
      const reason = check_done();
      if (reason) {
        // yep -- game over; report the failure that just happened.
        throw new Error(
          `${reason} -- last error was ${err2str(exc)}${desc_suffix}`,
        );
      }
      // wait before trying again
      await awaiting.delay(next_delay);
    }
  }
}

/* Wait for the event emitter obj to emit the given event once.
   Returns array of args emitted by that event.
   If timeout_ms is 0 (the default) this can wait an unbounded
   amount of time.  That's intentional and does make sense
   in our applications.
   If timeout_ms is nonzero and event doesn't happen an
   exception is thrown.
*/
export async function once(
  obj: EventEmitter,
  event: string,
  timeout_ms: number = 0,
): Promise<any> {
  if (!(obj instanceof EventEmitter)) {
    // just in case typescript doesn't catch something:
    throw Error("obj must be an EventEmitter");
  }
  if (timeout_ms > 0) {
    // just to keep both versions more readable...
    return once_with_timeout(obj, event, timeout_ms);
  }
  let val: any[] = [];
  function wait(cb: Function): void {
    obj.once(event, function (...args: any[]): void {
      val = args;
      cb();
    });
  }
  await awaiting.callback(wait);
  return val;
}

/* Same as once, but gives up after timeout_ms milliseconds: the listener
   is removed and the callback is invoked with the string "timeout" as its
   error argument (so callers see "timeout" rather than an Error object --
   presumably as the rejection value of awaiting.callback; kept as is,
   since callers may compare against that string).
*/
async function once_with_timeout(
  obj: EventEmitter,
  event: string,
  timeout_ms: number,
): Promise<any> {
  let val: any[] = [];
  function wait(cb: Function): void {
    function fail(): void {
      // stop listening so the handler cannot fire after we gave up
      obj.removeListener(event, handler);
      cb("timeout");
    }
    const timer = setTimeout(fail, timeout_ms);
    function handler(...args: any[]): void {
      clearTimeout(timer);
      val = args;
      cb();
    }
    obj.once(event, handler);
  }
  await awaiting.callback(wait);
  return val;
}

// Alternative to callback_opts that behaves like the callback defined in awaiting.
// Pass in the type of the returned value, and it will be inferred.
// Note: the cb property is written onto the opts object passed in
// (a fresh object is used only when opts is null or undefined).
export async function callback2<R = any>(
  f: (opts: any) => void,
  opts?: object,
): Promise<R> {
  const optsCB = (opts ?? {}) as typeof opts & { cb: CB<R> };
  function g(cb: CB<R>): void {
    optsCB.cb = cb;
    f(optsCB);
  }
  return await awaiting.callback(g);
}

// Replace each named method of obj, in place, by a version wrapped with
// reuseInFlight and bound to obj.
export function reuse_in_flight_methods(
  obj: any,
  method_names: string[],
): void {
  for (const method_name of method_names) {
    obj[method_name] = reuseInFlight(obj[method_name].bind(obj));
  }
}

// Cancel pending throttle or debounce, where f is the
// output of underscore.throttle (or debounce).  Safe to call
// with f null or a normal function.
export function cancel_scheduled(f: any): void {
  if (f != null && f.cancel != null) {
    f.cancel();
  }
}

// Run the async function f(...args) and report the outcome through the
// node-style callback cb: cb() on success, cb(err) on failure.
// WARNING -- not tested
// NOTE: cb() is invoked inside the try block, so if cb itself throws on the
// success path it is invoked a second time with its own error.
export async function async_as_callback(
  f: Function,
  cb: Function,
  ...args: any[]
): Promise<void> {
  try {
    await f(...args);
    cb();
  } catch (err) {
    cb(err);
  }
}

// Based on https://stackoverflow.com/questions/70470728/how-can-i-execute-some-async-tasks-in-parallel-with-limit-in-generator-function
//
// Call fn(value, key) for every enumerable key of values (for an array the
// keys are the indices, passed as strings), keeping at most max calls in
// flight at any time.  Resolves to an array with the result of every call,
// in iteration order.  Rejects as soon as a failed call is observed; calls
// that are already in flight are not cancelled.
export async function mapParallelLimit(values, fn, max = 10) {
  // With a limit below 1 the wait loop would race an empty set,
  // which never settles.
  const limit = Math.max(1, max);
  const inFlight = new Set<Promise<void>>();
  const results: unknown[] = [];
  let count = 0;

  for (const key in values) {
    while (inFlight.size >= limit) {
      await Promise.race(inFlight);
    }

    const slot = count++;
    // Promise.resolve lets fn return a plain value as well as a promise.
    // Each result is stored as soon as it is available, because finished
    // tasks are removed from inFlight and could not be collected later.
    const task: Promise<void> = Promise.resolve(fn(values[key], key))
      .then((result) => {
        results[slot] = result;
      })
      .finally(() => {
        inFlight.delete(task);
      });
    inFlight.add(task);
  }

  // wait for the tail of still-running tasks
  await Promise.all(inFlight);
  return results;
}