Path: src/packages/conat/sync/core-stream.ts
/*
core-stream.ts = the Core Stream data structure for conat.

This is the core data structure that easy-to-use ephemeral and persistent
streams and kv stores are built on. It is NOT meant to be super easy and
simple to use, with saves happening in the background. Instead, operations
are async, and the API is complicated. We build dkv, dstream, akv, etc. on
top of this with a much friendlier API.

NOTE: unlike in conat, in kv mode, the keys can be any utf-8 string.
We use the subject to track communication involving this stream, but
otherwise it has no relevance to the keys. Conat's core pub/sub/request/
reply model is very similar to NATS, but the analogue of Jetstream is
different because I don't find Jetstream useful at all, and find this
much more useful.

DEVELOPMENT:

~/cocalc/src/packages/backend$ node

require('@cocalc/backend/conat'); a = require('@cocalc/conat/sync/core-stream'); s = await a.cstream({name:'test'})

*/

import { EventEmitter } from "events";
import {
  Message,
  type Headers,
  messageData,
  decode,
} from "@cocalc/conat/core/client";
import { isNumericString } from "@cocalc/util/misc";
import refCache from "@cocalc/util/refcache";
import { conat } from "@cocalc/conat/client";
import type { Client } from "@cocalc/conat/core/client";
import jsonStableStringify from "json-stable-stringify";
import type {
  SetOperation,
  DeleteOperation,
  StoredMessage,
  Configuration,
} from "@cocalc/conat/persist/storage";
import type { Changefeed } from "@cocalc/conat/persist/client";
export type { Configuration };
import { join } from "path";
import {
  type StorageOptions,
  type PersistStreamClient,
  stream as persist,
  type SetOptions,
} from "@cocalc/conat/persist/client";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { until } from "@cocalc/util/async-utils";
import { type PartialInventory } from "@cocalc/conat/persist/storage";
import { getLogger } from "@cocalc/conat/client";

const logger = getLogger("sync:core-stream");

const PUBLISH_MANY_BATCH_SIZE = 500;

const DEFAULT_GET_ALL_TIMEOUT = 15000;

const log = (..._args) => {};
//const log = console.log;

// when this many bytes of key:value have been changed (so need to be freed),
// we do a garbage collection pass.
export const KEY_GC_THRESH = 10 * 1e6;

// NOTE: when you call this.deleteKv(key), we ensure the previous
// messages with the given key are completely deleted from sqlite, and
// also create a *new* lightweight tombstone. That tombstone has this
// ttl, which defaults to DEFAULT_TOMBSTONE_TTL (one day), so the tombstone
// itself will be removed after 1 day. The tombstone is only needed for
// clients that go offline during the delete, then come back, and replay the
// partial log of what was missed. Such clients should reset if the
// offline time is longer than DEFAULT_TOMBSTONE_TTL.
// This only happens if allow_msg_ttl is configured to true, which is
// done with dkv, but not on by default otherwise.
export const DEFAULT_TOMBSTONE_TTL = 24 * 60 * 60 * 1000; // 1 day
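
// For illustration, a hedged sketch of the tombstone lifecycle (assuming a
// stream `s` created as in the DEVELOPMENT snippet above, with allow_msg_ttl
// enabled in its config):
//
//   await s.setKv("x", 10);   // store a value under key "x"
//   await s.deleteKv("x");    // delete it and write a lightweight tombstone
//   s.getKv("x");             // --> undefined
//
// The tombstone itself expires after DEFAULT_TOMBSTONE_TTL, so a client that
// was offline longer than that should reset instead of replaying the log.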

export interface RawMsg extends Message {
  timestamp: number;
  seq: number;
  key?: string;
}

export interface ChangeEvent<T> {
  mesg?: T;
  raw?: Partial<RawMsg>;
  key?: string;
  prev?: T;
  msgID?: string;
}

const HEADER_PREFIX = "CN-";

export const COCALC_TOMBSTONE_HEADER = `${HEADER_PREFIX}Tombstone`;
export const COCALC_STREAM_HEADER = `${HEADER_PREFIX}Stream`;
export const COCALC_OPTIONS_HEADER = `${HEADER_PREFIX}Options`;

export interface CoreStreamOptions {
  // what it's called
  name: string;
  // where it is located -- this is who **owns the resource**, which
  // may or may not be who is accessing it.
  account_id?: string;
  project_id?: string;
  config?: Partial<Configuration>;
  // only load historic messages starting at the given seq number.
  start_seq?: number;

  ephemeral?: boolean;
  sync?: boolean;

  client?: Client;

  noCache?: boolean;

  // the name of the cluster of persistence servers to use -- this is
  // by default SERVICE from conat/persist/util.ts. Set it to something
  // else to use special different servers, e.g., we use a different service
  // for sharing cluster state data, where the servers are ephemeral and
  // there is one for each node.
  service?: string;
}

export interface User {
  account_id?: string;
  project_id?: string;
}

export function storagePath({
  account_id,
  project_id,
  name,
}: User & { name: string }) {
  let userPath;
  if (account_id) {
    userPath = `accounts/${account_id}`;
  } else if (project_id) {
    userPath = `projects/${project_id}`;
  } else {
    userPath = "hub";
  }
  return join(userPath, name);
}
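
// For example (a quick sanity check of the path layout; the ids are
// placeholders):
//
//   storagePath({ account_id: "8e1...", name: "test" });  // "accounts/8e1.../test"
//   storagePath({ project_id: "3f2...", name: "test" });  // "projects/3f2.../test"
//   storagePath({ name: "test" });                        // "hub/test"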

export class CoreStream<T = any> extends EventEmitter {
  public readonly name: string;

  private configOptions?: Partial<Configuration>;
  private _start_seq?: number;

  // don't do "this.raw=" or "this.messages=" anywhere in this class
  // because dstream directly references the public raw/messages.
  public readonly raw: RawMsg[] = [];
  public readonly messages: T[] = [];
  public readonly kv: { [key: string]: { mesg: T; raw: RawMsg } } = {};
  private kvChangeBytes = 0;

  // this set of msgIDs is ONLY used in ephemeral mode by the leader.
  private readonly msgIDs = new Set<any>();
  // lastSeq is used by clients to keep track of what they have received; if a
  // message is skipped, they reconnect starting with the last one they received.
  private lastSeq: number = 0;
  private lastValueByKey = new Map<string, T>();
  // IMPORTANT: user here means the *owner* of the resource, **NOT** the
  // client who is accessing it! For example, a stream of edits of a file
  // in a project has user {project_id} even if it is being accessed by
  // an account.
  private user: User;
  private storage: StorageOptions;
  private client?: Client;
  private persistClient: PersistStreamClient;
  private changefeed?: Changefeed;
  private service?: string;

  constructor({
    name,
    project_id,
    account_id,
    config,
    start_seq,
    ephemeral = false,
    sync,
    client,
    service,
  }: CoreStreamOptions) {
    super();
    logger.debug("constructor", name);
    if (client == null) {
      throw Error("client must be specified");
    }
    this.client = client;
    this.service = service;
    this.user = { account_id, project_id };
    this.name = name;
    this.storage = {
      path: storagePath({ account_id, project_id, name }),
      ephemeral,
      sync,
    };
    this._start_seq = start_seq;
    this.configOptions = config;
    return new Proxy(this, {
      get(target, prop) {
        return typeof prop == "string" && isNumericString(prop)
          ? target.get(parseInt(prop))
          : target[String(prop)];
      },
    });
  }

  private initialized = false;
  init = async () => {
    if (this.initialized) {
      throw Error("init can only be called once");
    }
    this.initialized = true;
    if (this.client == null) {
      this.client = await conat();
    }
    this.persistClient = persist({
      client: this.client,
      user: this.user,
      storage: this.storage,
      service: this.service,
    });
    this.persistClient.on("error", (err) => {
      if (!process.env.COCALC_TEST_MODE) {
        console.log(`WARNING: persistent stream issue -- ${err}`);
      }
    });
    await this.getAllFromPersist({
      start_seq: this._start_seq,
      noEmit: true,
    });

    await until(
      async () => {
        if (this.client == null) {
          return true;
        }
        try {
          this.configOptions = await this.config(this.configOptions);
          return true;
        } catch (err) {
          if (err.code == 403) {
            // fatal permission error
            throw err;
          }
        }
        return false;
      },
      { start: 750 },
    );
  };

  debugStats = () => {
    return {
      rawLength: this.raw.length,
      messagesLength: this.messages.length,
      kvLength: this.lengthKv,
      lastValueByKeySize: this.lastValueByKey.size,
    };
  };

  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    return await this.persistClient.config({ config });
  };

  private isClosed = () => {
    return this.client === undefined;
  };

  close = () => {
    logger.debug("close", this.name);
    delete this.client;
    this.removeAllListeners();
    this.persistClient?.close();
    // @ts-ignore
    delete this.persistClient;
    // @ts-ignore
    delete this.kv;
    // @ts-ignore
    delete this.messages;
    // @ts-ignore
    delete this.raw;
    // @ts-ignore
    delete this.msgIDs;
    // @ts-ignore
    delete this.storage;
  };

  inventory = async (): Promise<PartialInventory> => {
    return await this.persistClient.inventory();
  };
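
  // The Proxy returned by the constructor reroutes numeric properties to
  // get(), so, as a hedged sketch (assuming `s` is an initialized stream):
  //
  //   s[0]       // same as s.get(0) -- the first loaded message
  //   s.length   // the number of loaded messages
  //
  // All non-numeric properties resolve normally.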

  // NOTE: It's assumed elsewhere that getAllFromPersist will not throw,
  // and will keep retrying until (1) it works, or (2) self is closed,
  // or (3) there is a fatal failure, e.g., lack of permissions.
  private getAllFromPersist = async ({
    start_seq = 0,
    noEmit,
  }: { start_seq?: number; noEmit?: boolean } = {}) => {
    if (this.storage == null) {
      throw Error("bug -- storage must be set");
    }
    await until(
      async () => {
        let messages: StoredMessage[] = [];
        // TODO: tracking changes during getAll and suppressing sending duplicates
        // via the changefeed is not implemented.
        // let changes: (SetOperation | DeleteOperation | StoredMessage)[] = [];
        try {
          if (this.isClosed()) {
            return true;
          }
          if (this.changefeed == null) {
            this.changefeed = await this.persistClient.changefeed();
          }
          // console.log("get persistent stream", { start_seq }, this.storage);
          messages = await this.persistClient.getAll({
            start_seq,
            timeout: DEFAULT_GET_ALL_TIMEOUT,
          });
        } catch (err) {
          if (this.isClosed()) {
            return true;
          }
          if (!process.env.COCALC_TEST_MODE) {
            console.log(
              `WARNING: getAllFromPersist - failed -- ${err}, code=${err.code}, service=${this.service}, storage=${JSON.stringify(this.storage)} -- will retry`,
            );
          }
          if (err.code == 503 || err.code == 408) {
            // 503: temporary error due to messages being dropped,
            // so return false to try again. This is expected to
            // sometimes happen under heavy load, automatic failover, etc.
            // 408: timeout waiting to be connected/ready
            return false;
          }
          if (err.code == 403) {
            // fatal permission error
            throw err;
          }
          if (err.code == 429) {
            // too many users
            throw err;
          }
          // any other error that we might not address above -- just try again in a while.
          return false;
        }
        this.processPersistentMessages(messages, {
          noEmit,
          noSeqCheck: true,
        });
        // if (changes.length > 0) {
        //   this.processPersistentMessages(changes, {
        //     noEmit,
        //     noSeqCheck: false,
        //   });
        // }
        // success!
        return true;
      },
      { start: 1000, max: 15000 },
    );

    this.listen();
  };

  private processPersistentMessages = (
    messages: (SetOperation | DeleteOperation | StoredMessage)[],
    opts: { noEmit?: boolean; noSeqCheck?: boolean },
  ) => {
    if (this.raw === undefined) {
      // closed
      return;
    }
    let seq = undefined;
    for (const mesg of messages) {
      try {
        this.processPersistentMessage(mesg, opts);
        if (mesg["seq"] != null) {
          seq = mesg["seq"];
        }
      } catch (err) {
        console.log("WARNING: issue processing message", mesg, err);
      }
    }
    return seq;
  };

  private processPersistentMessage = (
    mesg: SetOperation | DeleteOperation | StoredMessage,
    opts: { noEmit?: boolean; noSeqCheck?: boolean },
  ) => {
    if ((mesg as DeleteOperation).op == "delete") {
      this.processPersistentDelete(mesg as DeleteOperation, opts);
    } else {
      // set is the default
      this.processPersistentSet(mesg as SetOperation, opts);
    }
  };
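
  // getAllFromPersist above relies on the until() helper: the callback
  // returns true to stop and false to retry with backoff. A minimal sketch of
  // the same pattern (inferring the { start, max } options from how they are
  // used in this file; doWork is a hypothetical flaky operation):
  //
  //   await until(
  //     async () => {
  //       try {
  //         await doWork();
  //         return true;  // done -- stop retrying
  //       } catch {
  //         return false; // retry, with delays starting at 1s, capped at 15s
  //       }
  //     },
  //     { start: 1000, max: 15000 },
  //   );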

  private processPersistentDelete = (
    { seqs }: DeleteOperation,
    { noEmit }: { noEmit?: boolean },
  ) => {
    if (this.raw == null) return;
    const X = new Set<number>(seqs);
    // seqs is a list of integers. We remove
    // every entry from this.raw, this.messages, and this.kv
    // where this.raw.seq is in X by mutating raw/messages/kv,
    // not by making new objects (since external references).
    // This is a rare operation so we're not worried too much
    // about performance.
    const keys: { [seq: number]: string } = {};
    for (const key in this.kv) {
      const seq = this.kv[key]?.raw?.seq;
      if (X.has(seq)) {
        delete this.kv[key];
        this.lastValueByKey.delete(key);
        // record seq -> key for the emitChange calls below
        keys[seq] = key;
      }
    }
    const indexes: number[] = [];
    for (let i = 0; i < this.raw.length; i++) {
      const seq = this.raw[i].seq;
      if (X.has(seq)) {
        indexes.push(i);
        if (!noEmit) {
          this.emitChange({
            mesg: undefined,
            raw: { seq },
            key: keys[seq],
            prev: this.messages[i],
          });
        }
      }
    }

    // remove this.raw[i] and this.messages[i] for all i in indexes,
    // with special case to be fast in the very common case of contiguous indexes.
    if (indexes.length > 1 && indexes.every((v, i) => v === indexes[0] + i)) {
      // Contiguous: bulk remove for performance
      const start = indexes[0];
      const deleteCount = indexes.length;
      this.raw.splice(start, deleteCount);
      this.messages.splice(start, deleteCount);
    } else {
      // Non-contiguous: fallback to individual reverse splices
      for (let i = indexes.length - 1; i >= 0; i--) {
        const idx = indexes[i];
        this.raw.splice(idx, 1);
        this.messages.splice(idx, 1);
      }
    }
  };
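
  // For example, deleting seqs that sit at indexes [2, 3, 4, 5] takes the
  // contiguous fast path above, i.e., a single splice(2, 4) on both arrays,
  // while indexes [1, 4, 7] fall back to one-at-a-time splices in reverse
  // order so earlier removals don't shift the later indexes.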

  private processPersistentSetLargestSeq: number = 0;
  private missingMessages = new Set<number>();
  private processPersistentSet = (
    { seq, time, key, encoding, raw: data, headers, msgID }: SetOperation,
    {
      noEmit,
      noSeqCheck,
    }: {
      noEmit?: boolean;
      noSeqCheck?: boolean;
    },
  ) => {
    if (this.raw == null) return;
    if (!noSeqCheck) {
      const expected = this.processPersistentSetLargestSeq + 1;
      if (seq > expected) {
        log(
          "processPersistentSet -- detected missed seq number",
          { seq, expected: this.processPersistentSetLargestSeq + 1 },
          this.storage,
        );
        // We record that some are missing.
        for (let s = expected; s <= seq - 1; s++) {
          this.missingMessages.add(s);
          this.getAllMissingMessages();
        }
      }
    }

    if (seq > this.processPersistentSetLargestSeq) {
      this.processPersistentSetLargestSeq = seq;
    }

    const mesg = decode({ encoding, data });
    const raw = {
      timestamp: time,
      headers,
      seq,
      raw: data,
      key,
    } as RawMsg;
    if (seq > (this.raw.slice(-1)[0]?.seq ?? 0)) {
      // easy fast initial load to the end of the list (common special case)
      this.messages.push(mesg);
      this.raw.push(raw);
    } else {
      // Insert in the correct place. This should only
      // happen when calling load of old data, which happens, e.g., during
      // automatic failover. The algorithm below is
      // dumb and could be replaced by a binary search. However, we'll
      // change how we batch load so there's less point.
      let i = 0;
      while (i < this.raw.length && this.raw[i].seq < seq) {
        i += 1;
      }
      // after the above loop, either:
      // - this.raw[i] is undefined because i = this.raw.length and every known
      //   entry was less than seq, so we just append it, or
      // - this.raw[i] is defined and this.raw[i].seq >= seq. If they are equal,
      //   do nothing, since we already have it. If not equal, then splice it in.
      if (i >= this.raw.length) {
        this.raw.push(raw);
        this.messages.push(mesg);
      } else if (this.raw[i].seq > seq) {
        this.raw.splice(i, 0, raw);
        this.messages.splice(i, 0, mesg);
      } // other case -- we already have it.
    }
    let prev: T | undefined = undefined;
    // Issue #8702: Capture the previous raw message for this key BEFORE updating this.kv.
    // This is needed for the client-side cleanup below (see the processPersistentDelete call).
    // https://github.com/sagemathinc/cocalc/issues/8702
    const prevRaw = typeof key == "string" ? this.kv[key]?.raw : undefined;
    if (typeof key == "string") {
      prev = this.kv[key]?.mesg ?? this.lastValueByKey.get(key);
      if (raw.headers?.[COCALC_TOMBSTONE_HEADER]) {
        delete this.kv[key];
        this.lastValueByKey.delete(key);
      } else {
        if (this.kv[key] !== undefined) {
          const { raw } = this.kv[key];
          this.kvChangeBytes += raw.raw.length;
        }

        this.kv[key] = { raw, mesg };
        this.lastValueByKey.set(key, mesg);

        if (this.kvChangeBytes >= KEY_GC_THRESH) {
          this.gcKv();
        }
      }
    }
    // Issue #8702: Client-side cleanup for keyed updates.
    // When the server processes a keyed update, it deletes the old row for that key
    // and inserts a new one. However, the server does NOT emit delete events for
    // these overwrites (see storage.ts:450). Without this cleanup, clients would
    // never remove old entries from raw[] and messages[], causing unbounded memory
    // growth. This fix removes the old entry when a new keyed message arrives.
    // https://github.com/sagemathinc/cocalc/issues/8702
    if (typeof key == "string" && prevRaw?.seq != null && prevRaw.seq !== seq) {
      this.processPersistentDelete(
        { op: "delete", seqs: [prevRaw.seq] },
        { noEmit: true },
      );
    }
    this.lastSeq = Math.max(this.lastSeq, seq);
    if (!noEmit) {
      this.emitChange({ mesg, raw, key, prev, msgID });
    }
  };

  private emitChange = (e: ChangeEvent<T>) => {
    if (this.raw == null) return;
    this.emit("change", e);
  };
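
  // CoreStream is an EventEmitter, and every set/delete that gets applied
  // locally results in a "change" event. A hedged usage sketch (assuming an
  // initialized CoreStream<string> named s):
  //
  //   s.on("change", ({ mesg, raw, key, prev }: ChangeEvent<string>) => {
  //     if (mesg === undefined) {
  //       console.log("delete of seq", raw?.seq);
  //     } else {
  //       console.log("set", key, ":", prev, "-->", mesg);
  //     }
  //   });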
In that627// case we reconnect, picking up where we left off.628629if (this.client == null) {630return true;631}632633await this.getAllFromPersist({634start_seq: this.lastSeq + 1,635noEmit: false,636});637return false;638},639{ start: 500, max: 7500, decay: 1.2 },640);641};642643publish = async (644mesg: T,645options?: PublishOptions,646): Promise<{ seq: number; time: number } | undefined> => {647if (mesg === undefined) {648if (options?.key !== undefined) {649// undefined can't be JSON encoded, so we can't possibly represent it, and this650// *must* be treated as a delete.651this.deleteKv(options?.key, { previousSeq: options?.previousSeq });652return;653} else {654throw Error("stream non-kv publish - mesg must not be 'undefined'");655}656}657658if (options?.msgID && this.msgIDs.has(options.msgID)) {659// it's a dup660return;661}662const md = messageData(mesg, { headers: options?.headers });663const x = await this.persistClient.set({664key: options?.key,665messageData: md,666previousSeq: options?.previousSeq,667msgID: options?.msgID,668ttl: options?.ttl,669timeout: options?.timeout,670});671if (options?.msgID) {672this.msgIDs?.add(options.msgID);673}674return x;675};676677publishMany = async (678messages: { mesg: T; options?: PublishOptions }[],679): Promise<680({ seq: number; time: number } | { error: string; code?: any })[]681> => {682let result: (683| { seq: number; time: number }684| { error: string; code?: any }685)[] = [];686687for (let i = 0; i < messages.length; i += PUBLISH_MANY_BATCH_SIZE) {688const batch = messages.slice(i, i + PUBLISH_MANY_BATCH_SIZE);689result = result.concat(await this.publishMany0(batch));690}691692return result;693};694695private publishMany0 = async (696messages: { mesg: T; options?: PublishOptions }[],697): Promise<698({ seq: number; time: number } | { error: string; code?: any })[]699> => {700const v: SetOptions[] = [];701let timeout: number | undefined = undefined;702for (const { mesg, options } of messages) {703if (options?.timeout) {704if (timeout === undefined) {705timeout = options.timeout;706} else {707timeout = Math.min(timeout, options.timeout);708}709}710const md = messageData(mesg, { headers: options?.headers });711v.push({712key: options?.key,713messageData: md,714previousSeq: options?.previousSeq,715msgID: options?.msgID,716ttl: options?.ttl,717});718}719return await this.persistClient.setMany(v, { timeout });720};721722get = (n?): T | T[] => {723if (n == null) {724return this.getAll();725} else {726return this.messages[n];727}728};729730seq = (n: number): number | undefined => {731return this.raw[n]?.seq;732};733734getAll = (): T[] => {735return [...this.messages];736};737738get length(): number {739return this.messages.length;740}741742get start_seq(): number | undefined {743return this._start_seq;744}745746headers = (n: number): { [key: string]: any } | undefined => {747return this.raw[n]?.headers;748};749750// key:value interface for subset of messages pushed with key option set.751// NOTE: This does NOT throw an error if our local seq is out of date (leave that752// to dkv built on this).753setKv = async (754key: string,755mesg: T,756options?: {757headers?: Headers;758previousSeq?: number;759},760): Promise<{ seq: number; time: number } | undefined> => {761return await this.publish(mesg, { ...options, key });762};763764setKvMany = async (765x: {766key: string;767mesg: T;768options?: {769headers?: Headers;770previousSeq?: number;771};772}[],773): Promise<774({ seq: number; time: number } | { error: string; code?: any })[]775> => {776const 

  // key:value interface for subset of messages pushed with key option set.
  // NOTE: This does NOT throw an error if our local seq is out of date (leave that
  // to dkv built on this).
  setKv = async (
    key: string,
    mesg: T,
    options?: {
      headers?: Headers;
      previousSeq?: number;
    },
  ): Promise<{ seq: number; time: number } | undefined> => {
    return await this.publish(mesg, { ...options, key });
  };

  setKvMany = async (
    x: {
      key: string;
      mesg: T;
      options?: {
        headers?: Headers;
        previousSeq?: number;
      };
    }[],
  ): Promise<
    ({ seq: number; time: number } | { error: string; code?: any })[]
  > => {
    const messages: { mesg: T; options?: PublishOptions }[] = [];
    for (const { key, mesg, options } of x) {
      messages.push({ mesg, options: { ...options, key } });
    }
    return await this.publishMany(messages);
  };

  deleteKv = async (
    key: string,
    options?: {
      msgID?: string;
      previousSeq?: number;
    },
  ) => {
    if (this.kv[key] === undefined) {
      // nothing to do
      return;
    }
    return await this.publish(null as any, {
      ...options,
      headers: { [COCALC_TOMBSTONE_HEADER]: true },
      key,
      ttl: DEFAULT_TOMBSTONE_TTL,
    });
  };

  getKv = (key: string): T | undefined => {
    return this.kv[key]?.mesg;
  };

  hasKv = (key: string): boolean => {
    return this.kv?.[key] !== undefined;
  };

  getAllKv = (): { [key: string]: T } => {
    const all: { [key: string]: T } = {};
    for (const key in this.kv) {
      all[key] = this.kv[key].mesg;
    }
    return all;
  };

  // efficient way to get just the keys -- use this instead of
  // getAllKv if you just need the keys.
  keysKv = (): string[] => {
    return Object.keys(this.kv);
  };

  seqKv = (key: string): number | undefined => {
    return this.kv[key]?.raw.seq;
  };

  timeKv = (key?: string): Date | { [key: string]: Date } | undefined => {
    if (key === undefined) {
      const all: { [key: string]: Date } = {};
      for (const key in this.kv) {
        all[key] = new Date(this.kv[key].raw.timestamp);
      }
      return all;
    }
    const r = this.kv[key]?.raw;
    if (r == null) {
      return;
    }
    return new Date(r.timestamp);
  };

  headersKv = (key: string): { [key: string]: any } | undefined => {
    return this.kv[key]?.raw?.headers;
  };

  get lengthKv(): number {
    return Object.keys(this.kv).length;
  }
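
  // A hedged key:value sketch (assuming an initialized stream s):
  //
  //   await s.setKv("color", "blue");
  //   s.getKv("color");              // --> "blue"
  //   const seq = s.seqKv("color");  // server-assigned seq of the current value
  //   // read/change/write: the server rejects this set if someone else
  //   // wrote "color" after we read it (i.e., seq is no longer current):
  //   await s.setKv("color", "red", { previousSeq: seq });
  //   await s.deleteKv("color");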

  // load older messages starting at start_seq up to the oldest message
  // we currently have. This can throw an exception in case of heavy
  // load or network issues, but should either completely succeed or make
  // no change.
  load = async ({
    start_seq,
    noEmit,
  }: {
    start_seq: number;
    noEmit?: boolean;
  }) => {
    // This is used for loading more TimeTravel history
    if (this.storage == null) {
      throw Error("bug");
    }
    // this is one before the oldest we have
    const end_seq = (this.raw[0]?.seq ?? this._start_seq ?? 1) - 1;
    if (start_seq > end_seq) {
      // nothing to load
      return;
    }
    // we're moving start_seq back to this point
    this._start_seq = start_seq;
    const messages = await this.persistClient.getAll({
      start_seq,
      end_seq,
    });
    this.processPersistentMessages(messages, { noEmit, noSeqCheck: true });
  };

  private getAllMissingMessages = reuseInFlight(async () => {
    await until(
      async () => {
        if (this.client == null || this.missingMessages.size == 0) {
          return true;
        }
        try {
          const missing = Array.from(this.missingMessages);
          // sort numerically -- the default sort would compare as strings
          missing.sort((a, b) => a - b);
          log("core-stream: getMissingSeq", missing, this.storage);
          const messages = await this.persistClient.getAll({
            start_seq: missing[0],
            end_seq: missing[missing.length - 1],
          });
          this.processPersistentMessages(messages, {
            noEmit: false,
            noSeqCheck: true,
          });
          for (const seq of missing) {
            this.missingMessages.delete(seq);
          }
        } catch (err) {
          log(
            "core-stream: WARNING -- issue getting missing updates",
            err,
            this.storage,
          );
        }
        return false;
      },
      { start: 1000, max: 15000, decay: 1.3 },
    );
  });

  // get server assigned time of n-th message in stream
  time = (n: number): Date | undefined => {
    const r = this.raw[n];
    if (r == null) {
      return;
    }
    return new Date(r.timestamp);
  };

  times = () => {
    const v: (Date | undefined)[] = [];
    for (let i = 0; i < this.length; i++) {
      v.push(this.time(i));
    }
    return v;
  };

  stats = ({
    start_seq = 1,
  }: {
    start_seq?: number;
  } = {}): { count: number; bytes: number } | undefined => {
    if (this.raw == null) {
      return;
    }
    let count = 0;
    let bytes = 0;
    for (const { raw, seq } of this.raw) {
      if (seq == null) {
        continue;
      }
      if (seq < start_seq) {
        continue;
      }
      count += 1;
      bytes += raw.length;
    }
    return { count, bytes };
  };
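
  // A hedged sketch of paging in older history with the load method above,
  // e.g., for TimeTravel (the stream name and client are placeholders):
  //
  //   const s = await cstream({ name: "patches", start_seq: 1000, client });
  //   await s.load({ start_seq: 500 });  // also fetch seqs 500..999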

  // delete all messages up to and including the
  // one at position index, i.e., this.messages[index]
  // is deleted.
  // NOTE: For ephemeral streams, clients will NOT see the result of a delete,
  // except when they load the stream later. For persistent streams all
  // **connected** clients will see the delete. That said, this is not a "proper"
  // distributed computing primitive with tombstones, etc. This is primarily
  // meant for reducing space usage, and shouldn't be relied on for
  // any other purpose.
  delete = async ({
    all,
    last_index,
    seq,
    last_seq,
    key,
    seqs,
  }: {
    // give exactly ONE parameter -- by default nothing happens with no params
    // all: delete everything
    all?: boolean;
    // last_index: everything up to and including index'd message
    last_index?: number;
    // seq: delete message with this sequence number
    seq?: number;
    // seqs: delete the messages in this array of sequence numbers
    seqs?: number[];
    // last_seq: delete everything up to and including this sequence number
    last_seq?: number;
    // key: delete the message with this key
    key?: string;
  } = {}): Promise<{ seqs: number[] }> => {
    let opts;
    if (all) {
      opts = { all: true };
    } else if (last_index != null) {
      if (last_index >= this.raw.length) {
        opts = { all: true };
      } else if (last_index < 0) {
        return { seqs: [] };
      } else {
        const last_seq = this.raw[last_index].seq;
        if (last_seq === undefined) {
          throw Error(`BUG: invalid index ${last_index}`);
        }
        opts = { last_seq };
      }
    } else if (seq != null) {
      opts = { seq };
    } else if (seqs != null) {
      opts = { seqs };
    } else if (last_seq != null) {
      opts = { last_seq };
    } else if (key != null) {
      const seq = this.kv[key]?.raw?.seq;
      if (seq === undefined) {
        return { seqs: [] };
      }
      opts = { seq };
    }
    return await this.persistClient.delete(opts);
  };

  // delete messages that are no longer needed since newer values have been written
  gcKv = () => {
    this.kvChangeBytes = 0;
    for (let i = 0; i < this.raw.length; i++) {
      const key = this.raw[i].key;
      if (key !== undefined) {
        if (this.raw[i].raw.length > 0 && this.raw[i] !== this.kv[key]?.raw) {
          this.raw[i] = {
            ...this.raw[i],
            headers: undefined,
            raw: Buffer.from(""),
          } as RawMsg;
          this.messages[i] = undefined as T;
        }
      }
    }
  };
}
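
// A hedged sketch of the delete variants (assuming an initialized stream s);
// give exactly one option per call:
//
//   await s.delete({ seq: 42 });         // one message, by sequence number
//   await s.delete({ seqs: [1, 2, 5] }); // several messages
//   await s.delete({ last_seq: 100 });   // everything up through seq 100
//   await s.delete({ last_index: 10 });  // everything up through this.messages[10]
//   await s.delete({ key: "color" });    // the current message stored under a key
//   await s.delete({ all: true });       // everything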

export interface PublishOptions {
  // headers for this message
  headers?: Headers;
  // unique id for this message to dedup so if you send the same
  // message more than once with the same id it doesn't get published
  // multiple times.
  msgID?: string;
  // key -- if specified a key field is also stored on the server,
  // and any previous messages with the same key are deleted. Also,
  // an entry is set in this.kv[key] so that this.getKv(key), etc. work.
  key?: string;
  // if key is specified and previousSeq is set, the server throws
  // an error if the sequence number of the current key is
  // not previousSeq. We use this with this.seqKv(key) to
  // provide read/change/write semantics and to know when we
  // should resolve a merge conflict. This is ignored if
  // key is not specified.
  previousSeq?: number;
  // if set to a number of ms AND the config option allow_msg_ttl
  // is set on this persistent stream, then
  // this message will be deleted after the given amount of time (in ms).
  ttl?: number;
  timeout?: number;
}

export const cache = refCache<CoreStreamOptions, CoreStream>({
  name: "core-stream",
  createObject: async (options: CoreStreamOptions) => {
    if (options.client == null) {
      options = { ...options, client: await conat() };
    }
    const cstream = new CoreStream(options);
    await cstream.init();
    return cstream;
  },
  createKey: ({ client, ...options }) => {
    return jsonStableStringify({ id: client?.id, ...options })!;
  },
});

export async function cstream<T>(
  options: CoreStreamOptions,
): Promise<CoreStream<T>> {
  return await cache(options);
}
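
// A hedged end-to-end sketch (assuming a connected Client, as in the
// DEVELOPMENT comment at the top of this file):
//
//   const s = await cstream<{ x: number }>({ name: "test", client });
//   await s.publish({ x: 1 });
//   console.log(s.getAll()); // --> [{ x: 1 }]
//   s.close();
//
// Repeated cstream calls with the same options return the same cached
// instance (see refCache above) until it is closed.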