Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/core-stream.ts
5837 views
1
/*
2
core-stream.ts = the Core Stream data structure for conat.
3
4
This is the core data structure that easy-to-use ephemeral and persistent
5
streams and kv stores are built on. It is NOT meant to be super easy and
6
simple to use, with saving in the background. Instead, operations
7
are async, and the API is complicated. We build dkv, dstream, akv, etc. on
8
top of this with a much friendlier API.
9
10
NOTE: unlike in conat, in kv mode, the keys can be any utf-8 string.
11
We use the subject to track communication involving this stream, but
12
otherwise it has no relevance to the keys. Conat's core pub/sub/request/
13
reply model is very similar to NATS, but the analogue of Jetstream is
14
different because I don't find Jetstream useful at all, and find this
15
much more useful.
16
17
DEVELOPMENT:
18
19
~/cocalc/src/packages/backend$ node
20
21
require('@cocalc/backend/conat'); a = require('@cocalc/conat/sync/core-stream'); s = await a.cstream({name:'test'})
22
23
*/
24
25
import { EventEmitter } from "events";
26
import {
27
Message,
28
type Headers,
29
messageData,
30
decode,
31
} from "@cocalc/conat/core/client";
32
import { isNumericString } from "@cocalc/util/misc";
33
import refCache from "@cocalc/util/refcache";
34
import { conat } from "@cocalc/conat/client";
35
import type { Client } from "@cocalc/conat/core/client";
36
import jsonStableStringify from "json-stable-stringify";
37
import type {
38
SetOperation,
39
DeleteOperation,
40
StoredMessage,
41
Configuration,
42
} from "@cocalc/conat/persist/storage";
43
import type { Changefeed } from "@cocalc/conat/persist/client";
44
export type { Configuration };
45
import { join } from "path";
46
import {
47
type StorageOptions,
48
type PersistStreamClient,
49
stream as persist,
50
type SetOptions,
51
} from "@cocalc/conat/persist/client";
52
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
53
import { until } from "@cocalc/util/async-utils";
54
import { type PartialInventory } from "@cocalc/conat/persist/storage";
55
import { getLogger } from "@cocalc/conat/client";
56
57
// Logger scoped to this module.
const logger = getLogger("sync:core-stream");

// Maximum number of messages sent to the persist server in a single
// setMany call by publishMany (larger publishes are split into batches).
const PUBLISH_MANY_BATCH_SIZE = 500;

// Timeout (ms) for a single getAll request to the persist server.
const DEFAULT_GET_ALL_TIMEOUT = 15000;

// Lightweight debug logging -- a no-op by default; switch to the
// console.log line below during development.
const log = (..._args) => {};
//const log = console.log;

// when this many bytes of key:value have been changed (so need to be freed),
// we do a garbage collection pass.
export const KEY_GC_THRESH = 10 * 1e6;

// NOTE: when you do delete this.deleteKv(key), we ensure the previous
// messages with the given key is completely deleted from sqlite, and
// also create a *new* lightweight tombstone. That tombstone has this
// ttl, which defaults to DEFAULT_TOMBSTONE_TTL (one day), so the tombstone
// itself will be removed after 1 day. The tombstone is only needed for
// clients that go offline during the delete, then come back, and replay the
// partial log of what was missed. Such clients should reset if the
// offline time is longer than DEFAULT_TOMBSTONE_TTL.
// This only happens if allow_msg_ttl is configured to true, which is
// done with dkv, but not on by default otherwise.
export const DEFAULT_TOMBSTONE_TTL = 24 * 60 * 60 * 1000; // 1 day
81
82
// A raw message as stored in the stream: the conat Message plus
// stream metadata -- the server-assigned timestamp and sequence
// number, and (in kv mode) the key.
export interface RawMsg extends Message {
  timestamp: number;
  seq: number;
  key?: string;
}

// Payload of the "change" events emitted by CoreStream.
export interface ChangeEvent<T> {
  // decoded message; undefined for a delete
  mesg?: T;
  // raw message data; for a delete only the seq field is set
  raw?: Partial<RawMsg>;
  // the key, in kv mode
  key?: string;
  // previous decoded value for this key (kv mode), if any
  prev?: T;
  msgID?: string;
}
95
96
// Prefix on every CoCalc-specific header name, so they are easy to
// recognize and cannot collide with user-provided headers.
const HEADER_PREFIX = "CN-";

