/*
Persistent storage of a specific stream or kv store.

You can set a message by optionally providing a key, a buffer, and/or a json value.
A sequence number and time (in ms since epoch) are assigned and returned.
If the key is provided, it is an arbitrary string and all older messages
with that same key are deleted. You can efficiently retrieve a message
by its key. The message content itself is given by the buffer and/or json
value. The buffer is like the "payload" in NATS, and the json is like
the headers in NATS.

This module is:

- efficient -- the buffer is automatically compressed using zstandard
- synchronous -- fast enough to meet our requirements even with blocking
- memory efficient -- nothing in memory beyond whatever key you request

We care about memory efficiency here since it's likely we'll want to have
possibly thousands of these in a single nodejs process at once, but with
less than 1 read/write per second for each. Thus memory is critical, and
supporting at least 1000 writes/second is what we need.
Fortunately, this implementation can do ~50,000+ writes per second and
over 500,000 reads per second. Yes, it blocks the main thread, but by using
better-sqlite3 and zstd-napi, we get 10x speed increases over async code,
so this is worth it.


COMPRESSION:

I implemented *sync* lz4-napi compression here and it's very fast,
but it has to be run with async waits in a loop or it doesn't give back
memory, and such throttling may significantly negatively impact performance
and mean we don't get a 100% sync api (like we have now).
The async functions in lz4-napi seem fine. Upstream report (by me):
https://github.com/antoniomuso/lz4-napi/issues/678
I also tried the rust sync snappy and it had a similar memory leak. Finally,
I tried zstd-napi and it has a very fast sync implementation that does *not*
need async pauses to avoid leaking memory. So zstd-napi it is.
And I like zstandard anyway.

TIERED STORAGE:

You can provide a second path, archive, for the sqlite file. If provided, on
creation this will stat both the main path and the archive path. If the archive
path is newer, then the file is first copied from the archive path to the normal
path, then opened. Also, if the archive path is provided, then a backup of the
database is made to the archive path periodically. We use this for tiered storage
in CoCalc as follows. The archive path is on a Google Cloud Storage autoclass
bucket that is mounted using gcsfuse. The normal primary path is on a small fast
SSD persistent disk, which we view as a cache.

NOTE:

We use seconds instead of ms in sqlite since that is the standard
convention for times in sqlite.

DEVELOPMENT:


s = require('@cocalc/backend/conat/persist').pstream({path:'/tmp/a.db'})

*/
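
/*
EXAMPLE (sketch): a minimal round trip through the API described above, in the
spirit of the DEVELOPMENT snippet. The concrete DataEncoding value and paths
here are illustrative only, not part of this module:

    const { pstream } = require("@cocalc/backend/conat/persist");
    const s = pstream({ path: "/tmp/a.db" });
    // write a message -- a seq number and time are assigned and returned:
    const { seq, time } = s.set({
      key: "greeting",                  // optional; older messages with this key are deleted
      encoding: 0,                      // a DataEncoding value (illustrative)
      raw: Buffer.from("hello world"),  // the "payload"
      headers: { kind: "demo" },        // JSON-able headers
    });
    // read it back by key (or by seq):
    const msg = s.get({ key: "greeting" });
    await s.close();
*/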

import { refCacheSync } from "@cocalc/util/refcache";
import {
  createDatabase,
  type Database,
  compress,
  decompress,
  statSync,
  copyFileSync,
} from "./context";
import type { JSONValue } from "@cocalc/util/types";
import { EventEmitter } from "events";
import {
  DataEncoding,
  type Headers,
  ConatError,
} from "@cocalc/conat/core/client";
import TTL from "@isaacs/ttlcache";
import { getLogger } from "@cocalc/conat/client";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { throttle } from "lodash";

const logger = getLogger("persist:storage");

export interface PartialInventory {
  // how much space is used by this stream
  bytes: number;
  limits: Partial<Configuration>;
  // number of messages
  count: number;
  // for streams, the seq number up to which this data is valid, i.e.,
  // this data is for all elements of the stream with sequence
  // number <= seq.
  seq: number;
}

export interface Configuration {
  // How many messages may be in a Stream; the oldest messages will be removed
  // if the Stream exceeds this size. -1 for unlimited.
  max_msgs: number;

  // Maximum age of any message in the stream,
  // expressed in milliseconds. 0 for unlimited.
  // **Note that max_age is in milliseconds.**
  max_age: number;

  // How big the Stream may be. When the stream size
  // exceeds this, old messages are removed. -1 for unlimited.
  // The size of a message is the sum of the raw uncompressed blob
  // size, the headers json and the key length.
  max_bytes: number;

  // The largest message that will be accepted. -1 for unlimited.
  max_msg_size: number;

  // Attempting to publish a message that causes either of the following
  // two rate limits to be exceeded throws an exception.
  // For dstream, the messages are explicitly rejected and the client
  // gets a "reject" event emitted. E.g., the terminal running in the project
  // writes [...] when it gets these rejects, indicating that data was dropped.
  // -1 for unlimited
  max_bytes_per_second: number;

  // -1 for unlimited
  max_msgs_per_second: number;

  // old = delete old messages to make room for new ones
  // new = refuse writes if they exceed the limits
  discard_policy: "old" | "new";

  // If true (default: false), messages will be automatically deleted after their ttl.
  // Use the option {ttl:number of MILLISECONDS} when publishing to set a ttl.
  allow_msg_ttl: boolean;

  // description of this table
  desc: JSONValue;
}

const CONFIGURATION = {
  max_msgs: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_age: { def: 0, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_bytes: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_msg_size: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_bytes_per_second: {
    def: -1,
    fromDb: parseInt,
    toDb: (x) => `${parseInt(x)}`,
  },
  max_msgs_per_second: {
    def: -1,
    fromDb: parseInt,
    toDb: (x) => `${parseInt(x)}`,
  },
  discard_policy: {
    def: "old",
    fromDb: (x) => `${x}`,
    toDb: (x) => (x == "new" ? "new" : "old"),
  },
  allow_msg_ttl: {
    def: false,
    fromDb: (x) => x == "true",
    toDb: (x) => `${!!x}`,
  },
  desc: {
    def: null,
    fromDb: JSON.parse,
    toDb: JSON.stringify,
  },
};
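
/*
EXAMPLE (sketch): limits are applied with the config() method further below and
persisted in the config table; getConfig() returns only the non-default fields.
The stream `s` is the one from the DEVELOPMENT snippet above:

    // keep at most 1000 messages, each at most one week old:
    s.config({ max_msgs: 1000, max_age: 7 * 24 * 60 * 60 * 1000 });
    s.getConfig();  // -> { max_msgs: 1000, max_age: 604800000 }
*/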

export const EPHEMERAL_MAX_BYTES = 64 * 1e6;

enum CompressionAlgorithm {
  None = 0,
  Zstd = 1,
}

interface Compression {
  // compression algorithm to use
  algorithm: CompressionAlgorithm;
  // only compress data above this size
  threshold: number;
}

const DEFAULT_COMPRESSION = {
  algorithm: CompressionAlgorithm.Zstd,
  threshold: 1024,
};
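
/*
EXAMPLE (sketch): compression is configurable per stream via the compression
field of StorageOptions. The enum above is module-private, so external callers
would pass the numeric values (0 = None, 1 = Zstd); the paths are illustrative:

    // never compress (e.g., payloads that are already compressed):
    pstream({ path: "/tmp/b.db", compression: { algorithm: 0, threshold: 0 } });
    // zstd, but only for blobs larger than 16 KB:
    pstream({ path: "/tmp/c.db", compression: { algorithm: 1, threshold: 16384 } });
*/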

export interface StoredMessage {
  // server assigned positive increasing integer number
  seq: number;
  // server assigned time in ms since epoch
  time: number;
  // user assigned key -- when set all previous messages with that key are deleted.
  key?: string;
  // the encoding used to encode the raw data
  encoding: DataEncoding;
  // arbitrary binary data
  raw: Buffer;
  // arbitrary JSON-able object -- analogue of NATS headers, but anything JSON-able
  headers?: Headers;
}

export interface SetOperation extends StoredMessage {
  op?: undefined;
  msgID?: string;
}

export interface DeleteOperation {
  op: "delete";
  // sequence numbers of deleted messages
  seqs: number[];
}
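
/*
EXAMPLE (sketch): every set and delete is also emitted as a "change" event on the
stream, using the two shapes above -- op is undefined for sets and "delete" for
deletions -- so a listener can discriminate on op:

    s.on("change", (update: SetOperation | DeleteOperation) => {
      if (update.op === "delete") {
        console.log("deleted seqs", update.seqs);
      } else {
        console.log("set", update.seq, update.key);
      }
    });
*/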

export const DEFAULT_ARCHIVE_INTERVAL = 30_000; // 30 seconds

export interface StorageOptions {
  // absolute path to sqlite database file. This needs to be a valid filename
  // path, and must also be kept under 1000 characters in length so it can be
  // stored in cloud storage.
  path: string;
  // another absolute path. If this is given, then (1)
  // it will be copied to path before opening path if it is newer, and (2) a
  // backup will be saved to archive (using sqlite's backup feature) every
  // archiveInterval ms. NOTE: we actually append ".db" to path and to archive.
  archive?: string;
  // the archive interval, if archive is given. defaults to DEFAULT_ARCHIVE_INTERVAL
  // Depending on your setup, this is likely your tolerance for data loss in the worst case scenario, e.g.,
  // "loss of the last 30 seconds of TimeTravel edit history".
  archiveInterval?: number;
  // another path which will be written to when the database is closed,
  // but not otherwise. NOTE: '.db' is appended to name.
  // this backup is *NOT* used in any way except as a backup; in particular,
  // it won't be used even if archive and path were both gone.
  backup?: string;

  // if false (the default), do not require sync writes to disk on every set
  sync?: boolean;
  // if set, then data is never saved to disk at all. To avoid using a lot of server
  // RAM there is always a hard cap of at most EPHEMERAL_MAX_BYTES on any ephemeral
  // table, which is enforced on all writes. Clients should always set max_bytes,
  // possibly as low as they can, and check by reading back what is set.
  ephemeral?: boolean;
  // compression configuration
  compression?: Compression;
}
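
/*
EXAMPLE (sketch): the tiered storage setup described at the top of this file,
with hypothetical paths -- a fast local "cache" path plus an archive on a
gcsfuse-mounted bucket, backed up roughly every 15 seconds:

    pstream({
      path: "/fast-ssd/streams/project-xyz",          // hypothetical local SSD path
      archive: "/gcsfuse-bucket/streams/project-xyz", // hypothetical mounted bucket path
      archiveInterval: 15_000,
    });
*/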

// persistence for a stream of messages with a subject
export class PersistentStream extends EventEmitter {
  private readonly options: StorageOptions;
  private readonly db: Database;
  private readonly msgIDs = new TTL({ ttl: 2 * 60 * 1000 });
  private conf: Configuration;
  private throttledBackup?;

  constructor(options: StorageOptions) {
    super();
    openPaths.add(options.path);
    logger.debug("constructor ", options.path);
    this.setMaxListeners(1000);
    options = { compression: DEFAULT_COMPRESSION, ...options };
    this.options = options;
    const location = this.options.ephemeral
      ? ":memory:"
      : this.options.path + ".db";
    this.initArchive();
    this.db = createDatabase(location);
    this.initSchema();
  }

  private initArchive = () => {
    if (!this.options.archive) {
      this.throttledBackup = () => {};
      return;
    }
    this.throttledBackup = throttle(
      this.backup,
      this.options.archiveInterval ?? DEFAULT_ARCHIVE_INTERVAL,
    );

    const archive = this.options.archive + ".db";
    const archiveAge = age(archive);

    const path = this.options.path + ".db";
    const pathAge = age(path);

    if (archiveAge > pathAge) {
      copyFileSync(archive, path);
    }
  };

  private initSchema = () => {
    if (!this.options.sync && !this.options.ephemeral) {
      // Unless sync is set, we do not require that the filesystem has committed changes
      // to disk after every insert. This can easily make things 10x faster. Sets are
      // typically going to come in one-by-one as users edit files, so this works well
      // for our application. Also, loss of a few seconds of persistence is acceptable
      // in a lot of applications, e.g., if it is just edit history for a file.
      this.db.prepare("PRAGMA synchronous=OFF").run();
    }
    // time is in *seconds* since the epoch, since that is standard for sqlite.
    // ttl is in milliseconds.
    this.db
      .prepare(
        `CREATE TABLE IF NOT EXISTS messages (
        seq INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT UNIQUE, time INTEGER NOT NULL, headers TEXT, compress NUMBER NOT NULL, encoding NUMBER NOT NULL, raw BLOB NOT NULL, size NUMBER NOT NULL, ttl NUMBER
        )
        `,
      )
      .run();
    this.db
      .prepare(
        `
        CREATE TABLE IF NOT EXISTS config (
          field TEXT PRIMARY KEY, value TEXT NOT NULL
        )`,
      )
      .run();
    this.db
      .prepare("CREATE INDEX IF NOT EXISTS idx_messages_key ON messages(key)")
      .run();
    this.db
      .prepare("CREATE INDEX IF NOT EXISTS idx_messages_time ON messages(time)")
      .run();

    this.conf = this.config();
  };

  close = async () => {
    const path = this.options?.path;
    if (path == null) {
      return;
    }
    logger.debug("close ", path);
    if (this.db != null) {
      this.vacuum();
      this.db.prepare("PRAGMA wal_checkpoint(FULL)").run();
      await this.backup();
      if (this.options.backup) {
        await this.backup(this.options.backup);
      }
      this.db.close();
    }
    // @ts-ignore
    delete this.options;
    this.msgIDs?.clear();
    // @ts-ignore
    delete this.msgIDs;
    openPaths.delete(path);
  };

  private backup = reuseInFlight(async (path?: string): Promise<void> => {
    if (this.options == null) {
      // can happen due to this.throttledBackup.
      return;
    }
    // reuseInFlight since probably doing a backup on top
    // of itself would corrupt data.
    if (path === undefined && !this.options.archive) {
      return;
    }
    path = (path ?? this.options.archive) + ".db";
    //console.log("backup", { path });
    try {
      await this.db.backup(path);
    } catch (err) {
      if (!process.env.COCALC_TEST_MODE) {
        console.log(err);
      }
      logger.debug("WARNING: error creating a backup", path, err);
    }
  });

  private compress = (
    raw: Buffer,
  ): { raw: Buffer; compress: CompressionAlgorithm } => {
    if (
      this.options.compression!.algorithm == CompressionAlgorithm.None ||
      raw.length <= this.options.compression!.threshold
    ) {
      return { raw, compress: CompressionAlgorithm.None };
    }
    if (this.options.compression!.algorithm == CompressionAlgorithm.Zstd) {
      return { raw: compress(raw), compress: CompressionAlgorithm.Zstd };
    }
    throw Error(
      `unknown compression algorithm: ${this.options.compression!.algorithm}`,
    );
  };

  set = ({
    encoding,
    raw,
    headers,
    key,
    ttl,
    previousSeq,
    msgID,
  }: {
    encoding: DataEncoding;
    raw: Buffer;
    headers?: JSONValue;
    key?: string;
    ttl?: number;
    previousSeq?: number;
    // if given, any attempt to publish something again with the same msgID
    // is deduplicated. Use this to prevent accidentally writing twice, e.g.,
    // due to not getting a response back from the server.
    msgID?: string;
  }): { seq: number; time: number } => {
    if (previousSeq === null) {
      previousSeq = undefined;
    }
    if (key === null) {
      key = undefined;
    }
    if (msgID != null && this.msgIDs?.has(msgID)) {
      return this.msgIDs.get(msgID)!;
    }
    if (key !== undefined && previousSeq !== undefined) {
      // throw error if current seq number for the row
      // with this key is not previousSeq.
      const { seq } = this.db // there is an index on the key so this is fast
        .prepare("SELECT seq FROM messages WHERE key=?")
        .get(key) as any;
      if (seq != previousSeq) {
        throw new ConatError("wrong last sequence", {
          code: "wrong-last-sequence",
        });
      }
    }
    const time = Date.now();
    const compressedRaw = this.compress(raw);
    const serializedHeaders = JSON.stringify(headers);
    const size =
      (serializedHeaders?.length ?? 0) +
      (raw?.length ?? 0) +
      (key?.length ?? 0);

    this.enforceLimits(size);

    const tx = this.db.transaction(
      (time, compress, encoding, raw, headers, key, size, ttl) => {
        if (key !== undefined) {
          // insert with key -- delete all previous messages, as they will
          // never be needed again and waste space.
          this.db.prepare("DELETE FROM messages WHERE key = ?").run(key);
        }
        return this.db
          .prepare(
            "INSERT INTO messages(time, compress, encoding, raw, headers, key, size, ttl) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING seq",
          )
          .get(time / 1000, compress, encoding, raw, headers, key, size, ttl);
      },
    );
    const row = tx(
      time,
      compressedRaw.compress,
      encoding,
      compressedRaw.raw,
      serializedHeaders,
      key,
      size,
      ttl,
    );
    const seq = Number((row as any).seq);
    // lastInsertRowid is a bigint from sqlite, but we won't hit that limit
    this.emit("change", {
      seq,
      time,
      key,
      encoding,
      raw,
      headers,
      msgID,
    });
    this.throttledBackup();
    if (msgID !== undefined) {
      this.msgIDs.set(msgID, { time, seq });
    }
    return { time, seq };
  };

  get = ({
    seq,
    key,
  }: { seq: number; key: undefined } | { seq: undefined; key: string }):
    | StoredMessage
    | undefined => {
    let x;
    if (seq) {
      x = this.db
        .prepare(
          "SELECT seq, key, time, compress, encoding, raw, headers FROM messages WHERE seq=?",
        )
        .get(seq);
    } else if (key != null) {
      // NOTE: we guarantee when doing set above that there is at most one
      // row with a given key. Also there's a unique constraint.
      x = this.db
        .prepare(
          "SELECT seq, key, time, compress, encoding, raw, headers FROM messages WHERE key=?",
        )
        .get(key);
    } else {
      x = undefined;
    }
    return dbToMessage(x as any);
  };

  *getAll({
    start_seq,
    end_seq,
  }: {
    end_seq?: number;
    start_seq?: number;
  } = {}): IterableIterator<StoredMessage> {
    let query: string, stmt;

    const where: string[] = [];
    const v: number[] = [];
    if (start_seq != null) {
      where.push("seq>=?");
      v.push(start_seq);
    }
    if (end_seq != null) {
      where.push("seq<=?");
      v.push(end_seq);
    }
    query = `SELECT seq, key, time, compress, encoding, raw, headers FROM messages ${where.length == 0 ? "" : " where " + where.join(" AND ")} ORDER BY seq`;
    stmt = this.db.prepare(query);
    for (const row of stmt.iterate(...v)) {
      yield dbToMessage(row)!;
    }
  }

  delete = ({
    seq,
    seqs: seqs0,
    last_seq,
    all,
  }: {
    seq?: number;
    seqs?: number[];
    last_seq?: number;
    all?: boolean;
  }): { seqs: number[] } => {
    let seqs: number[] = [];
    if (all) {
      seqs = this.db
        .prepare("SELECT seq FROM messages")
        .all()
        .map((row: any) => row.seq);
      this.db.prepare("DELETE FROM messages").run();
      this.vacuum();
    } else if (last_seq) {
      seqs = this.db
        .prepare("SELECT seq FROM messages WHERE seq<=?")
        .all(last_seq)
        .map((row: any) => row.seq);
      this.db.prepare("DELETE FROM messages WHERE seq<=?").run(last_seq);
      this.vacuum();
    } else if (seq) {
      seqs = this.db
        .prepare("SELECT seq FROM messages WHERE seq=?")
        .all(seq)
        .map((row: any) => row.seq);
      this.db.prepare("DELETE FROM messages WHERE seq=?").run(seq);
    } else if (seqs0) {
      const statement = this.db.prepare("DELETE FROM messages WHERE seq=?");
      const transaction = this.db.transaction((seqs) => {
        for (const s of seqs) {
          statement.run(s);
        }
      });
      transaction(seqs0);
      seqs = seqs0;
    }
    this.emit("change", { op: "delete", seqs });
    this.throttledBackup();
    return { seqs };
  };

  vacuum = () => {
    try {
      this.db.prepare("VACUUM").run();
    } catch {}
  };

  get length(): number {
    const { length } = this.db
      .prepare("SELECT COUNT(*) AS length FROM messages")
      .get() as { length: number };
    return length;
  }

  totalSize = (): number => {
    return (
      (this.db.prepare(`SELECT SUM(size) AS sum FROM messages`).get() as any)
        .sum ?? 0
    );
  };

  seq = (): number => {
    return (
      (this.db.prepare(`SELECT MAX(seq) AS seq FROM messages`).get() as any)
        .seq ?? 0
    );
  };

  inventory = (): PartialInventory => {
    return {
      bytes: this.totalSize(),
      count: this.length,
      limits: this.getConfig(),
      seq: this.seq(),
    };
  };

  keys = (): string[] => {
    const v = this.db
      .prepare("SELECT key FROM messages WHERE key IS NOT NULL")
      .all() as {
      key: string;
    }[];
    return v.map(({ key }) => key);
  };

  sqlite = (statement: string, params: any[] = []): any[] => {
    // Matches "attach database" (case-insensitive, ignores whitespace)
    if (/\battach\s+database\b/i.test(statement)) {
      throw Error("ATTACH DATABASE not allowed");
    }
    const stmt = this.db.prepare(statement);
    try {
      return stmt.all(...params);
    } catch (err) {
      if (err.message.includes("run() instead")) {
        stmt.run(...params);
        return [];
      } else {
        throw err;
      }
    }
  };

  // only returns fields that are not set to their default value,
  // and doesn't enforce any limits
  getConfig = (): Partial<Configuration> => {
    const cur: any = {};
    for (const { field, value } of this.db
      .prepare("SELECT * FROM config")
      .all() as any) {
      const { def, fromDb } = CONFIGURATION[field];
      cur[field] = fromDb(value);
      if (cur[field] == def) {
        delete cur[field];
      }
    }
    return cur;
  };

  config = (config?: Partial<Configuration>): Configuration => {
    const cur: any = {};
    for (const { field, value } of this.db
      .prepare("SELECT * FROM config")
      .all() as any) {
      cur[field] = value;
    }
    const full: Partial<Configuration> = {};
    for (const key in CONFIGURATION) {
      const { def, fromDb, toDb } = CONFIGURATION[key];
      full[key] =
        config?.[key] ?? (cur[key] !== undefined ? fromDb(cur[key]) : def);
      let x = toDb(full[key]);
      if (config?.[key] != null && full[key] != (cur[key] ?? def)) {
        // making a change
        this.db
          .prepare(
            `INSERT INTO config (field, value) VALUES(?, ?) ON CONFLICT(field) DO UPDATE SET value=excluded.value`,
          )
          .run(key, x);
      }
      full[key] = fromDb(x);
      if (
        this.options.ephemeral &&
        key == "max_bytes" &&
        (full[key] == null || full[key] <= 0 || full[key] > EPHEMERAL_MAX_BYTES)
      ) {
        // for ephemeral streams we always cap max_bytes
        // (note -- the cap isn't explicitly stored in the sqlite database, so
        // that we remain free to change it later)
        full[key] = EPHEMERAL_MAX_BYTES;
      }
    }
    this.conf = full as Configuration;
    // ensure any new limits are enforced
    this.enforceLimits(0);
    this.throttledBackup();
    return full as Configuration;
  };

  private emitDelete = (rows) => {
    if (rows.length > 0) {
      const seqs = rows.map((row: { seq: number }) => row.seq);
      this.emit("change", { op: "delete", seqs });
      this.throttledBackup();
    }
  };

  // do whatever limit enforcement and throttling is needed when inserting one new message
  // with the given size; if size=0, assume we are not actually inserting a new message,
  // and just enforce the current limits
  private enforceLimits = (size: number = 0) => {
    if (
      size > 0 &&
      (this.conf.max_msgs_per_second > 0 || this.conf.max_bytes_per_second > 0)
    ) {
      const { msgs, bytes } = this.db
        .prepare(
          "SELECT COUNT(*) AS msgs, SUM(size) AS bytes FROM messages WHERE time >= ?",
        )
        .get(Date.now() / 1000 - 1) as { msgs: number; bytes: number };
      if (
        this.conf.max_msgs_per_second > 0 &&
        msgs > this.conf.max_msgs_per_second
      ) {
        throw new ConatError("max_msgs_per_second exceeded", {
          code: "reject",
        });
      }
      if (
        this.conf.max_bytes_per_second > 0 &&
        bytes > this.conf.max_bytes_per_second
      ) {
        throw new ConatError("max_bytes_per_second exceeded", {
          code: "reject",
        });
      }
    }

    if (this.conf.max_msgs > -1) {
      const length = this.length + (size > 0 ? 1 : 0);
      if (length > this.conf.max_msgs) {
        if (this.conf.discard_policy == "new") {
          if (size > 0) {
            throw new ConatError("max_msgs limit reached", { code: "reject" });
          }
        } else {
          // delete earliest messages to make room
          const rows = this.db
            .prepare(
              `DELETE FROM messages WHERE seq IN (SELECT seq FROM messages ORDER BY seq ASC LIMIT ?) RETURNING seq`,
            )
            .all(length - this.conf.max_msgs);
          this.emitDelete(rows);
        }
      }
    }

    if (this.conf.max_age > 0) {
      const rows = this.db
        .prepare(
          `DELETE FROM messages WHERE seq IN (SELECT seq FROM messages WHERE time <= ?) RETURNING seq`,
        )
        .all((Date.now() - this.conf.max_age) / 1000);
      this.emitDelete(rows);
    }

    if (this.conf.max_bytes > -1) {
      if (size > this.conf.max_bytes) {
        if (this.conf.discard_policy == "new") {
          if (size > 0) {
            throw new ConatError("max_bytes limit reached", { code: "reject" });
          }
        } else {
          // the new message alone exceeds max_bytes, so this is the same as adding
          // the new message, then deleting everything.
          this.delete({ all: true });
        }
      } else {
        // delete all the earliest (in terms of seq number) messages
        // so that the sum of the remaining
        // sizes along with the new size is <= max_bytes.
        // Only enforce if actually inserting, or if the current sum is over the limit.
        const totalSize = this.totalSize();
        const newTotal = totalSize + size;
        if (newTotal > this.conf.max_bytes) {
          const bytesToFree = newTotal - this.conf.max_bytes;
          let freed = 0;
          let lastSeqToDelete: number | null = null;

          for (const { seq, size: msgSize } of this.db
            .prepare(`SELECT seq, size FROM messages ORDER BY seq ASC`)
            .iterate() as any) {
            if (freed >= bytesToFree) break;
            freed += msgSize;
            lastSeqToDelete = seq;
          }

          if (lastSeqToDelete !== null) {
            if (this.conf.discard_policy == "new") {
              if (size > 0) {
                throw new ConatError("max_bytes limit reached", {
                  code: "reject",
                });
              }
            } else {
              const rows = this.db
                .prepare(`DELETE FROM messages WHERE seq <= ? RETURNING seq`)
                .all(lastSeqToDelete);
              this.emitDelete(rows);
            }
          }
        }
      }
    }

    if (this.conf.allow_msg_ttl) {
      const rows = this.db
        .prepare(
          `DELETE FROM messages WHERE ttl IS NOT null AND time + ttl/1000 < ? RETURNING seq`,
        )
        .all(Date.now() / 1000);
      this.emitDelete(rows);
    }

    if (this.conf.max_msg_size > -1 && size > this.conf.max_msg_size) {
      throw new ConatError(
        `max_msg_size of ${this.conf.max_msg_size} bytes exceeded`,
        { code: "reject" },
      );
    }
  };
}

function dbToMessage(
  x:
    | {
        seq: number;
        key?: string;
        time: number;
        compress: CompressionAlgorithm;
        encoding: DataEncoding;
        raw: Buffer;
        headers?: string;
      }
    | undefined,
): StoredMessage | undefined {
  if (x === undefined) {
    return x;
  }
  return {
    seq: x.seq,
    time: x.time * 1000,
    key: x.key != null ? x.key : undefined,
    encoding: x.encoding,
    raw: handleDecompress(x),
    headers: x.headers ? JSON.parse(x.headers) : undefined,
  };
}

function handleDecompress({
  raw,
  compress,
}: {
  raw: Buffer;
  compress: CompressionAlgorithm;
}) {
  if (compress == CompressionAlgorithm.None) {
    return raw;
  } else if (compress == CompressionAlgorithm.Zstd) {
    return decompress(raw);
  } else {
    throw Error(`unknown compression ${compress}`);
  }
}

interface CreateOptions extends StorageOptions {
  noCache?: boolean;
}

export const openPaths = new Set<string>();

export const cache = refCacheSync<CreateOptions, PersistentStream>({
  name: "persistent-storage-stream",
  createKey: ({ path }: CreateOptions) => path,
  createObject: (options: CreateOptions) => {
    return new PersistentStream(options);
  },
});

export function pstream(
  options: StorageOptions & { noCache?: boolean },
): PersistentStream {
  return cache(options);
}
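
/*
EXAMPLE (sketch): pstream is ref-cached by path, so two calls with the same path
return the same underlying PersistentStream (unless noCache is set). Reading a
range and trimming old history might look like this, with illustrative values:

    const a = pstream({ path: "/tmp/a.db" });
    const b = pstream({ path: "/tmp/a.db" });  // a === b
    for (const msg of a.getAll({ start_seq: 1, end_seq: 100 })) {
      // StoredMessage objects in increasing seq order
    }
    a.delete({ last_seq: 100 });  // delete everything with seq <= 100
*/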

function age(path: string) {
  try {
    return statSync(path).mtimeMs;
  } catch {
    return 0;
  }
}