CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/util/aggregate.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
/*
7
* decaffeinate suggestions:
8
* DS102: Remove unnecessary code created because of implicit returns
9
* DS205: Consider reworking code to avoid use of IIFEs
10
* DS207: Consider shorter variations of null checks
11
* Full docs: https://github.com/decaffeinate/decaffeinate/blob/master/docs/suggestions.md
12
*/
13
/*
Async aggregate.

Use like this:

    g = aggregate (opts) ->
        [bunch of code here...]

Given a function f that takes an object of the form

    {foo:?, bar:?, other:?,..., cb:...}

as input -- where everything except cb is JSON-able -- make a new function g that
takes as input

    {foo:?, bar:?, other:?,..., cb:..., aggregate:?}

If you call g without setting aggregate, then f is just called as usual.

If you call g with aggregate set to a nondecreasing input (e.g., sequence
numbers or timestamps), then f only gets evaluated *once*.  E.g., if you do

    g(x:0, aggregate:0, cb:a)
    ...
    g(x:0, aggregate:0, cb:b)
    g(x:1, aggregate:0, cb:c)
    ...
    g(x:0, aggregate:1, cb:d)
    g(x:0, aggregate:0, cb:e)   # no reason to do this; it's best if aggregate is a nondecreasing sequence (NOT required though).

Then:

   - f(x:0,cb:?) gets called once and both a and b are called with that one result.
   - f(x:1,cb:?) gets called once and c is called with the result.  This happens in
     parallel with the above call to f(x:0,cb:?).
   - f(x:0,cb:?) gets called once MORE and d is called with the result.
     This final call only happens once the first call to f(x:0,cb:?) finishes.
   - g(x:0, aggregate:0, cb:e) results in e just getting added to the cb's for f(x:0,cb:?),
     if that call is still running; if not, f may or may not get called again, depending
     on how much later the call is made (recent results are cached).

OPTIONS:

You can also do

     aggregate(options, (opts) -> ...)

where options is an object:

    options = {omit: ['keys', 'of', 'opts', 'to', 'omit', 'in', 'comparing', 'inputs']}

*/
65
66
const json_stable = require("json-stable-stringify");
67
68
import { copy_without, field_cmp } from "./misc";
69
70
// Upper bound (in milliseconds) on how long a finished result is kept in the
// cache, so that memory use stays bounded.  Expired entries are only purged
// when the aggregated function is called again; if it is never called again
// the cache cannot grow, so there is nothing worth purging.
const DONE_CACHE_TIMEOUT_MS = 60000;

// Purge, in place, every entry of `done` whose `time` lies at least
// DONE_CACHE_TIMEOUT_MS milliseconds before the current time.
function clear_old(done) {
  const now = Date.now();
  for (const key in done) {
    // `time` may be a Date or a number; subtraction coerces either to ms.
    const age = now - done[key].time;
    if (age >= DONE_CACHE_TIMEOUT_MS) {
      delete done[key];
    }
  }
}
84
85
// Return true if a <= b.
//
// Special case: if a is an object with a non-null `value` attribute, return
// true only if a.value is strictly equal to b.value.  This lets aggregate be
// recomputed on *any* change of the aggregate value, instead of requiring an
// increasing sequence of aggregate values.
//
// null/undefined are tolerated on both sides: a null `a` falls through to the
// plain `<=` comparison (typeof null is "object", so it must be excluded
// explicitly), and a null/undefined `b` never matches an object-valued `a`.
function leq(a, b) {
  if (a != null && typeof a === "object" && a.value != null) {
    return b != null && a.value === b.value;
  }
  return a <= b;
}
96
97
/**
 * Wrap the callback-style function `f` so that calls with the same inputs and
 * a compatible `aggregate` value share a single evaluation of `f`.
 *
 * Call as `aggregate(f)` or `aggregate(options, f)`.
 *
 * @param options - optional object; `options.omit` lists additional keys of
 *   `opts` to ignore (besides `cb` and `aggregate`) when comparing inputs.
 *   When `f` is omitted, this argument is taken to be `f` itself.
 * @param f - function taking a single `opts` object and reporting its result
 *   by calling `opts.cb(...args)`.
 * @returns a function `g(opts)`.  If `opts.aggregate` is null/undefined, `g`
 *   removes that key and calls `f(opts)` directly; otherwise calls are
 *   aggregated per input key.
 * @throws Error("f must be a function") if no function was supplied.
 */
