import {
connect as connectToSocketIO,
type SocketOptions,
type ManagerOptions,
} from "socket.io-client";
import { EventIterator } from "@cocalc/util/event-iterator";
import type { ConnectionStats, ServerInfo } from "./types";
import * as msgpack from "@msgpack/msgpack";
import { randomId } from "@cocalc/conat/names";
import type { JSONValue } from "@cocalc/util/types";
import { EventEmitter } from "events";
import {
isValidSubject,
isValidSubjectWithoutWildcards,
} from "@cocalc/conat/util";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { once, until } from "@cocalc/util/async-utils";
import { delay } from "awaiting";
import { getLogger } from "@cocalc/conat/client";
import { refCacheSync } from "@cocalc/util/refcache";
import { join } from "path";
import { dko, type DKO } from "@cocalc/conat/sync/dko";
import { dkv, type DKVOptions, type DKV } from "@cocalc/conat/sync/dkv";
import {
dstream,
type DStreamOptions,
type DStream,
} from "@cocalc/conat/sync/dstream";
import { akv, type AKV } from "@cocalc/conat/sync/akv";
import { astream, type AStream } from "@cocalc/conat/sync/astream";
import TTL from "@isaacs/ttlcache";
import {
ConatSocketServer,
ConatSocketClient,
ServerSocket,
type SocketConfiguration,
} from "@cocalc/conat/socket";
export { type ConatSocketServer, ConatSocketClient, ServerSocket };
import {
type SyncTableOptions,
type ConatSyncTable,
createSyncTable,
} from "@cocalc/conat/sync/synctable";
// Hard upper bound (ms) on waitForInterest: larger requested timeouts are clamped to this value.
export const MAX_INTEREST_TIMEOUT = 90_000;
// Fallback wait (ms) used by publish() when waitForInterest is set and no timeout option is given.
const DEFAULT_WAIT_FOR_INTEREST_TIMEOUT = 30_000;
// Options handed to msgpack.encode for every MsgPack-encoded payload.
const MSGPACK_ENCODER_OPTIONS = {
ignoreUndefined: true,
};
// Baseline socket.io options. The Client constructor spreads these first, so
// options supplied by the caller take precedence over them.
export const DEFAULT_SOCKETIO_CLIENT_OPTIONS = {
transports: ["websocket"],
rememberUpgrade: true,
rejectUnauthorized: false,
reconnection: true,
// Reconnect delays are shortened when COCALC_TEST_MODE is set in the environment.
reconnectionDelay: process.env.COCALC_TEST_MODE ? 50 : 500,
reconnectionDelayMax: process.env.COCALC_TEST_MODE ? 500 : 15000,
reconnectionAttempts: 9999999999,
};
// Lifecycle states of a Client; setState also emits the state name as an event.
type State = "disconnected" | "connected" | "closed";
const logger = getLogger("core/client");
// Conat-specific connection options.
interface Options {
// Server address; when omitted the Client constructor falls back to process.env.CONAT_SERVER.
address?: string;
// Prefix for this client's inbox subject; initInbox requires it to start with "_INBOX".
inboxPrefix?: string;
// When set, the constructor sends it to the server as a `sys=<password>` Cookie header.
systemAccountPassword?: string;
}
// Everything accepted by connect()/new Client(): the conat options above plus
// any socket.io SocketOptions/ManagerOptions, which are forwarded to socket.io.
export type ClientOptions = Options & {
// NOTE(review): not read anywhere in this file; presumably consumed by refCacheSync -- confirm.
noCache?: boolean;
} & Partial<SocketOptions> &
Partial<ManagerOptions>;
// Default (and mandatory leading part of any custom) inbox subject prefix.
const INBOX_PREFIX = "_INBOX";
// Header under which request()/requestMany() store the subject that replies must be published to.
const REPLY_HEADER = "CN-Reply";
// Bytes subtracted from the server's max_payload when computing the chunk size in _publish.
const MAX_HEADER_SIZE = 100000;
// Value passed as both `start` and `max` to until() in statsLoop (presumably milliseconds -- confirm).
const STATS_LOOP = 5000;
/** Default time (ms) to wait for the server to acknowledge a subscribe. */
export const DEFAULT_SUBSCRIPTION_TIMEOUT = 60_000;
/** Default timeout (ms) for request(); can be changed with setDefaultTimeouts. */
export let DEFAULT_REQUEST_TIMEOUT = 30_000;
/** Default ack timeout (ms) for confirmed publishes; can be changed with setDefaultTimeouts. */
export let DEFAULT_PUBLISH_TIMEOUT = 30_000;

/**
 * Replace the module-wide default request and/or publish timeouts.
 *
 * A field that is omitted (or `undefined`) keeps its current default.
 */
export function setDefaultTimeouts({
  request = DEFAULT_REQUEST_TIMEOUT,
  publish = DEFAULT_PUBLISH_TIMEOUT,
}: {
  request?: number;
  publish?: number;
}) {
  [DEFAULT_REQUEST_TIMEOUT, DEFAULT_PUBLISH_TIMEOUT] = [request, publish];
}
// Payload encodings understood by encode()/decode(). The numeric value is
// what _publish places in each chunk it sends.
export enum DataEncoding {
MsgPack = 0,
JsonCodec = 1,
}
// Options for subscribe()/subscribeSync() and the helpers built on them.
interface SubscriptionOptions {
// Passed to the EventIterator as its `idle` option.
maxWait?: number;
// Passed to the EventIterator as its `limit` option.
mesgLimit?: number;
// Queue group name; a random one is generated when omitted.
queue?: string;
// NOTE(review): not read anywhere in this file -- confirm whether it is still used.
respond?: Function;
// Ack timeout (ms) for the confirmed subscribe; falsy values fall back to DEFAULT_SUBSCRIPTION_TIMEOUT.
timeout?: number;
}
// Encoding used when a publish does not specify one.
const DEFAULT_ENCODING = DataEncoding.MsgPack;
/**
 * Split a server URL into the pieces socket.io wants: the origin to connect
 * to and the request path, which is the URL's pathname with "conat" appended.
 * Relative URLs are resolved against a dummy base.
 */
function cocalcServerToSocketioAddress(url: string): {
  address: string;
  path: string;
} {
  const { origin, pathname } = new URL(url, "http://dummy.org");
  return { address: origin, path: join(pathname, "conat") };
}
// Cache of Client instances keyed by their options; a Client is constructed
// whenever the cache needs a new object.
const cache = refCacheSync<ClientOptions, Client>({
  name: "conat-client",
  createObject: (opts: ClientOptions) => new Client(opts),
});
/**
 * Get a Client for the given options.
 *
 * When no address is given and the cache can supply a client via
 * `cache.one()`, that client is returned; otherwise the client is looked up
 * (or created) in the cache by its options.
 */
