/*
core/client.ts -- core conat client

This is a client that has a similar API to NATS / Socket.io, but is much,
much better in so many ways:

- It has global pub/sub just like with NATS. This uses the server to
  rebroadcast messages, and for authentication.
  Better than NATS: Authentication is done for a subject *as
  needed* instead of at connection time.

- Messages can be arbitrarily large and they are *automatically* divided
  into chunks and reassembled. Better than both NATS and socket.io.

- There are multiple supported ways of encoding messages, and
  no coordination is required with the server or other clients! E.g.,
  one message can be sent with one encoding and the next with a different
  encoding and that's fine.
     - MsgPack: https://msgpack.org/ -- a very compact encoding that handles
       dates nicely and small numbers efficiently. This also works
       well with binary Buffer objects, which is nice.
     - JsonCodec: uses JSON.stringify and TextEncoder. This does not work
       with Buffer or Date and is less compact, but can be very fast.

- One BIG DIFFERENCE from NATS is that when a message is sent the sender
  can optionally find out how many clients received it. They can also wait
  until there is interest and send the message again. This is automated so
  it's very easy to use, and it makes it much easier to write distributed
  services that aren't temporarily broken by race conditions. HOWEVER,
  there is one caveat -- if as an admin you create a "tap", i.e., you
  subscribe to all messages matching some pattern just to see what's going
  on, then currently that counts in the delivery count and interest, and that
  would then cause these race conditions to happen again. E.g., a user
  signs in, subscribes to their INBOX, sends a request, and gets a response
  to that inbox, but does this all quickly, and in a cluster, the server doesn't
  see the inbox yet, so it fails.
  As a workaround, subscriptions to the
  subject pattern '>' are invisible, so you can always tap into '>' for debugging
  purposes. TODO: implement a general way of making an invisible subscriber that
  doesn't count.


THE CORE API

This section contains the crucial information you have to know to build a distributed
system using Conat. It's our take on the NATS primitives (it's not exactly the
same, but it is close). It's basically a symmetrical pub/sub/request/respond model
for messaging on which you can build distributed systems. The tricky part, which
NATS.js gets wrong (in my opinion), is implementing this in a way that is robust
and scalable, in terms of authentication, real world browser connectivity and
so on. Our approach is to use proven mature technology like socket.io and sqlite,
instead of writing everything from scratch.

Clients: We view all clients as plugged into a common "dial tone",
except for optional permissions that are configured when starting the server.
The methods you call on the client to build everything are:

 - subscribe, subscribeSync - subscribe to a subject which returns an
   async iterator over all messages that match the subject published by
   anyone with permission to do so. If you provide the same optional
   queue parameter for multiple subscribers, then one subscriber in each queue
   group receives each message. The async form of this function confirms
   the subscription was created before returning. If a client creates multiple
   subscriptions to the same subject at the same time, they must all use the
   same queue group.
   Subscriptions are guaranteed to stay valid until the client ends them;
   they do not stop working due to client or server reconnects or restarts.
   (If you need more subscriptions with different queue groups, make another
   client object.)

 - publish, publishSync - publish to a subject.
   The async version returns
   a count of the number of recipients, whereas the sync version is
   fire-and-forget.
   **There is no a priori size limit on messages since chunking
   is automatic. However, we have to impose some limit, but
   it can be much larger than the socketio message size limit.**

 - request - send a message to a subject, and if there is at least one
   subscriber listening, it may respond. If there are no subscribers,
   it throws a 503 error. To create a microservice, subscribe
   to a subject pattern and call mesg.respond(...) on each message you
   receive.

 - requestMany - send a message to a subject, and receive many
   messages in reply. Typically you end the response stream by sending
   a null message, but what you do is up to you. This is very useful
   for streaming arbitrarily large data, long running changefeeds, LLM
   responses, etc.


Messages: A message mesg is:

 - Data:
   - subject - the subject the message was sent to
   - encoding - usually MessagePack
   - raw - encoded binary data
   - headers - a JSON-able Javascript object.

 - Methods:
   - data: this is a property, so if you do mesg.data, then it decodes raw
     and returns the resulting Javascript object.
   - respond, respondSync: if REPLY_HEADER is set, calling this publishes a
     response message to the original sender of the message.


Persistence:

We also implement persistent streams, where you can also set a key. This can
be used to build the analogue of Jetstream's streams and kv stores. The object
store isn't necessary since there is no limit on message size. Conat's persistent
streams are compressed by default and backed by individual sqlite files, which
makes them very memory efficient and it is easy to tier storage to cloud storage.

UNIT TESTS: See packages/server/conat/test/core

MISC NOTES:

NOTE: There is a socketio msgpack parser, but it just doesn't
work at all, which is weird.
Also, I think it's impossible to
do the sort of chunking we want at the level of a socket.io
parser -- it's just not possible at that layer of the encoding. We customize
things purely client side without using a parser, and get a much
simpler and better result, inspired by how NATS approaches things
with opaque messages.


SUBSCRIPTION ROBUSTNESS: When you call client.subscribe(...) you get back an async iterator.
It ONLY ends when you explicitly do the standard ways of terminating
such an iterator, including calling .close() on it. It would be a MAJOR BUG
if it were to terminate for any other reason. In particular, the subscription
MUST NEVER throw an error or silently end when the connection is dropped
then resumed, or the server is restarted, or the client connects to
a different server! These situations can, of course, result in missing
some messages, but that's understood. There are no guarantees at all with
a subscription that every message is received. Finally, any time a client
disconnects and reconnects, the client ensures that all subscriptions
exist for it on the server via a sync process.

Subscription robustness is a major difference with NATS.js, which would
mysteriously terminate subscriptions for a variety of reasons, meaning that any
code using subscriptions had to be wrapped in ugly complexity to be
usable in production.

INTEREST AWARENESS: In Conat there is a cluster-aware event-driven way to
wait for interest in a subject. This is an extremely useful extension to
NATS functionality, since it makes it much easier to dynamically set up
a client and a server and exchange messages without having to poll and fail
potentially a few times. This makes certain operations involving complicated
steps behind the scenes -- upload a file, open a file to edit with sync, etc. --
feel more responsive.

USAGE:

The following should mostly work to interactively play around with this
code and develop it.
It's NOT automatically tested and depends on your
environment though, so may break. See the unit tests in

    packages/server/conat/test/core/

for something that definitely works perfectly.


For developing at the command line, cd to packages/backend, then in node:

    c = require('@cocalc/backend/conat/conat').connect()

or

    c = require('@cocalc/conat/core/client').connect('http://localhost:3000')

    c.watch('a')

    s = await c.subscribe('a'); for await (const x of s) { console.log(x.length)}

// in another console

    c = require('@cocalc/backend/conat/conat').connect()
    c.publish('a', 'hello there')

// in browser (right now)

    cc.conat.conat()

// client server:

    s = await c.subscribe('eval'); for await(const x of s) { x.respond(eval(x.data)) }

then in another console

    f = async () => (await c.request('eval', '2+3')).data
    await f()

    t = Date.now(); for(i=0;i<1000;i++) { await f()} ; Date.now()-t

// slower, but won't silently fail due to errors, etc.

    f2 = async () => (await c.request('eval', '2+3', {confirm:true})).data

Wildcard subject:


    c = require('@cocalc/conat/core/client').connect(); c.watch('a.*');


    c = require('@cocalc/conat/core/client').connect(); c.publish('a.x', 'foo')


Testing disconnect

    c.sub('>')
    c.conn.io.engine.close();0;

other:

    a=0; setInterval(()=>c.pub('a',a++), 250)

*/

import {
  connect as connectToSocketIO,
  type SocketOptions,
  type ManagerOptions,
} from "socket.io-client";
import { EventIterator } from "@cocalc/util/event-iterator";
import type { ConnectionStats, ServerInfo } from "./types";
import * as msgpack from "@msgpack/msgpack";
import { randomId } from "@cocalc/conat/names";
import type { JSONValue } from "@cocalc/util/types";
import { EventEmitter } from "events";
import {
  isValidSubject,
  isValidSubjectWithoutWildcards,
} from "@cocalc/conat/util";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { once, until } from "@cocalc/util/async-utils";
import { delay } from "awaiting";
import { getLogger } from "@cocalc/conat/client";
import { refCacheSync } from "@cocalc/util/refcache";
import { join } from "path";
import { dko, type DKO } from "@cocalc/conat/sync/dko";
import { dkv, type DKVOptions, type DKV } from "@cocalc/conat/sync/dkv";
import {
  dstream,
  type DStreamOptions,
  type DStream,
} from "@cocalc/conat/sync/dstream";
import { akv, type AKV } from "@cocalc/conat/sync/akv";
import { astream, type AStream } from "@cocalc/conat/sync/astream";
import TTL from "@isaacs/ttlcache";
import {
  ConatSocketServer,
  ConatSocketClient,
  ServerSocket,
  type SocketConfiguration,
} from "@cocalc/conat/socket";
export { type ConatSocketServer, ConatSocketClient, ServerSocket };
import {
  type SyncTableOptions,
  type ConatSyncTable,
  createSyncTable,
} from "@cocalc/conat/sync/synctable";

export const MAX_INTEREST_TIMEOUT = 90_000;

const DEFAULT_WAIT_FOR_INTEREST_TIMEOUT = 30_000;

const MSGPACK_ENCODER_OPTIONS = {
  // ignoreUndefined is critical so database queries work properly, and
  // also we have a lot of api calls with tons of wasted undefined values.
  ignoreUndefined: true,
};

export const STICKY_QUEUE_GROUP = "sticky";

export const DEFAULT_SOCKETIO_CLIENT_OPTIONS = {
  // A major problem if we allow long polling is that we must always use at most
  // half the chunk size... because there is no way to know if recipients will be
  // using long polling to RECEIVE messages. Not insurmountable.
  transports: ["websocket"],
  rememberUpgrade: true,

  // nodejs specific for project/compute server in some settings
  rejectUnauthorized: false,

  reconnection: true,
  reconnectionDelay: process.env.COCALC_TEST_MODE ? 50 : 500,
  reconnectionDelayMax: process.env.COCALC_TEST_MODE ?
    500 : 15000,
  reconnectionAttempts: 9999999999, // infinite
};

type State = "disconnected" | "connected" | "closed";

const logger = getLogger("core/client");

interface Options {
  // address = the address of a cocalc server, including the base url, e.g.,
  //
  //   https://cocalc.com
  //
  // or for a dev server running locally with a base url:
  //
  //   http://localhost:4043/3fa218e5-7196-4020-8b30-e2127847cc4f/port/5002
  //
  // The socketio path is always /conat (after the base url) and is set automatically.
  //
  address?: string;
  inboxPrefix?: string;
  systemAccountPassword?: string;
}

export type ClientOptions = Options & {
  noCache?: boolean;
} & Partial<SocketOptions> &
  Partial<ManagerOptions>;

const INBOX_PREFIX = "_INBOX";
const REPLY_HEADER = "CN-Reply";
const MAX_HEADER_SIZE = 100000;

const STATS_LOOP = 5000;

// fairly long since this is to avoid leaks, not for responsiveness in the UI.
export const DEFAULT_SUBSCRIPTION_TIMEOUT = 60_000;

// long so servers don't get DOS'd on startup, etc. Also, we use interest-based
// checks when publish and request fail, so we're not depending on these to
// fail as part of the normal startup process for anything.
export let DEFAULT_REQUEST_TIMEOUT = 30_000;
export let DEFAULT_PUBLISH_TIMEOUT = 30_000;

export function setDefaultTimeouts({
  request = DEFAULT_REQUEST_TIMEOUT,
  publish = DEFAULT_PUBLISH_TIMEOUT,
}: {
  request?: number;
  publish?: number;
}) {
  DEFAULT_REQUEST_TIMEOUT = request;
  DEFAULT_PUBLISH_TIMEOUT = publish;
}

export enum DataEncoding {
  MsgPack = 0,
  JsonCodec = 1,
}

interface SubscriptionOptions {
  maxWait?: number;
  mesgLimit?: number;
  queue?: string;
  // sticky: when a choice from a queue group is made, the same choice is always made
  // in the future for any message with subject matching subject with the last segment
  // replaced by a *, until the target for the choice goes away.
  // Setting this just
  // sets the queue option to the constant string STICKY_QUEUE_GROUP.
  //
  // Examples of two subjects that match except possibly in the last segment are
  //   - foo.bar.lJcBSieLn
  //   - foo.bar.ZzsDC376ge
  //
  // You can put anything random in the last segment and all messages
  // that match foo.bar.* get the same choice from the queue group.
  // The idea is that *when* the message with subject foo.bar.lJcBSieLn gets
  // sent, the backend server selects a target from the queue group to receive
  // that message. It remembers the choice, and so long as that target is subscribed,
  // it sends any message matching foo.bar.* to that same target.
  // This is used in our implementation of persistent socket connections that
  // are built on pub/sub.

  // The underlying implementation uses (1) consistent hashing and (2) a stream
  // to sync state of the servers.
  //
  // If the members of the sticky queue group have been stable for a while (e.g., a minute)
  // then all servers in a local cluster have the same list of subscribers in the sticky
  // queue group, and using consistent hashing any server will make the same choice of
  // where to route messages. If the members of the sticky queue group are dynamically changing,
  // it is possible for an inconsistent choice to be made. If this happens, it will be fixed
  // within a few seconds. During that time, it's possible a virtual conat socket
  // connection could get created then destroyed a few seconds later, due to the sticky
  // assignment changing.
  //
  // The following isn't implemented yet -- one idea. Another idea would be the stickiness
  // is local to a cluster.
  // Not sure!
  // Regarding a *supercluster*, if no choice has already been made in any cluster for
  // a given subject (except last segment), then a choice is made using consistent hashing
  // from the subscribers in the *nearest* cluster that has at least one subscriber.
  // If choices are made simultaneously across the supercluster, then it is likely that
  // they are inconsistent. As soon as these choices are visible, they are resolved, with
  // the tie broken using lexicographic sort of the "socketio room" (this is a
  // random string that is used to send messages to a subscriber).

  sticky?: boolean;
  respond?: Function;
  // timeout to create the subscription -- this may wait *until* you connect before
  // it starts ticking.
  timeout?: number;
}

// WARNING! This is the default and you can't just change it!
// Yes, for specific messages you can, but in general DO NOT. The reason is that, e.g.,
// JSON will turn Dates into strings, and we no longer fix that. So unless you modify the
// JsonCodec to handle Dates properly, don't change this!!
const DEFAULT_ENCODING = DataEncoding.MsgPack;

function cocalcServerToSocketioAddress(url: string): {
  address: string;
  path: string;
} {
  const u = new URL(url, "http://dummy.org");
  const address = u.origin;
  const path = join(u.pathname, "conat");
  return { address, path };
}

const cache = refCacheSync<ClientOptions, Client>({
  name: "conat-client",
  createObject: (opts: ClientOptions) => {
    return new Client(opts);
  },
});

export function connect(opts: ClientOptions = {}) {
  //console.trace("connect", opts);
  if (!opts.address) {
    const x = cache.one();
    if (x != null) {
      return x;
    }
  }
  return cache(opts);
}

// Get any cached client, if there is one; otherwise make one
// with default options.
export function getClient() {
  return cache.one() ??
    connect();
}

export class Client extends EventEmitter {
  public conn: ReturnType<typeof connectToSocketIO>;
  // queueGroups is a map from subject to the queue group for the subscription to that subject
  private queueGroups: { [subject: string]: string } = {};
  private subs: { [subject: string]: SubscriptionEmitter } = {};
  private sockets: {
    // all socket servers created using this Client
    servers: { [subject: string]: ConatSocketServer };
    // all client connections created using this Client.
    clients: { [subject: string]: { [id: string]: ConatSocketClient } };
  } = { servers: {}, clients: {} };
  public readonly options: ClientOptions;
  private inboxSubject: string;
  private inbox?: EventEmitter;
  private permissionError = {
    pub: new TTL<string, string>({ ttl: 1000 * 60 }),
    sub: new TTL<string, string>({ ttl: 1000 * 60 }),
  };
  public info: ServerInfo | undefined = undefined;
  // running totals of messages and bytes sent/received, and of subscriptions
  public readonly stats: ConnectionStats & {
    recv0: { messages: number; bytes: number };
  } = {
    send: { messages: 0, bytes: 0 },
    recv: { messages: 0, bytes: 0 },
    // recv0 = count since last connect
    recv0: { messages: 0, bytes: 0 },
    subs: 0,
  };

  public readonly id: string = randomId();
  public state: State = "disconnected";

  constructor(options: ClientOptions) {
    super();
    if (!options.address) {
      if (!process.env.CONAT_SERVER) {
        throw Error(
          "Must specify address or set CONAT_SERVER environment variable",
        );
      }
      options = { ...options, address: process.env.CONAT_SERVER };
    }
    this.options = options;
    this.setMaxListeners(1000);

    // for socket.io the address has no base url
    const { address, path } = cocalcServerToSocketioAddress(
      this.options.address!,
    );
    logger.debug(`Conat: Connecting to ${this.options.address}...`);
    // if (options.extraHeaders == null) {
    //   console.trace("WARNING: no auth set");
    // }
    this.conn = connectToSocketIO(address, {
      ...DEFAULT_SOCKETIO_CLIENT_OPTIONS,
      //
      // It is necessary to manually manage reconnects due to a bug
      // in socketio that has stumped their devs
      //   -- https://github.com/socketio/socket.io/issues/5197
      // So no matter what options are set, we never use socketio's
      // reconnection logic. If options.reconnection is true or
      // not given, then we implement (in this file) reconnect ourselves.
      // The browser frontend explicitly sets options.reconnection false
      // and uses its own logic.
      ...options,
      ...(options.systemAccountPassword
        ? {
            extraHeaders: {
              ...options.extraHeaders,
              Cookie: `sys=${options.systemAccountPassword}`,
            },
          }
        : undefined),
      path,
      reconnection: true,
    });

    this.conn.on("info", (info, ack) => {
      if (typeof ack == "function") {
        ack();
      }
      const firstTime = this.info == null;
      this.info = info;
      this.emit("info", info);
      setTimeout(this.syncSubscriptions, firstTime ? 3000 : 0);
    });
    this.conn.on("permission", ({ message, type, subject }) => {
      logger.debug(message);
      this.permissionError[type]?.set(subject, message);
    });
    this.conn.on("connect", async () => {
      logger.debug(`Conat: Connected to ${this.options.address}`);
      if (this.conn.connected) {
        this.setState("connected");
      }
    });
    this.conn.io.on("error", (...args) => {
      logger.debug(
        `Conat: Error connecting to ${this.options.address} -- `,
        ...args,
      );
    });
    this.conn.on("disconnect", async () => {
      if (this.isClosed()) {
        return;
      }
      this.stats.recv0 = { messages: 0, bytes: 0 }; // reset on disconnect
      this.setState("disconnected");
      this.disconnectAllSockets();
    });
    this.conn.io.connect();
    this.initInbox();
    this.statsLoop();
  }

  cluster = async () => {
    return await this.conn.timeout(10000).emitWithAck("cluster");
  };

  disconnect = () => {
    if (this.isClosed()) {
      return;
    }
    this.disconnectAllSockets();
    // @ts-ignore
    setTimeout(() => this.conn.io.disconnect(), 1);
  };

  // this has NO timeout by default
  waitUntilSignedIn =
    reuseInFlight(
    async ({ timeout }: { timeout?: number } = {}) => {
      // not "signed in" if --
      //   - not connected, or
      //   - no info at all (which gets sent on sign in)
      //   - or the user is {error:....}, which is what happens when sign in fails
      //     e.g., due to an expired cookie
      if (
        this.info == null ||
        this.state != "connected" ||
        this.info?.user?.error
      ) {
        await once(this, "info", timeout);
      }
      if (
        this.info == null ||
        this.state != "connected" ||
        this.info?.user?.error
      ) {
        throw Error("failed to sign in");
      }
    },
  );

  private statsLoop = async () => {
    await until(
      async () => {
        if (this.isClosed()) {
          return true;
        }
        try {
          await this.waitUntilConnected();
          if (this.isClosed()) {
            return true;
          }
          this.conn.emit("stats", { recv0: this.stats.recv0 });
        } catch {}
        return false;
      },
      { start: STATS_LOOP, max: STATS_LOOP },
    );
  };

  interest = async (subject: string): Promise<boolean> => {
    return await this.waitForInterest(subject, { timeout: 0 });
  };

  waitForInterest = async (
    subject: string,
    {
      timeout = MAX_INTEREST_TIMEOUT,
    }: {
      timeout?: number;
    } = {},
  ) => {
    if (!isValidSubjectWithoutWildcards(subject)) {
      throw Error(
        `subject ${subject} must be a valid subject without wildcards`,
      );
    }
    timeout = Math.min(timeout, MAX_INTEREST_TIMEOUT);
    try {
      const response = await this.conn
        .timeout(timeout ? timeout : 10000)
        .emitWithAck("wait-for-interest", { subject, timeout });
      return response;
    } catch (err) {
      throw toConatError(err);
    }
  };

  recvStats = (bytes: number) => {
    this.stats.recv.messages += 1;
    this.stats.recv.bytes += bytes;
    this.stats.recv0.messages += 1;
    this.stats.recv0.bytes += bytes;
  };

  // There should usually be no reason to call this because socket.io
  // is so good at abstracting this away.
  // It's useful for unit testing.
  waitUntilConnected = reuseInFlight(async () => {
    if (this.conn.connected) {
      return;
    }
    // @ts-ignore
    await once(this.conn, "connect");
  });

  waitUntilReady = reuseInFlight(async () => {
    await this.waitUntilSignedIn();
    await this.waitUntilConnected();
  });

  private setState = (state: State) => {
    if (this.isClosed() || this.state == state) {
      return;
    }
    this.state = state;
    this.emit(state);
  };

  private temporaryInboxSubject = () => {
    if (!this.inboxSubject) {
      throw Error("inbox not setup properly");
    }
    return `${this.inboxSubject}.${randomId()}`;
  };

  private getInbox = reuseInFlight(async (): Promise<EventEmitter> => {
    if (this.inbox == null) {
      if (this.isClosed()) {
        throw Error("closed");
      }
      await once(this, "inbox");
    }
    if (this.inbox == null) {
      throw Error("bug");
    }
    return this.inbox;
  });

  private initInbox = async () => {
    // For request/respond, instead of setting up one
    // inbox *every time there is a request*, we set up a single
    // inbox once and for all for all responses. We listen for
    // everything to inbox...Prefix.* and emit it via this.inbox.
    // The request sender then listens on this.inbox for the response.
    // We *could* use a regular subscription for each request,
    // but (1) that massively increases the load on the server for
    // every single request (having to create and destroy subscriptions)
    // and (2) there is a race condition between creating that subscription
    // and getting the response; it's fine with one server, but with
    // multiple servers solving the race condition would slow everything down
    // due to having to wait for so many acknowledgements. Instead, we
    // remove all those problems by just using a single inbox subscription.
    const inboxPrefix = this.options.inboxPrefix ??
      INBOX_PREFIX;
    if (!inboxPrefix.startsWith(INBOX_PREFIX)) {
      throw Error(`custom inboxPrefix must start with '${INBOX_PREFIX}'`);
    }
    this.inboxSubject = `${inboxPrefix}.${randomId()}`;
    let sub;
    await until(
      async () => {
        try {
          await this.waitUntilSignedIn();
          sub = await this.subscribe(this.inboxSubject + ".*");
          return true;
        } catch (err) {
          if (this.isClosed()) {
            return true;
          }
          // this should only fail due to permissions issues, at which point
          // request can't work, but pub/sub can.
          if (!process.env.COCALC_TEST_MODE) {
            console.log(`WARNING: inbox not available -- ${err}`);
          }
        }
        return false;
      },
      { start: 1000, max: 15000 },
    );
    if (this.isClosed()) {
      return;
    }

    this.inbox = new EventEmitter();
    (async () => {
      for await (const mesg of sub) {
        if (this.inbox == null) {
          return;
        }
        this.inbox.emit(mesg.subject, mesg);
      }
    })();
    this.emit("inbox", this.inboxSubject);
  };

  private isClosed = () => {
    return this.state == "closed";
  };

  close = () => {
    if (this.isClosed()) {
      return;
    }
    this.setState("closed");
    this.removeAllListeners();
    this.closeAllSockets();
    // @ts-ignore
    delete this.sockets;
    for (const subject in this.queueGroups) {
      this.conn.emit("unsubscribe", { subject });
      delete this.queueGroups[subject];
    }
    for (const sub of Object.values(this.subs)) {
      sub.refCount = 0;
      sub.close();
      // @ts-ignore
      delete this.subs;
    }
    // @ts-ignore
    delete this.queueGroups;
    // @ts-ignore
    delete this.inboxSubject;
    delete this.inbox;
    // @ts-ignore
    delete this.options;
    // @ts-ignore
    delete this.info;
    // @ts-ignore
    delete this.permissionError;

    try {
      this.conn.close();
    } catch {}
  };

  private syncSubscriptions = reuseInFlight(async () => {
    let fails = 0;
    await until(
      async () => {
        if (this.isClosed()) return true;
        try {
          if (this.info == null) {
            // no point in trying until we are signed in and connected
            await once(this, "info");
          }
          if (this.isClosed()) return true;
          await this.waitUntilConnected();
          if (this.isClosed()) return true;
          const stable = await this.syncSubscriptions0(10000);
          if (stable) {
            return true;
          }
        } catch (err) {
          fails++;
          if (fails >= 3) {
            console.log(
              `WARNING: failed to sync subscriptions ${fails} times -- ${err}`,
            );
          }
        }
        return false;
      },
      { start: 1000, max: 15000 },
    );
  });

  // syncSubscriptions0 ensures that we're subscribed on server
  // to what we think we're subscribed to, or throws an error.
  private syncSubscriptions0 = async (timeout: number): Promise<boolean> => {
    if (this.isClosed()) return true;
    if (this.info == null) {
      throw Error("not signed in");
    }
    const subs = await this.getSubscriptions(timeout);
    // console.log("syncSubscriptions", {
    //   server: subs,
    //   client: Object.keys(this.queueGroups),
    // });
    const missing: { subject: string; queue: string }[] = [];
    for (const subject in this.queueGroups) {
      // subscribe on backend to all subscriptions we think we should have that
      // the server does not have
      if (!subs.has(subject)) {
        missing.push({
          subject,
          queue: this.queueGroups[subject],
        });
      }
    }
    let stable = true;
    if (missing.length > 0) {
      stable = false;
      const resp = await this.conn
        .timeout(timeout)
        .emitWithAck("subscribe", missing);
      // some subscription could fail due to permissions changes, e.g., user got
      // removed from a project.
      for (let i = 0; i < missing.length; i++) {
        if (resp[i].error) {
          const sub = this.subs[missing[i].subject];
          if (sub != null) {
            sub.close(true);
          }
        }
      }
    }
    const extra: { subject: string }[] = [];
    // subs is a Set, so iterate with for...of (for...in would visit nothing)
    for (const subject of subs) {
      if (this.queueGroups[subject] == null) {
        // server thinks we're subscribed but we do not think so, so cancel that
        extra.push({ subject });
      }
    }
    if (extra.length > 0) {
      await this.conn.timeout(timeout).emitWithAck("unsubscribe", extra);
      stable = false;
    }
    return stable;
  };

  numSubscriptions = () => Object.keys(this.queueGroups).length;

  private getSubscriptions = async (
    timeout = DEFAULT_REQUEST_TIMEOUT,
  ): Promise<Set<string>> => {
    const subs = await this.conn
      .timeout(timeout)
      .emitWithAck("subscriptions", null);
    return new Set(subs);
  };

  // returns EventEmitter that emits 'message', mesg: Message
  private subscriptionEmitter = (
    subject: string,
    {
      closeWhenOffCalled,
      queue,
      sticky,
      confirm,
      timeout,
    }: {
      // if true, when the off method of the event emitter is called, then
      // the entire subscription is closed. This is very useful when we wrap the
      // EventEmitter in an async iterator.
      closeWhenOffCalled?: boolean;

      // the queue group -- if not given, then one is randomly assigned.
      queue?: string;

      // if true, sets queue to "sticky"
      sticky?: boolean;

      // confirm -- get confirmation back from server that subscription was created
      confirm?: boolean;

      // how long to wait to confirm creation of the subscription;
      // only explicitly *used* when confirm=true, but always must be set.
      timeout?: number;
    } = {},
  ): { sub: SubscriptionEmitter; promise? } => {
    // Having timeout set at all is absolutely critical because if the connection
    // goes down while making the subscription, having some timeout causes
    // socketio to throw an error, which avoids a huge potential subscription
    // leak.
    // We set this by default to DEFAULT_SUBSCRIPTION_TIMEOUT.
    if (!timeout) {
      timeout = DEFAULT_SUBSCRIPTION_TIMEOUT;
    }
    if (this.isClosed()) {
      throw Error("closed");
    }
    if (!isValidSubject(subject)) {
      throw Error(`invalid subscribe subject '${subject}'`);
    }
    if (this.permissionError.sub.has(subject)) {
      const message = this.permissionError.sub.get(subject)!;
      logger.debug(message);
      throw new ConatError(message, { code: 403 });
    }
    if (sticky) {
      if (queue) {
        throw Error("must not specify queue group if sticky is true");
      }
      queue = STICKY_QUEUE_GROUP;
    }
    let sub = this.subs[subject];
    if (sub != null) {
      if (queue && this.queueGroups[subject] != queue) {
        throw Error(
          `client can only have one queue group subscription for a given subject -- subject='${subject}', queue='${queue}'`,
        );
      }
      if (queue == STICKY_QUEUE_GROUP) {
        throw Error(
          `can only have one sticky subscription with given subject pattern per client -- subject='${subject}'`,
        );
      }
      sub.refCount += 1;
      return { sub, promise: undefined };
    }
    if (this.queueGroups[subject] != null) {
      throw Error(`already subscribed to '${subject}'`);
    }
    if (!queue) {
      queue = randomId();
    }
    this.queueGroups[subject] = queue;
    sub = new SubscriptionEmitter({
      client: this,
      subject,
      closeWhenOffCalled,
    });
    this.subs[subject] = sub;
    this.stats.subs++;
    let promise;
    if (confirm) {
      const f = async () => {
        let response;
        try {
          if (timeout) {
            response = await this.conn
              .timeout(timeout)
              .emitWithAck("subscribe", { subject, queue });
          } else {
            // this should never be used -- see above
            response = await this.conn.emitWithAck("subscribe", {
              subject,
              queue,
            });
          }
        } catch (err) {
          throw toConatError(err);
        }
        if (response?.error) {
          throw new ConatError(response.error, { code: response.code });
        }
        return response;
      };
      promise = f();
    } else {
      this.conn.emit("subscribe", { subject, queue });
      promise =
        undefined;
    }
    sub.once("closed", () => {
      if (this.isClosed()) {
        return;
      }
      this.conn.emit("unsubscribe", { subject });
      delete this.queueGroups[subject];
      if (this.subs[subject] != null) {
        this.stats.subs--;
        delete this.subs[subject];
      }
    });
    return { sub, promise };
  };

  private subscriptionIterator = (
    sub,
    opts?: SubscriptionOptions,
  ): Subscription => {
    // @ts-ignore
    const iter = new EventIterator<Message>(sub, "message", {
      idle: opts?.maxWait,
      limit: opts?.mesgLimit,
      map: (args) => args[0],
    });
    return iter;
  };

  subscribeSync = (
    subject: string,
    opts?: SubscriptionOptions,
  ): Subscription => {
    const { sub } = this.subscriptionEmitter(subject, {
      confirm: false,
      closeWhenOffCalled: true,
      sticky: opts?.sticky,
      queue: opts?.queue,
    });
    return this.subscriptionIterator(sub, opts);
  };

  subscribe = async (
    subject: string,
    opts?: SubscriptionOptions,
  ): Promise<Subscription> => {
    await this.waitUntilSignedIn();
    const { sub, promise } = this.subscriptionEmitter(subject, {
      confirm: true,
      closeWhenOffCalled: true,
      queue: opts?.queue,
      sticky: opts?.sticky,
      timeout: opts?.timeout,
    });
    try {
      await promise;
    } catch (err) {
      sub.close();
      throw err;
    }
    return this.subscriptionIterator(sub, opts);
  };

  sub = this.subscribe;

  /*
  A service is a subscription with a function to respond to requests by name.
  Call service with an implementation:

     service = await client1.service('arith', {mul : async (a,b)=>a*b, add : async (a,b)=>a+b})

  Use the service (call is synchronous and returns a proxy, so do not await it):

     arith = client2.call('arith')
     await arith.mul(2,3)
     await arith.add(2,3)

  There's by default a single queue group '0', so if you create multiple services on various
  computers, then requests are load balanced across them automatically.
  Explicitly set
  a random queue group (or something else) and use callMany if you don't want this behavior.

  Close the service when done:

     service.close();

  See backend/conat/test/core/services.test.ts for a tested and working example
  that involves typescript and shows how to use wildcard subjects and get the
  specific subject used for a call, using the fact that 'this' is bound to the
  calling mesg.
  */
  service: <T = any>(
    subject: string,
    impl: T,
    opts?: SubscriptionOptions,
  ) => Promise<Subscription> = async (subject, impl, opts) => {
    const sub = await this.subscribe(subject, {
      ...opts,
      queue: opts?.queue ?? "0",
    });
    const respond = async (mesg: Message) => {
      try {
        const [name, args] = mesg.data;
        // call impl[name], but with 'this' set to the object {subject:...},
        // so inside the service, it is possible to know what subject was used
        // in the request, in case subject is a wildcard subject.
        // const result = await impl[name].apply(
        //   { subject: mesg.subject },
        //   ...args,
        // );
        // mesg.respondSync(result);
        let f = impl[name];
        if (f == null) {
          throw Error(`${name} not defined`);
        }
        const result = await f.apply(mesg, args);
        // use await mesg.respond so waitForInterest is on, which is almost always
        // good for services.
        await mesg.respond(result);
      } catch (err) {
        await mesg.respond(null, {
          noThrow: true, // we're not catching this one
          headers: { error: `${err}` },
        });
      }
    };
    const loop = async () => {
      // todo -- param to set max number of responses at once.
      for await (const mesg of sub) {
        respond(mesg);
      }
    };
    loop();
    return sub;
  };

  // Call a service as defined above.
  call<T = any>(subject: string, opts?: PublishOptions): T {
    const call = async (name: string, args: any[]) => {
      const resp = await
        this.request(subject, [name, args], opts);
      if (resp.headers?.error) {
        throw Error(`${resp.headers.error}`);
      } else {
        return resp.data;
      }
    };

    return new Proxy(
      {},
      {
        get: (_, name) => {
          if (typeof name !== "string") {
            return undefined;
          }
          return async (...args) => await call(name, args);
        },
      },
    ) as T;
  }

  callMany<T = any>(subject: string, opts?: RequestManyOptions): T {
    const maxWait = opts?.maxWait ? opts?.maxWait : DEFAULT_REQUEST_TIMEOUT;
    const self = this;
    async function* callMany(name: string, args: any[]) {
      const sub = await self.requestMany(subject, [name, args], {
        ...opts,
        maxWait,
      });
      for await (const resp of sub) {
        if (resp.headers?.error) {
          yield new ConatError(`${resp.headers.error}`, {
            code: resp.headers.code,
          });
        } else {
          yield resp.data;
        }
      }
    }

    return new Proxy(
      {},
      {
        get: (_, name) => {
          if (typeof name !== "string") {
            return undefined;
          }
          return async (...args) => await callMany(name, args);
        },
      },
    ) as T;
  }

  publishSync = (
    subject: string,
    mesg,
    opts?: PublishOptions,
  ): { bytes: number } => {
    if (this.isClosed()) {
      // already closed
      return { bytes: 0 };
    }
    // must NOT confirm
    return this._publish(subject, mesg, { ...opts, confirm: false });
  };

  publish = async (
    subject: string,
    mesg,
    opts: PublishOptions = {},
  ): Promise<{
    // bytes encoded (doesn't count some extra wrapping)
    bytes: number;
    // count is the number of matching subscriptions
    // that the server *sent* this message to since the server knows about them.
    // However, there's no guarantee that the subscribers actually exist
    // **right now** or received these messages.
    count: number;
  }> => {
    if (this.isClosed()) {
      // already closed
      return { bytes: 0, count: 0 };
    }
    await this.waitUntilSignedIn();
    const start = Date.now();
    const { bytes,
      getCount, promise } = this._publish(subject, mesg, {
      ...opts,
      confirm: true,
    });
    await promise;
    let count = getCount?.()!;

    if (
      opts.waitForInterest &&
      count != null &&
      count == 0 &&
      !this.isClosed() &&
      (opts.timeout == null || Date.now() - start <= opts.timeout)
    ) {
      let timeout = opts.timeout ?? DEFAULT_WAIT_FOR_INTEREST_TIMEOUT;
      await this.waitForInterest(subject, {
        timeout: timeout ? timeout - (Date.now() - start) : undefined,
      });
      if (this.isClosed()) {
        return { bytes, count };
      }
      const elapsed = Date.now() - start;
      timeout -= elapsed;
      // the client is still open and there is interest
      if (timeout <= 500) {
        // but... not enough time left to try again even if there is interest,
        // i.e., will fail anyways due to network latency
        return { bytes, count };
      }
      const { getCount, promise } = this._publish(subject, mesg, {
        ...opts,
        timeout,
        confirm: true,
      });
      await promise;
      count = getCount?.()!;
    }

    return { bytes, count };
  };

  private _publish = (
    subject: string,
    mesg,
    {
      headers,
      raw,
      encoding = DEFAULT_ENCODING,
      confirm,
      timeout = DEFAULT_PUBLISH_TIMEOUT,
      noThrow,
    }: PublishOptions & { confirm?: boolean } = {},
  ) => {
    if (this.isClosed()) {
      return { bytes: 0 };
    }
    if (!isValidSubjectWithoutWildcards(subject)) {
      throw Error(`invalid publish subject ${subject}`);
    }
    if (this.permissionError.pub.has(subject)) {
      const message = this.permissionError.pub.get(subject)!;
      logger.debug(message);
      throw new ConatError(message, { code: 403 });
    }
    raw = raw ?? encode({ encoding, mesg });
    this.stats.send.messages += 1;
    this.stats.send.bytes += raw.length;

    // defaulting to 1MB is safe since max_payload is at least that big.
    const chunkSize = Math.max(
      1000,
      (this.info?.max_payload ??
        1e6) - MAX_HEADER_SIZE,
    );
    let seq = 0;
    let id = randomId();
    const promises: any[] = [];
    let count = 0;
    for (let i = 0; i < raw.length; i += chunkSize) {
      // !!FOR TESTING ONLY!!
      // if (Math.random() <= 0.01) {
      //   console.log("simulating a chunk drop", { subject, seq });
      //   seq += 1;
      //   continue;
      // }
      const done = i + chunkSize >= raw.length ? 1 : 0;
      const v: any[] = [
        subject,
        id,
        seq,
        done,
        encoding,
        raw.slice(i, i + chunkSize),
        // position v[6] is used for clusters
      ];
      if (done && headers) {
        v.push(headers);
      }
      if (confirm) {
        const f = async () => {
          if (timeout) {
            try {
              const response = await this.conn
                .timeout(timeout)
                .emitWithAck("publish", v);
              if (response?.error) {
                throw new ConatError(response.error, { code: response.code });
              } else {
                return response;
              }
            } catch (err) {
              throw toConatError(err);
            }
          } else {
            return await this.conn.emitWithAck("publish", v);
          }
        };
        const promise = (async () => {
          try {
            const response = await f();
            count = Math.max(count, response.count ??
              0);
          } catch (err) {
            if (!noThrow) {
              throw err;
            }
          }
        })();
        promises.push(promise);
      } else {
        this.conn.emit("publish", v);
      }
      seq += 1;
    }
    if (confirm) {
      return {
        bytes: raw.length,
        getCount: () => count,
        promise: Promise.all(promises),
      };
    }
    return { bytes: raw.length };
  };

  pub = this.publish;

  request = async (
    subject: string,
    mesg: any,
    { timeout = DEFAULT_REQUEST_TIMEOUT, ...options }: PublishOptions = {},
  ): Promise<Message> => {
    if (timeout <= 0) {
      throw Error("timeout must be positive");
    }
    const inbox = await this.getInbox();
    const inboxSubject = this.temporaryInboxSubject();
    const sub = new EventIterator<Message>(inbox, inboxSubject, {
      idle: timeout,
      limit: 1,
      map: (args) => args[0],
    });

    const opts = {
      ...options,
      timeout,
      headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
    };
    const { count } = await this.publish(subject, mesg, opts);
    if (!count) {
      sub.stop();
      // if you hit this, consider using the option waitForInterest:true
      throw new ConatError(`request -- no subscribers matching '${subject}'`, {
        code: 503,
      });
    }
    for await (const resp of sub) {
      sub.stop();
      return resp;
    }
    sub.stop();
    throw new ConatError("timeout", { code: 408 });
  };

  // NOTE: Using requestMany returns a Subscription sub, and
  // you can call sub.close(). However, the sender doesn't
  // know that this happened and the messages are still going
  // to your inbox. Similarly if you set a maxWait, the
  // subscription just ends at that point, but the server
  // sending messages doesn't know. This is a shortcoming of the
  // pub/sub model.
  // You must decide entirely based on your
  // own application protocol how to terminate.
  requestMany = async (
    subject: string,
    mesg: any,
    { maxMessages, maxWait, ...options }: RequestManyOptions = {},
  ): Promise<Subscription> => {
    if (maxMessages != null && maxMessages <= 0) {
      throw Error("maxMessages must be positive");
    }
    if (maxWait != null && maxWait <= 0) {
      throw Error("maxWait must be positive");
    }
    const inbox = await this.getInbox();
    const inboxSubject = this.temporaryInboxSubject();
    const sub = new EventIterator<Message>(inbox, inboxSubject, {
      idle: maxWait,
      limit: maxMessages,
      map: (args) => args[0],
    });
    const { count } = await this.publish(subject, mesg, {
      ...options,
      headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
    });
    if (!count) {
      sub.stop();
      throw new ConatError(
        `requestMany -- no subscribers matching ${subject}`,
        { code: 503 },
      );
    }
    return sub;
  };

  // watch: this is mainly for debugging and interactive use.
  watch = (
    subject: string,
    cb = (x) => console.log(`${new Date()}: ${x.subject}:`, x.data, x.headers),
    opts?: SubscriptionOptions,
  ) => {
    const sub = this.subscribeSync(subject, opts);
    const f = async () => {
      for await (const x of sub) {
        cb(x);
      }
    };
    f();
    return sub;
  };

  sync = {
    dkv: async <T,>(opts: DKVOptions): Promise<DKV<T>> =>
      await dkv<T>({ ...opts, client: this }),
    akv: <T,>(opts: DKVOptions): AKV<T> => akv<T>({ ...opts, client: this }),
    dko: async <T,>(opts: DKVOptions): Promise<DKO<T>> =>
      await dko<T>({ ...opts, client: this }),
    dstream: async <T,>(opts: DStreamOptions): Promise<DStream<T>> =>
      await dstream<T>({ ...opts, client: this }),
    astream: <T,>(opts: DStreamOptions): AStream<T> =>
      astream<T>({ ...opts, client: this }),
    synctable: async (opts: SyncTableOptions): Promise<ConatSyncTable> =>
      await createSyncTable({ ...opts, client: this
      }),
  };

  socket = {
    listen: (
      subject: string,
      opts?: SocketConfiguration,
    ): ConatSocketServer => {
      if (this.state == "closed") {
        throw Error("closed");
      }
      if (this.sockets.servers[subject] !== undefined) {
        throw Error(
          `there can be at most one socket server per client listening on a subject (subject='${subject}')`,
        );
      }
      const server = new ConatSocketServer({
        subject,
        role: "server",
        client: this,
        id: this.id,
        ...opts,
      });
      this.sockets.servers[subject] = server;
      server.once("closed", () => {
        delete this.sockets.servers[subject];
      });
      return server;
    },

    connect: (
      subject: string,
      opts?: SocketConfiguration,
    ): ConatSocketClient => {
      if (this.state == "closed") {
        throw Error("closed");
      }
      const id = randomId();
      const client = new ConatSocketClient({
        subject,
        role: "client",
        client: this,
        id,
        ...opts,
      });
      if (this.sockets.clients[subject] === undefined) {
        this.sockets.clients[subject] = { [id]: client };
      } else {
        this.sockets.clients[subject][id] = client;
      }
      client.once("closed", () => {
        const v = this.sockets.clients[subject];
        if (v != null) {
          delete v[id];
          if (isEmpty(v)) {
            delete this.sockets.clients[subject];
          }
        }
      });
      return client;
    },
  };

  private disconnectAllSockets = () => {
    if (this.state == "closed") {
      return;
    }
    for (const subject in this.sockets.servers) {
      this.sockets.servers[subject].disconnect();
    }
    for (const subject in this.sockets.clients) {
      for (const id in this.sockets.clients[subject]) {
        this.sockets.clients[subject][id].disconnect();
      }
    }
  };

  private closeAllSockets = () => {
    for (const subject in this.sockets.servers) {
      this.sockets.servers[subject].close();
    }
    for (const subject in this.sockets.clients) {
      for (const id in this.sockets.clients[subject])
      {
        this.sockets.clients[subject][id].close();
      }
    }
  };

  message = (mesg, options?) => messageData(mesg, options);

  bench = {
    publish: async (n: number = 1000, subject = "bench"): Promise<number> => {
      const t0 = Date.now();
      console.log(`publishing ${n} messages to`, { subject });
      for (let i = 0; i < n - 1; i++) {
        this.publishSync(subject, null);
      }
      // then send one final message and wait for an ack.
      // since messages are in order, we know that all other
      // messages were delivered to the server.
      const { count } = await this.publish(subject, null);
      console.log("listeners: ", count);
      const t1 = Date.now();
      const rate = Math.round((n / (t1 - t0)) * 1000);
      console.log(rate, "messages per second delivered");
      return rate;
    },

    subscribe: async (n: number = 1000, subject = "bench"): Promise<number> => {
      const sub = await this.subscribe(subject);
      // send the data
      for (let i = 0; i < n; i++) {
        this.publishSync(subject, null);
      }
      const t0 = Date.now();
      let i = 0;
      for await (const _ of sub) {
        i += 1;
        if (i >= n) {
          break;
        }
      }
      const t1 = Date.now();
      return Math.round((n / (t1 - t0)) * 1000);
    },
  };
}

interface PublishOptions {
  headers?: Headers;
  // if encoding is given, it specifies the encoding used to encode the message
  encoding?: DataEncoding;
  // if raw is given, then it is assumed to be the raw binary
  // encoded message (using encoding) and any mesg parameter
  // is *IGNORED*.
  raw?;

  // timeout used when publishing a message and awaiting a response.
  timeout?: number;

  // waitForInterest -- if publishing async so it's possible to tell whether or not
  // there were any recipients, and there were NO recipients, it will wait until
  // there is a recipient and send again.
  // This does NOT use polling, but instead
  // uses a cluster aware and fully event based primitive in the server.
  // There is thus only a speed penalty doing this on failure and never
  // on success. Note that waitForInterest always has a timeout, defaulting
  // to DEFAULT_WAIT_FOR_INTEREST_TIMEOUT if above timeout not given.
  waitForInterest?: boolean;

  // noThrow -- if set and publishing would throw an exception, it is
  // silently dropped and undefined is returned instead.
  // Use this where you might want to use publishSync, but still want
  // to ensure there is interest; however, it's not important to know
  // if there was an error sending.
  noThrow?: boolean;
}

interface RequestManyOptions extends PublishOptions {
  maxWait?: number;
  maxMessages?: number;
}

export function encode({
  encoding,
  mesg,
}: {
  encoding: DataEncoding;
  mesg: any;
}) {
  if (encoding == DataEncoding.MsgPack) {
    return msgpack.encode(mesg, MSGPACK_ENCODER_OPTIONS);
  } else if (encoding == DataEncoding.JsonCodec) {
    return jsonEncoder(mesg);
  } else {
    throw Error(`unknown encoding ${encoding}`);
  }
}

export function decode({
  encoding,
  data,
}: {
  encoding: DataEncoding;
  data;
}): any {
  if (encoding == DataEncoding.MsgPack) {
    return msgpack.decode(data);
  } else if (encoding == DataEncoding.JsonCodec) {
    return jsonDecoder(data);
  } else {
    throw Error(`unknown encoding ${encoding}`);
  }
}

let textEncoder: TextEncoder | undefined = undefined;
let textDecoder: TextDecoder | undefined = undefined;

function jsonEncoder(obj: any) {
  if (textEncoder === undefined) {
    textEncoder = new TextEncoder();
  }
  return textEncoder.encode(JSON.stringify(obj));
}

function jsonDecoder(data: Buffer): any {
  if (textDecoder === undefined) {
    textDecoder = new TextDecoder();
  }
  return JSON.parse(textDecoder.decode(data));
}

interface Chunk {
  id: string;
  seq: number;
  done: number;
  buffer: Buffer;
  headers?: any;
}

// if an incoming message has chunks at least this old
// we give up on it and discard all of them. This avoids
// memory leaks when a chunk is dropped.
const MAX_CHUNK_TIME = 2 * 60000;

class SubscriptionEmitter extends EventEmitter {
  private incoming: { [id: string]: (Partial<Chunk> & { time: number })[] } =
    {};
  private client: Client;
  private closeWhenOffCalled?: boolean;
  private subject: string;
  public refCount: number = 1;

  constructor({ client, subject, closeWhenOffCalled }) {
    super();
    this.client = client;
    this.subject = subject;
    this.client.conn.on(subject, this.handle);
    this.closeWhenOffCalled = closeWhenOffCalled;
    this.dropOldLoop();
  }

  close = (force?) => {
    this.refCount -= 1;
    // console.log("SubscriptionEmitter.close - refCount =", this.refCount, this.subject);
    if (this.client == null || (!force && this.refCount > 0)) {
      return;
    }
    this.emit("closed");
    this.client.conn.removeListener(this.subject, this.handle);
    // @ts-ignore
    delete this.incoming;
    // @ts-ignore
    delete this.client;
    // @ts-ignore
    delete this.subject;
    // @ts-ignore
    delete this.closeWhenOffCalled;
    this.removeAllListeners();
  };

  off(a, b) {
    super.off(a, b);
    if (this.closeWhenOffCalled) {
      this.close();
    }
    return this;
  }

  private handle = ({ subject, data }) => {
    if (this.client == null) {
      return;
    }
    const [id, seq, done, encoding, buffer, headers] = data;
    // console.log({ id, seq, done, encoding, buffer, headers });
    const chunk = { seq, done, encoding, buffer, headers };
    const { incoming } = this;
    if (incoming[id] == null) {
      if (seq != 0) {
        // part of a dropped message -- by definition this should just
        // silently happen and be handled via application level
        // encodings
        // elsewhere
        console.log(
          `WARNING: drop packet from ${this.subject} -- first message has wrong seq`,
          { seq },
        );
        return;
      }
      incoming[id] = [];
    } else {
      const prev = incoming[id].slice(-1)[0].seq ?? -1;
      if (prev + 1 != seq) {
        console.log(
          `WARNING: drop packet from ${this.subject} -- seq number wrong`,
          { prev, seq },
        );
        // part of message was dropped -- discard everything
        delete incoming[id];
        return;
      }
    }
    incoming[id].push({ ...chunk, time: Date.now() });
    if (chunk.done) {
      // console.log("assembling ", incoming[id].length, "chunks");
      const chunks = incoming[id].map((x) => x.buffer!);
      // TESTING ONLY!!
      // This is not necessary due to the above checks as messages arrive.
      // for (let i = 0; i < incoming[id].length; i++) {
      //   if (incoming[id][i]?.seq != i) {
      //     console.log(`WARNING: bug -- invalid chunk data! -- ${subject}`);
      //     throw Error("bug -- invalid chunk data!");
      //   }
      // }
      const raw = concatArrayBuffers(chunks);

      // TESTING ONLY!!
      // try {
      //   decode({ encoding, data: raw });
      // } catch (err) {
      //   console.log(`ERROR - invalid data ${subject}`, incoming[id], err);
      // }

      delete incoming[id];
      const mesg = new Message({
        encoding,
        raw,
        headers,
        client: this.client,
        subject,
      });
      this.emit("message", mesg);
      this.client.recvStats(raw.byteLength);
    }
  };

  dropOldLoop = async () => {
    while (this.incoming != null) {
      const cutoff = Date.now() - MAX_CHUNK_TIME;
      for (const id in this.incoming) {
        const chunks = this.incoming[id];
        if (chunks.length > 0 && chunks[0].time <= cutoff) {
          console.log(
            `WARNING: drop partial message from ${this.subject} due to timeout`,
          );
          delete this.incoming[id];
        }
      }
      await delay(MAX_CHUNK_TIME / 2);
    }
  };
}

function concatArrayBuffers(buffers) {
  if (buffers.length == 1) {
    return buffers[0];
  }
  if
    (Buffer.isBuffer(buffers[0])) {
    return Buffer.concat(buffers);
  }
  // browser fallback
  const totalLength = buffers.reduce((sum, buf) => sum + buf.byteLength, 0);
  const result = new Uint8Array(totalLength);
  let offset = 0;
  for (const buf of buffers) {
    result.set(new Uint8Array(buf), offset);
    offset += buf.byteLength;
  }

  return result.buffer;
}

export type Headers = { [key: string]: JSONValue };

export class MessageData<T = any> {
  public readonly encoding: DataEncoding;
  public readonly raw;
  public readonly headers?: Headers;

  constructor({ encoding, raw, headers }) {
    this.encoding = encoding;
    this.raw = raw;
    this.headers = headers;
  }

  get data(): T {
    return decode({ encoding: this.encoding, data: this.raw });
  }

  get length(): number {
    // raw is binary data so it's the closest thing we have to the
    // size of this message. It would also make sense to include
    // the headers, but JSON'ing them would be expensive, so we don't.
    return this.raw.length;
  }
}

export class Message<T = any> extends MessageData<T> {
  private client: Client;
  public readonly subject;

  constructor({ encoding, raw, headers, client, subject }) {
    super({ encoding, raw, headers });
    this.client = client;
    this.subject = subject;
  }

  isRequest = (): boolean => !!this.headers?.[REPLY_HEADER];

  private respondSubject = () => {
    const subject = this.headers?.[REPLY_HEADER];
    if (!subject) {
      console.log(
        `WARNING: respond -- message to '${this.subject}' is not a request`,
      );
      return;
    }
    return `${subject}`;
  };

  respondSync = (mesg, opts?: PublishOptions): { bytes: number } => {
    const subject = this.respondSubject();
    if (!subject) return { bytes: 0 };
    return this.client.publishSync(subject, mesg, opts);
  };

  respond = async (
    mesg,
    opts: PublishOptions = {},
  ): Promise<{ bytes: number; count: number }> =>
  {
    const subject = this.respondSubject();
    if (!subject) {
      return { bytes: 0, count: 0 };
    }
    return await this.client.publish(subject, mesg, {
      // we *always* wait for interest for async respond, since
      // it is by far the most likely situation where it will be needed, due
      // to inboxes when users first sign in.
      waitForInterest: true,
      ...opts,
    });
  };
}

export function messageData(
  mesg,
  { headers, raw, encoding = DEFAULT_ENCODING }: PublishOptions = {},
) {
  return new MessageData({
    encoding,
    raw: raw ?? encode({ encoding, mesg }),
    headers,
  });
}

export type Subscription = EventIterator<Message>;

export class ConatError extends Error {
  code: string | number;
  constructor(mesg: string, { code }) {
    super(mesg);
    this.code = code;
  }
}

function isEmpty(obj: object): boolean {
  for (const _x in obj) {
    return false;
  }
  return true;
}

function toConatError(socketIoError) {
  // An error reported by the server (e.g., in _publish) is already a
  // ConatError; pass it through so that its code is not replaced by 408.
  if (socketIoError instanceof ConatError) {
    return socketIoError;
  }
  // the only socketio errors are "disconnected" and a timeout
  const e = `${socketIoError}`;
  if (e.includes("disconnected")) {
    return e;
  } else {
    return new ConatError(`timeout - ${e}`, {
      code: 408,
    });
  }
}
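/*
EXAMPLE: chunking and reassembly in isolation

The chunk loop in _publish and the sequence checks in SubscriptionEmitter.handle
can be sketched without any network code. This is only a minimal illustration,
not the client's implementation: the names ExampleChunk, splitIntoChunks and
ExampleReassembler are hypothetical, the chunk is an object rather than the
array sent over the wire, and there is no encoding, header or timeout handling.

```typescript
// Hypothetical chunk shape for this sketch only.
interface ExampleChunk {
  id: string;
  seq: number;
  done: boolean;
  payload: Uint8Array;
}

// Split raw into consecutive chunks of at most chunkSize bytes.
// As in the loop this mirrors, an empty payload yields no chunks.
function splitIntoChunks(
  id: string,
  raw: Uint8Array,
  chunkSize: number,
): ExampleChunk[] {
  if (chunkSize <= 0) {
    throw new Error("chunkSize must be positive");
  }
  const chunks: ExampleChunk[] = [];
  let seq = 0;
  for (let i = 0; i < raw.length; i += chunkSize) {
    const done = i + chunkSize >= raw.length;
    chunks.push({ id, seq, done, payload: raw.slice(i, i + chunkSize) });
    seq += 1;
  }
  return chunks;
}

class ExampleReassembler {
  private incoming = new Map<string, ExampleChunk[]>();

  // Returns the full payload once the final chunk arrives, else undefined.
  push(chunk: ExampleChunk): Uint8Array | undefined {
    const parts = this.incoming.get(chunk.id);
    if (parts === undefined) {
      if (chunk.seq !== 0) {
        // tail of a message whose beginning was lost -- ignore it
        return undefined;
      }
      this.incoming.set(chunk.id, [chunk]);
    } else if (parts[parts.length - 1].seq + 1 !== chunk.seq) {
      // a chunk in the middle was dropped -- discard the whole message
      this.incoming.delete(chunk.id);
      return undefined;
    } else {
      parts.push(chunk);
    }
    if (!chunk.done) {
      return undefined;
    }
    const all = this.incoming.get(chunk.id)!;
    this.incoming.delete(chunk.id);
    const total = all.reduce((n, c) => n + c.payload.byteLength, 0);
    const out = new Uint8Array(total);
    let offset = 0;
    for (const c of all) {
      out.set(c.payload, offset);
      offset += c.payload.byteLength;
    }
    return out;
  }
}

// Usage: 12 bytes split into chunks of 5, 5 and 2 bytes, then reassembled.
const exampleRaw = new TextEncoder().encode("hello, conat");
const exampleReassembler = new ExampleReassembler();
let exampleResult: Uint8Array | undefined;
for (const c of splitIntoChunks("m1", exampleRaw, 5)) {
  exampleResult = exampleReassembler.push(c);
}
console.log(new TextDecoder().decode(exampleResult)); // prints "hello, conat"
```

Discarding the whole message on the first gap keeps the receiver simple; any
retry is left to the application protocol, as noted above for dropped packets.
*/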