Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/sync/core-stream.ts
1710 views
1
/*
2
core-stream.ts = the Core Stream data structure for conat.
3
4
This is the core data structure that easy-to-use ephemeral and persistent
5
streams and kv stores are built on. It is NOT meant to be super easy and
6
simple to use, with save in the background. Instead, operations
7
are async, and the API is complicated. We build dkv, dstream, akv, etc. on
8
top of this with a much friendlier API.
9
10
NOTE: unlike in conat, in kv mode, the keys can be any utf-8 string.
11
We use the subject to track communication involving this stream, but
12
otherwise it has no relevance to the keys. Conat's core pub/sub/request/
13
reply model is very similar to NATS, but the analogue of Jetstream is
14
different because I don't find Jetstream useful at all, and find this
15
much more useful.
16
17
DEVELOPMENT:
18
19
~/cocalc/src/packages/backend$ node
20
21
require('@cocalc/backend/conat'); a = require('@cocalc/conat/sync/core-stream'); s = await a.cstream({name:'test'})
22
23
*/
24
25
import { EventEmitter } from "events";
26
import {
27
Message,
28
type Headers,
29
messageData,
30
decode,
31
} from "@cocalc/conat/core/client";
32
import { isNumericString } from "@cocalc/util/misc";
33
import refCache from "@cocalc/util/refcache";
34
import { conat } from "@cocalc/conat/client";
35
import type { Client } from "@cocalc/conat/core/client";
36
import jsonStableStringify from "json-stable-stringify";
37
import type {
38
SetOperation,
39
DeleteOperation,
40
StoredMessage,
41
Configuration,
42
} from "@cocalc/conat/persist/storage";
43
import type { Changefeed } from "@cocalc/conat/persist/client";
44
export type { Configuration };
45
import { join } from "path";
46
import {
47
type StorageOptions,
48
type PersistStreamClient,
49
stream as persist,
50
type SetOptions,
51
} from "@cocalc/conat/persist/client";
52
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
53
import { until } from "@cocalc/util/async-utils";
54
import { type PartialInventory } from "@cocalc/conat/persist/storage";
55
import { getLogger } from "@cocalc/conat/client";
56
57
const logger = getLogger("sync:core-stream");
58
59
const PUBLISH_MANY_BATCH_SIZE = 500;
60
61
const DEFAULT_GET_ALL_TIMEOUT = 15000;
62
63
const log = (..._args) => {};
64
//const log = console.log;
65
66
// when this many bytes of key:value have been changed (so need to be freed),
67
// we do a garbage collection pass.
68
export const KEY_GC_THRESH = 10 * 1e6;
69
70
// NOTE: when you do delete this.deleteKv(key), we ensure the previous
71
// messages with the given key is completely deleted from sqlite, and
72
// also create a *new* lightweight tombstone. That tombstone has this
73
// ttl, which defaults to DEFAULT_TOMBSTONE_TTL (one week), so the tombstone
74
// itself will be removed after 1 week. The tombstone is only needed for
75
// clients that go offline during the delete, then come back, and reply the
76
// partial log of what was missed. Such clients should reset if the
77
// offline time is longer than DEFAULT_TOMBSTONE_TTL.
78
// This only happens if allow_msg_ttl is configured to true, which is
79
// done with dkv, but not on by default otherwise.
80
export const DEFAULT_TOMBSTONE_TTL = 7 * 24 * 60 * 60 * 1000; // 1 week
81
82
export interface RawMsg extends Message {
83
timestamp: number;
84
seq: number;
85
key?: string;
86
}
87
88
export interface ChangeEvent<T> {
89
mesg?: T;
90
raw?: Partial<RawMsg>;
91
key?: string;
92
prev?: T;
93
msgID?: string;
94
}
95
96
const HEADER_PREFIX = "CN-";
97
98
export const COCALC_TOMBSTONE_HEADER = `${HEADER_PREFIX}Tombstone`;
99
export const COCALC_STREAM_HEADER = `${HEADER_PREFIX}Stream`;
100
export const COCALC_OPTIONS_HEADER = `${HEADER_PREFIX}Options`;
101
102
export interface CoreStreamOptions {
103
// what it's called
104
name: string;
105
// where it is located -- this is who **owns the resource**, which
106
// may or may not being who is accessing it.
107
account_id?: string;
108
project_id?: string;
109
config?: Partial<Configuration>;
110
// only load historic messages starting at the given seq number.
111
start_seq?: number;
112
113
ephemeral?: boolean;
114
sync?: boolean;
115
116
client?: Client;
117
118
noCache?: boolean;
119
120
// the name of the cluster of persistence servers to use -- this is
121
// by default SERVICE from conat/persist/util.ts. Set it to something
122
// else to use special different servers, e.g., we use a different service
123
// for sharing cluster state date, where the servers are ephemeral and
124
// there is one for each node.
125
service?: string;
126
}
127
128
export interface User {
129
account_id?: string;
130
project_id?: string;
131
}
132
133
export function storagePath({
134
account_id,
135
project_id,
136
name,
137
}: User & { name: string }) {
138
let userPath;
139
if (account_id) {
140
userPath = `accounts/${account_id}`;
141
} else if (project_id) {
142
userPath = `projects/${project_id}`;
143
} else {
144
userPath = "hub";
145
}
146
return join(userPath, name);
147
}
148
149
export class CoreStream<T = any> extends EventEmitter {
150
public readonly name: string;
151
152
private configOptions?: Partial<Configuration>;
153
private _start_seq?: number;
154
155
// don't do "this.raw=" or "this.messages=" anywhere in this class
156
// because dstream directly references the public raw/messages.
157
public readonly raw: RawMsg[] = [];
158
public readonly messages: T[] = [];
159
public readonly kv: { [key: string]: { mesg: T; raw: RawMsg } } = {};
160
private kvChangeBytes = 0;
161
162
// this msgID's is ONLY used in ephemeral mode by the leader.
163
private readonly msgIDs = new Set<any>();
164
// lastSeq used by clients to keep track of what they have received; if one
165
// is skipped they reconnect starting with the last one they didn't miss.
166
private lastSeq: number = 0;
167
// IMPORTANT: user here means the *owner* of the resource, **NOT** the
168
// client who is accessing it! For example, a stream of edits of a file
169
// in a project has user {project_id} even if it is being accessed by
170
// an account.
171
private user: User;
172
private storage: StorageOptions;
173
private client?: Client;
174
private persistClient: PersistStreamClient;
175
private changefeed?: Changefeed;
176
private service?: string;
177
178
constructor({
179
name,
180
project_id,
181
account_id,
182
config,
183
start_seq,
184
ephemeral = false,
185
sync,
186
client,
187
service,
188
}: CoreStreamOptions) {
189
super();
190
logger.debug("constructor", name);
191
if (client == null) {
192
throw Error("client must be specified");
193
}
194
this.client = client;
195
this.service = service;
196
this.user = { account_id, project_id };
197
this.name = name;
198
this.storage = {
199
path: storagePath({ account_id, project_id, name }),
200
ephemeral,
201
sync,
202
};
203
this._start_seq = start_seq;
204
this.configOptions = config;
205
return new Proxy(this, {
206
get(target, prop) {
207
return typeof prop == "string" && isNumericString(prop)
208
? target.get(parseInt(prop))
209
: target[String(prop)];
210
},
211
});
212
}
213
214
private initialized = false;
215
init = async () => {
216
if (this.initialized) {
217
throw Error("init can only be called once");
218
}
219
this.initialized = true;
220
if (this.client == null) {
221
this.client = await conat();
222
}
223
this.persistClient = persist({
224
client: this.client,
225
user: this.user,
226
storage: this.storage,
227
service: this.service,
228
});
229
this.persistClient.on("error", (err) => {
230
if (!process.env.COCALC_TEST_MODE) {
231
console.log(`WARNING: persistent stream issue -- ${err}`);
232
}
233
});
234
await this.getAllFromPersist({
235
start_seq: this._start_seq,
236
noEmit: true,
237
});
238
239
await until(
240
async () => {
241
if (this.client == null) {
242
return true;
243
}
244
try {
245
this.configOptions = await this.config(this.configOptions);
246
return true;
247
} catch (err) {
248
if (err.code == 403) {
249
// fatal permission error
250
throw err;
251
}
252
}
253
return false;
254
},
255
{ start: 750 },
256
);
257
};
258
259
config = async (
260
config: Partial<Configuration> = {},
261
): Promise<Configuration> => {
262
if (this.storage == null) {
263
throw Error("bug -- storage must be set");
264
}
265
return await this.persistClient.config({ config });
266
};
267
268
private isClosed = () => {
269
return this.client === undefined;
270
};
271
272
close = () => {
273
logger.debug("close", this.name);
274
delete this.client;
275
this.removeAllListeners();
276
this.persistClient?.close();
277
// @ts-ignore
278
delete this.persistClient;
279
// @ts-ignore
280
delete this.kv;
281
// @ts-ignore
282
delete this.messages;
283
// @ts-ignore
284
delete this.raw;
285
// @ts-ignore
286
delete this.msgIDs;
287
// @ts-ignore
288
delete this.storage;
289
};
290
291
inventory = async (): Promise<PartialInventory> => {
292
return await this.persistClient.inventory();
293
};
294
295
// NOTE: It's assumed elsewhere that getAllFromPersist will not throw,
296
// and will keep retrying until (1) it works, or (2) self is closed,
297
// or (3) there is a fatal failure, e.g., lack of permissions.
298
private getAllFromPersist = async ({
299
start_seq = 0,
300
noEmit,
301
}: { start_seq?: number; noEmit?: boolean } = {}) => {
302
if (this.storage == null) {
303
throw Error("bug -- storage must be set");
304
}
305
await until(
306
async () => {
307
let messages: StoredMessage[] = [];
308
let changes: (SetOperation | DeleteOperation | StoredMessage)[] = [];
309
try {
310
if (this.isClosed()) {
311
return true;
312
}
313
if (this.changefeed == null) {
314
this.changefeed = await this.persistClient.changefeed();
315
}
316
// console.log("get persistent stream", { start_seq }, this.storage);
317
messages = await this.persistClient.getAll({
318
start_seq,
319
timeout: DEFAULT_GET_ALL_TIMEOUT,
320
});
321
} catch (err) {
322
if (this.isClosed()) {
323
return true;
324
}
325
if (!process.env.COCALC_TEST_MODE) {
326
console.log(
327
`WARNING: getAllFromPersist - failed -- ${err}, code=${err.code}, service=${this.service}, storage=${JSON.stringify(this.storage)} -- will retry`,
328
);
329
}
330
if (err.code == 503 || err.code == 408) {
331
// 503: temporary error due to messages being dropped,
332
// so return false to try again. This is expected to
333
// sometimes happen under heavy load, automatic failover, etc.
334
// 408: timeout waiting to be connected/ready
335
return false;
336
}
337
if (err.code == 403) {
338
// fatal permission error
339
throw err;
340
}
341
if (err.code == 429) {
342
// too many users
343
throw err;
344
}
345
// any other error that we might not address above -- just try again in a while.
346
return false;
347
}
348
this.processPersistentMessages(messages, {
349
noEmit,
350
noSeqCheck: true,
351
});
352
if (changes.length > 0) {
353
this.processPersistentMessages(changes, {
354
noEmit,
355
noSeqCheck: false,
356
});
357
}
358
// success!
359
return true;
360
},
361
{ start: 1000, max: 15000 },
362
);
363
364
this.listen();
365
};
366
367
private processPersistentMessages = (
368
messages: (SetOperation | DeleteOperation | StoredMessage)[],
369
opts: { noEmit?: boolean; noSeqCheck?: boolean },
370
) => {
371
if (this.raw === undefined) {
372
// closed
373
return;
374
}
375
let seq = undefined;
376
for (const mesg of messages) {
377
try {
378
this.processPersistentMessage(mesg, opts);
379
if (mesg["seq"] != null) {
380
seq = mesg["seq"];
381
}
382
} catch (err) {
383
console.log("WARNING: issue processing message", mesg, err);
384
}
385
}
386
return seq;
387
};
388
389
private processPersistentMessage = (
390
mesg: SetOperation | DeleteOperation | StoredMessage,
391
opts: { noEmit?: boolean; noSeqCheck?: boolean },
392
) => {
393
if ((mesg as DeleteOperation).op == "delete") {
394
this.processPersistentDelete(mesg as DeleteOperation, opts);
395
} else {
396
// set is the default
397
this.processPersistentSet(mesg as SetOperation, opts);
398
}
399
};
400
401
private processPersistentDelete = (
402
{ seqs }: DeleteOperation,
403
{ noEmit }: { noEmit?: boolean },
404
) => {
405
if (this.raw == null) return;
406
const X = new Set<number>(seqs);
407
// seqs is a list of integers. We remove
408
// every entry from this.raw, this.messages, and this.kv
409
// where this.raw.seq is in X by mutating raw/messages/kv,
410
// not by making new objects (since external references).
411
// This is a rare operation so we're not worried too much
412
// about performance.
413
const keys: { [seq: number]: string } = {};
414
for (const key in this.kv) {
415
const seq = this.kv[key]?.raw?.seq;
416
if (X.has(seq)) {
417
delete this.kv[key];
418
keys[key] = seq;
419
}
420
}
421
const indexes: number[] = [];
422
for (let i = 0; i < this.raw.length; i++) {
423
const seq = this.raw[i].seq;
424
if (X.has(seq)) {
425
indexes.push(i);
426
if (!noEmit) {
427
this.emitChange({
428
mesg: undefined,
429
raw: { seq },
430
key: keys[seq],
431
prev: this.messages[i],
432
});
433
}
434
}
435
}
436
437
// remove this.raw[i] and this.messages[i] for all i in indexes,
438
// with special case to be fast in the very common case of contiguous indexes.
439
if (indexes.length > 1 && indexes.every((v, i) => v === indexes[0] + i)) {
440
// Contiguous: bulk remove for performance
441
const start = indexes[0];
442
const deleteCount = indexes.length;
443
this.raw.splice(start, deleteCount);
444
this.messages.splice(start, deleteCount);
445
} else {
446
// Non-contiguous: fallback to individual reverse splices
447
for (let i = indexes.length - 1; i >= 0; i--) {
448
const idx = indexes[i];
449
this.raw.splice(idx, 1);
450
this.messages.splice(idx, 1);
451
}
452
}
453
};
454
455
private processPersistentSetLargestSeq: number = 0;
456
private missingMessages = new Set<number>();
457
private processPersistentSet = (
458
{ seq, time, key, encoding, raw: data, headers, msgID }: SetOperation,
459
{
460
noEmit,
461
noSeqCheck,
462
}: {
463
noEmit?: boolean;
464
noSeqCheck?: boolean;
465
},
466
) => {
467
if (this.raw == null) return;
468
if (!noSeqCheck) {
469
const expected = this.processPersistentSetLargestSeq + 1;
470
if (seq > expected) {
471
log(
472
"processPersistentSet -- detected missed seq number",
473
{ seq, expected: this.processPersistentSetLargestSeq + 1 },
474
this.storage,
475
);
476
// We record that some are missing.
477
for (let s = expected; s <= seq - 1; s++) {
478
this.missingMessages.add(s);
479
this.getAllMissingMessages();
480
}
481
}
482
}
483
484
if (seq > this.processPersistentSetLargestSeq) {
485
this.processPersistentSetLargestSeq = seq;
486
}
487
488
const mesg = decode({ encoding, data });
489
const raw = {
490
timestamp: time,
491
headers,
492
seq,
493
raw: data,
494
key,
495
} as RawMsg;
496
if (seq > (this.raw.slice(-1)[0]?.seq ?? 0)) {
497
// easy fast initial load to the end of the list (common special case)
498
this.messages.push(mesg);
499
this.raw.push(raw);
500
} else {
501
// Insert in the correct place. This should only
502
// happen when calling load of old data, which happens, e.g., during
503
// automatic failover. The algorithm below is
504
// dumb and could be replaced by a binary search. However, we'll
505
// change how we batch load so there's less point.
506
let i = 0;
507
while (i < this.raw.length && this.raw[i].seq < seq) {
508
i += 1;
509
}
510
// after the above loop, either:
511
// - this.raw[i] is undefined because i = this.raw.length and every known entry was less than seq,
512
// so we just append it, or
513
// - this.raw[i] is defined and this.raw[i].seq >= seq. If they are equal, do nothing, since we already
514
// have it. If not equal, then splice it in.
515
if (i >= this.raw.length) {
516
this.raw.push(raw);
517
this.messages.push(mesg);
518
} else if (this.raw[i].seq > seq) {
519
this.raw.splice(i, 0, raw);
520
this.messages.splice(i, 0, mesg);
521
} // other case -- we already have it.
522
}
523
let prev: T | undefined = undefined;
524
if (typeof key == "string") {
525
prev = this.kv[key]?.mesg;
526
if (raw.headers?.[COCALC_TOMBSTONE_HEADER]) {
527
delete this.kv[key];
528
} else {
529
if (this.kv[key] !== undefined) {
530
const { raw } = this.kv[key];
531
this.kvChangeBytes += raw.raw.length;
532
}
533
534
this.kv[key] = { raw, mesg };
535
536
if (this.kvChangeBytes >= KEY_GC_THRESH) {
537
this.gcKv();
538
}
539
}
540
}
541
this.lastSeq = Math.max(this.lastSeq, seq);
542
if (!noEmit) {
543
this.emitChange({ mesg, raw, key, prev, msgID });
544
}
545
};
546
547
private emitChange = (e: ChangeEvent<T>) => {
548
if (this.raw == null) return;
549
this.emit("change", e);
550
};
551
552
private listen = async () => {
553
// log("core-stream: listen", this.storage);
554
await until(
555
async () => {
556
if (this.isClosed()) {
557
return true;
558
}
559
try {
560
if (this.changefeed == null) {
561
this.changefeed = await this.persistClient.changefeed();
562
if (this.isClosed()) {
563
return true;
564
}
565
}
566
567
for await (const updates of this.changefeed) {
568
this.processPersistentMessages(updates, {
569
noEmit: false,
570
noSeqCheck: false,
571
});
572
if (this.isClosed()) {
573
return true;
574
}
575
}
576
} catch (err) {
577
// There should never be a case where the changefeed throws
578
// an error or ends without this whole streaming being closed.
579
// If that happens its an unexpected bug. Instead of failing,
580
// we log this, loop around, and make a new changefeed.
581
// This normally doesn't happen but could if a persist server is being restarted
582
// frequently or things are seriously broken. We cause this in
583
// backend/conat/test/core/core-stream-break.test.ts
584
if (!process.env.COCALC_TEST_MODE) {
585
log(
586
`WARNING: core-stream changefeed error -- ${err}`,
587
this.storage,
588
);
589
}
590
}
591
592
delete this.changefeed;
593
594
// above loop exits when the persistent server
595
// stops sending messages for some reason. In that
596
// case we reconnect, picking up where we left off.
597
598
if (this.client == null) {
599
return true;
600
}
601
602
await this.getAllFromPersist({
603
start_seq: this.lastSeq + 1,
604
noEmit: false,
605
});
606
return false;
607
},
608
{ start: 500, max: 7500, decay: 1.2 },
609
);
610
};
611
612
publish = async (
613
mesg: T,
614
options?: PublishOptions,
615
): Promise<{ seq: number; time: number } | undefined> => {
616
if (mesg === undefined) {
617
if (options?.key !== undefined) {
618
// undefined can't be JSON encoded, so we can't possibly represent it, and this
619
// *must* be treated as a delete.
620
this.deleteKv(options?.key, { previousSeq: options?.previousSeq });
621
return;
622
} else {
623
throw Error("stream non-kv publish - mesg must not be 'undefined'");
624
}
625
}
626
627
if (options?.msgID && this.msgIDs.has(options.msgID)) {
628
// it's a dup
629
return;
630
}
631
const md = messageData(mesg, { headers: options?.headers });
632
const x = await this.persistClient.set({
633
key: options?.key,
634
messageData: md,
635
previousSeq: options?.previousSeq,
636
msgID: options?.msgID,
637
ttl: options?.ttl,
638
timeout: options?.timeout,
639
});
640
if (options?.msgID) {
641
this.msgIDs?.add(options.msgID);
642
}
643
return x;
644
};
645
646
publishMany = async (
647
messages: { mesg: T; options?: PublishOptions }[],
648
): Promise<
649
({ seq: number; time: number } | { error: string; code?: any })[]
650
> => {
651
let result: (
652
| { seq: number; time: number }
653
| { error: string; code?: any }
654
)[] = [];
655
656
for (let i = 0; i < messages.length; i += PUBLISH_MANY_BATCH_SIZE) {
657
const batch = messages.slice(i, i + PUBLISH_MANY_BATCH_SIZE);
658
result = result.concat(await this.publishMany0(batch));
659
}
660
661
return result;
662
};
663
664
private publishMany0 = async (
665
messages: { mesg: T; options?: PublishOptions }[],
666
): Promise<
667
({ seq: number; time: number } | { error: string; code?: any })[]
668
> => {
669
const v: SetOptions[] = [];
670
let timeout: number | undefined = undefined;
671
for (const { mesg, options } of messages) {
672
if (options?.timeout) {
673
if (timeout === undefined) {
674
timeout = options.timeout;
675
} else {
676
timeout = Math.min(timeout, options.timeout);
677
}
678
}
679
const md = messageData(mesg, { headers: options?.headers });
680
v.push({
681
key: options?.key,
682
messageData: md,
683
previousSeq: options?.previousSeq,
684
msgID: options?.msgID,
685
ttl: options?.ttl,
686
});
687
}
688
return await this.persistClient.setMany(v, { timeout });
689
};
690
691
get = (n?): T | T[] => {
692
if (n == null) {
693
return this.getAll();
694
} else {
695
return this.messages[n];
696
}
697
};
698
699
seq = (n: number): number | undefined => {
700
return this.raw[n]?.seq;
701
};
702
703
getAll = (): T[] => {
704
return [...this.messages];
705
};
706
707
get length(): number {
708
return this.messages.length;
709
}
710
711
get start_seq(): number | undefined {
712
return this._start_seq;
713
}
714
715
headers = (n: number): { [key: string]: any } | undefined => {
716
return this.raw[n]?.headers;
717
};
718
719
// key:value interface for subset of messages pushed with key option set.
720
// NOTE: This does NOT throw an error if our local seq is out of date (leave that
721
// to dkv built on this).
722
setKv = async (
723
key: string,
724
mesg: T,
725
options?: {
726
headers?: Headers;
727
previousSeq?: number;
728
},
729
): Promise<{ seq: number; time: number } | undefined> => {
730
return await this.publish(mesg, { ...options, key });
731
};
732
733
setKvMany = async (
734
x: {
735
key: string;
736
mesg: T;
737
options?: {
738
headers?: Headers;
739
previousSeq?: number;
740
};
741
}[],
742
): Promise<
743
({ seq: number; time: number } | { error: string; code?: any })[]
744
> => {
745
const messages: { mesg: T; options?: PublishOptions }[] = [];
746
for (const { key, mesg, options } of x) {
747
messages.push({ mesg, options: { ...options, key } });
748
}
749
return await this.publishMany(messages);
750
};
751
752
deleteKv = async (
753
key: string,
754
options?: {
755
msgID?: string;
756
previousSeq?: number;
757
},
758
) => {
759
if (this.kv[key] === undefined) {
760
// nothing to do
761
return;
762
}
763
return await this.publish(null as any, {
764
...options,
765
headers: { [COCALC_TOMBSTONE_HEADER]: true },
766
key,
767
ttl: DEFAULT_TOMBSTONE_TTL,
768
});
769
};
770
771
getKv = (key: string): T | undefined => {
772
return this.kv[key]?.mesg;
773
};
774
775
hasKv = (key: string): boolean => {
776
return this.kv?.[key] !== undefined;
777
};
778
779
getAllKv = (): { [key: string]: T } => {
780
const all: { [key: string]: T } = {};
781
for (const key in this.kv) {
782
all[key] = this.kv[key].mesg;
783
}
784
return all;
785
};
786
787
// efficient way to get just the keys -- use this instead of
788
// getAllKv if you just need the keys.
789
keysKv = (): string[] => {
790
return Object.keys(this.kv);
791
};
792
793
seqKv = (key: string): number | undefined => {
794
return this.kv[key]?.raw.seq;
795
};
796
797
timeKv = (key?: string): Date | { [key: string]: Date } | undefined => {
798
if (key === undefined) {
799
const all: { [key: string]: Date } = {};
800
for (const key in this.kv) {
801
all[key] = new Date(this.kv[key].raw.timestamp);
802
}
803
return all;
804
}
805
const r = this.kv[key]?.raw;
806
if (r == null) {
807
return;
808
}
809
return new Date(r.timestamp);
810
};
811
812
headersKv = (key: string): { [key: string]: any } | undefined => {
813
return this.kv[key]?.raw?.headers;
814
};
815
816
get lengthKv(): number {
817
return Object.keys(this.kv).length;
818
}
819
820
// load older messages starting at start_seq up to the oldest message
821
// we currently have. This can throw an exception in case of heavy
822
// load or network issues, but should completely succeed or make
823
// no change.
824
load = async ({
825
start_seq,
826
noEmit,
827
}: {
828
start_seq: number;
829
noEmit?: boolean;
830
}) => {
831
// This is used for loading more TimeTravel history
832
if (this.storage == null) {
833
throw Error("bug");
834
}
835
// this is one before the oldest we have
836
const end_seq = (this.raw[0]?.seq ?? this._start_seq ?? 1) - 1;
837
if (start_seq > end_seq) {
838
// nothing to load
839
return;
840
}
841
// we're moving start_seq back to this point
842
this._start_seq = start_seq;
843
const messages = await this.persistClient.getAll({
844
start_seq,
845
end_seq,
846
});
847
this.processPersistentMessages(messages, { noEmit, noSeqCheck: true });
848
};
849
850
private getAllMissingMessages = reuseInFlight(async () => {
851
await until(
852
async () => {
853
if (this.client == null || this.missingMessages.size == 0) {
854
return true;
855
}
856
try {
857
const missing = Array.from(this.missingMessages);
858
missing.sort();
859
log("core-stream: getMissingSeq", missing, this.storage);
860
const messages = await this.persistClient.getAll({
861
start_seq: missing[0],
862
end_seq: missing[missing.length - 1],
863
});
864
this.processPersistentMessages(messages, {
865
noEmit: false,
866
noSeqCheck: true,
867
});
868
for (const seq of missing) {
869
this.missingMessages.delete(seq);
870
}
871
} catch (err) {
872
log(
873
"core-stream: WARNING -- issue getting missing updates",
874
err,
875
this.storage,
876
);
877
}
878
return false;
879
},
880
{ start: 1000, max: 15000, decay: 1.3 },
881
);
882
});
883
884
// get server assigned time of n-th message in stream
885
time = (n: number): Date | undefined => {
886
const r = this.raw[n];
887
if (r == null) {
888
return;
889
}
890
return new Date(r.timestamp);
891
};
892
893
times = () => {
894
const v: (Date | undefined)[] = [];
895
for (let i = 0; i < this.length; i++) {
896
v.push(this.time(i));
897
}
898
return v;
899
};
900
901
stats = ({
902
start_seq = 1,
903
}: {
904
start_seq?: number;
905
} = {}): { count: number; bytes: number } | undefined => {
906
if (this.raw == null) {
907
return;
908
}
909
let count = 0;
910
let bytes = 0;
911
for (const { raw, seq } of this.raw) {
912
if (seq == null) {
913
continue;
914
}
915
if (seq < start_seq) {
916
continue;
917
}
918
count += 1;
919
bytes += raw.length;
920
}
921
return { count, bytes };
922
};
923
924
// delete all messages up to and including the
925
// one at position index, i.e., this.messages[index]
926
// is deleted.
927
// NOTE: For ephemeral streams, clients will NOT see the result of a delete,
928
// except when they load the stream later. For persistent streams all
929
// **connected** clients will see the delete. THAT said, this is not a "proper"
930
// distributed computing primitive with tombstones, etc. This is primarily
931
// meant for reducing space usage, and shouldn't be relied on for
932
// any other purpose.
933
  // Delete messages from the persistent stream.  Give exactly ONE of the
  // parameters below; each is translated to the persist client's delete
  // options.  Returns the seq numbers that were actually deleted.
  delete = async ({
    all,
    last_index,
    seq,
    last_seq,
    key,
    seqs,
  }: {
    // give exactly ONE parameter -- by default nothing happens with no params
    // all: delete everything
    all?: boolean;
    // last_index: everything up to and including index'd message
    last_index?: number;
    // seq: delete message with this sequence number
    seq?: number;
    // seqs: delete the messages in this array of sequence numbers
    seqs?: number[];
    // last_seq: delete everything up to and including this sequence number
    last_seq?: number;
    // key: delete the message with this key
    key?: string;
  } = {}): Promise<{ seqs: number[] }> => {
    let opts;
    if (all) {
      opts = { all: true };
    } else if (last_index != null) {
      if (last_index >= this.raw.length) {
        // index beyond what we have locally -- treat as "everything"
        opts = { all: true };
      } else if (last_index < 0) {
        // negative index: nothing to do
        return { seqs: [] };
      } else {
        // translate the local index into the server's seq number
        const last_seq = this.raw[last_index].seq;
        if (last_seq === undefined) {
          throw Error(`BUG: invalid index ${last_index}`);
        }
        opts = { last_seq };
      }
    } else if (seq != null) {
      opts = { seq };
    } else if (seqs != null) {
      opts = { seqs };
    } else if (last_seq != null) {
      opts = { last_seq };
    } else if (key != null) {
      // look up the seq of the current value for this key
      const seq = this.kv[key]?.raw?.seq;
      if (seq === undefined) {
        return { seqs: [] };
      }
      opts = { seq };
    }
    // NOTE(review): when called with no parameters, opts is undefined
    // here -- presumably the persist client treats that as a no-op
    // ("by default nothing happens"); verify against persist/client.
    return await this.persistClient.delete(opts);
  };
985
986
// delete messages that are no longer needed since newer values have been written
987
gcKv = () => {
988
this.kvChangeBytes = 0;
989
for (let i = 0; i < this.raw.length; i++) {
990
const key = this.raw[i].key;
991
if (key !== undefined) {
992
if (this.raw[i].raw.length > 0 && this.raw[i] !== this.kv[key].raw) {
993
this.raw[i] = {
994
...this.raw[i],
995
headers: undefined,
996
raw: Buffer.from(""),
997
} as RawMsg;
998
this.messages[i] = undefined as T;
999
}
1000
}
1001
}
1002
};
1003
}
1004
1005
export interface PublishOptions {
1006
// headers for this message
1007
headers?: Headers;
1008
// unique id for this message to dedup so if you send the same
1009
// message more than once with the same id it doesn't get published
1010
// multiple times.
1011
msgID?: string;
1012
// key -- if specified a key field is also stored on the server,
1013
// and any previous messages with the same key are deleted. Also,
1014
// an entry is set in this.kv[key] so that this.getKv(key), etc. work.
1015
key?: string;
1016
// if key is specified and previousSeq is set, the server throws
1017
// an error if the sequence number of the current key is
1018
// not previousSeq. We use this with this.seqKv(key) to
1019
// provide read/change/write semantics and to know when we
1020
// should resolve a merge conflict. This is ignored if
1021
// key is not specified.
1022
previousSeq?: number;
1023
// if set to a number of ms AND the config option allow_msg_ttl
1024
// is set on this persistent stream, then
1025
// this message will be deleted after the given amount of time (in ms).
1026
ttl?: number;
1027
timeout?: number;
1028
}
1029
1030
export const cache = refCache<CoreStreamOptions, CoreStream>({
1031
name: "core-stream",
1032
createObject: async (options: CoreStreamOptions) => {
1033
if (options.client == null) {
1034
options = { ...options, client: await conat() };
1035
}
1036
const cstream = new CoreStream(options);
1037
await cstream.init();
1038
return cstream;
1039
},
1040
createKey: ({ client, ...options }) => {
1041
return jsonStableStringify({ id: client?.id, ...options })!;
1042
},
1043
});
1044
1045
/**
 * Get a reference-counted, cached CoreStream for the given options,
 * creating and awaiting its `init()` if it doesn't exist yet (see the
 * `cache` defined above).
 */
export async function cstream<T>(
  options: CoreStreamOptions,
): Promise<CoreStream<T>> {
  return await cache(options);
}
1050
1051