export function connect(opts: ClientOptions = {}) {
  const existing = opts.address ? undefined : cache.one();
  return existing != null ? existing : cache(opts);
}
/** Return the client supplied by `cache.one()`, or connect with default options if there is none. */
export function getClient() {
  const existing = cache.one();
  if (existing != null) {
    return existing;
  }
  return connect();
}
export class Client extends EventEmitter {
// The underlying socket.io client socket.
public conn: ReturnType<typeof connectToSocketIO>;
// subject -> queue group for every subscription this client believes it has.
private queueGroups: { [subject: string]: string } = {};
// subject -> shared emitter; several subscribers to one subject share an emitter via refCount.
private subs: { [subject: string]: SubscriptionEmitter } = {};
// Socket servers (one per subject) and socket clients (by subject, then id) created through `socket`.
private sockets: {
servers: { [subject: string]: ConatSocketServer };
clients: { [subject: string]: { [id: string]: ConatSocketClient } };
} = { servers: {}, clients: {} };
public readonly options: ClientOptions;
// Base subject of this client's inbox; set by initInbox.
private inboxSubject: string;
// Emitter that re-emits inbox messages keyed by their subject; created once the inbox subscription exists.
private inbox?: EventEmitter;
// Permission errors reported by the server, remembered per subject for 60 seconds
// so that repeated publish/subscribe attempts fail fast with a 403.
private permissionError = {
pub: new TTL<string, string>({ ttl: 1000 * 60 }),
sub: new TTL<string, string>({ ttl: 1000 * 60 }),
};
// Most recent "info" payload received from the server (undefined until the first one arrives).
public info: ServerInfo | undefined = undefined;
// Traffic counters. recv0 is reset on every disconnect and is what statsLoop reports to the server.
public readonly stats: ConnectionStats & {
recv0: { messages: number; bytes: number };
} = {
send: { messages: 0, bytes: 0 },
recv: { messages: 0, bytes: 0 },
recv0: { messages: 0, bytes: 0 },
subs: 0,
};
// Random identifier of this client; used as the default id of socket servers it creates.
public readonly id: string = randomId();
public state: State = "disconnected";
// Create the client and immediately start connecting.
//
// Throws if neither options.address nor the CONAT_SERVER environment variable is set.
// Inbox setup and the stats loop are started here but not awaited.
constructor(options: ClientOptions) {
super();
if (!options.address) {
if (!process.env.CONAT_SERVER) {
throw Error(
"Must specificy address or set CONAT_SERVER environment variable",
);
}
// copy rather than mutate the caller's options object
options = { ...options, address: process.env.CONAT_SERVER };
}
this.options = options;
this.setMaxListeners(1000);
const { address, path } = cocalcServerToSocketioAddress(
this.options.address!,
);
logger.debug(`Conat: Connecting to ${this.options.address}...`);
// Later spreads win: defaults < caller options < system-account cookie header;
// `path` and `reconnection: true` are always forced.
this.conn = connectToSocketIO(address, {
...DEFAULT_SOCKETIO_CLIENT_OPTIONS,
...options,
...(options.systemAccountPassword
? {
extraHeaders: {
...options.extraHeaders,
Cookie: `sys=${options.systemAccountPassword}`,
},
}
: undefined),
path,
reconnection: true,
});
// Server announces who we are: acknowledge it (if an ack callback was given),
// store and re-emit it, then reconcile subscriptions -- after 3s the first
// time, with no delay on later "info" events.
this.conn.on("info", (info, ack) => {
if (typeof ack == "function") {
ack();
}
const firstTime = this.info == null;
this.info = info;
this.emit("info", info);
setTimeout(this.syncSubscriptions, firstTime ? 3000 : 0);
});
// Remember permission failures per subject; `type` selects the pub or sub cache.
this.conn.on("permission", ({ message, type, subject }) => {
logger.debug(message);
this.permissionError[type]?.set(subject, message);
});
this.conn.on("connect", async () => {
logger.debug(`Conat: Connected to ${this.options.address}`);
if (this.conn.connected) {
this.setState("connected");
}
});
this.conn.io.on("error", (...args) => {
logger.debug(
`Conat: Error connecting to ${this.options.address} -- `,
...args,
);
});
// On disconnect: reset the per-connection receive counters, update state and
// disconnect every conat socket created through this client.
this.conn.on("disconnect", async () => {
if (this.isClosed()) {
return;
}
this.stats.recv0 = { messages: 0, bytes: 0 };
this.setState("disconnected");
this.disconnectAllSockets();
});
this.conn.io.connect();
this.initInbox();
this.statsLoop();
}
/** Send a "cluster" request to the server and return its ack, waiting at most 10 seconds. */
cluster = async () => {
  const reply = await this.conn.timeout(10000).emitWithAck("cluster");
  return reply;
};
/**
 * Disconnect the transport without closing the client: all conat sockets are
 * disconnected right away and the socket.io manager 1ms later. No-op once closed.
 */
disconnect = () => {
  if (!this.isClosed()) {
    this.disconnectAllSockets();
    setTimeout(() => this.conn.io.disconnect(), 1);
  }
};
/**
 * Resolve once the client is connected and has server info without a user error.
 *
 * If that is not yet the case, waits for the next "info" event (bounded by
 * `timeout` when given) and checks once more.
 *
 * @throws Error("failed to sign in") if the condition still does not hold.
 */
waitUntilSignedIn = reuseInFlight(
  async ({ timeout }: { timeout?: number } = {}) => {
    const signedIn = () =>
      this.info != null &&
      this.state == "connected" &&
      !this.info?.user?.error;
    if (!signedIn()) {
      await once(this, "info", timeout);
    }
    if (!signedIn()) {
      throw Error("failed to sign in");
    }
  },
);
// Background loop that reports the receive counters of the current connection
// (stats.recv0) to the server. The callback returns true -- which presumably
// tells until() to stop; confirm against its implementation -- once the client
// is closed, and false otherwise so the loop keeps going.
private statsLoop = async () => {
await until(
async () => {
if (this.isClosed()) {
return true;
}
try {
await this.waitUntilConnected();
// may have been closed while waiting for the connection
if (this.isClosed()) {
return true;
}
this.conn.emit("stats", { recv0: this.stats.recv0 });
} catch {}
return false;
},
{ start: STATS_LOOP, max: STATS_LOOP },
);
};
/** Ask the server about interest in `subject` via waitForInterest with a timeout of 0. */
interest = async (subject: string): Promise<boolean> => {
  const found = await this.waitForInterest(subject, { timeout: 0 });
  return found;
};
/**
 * Send a "wait-for-interest" request for `subject` to the server and return its ack.
 *
 * @param subject a valid subject without wildcards
 * @param timeout passed to the server, clamped to MAX_INTEREST_TIMEOUT; the ack
 *   itself is awaited for `timeout` ms, or 10 seconds when `timeout` is 0
 * @throws Error for an invalid subject; whatever toConatError produces when the ack fails
 */
waitForInterest = async (
  subject: string,
  {
    timeout = MAX_INTEREST_TIMEOUT,
  }: {
    timeout?: number;
  } = {},
) => {
  const subjectOk = isValidSubjectWithoutWildcards(subject);
  if (!subjectOk) {
    throw Error(
      `subject ${subject} must be a valid subject without wildcards`,
    );
  }
  timeout = Math.min(timeout, MAX_INTEREST_TIMEOUT);
  const ackTimeout = timeout ? timeout : 10000;
  try {
    return await this.conn
      .timeout(ackTimeout)
      .emitWithAck("wait-for-interest", { subject, timeout });
  } catch (err) {
    throw toConatError(err);
  }
};
/** Record one received message of `bytes` bytes in both the lifetime and the per-connection counters. */
recvStats = (bytes: number) => {
  for (const counter of [this.stats.recv, this.stats.recv0]) {
    counter.messages += 1;
    counter.bytes += bytes;
  }
};
/** Resolve immediately if the socket is connected, otherwise after its next "connect" event. */
waitUntilConnected = reuseInFlight(async () => {
  if (!this.conn.connected) {
    await once(this.conn, "connect");
  }
});
// Resolve once the client is signed in and the socket is connected;
// rejects if waitUntilSignedIn fails.
waitUntilReady = reuseInFlight(async () => {
await this.waitUntilSignedIn();
await this.waitUntilConnected();
});
/**
 * Move to `state` and emit an event named after it.
 * Does nothing when the client is already closed or already in that state.
 */
private setState = (state: State) => {
  const unchanged = this.state == state;
  if (this.isClosed() || unchanged) {
    return;
  }
  this.state = state;
  this.emit(state);
};
/**
 * Build a fresh one-off reply subject below this client's inbox subject.
 * @throws Error if the inbox subject has not been set
 */
private temporaryInboxSubject = () => {
  const base = this.inboxSubject;
  if (!base) {
    throw Error("inbox not setup properly");
  }
  return `${base}.${randomId()}`;
};
/**
 * Return the inbox emitter, waiting for the "inbox" event if it does not exist yet.
 * @throws Error("closed") if the inbox is missing and the client is closed
 * @throws Error("bug") if the inbox is still missing after the event
 */
private getInbox = reuseInFlight(async (): Promise<EventEmitter> => {
  if (this.inbox == null) {
    if (this.isClosed()) {
      throw Error("closed");
    }
    await once(this, "inbox");
  }
  const inbox = this.inbox;
  if (inbox == null) {
    throw Error("bug");
  }
  return inbox;
});
// Set up this client's inbox: pick a random inbox subject, subscribe to
// "<inboxSubject>.*" (retrying through until() until it works or the client is
// closed), then forward every message received there to the `inbox` emitter
// under the message's own subject. Emits "inbox" once ready.
//
// Throws if a custom inboxPrefix does not start with INBOX_PREFIX.
private initInbox = async () => {
const inboxPrefix = this.options.inboxPrefix ?? INBOX_PREFIX;
if (!inboxPrefix.startsWith(INBOX_PREFIX)) {
throw Error(`custom inboxPrefix must start with '${INBOX_PREFIX}'`);
}
this.inboxSubject = `${inboxPrefix}.${randomId()}`;
let sub;
await until(
async () => {
try {
await this.waitUntilSignedIn();
sub = await this.subscribe(this.inboxSubject + ".*");
return true;
} catch (err) {
// a closed client will never get an inbox, so stop retrying
if (this.isClosed()) {
return true;
}
if (!process.env.COCALC_TEST_MODE) {
console.log(`WARNING: inbox not available -- ${err}`);
}
}
return false;
},
{ start: 1000, max: 15000 },
);
if (this.isClosed()) {
return;
}
this.inbox = new EventEmitter();
// pump messages from the subscription into the inbox emitter (not awaited)
(async () => {
for await (const mesg of sub) {
// close() deletes this.inbox; stop forwarding once that has happened
if (this.inbox == null) {
return;
}
this.inbox.emit(mesg.subject, mesg);
}
})();
this.emit("inbox", this.inboxSubject);
};
/** True once close() has moved the client into the "closed" state. */
private isClosed = () => this.state == "closed";
/**
 * Permanently shut the client down. Idempotent.
 *
 * Emits "closed" (via setState), removes all listeners, closes every conat
 * socket, tells the server to drop each subscription, closes all
 * subscription emitters, releases internal state and finally closes the
 * socket.io connection.
 */
close = () => {
  if (this.isClosed()) {
    return;
  }
  // emit "closed" before the listeners are removed
  this.setState("closed");
  this.removeAllListeners();
  this.closeAllSockets();
  // @ts-ignore -- deliberately dropping a non-optional field of a dead client
  delete this.sockets;
  for (const subject in this.queueGroups) {
    this.conn.emit("unsubscribe", { subject });
    delete this.queueGroups[subject];
  }
  for (const sub of Object.values(this.subs)) {
    // force the emitter's own refcount bookkeeping to tear it down now
    sub.refCount = 0;
    sub.close();
  }
  // Release the map after the loop so it is dropped even when there were
  // no subscriptions (it used to be deleted from inside the loop body).
  // @ts-ignore
  delete this.subs;
  // @ts-ignore
  delete this.queueGroups;
  // @ts-ignore
  delete this.inboxSubject;
  delete this.inbox;
  // @ts-ignore
  delete this.options;
  // @ts-ignore
  delete this.info;
  // @ts-ignore
  delete this.permissionError;
  try {
    this.conn.close();
  } catch {}
};
// Reconcile the server's view of our subscriptions with queueGroups, retrying
// through until() until one pass reports that nothing had to change or the
// client is closed. Failures are counted and logged from the third one on.
private syncSubscriptions = reuseInFlight(async () => {
let fails = 0;
await until(
async () => {
if (this.isClosed()) return true;
try {
if (this.info == null) {
await once(this, "info");
}
if (this.isClosed()) return true;
await this.waitUntilConnected();
if (this.isClosed()) return true;
// each server round trip in one pass is allowed 10 seconds
const stable = await this.syncSubscriptions0(10000);
if (stable) {
return true;
}
} catch (err) {
fails++;
if (fails >= 3) {
console.log(
`WARNING: failed to sync subscriptions ${fails} times -- ${err}`,
);
}
}
return false;
},
{ start: 1000, max: 15000 },
);
});
/**
 * One reconciliation pass between the subscriptions the server reports and
 * the ones this client tracks in queueGroups.
 *
 * - Subjects we track but the server lacks are re-subscribed; if the server
 *   rejects one, the local subscription is force-closed.
 * - Subjects the server has but we do not track are unsubscribed.
 *
 * @param timeout ack timeout (ms) for each server round trip
 * @returns true when nothing had to be changed (or the client is closed)
 * @throws Error("not signed in") if no server info has been received yet
 */
private syncSubscriptions0 = async (timeout: number): Promise<boolean> => {
  if (this.isClosed()) return true;
  if (this.info == null) {
    throw Error("not signed in");
  }
  const subs = await this.getSubscriptions(timeout);
  // close() deletes queueGroups/subs, so bail out if it ran while we waited
  if (this.isClosed()) return true;

  const missing: { subject: string; queue: string }[] = [];
  for (const subject in this.queueGroups) {
    if (!subs.has(subject)) {
      missing.push({
        subject,
        queue: this.queueGroups[subject],
      });
    }
  }
  let stable = true;
  if (missing.length > 0) {
    stable = false;
    const resp = await this.conn
      .timeout(timeout)
      .emitWithAck("subscribe", missing);
    if (this.isClosed()) return true;
    for (let i = 0; i < missing.length; i++) {
      // tolerate a short or malformed ack instead of throwing a TypeError
      if (resp?.[i]?.error) {
        this.subs[missing[i].subject]?.close(true);
      }
    }
  }

  // `subs` is a Set, so it must be walked with for...of (for...in visits
  // nothing), and a subject is "extra" exactly when we do NOT track it.
  const extra: { subject: string }[] = [];
  for (const subject of subs) {
    if (this.queueGroups[subject] == null) {
      extra.push({ subject });
    }
  }
  if (extra.length > 0) {
    await this.conn.timeout(timeout).emitWithAck("unsubscribe", extra);
    stable = false;
  }
  return stable;
};
/** Number of subjects this client currently tracks a subscription for. */
numSubscriptions = () => {
  return Object.keys(this.queueGroups).length;
};
/**
 * Ask the server which subjects it has this client subscribed to.
 * @param timeout ack timeout in ms
 */
private getSubscriptions = async (
  timeout = DEFAULT_REQUEST_TIMEOUT,
): Promise<Set<string>> => {
  const ack = this.conn.timeout(timeout);
  const subjects = await ack.emitWithAck("subscriptions", null);
  return new Set(subjects);
};
// Get (or create) the shared SubscriptionEmitter for `subject`.
//
// If an emitter already exists its refCount is incremented and it is returned
// without contacting the server (promise is undefined). Otherwise the
// subscription is registered locally and sent to the server: with `confirm`
// the returned promise settles with the server's ack, without it the
// subscribe is fire-and-forget.
//
// Throws when the client is closed, the subject is invalid, a cached
// permission error exists for the subject (ConatError 403), or the requested
// queue group conflicts with the existing subscription.
private subscriptionEmitter = (
subject: string,
{
closeWhenOffCalled,
queue,
confirm,
timeout,
}: {
closeWhenOffCalled?: boolean;
queue?: string;
confirm?: boolean;
timeout?: number;
} = {},
): { sub: SubscriptionEmitter; promise? } => {
if (!timeout) {
timeout = DEFAULT_SUBSCRIPTION_TIMEOUT;
}
if (this.isClosed()) {
throw Error("closed");
}
if (!isValidSubject(subject)) {
throw Error(`invalid subscribe subject '${subject}'`);
}
// fail fast if the server recently told us we may not subscribe here
if (this.permissionError.sub.has(subject)) {
const message = this.permissionError.sub.get(subject)!;
logger.debug(message);
throw new ConatError(message, { code: 403 });
}
let sub = this.subs[subject];
if (sub != null) {
if (queue && this.queueGroups[subject] != queue) {
throw Error(
`client can only have one queue group subscription for a given subject -- subject='${subject}', queue='${queue}'`,
);
}
// share the existing emitter
sub.refCount += 1;
return { sub, promise: undefined };
}
if (this.queueGroups[subject] != null) {
throw Error(`already subscribed to '${subject}'`);
}
if (!queue) {
queue = randomId();
}
this.queueGroups[subject] = queue;
sub = new SubscriptionEmitter({
client: this,
subject,
closeWhenOffCalled,
});
this.subs[subject] = sub;
this.stats.subs++;
let promise;
if (confirm) {
const f = async () => {
let response;
try {
// NOTE: timeout was defaulted above, so it is always truthy here
if (timeout) {
response = await this.conn
.timeout(timeout)
.emitWithAck("subscribe", { subject, queue });
} else {
response = await this.conn.emitWithAck("subscribe", {
subject,
queue,
});
}
} catch (err) {
throw toConatError(err);
}
if (response?.error) {
throw new ConatError(response.error, { code: response.code });
}
return response;
};
promise = f();
} else {
this.conn.emit("subscribe", { subject, queue });
promise = undefined;
}
// When the emitter closes, tell the server and forget the subscription
// (skipped when the whole client is closed -- close() handles that itself).
sub.once("closed", () => {
if (this.isClosed()) {
return;
}
this.conn.emit("unsubscribe", { subject });
delete this.queueGroups[subject];
if (this.subs[subject] != null) {
this.stats.subs--;
delete this.subs[subject];
}
});
return { sub, promise };
};
/**
 * Wrap a subscription emitter in an async iterator over its "message"
 * events, forwarding maxWait as `idle` and mesgLimit as `limit`.
 */
private subscriptionIterator = (
  sub,
  opts?: SubscriptionOptions,
): Subscription =>
  new EventIterator<Message>(sub, "message", {
    idle: opts?.maxWait,
    limit: opts?.mesgLimit,
    map: (args) => args[0],
  });
/**
 * Subscribe to `subject` without waiting for the server to confirm.
 * Returns an async iterator over the incoming messages.
 */
subscribeSync = (
  subject: string,
  opts?: SubscriptionOptions,
): Subscription => {
  const emitterOptions = {
    confirm: false,
    closeWhenOffCalled: true,
    queue: opts?.queue,
  };
  const { sub } = this.subscriptionEmitter(subject, emitterOptions);
  return this.subscriptionIterator(sub, opts);
};
// Subscribe to `subject` and wait for the server to confirm it.
// Waits until signed in first. If the confirmation fails, the local
// subscription is closed again and the error is rethrown.
// Returns an async iterator over the incoming messages.
subscribe = async (
subject: string,
opts?: SubscriptionOptions,
): Promise<Subscription> => {
await this.waitUntilSignedIn();
const { sub, promise } = this.subscriptionEmitter(subject, {
confirm: true,
closeWhenOffCalled: true,
queue: opts?.queue,
timeout: opts?.timeout,
});
try {
// undefined (resolves immediately) when an existing emitter was reused
await promise;
} catch (err) {
sub.close();
throw err;
}
return this.subscriptionIterator(sub, opts);
};
// Short alias for subscribe.
sub = this.subscribe;
// Expose the methods of `impl` as a simple RPC service on `subject`.
//
// Each incoming message is expected to carry `[name, args]`; impl[name] is
// invoked with the message as `this` and its result is sent back as the
// response. Any failure is reported to the caller in an `error` header.
// The queue group defaults to "0". Returns the underlying subscription.
service: <T = any>(
subject: string,
impl: T,
opts?: SubscriptionOptions,
) => Promise<Subscription> = async (subject, impl, opts) => {
const sub = await this.subscribe(subject, {
...opts,
queue: opts?.queue ?? "0",
});
const respond = async (mesg: Message) => {
try {
const [name, args] = mesg.data;
let f = impl[name];
if (f == null) {
throw Error(`${name} not defined`);
}
const result = await f.apply(mesg, args);
await mesg.respond(result);
} catch (err) {
// noThrow: a failure to deliver the error report must not reject here
await mesg.respond(null, {
noThrow: true,
headers: { error: `${err}` },
});
}
};
const loop = async () => {
for await (const mesg of sub) {
// not awaited, so a slow handler does not hold up later requests
respond(mesg);
}
};
loop();
return sub;
};
/**
 * Client side of `service`: returns a proxy on which every string-named
 * property is an async function. Calling it sends `[name, args]` as a request
 * to `subject` and resolves with the response data, or throws an Error built
 * from the response's `error` header.
 */
call<T = any>(subject: string, opts?: PublishOptions): T {
  const invoke = async (name: string, args: any[]) => {
    const resp = await this.request(subject, [name, args], opts);
    if (resp.headers?.error) {
      throw Error(`${resp.headers.error}`);
    }
    return resp.data;
  };
  return new Proxy(
    {},
    {
      get: (_, name) =>
        typeof name !== "string"
          ? undefined
          : async (...args) => await invoke(name, args),
    },
  ) as T;
}
// Like `call`, but built on requestMany: every string-named property of the
// returned proxy is an async function resolving to an async generator over
// the responses. A response carrying an `error` header is yielded as a
// ConatError (it is not thrown); all others yield their data.
// maxWait falls back to DEFAULT_REQUEST_TIMEOUT when missing or 0.
callMany<T = any>(subject: string, opts?: RequestManyOptions): T {
const maxWait = opts?.maxWait ? opts?.maxWait : DEFAULT_REQUEST_TIMEOUT;
// generator functions cannot be arrow functions, so capture `this` explicitly
const self = this;
async function* callMany(name: string, args: any[]) {
const sub = await self.requestMany(subject, [name, args], {
...opts,
maxWait,
});
for await (const resp of sub) {
if (resp.headers?.error) {
yield new ConatError(`${resp.headers.error}`, {
code: resp.headers.code,
});
} else {
yield resp.data;
}
}
}
return new Proxy(
{},
{
get: (_, name) => {
if (typeof name !== "string") {
return undefined;
}
return async (...args) => await callMany(name, args);
},
},
) as T;
}
/**
 * Publish without waiting for any confirmation from the server.
 * @returns the number of payload bytes sent (0 if the client is closed)
 */
publishSync = (
  subject: string,
  mesg,
  opts?: PublishOptions,
): { bytes: number } =>
  this.isClosed()
    ? { bytes: 0 }
    : this._publish(subject, mesg, { ...opts, confirm: false });
// Publish `mesg` to `subject` and wait for the server to confirm it.
//
// Returns the payload size in bytes and the `count` reported by the server's
// ack (request() treats a count of 0 as "no subscribers").
// With opts.waitForInterest, a publish with count 0 is followed by a wait for
// interest and one more publish attempt, all within the timeout budget.
// With opts.noThrow, any error is swallowed and { bytes: 0, count: 0 } is returned.
publish = async (
subject: string,
mesg,
opts: PublishOptions = {},
): Promise<{
bytes: number;
count: number;
}> => {
try {
if (this.isClosed()) {
return { bytes: 0, count: 0 };
}
await this.waitUntilSignedIn();
const start = Date.now();
const { bytes, getCount, promise } = this._publish(subject, mesg, {
...opts,
confirm: true,
});
await promise;
let count = getCount?.()!;
if (
opts.waitForInterest &&
count != null &&
count == 0 &&
!this.isClosed() &&
(opts.timeout == null || Date.now() - start <= opts.timeout)
) {
let timeout = opts.timeout ?? DEFAULT_WAIT_FOR_INTEREST_TIMEOUT;
// wait only for what is left of the timeout budget
await this.waitForInterest(subject, {
timeout: timeout ? timeout - (Date.now() - start) : undefined,
});
if (this.isClosed()) {
return { bytes, count };
}
const elapsed = Date.now() - start;
timeout -= elapsed;
// not enough time left to make a second attempt worthwhile
if (timeout <= 500) {
return { bytes, count };
}
const { getCount, promise } = this._publish(subject, mesg, {
...opts,
timeout,
confirm: true,
});
await promise;
count = getCount?.()!;
}
return { bytes, count };
} catch (err) {
if (opts.noThrow) {
return { bytes: 0, count: 0 };
} else {
throw err;
}
}
};
/**
 * Low-level publish shared by publish() and publishSync().
 *
 * The payload (`raw`, or `mesg` encoded with `encoding`) is split into chunks
 * that fit the server's max_payload; each chunk is sent as
 * `[subject, id, seq, done, encoding, bytes]`, and the headers are attached
 * to the final chunk only.
 *
 * @param confirm when true every chunk is sent with an ack and the result
 *   also contains `promise` (settles when all chunks are acknowledged) and
 *   `getCount` (largest `count` reported by any ack so far)
 * @param timeout ack timeout in ms; 0 waits without a timeout
 * @param noThrow when true, per-chunk failures are swallowed
 * @returns `{ bytes }`, plus `getCount` and `promise` when confirming;
 *   `{ bytes: 0 }` when the client is closed
 * @throws Error for an invalid subject; ConatError(403) when a permission
 *   error is cached for the subject
 */
private _publish = (
  subject: string,
  mesg,
  {
    headers,
    raw,
    encoding = DEFAULT_ENCODING,
    confirm,
    timeout = DEFAULT_PUBLISH_TIMEOUT,
    noThrow,
  }: PublishOptions & { confirm?: boolean } = {},
) => {
  if (this.isClosed()) {
    return { bytes: 0 };
  }
  if (!isValidSubjectWithoutWildcards(subject)) {
    throw Error(`invalid publish subject ${subject}`);
  }
  // fail fast if the server recently told us we may not publish here
  if (this.permissionError.pub.has(subject)) {
    const message = this.permissionError.pub.get(subject)!;
    logger.debug(message);
    throw new ConatError(message, { code: 403 });
  }
  raw = raw ?? encode({ encoding, mesg });
  this.stats.send.messages += 1;
  this.stats.send.bytes += raw.length;
  // leave room for the headers, but never go below 1000 bytes per chunk
  const chunkSize = Math.max(
    1000,
    (this.info?.max_payload ?? 1e6) - MAX_HEADER_SIZE,
  );
  let seq = 0;
  const id = randomId();
  const promises: any[] = [];
  let count = 0;
  // NOTE(review): an empty `raw` sends no chunk at all -- confirm this is intended.
  for (let i = 0; i < raw.length; i += chunkSize) {
    const done = i + chunkSize >= raw.length ? 1 : 0;
    const v: any[] = [
      subject,
      id,
      seq,
      done,
      encoding,
      raw.slice(i, i + chunkSize),
    ];
    if (done && headers) {
      v.push(headers);
    }
    if (confirm) {
      const f = async () => {
        let response;
        if (timeout) {
          try {
            response = await this.conn
              .timeout(timeout)
              .emitWithAck("publish", v);
          } catch (err) {
            // only transport failures (timeout/disconnect) are converted here
            throw toConatError(err);
          }
        } else {
          response = await this.conn.emitWithAck("publish", v);
        }
        // Checked outside the try so that an error reported by the server
        // keeps its own message and code instead of being re-wrapped as a
        // 408 timeout; it is also checked when no timeout is used.
        if (response?.error) {
          throw new ConatError(response.error, { code: response.code });
        }
        return response;
      };
      const promise = (async () => {
        try {
          const response = await f();
          count = Math.max(count, response?.count ?? 0);
        } catch (err) {
          if (!noThrow) {
            throw err;
          }
        }
      })();
      promises.push(promise);
    } else {
      this.conn.emit("publish", v);
    }
    seq += 1;
  }
  if (confirm) {
    return {
      bytes: raw.length,
      getCount: () => count,
      promise: Promise.all(promises),
    };
  }
  return { bytes: raw.length };
};
/** Short alias for publish. */
pub = this.publish;
/**
 * Publish `mesg` to `subject` and wait for a single reply.
 *
 * A one-off subject below this client's inbox is put in the reply header;
 * the first message arriving there is the response.
 *
 * @param timeout ms; used for the publish and as the idle limit while waiting
 * @throws Error if timeout is not positive
 * @throws ConatError 503 if the publish reports no subscribers
 * @throws ConatError 408 if no reply arrives in time
 */
request = async (
  subject: string,
  mesg: any,
  { timeout = DEFAULT_REQUEST_TIMEOUT, ...options }: PublishOptions = {},
): Promise<Message> => {
  if (timeout <= 0) {
    throw Error("timeout must be positive");
  }
  const inbox = await this.getInbox();
  const inboxSubject = this.temporaryInboxSubject();
  // start listening before publishing so a fast reply cannot be missed
  const sub = new EventIterator<Message>(inbox, inboxSubject, {
    idle: timeout,
    limit: 1,
    map: (args) => args[0],
  });
  try {
    const opts = {
      ...options,
      timeout,
      headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
    };
    const { count } = await this.publish(subject, mesg, opts);
    if (!count) {
      throw new ConatError(`request -- no subscribers matching '${subject}'`, {
        code: 503,
      });
    }
    for await (const resp of sub) {
      return resp;
    }
    throw new ConatError("timeout", { code: 408 });
  } finally {
    // Always release the inbox listener, including when publish itself
    // rejects (previously the iterator was left attached in that case).
    sub.stop();
  }
};
/**
 * Publish `mesg` to `subject` and return an iterator over all replies.
 *
 * @param maxMessages passed to the iterator as `limit`
 * @param maxWait passed to the iterator as `idle`
 * @throws Error if maxMessages or maxWait is given but not positive
 * @throws ConatError 503 if the publish reports no subscribers
 */
requestMany = async (
  subject: string,
  mesg: any,
  { maxMessages, maxWait, ...options }: RequestManyOptions = {},
): Promise<Subscription> => {
  if (maxMessages != null && maxMessages <= 0) {
    throw Error("maxMessages must be positive");
  }
  if (maxWait != null && maxWait <= 0) {
    throw Error("maxWait must be positive");
  }
  const inbox = await this.getInbox();
  const inboxSubject = this.temporaryInboxSubject();
  // start listening before publishing so early replies cannot be missed
  const sub = new EventIterator<Message>(inbox, inboxSubject, {
    idle: maxWait,
    limit: maxMessages,
    map: (args) => args[0],
  });
  try {
    const { count } = await this.publish(subject, mesg, {
      ...options,
      headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
    });
    if (!count) {
      throw new ConatError(
        `requestMany -- no subscribers matching ${subject}`,
        { code: 503 },
      );
    }
  } catch (err) {
    // Release the inbox listener on every failure path, including a
    // rejected publish (previously only the "no subscribers" case did).
    sub.stop();
    throw err;
  }
  return sub;
};
/**
 * Debugging helper: subscribe to `subject` (unconfirmed) and invoke `cb` for
 * every message; by default each message is logged to the console.
 * Returns the subscription so the caller can stop it.
 */
watch = (
  subject: string,
  cb = (x) => console.log(`${new Date()}: ${x.subject}:`, x.data, x.headers),
  opts?: SubscriptionOptions,
) => {
  const sub = this.subscribeSync(subject, opts);
  // consume in the background; not awaited
  (async () => {
    for await (const mesg of sub) {
      cb(mesg);
    }
  })();
  return sub;
};
// Factories for the sync primitives imported from @cocalc/conat/sync/*.
// Each one simply forwards the options with `client: this` added.
sync = {
dkv: async <T,>(opts: DKVOptions): Promise<DKV<T>> =>
await dkv<T>({ ...opts, client: this }),
akv: <T,>(opts: DKVOptions): AKV<T> => akv<T>({ ...opts, client: this }),
dko: async <T,>(opts: DKVOptions): Promise<DKO<T>> =>
await dko<T>({ ...opts, client: this }),
dstream: async <T,>(opts: DStreamOptions): Promise<DStream<T>> =>
await dstream<T>({ ...opts, client: this }),
astream: <T,>(opts: DStreamOptions): AStream<T> =>
astream<T>({ ...opts, client: this }),
synctable: async (opts: SyncTableOptions): Promise<ConatSyncTable> =>
await createSyncTable({ ...opts, client: this }),
};
// Factories for conat sockets layered on this client. Every socket created
// here is tracked in this.sockets and removed again when it emits "closed".
socket = {
// Create the socket server for `subject`. At most one server per subject per
// client; its id defaults to this client's id unless opts.id overrides it.
// Throws if the client is closed or a server already exists for the subject.
listen: (
subject: string,
opts?: SocketConfiguration & { id?: string },
): ConatSocketServer => {
if (this.state == "closed") {
throw Error("closed");
}
if (this.sockets.servers[subject] !== undefined) {
throw Error(
`there can be at most one socket server per client listening on a subject (subject='${subject}')`,
);
}
const server = new ConatSocketServer({
subject,
role: "server",
client: this,
id: this.id,
...opts,
});
this.sockets.servers[subject] = server;
server.once("closed", () => {
delete this.sockets.servers[subject];
});
return server;
},
// Create a socket client for `subject`. Any number of clients may exist per
// subject; they are told apart by id (random unless opts.id is given).
// Throws if the client is closed.
connect: (
subject: string,
opts?: SocketConfiguration & { id?: string },
): ConatSocketClient => {
if (this.state == "closed") {
throw Error("closed");
}
const id = opts?.id ?? randomId();
const client = new ConatSocketClient({
subject,
role: "client",
client: this,
id,
...opts,
});
if (this.sockets.clients[subject] === undefined) {
this.sockets.clients[subject] = { [id]: client };
} else {
this.sockets.clients[subject][id] = client;
}
client.once("closed", () => {
const v = this.sockets.clients[subject];
if (v != null) {
delete v[id];
// drop the per-subject map once its last client is gone
if (isEmpty(v)) {
delete this.sockets.clients[subject];
}
}
});
return client;
},
};
/** Call disconnect() on every tracked socket server and socket client; no-op when closed. */
private disconnectAllSockets = () => {
  if (this.state == "closed") {
    return;
  }
  const { servers, clients } = this.sockets;
  for (const subject in servers) {
    servers[subject].disconnect();
  }
  for (const subject in clients) {
    const group = clients[subject];
    for (const id in group) {
      group[id].disconnect();
    }
  }
};
/** Call close() on every tracked socket server and socket client. */
private closeAllSockets = () => {
  const { servers, clients } = this.sockets;
  for (const subject in servers) {
    servers[subject].close();
  }
  for (const subject in clients) {
    const group = clients[subject];
    for (const id in group) {
      group[id].close();
    }
  }
};
/** Build a MessageData from a value and publish-style options, without sending anything. */
message = (mesg, options?) => {
  return messageData(mesg, options);
};
// Ad-hoc throughput benchmarks; both return a rate in messages per second.
bench = {
// Publish n messages to `subject`: n-1 unconfirmed followed by one confirmed
// publish, then report n divided by the elapsed time.
publish: async (n: number = 1000, subject = "bench"): Promise<number> => {
const t0 = Date.now();
console.log(`publishing ${n} messages to`, { subject });
for (let i = 0; i < n - 1; i++) {
this.publishSync(subject, null);
}
const { count } = await this.publish(subject, null);
console.log("listeners: ", count);
const t1 = Date.now();
const rate = Math.round((n / (t1 - t0)) * 1000);
console.log(rate, "messages per second delivered");
return rate;
},
// Subscribe to `subject`, publish n unconfirmed messages to it and time how
// long it takes to receive n messages back. The timer starts after the
// publishes have been issued.
subscribe: async (n: number = 1000, subject = "bench"): Promise<number> => {
const sub = await this.subscribe(subject);
for (let i = 0; i < n; i++) {
this.publishSync(subject, null);
}
const t0 = Date.now();
let i = 0;
for await (const _ of sub) {
i += 1;
if (i >= n) {
break;
}
}
const t1 = Date.now();
return Math.round((n / (t1 - t0)) * 1000);
},
};
}
// Options accepted by publish/request and the helpers built on them.
interface PublishOptions {
// Sent along with the final chunk of the message.
headers?: Headers;
// Payload encoding; defaults to DEFAULT_ENCODING.
encoding?: DataEncoding;
// Already-encoded payload; when given, `mesg` is not encoded.
raw?;
// Ack timeout in ms for confirmed publishes; for request() also the time allowed for the reply.
timeout?: number;
// publish(): if the server reports a count of 0, wait for interest and publish once more.
waitForInterest?: boolean;
// Swallow errors: publish() then resolves to { bytes: 0, count: 0 }.
noThrow?: boolean;
}
// Options for requestMany/callMany.
interface RequestManyOptions extends PublishOptions {
// Passed to the reply iterator as `idle`; must be positive when given.
maxWait?: number;
// Passed to the reply iterator as `limit`; must be positive when given.
maxMessages?: number;
}
/**
 * Serialize `mesg` with the requested encoding.
 * @throws Error for an encoding that is neither MsgPack nor JsonCodec
 */
