Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/util/aggregate.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

/*
 * decaffeinate suggestions:
 * DS102: Remove unnecessary code created because of implicit returns
 * DS205: Consider reworking code to avoid use of IIFEs
 * DS207: Consider shorter variations of null checks
 * Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
 */
/*
Async aggregate.

Use like this:

    g = aggregate (opts) ->
        [bunch of code here...]

Given a function f that takes an object of the form

    {foo:?, bar:?, other:?,..., cb:...}

as input -- where everything except cb is JSON-able, make a new function g that
takes as input

    {foo:?, bar:?, other:?,..., cb:..., aggregate:?}

If you call g without setting aggregate, then f is just called as usual.

If you call g with aggregate set to a nondecreasing input (e.g., sequence
numbers or timestamp), then f only gets evaluated *once*.  E.g., if you do

    g(x:0, aggregate:0, cb:a)
    ...
    g(x:0, aggregate:0, cb:b)
    g(x:1, aggregate:0, cb:c)
    ...
    g(x:0, aggregate:1, cb:d)
    g(x:0, aggregate:0, cb:e)   # no reason to do this; it's best if aggregate is a nondecreasing sequence (NOT required though).

Then:

   - f(x:0,cb:?) gets called once and both a and b are called with that one result.
   - f(x:1,cb:?) gets called once and c called with the result.  This happens in
     parallel with the above call to f(x:0,cb:?).
   - f(x:0,cb:?) gets called once MORE and d is called with the result.
     This final call only happens once the call to f(x:0,cb:?) finishes.
   - g(x:0, aggregate:0, cb:e) results in just getting added to the cb's for f(x:0,cb:?),
     if that call is still running; if not, f may or may not get called again, depending
     on how much later (recent results are cached).

OPTIONS:

You can also do

     aggregate(options, (opts) -> ...)

Where options is an object.

    options = {omit: ['keys', 'of', 'opts', 'to', 'omit', 'in', 'comparing', 'inputs']}

*/

const json_stable = require("json-stable-stringify");

import { copy_without, field_cmp } from "./misc";

// To avoid using up too much memory, results are cached at most this long
// (so long as function is called periodically to clear the cache... if not,
// no point in clearing, since won't grow much.)
const DONE_CACHE_TIMEOUT_MS = 60000;

// Bookkeeping for one evaluation of f with a particular set of inputs.
// The same object is first stored as the in-progress entry and then, once f
// calls back, moved to the cache of finished results.
interface CallState {
  aggregate: any; // aggregate value this evaluation was started with
  next: any[]; // opts of requests that need a *newer* evaluation than this one
  callbacks: any[]; // callbacks to invoke with the result of this evaluation
  time: number; // ms since epoch at which this evaluation was started
  args: any[]; // arguments f passed to its callback (filled in on completion)
}

// Remove every cached result whose evaluation started at least
// DONE_CACHE_TIMEOUT_MS ago.  Mutates `done` in place.
function clear_old(done: { [key: string]: { time: number } }): void {
  const now = Date.now();
  for (const key of Object.keys(done)) {
    if (now - done[key].time >= DONE_CACHE_TIMEOUT_MS) {
      delete done[key];
    }
  }
}

// Return true if a<=b.
// Except... a special case.  If a is an object with a value attribute,
// return true only if a.value is equal to b.value.
// We use this so that aggregate can get recomputed for any change in aggregate,
// instead of requiring an increasing sequence of aggregate values.
function leq(a: any, b: any): boolean {
  // typeof null is "object", so null has to be excluded before reading a.value.
  if (a !== null && typeof a === "object" && a.value != null) {
    // b may be null or a primitive here; reading .value must not throw.
    return a.value === (b == null ? undefined : b.value);
  }
  return a <= b;
}

/**
 * Wrap the callback-style function `f` so that concurrent or recent calls
 * with the same inputs share a single evaluation of `f`.
 *
 * Can be called as `aggregate(f)` or `aggregate(options, f)`.
 *
 * @param options - either the function to wrap (one-argument form), or an
 *   object whose optional `omit` array lists extra field names of opts that
 *   are ignored when deciding whether two calls have "the same inputs"
 *   (`cb` and `aggregate` are always ignored).
 * @param f - the function to wrap; it receives a single opts object and
 *   reports its result by calling `opts.cb(...)`.
 * @returns a function taking the same opts object plus an optional
 *   `aggregate` field.  When `aggregate` is null/undefined, `f` is called
 *   directly; otherwise calls are aggregated as described at the top of
 *   this file.
 * @throws Error if `f` is not a function.
 */
