Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/dstream.ts
1710 views
1
/*
2
Eventually Consistent Distributed Message Stream
3
4
DEVELOPMENT:
5
6
7
# in node -- note the package directory!!
8
~/cocalc/src/packages/backend node
9
10
> s = await require("@cocalc/backend/conat/sync").dstream({name:'test'});
11
> s = await require("@cocalc/backend/conat/sync").dstream({project_id:cc.current().project_id,name:'foo'});
12
13
See the guide for dkv, since it's very similar, especially for use in a browser.
14
*/
15
16
import { EventEmitter } from "events";
17
import {
18
CoreStream,
19
type RawMsg,
20
type ChangeEvent,
21
type PublishOptions,
22
} from "./core-stream";
23
import { randomId } from "@cocalc/conat/names";
24
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
25
import { isNumericString } from "@cocalc/util/misc";
26
import refCache from "@cocalc/util/refcache";
27
import {
28
type Client,
29
type Headers,
30
ConatError,
31
} from "@cocalc/conat/core/client";
32
import jsonStableStringify from "json-stable-stringify";
33
import type { JSONValue } from "@cocalc/util/types";
34
import { Configuration } from "./core-stream";
35
import { conat } from "@cocalc/conat/client";
36
import { delay, map as awaitMap } from "awaiting";
37
import { asyncThrottle, until } from "@cocalc/util/async-utils";
38
import {
39
inventory,
40
type Inventory,
41
INVENTORY_UPDATE_INTERVAL,
42
} from "./inventory";
43
import { getLogger } from "@cocalc/conat/client";
44
45
// Logger for this module, created via getLogger("sync:dstream"); used for the
// debug messages emitted by DStream's constructor and close().
const logger = getLogger("sync:dstream");
46
47
/**
 * Options accepted by the DStream constructor and by the `dstream()` factory.
 * The whole object is also handed to the underlying CoreStream.
 */
export interface DStreamOptions {
  // what it's called by us
  name: string;

  // --- scoping: used for the cache key and the inventory lookup ---
  account_id?: string;
  project_id?: string;
  // forwarded to the inventory lookup
  service?: string;
  // connection to use; the DStream constructor requires it, and the
  // `dstream()` factory fills in the default client when it is omitted
  client?: Client;

  // --- underlying stream ---
  config?: Partial<Configuration>;
  // only load historic messages starting at the given seq number.
  start_seq?: number;
  ephemeral?: boolean;
  sync?: boolean;

  // --- behavior toggles ---
  // when true, publish() does not call save() automatically
  noAutosave?: boolean;
  noCache?: boolean;
  // when true, the stream never updates the inventory
  noInventory?: boolean;
  // free-form description stored with the inventory entry
  desc?: JSONValue;
}
67
68
/**
 * Array-like, eventually consistent view of a CoreStream.
 *
 * Messages are exposed in three tiers, in this order:
 *  - `messages`: the same array object as `this.stream.messages`;
 *  - `saved`: messages whose publish was acknowledged (keyed by seq number)
 *    but which have not shown up in `this.raw` yet;
 *  - `local`: messages published locally that are not acknowledged yet.
 *
 * The constructor returns a Proxy, so numeric property access (`s[3]`) is
 * routed to `get(3)`.
 *
 * Events: "connected" (after init), "change" (mesg, seq), "stable" (no saved
 * or local messages pending), "reject" ({ err, mesg }), "closed".
 */
export class DStream<T = any> extends EventEmitter {
  public readonly name: string;
  private stream: CoreStream;
  private messages: T[];
  private raw: RawMsg[];
  private noAutosave: boolean;
  // TODO: using Map for these will be better because we use .length a bunch, which is O(n) instead of O(1).
  private local: { [id: string]: T } = {};
  // publish() options, keyed by the same random id as `local`
  private publishOptions: {
    [id: string]: { headers?: Headers; ttl?: number };
  } = {};
  private saved: { [seq: number]: T } = {};
  private opts: DStreamOptions;