export function encode({
  encoding,
  mesg,
}: {
  encoding: DataEncoding;
  mesg: any;
}) {
  if (encoding == DataEncoding.MsgPack) {
    return msgpack.encode(mesg, MSGPACK_ENCODER_OPTIONS);
  }
  if (encoding == DataEncoding.JsonCodec) {
    return jsonEncoder(mesg);
  }
  throw Error(`unknown encoding ${encoding}`);
}
/**
 * Inverse of encode: deserialize `data` according to `encoding`.
 * @throws Error for an encoding that is neither MsgPack nor JsonCodec
 */
export function decode({
  encoding,
  data,
}: {
  encoding: DataEncoding;
  data;
}): any {
  if (encoding == DataEncoding.MsgPack) {
    return msgpack.decode(data);
  }
  if (encoding == DataEncoding.JsonCodec) {
    return jsonDecoder(data);
  }
  throw Error(`unknown encoding ${encoding}`);
}
// Created on first use and then shared by all calls.
let textEncoder: TextEncoder | undefined = undefined;
let textDecoder: TextDecoder | undefined = undefined;

/** JSON-stringify `obj` and return the text as UTF-8 bytes. */
function jsonEncoder(obj: any) {
  const encoder = textEncoder ?? (textEncoder = new TextEncoder());
  const text = JSON.stringify(obj);
  return encoder.encode(text);
}