// Header marking a message as a tombstone, i.e., a deleted kv entry.
export const COCALC_TOMBSTONE_HEADER = HEADER_PREFIX + "Tombstone";
// Standard CoCalc header names.
export const COCALC_STREAM_HEADER = HEADER_PREFIX + "Stream";
export const COCALC_OPTIONS_HEADER = HEADER_PREFIX + "Options";
101
102
// Options for creating/opening a CoreStream (see cstream below).
export interface CoreStreamOptions {
  // what it's called
  name: string;
  // where it is located -- this is who **owns the resource**, which
  // may or may not be who is accessing it.
  account_id?: string;
  project_id?: string;
  config?: Partial<Configuration>;
  // only load historic messages starting at the given seq number.
  start_seq?: number;

  ephemeral?: boolean;
  sync?: boolean;

  client?: Client;

  noCache?: boolean;

  // the name of the cluster of persistence servers to use -- this is
  // by default SERVICE from conat/persist/util.ts. Set it to something
  // else to use special different servers, e.g., we use a different service
  // for sharing cluster state data, where the servers are ephemeral and
  // there is one for each node.
  service?: string;
}

// The owner of a stream: an account, a project, or (when neither is
// given) the hub.  See storagePath below.
export interface User {
  account_id?: string;
  project_id?: string;
}
132
133
export function storagePath({
134
account_id,
135
project_id,
136
name,
137
}: User & { name: string }) {
138
let userPath;
139
if (account_id) {
140
userPath = `accounts/${account_id}`;
141
} else if (project_id) {
142
userPath = `projects/${project_id}`;
143
} else {
144
userPath = "hub";
145
}
146
return join(userPath, name);
147
}
148
149
// Core stream data structure: an ordered list of messages
// (this.raw/this.messages) mirrored from a persist server, plus a
// key:value view (this.kv) for messages published with a key.
// Emits "change" events (see ChangeEvent above).
export class CoreStream<T = any> extends EventEmitter {
  public readonly name: string;

  private configOptions?: Partial<Configuration>;
  private _start_seq?: number;

  // don't do "this.raw=" or "this.messages=" anywhere in this class
  // because dstream directly references the public raw/messages.
  public readonly raw: RawMsg[] = [];
  public readonly messages: T[] = [];
  public readonly kv: { [key: string]: { mesg: T; raw: RawMsg } } = {};
  // bytes of overwritten kv data since the last gcKv pass
  private kvChangeBytes = 0;

  // this msgID's is ONLY used in ephemeral mode by the leader.
  private readonly msgIDs = new Set<any>();
  // lastSeq used by clients to keep track of what they have received; if one
  // is skipped they reconnect starting with the last one they didn't miss.
  private lastSeq: number = 0;
  // last known value per key, used to report prev in change events
  private lastValueByKey = new Map<string, T>();
  // IMPORTANT: user here means the *owner* of the resource, **NOT** the
  // client who is accessing it! For example, a stream of edits of a file
  // in a project has user {project_id} even if it is being accessed by
  // an account.
  private user: User;
  private storage: StorageOptions;
  // undefined once the stream is closed (see isClosed/close)
  private client?: Client;
  private persistClient: PersistStreamClient;
  private changefeed?: Changefeed;
  private service?: string;
179
  // NOTE: the constructor returns a Proxy wrapping the instance, so
  // that numeric indexing works: stream[5] === stream.get(5).
  constructor({
    name,
    project_id,
    account_id,
    config,
    start_seq,
    ephemeral = false,
    sync,
    client,
    service,
  }: CoreStreamOptions) {
    super();
    logger.debug("constructor", name);
    if (client == null) {
      throw Error("client must be specified");
    }
    this.client = client;
    this.service = service;
    // the *owner* of the resource, not necessarily who is accessing it
    this.user = { account_id, project_id };
    this.name = name;
    this.storage = {
      path: storagePath({ account_id, project_id, name }),
      ephemeral,
      sync,
    };
    this._start_seq = start_seq;
    this.configOptions = config;
    return new Proxy(this, {
      get(target, prop) {
        // delegate numeric property access to target.get(n)
        return typeof prop == "string" && isNumericString(prop)
          ? target.get(parseInt(prop))
          : target[String(prop)];
      },
    });
  }
214
215
  // set to true once init() has been called; init may run only once
  private initialized = false;

