Path: blob/master/src/packages/conat/persist/storage.ts
/*
Persistent storage of a specific stream or kv store.

You can set a message by providing optionally a key, buffer and/or json value.
A sequence number and time (in ms since epoch) is assigned and returned.
If the key is provided, it is an arbitrary string and all older messages
with that same key are deleted. You can efficiently retrieve a message
by its key. The message content itself is given by the buffer and/or json
value. The buffer is like the "payload" in NATS, and the json is like
the headers in NATS.

This module is:

- efficient -- buffer is automatically compressed using zstandard
- synchronous -- fast enough to meet our requirements even with blocking
- memory efficient -- nothing in memory beyond whatever key you request

We care about memory efficiency here since it's likely we'll want to have
possibly thousands of these in a single nodejs process at once, but with
less than 1 read/write per second for each. Thus memory is critical, and
supporting at least 1000 writes/second is what we need.
Fortunately, this implementation can do ~50,000+ writes per second and read
over 500,000 per second. Yes, it blocks the main thread, but by using
better-sqlite3 and zstd-napi, we get 10x speed increases over async code,
so this is worth it.


COMPRESSION:

I implemented *sync* lz4-napi compression here and it's very fast,
but it has to be run with async waits in a loop or it doesn't give back
memory, and such throttling may significantly negatively impact performance
and mean we don't get a 100% sync api (like we have now).
The async functions in lz4-napi seem fine. Upstream report (by me):
https://github.com/antoniomuso/lz4-napi/issues/678
I also tried the rust sync snappy and it had a similar memory leak. Finally,
I tried zstd-napi and it has a very fast sync implementation that does *not*
need async pauses to not leak memory. So zstd-napi it is.
And I like zstandard anyways.

TIERED STORAGE:

You can provide a second path archive for the sqlite file. If provided, on creation,
this will stat both the main path and the archive path. If the archive path is
newer, then the file is first copied from the archive path to the normal path,
then opened. Also, if the archive path is provided, then a backup of the database
is made to the archive path periodically. We use this for tiered storage in
CoCalc as follows. The archive path is on a Google Cloud Storage autoclass bucket
that is mounted using gcsfuse. The normal primary path is on a small fast SSD
persistent disk, which we view as a cache.

NOTE:

We use seconds instead of ms in sqlite since that is the standard
convention for times in sqlite.

DEVELOPMENT:

s = require('@cocalc/backend/conat/persist').pstream({path:'/tmp/a.db'})

*/
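/*
USAGE SKETCH (illustrative only; `enc` is a placeholder for whichever DataEncoding
value your client uses -- see @cocalc/conat/core/client):

  import { pstream } from "@cocalc/backend/conat/persist";

  const s = pstream({ path: "/tmp/a.db" });
  // store a message under a key; any older message with the same key is deleted
  const { seq, time } = s.set({
    key: "greeting",
    raw: Buffer.from("hello"),
    encoding: enc,
    headers: { from: "example" },
  });
  // read it back, either by key or by seq
  const byKey = s.get({ key: "greeting", seq: undefined });
  const bySeq = s.get({ seq, key: undefined });
  await s.close();
*/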
import { refCacheSync } from "@cocalc/util/refcache";
import {
  createDatabase,
  type Database,
  compress,
  decompress,
  statSync,
  copyFileSync,
} from "./context";
import type { JSONValue } from "@cocalc/util/types";
import { EventEmitter } from "events";
import {
  DataEncoding,
  type Headers,
  ConatError,
} from "@cocalc/conat/core/client";
import TTL from "@isaacs/ttlcache";
import { getLogger } from "@cocalc/conat/client";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { throttle } from "lodash";

const logger = getLogger("persist:storage");

export interface PartialInventory {
  // how much space is used by this stream
  bytes: number;
  limits: Partial<Configuration>;
  // number of messages
  count: number;
  // for streams, the seq number up to which this data is valid, i.e.,
  // this data is for all elements of the stream with sequence
  // number <= seq.
  seq: number;
}

export interface Configuration {
  // How many messages may be in a Stream. Oldest messages will be removed
  // if the Stream exceeds this size. -1 for unlimited.
  max_msgs: number;

  // Maximum age of any message in the stream,
  // expressed in milliseconds. 0 for unlimited.
  // **Note that max_age is in milliseconds.**
  max_age: number;

  // How big the Stream may be. When the stream size
  // exceeds this, old messages are removed. -1 for unlimited.
  // The size of a message is the sum of the raw uncompressed blob
  // size, the headers json and the key length.
  max_bytes: number;

  // The largest message that will be accepted. -1 for unlimited.
  max_msg_size: number;

  // Attempting to publish a message that causes either of the following
  // two rate limits to be exceeded throws an exception.
  // For dstream, the messages are explicitly rejected and the client
  // gets a "reject" event emitted. E.g., the terminal running in the project
  // writes [...] when it gets these rejects, indicating that data was dropped.
  // -1 for unlimited
  max_bytes_per_second: number;

  // -1 for unlimited
  max_msgs_per_second: number;

  // old = delete old messages to make room for new
  // new = refuse writes if they exceed the limits
  discard_policy: "old" | "new";

  // If true (default: false), messages will be automatically deleted after their ttl.
  // Use the option {ttl: number of MILLISECONDS} when publishing to set a ttl.
  allow_msg_ttl: boolean;

  // description of this table
  desc: JSONValue;
}

const CONFIGURATION = {
  max_msgs: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_age: { def: 0, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_bytes: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_msg_size: { def: -1, fromDb: parseInt, toDb: (x) => `${parseInt(x)}` },
  max_bytes_per_second: {
    def: -1,
    fromDb: parseInt,
    toDb: (x) => `${parseInt(x)}`,
  },
  max_msgs_per_second: {
    def: -1,
    fromDb: parseInt,
    toDb: (x) => `${parseInt(x)}`,
  },
  discard_policy: {
    def: "old",
    fromDb: (x) => `${x}`,
    toDb: (x) => (x == "new" ? "new" : "old"),
  },
  allow_msg_ttl: {
    def: false,
    fromDb: (x) => x == "true",
    toDb: (x) => `${!!x}`,
  },
  desc: {
    def: null,
    fromDb: JSON.parse,
    toDb: JSON.stringify,
  },
};

export const EPHEMERAL_MAX_BYTES = 64 * 1e6;

enum CompressionAlgorithm {
  None = 0,
  Zstd = 1,
}

interface Compression {
  // compression algorithm to use
  algorithm: CompressionAlgorithm;
  // only compress data above this size
  threshold: number;
}

const DEFAULT_COMPRESSION = {
  algorithm: CompressionAlgorithm.Zstd,
  threshold: 1024,
};

export interface StoredMessage {
  // server assigned positive increasing integer number
  seq: number;
  // server assigned time in ms since epoch
  time: number;
  // user assigned key -- when set, all previous messages with that key are deleted.
  key?: string;
  // the encoding used to encode the raw data
  encoding: DataEncoding;
  // arbitrary binary data
  raw: Buffer;
  // arbitrary JSON-able object -- analogue of NATS headers, but anything JSON-able
  headers?: Headers;
}

export interface SetOperation extends StoredMessage {
  op?: undefined;
  msgID?: string;
}

export interface DeleteOperation {
  op: "delete";
  // sequence numbers of deleted messages
  seqs: number[];
}

export const DEFAULT_ARCHIVE_INTERVAL = 30_000; // 30 seconds
export interface StorageOptions {
  // absolute path to sqlite database file. This needs to be a valid filename
  // path, and must also be kept under 1000 characters in length so it can be
  // stored in cloud storage.
  path: string;
  // another absolute path. If this is given, then (1)
  // it will be copied to path before opening path if it is newer, and (2) a
  // backup will be saved to archive (using sqlite's backup feature) every
  // archiveInterval ms. NOTE: we actually append ".db" to path and to archive.
  archive?: string;
  // the archive interval, if archive is given; defaults to DEFAULT_ARCHIVE_INTERVAL.
  // Depending on your setup, this is likely your tolerance for data loss in the worst case scenario, e.g.,
  // "loss of the last 30 seconds of TimeTravel edit history".
  archiveInterval?: number;
  // another path which will be written to when the database is closed,
  // but not otherwise. NOTE: '.db' is appended to the name.
  // this backup is *NOT* used in any way except as a backup; in particular,
  // it won't be used even if archive and path were both gone.
  backup?: string;

  // if false (the default), do not require sync writes to disk on every set
  sync?: boolean;
  // if set, then data is never saved to disk at all. To avoid using a lot of server
  // RAM there is always a hard cap of at most EPHEMERAL_MAX_BYTES on any ephemeral
  // table, which is enforced on all writes. Clients should always set max_bytes,
  // possibly as low as they can, and check by reading back what is set.
  ephemeral?: boolean;
  // compression configuration
  compression?: Compression;
}
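/*
OPTIONS SKETCH (the paths here are made up, for illustration only): a stream with
an archive is copied back from the archive on open if the archive copy is newer,
and is backed up to the archive every archiveInterval ms; an ephemeral stream
never touches disk and has max_bytes capped at EPHEMERAL_MAX_BYTES.

  // durable, with the archive on (say) a gcsfuse-mounted bucket:
  const durable = pstream({
    path: "/data/streams/patches",    // ".db" is appended automatically
    archive: "/gcs/streams/patches",  // ".db" is appended here too
    archiveInterval: 60_000,
  });

  // in-memory only:
  const scratch = pstream({ path: "/tmp/scratch-1", ephemeral: true });
*/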
// persistence for stream of messages with subject
export class PersistentStream extends EventEmitter {
  private readonly options: StorageOptions;
  private readonly db: Database;
  private readonly msgIDs = new TTL({ ttl: 2 * 60 * 1000 });
  private conf: Configuration;
  private throttledBackup?;

  constructor(options: StorageOptions) {
    super();
    openPaths.add(options.path);
    logger.debug("constructor ", options.path);
    this.setMaxListeners(1000);
    options = { compression: DEFAULT_COMPRESSION, ...options };
    this.options = options;
    const location = this.options.ephemeral
      ? ":memory:"
      : this.options.path + ".db";
    this.initArchive();
    this.db = createDatabase(location);
    this.initSchema();
  }

  private initArchive = () => {
    if (!this.options.archive) {
      this.throttledBackup = () => {};
      return;
    }
    this.throttledBackup = throttle(
      this.backup,
      this.options.archiveInterval ?? DEFAULT_ARCHIVE_INTERVAL,
    );

    const archive = this.options.archive + ".db";
    const archiveAge = age(archive);

    const path = this.options.path + ".db";
    const pathAge = age(path);

    if (archiveAge > pathAge) {
      copyFileSync(archive, path);
    }
  };

  private initSchema = () => {
    if (!this.options.sync && !this.options.ephemeral) {
      // Unless sync is set, we do not require that the filesystem has committed changes
      // to disk after every insert. This can easily make things 10x faster. Sets are
      // typically going to come in one-by-one as users edit files, so this works well
      // for our application. Also, loss of a few seconds of persistence is acceptable
      // in a lot of applications, e.g., if it is just edit history for a file.
      this.db.prepare("PRAGMA synchronous=OFF").run();
    }
    // time is in *seconds* since the epoch, since that is standard for sqlite.
    // ttl is in milliseconds.
    this.db
      .prepare(
        `CREATE TABLE IF NOT EXISTS messages (
          seq INTEGER PRIMARY KEY AUTOINCREMENT, key TEXT UNIQUE, time INTEGER NOT NULL, headers TEXT, compress NUMBER NOT NULL, encoding NUMBER NOT NULL, raw BLOB NOT NULL, size NUMBER NOT NULL, ttl NUMBER
        )
        `,
      )
      .run();
    this.db
      .prepare(
        `
        CREATE TABLE IF NOT EXISTS config (
          field TEXT PRIMARY KEY, value TEXT NOT NULL
        )`,
      )
      .run();
    this.db
      .prepare("CREATE INDEX IF NOT EXISTS idx_messages_key ON messages(key)")
      .run();
    this.db
      .prepare("CREATE INDEX IF NOT EXISTS idx_messages_time ON messages(time)")
      .run();

    this.conf = this.config();
  };

  close = async () => {
    const path = this.options?.path;
    if (path == null) {
      return;
    }
    logger.debug("close ", path);
    if (this.db != null) {
      this.vacuum();
      this.db.prepare("PRAGMA wal_checkpoint(FULL)").run();
      await this.backup();
      if (this.options.backup) {
        await this.backup(this.options.backup);
      }
      this.db.close();
    }
    // @ts-ignore
    delete this.options;
    this.msgIDs?.clear();
    // @ts-ignore
    delete this.msgIDs;
    openPaths.delete(path);
  };

  private backup = reuseInFlight(async (path?: string): Promise<void> => {
    if (this.options == null) {
      // can happen due to this.throttledBackup.
      return;
    }
    // reuseInFlight since probably doing a backup on top
    // of itself would corrupt data.
    if (path === undefined && !this.options.archive) {
      return;
    }
    path = (path ?? this.options.archive) + ".db";
    //console.log("backup", { path });
    try {
      await this.db.backup(path);
    } catch (err) {
      if (!process.env.COCALC_TEST_MODE) {
        console.log(err);
      }
      logger.debug("WARNING: error creating a backup", path, err);
    }
  });

  private compress = (
    raw: Buffer,
  ): { raw: Buffer; compress: CompressionAlgorithm } => {
    if (
      this.options.compression!.algorithm == CompressionAlgorithm.None ||
      raw.length <= this.options.compression!.threshold
    ) {
      return { raw, compress: CompressionAlgorithm.None };
    }
    if (this.options.compression!.algorithm == CompressionAlgorithm.Zstd) {
      return { raw: compress(raw), compress: CompressionAlgorithm.Zstd };
    }
    throw Error(
      `unknown compression algorithm: ${this.options.compression!.algorithm}`,
    );
  };

  set = ({
    encoding,
    raw,
    headers,
    key,
    ttl,
    previousSeq,
    msgID,
  }: {
    encoding: DataEncoding;
    raw: Buffer;
    headers?: JSONValue;
    key?: string;
    ttl?: number;
    previousSeq?: number;
    // if given, any attempt to publish something again with the same msgID
    // is deduplicated. Use this to prevent accidentally writing twice, e.g.,
    // due to not getting a response back from the server.
    msgID?: string;
  }): { seq: number; time: number } => {
    if (previousSeq === null) {
      previousSeq = undefined;
    }
    if (key === null) {
      key = undefined;
    }
    if (msgID != null && this.msgIDs?.has(msgID)) {
      return this.msgIDs.get(msgID)!;
    }
    if (key !== undefined && previousSeq !== undefined) {
      // throw error if current seq number for the row
      // with this key is not previousSeq.
      const { seq } = this.db // there is an index on the key so this is fast
        .prepare("SELECT seq FROM messages WHERE key=?")
        .get(key) as any;
      if (seq != previousSeq) {
        throw new ConatError("wrong last sequence", {
          code: "wrong-last-sequence",
        });
      }
    }
    const time = Date.now();
    const compressedRaw = this.compress(raw);
    const serializedHeaders = JSON.stringify(headers);
    const size =
      (serializedHeaders?.length ?? 0) +
      (raw?.length ?? 0) +
      (key?.length ?? 0);

    this.enforceLimits(size);

    const tx = this.db.transaction(
      (time, compress, encoding, raw, headers, key, size, ttl) => {
        if (key !== undefined) {
          // insert with key -- delete all previous messages, as they will
          // never be needed again and waste space.
          this.db.prepare("DELETE FROM messages WHERE key = ?").run(key);
        }
        return this.db
          .prepare(
            "INSERT INTO messages(time, compress, encoding, raw, headers, key, size, ttl) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING seq",
          )
          .get(time / 1000, compress, encoding, raw, headers, key, size, ttl);
      },
    );
    const row = tx(
      time,
      compressedRaw.compress,
      encoding,
      compressedRaw.raw,
      serializedHeaders,
      key,
      size,
      ttl,
    );
    const seq = Number((row as any).seq);
    // lastInsertRowid is a bigint from sqlite, but we won't hit that limit
    this.emit("change", {
      seq,
      time,
      key,
      encoding,
      raw,
      headers,
      msgID,
    });
    this.throttledBackup();
    if (msgID !== undefined) {
      this.msgIDs.set(msgID, { time, seq });
    }
    return { time, seq };
  };
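  /*
  SET SEMANTICS (a sketch of how the set method above behaves; `s`, `raw` and
  `enc` are placeholders for a PersistentStream, a Buffer and a DataEncoding value):

    // msgID makes writes idempotent for ~2 minutes -- repeating a write with the
    // same msgID returns the original { seq, time } instead of storing a new row:
    const a = s.set({ raw, encoding: enc, msgID: "attempt-1" });
    const b = s.set({ raw, encoding: enc, msgID: "attempt-1" });
    // a.seq === b.seq

    // previousSeq gives compare-and-set on a key: the write only succeeds if the
    // current row for that key has seq === previousSeq; otherwise a ConatError
    // with code "wrong-last-sequence" is thrown:
    const first = s.set({ key: "state", raw, encoding: enc });
    s.set({ key: "state", raw, encoding: enc, previousSeq: first.seq });
  */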
  get = ({
    seq,
    key,
  }: { seq: number; key: undefined } | { seq: undefined; key: string }):
    | StoredMessage
    | undefined => {
    let x;
    if (seq) {
      x = this.db
        .prepare(
          "SELECT seq, key, time, compress, encoding, raw, headers FROM messages WHERE seq=?",
        )
        .get(seq);
    } else if (key != null) {
      // NOTE: we guarantee when doing set above that there is at most one
      // row with a given key. Also there's a unique constraint.
      x = this.db
        .prepare(
          "SELECT seq, key, time, compress, encoding, raw, headers FROM messages WHERE key=?",
        )
        .get(key);
    } else {
      x = undefined;
    }
    return dbToMessage(x as any);
  };

  *getAll({
    start_seq,
    end_seq,
  }: {
    end_seq?: number;
    start_seq?: number;
  } = {}): IterableIterator<StoredMessage> {
    let query: string, stmt;

    const where: string[] = [];
    const v: number[] = [];
    if (start_seq != null) {
      where.push("seq>=?");
      v.push(start_seq);
    }
    if (end_seq != null) {
      where.push("seq<=?");
      v.push(end_seq);
    }
    query = `SELECT seq, key, time, compress, encoding, raw, headers FROM messages ${where.length == 0 ? "" : " where " + where.join(" AND ")} ORDER BY seq`;
    stmt = this.db.prepare(query);
    for (const row of stmt.iterate(...v)) {
      yield dbToMessage(row)!;
    }
  }
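  /*
  READ SKETCH (illustrative only): to replay a stream and then follow new writes,
  iterate getAll above and listen for "change" events, which carry either a
  SetOperation or a DeleteOperation (discriminated by the `op` field).

    for (const msg of s.getAll({ start_seq: 1 })) {
      // msg is a StoredMessage
    }
    s.on("change", (update: SetOperation | DeleteOperation) => {
      if (update.op === "delete") {
        // update.seqs were removed
      } else {
        // update is a newly stored message (plus optional msgID)
      }
    });
  */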
  delete = ({
    seq,
    seqs: seqs0,
    last_seq,
    all,
  }: {
    seq?: number;
    seqs?: number[];
    last_seq?: number;
    all?: boolean;
  }): { seqs: number[] } => {
    let seqs: number[] = [];
    if (all) {
      seqs = this.db
        .prepare("SELECT seq FROM messages")
        .all()
        .map((row: any) => row.seq);
      this.db.prepare("DELETE FROM messages").run();
      this.vacuum();
    } else if (last_seq) {
      seqs = this.db
        .prepare("SELECT seq FROM messages WHERE seq<=?")
        .all(last_seq)
        .map((row: any) => row.seq);
      this.db.prepare("DELETE FROM messages WHERE seq<=?").run(last_seq);
      this.vacuum();
    } else if (seq) {
      seqs = this.db
        .prepare("SELECT seq FROM messages WHERE seq=?")
        .all(seq)
        .map((row: any) => row.seq);
      this.db.prepare("DELETE FROM messages WHERE seq=?").run(seq);
    } else if (seqs0) {
      const statement = this.db.prepare("DELETE FROM messages WHERE seq=?");
      const transaction = this.db.transaction((seqs) => {
        for (const s of seqs) {
          statement.run(s);
        }
      });
      transaction(seqs0);
      seqs = seqs0;
    }
    this.emit("change", { op: "delete", seqs });
    this.throttledBackup();
    return { seqs };
  };

  vacuum = () => {
    try {
      this.db.prepare("VACUUM").run();
    } catch {}
  };

  get length(): number {
    const { length } = this.db
      .prepare("SELECT COUNT(*) AS length FROM messages")
      .get() as { length: number };
    return length;
  }

  totalSize = (): number => {
    return (
      (this.db.prepare(`SELECT SUM(size) AS sum FROM messages`).get() as any)
        .sum ?? 0
    );
  };

  seq = (): number => {
    return (
      (this.db.prepare(`SELECT MAX(seq) AS seq FROM messages`).get() as any)
        .seq ?? 0
    );
  };

  inventory = (): PartialInventory => {
    return {
      bytes: this.totalSize(),
      count: this.length,
      limits: this.getConfig(),
      seq: this.seq(),
    };
  };

  keys = (): string[] => {
    const v = this.db
      .prepare("SELECT key FROM messages WHERE key IS NOT NULL")
      .all() as {
      key: string;
    }[];
    return v.map(({ key }) => key);
  };

  sqlite = (statement: string, params: any[] = []): any[] => {
    // Matches "attach database" (case-insensitive, ignores whitespace)
    if (/\battach\s+database\b/i.test(statement)) {
      throw Error("ATTACH DATABASE not allowed");
    }
    const stmt = this.db.prepare(statement);
    try {
      return stmt.all(...params);
    } catch (err) {
      if (err.message.includes("run() instead")) {
        stmt.run(...params);
        return [];
      } else {
        throw err;
      }
    }
  };

  // only returns fields that are not set to their default value,
  // and doesn't enforce any limits
  getConfig = (): Partial<Configuration> => {
    const cur: any = {};
    for (const { field, value } of this.db
      .prepare("SELECT * FROM config")
      .all() as any) {
      const { def, fromDb } = CONFIGURATION[field];
      cur[field] = fromDb(value);
      if (cur[field] == def) {
        delete cur[field];
      }
    }
    return cur;
  };

  config = (config?: Partial<Configuration>): Configuration => {
    const cur: any = {};
    for (const { field, value } of this.db
      .prepare("SELECT * FROM config")
      .all() as any) {
      cur[field] = value;
    }
    const full: Partial<Configuration> = {};
    for (const key in CONFIGURATION) {
      const { def, fromDb, toDb } = CONFIGURATION[key];
      full[key] =
        config?.[key] ?? (cur[key] !== undefined ? fromDb(cur[key]) : def);
      let x = toDb(full[key]);
      if (config?.[key] != null && full[key] != (cur[key] ?? def)) {
        // making a change
        this.db
          .prepare(
            `INSERT INTO config (field, value) VALUES(?, ?) ON CONFLICT(field) DO UPDATE SET value=excluded.value`,
          )
          .run(key, x);
      }
      full[key] = fromDb(x);
      if (
        this.options.ephemeral &&
        key == "max_bytes" &&
        (full[key] == null || full[key] <= 0 || full[key] > EPHEMERAL_MAX_BYTES)
      ) {
        // for ephemeral we always make it so max_bytes is capped
        // (note -- this isn't explicitly set in the sqlite database, since we might
        // change it, and by not setting it in the database we can)
        full[key] = EPHEMERAL_MAX_BYTES;
      }
    }
    this.conf = full as Configuration;
    // ensure any new limits are enforced
    this.enforceLimits(0);
    this.throttledBackup();
    return full as Configuration;
  };
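  /*
  LIMITS SKETCH (the numbers are arbitrary, for illustration): config() above
  merges the given partial configuration into the persisted one, enforces it
  immediately, and returns the full effective Configuration; getConfig() above
  returns only the non-default fields.

    s.config({
      max_msgs: 1000,
      max_bytes: 10 * 1e6,
      max_age: 1000 * 60 * 60 * 24, // milliseconds
      discard_policy: "old",
    });
    const nonDefaults = s.getConfig();
  */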
  private emitDelete = (rows) => {
    if (rows.length > 0) {
      const seqs = rows.map((row: { seq: number }) => row.seq);
      this.emit("change", { op: "delete", seqs });
      this.throttledBackup();
    }
  };

  // do whatever limit enforcement and throttling is needed when inserting one new message
  // with the given size; if size=0, assume we are not actually inserting a new message, and
  // just enforce the current limits
  private enforceLimits = (size: number = 0) => {
    if (
      size > 0 &&
      (this.conf.max_msgs_per_second > 0 || this.conf.max_bytes_per_second > 0)
    ) {
      const { msgs, bytes } = this.db
        .prepare(
          "SELECT COUNT(*) AS msgs, SUM(size) AS bytes FROM messages WHERE time >= ?",
        )
        .get(Date.now() / 1000 - 1) as { msgs: number; bytes: number };
      if (
        this.conf.max_msgs_per_second > 0 &&
        msgs > this.conf.max_msgs_per_second
      ) {
        throw new ConatError("max_msgs_per_second exceeded", {
          code: "reject",
        });
      }
      if (
        this.conf.max_bytes_per_second > 0 &&
        bytes > this.conf.max_bytes_per_second
      ) {
        throw new ConatError("max_bytes_per_second exceeded", {
          code: "reject",
        });
      }
    }

    if (this.conf.max_msgs > -1) {
      const length = this.length + (size > 0 ? 1 : 0);
      if (length > this.conf.max_msgs) {
        if (this.conf.discard_policy == "new") {
          if (size > 0) {
            throw new ConatError("max_msgs limit reached", { code: "reject" });
          }
        } else {
          // delete earliest messages to make room
          const rows = this.db
            .prepare(
              `DELETE FROM messages WHERE seq IN (SELECT seq FROM messages ORDER BY seq ASC LIMIT ?) RETURNING seq`,
            )
            .all(length - this.conf.max_msgs);
          this.emitDelete(rows);
        }
      }
    }

    if (this.conf.max_age > 0) {
      const rows = this.db
        .prepare(
          `DELETE FROM messages WHERE seq IN (SELECT seq FROM messages WHERE time <= ?) RETURNING seq`,
        )
        .all((Date.now() - this.conf.max_age) / 1000);
      this.emitDelete(rows);
    }

    if (this.conf.max_bytes > -1) {
      if (size > this.conf.max_bytes) {
        if (this.conf.discard_policy == "new") {
          if (size > 0) {
            throw new ConatError("max_bytes limit reached", { code: "reject" });
          }
        } else {
          // new message exceeds the total, so this is the same as adding in the new message,
          // then deleting everything.
          this.delete({ all: true });
        }
      } else {
        // delete all the earliest (in terms of seq number) messages
        // so that the sum of the remaining
        // sizes along with the new size is <= max_bytes.
        // Only enforce if actually inserting, or if the current sum is over.
        const totalSize = this.totalSize();
        const newTotal = totalSize + size;
        if (newTotal > this.conf.max_bytes) {
          const bytesToFree = newTotal - this.conf.max_bytes;
          let freed = 0;
          let lastSeqToDelete: number | null = null;

          for (const { seq, size: msgSize } of this.db
            .prepare(`SELECT seq, size FROM messages ORDER BY seq ASC`)
            .iterate() as any) {
            if (freed >= bytesToFree) break;
            freed += msgSize;
            lastSeqToDelete = seq;
          }

          if (lastSeqToDelete !== null) {
            if (this.conf.discard_policy == "new") {
              if (size > 0) {
                throw new ConatError("max_bytes limit reached", {
                  code: "reject",
                });
              }
            } else {
              const rows = this.db
                .prepare(`DELETE FROM messages WHERE seq <= ? RETURNING seq`)
                .all(lastSeqToDelete);
              this.emitDelete(rows);
            }
          }
        }
      }
    }

    if (this.conf.allow_msg_ttl) {
      const rows = this.db
        .prepare(
          `DELETE FROM messages WHERE ttl IS NOT null AND time + ttl/1000 < ? RETURNING seq`,
        )
        .all(Date.now() / 1000);
      this.emitDelete(rows);
    }

    if (this.conf.max_msg_size > -1 && size > this.conf.max_msg_size) {
      throw new ConatError(
        `max_msg_size of ${this.conf.max_msg_size} bytes exceeded`,
        { code: "reject" },
      );
    }
  };
}
function dbToMessage(
  x:
    | {
        seq: number;
        key?: string;
        time: number;
        compress: CompressionAlgorithm;
        encoding: DataEncoding;
        raw: Buffer;
        headers?: string;
      }
    | undefined,
): StoredMessage | undefined {
  if (x === undefined) {
    return x;
  }
  return {
    seq: x.seq,
    time: x.time * 1000,
    key: x.key != null ? x.key : undefined,
    encoding: x.encoding,
    raw: handleDecompress(x),
    headers: x.headers ? JSON.parse(x.headers) : undefined,
  };
}

function handleDecompress({
  raw,
  compress,
}: {
  raw: Buffer;
  compress: CompressionAlgorithm;
}) {
  if (compress == CompressionAlgorithm.None) {
    return raw;
  } else if (compress == CompressionAlgorithm.Zstd) {
    return decompress(raw);
  } else {
    throw Error(`unknown compression ${compress}`);
  }
}

interface CreateOptions extends StorageOptions {
  noCache?: boolean;
}

export const openPaths = new Set<string>();

export const cache = refCacheSync<CreateOptions, PersistentStream>({
  name: "persistent-storage-stream",
  createKey: ({ path }: CreateOptions) => path,
  createObject: (options: CreateOptions) => {
    return new PersistentStream(options);
  },
});

export function pstream(
  options: StorageOptions & { noCache?: boolean },
): PersistentStream {
  return cache(options);
}

function age(path: string) {
  try {
    return statSync(path).mtimeMs;
  } catch {
    return 0;
  }
}
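/*
CACHING NOTE (a sketch of expected behavior): pstream goes through refCacheSync
keyed on `path`, so repeated calls with the same path should return the same
PersistentStream (and share one sqlite handle); CreateOptions.noCache is there
to opt out of that sharing.

  const a = pstream({ path: "/tmp/a.db" });
  const b = pstream({ path: "/tmp/a.db" });
  // expected: a === b
*/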