/** Decode bytes as text and parse the result as JSON. */
function jsonDecoder(data: Buffer): any {
  const decoder = textDecoder ?? (textDecoder = new TextDecoder());
  const text = decoder.decode(data);
  return JSON.parse(text);
}
// One piece of a (possibly multi-part) message as tracked by SubscriptionEmitter.
interface Chunk {
// id shared by all chunks of one message
id: string;
// position of the chunk within its message, starting at 0
seq: number;
// truthy on the last chunk of a message
done: number;
buffer: Buffer;
headers?: any;
}
// Partial messages whose first chunk is older than this (ms) are dropped by dropOldLoop.
const MAX_CHUNK_TIME = 2 * 60000;
// Receives the raw chunks the server delivers for one subject, reassembles
// them into complete messages and emits each one as a "message" event.
// Shared between all subscribers to the same subject via refCount.
class SubscriptionEmitter extends EventEmitter {
// chunks received so far for messages that are not complete yet, keyed by message id
private incoming: { [id: string]: (Partial<Chunk> & { time: number })[] } =
{};
private client: Client;
// when set, removing a listener through off() also closes this emitter
private closeWhenOffCalled?: boolean;
private subject: string;
// number of subscribers sharing this emitter
public refCount: number = 1;
// Starts listening on the client's socket for events named after the subject
// and starts the loop that discards stale partial messages.
constructor({ client, subject, closeWhenOffCalled }) {
super();
this.client = client;
this.subject = subject;
this.client.conn.on(subject, this.handle);
this.closeWhenOffCalled = closeWhenOffCalled;
this.dropOldLoop();
}
// Release one reference. The emitter is actually torn down only when `force`
// is set or no references remain; "closed" is emitted before its own
// listeners are removed. Calling it again after teardown is a no-op.
close = (force?) => {
this.refCount -= 1;
if (this.client == null || (!force && this.refCount > 0)) {
return;
}
this.emit("closed");
this.client.conn.removeListener(this.subject, this.handle);
delete this.incoming;
delete this.client;
delete this.subject;
delete this.closeWhenOffCalled;
this.removeAllListeners();
};
// Remove a listener and, if closeWhenOffCalled is set, release a reference as well.
off(a, b) {
super.off(a, b);
if (this.closeWhenOffCalled) {
this.close();
}
return this;
}
// Handle one chunk from the server. Chunks of a message must arrive in order
// starting at seq 0; anything else is dropped with a warning. Once the final
// chunk arrives the message is assembled and emitted.
private handle = ({ subject, data }) => {
if (this.client == null) {
return;
}
const [id, seq, done, encoding, buffer, headers] = data;
const chunk = { seq, done, encoding, buffer, headers };
const { incoming } = this;
if (incoming[id] == null) {
if (seq != 0) {
console.log(
`WARNING: drop packet from ${this.subject} -- first message has wrong seq`,
{ seq },
);
return;
}
incoming[id] = [];
} else {
const prev = incoming[id].slice(-1)[0].seq ?? -1;
if (prev + 1 != seq) {
console.log(
`WARNING: drop packet from ${this.subject} -- seq number wrong`,
{ prev, seq },
);
// discard everything received so far for this message as well
delete incoming[id];
return;
}
}
incoming[id].push({ ...chunk, time: Date.now() });
if (chunk.done) {
const chunks = incoming[id].map((x) => x.buffer!);
const raw = concatArrayBuffers(chunks);
delete incoming[id];
// encoding and headers are taken from the final chunk
const mesg = new Message({
encoding,
raw,
headers,
client: this.client,
subject,
});
this.emit("message", mesg);
this.client.recvStats(raw.byteLength);
}
};
// Every MAX_CHUNK_TIME / 2 ms, drop partial messages whose first chunk is
// older than MAX_CHUNK_TIME. Ends once close() has deleted `incoming`.
dropOldLoop = async () => {
while (this.incoming != null) {
const cutoff = Date.now() - MAX_CHUNK_TIME;
for (const id in this.incoming) {
const chunks = this.incoming[id];
if (chunks.length > 0 && chunks[0].time <= cutoff) {
console.log(
`WARNING: drop partial message from ${this.subject} due to timeout`,
);
delete this.incoming[id];
}
}
await delay(MAX_CHUNK_TIME / 2);
}
};
}
/**
 * Join the chunks of a message into one binary value.
 *
 * A single chunk is returned as is; Node Buffers are joined with
 * Buffer.concat; anything else is copied into a new array whose underlying
 * ArrayBuffer is returned.
 */