  // Finish initialization: connect to the persist service, load all
  // existing messages (from this._start_seq on), then sync the
  // configuration, retrying until it works or the stream is closed.
  // Called exactly once, right after the constructor (see cache below).
  init = async () => {
    if (this.initialized) {
      throw Error("init can only be called once");
    }
    this.initialized = true;
    if (this.client == null) {
      this.client = await conat();
    }
    this.persistClient = persist({
      client: this.client,
      user: this.user,
      storage: this.storage,
      service: this.service,
    });
    this.persistClient.on("error", (err) => {
      if (!process.env.COCALC_TEST_MODE) {
        console.log(`WARNING: persistent stream issue -- ${err}`);
      }
    });
    // initial load -- do not emit change events for the backlog
    await this.getAllFromPersist({
      start_seq: this._start_seq,
      noEmit: true,
    });

    // keep retrying config until success, closed, or a fatal 403
    await until(
      async () => {
        if (this.client == null) {
          return true;
        }
        try {
          this.configOptions = await this.config(this.configOptions);
          return true;
        } catch (err) {
          if (err.code == 403) {
            // fatal permission error
            throw err;
          }
        }
        return false;
      },
      { start: 750 },
    );
  };
259
260
debugStats = () => {
261
return {
262
rawLength: this.raw.length,
263
messagesLength: this.messages.length,
264
kvLength: this.lengthKv,
265
lastValueByKeySize: this.lastValueByKey.size,
266
};
267
};
268
269
config = async (
270
config: Partial<Configuration> = {},
271
): Promise<Configuration> => {
272
if (this.storage == null) {
273
throw Error("bug -- storage must be set");
274
}
275
return await this.persistClient.config({ config });
276
};
277
278
private isClosed = () => {
279
return this.client === undefined;
280
};
281
282
  // Close this stream: release the persist client, drop all in-memory
  // state and remove all listeners.  After close, most methods fail.
  close = () => {
    logger.debug("close", this.name);
    // removing this.client is what marks the stream closed (see isClosed)
    delete this.client;
    this.removeAllListeners();
    this.persistClient?.close();
    // The deletes below free memory and make accidental use after close
    // fail fast; the readonly field types disallow them, hence @ts-ignore.
    // @ts-ignore
    delete this.persistClient;
    // @ts-ignore
    delete this.kv;
    // @ts-ignore
    delete this.messages;
    // @ts-ignore
    delete this.raw;
    // @ts-ignore
    delete this.msgIDs;
    // @ts-ignore
    delete this.storage;
  };
300
301
  // Usage/inventory information about this stream from the persist server.
  inventory = async (): Promise<PartialInventory> => {
    return await this.persistClient.inventory();
  };