export function aggregate(options, f?: any) {
  if (f == null) {
    f = options;
    options = undefined;
  }
  if (typeof f !== "function") {
    throw Error("f must be a function");
  }

  // Both maps live in the closure, so their scope is that of the function
  // we are making below.  Keys are the stable JSON of the relevant inputs.
  const state: { [key: string]: CallState } = {}; // evaluations in progress
  const done: { [key: string]: CallState } = {}; // recently finished evaluations
  const omitted_fields: string[] = ["cb", "aggregate"];
  if (options != null && options.omit) {
    for (const field of options.omit) {
      omitted_fields.push(field);
    }
  }

  function just_call_f(opts): void {
    // Fallback behavior **without aggregate**.  Used below when aggregate not set.
    // This just deletes aggregate from opts and calls f.
    delete opts.aggregate;
    f(opts);
  }

  function aggregate_call_f(opts): void {
    // Key is a string that determines the inputs to f **that matter**.
    const key: string = json_stable(copy_without(opts, omitted_fields));
    // Check state
    const current = state[key];
    const recent = done[key];
    if (recent != null && leq(opts.aggregate, recent.aggregate)) {
      // Result is known from a previous call.  The callback is optional,
      // exactly as it is for callers that are served by a fresh evaluation.
      if (typeof opts.cb === "function") {
        opts.cb(...recent.args);
      }
      return;
    }
    if (current != null) {
      // Call already in progress with given exactly the same inputs.
      if (leq(opts.aggregate, current.aggregate)) {
        // already running with old enough aggregate value -- just wait and return as part of that
        current.callbacks.push(opts.cb);
      } else {
        // already running, but newer aggregate value.  We will run this one once the current one is done.
        current.next.push(opts);
      }
      return;
    }

    // Setup state, do the call, and call callbacks when done, then possibly
    // call f again in case new requests came in during the call.

    // Nothing is going on right now with the given key.  Evaluate f.
    const entry: CallState = {
      aggregate: opts.aggregate, // aggregate value for current run
      next: [], // things requested to be run in the future -- these are opts with same key
      callbacks: [opts.cb], // callbacks to call when this evaluation completes
      time: Date.now(),
      args: [],
    };
    state[key] = entry;

    // This gets called when f completes.
    opts.cb = function (...args) {
      if (state[key] !== entry) {
        // This evaluation was already completed (f invoked its callback more
        // than once) or abandoned.  Ignore the call rather than touching the
        // bookkeeping of a newer evaluation that may be running for this key.
        return;
      }
      const { callbacks, next } = entry;
      entry.args = args;
      done[key] = entry;
      clear_old(done);
      delete state[key];
      // Call all the callbacks for which the result of running f is sufficient.
      for (const cb of callbacks) {
        if (typeof cb === "function") {
          cb(...args);
        }
      }
      if (next.length > 0) {
        // Setup new call, since new requests came in during this call to f, which couldn't
        // be handled via this call.
        // Sort by aggregate from bigger to small, so the first request starts
        // the new evaluation and all the others can piggyback on it.
        next.sort(field_cmp("aggregate"));
        next.reverse();
        // And just do the calls, which will add these to the new state[key].callbacks
        for (const queued of next) {
          aggregate_call_f(queued);
        }
      }
    };

    // Finally actually run f, which eventually calls opts.cb, which is defined directly above.
    try {
      f(opts);
    } catch (err) {
      // f failed synchronously and so will (presumably) never call back.
      // Drop the in-progress entry, since otherwise every later call with
      // these inputs would wait forever on this evaluation.
      if (state[key] === entry) {
        delete state[key];
      }
      throw err;
    }
  }

  // Construct and return new function: if called with aggregate not
  // set (null or undefined), just calls f.  Otherwise, aggregates calls.
  // Note that falsy values such as 0 are valid aggregate values.
  return function (opts) {
    if (opts.aggregate == null) {
      just_call_f(opts);
    } else {
      aggregate_call_f(opts);
    }
  };
}