export function aggregate(options, f?: any) {
  if (f == null) {
    // Called as aggregate(f): the only argument is the function.
    f = options;
    options = undefined;
  }
  if (typeof f !== "function") {
    throw Error("f must be a function");
  }

  // Both maps live in this closure, so they are private to the function
  // returned below.
  //   state[key] -- the evaluation of f currently in progress for that key
  //   done[key]  -- the most recently completed evaluation, with its result
  //                 stored in .args
  const state: { [key: string]: any } = {};
  const done: { [key: string]: any } = {};

  // Keys of opts that do not take part in deciding whether two calls have
  // "the same inputs".
  const omitted_fields: string[] = ["cb", "aggregate"];
  if (options != null && options.omit) {
    for (const field of options.omit) {
      omitted_fields.push(field);
    }
  }

  function just_call_f(opts) {
    // Fallback behavior **without aggregate**.  Used below when aggregate is
    // not set.  This just deletes aggregate from opts and calls f.
    delete opts.aggregate;
    f(opts);
  }

  function aggregate_call_f(opts) {
    // Key is a string that determines the inputs to f **that matter**.
    const key: string = json_stable(copy_without(opts, omitted_fields));
    const current = state[key];
    const recent = done[key];

    if (recent != null && leq(opts.aggregate, recent.aggregate)) {
      // Result is known from a previous call.  cb is optional here, exactly
      // as it is when a run completes (see the typeof check further down).
      if (typeof opts.cb === "function") {
        opts.cb(...recent.args);
      }
      return;
    }

    if (current != null) {
      // A call with exactly the same inputs is already in progress.
      if (leq(opts.aggregate, current.aggregate)) {
        // The running call is recent enough -- just share its result.
        current.callbacks.push(opts.cb);
      } else {
        // Newer aggregate value: run again once the current call is done.
        current.next.push(opts);
      }
      return;
    }

    // Nothing is going on right now for this key: set up state, evaluate f,
    // call the callbacks when done, and then possibly evaluate f again if
    // newer requests came in during the call.
    const entry: {
      aggregate: any;
      next: any[];
      callbacks: any[];
      time: Date;
      args: any;
    } = {
      aggregate: opts.aggregate, // aggregate value for this run
      next: [], // opts (same key) that must be run after this run finishes
      callbacks: [opts.cb], // callbacks to call when this run completes
      time: new Date(),
      args: undefined,
    };
    state[key] = entry;

    // This gets called when f completes.
    opts.cb = function (...args: any[]) {
      if (state[key] !== entry) {
        // This run was already completed (f invoked its callback more than
        // once).  Ignore the duplicate so it can neither throw nor complete
        // a newer run for the same key with stale arguments.
        return;
      }
      done[key] = entry;
      entry.args = args;
      clear_old(done);
      delete state[key];
      // Call all the callbacks for which the result of running f is sufficient.
      for (const cb of entry.callbacks) {
        if (typeof cb === "function") {
          cb(...args);
        }
      }
      if (entry.next.length > 0) {
        // New requests came in during this call to f which could not be
        // served by it.  Re-dispatch them ordered by aggregate from biggest
        // to smallest (presumably field_cmp sorts ascending by that field --
        // confirm in ./misc), so that the first one starts the new run and
        // the remaining ones can attach to it.
        entry.next.sort(field_cmp("aggregate"));
        entry.next.reverse();
        for (const opts0 of entry.next) {
          aggregate_call_f(opts0);
        }
      }
    };

    // Finally actually run f, which eventually calls opts.cb as defined above.
    try {
      f(opts);
    } catch (err) {
      // f threw synchronously and will never report completion for this run.
      // Drop the in-progress marker so later calls with this key are not
      // queued forever behind a run that no longer exists.
      if (state[key] === entry) {
        delete state[key];
      }
      throw err;
    }
  }

  // Construct and return the new function: if called with aggregate not set
  // (null or undefined), it just calls f.  Otherwise, it aggregates calls.
  // Note that falsy values such as 0 are valid aggregate values.
  return function (opts) {
    if (opts.aggregate == null) {
      just_call_f(opts);
    } else {
      aggregate_call_f(opts);
    }
  };
}
195
196