  constructor(opts: DStreamOptions) {
    super();
    logger.debug("constructor", opts.name);
    if (opts.client == null) {
      throw Error("client must be specified");
    }
    this.opts = opts;
    this.noAutosave = !!opts.noAutosave;
    this.name = opts.name;
    this.stream = new CoreStream(opts);
    this.messages = this.stream.messages;
    this.raw = this.stream.raw;
    return new Proxy(this, {
      get(target, prop) {
        // Numeric keys index into the stream. Everything else -- including
        // symbol keys, which String() would turn into "Symbol(...)" and thus
        // look up the wrong property -- is resolved normally.
        return typeof prop == "string" && isNumericString(prop)
          ? target.get(parseInt(prop))
          : Reflect.get(target, prop);
      },
    });
  }

  private initialized = false;

  // Attach listeners to the underlying stream and initialize it.  May only be
  // called once; emits "connected" when done.
  init = async () => {
    if (this.initialized) {
      throw Error("init can only be called once");
    }
    this.initialized = true;
    if (this.isClosed()) {
      throw Error("closed");
    }
    this.stream.on("change", this.handleChange);
    this.stream.on("reset", () => {
      this.local = {};
      this.saved = {};
    });
    await this.stream.init();
    this.emit("connected");
  };

  // Listener for the underlying stream's "change" events.
  private handleChange = ({ mesg, raw, msgID }: ChangeEvent<T>) => {
    if (raw?.seq !== undefined) {
      // the message is now in this.raw, so it is no longer "saved-but-pending"
      delete this.saved[raw.seq];
    }
    if (mesg === undefined) {
      return;
    }
    if (msgID) {
      // this is critical with core-stream.ts, since otherwise there is a moment
      // when the same message is in both this.local *and* this.messages, and you'll
      // see it doubled in this.getAll().
      delete this.local[msgID];
    }
    this.emit("change", mesg, raw?.seq);
    if (this.isStable()) {
      this.emit("stable");
    }
  };

  // True when there are no saved-but-not-received and no unsaved local messages.
  isStable = () => {
    for (const _ in this.saved) {
      return false;
    }
    for (const _ in this.local) {
      return false;
    }
    return true;
  };

  isClosed = () => {
    return this.stream == null;
  };

  close = () => {
    if (this.isClosed()) {
      return;
    }
    logger.debug("close", this.name);
    const stream = this.stream;
    stream.removeListener("change", this.handleChange);
    // @ts-ignore
    delete this.stream;
    stream.close();
    this.emit("closed");
    this.removeAllListeners();
    // @ts-ignore
    delete this.local;
    // @ts-ignore
    delete this.messages;
    // @ts-ignore
    delete this.raw;
    // @ts-ignore
    delete this.opts;
  };