function concatArrayBuffers(buffers) {
  const first = buffers[0];
  if (buffers.length == 1) {
    return first;
  }
  if (Buffer.isBuffer(first)) {
    return Buffer.concat(buffers);
  }
  let totalLength = 0;
  for (const buf of buffers) {
    totalLength += buf.byteLength;
  }
  const merged = new Uint8Array(totalLength);
  let position = 0;
  for (const buf of buffers) {
    merged.set(new Uint8Array(buf), position);
    position += buf.byteLength;
  }
  return merged.buffer;
}
/** Message headers: JSON-compatible values keyed by name. */
export type Headers = { [key: string]: JSONValue };

/**
 * An encoded payload together with its encoding and optional headers.
 * The payload is decoded each time `data` is read.
 */
export class MessageData<T = any> {
  public readonly encoding: DataEncoding;
  public readonly raw;
  public readonly headers?: Headers;
  constructor({ encoding, raw, headers }) {
    this.encoding = encoding;
    this.raw = raw;
    this.headers = headers;
  }
  /** The decoded payload (decoded on every access). */
  get data(): T {
    return decode({ encoding: this.encoding, data: this.raw });
  }
  /**
   * Size of the raw payload in bytes.
   *
   * `raw` is not always a Buffer/Uint8Array: reassembling a multi-chunk
   * message from non-Buffer chunks yields a plain ArrayBuffer, which has
   * `byteLength` but no `length`, so fall back to that.
   */
  get length(): number {
    return this.raw.length ?? this.raw.byteLength;
  }
}
// A message received on a subscription. In addition to the payload it knows
// the subject it arrived on and can reply to the sender when it is a request.
export class Message<T = any> extends MessageData<T> {
private client: Client;
public readonly subject;
constructor({ encoding, raw, headers, client, subject }) {
super({ encoding, raw, headers });
this.client = client;
this.subject = subject;
}
// True when the message carries a reply subject in its headers.
isRequest = (): boolean => !!this.headers?.[REPLY_HEADER];
// Subject to publish the reply to, or undefined (after logging a warning)
// when the message is not a request.
private respondSubject = () => {
const subject = this.headers?.[REPLY_HEADER];
if (!subject) {
console.log(
`WARNING: respond -- message to '${this.subject}' is not a request`,
);
return;
}
return `${subject}`;
};
// Reply without waiting for confirmation. Returns { bytes: 0 } when the
// message is not a request.
respondSync = (mesg, opts?: PublishOptions): { bytes: number } => {
const subject = this.respondSubject();
if (!subject) return { bytes: 0 };
return this.client.publishSync(subject, mesg, opts);
};
// Reply and wait for confirmation. waitForInterest defaults to true here but
// can be overridden through opts. Returns zeros when the message is not a request.
respond = async (
mesg,
opts: PublishOptions = {},
): Promise<{ bytes: number; count: number }> => {
const subject = this.respondSubject();
if (!subject) {
return { bytes: 0, count: 0 };
}
return await this.client.publish(subject, mesg, {
waitForInterest: true,
...opts,
});
};
}
/**
 * Build a MessageData from a value: `raw` is used as the payload when given,
 * otherwise `mesg` is encoded with `encoding`.
 */
