Path: blob/master/src/packages/conat/sync/core-stream.ts
/*
core-stream.ts = the Core Stream data structure for conat.

This is the core data structure that easy-to-use ephemeral and persistent
streams and kv stores are built on. It is NOT meant to be super easy and
simple to use, with saving in the background. Instead, operations
are async, and the API is complicated. We build dkv, dstream, akv, etc. on
top of this with a much friendlier API.

NOTE: unlike in conat, in kv mode, the keys can be any utf-8 string.
We use the subject to track communication involving this stream, but
otherwise it has no relevance to the keys. Conat's core pub/sub/request/
reply model is very similar to NATS, but the analogue of Jetstream is
different because I don't find Jetstream useful at all, and find this
much more useful.

DEVELOPMENT:

~/cocalc/src/packages/backend$ node

require('@cocalc/backend/conat'); a = require('@cocalc/conat/sync/core-stream'); s = await a.cstream({name:'test'})

*/

import { EventEmitter } from "events";
import {
  Message,
  type Headers,
  messageData,
  decode,
} from "@cocalc/conat/core/client";
import { isNumericString } from "@cocalc/util/misc";
import refCache from "@cocalc/util/refcache";
import { conat } from "@cocalc/conat/client";
import type { Client } from "@cocalc/conat/core/client";
import jsonStableStringify from "json-stable-stringify";
import type {
  SetOperation,
  DeleteOperation,
  StoredMessage,
  Configuration,
} from "@cocalc/conat/persist/storage";
import type { Changefeed } from "@cocalc/conat/persist/client";
export type { Configuration };
import { join } from "path";
import {
  type StorageOptions,
  type PersistStreamClient,
  stream as persist,
  type SetOptions,
} from "@cocalc/conat/persist/client";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { until } from "@cocalc/util/async-utils";
import { type PartialInventory } from "@cocalc/conat/persist/storage";
import { getLogger } from "@cocalc/conat/client";

const logger = getLogger("sync:core-stream");

const PUBLISH_MANY_BATCH_SIZE = 500;

const DEFAULT_GET_ALL_TIMEOUT = 15000;

const log = (..._args) => {};
//const log = console.log;

// when this many bytes of key:value have been changed (so need to be freed),
// we do a garbage collection pass.
export const KEY_GC_THRESH = 10 * 1e6;

// NOTE: when you do this.deleteKv(key), we ensure the previous
// messages with the given key are completely deleted from sqlite, and
// also create a *new* lightweight tombstone. That tombstone has this
// ttl, which defaults to DEFAULT_TOMBSTONE_TTL (one week), so the tombstone
// itself will be removed after 1 week. The tombstone is only needed for
// clients that go offline during the delete, then come back, and replay the
// partial log of what was missed. Such clients should reset if the
// offline time is longer than DEFAULT_TOMBSTONE_TTL.
// This only happens if allow_msg_ttl is configured to true, which is
// done with dkv, but not on by default otherwise.
export const DEFAULT_TOMBSTONE_TTL = 7 * 24 * 60 * 60 * 1000; // 1 week
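
// For example, roughly (a sketch, assuming a kv stream s for which
// allow_msg_ttl has been configured, as dkv does):
//
//   await s.setKv("x", 10);   // normal kv write
//   await s.deleteKv("x");    // prior message for "x" is purged and a
//                             // tombstone with ttl DEFAULT_TOMBSTONE_TTL
//                             // is written in its place
//   s.getKv("x");             // --> undefined, also for clients that
//                             // replay the log within a week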

export interface RawMsg extends Message {
  timestamp: number;
  seq: number;
  key?: string;
}

export interface ChangeEvent<T> {
  mesg?: T;
  raw?: Partial<RawMsg>;
  key?: string;
  prev?: T;
  msgID?: string;
}

const HEADER_PREFIX = "CN-";

export const COCALC_TOMBSTONE_HEADER = `${HEADER_PREFIX}Tombstone`;
export const COCALC_STREAM_HEADER = `${HEADER_PREFIX}Stream`;
export const COCALC_OPTIONS_HEADER = `${HEADER_PREFIX}Options`;

export interface CoreStreamOptions {
  // what it's called
  name: string;
  // where it is located -- this is who **owns the resource**, which
  // may or may not be who is accessing it.
  account_id?: string;
  project_id?: string;
  config?: Partial<Configuration>;
  // only load historic messages starting at the given seq number.
  start_seq?: number;

  ephemeral?: boolean;
  sync?: boolean;

  client?: Client;

  noCache?: boolean;

  // the name of the cluster of persistence servers to use -- this is
  // by default SERVICE from conat/persist/util.ts. Set it to something
  // else to use special different servers, e.g., we use a different service
  // for sharing cluster state data, where the servers are ephemeral and
  // there is one for each node.
  service?: string;
}

export interface User {
  account_id?: string;
  project_id?: string;
}

export function storagePath({
  account_id,
  project_id,
  name,
}: User & { name: string }) {
  let userPath;
  if (account_id) {
    userPath = `accounts/${account_id}`;
  } else if (project_id) {
    userPath = `projects/${project_id}`;
  } else {
    userPath = "hub";
  }
  return join(userPath, name);
}
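
// E.g. (sketch, with a made-up uuid):
//   storagePath({project_id: "3fa8...", name: "patches"})  // --> "projects/3fa8.../patches"
//   storagePath({name: "stats"})                           // --> "hub/stats"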

export class CoreStream<T = any> extends EventEmitter {
  public readonly name: string;

  private configOptions?: Partial<Configuration>;
  private _start_seq?: number;

  // don't do "this.raw=" or "this.messages=" anywhere in this class
  // because dstream directly references the public raw/messages.
  public readonly raw: RawMsg[] = [];
  public readonly messages: T[] = [];
  public readonly kv: { [key: string]: { mesg: T; raw: RawMsg } } = {};
  private kvChangeBytes = 0;

  // this msgIDs set is ONLY used in ephemeral mode by the leader.
  private readonly msgIDs = new Set<any>();
  // lastSeq used by clients to keep track of what they have received; if one
  // is skipped they reconnect starting with the last one they didn't miss.
  private lastSeq: number = 0;
  // IMPORTANT: user here means the *owner* of the resource, **NOT** the
  // client who is accessing it! For example, a stream of edits of a file
  // in a project has user {project_id} even if it is being accessed by
  // an account.
  private user: User;
  private storage: StorageOptions;
  private client?: Client;
  private persistClient: PersistStreamClient;
  private changefeed?: Changefeed;
  private service?: string;

  constructor({
    name,
    project_id,
    account_id,
    config,
    start_seq,
    ephemeral = false,
    sync,
    client,
    service,
  }: CoreStreamOptions) {
    super();
    logger.debug("constructor", name);
    if (client == null) {
      throw Error("client must be specified");
    }
    this.client = client;
    this.service = service;
    this.user = { account_id, project_id };
    this.name = name;
    this.storage = {
      path: storagePath({ account_id, project_id, name }),
      ephemeral,
      sync,
    };
    this._start_seq = start_seq;
    this.configOptions = config;
    return new Proxy(this, {
      get(target, prop) {
        return typeof prop == "string" && isNumericString(prop)
          ? target.get(parseInt(prop))
          : target[String(prop)];
      },
    });
  }

  private initialized = false;
  init = async () => {
    if (this.initialized) {
      throw Error("init can only be called once");
    }
    this.initialized = true;
    if (this.client == null) {
      this.client = await conat();
    }
    this.persistClient = persist({
      client: this.client,
      user: this.user,
      storage: this.storage,
      service: this.service,
    });
    this.persistClient.on("error", (err) => {
      if (!process.env.COCALC_TEST_MODE) {
        console.log(`WARNING: persistent stream issue -- ${err}`);
      }
    });
    await this.getAllFromPersist({
      start_seq: this._start_seq,
      noEmit: true,
    });

    await until(
      async () => {
        if (this.client == null) {
          return true;
        }
        try {
          this.configOptions = await this.config(this.configOptions);
          return true;
        } catch (err) {
          if (err.code == 403) {
            // fatal permission error
            throw err;
          }
        }
        return false;
      },
      { start: 750 },
    );
  };

  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    return await this.persistClient.config({ config });
  };

  private isClosed = () => {
    return this.client === undefined;
  };

  close = () => {
    logger.debug("close", this.name);
    delete this.client;
    this.removeAllListeners();
    this.persistClient?.close();
    // @ts-ignore
    delete this.persistClient;
    // @ts-ignore
    delete this.kv;
    // @ts-ignore
    delete this.messages;
    // @ts-ignore
    delete this.raw;
    // @ts-ignore
    delete this.msgIDs;
    // @ts-ignore
    delete this.storage;
  };

  inventory = async (): Promise<PartialInventory> => {
    return await this.persistClient.inventory();
  };
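
  // Typical lifecycle, as a sketch (the refCache at the bottom of this
  // file calls init automatically; the Configuration fields are whatever
  // conat/persist/storage supports):
  //
  //   const s = await cstream({name: "test"});
  //   await s.config();       // read current server-side Configuration
  //   await s.config({...});  // merge in config changes
  //   // ... use s ...
  //   s.close();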

  // NOTE: It's assumed elsewhere that getAllFromPersist will not throw,
  // and will keep retrying until (1) it works, or (2) self is closed,
  // or (3) there is a fatal failure, e.g., lack of permissions.
  private getAllFromPersist = async ({
    start_seq = 0,
    noEmit,
  }: { start_seq?: number; noEmit?: boolean } = {}) => {
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    await until(
      async () => {
        let messages: StoredMessage[] = [];
        let changes: (SetOperation | DeleteOperation | StoredMessage)[] = [];
        try {
          if (this.isClosed()) {
            return true;
          }
          if (this.changefeed == null) {
            this.changefeed = await this.persistClient.changefeed();
          }
          // console.log("get persistent stream", { start_seq }, this.storage);
          messages = await this.persistClient.getAll({
            start_seq,
            timeout: DEFAULT_GET_ALL_TIMEOUT,
          });
        } catch (err) {
          if (this.isClosed()) {
            return true;
          }
          if (!process.env.COCALC_TEST_MODE) {
            console.log(
              `WARNING: getAllFromPersist - failed -- ${err}, code=${err.code}, service=${this.service}, storage=${JSON.stringify(this.storage)} -- will retry`,
            );
          }
          if (err.code == 503 || err.code == 408) {
            // 503: temporary error due to messages being dropped,
            // so return false to try again. This is expected to
            // sometimes happen under heavy load, automatic failover, etc.
            // 408: timeout waiting to be connected/ready
            return false;
          }
          if (err.code == 403) {
            // fatal permission error
            throw err;
          }
          if (err.code == 429) {
            // too many users
            throw err;
          }
          // any other error that we might not address above -- just try again in a while.
          return false;
        }
        this.processPersistentMessages(messages, {
          noEmit,
          noSeqCheck: true,
        });
        if (changes.length > 0) {
          this.processPersistentMessages(changes, {
            noEmit,
            noSeqCheck: false,
          });
        }
        // success!
        return true;
      },
      { start: 1000, max: 15000 },
    );

    this.listen();
  };

  private processPersistentMessages = (
    messages: (SetOperation | DeleteOperation | StoredMessage)[],
    opts: { noEmit?: boolean; noSeqCheck?: boolean },
  ) => {
    if (this.raw === undefined) {
      // closed
      return;
    }
    let seq = undefined;
    for (const mesg of messages) {
      try {
        this.processPersistentMessage(mesg, opts);
        if (mesg["seq"] != null) {
          seq = mesg["seq"];
        }
      } catch (err) {
        console.log("WARNING: issue processing message", mesg, err);
      }
    }
    return seq;
  };

  private processPersistentMessage = (
    mesg: SetOperation | DeleteOperation | StoredMessage,
    opts: { noEmit?: boolean; noSeqCheck?: boolean },
  ) => {
    if ((mesg as DeleteOperation).op == "delete") {
      this.processPersistentDelete(mesg as DeleteOperation, opts);
    } else {
      // set is the default
      this.processPersistentSet(mesg as SetOperation, opts);
    }
  };
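
  // E.g., a batch handed to processPersistentMessages might look like
  // (a sketch -- only the fields destructured below are assumed):
  //
  //   [{seq: 12, time: 1710000000000, key: "a", encoding, raw, headers},
  //    {op: "delete", seqs: [5, 6]}]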

  private processPersistentDelete = (
    { seqs }: DeleteOperation,
    { noEmit }: { noEmit?: boolean },
  ) => {
    if (this.raw == null) return;
    const X = new Set<number>(seqs);
    // seqs is a list of integers. We remove
    // every entry from this.raw, this.messages, and this.kv
    // where this.raw.seq is in X by mutating raw/messages/kv,
    // not by making new objects (since external references).
    // This is a rare operation so we're not worried too much
    // about performance.
    const keys: { [seq: number]: string } = {};
    for (const key in this.kv) {
      const seq = this.kv[key]?.raw?.seq;
      if (seq !== undefined && X.has(seq)) {
        delete this.kv[key];
        keys[seq] = key;
      }
    }
    const indexes: number[] = [];
    for (let i = 0; i < this.raw.length; i++) {
      const seq = this.raw[i].seq;
      if (X.has(seq)) {
        indexes.push(i);
        if (!noEmit) {
          this.emitChange({
            mesg: undefined,
            raw: { seq },
            key: keys[seq],
            prev: this.messages[i],
          });
        }
      }
    }

    // remove this.raw[i] and this.messages[i] for all i in indexes,
    // with special case to be fast in the very common case of contiguous indexes.
    if (indexes.length > 1 && indexes.every((v, i) => v === indexes[0] + i)) {
      // Contiguous: bulk remove for performance
      const start = indexes[0];
      const deleteCount = indexes.length;
      this.raw.splice(start, deleteCount);
      this.messages.splice(start, deleteCount);
    } else {
      // Non-contiguous: fallback to individual reverse splices
      for (let i = indexes.length - 1; i >= 0; i--) {
        const idx = indexes[i];
        this.raw.splice(idx, 1);
        this.messages.splice(idx, 1);
      }
    }
  };
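
  // E.g., if this.raw has seqs [7, 8, 9, 10] and seqs = [8, 9] is
  // deleted, then indexes = [1, 2] is contiguous, so a single
  // splice(1, 2) removes both entries from this.raw and this.messages.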

  private processPersistentSetLargestSeq: number = 0;
  private missingMessages = new Set<number>();
  private processPersistentSet = (
    { seq, time, key, encoding, raw: data, headers, msgID }: SetOperation,
    {
      noEmit,
      noSeqCheck,
    }: {
      noEmit?: boolean;
      noSeqCheck?: boolean;
    },
  ) => {
    if (this.raw == null) return;
    if (!noSeqCheck) {
      const expected = this.processPersistentSetLargestSeq + 1;
      if (seq > expected) {
        log(
          "processPersistentSet -- detected missed seq number",
          { seq, expected: this.processPersistentSetLargestSeq + 1 },
          this.storage,
        );
        // We record that some are missing.
        for (let s = expected; s <= seq - 1; s++) {
          this.missingMessages.add(s);
          this.getAllMissingMessages();
        }
      }
    }

    if (seq > this.processPersistentSetLargestSeq) {
      this.processPersistentSetLargestSeq = seq;
    }

    const mesg = decode({ encoding, data });
    const raw = {
      timestamp: time,
      headers,
      seq,
      raw: data,
      key,
    } as RawMsg;
    if (seq > (this.raw.slice(-1)[0]?.seq ?? 0)) {
      // easy fast initial load to the end of the list (common special case)
      this.messages.push(mesg);
      this.raw.push(raw);
    } else {
      // Insert in the correct place. This should only
      // happen when calling load of old data, which happens, e.g., during
      // automatic failover. The algorithm below is
      // dumb and could be replaced by a binary search. However, we'll
      // change how we batch load so there's less point.
      let i = 0;
      while (i < this.raw.length && this.raw[i].seq < seq) {
        i += 1;
      }
      // after the above loop, either:
      // - this.raw[i] is undefined because i = this.raw.length and every
      //   known entry was less than seq, so we just append it, or
      // - this.raw[i] is defined and this.raw[i].seq >= seq. If they are
      //   equal, do nothing, since we already have it. If not equal, then
      //   splice it in.
      if (i >= this.raw.length) {
        this.raw.push(raw);
        this.messages.push(mesg);
      } else if (this.raw[i].seq > seq) {
        this.raw.splice(i, 0, raw);
        this.messages.splice(i, 0, mesg);
      } // other case -- we already have it.
    }
    let prev: T | undefined = undefined;
    if (typeof key == "string") {
      prev = this.kv[key]?.mesg;
      if (raw.headers?.[COCALC_TOMBSTONE_HEADER]) {
        delete this.kv[key];
      } else {
        if (this.kv[key] !== undefined) {
          const { raw } = this.kv[key];
          this.kvChangeBytes += raw.raw.length;
        }

        this.kv[key] = { raw, mesg };

        if (this.kvChangeBytes >= KEY_GC_THRESH) {
          this.gcKv();
        }
      }
    }
    this.lastSeq = Math.max(this.lastSeq, seq);
    if (!noEmit) {
      this.emitChange({ mesg, raw, key, prev, msgID });
    }
  };

  private emitChange = (e: ChangeEvent<T>) => {
    if (this.raw == null) return;
    this.emit("change", e);
  };
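
  // E.g. (sketch): if the largest seq processed so far is 7 and a set
  // with seq = 10 arrives with noSeqCheck false, then 8 and 9 are added
  // to missingMessages and getAllMissingMessages() fetches them in the
  // background.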

  private listen = async () => {
    // log("core-stream: listen", this.storage);
    await until(
      async () => {
        if (this.isClosed()) {
          return true;
        }
        try {
          if (this.changefeed == null) {
            this.changefeed = await this.persistClient.changefeed();
            if (this.isClosed()) {
              return true;
            }
          }

          for await (const updates of this.changefeed) {
            this.processPersistentMessages(updates, {
              noEmit: false,
              noSeqCheck: false,
            });
            if (this.isClosed()) {
              return true;
            }
          }
        } catch (err) {
          // There should never be a case where the changefeed throws
          // an error or ends without this whole stream being closed.
          // If that happens it's an unexpected bug. Instead of failing,
          // we log this, loop around, and make a new changefeed.
          // This normally doesn't happen but could if a persist server is being restarted
          // frequently or things are seriously broken. We cause this in
          // backend/conat/test/core/core-stream-break.test.ts
          if (!process.env.COCALC_TEST_MODE) {
            log(
              `WARNING: core-stream changefeed error -- ${err}`,
              this.storage,
            );
          }
        }

        delete this.changefeed;

        // the above loop exits when the persistent server
        // stops sending messages for some reason. In that
        // case we reconnect, picking up where we left off.

        if (this.client == null) {
          return true;
        }

        await this.getAllFromPersist({
          start_seq: this.lastSeq + 1,
          noEmit: false,
        });
        return false;
      },
      { start: 500, max: 7500, decay: 1.2 },
    );
  };

  publish = async (
    mesg: T,
    options?: PublishOptions,
  ): Promise<{ seq: number; time: number } | undefined> => {
    if (mesg === undefined) {
      if (options?.key !== undefined) {
        // undefined can't be JSON encoded, so we can't possibly represent it, and this
        // *must* be treated as a delete.
        this.deleteKv(options?.key, { previousSeq: options?.previousSeq });
        return;
      } else {
        throw Error("stream non-kv publish - mesg must not be 'undefined'");
      }
    }

    if (options?.msgID && this.msgIDs.has(options.msgID)) {
      // it's a dup
      return;
    }
    const md = messageData(mesg, { headers: options?.headers });
    const x = await this.persistClient.set({
      key: options?.key,
      messageData: md,
      previousSeq: options?.previousSeq,
      msgID: options?.msgID,
      ttl: options?.ttl,
      timeout: options?.timeout,
    });
    if (options?.msgID) {
      this.msgIDs?.add(options.msgID);
    }
    return x;
  };

  publishMany = async (
    messages: { mesg: T; options?: PublishOptions }[],
  ): Promise<
    ({ seq: number; time: number } | { error: string; code?: any })[]
  > => {
    let result: (
      | { seq: number; time: number }
      | { error: string; code?: any }
    )[] = [];

    for (let i = 0; i < messages.length; i += PUBLISH_MANY_BATCH_SIZE) {
      const batch = messages.slice(i, i + PUBLISH_MANY_BATCH_SIZE);
      result = result.concat(await this.publishMany0(batch));
    }

    return result;
  };
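
  // Usage sketch:
  //
  //   await s.publish({x: 1});                         // append to stream
  //   await s.publish({x: 2}, {key: "p", msgID: "m"}); // kv write, deduped on msgID
  //   await s.publishMany([{mesg: "a"}, {mesg: "b"}]); // batched, 500 per request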

  private publishMany0 = async (
    messages: { mesg: T; options?: PublishOptions }[],
  ): Promise<
    ({ seq: number; time: number } | { error: string; code?: any })[]
  > => {
    const v: SetOptions[] = [];
    let timeout: number | undefined = undefined;
    for (const { mesg, options } of messages) {
      if (options?.timeout) {
        if (timeout === undefined) {
          timeout = options.timeout;
        } else {
          timeout = Math.min(timeout, options.timeout);
        }
      }
      const md = messageData(mesg, { headers: options?.headers });
      v.push({
        key: options?.key,
        messageData: md,
        previousSeq: options?.previousSeq,
        msgID: options?.msgID,
        ttl: options?.ttl,
      });
    }
    return await this.persistClient.setMany(v, { timeout });
  };

  get = (n?): T | T[] => {
    if (n == null) {
      return this.getAll();
    } else {
      return this.messages[n];
    }
  };

  seq = (n: number): number | undefined => {
    return this.raw[n]?.seq;
  };

  getAll = (): T[] => {
    return [...this.messages];
  };

  get length(): number {
    return this.messages.length;
  }

  get start_seq(): number | undefined {
    return this._start_seq;
  }

  headers = (n: number): { [key: string]: any } | undefined => {
    return this.raw[n]?.headers;
  };

  // key:value interface for subset of messages pushed with key option set.
  // NOTE: This does NOT throw an error if our local seq is out of date (leave that
  // to dkv built on this).
  setKv = async (
    key: string,
    mesg: T,
    options?: {
      headers?: Headers;
      previousSeq?: number;
    },
  ): Promise<{ seq: number; time: number } | undefined> => {
    return await this.publish(mesg, { ...options, key });
  };

  setKvMany = async (
    x: {
      key: string;
      mesg: T;
      options?: {
        headers?: Headers;
        previousSeq?: number;
      };
    }[],
  ): Promise<
    ({ seq: number; time: number } | { error: string; code?: any })[]
  > => {
    const messages: { mesg: T; options?: PublishOptions }[] = [];
    for (const { key, mesg, options } of x) {
      messages.push({ mesg, options: { ...options, key } });
    }
    return await this.publishMany(messages);
  };

  deleteKv = async (
    key: string,
    options?: {
      msgID?: string;
      previousSeq?: number;
    },
  ) => {
    if (this.kv[key] === undefined) {
      // nothing to do
      return;
    }
    return await this.publish(null as any, {
      ...options,
      headers: { [COCALC_TOMBSTONE_HEADER]: true },
      key,
      ttl: DEFAULT_TOMBSTONE_TTL,
    });
  };

  getKv = (key: string): T | undefined => {
    return this.kv[key]?.mesg;
  };

  hasKv = (key: string): boolean => {
    return this.kv?.[key] !== undefined;
  };

  getAllKv = (): { [key: string]: T } => {
    const all: { [key: string]: T } = {};
    for (const key in this.kv) {
      all[key] = this.kv[key].mesg;
    }
    return all;
  };

  // efficient way to get just the keys -- use this instead of
  // getAllKv if you just need the keys.
  keysKv = (): string[] => {
    return Object.keys(this.kv);
  };

  seqKv = (key: string): number | undefined => {
    return this.kv[key]?.raw.seq;
  };

  timeKv = (key?: string): Date | { [key: string]: Date } | undefined => {
    if (key === undefined) {
      const all: { [key: string]: Date } = {};
      for (const key in this.kv) {
        all[key] = new Date(this.kv[key].raw.timestamp);
      }
      return all;
    }
    const r = this.kv[key]?.raw;
    if (r == null) {
      return;
    }
    return new Date(r.timestamp);
  };

  headersKv = (key: string): { [key: string]: any } | undefined => {
    return this.kv[key]?.raw?.headers;
  };

  get lengthKv(): number {
    return Object.keys(this.kv).length;
  }
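
  // kv usage sketch:
  //
  //   await s.setKv("a", 5);
  //   s.getKv("a");     // --> 5
  //   s.getAllKv();     // --> {a: 5, ...}
  //   s.seqKv("a");     // --> server-assigned seq of the message for "a"
  //   await s.deleteKv("a");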

  // load older messages starting at start_seq up to the oldest message
  // we currently have. This can throw an exception in case of heavy
  // load or network issues, but should either completely succeed or make
  // no change.
  load = async ({
    start_seq,
    noEmit,
  }: {
    start_seq: number;
    noEmit?: boolean;
  }) => {
    // This is used for loading more TimeTravel history
    if (this.storage == null) {
      throw Error("bug");
    }
    // this is one before the oldest we have
    const end_seq = (this.raw[0]?.seq ?? this._start_seq ?? 1) - 1;
    if (start_seq > end_seq) {
      // nothing to load
      return;
    }
    // we're moving start_seq back to this point
    this._start_seq = start_seq;
    const messages = await this.persistClient.getAll({
      start_seq,
      end_seq,
    });
    this.processPersistentMessages(messages, { noEmit, noSeqCheck: true });
  };
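
  // E.g., to page in 500 more messages of TimeTravel history (a sketch):
  //
  //   await s.load({start_seq: Math.max(1, (s.start_seq ?? 1) - 500)});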

  private getAllMissingMessages = reuseInFlight(async () => {
    await until(
      async () => {
        if (this.client == null || this.missingMessages.size == 0) {
          return true;
        }
        try {
          const missing = Array.from(this.missingMessages);
          // sort numerically (the default sort is lexicographic)
          missing.sort((a, b) => a - b);
          log("core-stream: getMissingSeq", missing, this.storage);
          const messages = await this.persistClient.getAll({
            start_seq: missing[0],
            end_seq: missing[missing.length - 1],
          });
          this.processPersistentMessages(messages, {
            noEmit: false,
            noSeqCheck: true,
          });
          for (const seq of missing) {
            this.missingMessages.delete(seq);
          }
        } catch (err) {
          log(
            "core-stream: WARNING -- issue getting missing updates",
            err,
            this.storage,
          );
        }
        return false;
      },
      { start: 1000, max: 15000, decay: 1.3 },
    );
  });

  // get server-assigned time of n-th message in stream
  time = (n: number): Date | undefined => {
    const r = this.raw[n];
    if (r == null) {
      return;
    }
    return new Date(r.timestamp);
  };

  times = () => {
    const v: (Date | undefined)[] = [];
    for (let i = 0; i < this.length; i++) {
      v.push(this.time(i));
    }
    return v;
  };

  stats = ({
    start_seq = 1,
  }: {
    start_seq?: number;
  } = {}): { count: number; bytes: number } | undefined => {
    if (this.raw == null) {
      return;
    }
    let count = 0;
    let bytes = 0;
    for (const { raw, seq } of this.raw) {
      if (seq == null) {
        continue;
      }
      if (seq < start_seq) {
        continue;
      }
      count += 1;
      bytes += raw.length;
    }
    return { count, bytes };
  };

  // delete all messages up to and including the
  // one at position index, i.e., this.messages[index]
  // is deleted.
  // NOTE: For ephemeral streams, clients will NOT see the result of a delete,
  // except when they load the stream later. For persistent streams all
  // **connected** clients will see the delete. THAT said, this is not a "proper"
  // distributed computing primitive with tombstones, etc. This is primarily
  // meant for reducing space usage, and shouldn't be relied on for
  // any other purpose.
  delete = async ({
    all,
    last_index,
    seq,
    last_seq,
    key,
    seqs,
  }: {
    // give exactly ONE parameter -- by default nothing happens with no params
    // all: delete everything
    all?: boolean;
    // last_index: everything up to and including index'd message
    last_index?: number;
    // seq: delete message with this sequence number
    seq?: number;
    // seqs: delete the messages in this array of sequence numbers
    seqs?: number[];
    // last_seq: delete everything up to and including this sequence number
    last_seq?: number;
    // key: delete the message with this key
    key?: string;
  } = {}): Promise<{ seqs: number[] }> => {
    let opts;
    if (all) {
      opts = { all: true };
    } else if (last_index != null) {
      if (last_index >= this.raw.length) {
        opts = { all: true };
      } else if (last_index < 0) {
        return { seqs: [] };
      } else {
        const last_seq = this.raw[last_index].seq;
        if (last_seq === undefined) {
          throw Error(`BUG: invalid index ${last_index}`);
        }
        opts = { last_seq };
      }
    } else if (seq != null) {
      opts = { seq };
    } else if (seqs != null) {
      opts = { seqs };
    } else if (last_seq != null) {
      opts = { last_seq };
    } else if (key != null) {
      const seq = this.kv[key]?.raw?.seq;
      if (seq === undefined) {
        return { seqs: [] };
      }
      opts = { seq };
    }
    return await this.persistClient.delete(opts);
  };

  // delete messages that are no longer needed since newer values have been written
  gcKv = () => {
    this.kvChangeBytes = 0;
    for (let i = 0; i < this.raw.length; i++) {
      const key = this.raw[i].key;
      if (key !== undefined) {
        if (this.raw[i].raw.length > 0 && this.raw[i] !== this.kv[key]?.raw) {
          this.raw[i] = {
            ...this.raw[i],
            headers: undefined,
            raw: Buffer.from(""),
          } as RawMsg;
          this.messages[i] = undefined as T;
        }
      }
    }
  };
}
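
// Delete usage sketch:
//
//   await s.delete({all: true});      // purge everything
//   await s.delete({last_seq: 100});  // purge history through seq 100
//   await s.delete({key: "old"});     // purge the message stored at a key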

export interface PublishOptions {
  // headers for this message
  headers?: Headers;
  // unique id for this message to dedup so if you send the same
  // message more than once with the same id it doesn't get published
  // multiple times.
  msgID?: string;
  // key -- if specified a key field is also stored on the server,
  // and any previous messages with the same key are deleted. Also,
  // an entry is set in this.kv[key] so that this.getKv(key), etc. work.
  key?: string;
  // if key is specified and previousSeq is set, the server throws
  // an error if the sequence number of the current key is
  // not previousSeq. We use this with this.seqKv(key) to
  // provide read/change/write semantics and to know when we
  // should resolve a merge conflict. This is ignored if
  // key is not specified.
  previousSeq?: number;
  // if set to a number of ms AND the config option allow_msg_ttl
  // is set on this persistent stream, then
  // this message will be deleted after the given amount of time (in ms).
  ttl?: number;
  timeout?: number;
}

export const cache = refCache<CoreStreamOptions, CoreStream>({
  name: "core-stream",
  createObject: async (options: CoreStreamOptions) => {
    if (options.client == null) {
      options = { ...options, client: await conat() };
    }
    const cstream = new CoreStream(options);
    await cstream.init();
    return cstream;
  },
  createKey: ({ client, ...options }) => {
    return jsonStableStringify({ id: client?.id, ...options })!;
  },
});

export async function cstream<T>(
  options: CoreStreamOptions,
): Promise<CoreStream<T>> {
  return await cache(options);
}
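
// Read/modify/write sketch using previousSeq for optimistic concurrency
// (error handling is schematic; the exact error thrown on a previousSeq
// mismatch is determined by the persist server):
//
//   const s = await cstream<number>({name: "counter"});
//   const cur = s.getKv("n") ?? 0;
//   try {
//     await s.setKv("n", cur + 1, {previousSeq: s.seqKv("n")});
//   } catch (err) {
//     // somebody else wrote first -- re-read and retry / merge
//   }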