Path: blob/master/src/packages/conat/sync/dstream.ts
1710 views
/*
Eventually Consistent Distributed Message Stream

DEVELOPMENT:

# in node -- note the package directory!!
~/cocalc/src/packages/backend node

> s = await require("@cocalc/backend/conat/sync").dstream({name:'test'});
> s = await require("@cocalc/backend/conat/sync").dstream({project_id:cc.current().project_id,name:'foo'});

See the guide for dkv, since it's very similar, especially for use in a browser.
*/

import { EventEmitter } from "events";
import {
  CoreStream,
  type RawMsg,
  type ChangeEvent,
  type PublishOptions,
} from "./core-stream";
import { randomId } from "@cocalc/conat/names";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { isNumericString } from "@cocalc/util/misc";
import refCache from "@cocalc/util/refcache";
import {
  type Client,
  type Headers,
  ConatError,
} from "@cocalc/conat/core/client";
import jsonStableStringify from "json-stable-stringify";
import type { JSONValue } from "@cocalc/util/types";
import { Configuration } from "./core-stream";
import { conat } from "@cocalc/conat/client";
import { delay, map as awaitMap } from "awaiting";
import { asyncThrottle, until } from "@cocalc/util/async-utils";
import {
  inventory,
  type Inventory,
  INVENTORY_UPDATE_INTERVAL,
} from "./inventory";
import { getLogger } from "@cocalc/conat/client";

const logger = getLogger("sync:dstream");

/**
 * Options for creating a DStream.  The whole object is also passed to the
 * underlying CoreStream constructor.
 */
export interface DStreamOptions {
  // what it's called by us
  name: string;
  // passed to the inventory and included in the cache key
  account_id?: string;
  project_id?: string;
  config?: Partial<Configuration>;
  // only load historic messages starting at the given seq number.
  start_seq?: number;
  // description recorded in the inventory entry for this stream
  desc?: JSONValue;

  // required by the DStream constructor; the `dstream()` helper fills in
  // the default conat client when omitted
  client?: Client;
  // if true, publish() does not automatically call save()
  noAutosave?: boolean;
  // not read directly in this file; forwarded to CoreStream with the rest
  // of the options
  ephemeral?: boolean;
  sync?: boolean;

  // not read directly in this file (presumably consumed by refCache or
  // CoreStream -- confirm)
  noCache?: boolean;
  // if true, this stream is never recorded in the inventory
  noInventory?: boolean;

  // passed to inventory()
  service?: string;
}

/**
 * An eventually consistent, append-only message stream on top of CoreStream.
 *
 * A message lives in exactly one of three places:
 *  - `messages` / `raw`: messages that have come back from the stream;
 *  - `saved`: messages whose publish was acknowledged (keyed by seq number)
 *    but which have not yet arrived via a "change" event;
 *  - `local`: messages published here but not yet acknowledged (keyed by
 *    a random msgID).
 *
 * The constructor returns a Proxy, so `s[3]` is equivalent to `s.get(3)`.
 *
 * Events: "connected", "change" (mesg, seq), "stable", "reject"
 * ({ err, mesg }), "closed".
 */
export class DStream<T = any> extends EventEmitter {
  public readonly name: string;
  private stream: CoreStream;
  private messages: T[];
  private raw: RawMsg[];
  private noAutosave: boolean;
  // TODO: using Map for these will be better because we use .length a bunch, which is O(n) instead of O(1).
  private local: { [id: string]: T } = {};
  private publishOptions: {
    [id: string]: { headers?: Headers };
  } = {};
  private saved: { [seq: number]: T } = {};
  private opts: DStreamOptions;

  constructor(opts: DStreamOptions) {
    super();
    logger.debug("constructor", opts.name);
    if (opts.client == null) {
      throw Error("client must be specified");
    }
    this.opts = opts;
    this.noAutosave = !!opts.noAutosave;
    this.name = opts.name;
    this.stream = new CoreStream(opts);
    this.messages = this.stream.messages;
    this.raw = this.stream.raw;
    // Numeric string keys (e.g. s[0]) are an alias for s.get(0).  Every other
    // key, symbols included, is forwarded unchanged.  Converting the key with
    // String(prop) would turn a symbol into an unrelated string key.
    return new Proxy(this, {
      get(target, prop) {
        return typeof prop == "string" && isNumericString(prop)
          ? target.get(parseInt(prop))
          : Reflect.get(target, prop);
      },
    });
  }