  // NOTE: It's assumed elsewhere that getAllFromPersist will not throw,
  // and will keep retrying until (1) it works, or (2) self is closed,
  // or (3) there is a fatal failure, e.g., lack of permissions.
  private getAllFromPersist = async ({
    start_seq = 0,
    noEmit,
  }: { start_seq?: number; noEmit?: boolean } = {}) => {
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    await until(
      async () => {
        let messages: StoredMessage[] = [];
        // TODO: tracking changes during getAll and suppressing sending duplicates
        // via the changefeed is not implemented.
        // let changes: (SetOperation | DeleteOperation | StoredMessage)[] = [];
        try {
          if (this.isClosed()) {
            return true;
          }
          // create the changefeed (if not already open) before fetching
          // the backlog
          if (this.changefeed == null) {
            this.changefeed = await this.persistClient.changefeed();
          }
          // console.log("get persistent stream", { start_seq }, this.storage);
          messages = await this.persistClient.getAll({
            start_seq,
            timeout: DEFAULT_GET_ALL_TIMEOUT,
          });
        } catch (err) {
          if (this.isClosed()) {
            return true;
          }
          if (!process.env.COCALC_TEST_MODE) {
            console.log(
              `WARNING: getAllFromPersist - failed -- ${err}, code=${err.code}, service=${this.service}, storage=${JSON.stringify(this.storage)} -- will retry`,
            );
          }
          if (err.code == 503 || err.code == 408) {
            // 503: temporary error due to messages being dropped,
            // so return false to try again. This is expected to
            // sometimes happen under heavy load, automatic failover, etc.
            // 408: timeout waiting to be connected/ready
            return false;
          }
          if (err.code == 403) {
            // fatal permission error
            throw err;
          }
          if (err.code == 429) {
            // too many users
            throw err;
          }
          // any other error that we might not address above -- just try again in a while.
          return false;
        }
        this.processPersistentMessages(messages, {
          noEmit,
          noSeqCheck: true,
        });
        // if (changes.length > 0) {
        //   this.processPersistentMessages(changes, {
        //     noEmit,
        //     noSeqCheck: false,
        //   });
        // }
        // success!
        return true;
      },
      { start: 1000, max: 15000 },
    );

    // start processing changefeed updates (not awaited)
    this.listen();
  };
378
379
private processPersistentMessages = (
380
messages: (SetOperation | DeleteOperation | StoredMessage)[],
381
opts: { noEmit?: boolean; noSeqCheck?: boolean },
382
) => {
383
if (this.raw === undefined) {
384
// closed
385
return;
386
}
387
let seq = undefined;
388
for (const mesg of messages) {
389
try {
390
this.processPersistentMessage(mesg, opts);
391
if (mesg["seq"] != null) {
392
seq = mesg["seq"];
393
}
394
} catch (err) {
395
console.log("WARNING: issue processing message", mesg, err);
396
}
397
}
398
return seq;
399
};
400
401
private processPersistentMessage = (
402
mesg: SetOperation | DeleteOperation | StoredMessage,
403
opts: { noEmit?: boolean; noSeqCheck?: boolean },
404
) => {
405
if ((mesg as DeleteOperation).op == "delete") {
406
this.processPersistentDelete(mesg as DeleteOperation, opts);
407
} else {
408
// set is the default
409
this.processPersistentSet(mesg as SetOperation, opts);
410
}
411
};
412
413
private processPersistentDelete = (
414
{ seqs }: DeleteOperation,
415
{ noEmit }: { noEmit?: boolean },
416
) => {
417
if (this.raw == null) return;
418
const X = new Set<number>(seqs);
419
// seqs is a list of integers. We remove
420
// every entry from this.raw, this.messages, and this.kv
421
// where this.raw.seq is in X by mutating raw/messages/kv,
422
// not by making new objects (since external references).
423
// This is a rare operation so we're not worried too much
424
// about performance.
425
const keys: { [seq: number]: string } = {};
426
for (const key in this.kv) {
427
const seq = this.kv[key]?.raw?.seq;
428
if (X.has(seq)) {
429
delete this.kv[key];
430
this.lastValueByKey.delete(key);
431
keys[key] = seq;
432
}
433
}
434
const indexes: number[] = [];
435
for (let i = 0; i < this.raw.length; i++) {
436
const seq = this.raw[i].seq;
437
if (X.has(seq)) {
438
indexes.push(i);
439
if (!noEmit) {
440
this.emitChange({
441
mesg: undefined,
442
raw: { seq },
443
key: keys[seq],
444
prev: this.messages[i],
445
});
446
}
447
}
448
}
449
450
// remove this.raw[i] and this.messages[i] for all i in indexes,
451
// with special case to be fast in the very common case of contiguous indexes.
452
if (indexes.length > 1 && indexes.every((v, i) => v === indexes[0] + i)) {
453
// Contiguous: bulk remove for performance
454
const start = indexes[0];
455
const deleteCount = indexes.length;
456
this.raw.splice(start, deleteCount);
457
this.messages.splice(start, deleteCount);
458
} else {
459
// Non-contiguous: fallback to individual reverse splices
460
for (let i = indexes.length - 1; i >= 0; i--) {
461
const idx = indexes[i];
462
this.raw.splice(idx, 1);
463
this.messages.splice(idx, 1);
464
}
465
}
466
};
467
468
  // largest seq number processed so far; used to detect gaps
  // (skipped messages) in the changefeed.
  private processPersistentSetLargestSeq: number = 0;
  // seq numbers we noticed were skipped and still need to fetch
  private missingMessages = new Set<number>();