export function messageData(
  mesg,
  { headers, raw, encoding = DEFAULT_ENCODING }: PublishOptions = {},
) {
  const payload = raw ?? encode({ encoding, mesg });
  return new MessageData({ encoding, raw: payload, headers });
}
/** Async iterator over the messages of a subscription. */
export type Subscription = EventIterator<Message>;
/** Error carrying a status-like `code` (e.g. 403, 408, 503) alongside the message. */
export class ConatError extends Error {
  code: string | number;
  constructor(mesg: string, { code }) {
    super(mesg);
    this.code = code;
  }
}

/** True when `obj` has no enumerable properties (own or inherited). */
function isEmpty(obj: object): boolean {
  for (const _x in obj) {
    return false;
  }
  return true;
}

/**
 * Normalize a failure from a socket.io ack into something safe to `throw`.
 *
 * - A ConatError is returned unchanged, so a server-reported error keeps its
 *   own code instead of being relabelled as a timeout.
 * - A failure whose text mentions "disconnected" is returned as an Error
 *   object (the original one when possible). It used to be returned as a bare
 *   string, which made callers throw a string; the text seen through
 *   `${err}` is unchanged for Error inputs.
 * - Everything else is treated as a timeout and wrapped in a ConatError with code 408.
 */
function toConatError(socketIoError) {
  if (socketIoError instanceof ConatError) {
    return socketIoError;
  }
  const e = `${socketIoError}`;
  if (e.includes("disconnected")) {
    return socketIoError instanceof Error ? socketIoError : new Error(e);
  }
  return new ConatError(`timeout - ${e}`, {
    code: 408,
  });
}