  // With no argument, return all messages (see getAll); otherwise return the
  // n-th message across messages, then saved, then local.
  get = (n?: number): T | T[] => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    if (n == null) {
      return this.getAll();
    }
    if (n < this.messages.length) {
      return this.messages[n];
    }
    // `saved` is keyed by sequence number, not by position, so index into its
    // values.  Integer keys enumerate in ascending order, which matches the
    // order used by getAll() and seq().
    const saved = Object.values(this.saved);
    const i = n - this.messages.length;
    if (i < saved.length) {
      return saved[i];
    }
    return Object.values(this.local)[i - saved.length];
  };

  getAll = (): T[] => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    return [
      ...this.messages,
      ...Object.values(this.saved),
      ...Object.values(this.local),
    ];
  };

  // sequence number of n-th message (undefined for unsaved local messages)
  seq = (n: number): number | undefined => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    if (n < this.raw.length) {
      return this.raw[n].seq;
    }
    const v = Object.keys(this.saved);
    if (n < v.length + this.raw.length) {
      return parseInt(v[n - this.raw.length]);
    }
    return undefined;
  };

  // all sequences numbers of messages
  seqs = (): number[] => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    const seqs = this.raw.map(({ seq }) => seq);
    for (const seq in this.saved) {
      seqs.push(parseInt(seq));
    }
    return seqs;
  };

  // server assigned time of the n-th message (delegates to the CoreStream)
  time = (n: number): Date | undefined => {
    if (this.isClosed()) {
      throw Error("not initialized");
    }
    return this.stream.time(n);
  };

  // all server assigned times of messages in the stream.
  times = (): (Date | undefined)[] => {
    if (this.isClosed()) {
      throw Error("not initialized");
    }
    return this.stream.times();
  };

  get length(): number {
    return (
      this.messages.length +
      Object.keys(this.saved).length +
      Object.keys(this.local).length
    );
  }

  // Append a message locally; it is saved to the server by save() (called
  // automatically unless noAutosave was set).
  publish = (
    mesg: T,
    // NOTE: if you call this.headers(n) it is NOT visible until
    // the publish is confirmed. This could be changed with more work if it matters.
    options?: { headers?: Headers; ttl?: number },
  ): void => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    const id = randomId();
    this.local[id] = mesg;
    if (options != null) {
      this.publishOptions[id] = options;
    }
    if (!this.noAutosave) {
      this.save();
    }
    this.updateInventory();
  };

  headers = (n) => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    return this.stream.headers(n);
  };

  push = (...args: T[]) => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    for (const mesg of args) {
      this.publish(mesg);
    }
  };

  hasUnsavedChanges = (): boolean => {
    if (this.isClosed()) {
      return false;
    }
    return Object.keys(this.local).length > 0;
  };

  // Messages published locally but not yet acknowledged ([] once closed,
  // consistent with hasUnsavedChanges()).
  unsavedChanges = (): T[] => {
    if (this.isClosed()) {
      return [];
    }
    return Object.values(this.local);
  };

  // Keep trying to save until nothing is unsaved (or the stream is closed).
  save = reuseInFlight(async () => {
    await until(
      async () => {
        if (this.isClosed()) {
          return true;
        }
        try {
          await this.attemptToSave();
        } catch {
          // Intentionally ignored: `until` calls us again while there are
          // still unsaved changes, and per-message failures are already
          // warned about (or emitted as "reject") by attemptToSaveBatch.
        }
        return !this.hasUnsavedChanges();
      },
      { start: 150, decay: 1.3, max: 10000 },
    );
  });

  // The batched strategy is used; the parallel one is kept as an alternative.
  private attemptToSave = async () => {
    if (true) {
      await this.attemptToSaveBatch();
    } else {
      await this.attemptToSaveParallel();
    }
  };

  // Publish all local messages in a single publishMany call.
  private attemptToSaveBatch = reuseInFlight(async () => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    const ids = Object.keys(this.local);
    if (ids.length == 0) {
      // nothing to save -- skip the round trip
      return;
    }
    const v: { mesg: T; options: PublishOptions }[] = [];
    for (const id of ids) {
      const mesg = this.local[id];
      const options = {
        ...this.publishOptions[id],
        msgID: id,
      };
      v.push({ mesg, options });
    }
    const w: (
      | { seq: number; time: number; error?: undefined }
      | { error: string; code?: any }
    )[] = await this.stream.publishMany(v);

    if (this.isClosed()) {
      return;
    }

    // Only announce "stable" below if handling these results is what made us
    // stable (e.g. the last pending messages were rejected); otherwise
    // handleChange is responsible for it.
    const wasStable = this.isStable();
    let errors = false;
    for (let i = 0; i < w.length; i++) {
      const id = ids[i];
      const result = w[i];
      if (result.error) {
        const x = result as { error: string; code?: any };
        if (x.code == "reject") {
          // the server will not accept this message, so stop retrying it
          delete this.local[id];
          delete this.publishOptions[id];
          const err = new ConatError(x.error, { code: x.code });
          // err has mesg and subject set.
          this.emit("reject", { err, mesg: v[i].mesg });
        }
        if (!process.env.COCALC_TEST_MODE) {
          console.warn(
            `WARNING -- error saving dstream '${this.name}' -- ${result.error}`,
          );
        }
        errors = true;
        continue;
      }
      const { seq } = result as { seq: number };
      if ((this.raw[this.raw.length - 1]?.seq ?? -1) < seq) {
        // it still isn't in this.raw
        this.saved[seq] = v[i].mesg;
      }
      delete this.local[id];
      delete this.publishOptions[id];
    }
    if (!wasStable && this.isStable()) {
      this.emit("stable");
    }
    if (errors) {
      throw Error(`there were errors saving dstream '${this.name}'`);
    }
  });

  // non-batched version
  private attemptToSaveParallel = reuseInFlight(async () => {
    const f = async (id) => {
      if (this.isClosed()) {
        throw Error("closed");
      }
      const mesg = this.local[id];
      try {
        // @ts-ignore
        const { seq } = await this.stream.publish(mesg, {
          ...this.publishOptions[id],
          msgID: id,
        });
        if (this.isClosed()) {
          return;
        }
        if ((this.raw[this.raw.length - 1]?.seq ?? -1) < seq) {
          // it still isn't in this.raw
          this.saved[seq] = mesg;
        }
        delete this.local[id];
        delete this.publishOptions[id];
      } catch (err) {
        if (err.code == "reject") {
          delete this.local[id];
          delete this.publishOptions[id];
          // err has mesg and subject set.
          this.emit("reject", { err, mesg });
        } else {
          if (!process.env.COCALC_TEST_MODE) {
            console.warn(
              `WARNING: problem saving dstream ${this.name} -- ${err}`,
            );
          }
        }
      }
      if (this.isStable()) {
        this.emit("stable");
      }
    };
    // NOTE: ES6 spec guarantees "String keys are returned in the order
    // in which they were added to the object."
    const ids = Object.keys(this.local);
    const MAX_PARALLEL = 50;
    await awaitMap(ids, MAX_PARALLEL, f);
  });

  // load older messages starting at start_seq
  load = async (opts: { start_seq: number }) => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    await this.stream.load(opts);
  };

  // this is not synchronous -- it makes sure everything is saved out,
  // then delete the persistent stream
  // NOTE: for ephemeral streams, other clients will NOT see the result of a delete (unless they reconnect).
  delete = async (opts?) => {
    await this.save();
    if (this.isClosed()) {
      throw Error("closed");
    }
    return await this.stream.delete(opts);
  };

  get start_seq(): number | undefined {
    return this.stream?.start_seq;
  }

  // get or set config
  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    return await this.stream.config(config);
  };

  // Record this stream's status in the inventory (throttled; best effort).
  private updateInventory = asyncThrottle(
    async () => {
      if (this.isClosed() || this.opts == null || this.opts.noInventory) {
        return;
      }
      await delay(500);
      if (this.isClosed()) {
        return;
      }
      let inv: Inventory | undefined = undefined;
      try {
        const { account_id, project_id, desc, service } = this.opts;
        // Assign the outer `inv` (do not shadow it) so `finally` closes it.
        inv = await inventory({ account_id, project_id, service });
        if (this.isClosed()) {
          return;
        }
        const status = {
          type: "stream" as "stream",
          name: this.name,
          desc,
          ...(await this.stream.inventory()),
        };
        inv.set(status);
      } catch (err) {
        if (!process.env.COCALC_TEST_MODE) {
          // this.name (not this.opts.name): opts is deleted if we were
          // closed while awaiting above.
          console.log(
            `WARNING: unable to update inventory. name='${this.name}' -- ${err}`,
          );
        }
      } finally {
        inv?.close();
      }
    },
    INVENTORY_UPDATE_INTERVAL,
    { leading: true, trailing: true },
  );
}
499
500
/**
 * refCache-backed cache of DStream instances.
 *
 * Two requests share a stream when they agree on name, account_id,
 * project_id and the client's id.  On creation, the default conat client is
 * filled in when none was supplied, and init() is awaited before the stream
 * is returned.
 */
export const cache = refCache<DStreamOptions, DStream>({
  name: "dstream",
  createKey: ({ name, account_id, project_id, client }: DStreamOptions) => {
    if (!name) {
      throw Error("name must be specified");
    }
    // stable stringification so key order never affects cache hits
    return jsonStableStringify({
      name,
      account_id,
      project_id,
      id: client?.id,
    })!;
  },
  createObject: async (options: DStreamOptions) => {
    // reuse the caller's object as-is when it already carries a client
    const withClient =
      options.client != null
        ? options
        : { ...options, client: await conat() };
    const stream = new DStream(withClient);
    await stream.init();
    return stream;
  },
});
519
520
/**
 * Get the shared DStream for `options` -- a thin wrapper around `cache`.
 */
export async function dstream<T>(options: DStreamOptions): Promise<DStream<T>> {
  const stream = await cache(options);
  return stream;
}
523
524