  // Apply one set operation to the in-memory state: insert the message
  // into this.raw/this.messages in seq order, and in kv mode update
  // this.kv.  Unless noSeqCheck is set, also detect skipped seq
  // numbers and schedule fetching them.
  private processPersistentSet = (
    { seq, time, key, encoding, raw: data, headers, msgID }: SetOperation,
    {
      noEmit,
      noSeqCheck,
    }: {
      noEmit?: boolean;
      noSeqCheck?: boolean;
    },
  ) => {
    if (this.raw == null) return;
    if (!noSeqCheck) {
      const expected = this.processPersistentSetLargestSeq + 1;
      if (seq > expected) {
        log(
          "processPersistentSet -- detected missed seq number",
          { seq, expected: this.processPersistentSetLargestSeq + 1 },
          this.storage,
        );
        // We record that some are missing.
        for (let s = expected; s <= seq - 1; s++) {
          this.missingMessages.add(s);
          // NOTE(review): this call is inside the loop; it appears it
          // could be hoisted after the loop since it just kicks off a
          // fetch of everything in missingMessages.  The returned
          // promise is intentionally not awaited.
          this.getAllMissingMessages();
        }
      }
    }

    if (seq > this.processPersistentSetLargestSeq) {
      this.processPersistentSetLargestSeq = seq;
    }

    const mesg = decode({ encoding, data });
    const raw = {
      timestamp: time,
      headers,
      seq,
      raw: data,
      key,
    } as RawMsg;
    if (seq > (this.raw.slice(-1)[0]?.seq ?? 0)) {
      // easy fast initial load to the end of the list (common special case)
      this.messages.push(mesg);
      this.raw.push(raw);
    } else {
      // Insert in the correct place. This should only
      // happen when calling load of old data, which happens, e.g., during
      // automatic failover. The algorithm below is
      // dumb and could be replaced by a binary search. However, we'll
      // change how we batch load so there's less point.
      let i = 0;
      while (i < this.raw.length && this.raw[i].seq < seq) {
        i += 1;
      }
      // after the above loop, either:
      // - this.raw[i] is undefined because i = this.raw.length and every known entry was less than seq,
      //   so we just append it, or
      // - this.raw[i] is defined and this.raw[i].seq >= seq. If they are equal, do nothing, since we already
      //   have it. If not equal, then splice it in.
      if (i >= this.raw.length) {
        this.raw.push(raw);
        this.messages.push(mesg);
      } else if (this.raw[i].seq > seq) {
        this.raw.splice(i, 0, raw);
        this.messages.splice(i, 0, mesg);
      } // other case -- we already have it.
    }
    let prev: T | undefined = undefined;
    // Issue #8702: Capture the previous raw message for this key BEFORE updating this.kv.
    // This is needed for the client-side cleanup below (see the processPersistentDelete call).
    // https://github.com/sagemathinc/cocalc/issues/8702
    const prevRaw = typeof key == "string" ? this.kv[key]?.raw : undefined;
    if (typeof key == "string") {
      prev = this.kv[key]?.mesg ?? this.lastValueByKey.get(key);
      if (raw.headers?.[COCALC_TOMBSTONE_HEADER]) {
        // a tombstone: the key is deleted
        delete this.kv[key];
        this.lastValueByKey.delete(key);
      } else {
        if (this.kv[key] !== undefined) {
          // overwriting an existing value -- account for the bytes
          // that can now be garbage collected
          const { raw } = this.kv[key];
          this.kvChangeBytes += raw.raw.length;
        }

        this.kv[key] = { raw, mesg };
        this.lastValueByKey.set(key, mesg);

        if (this.kvChangeBytes >= KEY_GC_THRESH) {
          this.gcKv();
        }
      }
    }
    // Issue #8702: Client-side cleanup for keyed updates.
    // When the server processes a keyed update, it deletes the old row for that key
    // and inserts a new one. However, the server does NOT emit delete events for
    // these overwrites (see storage.ts:450). Without this cleanup, clients would
    // never remove old entries from raw[] and messages[], causing unbounded memory
    // growth. This fix removes the old entry when a new keyed message arrives.
    // https://github.com/sagemathinc/cocalc/issues/8702
    if (typeof key == "string" && prevRaw?.seq != null && prevRaw.seq !== seq) {
      this.processPersistentDelete(
        { op: "delete", seqs: [prevRaw.seq] },
        { noEmit: true },
      );
    }
    this.lastSeq = Math.max(this.lastSeq, seq);
    if (!noEmit) {
      this.emitChange({ mesg, raw, key, prev, msgID });
    }
  };
578
579
  // Emit a "change" event, unless this stream is already closed.
  private emitChange = (e: ChangeEvent<T>) => {
    if (this.raw == null) return;
    this.emit("change", e);
  };