  private initialized = false;

  /**
   * Subscribe to the underlying stream and load it.  Emits "connected" once
   * the CoreStream has initialized.
   * @throws if called more than once or after close().
   */
  init = async () => {
    if (this.initialized) {
      throw Error("init can only be called once");
    }
    this.initialized = true;
    if (this.isClosed()) {
      throw Error("closed");
    }
    this.stream.on("change", this.handleChange);
    this.stream.on("reset", () => {
      // drop all pending bookkeeping, including per-message publish options
      this.local = {};
      this.saved = {};
      this.publishOptions = {};
    });
    await this.stream.init();
    this.emit("connected");
  };

  /**
   * Handle a message arriving from the CoreStream: it is no longer "saved"
   * or "local" because it is now in this.messages.
   */
  private handleChange = ({ mesg, raw, msgID }: ChangeEvent<T>) => {
    if (raw?.seq !== undefined) {
      delete this.saved[raw.seq];
    }
    if (mesg === undefined) {
      return;
    }
    if (msgID) {
      // this is critical with core-stream.ts, since otherwise there is a moment
      // when the same message is in both this.local *and* this.messages, and you'll
      // see it doubled in this.getAll().
      delete this.local[msgID];
    }
    this.emit("change", mesg, raw?.seq);
    if (this.isStable()) {
      this.emit("stable");
    }
  };

  /** True when there are no saved-but-unseen or unsaved local messages. */
  isStable = () => {
    for (const _ in this.saved) {
      return false;
    }
    for (const _ in this.local) {
      return false;
    }
    return true;
  };

  isClosed = () => {
    return this.stream == null;
  };

  /** Close the underlying stream and release all state.  Idempotent. */
  close = () => {
    if (this.isClosed()) {
      return;
    }
    logger.debug("close", this.name);
    const stream = this.stream;
    stream.removeListener("change", this.handleChange);
    // @ts-ignore
    delete this.stream;
    stream.close();
    this.emit("closed");
    this.removeAllListeners();
    // @ts-ignore
    delete this.local;
    // @ts-ignore
    delete this.messages;
    // @ts-ignore
    delete this.raw;
    // @ts-ignore
    delete this.opts;
    this.saved = {};
    this.publishOptions = {};
  };