  // Process changefeed updates until the stream is closed.  Whenever
  // the changefeed ends or errors, discard it, catch up on anything
  // missed via getAllFromPersist, and loop around to make a new one.
  private listen = async () => {
    // log("core-stream: listen", this.storage);
    await until(
      async () => {
        if (this.isClosed()) {
          return true;
        }
        try {
          if (this.changefeed == null) {
            this.changefeed = await this.persistClient.changefeed();
            if (this.isClosed()) {
              return true;
            }
          }

          for await (const updates of this.changefeed) {
            this.processPersistentMessages(updates, {
              noEmit: false,
              noSeqCheck: false,
            });
            if (this.isClosed()) {
              return true;
            }
          }
        } catch (err) {
          // There should never be a case where the changefeed throws
          // an error or ends without this whole streaming being closed.
          // If that happens its an unexpected bug. Instead of failing,
          // we log this, loop around, and make a new changefeed.
          // This normally doesn't happen but could if a persist server is being restarted
          // frequently or things are seriously broken. We cause this in
          // backend/conat/test/core/core-stream-break.test.ts
          if (!process.env.COCALC_TEST_MODE) {
            log(
              `WARNING: core-stream changefeed error -- ${err}`,
              this.storage,
            );
          }
        }

        delete this.changefeed;

        // above loop exits when the persistent server
        // stops sending messages for some reason. In that
        // case we reconnect, picking up where we left off.

        if (this.client == null) {
          return true;
        }

        // catch up on messages published while the changefeed was down
        await this.getAllFromPersist({
          start_seq: this.lastSeq + 1,
          noEmit: false,
        });
        return false;
      },
      { start: 500, max: 7500, decay: 1.2 },
    );
  };
643
644
publish = async (
645
mesg: T,
646
options?: PublishOptions,
647
): Promise<{ seq: number; time: number } | undefined> => {
648
if (mesg === undefined) {
649
if (options?.key !== undefined) {
650
// undefined can't be JSON encoded, so we can't possibly represent it, and this
651
// *must* be treated as a delete.
652
this.deleteKv(options?.key, { previousSeq: options?.previousSeq });
653
return;
654
} else {
655
throw Error("stream non-kv publish - mesg must not be 'undefined'");
656
}
657
}
658
659
if (options?.msgID && this.msgIDs.has(options.msgID)) {
660
// it's a dup
661
return;
662
}
663
const md = messageData(mesg, { headers: options?.headers });
664
const x = await this.persistClient.set({
665
key: options?.key,
666
messageData: md,
667
previousSeq: options?.previousSeq,
668
msgID: options?.msgID,
669
ttl: options?.ttl,
670
timeout: options?.timeout,
671
});
672
if (options?.msgID) {
673
this.msgIDs?.add(options.msgID);
674
}
675
return x;
676
};
677
678
publishMany = async (
679
messages: { mesg: T; options?: PublishOptions }[],
680
): Promise<
681
({ seq: number; time: number } | { error: string; code?: any })[]
682
> => {
683
let result: (
684
| { seq: number; time: number }
685
| { error: string; code?: any }
686
)[] = [];
687
688
for (let i = 0; i < messages.length; i += PUBLISH_MANY_BATCH_SIZE) {
689
const batch = messages.slice(i, i + PUBLISH_MANY_BATCH_SIZE);
690
result = result.concat(await this.publishMany0(batch));
691
}
692
693
return result;
694
};
695
696
private publishMany0 = async (
697
messages: { mesg: T; options?: PublishOptions }[],
698
): Promise<
699
({ seq: number; time: number } | { error: string; code?: any })[]
700
> => {
701
const v: SetOptions[] = [];
702
let timeout: number | undefined = undefined;
703
for (const { mesg, options } of messages) {
704
if (options?.timeout) {
705
if (timeout === undefined) {
706
timeout = options.timeout;
707
} else {
708
timeout = Math.min(timeout, options.timeout);
709
}
710
}
711
const md = messageData(mesg, { headers: options?.headers });
712
v.push({
713
key: options?.key,
714
messageData: md,
715
previousSeq: options?.previousSeq,
716
msgID: options?.msgID,
717
ttl: options?.ttl,
718
});
719
}
720
return await this.persistClient.setMany(v, { timeout });
721
};
722
723
get = (n?): T | T[] => {
724
if (n == null) {
725
return this.getAll();
726
} else {
727
return this.messages[n];
728
}
729
};
730
731
seq = (n: number): number | undefined => {
732
return this.raw[n]?.seq;
733
};
734
735
getAll = (): T[] => {
736
return [...this.messages];
737
};
738
739
get length(): number {
740
return this.messages.length;
741
}
742
743
get start_seq(): number | undefined {
744
return this._start_seq;
745
}
746
747
headers = (n: number): { [key: string]: any } | undefined => {
748
return this.raw[n]?.headers;
749
};
750
751
// key:value interface for subset of messages pushed with key option set.
752
// NOTE: This does NOT throw an error if our local seq is out of date (leave that
753
// to dkv built on this).
754
setKv = async (
755
key: string,
756
mesg: T,
757
options?: {
758
headers?: Headers;
759
previousSeq?: number;
760
},
761
): Promise<{ seq: number; time: number } | undefined> => {
762
return await this.publish(mesg, { ...options, key });
763
};
764
765
setKvMany = async (
766
x: {
767
key: string;
768
mesg: T;
769
options?: {
770
headers?: Headers;
771
previousSeq?: number;
772
};
773
}[],
774
): Promise<
775
({ seq: number; time: number } | { error: string; code?: any })[]
776
> => {
777
const messages: { mesg: T; options?: PublishOptions }[] = [];
778
for (const { key, mesg, options } of x) {
779
messages.push({ mesg, options: { ...options, key } });
780
}
781
return await this.publishMany(messages);
782
};
783
784
deleteKv = async (
785
key: string,
786
options?: {
787
msgID?: string;
788
previousSeq?: number;
789
},
790
) => {
791
if (this.kv[key] === undefined) {
792
// nothing to do
793
return;
794
}
795
return await this.publish(null as any, {
796
...options,
797
headers: { [COCALC_TOMBSTONE_HEADER]: true },
798
key,
799
ttl: DEFAULT_TOMBSTONE_TTL,
800
});
801
};
802
803
getKv = (key: string): T | undefined => {
804
return this.kv[key]?.mesg;
805
};
806
807
hasKv = (key: string): boolean => {
808
return this.kv?.[key] !== undefined;
809
};
810
811
getAllKv = (): { [key: string]: T } => {
812
const all: { [key: string]: T } = {};
813
for (const key in this.kv) {
814
all[key] = this.kv[key].mesg;
815
}
816
return all;
817
};
818
819
// efficient way to get just the keys -- use this instead of
820
// getAllKv if you just need the keys.
821
keysKv = (): string[] => {
822
return Object.keys(this.kv);
823
};
824
825
seqKv = (key: string): number | undefined => {
826
return this.kv[key]?.raw.seq;
827
};
828
829
timeKv = (key?: string): Date | { [key: string]: Date } | undefined => {
830
if (key === undefined) {
831
const all: { [key: string]: Date } = {};
832
for (const key in this.kv) {
833
all[key] = new Date(this.kv[key].raw.timestamp);
834
}
835
return all;
836
}
837
const r = this.kv[key]?.raw;
838
if (r == null) {
839
return;
840
}
841
return new Date(r.timestamp);
842
};
843
844
headersKv = (key: string): { [key: string]: any } | undefined => {
845
return this.kv[key]?.raw?.headers;
846
};
847
848
get lengthKv(): number {
849
return Object.keys(this.kv).length;
850
}
851
852
  // load older messages starting at start_seq up to the oldest message
  // we currently have. This can throw an exception in case of heavy
  // load or network issues, but should completely succeed or make
  // no change.
  load = async ({
    start_seq,
    noEmit,
  }: {
    start_seq: number;
    noEmit?: boolean;
  }) => {
    // This is used for loading more TimeTravel history
    if (this.storage == null) {
      throw Error("bug");
    }
    // this is one before the oldest we have
    const end_seq = (this.raw[0]?.seq ?? this._start_seq ?? 1) - 1;
    if (start_seq > end_seq) {
      // nothing to load
      return;
    }
    // we're moving start_seq back to this point
    this._start_seq = start_seq;
    const messages = await this.persistClient.getAll({
      start_seq,
      end_seq,
    });
    this.processPersistentMessages(messages, { noEmit, noSeqCheck: true });
  };
881
882
private getAllMissingMessages = reuseInFlight(async () => {
883
await until(
884
async () => {
885
if (this.client == null || this.missingMessages.size == 0) {
886
return true;
887
}
888
try {
889
const missing = Array.from(this.missingMessages);
890
missing.sort();
891
log("core-stream: getMissingSeq", missing, this.storage);
892
const messages = await this.persistClient.getAll({
893
start_seq: missing[0],
894
end_seq: missing[missing.length - 1],
895
});
896
this.processPersistentMessages(messages, {
897
noEmit: false,
898
noSeqCheck: true,
899
});
900
for (const seq of missing) {
901
this.missingMessages.delete(seq);
902
}
903
} catch (err) {
904
log(
905
"core-stream: WARNING -- issue getting missing updates",
906
err,
907
this.storage,
908
);
909
}
910
return false;
911
},
912
{ start: 1000, max: 15000, decay: 1.3 },
913
);
914
});
915
916
// get server assigned time of n-th message in stream
917
time = (n: number): Date | undefined => {
918
const r = this.raw[n];
919
if (r == null) {
920
return;
921
}
922
return new Date(r.timestamp);
923
};
924
925
times = () => {
926
const v: (Date | undefined)[] = [];
927
for (let i = 0; i < this.length; i++) {
928
v.push(this.time(i));
929
}
930
return v;
931
};
932
933
stats = ({
934
start_seq = 1,
935
}: {
936
start_seq?: number;
937
} = {}): { count: number; bytes: number } | undefined => {
938
if (this.raw == null) {
939
return;
940
}
941
let count = 0;
942
let bytes = 0;
943
for (const { raw, seq } of this.raw) {
944
if (seq == null) {
945
continue;
946
}
947
if (seq < start_seq) {
948
continue;
949
}
950
count += 1;
951
bytes += raw.length;
952
}
953
return { count, bytes };
954
};
955
956
  // delete all messages up to and including the
  // one at position index, i.e., this.messages[index]
  // is deleted.
  // NOTE: For ephemeral streams, clients will NOT see the result of a delete,
  // except when they load the stream later. For persistent streams all
  // **connected** clients will see the delete. THAT said, this is not a "proper"
  // distributed computing primitive with tombstones, etc. This is primarily
  // meant for reducing space usage, and shouldn't be relied on for
  // any other purpose.
  delete = async ({
    all,
    last_index,
    seq,
    last_seq,
    key,
    seqs,
  }: {
    // give exactly ONE parameter -- by default nothing happens with no params
    // all: delete everything
    all?: boolean;
    // last_index: everything up to and including index'd message
    last_index?: number;
    // seq: delete message with this sequence number
    seq?: number;
    // seqs: delete the messages in this array of sequence numbers
    seqs?: number[];
    // last_seq: delete everything up to and including this sequence number
    last_seq?: number;
    // key: delete the message with this key
    key?: string;
  } = {}): Promise<{ seqs: number[] }> => {
    // translate the high-level parameter into the options the persist
    // client understands; note opts stays undefined if no param given
    let opts;
    if (all) {
      opts = { all: true };
    } else if (last_index != null) {
      if (last_index >= this.raw.length) {
        opts = { all: true };
      } else if (last_index < 0) {
        // negative index: nothing to delete
        return { seqs: [] };
      } else {
        const last_seq = this.raw[last_index].seq;
        if (last_seq === undefined) {
          throw Error(`BUG: invalid index ${last_index}`);
        }
        opts = { last_seq };
      }
    } else if (seq != null) {
      opts = { seq };
    } else if (seqs != null) {
      opts = { seqs };
    } else if (last_seq != null) {
      opts = { last_seq };
    } else if (key != null) {
      const seq = this.kv[key]?.raw?.seq;
      if (seq === undefined) {
        // key not present locally: nothing to delete
        return { seqs: [] };
      }
      opts = { seq };
    }
    return await this.persistClient.delete(opts);
  };
1017
1018
// delete messages that are no longer needed since newer values have been written
1019
gcKv = () => {
1020
this.kvChangeBytes = 0;
1021
for (let i = 0; i < this.raw.length; i++) {
1022
const key = this.raw[i].key;
1023
if (key !== undefined) {
1024
if (this.raw[i].raw.length > 0 && this.raw[i] !== this.kv[key].raw) {
1025
this.raw[i] = {
1026
...this.raw[i],
1027
headers: undefined,
1028
raw: Buffer.from(""),
1029
} as RawMsg;
1030
this.messages[i] = undefined as T;
1031
}
1032
}
1033
}
1034
};
1035
}
1036
1037
// Options for CoreStream.publish / setKv.
export interface PublishOptions {
  // headers for this message
  headers?: Headers;
  // unique id for this message to dedup so if you send the same
  // message more than once with the same id it doesn't get published
  // multiple times.
  msgID?: string;
  // key -- if specified a key field is also stored on the server,
  // and any previous messages with the same key are deleted. Also,
  // an entry is set in this.kv[key] so that this.getKv(key), etc. work.
  key?: string;
  // if key is specified and previousSeq is set, the server throws
  // an error if the sequence number of the current key is
  // not previousSeq. We use this with this.seqKv(key) to
  // provide read/change/write semantics and to know when we
  // should resolve a merge conflict. This is ignored if
  // key is not specified.
  previousSeq?: number;
  // if set to a number of ms AND the config option allow_msg_ttl
  // is set on this persistent stream, then
  // this message will be deleted after the given amount of time (in ms).
  ttl?: number;
  // timeout (ms) passed through to the underlying persist-client call
  timeout?: number;
}
1061
1062
// Reference-counted cache of CoreStream instances, so repeated calls
// to cstream with the same options share one underlying stream.
export const cache = refCache<CoreStreamOptions, CoreStream>({
  name: "core-stream",
  createObject: async (options: CoreStreamOptions) => {
    if (options.client == null) {
      // no client given -- connect using the default conat client
      options = { ...options, client: await conat() };
    }
    const cstream = new CoreStream(options);
    await cstream.init();
    return cstream;
  },
  createKey: ({ client, ...options }) => {
    // the client id is part of the cache key, so distinct clients get
    // distinct stream instances
    return jsonStableStringify({ id: client?.id, ...options })!;
  },
});

// Get a (possibly cached) CoreStream with the given options, fully
// initialized and ready to use.
export async function cstream<T>(
  options: CoreStreamOptions,
): Promise<CoreStream<T>> {
  return await cache(options);
}
1082
1083