  /**
   * With no argument, return all messages (see getAll).  Otherwise return
   * the n-th message, counting stream messages, then saved messages, then
   * local unsaved messages.
   */
  get = (n?): T | T[] => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    if (n == null) {
      return this.getAll();
    }
    if (n < this.messages.length) {
      return this.messages[n];
    }
    // `saved` is keyed by seq number, not by position, so take its n-th
    // value.  Integer keys enumerate in ascending order, which is the same
    // order used by getAll() and seq().
    const savedMesgs = Object.values(this.saved);
    const i = n - this.messages.length;
    if (i < savedMesgs.length) {
      return savedMesgs[i];
    }
    return Object.values(this.local)[i - savedMesgs.length];
  };

  /** All messages: stream messages, then saved, then local unsaved ones. */
  getAll = (): T[] => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    return [
      ...this.messages,
      ...Object.values(this.saved),
      ...Object.values(this.local),
    ];
  };

  // sequence number of n-th message (undefined for local unsaved messages)
  seq = (n: number): number | undefined => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    if (n < this.raw.length) {
      return this.raw[n].seq;
    }
    const savedSeqs = Object.keys(this.saved);
    if (n < savedSeqs.length + this.raw.length) {
      return parseInt(savedSeqs[n - this.raw.length]);
    }
    return undefined;
  };

  // all sequences numbers of messages
  seqs = (): number[] => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    const seqs = this.raw.map(({ seq }) => seq);
    for (const seq in this.saved) {
      seqs.push(parseInt(seq));
    }
    return seqs;
  };

  time = (n: number): Date | undefined => {
    if (this.isClosed()) {
      throw Error("not initialized");
    }
    return this.stream.time(n);
  };

  // all server assigned times of messages in the stream.
  times = (): (Date | undefined)[] => {
    if (this.isClosed()) {
      throw Error("not initialized");
    }
    return this.stream.times();
  };

  /** Total number of messages (stream + saved + local); 0 once closed. */
  get length(): number {
    if (this.isClosed()) {
      return 0;
    }
    return (
      this.messages.length +
      Object.keys(this.saved).length +
      Object.keys(this.local).length
    );
  }

  /**
   * Append a message locally.  It is saved to the stream immediately unless
   * noAutosave was set, in which case call save() yourself.
   * @throws if the stream is closed.
   */
  publish = (
    mesg: T,
    // NOTE: if you call this.headers(n) it is NOT visible until
    // the publish is confirmed. This could be changed with more work if it matters.
    options?: { headers?: Headers; ttl?: number },
  ): void => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    const id = randomId();
    this.local[id] = mesg;
    if (options != null) {
      this.publishOptions[id] = options;
    }
    if (!this.noAutosave) {
      this.save();
    }
    this.updateInventory();
  };

  headers = (n) => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    return this.stream.headers(n);
  };

  push = (...args: T[]) => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    for (const mesg of args) {
      this.publish(mesg);
    }
  };

  hasUnsavedChanges = (): boolean => {
    if (this.isClosed()) {
      return false;
    }
    return Object.keys(this.local).length > 0;
  };

  /** Messages published locally but not yet acknowledged ([] once closed). */
  unsavedChanges = (): T[] => {
    if (this.isClosed()) {
      return [];
    }
    return Object.values(this.local);
  };

  /**
   * Keep trying to publish all local messages until none remain or the
   * stream is closed.  Concurrent calls share one in-flight attempt.
   */
  save = reuseInFlight(async () => {
    await until(
      async () => {
        if (this.isClosed()) {
          return true;
        }
        try {
          await this.attemptToSave();
        } catch {
          // Failed messages stay in this.local and until() tries again.
          // start/decay/max control the retry delay.  Per-message errors
          // are logged in attemptToSaveBatch.
        }
        return !this.hasUnsavedChanges();
      },
      { start: 150, decay: 1.3, max: 10000 },
    );
  });

  private attemptToSave = async () => {
    // The batched saver is the one in use; the parallel version is kept as
    // an alternative implementation.
    if (true) {
      await this.attemptToSaveBatch();
    } else {
      await this.attemptToSaveParallel();
    }
  };

  /**
   * Publish every local message with a single publishMany call.
   * @throws if closed, or if any message failed (so save() retries).
   */
  private attemptToSaveBatch = reuseInFlight(async () => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    const ids = Object.keys(this.local);
    if (ids.length == 0) {
      // nothing to publish
      return;
    }
    const v: { mesg: T; options: PublishOptions }[] = [];
    for (const id of ids) {
      const mesg = this.local[id];
      const options = {
        ...this.publishOptions[id],
        msgID: id,
      };
      v.push({ mesg, options });
    }
    const w: (
      | { seq: number; time: number; error?: undefined }
      | { error: string; code?: any }
    )[] = await this.stream.publishMany(v);

    if (this.isClosed()) {
      return;
    }

    let errors = false;
    for (let i = 0; i < w.length; i++) {
      const id = ids[i];
      if (w[i].error) {
        const x = w[i] as { error: string; code?: any };
        if (x.code == "reject") {
          // a rejected message is dropped rather than retried
          delete this.local[id];
          delete this.publishOptions[id];
          const err = new ConatError(x.error, { code: x.code });
          this.emit("reject", { err, mesg: v[i].mesg });
        }
        if (!process.env.COCALC_TEST_MODE) {
          console.warn(
            `WARNING -- error saving dstream '${this.name}' -- ${w[i].error}`,
          );
        }
        errors = true;
        continue;
      }
      const { seq } = w[i] as { seq: number };
      if ((this.raw[this.raw.length - 1]?.seq ?? -1) < seq) {
        // it still isn't in this.raw
        this.saved[seq] = v[i].mesg;
      }
      delete this.local[id];
      delete this.publishOptions[id];
    }
    if (errors) {
      throw Error(`there were errors saving dstream '${this.name}'`);
    }
  });

  // non-batched version: publishes local messages individually, at most
  // MAX_PARALLEL at a time.
  private attemptToSaveParallel = reuseInFlight(async () => {
    const f = async (id) => {
      if (this.isClosed()) {
        throw Error("closed");
      }
      const mesg = this.local[id];
      try {
        // @ts-ignore
        const { seq } = await this.stream.publish(mesg, {
          ...this.publishOptions[id],
          msgID: id,
        });
        if (this.isClosed()) {
          return;
        }
        if ((this.raw[this.raw.length - 1]?.seq ?? -1) < seq) {
          // it still isn't in this.raw
          this.saved[seq] = mesg;
        }
        delete this.local[id];
        delete this.publishOptions[id];
      } catch (err) {
        if (this.isClosed()) {
          // this.local was deleted by close(); nothing left to update
          return;
        }
        if (err.code == "reject") {
          delete this.local[id];
          delete this.publishOptions[id];
          // err has mesg and subject set.
          this.emit("reject", { err, mesg });
        } else {
          if (!process.env.COCALC_TEST_MODE) {
            console.warn(
              `WARNING: problem saving dstream ${this.name} -- ${err}`,
            );
          }
        }
      }
      if (this.isStable()) {
        this.emit("stable");
      }
    };
    // NOTE: ES6 spec guarantees "String keys are returned in the order
    // in which they were added to the object."
    const ids = Object.keys(this.local);
    const MAX_PARALLEL = 50;
    await awaitMap(ids, MAX_PARALLEL, f);
  });

  // load older messages starting at start_seq
  load = async (opts: { start_seq: number }) => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    await this.stream.load(opts);
  };

  // this is not synchronous -- it makes sure everything is saved out,
  // then delete the persistent stream
  // NOTE: for ephemeral streams, other clients will NOT see the result of a delete (unless they reconnect).
  delete = async (opts?) => {
    await this.save();
    if (this.isClosed()) {
      throw Error("closed");
    }
    return await this.stream.delete(opts);
  };

  get start_seq(): number | undefined {
    return this.stream?.start_seq;
  }

  // get or set config
  config = async (
    config: Partial<Configuration> = {},
  ): Promise<Configuration> => {
    if (this.isClosed()) {
      throw Error("closed");
    }
    return await this.stream.config(config);
  };

  /**
   * Record this stream's status in the inventory (throttled).  Failures are
   * logged and otherwise ignored.
   */
  private updateInventory = asyncThrottle(
    async () => {
      if (this.isClosed() || this.opts == null || this.opts.noInventory) {
        return;
      }
      await delay(500);
      if (this.isClosed()) {
        return;
      }
      // Capture these now: close() deletes this.opts, and it may run while
      // we are awaiting below (including before the catch block runs).
      const { name, account_id, project_id, desc, service } = this.opts;
      let inv: Inventory | undefined = undefined;
      try {
        // assign the outer variable so the finally block can close it
        inv = await inventory({
          account_id,
          project_id,
          service,
        });
        if (this.isClosed()) {
          return;
        }
        const status = {
          type: "stream" as "stream",
          name,
          desc,
          ...(await this.stream.inventory()),
        };
        inv.set(status);
      } catch (err) {
        if (!process.env.COCALC_TEST_MODE) {
          console.log(
            `WARNING: unable to update inventory. name='${name}' -- ${err}`,
          );
        }
      } finally {
        inv?.close();
      }
    },
    INVENTORY_UPDATE_INTERVAL,
    { leading: true, trailing: true },
  );
}

/**
 * Reference-counted cache of initialized DStreams, keyed by name,
 * account_id, project_id and client id.
 */
export const cache = refCache<DStreamOptions, DStream>({
  name: "dstream",
  createKey: (options: DStreamOptions) => {
    if (!options.name) {
      throw Error("name must be specified");
    }
    const { name, account_id, project_id, client } = options;
    const id = client?.id;
    return jsonStableStringify({ name, account_id, project_id, id })!;
  },
  createObject: async (options: DStreamOptions) => {
    if (options.client == null) {
      options = { ...options, client: await conat() };
    }
    const dstream = new DStream(options);
    try {
      await dstream.init();
    } catch (err) {
      // don't leak the underlying CoreStream when initialization fails
      dstream.close();
      throw err;
    }
    return dstream;
  },
});

/** Get (or create and initialize) the cached DStream for these options. */
export async function dstream<T>(options: DStreamOptions): Promise<DStream<T>> {
  return await cache(options);
}