// GitHub repository: sagemathinc/cocalc -- path: blob/master/src/packages/conat/core/client.ts
/*
core/client.ts -- core conat client

This is a client that has a similar API to NATS / Socket.io, but is much,
much better in so many ways:

- It has global pub/sub just like with NATS. This uses the server to
  rebroadcast messages, and for authentication.
  Better than NATS: Authentication is done for a subject *as
  needed* instead of at connection time.

- Messages can be arbitrarily large; they are *automatically* divided
  into chunks and reassembled. Better than both NATS and socket.io.

- There are multiple supported ways of encoding messages, and
  no coordination is required with the server or other clients! E.g.,
  one message can be sent with one encoding and the next with a different
  encoding and that's fine.
     - MsgPack: https://msgpack.org/ -- a very compact encoding that handles
       dates nicely and small numbers efficiently. This also works
       well with binary Buffer objects, which is nice.
     - JsonCodec: uses JSON.stringify and TextEncoder. This does not work
       with Buffer or Date and is less compact, but can be very fast.

- One BIG DIFFERENCE from NATS is that when a message is sent the sender
  can optionally find out how many clients received it. They can also wait
  until there is interest and send the message again. This is automated so
  it's very easy to use, and it makes it much easier to write distributed
  services that are not temporarily broken by race conditions. HOWEVER,
  there is one caveat -- if as an admin you create a "tap", i.e., you
  subscribe to all messages matching some pattern just to see what's going
  on, then currently that counts in the delivery count and interest, and that
  would then cause these race conditions to happen again. E.g., a user
  signs in, subscribes to their INBOX, sends a request, and gets a response
  to that inbox, but does this all quickly, and in a cluster, the server doesn't
  see the inbox yet, so it fails. As a workaround, subscriptions to the
  subject pattern '>' are invisible, so you can always tap into '>' for debugging
  purposes. TODO: implement a general way of making an invisible subscriber that
  doesn't count.
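
A minimal sketch of the chunk-and-reassemble idea from the list above. It is
not the code this file uses: the Chunk shape and the names splitIntoChunks and
reassemble are hypothetical, chosen only for illustration, and the real wire
format may differ.

```typescript
// Hypothetical chunk shape: enough information to put a message back together.
interface Chunk {
  id: string; // which message this chunk belongs to
  seq: number; // position of the chunk within the message
  done: boolean; // true only on the final chunk
  data: Uint8Array;
}

// Split raw bytes into chunks of at most maxSize bytes.
// An empty payload still yields one (empty) final chunk.
function splitIntoChunks(id: string, raw: Uint8Array, maxSize: number): Chunk[] {
  if (maxSize <= 0) throw new Error("maxSize must be positive");
  const chunks: Chunk[] = [];
  let seq = 0;
  for (let i = 0; i < raw.length || seq === 0; i += maxSize) {
    const end = Math.min(i + maxSize, raw.length);
    chunks.push({ id, seq, done: end >= raw.length, data: raw.subarray(i, end) });
    seq += 1;
  }
  return chunks;
}

// Reassemble chunks that may have arrived out of order.
function reassemble(chunks: Chunk[]): Uint8Array {
  const sorted = chunks.slice().sort((a, b) => a.seq - b.seq);
  // Refuse to reassemble unless every chunk up to the final one is present.
  sorted.forEach((c, i) => {
    if (c.seq !== i) throw new Error(`missing chunk ${i}`);
  });
  if (sorted.length === 0 || !sorted[sorted.length - 1].done) {
    throw new Error("final chunk not received yet");
  }
  const total = sorted.reduce((n, c) => n + c.data.length, 0);
  const out = new Uint8Array(total);
  let offset = 0;
  for (const c of sorted) {
    out.set(c.data, offset);
    offset += c.data.length;
  }
  return out;
}
```

With a 10-byte payload and maxSize 4 this produces three chunks, and
reassemble restores the original bytes regardless of arrival order.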


THE CORE API

This section contains the crucial information you have to know to build a distributed
system using Conat. It's our take on the NATS primitives (it's not exactly the
same, but it is close). It's basically a symmetrical pub/sub/request/respond model
for messaging on which you can build distributed systems. The tricky part, which
NATS.js gets wrong (in my opinion), is implementing this in a way that is robust
and scalable, in terms of authentication, real world browser connectivity and
so on. Our approach is to use proven mature technology like socket.io and sqlite,
instead of writing everything from scratch.

Clients: We view all clients as plugged into a common "dial tone",
except for optional permissions that are configured when starting the server.
The methods you call on the client to build everything are:

 - subscribe, subscribeSync - subscribe to a subject which returns an
   async iterator over all messages that match the subject published by
   anyone with permission to do so. If you provide the same optional
   queue parameter for multiple subscribers, then one subscriber in each queue
   group receives each message. The async form of this function confirms
   the subscription was created before returning. If a client creates multiple
   subscriptions to the same subject at the same time, the queue group must
   be the same. Subscriptions are guaranteed to stay valid until the client
   ends them; they do not stop working due to client or server reconnects
   or restarts. (If you need more subscriptions with different queue groups,
   make another client object.)

 - publish, publishSync - publish to a subject. The async version returns
   a count of the number of recipients, whereas the sync version is
   fire-and-forget.
   **There is no a priori size limit on messages since chunking
   is automatic. However, we have to impose some limit, but
   it can be much larger than the socketio message size limit.**

 - request - send a message to a subject, and if there is at least one
   subscriber listening, it may respond. If there are no subscribers,
   it throws a 503 error. To create a microservice, subscribe
   to a subject pattern and call mesg.respond(...) on each message you
   receive.

 - requestMany - send a message to a subject, and receive many
   messages in reply. Typically you end the response stream by sending
   a null message, but what you do is up to you. This is very useful
   for streaming arbitrarily large data, long running changefeeds, LLM
   responses, etc.
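
A small in-memory model of the queue-group rule described under subscribe
above: each message goes to exactly one subscriber per queue group. The real
routing happens on the conat server; Subscriber and chooseRecipients are
hypothetical names used only for this sketch, and picking a member at random
is an assumption about the selection policy.

```typescript
interface Subscriber {
  id: string;
  queue: string; // queue group name
}

// Group subscribers by queue group and pick one recipient from each group.
// The pick function is injectable so the choice can be made deterministic.
function chooseRecipients(
  subscribers: Subscriber[],
  pick: (group: Subscriber[]) => Subscriber = (g) =>
    g[Math.floor(Math.random() * g.length)],
): Subscriber[] {
  const groups = new Map<string, Subscriber[]>();
  for (const s of subscribers) {
    const g = groups.get(s.queue) ?? [];
    g.push(s);
    groups.set(s.queue, g);
  }
  return Array.from(groups.values()).map((g) => pick(g));
}
```

In this model, two subscribers sharing queue "0" plus one subscriber in queue
"x" give two recipients per message, which is the kind of count the async
publish is described as returning.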


Messages: A message mesg is:

 - Data:
    - subject  - the subject the message was sent to
    - encoding - usually MessagePack
    - raw      - encoded binary data
    - headers  - a JSON-able JavaScript object.

 - Methods:
    - data: this is a property, so if you do mesg.data, then it decodes raw
      and returns the resulting JavaScript object.
    - respond, respondSync: if REPLY_HEADER is set, calling this publishes a
      response message to the original sender of the message.


Persistence:

We also implement persistent streams, where you can also set a key. This can
be used to build the analogue of JetStream's streams and kv stores. The object
store isn't necessary since there is no limit on message size. Conat's persistent
streams are compressed by default and backed by individual sqlite files, which
makes them very memory efficient and it is easy to tier storage to cloud storage.

UNIT TESTS: See packages/server/conat/test/core

MISC NOTES:

NOTE: There is a socketio msgpack parser, but it just doesn't
work at all, which is weird. Also, I think it's impossible to
do the sort of chunking we want at the level of a socket.io
parser -- it's just not possible at that level of the encoding. We customize
things purely client side without using a parser, and get a much
simpler and better result, inspired by how NATS approaches things
with opaque messages.


SUBSCRIPTION ROBUSTNESS: When you call client.subscribe(...) you get back an async iterator.
It ONLY ends when you explicitly do the standard ways of terminating
such an iterator, including calling .close() on it. It is a MAJOR BUG
if it were to terminate for any other reason. In particular, the subscription
MUST NEVER throw an error or silently end when the connection is dropped
then resumed, or the server is restarted, or the client connects to
a different server! These situations can, of course, result in missing
some messages, but that's understood. There are no guarantees at all with
a subscription that every message is received. Finally, any time a client
disconnects and reconnects, the client ensures that all subscriptions
exist for it on the server via a sync process.
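
The sync process mentioned above can be pictured as a set difference between
what the client believes it is subscribed to and what the server reports.
This is a sketch of that idea only: diffSubscriptions is a hypothetical pure
function, not something exported by this file.

```typescript
// client: subject -> queue group, as the client believes
// server: subjects the server reports for this client
function diffSubscriptions(
  client: { [subject: string]: string },
  server: Set<string>,
): {
  missing: { subject: string; queue: string }[];
  extra: { subject: string }[];
} {
  // Subscriptions the client expects but the server does not have.
  const missing: { subject: string; queue: string }[] = [];
  for (const subject of Object.keys(client)) {
    if (!server.has(subject)) {
      missing.push({ subject, queue: client[subject] });
    }
  }
  // Subscriptions the server has but the client no longer wants.
  const extra: { subject: string }[] = [];
  server.forEach((subject) => {
    if (client[subject] == null) {
      extra.push({ subject });
    }
  });
  return { missing, extra };
}
```

After a reconnect, the missing entries would be re-subscribed and the extra
entries unsubscribed; when both lists are empty the two sides agree.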

Subscription robustness is a major difference with NATS.js, which would
mysteriously terminate subscriptions for a variety of reasons, meaning that any
code using subscriptions had to be wrapped in ugly complexity to be
usable in production.

INTEREST AWARENESS: In Conat there is a cluster-aware event-driven way to
wait for interest in a subject. This is an extremely useful extension to
NATS functionality, since it makes it much easier to dynamically set up
a client and a server and exchange messages without having to poll and fail
potentially a few times. This makes certain operations involving complicated
steps behind the scenes -- upload a file, open a file to edit with sync, etc. --
feel more responsive.

USAGE:

The following should mostly work to interactively play around with this
code and develop it. It's NOT automatically tested and depends on your
environment though, so may break. See the unit tests in

   packages/server/conat/test/core/

for something that definitely works perfectly.


For developing at the command line, cd to packages/backend, then in node:

    c = require('@cocalc/backend/conat/conat').connect()

or

    c = require('@cocalc/conat/core/client').connect('http://localhost:3000')

    c.watch('a')

    s = await c.subscribe('a');  for await (const x of s) { console.log(x.length)}

// in another console

    c = require('@cocalc/backend/conat/conat').connect()
    c.publish('a', 'hello there')

// in browser (right now)

    cc.conat.conat()

// client server:

    s = await c.subscribe('eval'); for await(const x of s) { x.respond(eval(x.data)) }

then in another console

    f = async () => (await c.request('eval', '2+3')).data
    await f()

    t = Date.now(); for(i=0;i<1000;i++) { await f()} ; Date.now()-t

// slower, but won't silently fail due to errors, etc.

    f2 = async () => (await c.request('eval', '2+3', {confirm:true})).data

Wildcard subject:


    c = require('@cocalc/conat/core/client').connect(); c.watch('a.*');


    c = require('@cocalc/conat/core/client').connect(); c.publish('a.x', 'foo')
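
To make the wildcard example concrete, here is a sketch of subject matching
under the assumption that Conat follows NATS-style rules: subjects are
dot-separated tokens, '*' matches exactly one token, and a trailing '>'
matches one or more remaining tokens. matchesPattern is a hypothetical helper
written for this note; the matcher actually used by the server may differ.

```typescript
function matchesPattern(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // '>' is only valid as the last token and needs at least one token left
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) {
      return false; // the subject ran out of tokens
    }
    if (p[i] !== "*" && p[i] !== s[i]) {
      return false; // literal token mismatch
    }
  }
  // without a trailing '>', both sides must have the same number of tokens
  return p.length === s.length;
}
```

Under these assumed rules, the watcher on 'a.*' above sees the message
published to 'a.x', but it would not see one published to 'a.x.y'.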


Testing disconnect

    c.sub('>')
    c.conn.io.engine.close();0;

other:

    a=0; setInterval(()=>c.pub('a',a++), 250)

*/

import {
  connect as connectToSocketIO,
  type SocketOptions,
  type ManagerOptions,
} from "socket.io-client";
import { EventIterator } from "@cocalc/util/event-iterator";
import type { ConnectionStats, ServerInfo } from "./types";
import * as msgpack from "@msgpack/msgpack";
import { randomId } from "@cocalc/conat/names";
import type { JSONValue } from "@cocalc/util/types";
import { EventEmitter } from "events";
import {
  isValidSubject,
  isValidSubjectWithoutWildcards,
} from "@cocalc/conat/util";
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import { once, until } from "@cocalc/util/async-utils";
import { delay } from "awaiting";
import { getLogger } from "@cocalc/conat/client";
import { refCacheSync } from "@cocalc/util/refcache";
import { join } from "path";
import { dko, type DKO } from "@cocalc/conat/sync/dko";
import { dkv, type DKVOptions, type DKV } from "@cocalc/conat/sync/dkv";
import {
  dstream,
  type DStreamOptions,
  type DStream,
} from "@cocalc/conat/sync/dstream";
import { akv, type AKV } from "@cocalc/conat/sync/akv";
import { astream, type AStream } from "@cocalc/conat/sync/astream";
import TTL from "@isaacs/ttlcache";
import {
  ConatSocketServer,
  ConatSocketClient,
  ServerSocket,
  type SocketConfiguration,
} from "@cocalc/conat/socket";
export { type ConatSocketServer, ConatSocketClient, ServerSocket };
import {
  type SyncTableOptions,
  type ConatSyncTable,
  createSyncTable,
} from "@cocalc/conat/sync/synctable";

export const MAX_INTEREST_TIMEOUT = 90_000;

const DEFAULT_WAIT_FOR_INTEREST_TIMEOUT = 30_000;

const MSGPACK_ENCODER_OPTIONS = {
  // ignoreUndefined is critical so database queries work properly, and
  // also we have a lot of api calls with tons of wasted undefined values.
  ignoreUndefined: true,
};

export const DEFAULT_SOCKETIO_CLIENT_OPTIONS = {
  // A major problem if we allow long polling is that we must always use at most
  // half the chunk size... because there is no way to know if recipients will be
  // using long polling to RECEIVE messages. Not insurmountable.
  transports: ["websocket"],
  rememberUpgrade: true,

  // nodejs specific for project/compute server in some settings
  rejectUnauthorized: false,

  reconnection: true,
  reconnectionDelay: process.env.COCALC_TEST_MODE ? 50 : 500,
  reconnectionDelayMax: process.env.COCALC_TEST_MODE ? 500 : 15000,
  reconnectionAttempts: 9999999999, // infinite
};

type State = "disconnected" | "connected" | "closed";

const logger = getLogger("core/client");

interface Options {
  // address = the address of a cocalc server, including the base url, e.g.,
  //
  //   https://cocalc.com
  //
  // or for a dev server running locally with a base url:
  //
  //   http://localhost:4043/3fa218e5-7196-4020-8b30-e2127847cc4f/port/5002
  //
  // The socketio path is always /conat (after the base url) and is set automatically.
  //
  address?: string;
  inboxPrefix?: string;
  systemAccountPassword?: string;
}

export type ClientOptions = Options & {
  noCache?: boolean;
} & Partial<SocketOptions> &
  Partial<ManagerOptions>;

const INBOX_PREFIX = "_INBOX";
const REPLY_HEADER = "CN-Reply";
const MAX_HEADER_SIZE = 100000;

const STATS_LOOP = 5000;

// fairly long since this is to avoid leaks, not for responsiveness in the UI.
export const DEFAULT_SUBSCRIPTION_TIMEOUT = 60_000;

// long so servers don't get DOS'd on startup, etc. Also, we use interest-based
// checks when publish and request fail, so we're not depending on these to
// fail as part of the normal startup process for anything.
export let DEFAULT_REQUEST_TIMEOUT = 30_000;
export let DEFAULT_PUBLISH_TIMEOUT = 30_000;

export function setDefaultTimeouts({
  request = DEFAULT_REQUEST_TIMEOUT,
  publish = DEFAULT_PUBLISH_TIMEOUT,
}: {
  request?: number;
  publish?: number;
}) {
  DEFAULT_REQUEST_TIMEOUT = request;
  DEFAULT_PUBLISH_TIMEOUT = publish;
}

export enum DataEncoding {
  MsgPack = 0,
  JsonCodec = 1,
}

interface SubscriptionOptions {
  maxWait?: number;
  mesgLimit?: number;
  queue?: string;
  respond?: Function;
  // timeout to create the subscription -- this may wait *until* you connect before
  // it starts ticking.
  timeout?: number;
}

// WARNING! This is the default and you can't just change it!
// Yes, for specific messages you can, but in general DO NOT. The reason is because, e.g.,
// JSON will turn Dates into strings, and we no longer fix that. So unless you modify the
// JsonCodec to handle Dates properly, don't change this!!
const DEFAULT_ENCODING = DataEncoding.MsgPack;
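
/*
Why the warning above matters, shown with built-in JSON only. This does not
exercise JsonCodec itself, and the variable names are purely illustrative:

```typescript
const sent = { when: new Date(0), n: 1 };
const received = JSON.parse(JSON.stringify(sent));
// The Date survives only as an ISO string; nothing converts it back.
const sentIsDate = sent.when instanceof Date;
const receivedIsDate = received.when instanceof Date;
const receivedType = typeof received.when;
```

Here sentIsDate is true, receivedIsDate is false and receivedType is "string",
so any code that expects a Date on the receiving side would break if the
default encoding were switched to plain JSON.
*/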

function cocalcServerToSocketioAddress(url: string): {
  address: string;
  path: string;
} {
  const u = new URL(url, "http://dummy.org");
  const address = u.origin;
  const path = join(u.pathname, "conat");
  return { address, path };
}

const cache = refCacheSync<ClientOptions, Client>({
  name: "conat-client",
  createObject: (opts: ClientOptions) => {
    return new Client(opts);
  },
});

export function connect(opts: ClientOptions = {}) {
  //console.trace("connect", opts);
  if (!opts.address) {
    const x = cache.one();
    if (x != null) {
      return x;
    }
  }
  return cache(opts);
}

// Get any cached client, if there is one; otherwise make one
// with default options.
export function getClient() {
  return cache.one() ?? connect();
}

export class Client extends EventEmitter {
  public conn: ReturnType<typeof connectToSocketIO>;
  // queueGroups is a map from subject to the queue group for the subscription to that subject
  private queueGroups: { [subject: string]: string } = {};
  private subs: { [subject: string]: SubscriptionEmitter } = {};
  private sockets: {
    // all socket servers created using this Client
    servers: { [subject: string]: ConatSocketServer };
    // all client connections created using this Client.
    clients: { [subject: string]: { [id: string]: ConatSocketClient } };
  } = { servers: {}, clients: {} };
  public readonly options: ClientOptions;
  private inboxSubject: string;
  private inbox?: EventEmitter;
  private permissionError = {
    pub: new TTL<string, string>({ ttl: 1000 * 60 }),
    sub: new TTL<string, string>({ ttl: 1000 * 60 }),
  };
  public info: ServerInfo | undefined = undefined;
  // total number of messages and bytes sent and received, and number of subscriptions
  public readonly stats: ConnectionStats & {
    recv0: { messages: number; bytes: number };
  } = {
    send: { messages: 0, bytes: 0 },
    recv: { messages: 0, bytes: 0 },
    // recv0 = count since last connect
    recv0: { messages: 0, bytes: 0 },
    subs: 0,
  };

  public readonly id: string = randomId();
  public state: State = "disconnected";

  constructor(options: ClientOptions) {
    super();
    if (!options.address) {
      if (!process.env.CONAT_SERVER) {
        throw Error(
          "Must specify address or set CONAT_SERVER environment variable",
        );
      }
      options = { ...options, address: process.env.CONAT_SERVER };
    }
    this.options = options;
    this.setMaxListeners(1000);

    // for socket.io the address has no base url
    const { address, path } = cocalcServerToSocketioAddress(
      this.options.address!,
    );
    logger.debug(`Conat: Connecting to ${this.options.address}...`);
    //     if (options.extraHeaders == null) {
    //       console.trace("WARNING: no auth set");
    //     }
    this.conn = connectToSocketIO(address, {
      ...DEFAULT_SOCKETIO_CLIENT_OPTIONS,
      // it is necessary to manually manage reconnects due to a bug
      // in socketio that has stumped their devs
      //   -- https://github.com/socketio/socket.io/issues/5197
      // So no matter what options are set, we never use socketio's
      // reconnection logic. If options.reconnection is true or
      // not given, then we implement (in this file) reconnect ourselves.
      // The browser frontend explicitly sets options.reconnection false
      // and uses its own logic.
      ...options,
      ...(options.systemAccountPassword
        ? {
            extraHeaders: {
              ...options.extraHeaders,
              Cookie: `sys=${options.systemAccountPassword}`,
            },
          }
        : undefined),
      path,
      reconnection: true,
    });

    this.conn.on("info", (info, ack) => {
      if (typeof ack == "function") {
        ack();
      }
      const firstTime = this.info == null;
      this.info = info;
      this.emit("info", info);
      setTimeout(this.syncSubscriptions, firstTime ? 3000 : 0);
    });
    this.conn.on("permission", ({ message, type, subject }) => {
      logger.debug(message);
      this.permissionError[type]?.set(subject, message);
    });
    this.conn.on("connect", async () => {
      logger.debug(`Conat: Connected to ${this.options.address}`);
      if (this.conn.connected) {
        this.setState("connected");
      }
    });
    this.conn.io.on("error", (...args) => {
      logger.debug(
        `Conat: Error connecting to ${this.options.address} -- `,
        ...args,
      );
    });
    this.conn.on("disconnect", async () => {
      if (this.isClosed()) {
        return;
      }
      this.stats.recv0 = { messages: 0, bytes: 0 }; // reset on disconnect
      this.setState("disconnected");
      this.disconnectAllSockets();
    });
    this.conn.io.connect();
    this.initInbox();
    this.statsLoop();
  }

  cluster = async () => {
    return await this.conn.timeout(10000).emitWithAck("cluster");
  };

  disconnect = () => {
    if (this.isClosed()) {
      return;
    }
    this.disconnectAllSockets();
    // @ts-ignore
    setTimeout(() => this.conn.io.disconnect(), 1);
  };

  // this has NO timeout by default
  waitUntilSignedIn = reuseInFlight(
    async ({ timeout }: { timeout?: number } = {}) => {
      // not "signed in" if --
      //   - not connected, or
      //   - no info at all (which gets sent on sign in)
      //   - or the user is {error:....}, which is what happens when sign in fails
      //     e.g., due to an expired cookie
      if (
        this.info == null ||
        this.state != "connected" ||
        this.info?.user?.error
      ) {
        await once(this, "info", timeout);
      }
      if (
        this.info == null ||
        this.state != "connected" ||
        this.info?.user?.error
      ) {
        throw Error("failed to sign in");
      }
    },
  );

  private statsLoop = async () => {
    await until(
      async () => {
        if (this.isClosed()) {
          return true;
        }
        try {
          await this.waitUntilConnected();
          if (this.isClosed()) {
            return true;
          }
          this.conn.emit("stats", { recv0: this.stats.recv0 });
        } catch {}
        return false;
      },
      { start: STATS_LOOP, max: STATS_LOOP },
    );
  };

  interest = async (subject: string): Promise<boolean> => {
    return await this.waitForInterest(subject, { timeout: 0 });
  };

  waitForInterest = async (
    subject: string,
    {
      timeout = MAX_INTEREST_TIMEOUT,
    }: {
      timeout?: number;
    } = {},
  ) => {
    if (!isValidSubjectWithoutWildcards(subject)) {
      throw Error(
        `subject ${subject} must be a valid subject without wildcards`,
      );
    }
    timeout = Math.min(timeout, MAX_INTEREST_TIMEOUT);
    try {
      const response = await this.conn
        .timeout(timeout ? timeout : 10000)
        .emitWithAck("wait-for-interest", { subject, timeout });
      return response;
    } catch (err) {
      throw toConatError(err);
    }
  };

  recvStats = (bytes: number) => {
    this.stats.recv.messages += 1;
    this.stats.recv.bytes += bytes;
    this.stats.recv0.messages += 1;
    this.stats.recv0.bytes += bytes;
  };

  // There should usually be no reason to call this because socket.io
  // is so good at abstracting this away. It's useful for unit testing.
  waitUntilConnected = reuseInFlight(async () => {
    if (this.conn.connected) {
      return;
    }
    // @ts-ignore
    await once(this.conn, "connect");
  });

  waitUntilReady = reuseInFlight(async () => {
    await this.waitUntilSignedIn();
    await this.waitUntilConnected();
  });

  private setState = (state: State) => {
    if (this.isClosed() || this.state == state) {
      return;
    }
    this.state = state;
    this.emit(state);
  };

  private temporaryInboxSubject = () => {
    if (!this.inboxSubject) {
      throw Error("inbox not setup properly");
    }
    return `${this.inboxSubject}.${randomId()}`;
  };

  private getInbox = reuseInFlight(async (): Promise<EventEmitter> => {
    if (this.inbox == null) {
      if (this.isClosed()) {
        throw Error("closed");
      }
      await once(this, "inbox");
    }
    if (this.inbox == null) {
      throw Error("bug");
    }
    return this.inbox;
  });

  private initInbox = async () => {
    // For request/respond, instead of setting up one
    // inbox *every time there is a request*, we set up a single
    // inbox once and for all for all responses. We listen for
    // everything sent to inboxPrefix.<id>.* and emit it via this.inbox.
    // The request sender then listens on this.inbox for the response.
    // We *could* use a regular subscription for each request,
    // but (1) that massively increases the load on the server for
    // every single request (having to create and destroy subscriptions)
    // and (2) there is a race condition between creating that subscription
    // and getting the response; it's fine with one server, but with
    // multiple servers solving the race condition would slow everything down
    // due to having to wait for so many acknowledgements. Instead, we
    // remove all those problems by just using a single inbox subscription.
    const inboxPrefix = this.options.inboxPrefix ?? INBOX_PREFIX;
    if (!inboxPrefix.startsWith(INBOX_PREFIX)) {
      throw Error(`custom inboxPrefix must start with '${INBOX_PREFIX}'`);
    }
    this.inboxSubject = `${inboxPrefix}.${randomId()}`;
    let sub;
    await until(
      async () => {
        try {
          await this.waitUntilSignedIn();
          sub = await this.subscribe(this.inboxSubject + ".*");
          return true;
        } catch (err) {
          if (this.isClosed()) {
            return true;
          }
          // this should only fail due to permissions issues, at which point
          // request can't work, but pub/sub can.
          if (!process.env.COCALC_TEST_MODE) {
            console.log(`WARNING: inbox not available -- ${err}`);
          }
        }
        return false;
      },
      { start: 1000, max: 15000 },
    );
    if (this.isClosed()) {
      return;
    }

    this.inbox = new EventEmitter();
    (async () => {
      for await (const mesg of sub) {
        if (this.inbox == null) {
          return;
        }
        this.inbox.emit(mesg.subject, mesg);
      }
    })();
    this.emit("inbox", this.inboxSubject);
  };
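
  /*
  A sketch of the single-inbox idea described in initInbox: one wildcard
  subscription feeds a listener table keyed by reply subject, so each request
  only has to reserve a unique subject under the shared inbox. InboxSketch,
  expectReply and dispatch are hypothetical names for illustration; this class
  uses an EventEmitter and randomId() rather than the Map and counter below.

  ```typescript
  type Reply = { subject: string; data: unknown };

  class InboxSketch {
    private waiting = new Map<string, (reply: Reply) => void>();
    private counter = 0;
    private inboxSubject: string;

    constructor(inboxSubject: string) {
      this.inboxSubject = inboxSubject;
    }

    // Reserve a unique reply subject under the shared inbox and return a
    // promise that resolves when a reply addressed to it is dispatched.
    expectReply(): { replySubject: string; reply: Promise<Reply> } {
      const replySubject = `${this.inboxSubject}.${this.counter++}`;
      const reply = new Promise<Reply>((resolve) => {
        this.waiting.set(replySubject, resolve);
      });
      return { replySubject, reply };
    }

    // Called for every message arriving on the one inbox subscription.
    // Returns false when nobody is waiting on that subject.
    dispatch(mesg: Reply): boolean {
      const resolve = this.waiting.get(mesg.subject);
      if (resolve == null) return false;
      this.waiting.delete(mesg.subject);
      resolve(mesg);
      return true;
    }
  }
  ```

  Because the subscription already exists before any request is sent, a
  response cannot arrive before the server knows where to deliver it.
  */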

  private isClosed = () => {
    return this.state == "closed";
  };

  close = () => {
    if (this.isClosed()) {
      return;
    }
    this.setState("closed");
    this.removeAllListeners();
    this.closeAllSockets();
    // @ts-ignore
    delete this.sockets;
    for (const subject in this.queueGroups) {
      this.conn.emit("unsubscribe", { subject });
      delete this.queueGroups[subject];
    }
    for (const sub of Object.values(this.subs)) {
      sub.refCount = 0;
      sub.close();
    }
    // delete after the loop so this also happens when there are no subscriptions
    // @ts-ignore
    delete this.subs;
    // @ts-ignore
    delete this.queueGroups;
    // @ts-ignore
    delete this.inboxSubject;
    delete this.inbox;
    // @ts-ignore
    delete this.options;
    // @ts-ignore
    delete this.info;
    // @ts-ignore
    delete this.permissionError;

    try {
      this.conn.close();
    } catch {}
  };

  private syncSubscriptions = reuseInFlight(async () => {
    let fails = 0;
    await until(
      async () => {
        if (this.isClosed()) return true;
        try {
          if (this.info == null) {
            // no point in trying until we are signed in and connected
            await once(this, "info");
          }
          if (this.isClosed()) return true;
          await this.waitUntilConnected();
          if (this.isClosed()) return true;
          const stable = await this.syncSubscriptions0(10000);
          if (stable) {
            return true;
          }
        } catch (err) {
          fails++;
          if (fails >= 3) {
            console.log(
              `WARNING: failed to sync subscriptions ${fails} times -- ${err}`,
            );
          }
        }
        return false;
      },
      { start: 1000, max: 15000 },
    );
  });

  // syncSubscriptions0 ensures that we're subscribed on server
  // to what we think we're subscribed to, or throws an error.
  private syncSubscriptions0 = async (timeout: number): Promise<boolean> => {
    if (this.isClosed()) return true;
    if (this.info == null) {
      throw Error("not signed in");
    }
    const subs = await this.getSubscriptions(timeout);
    //     console.log("syncSubscriptions", {
    //       server: subs,
    //       client: Object.keys(this.queueGroups),
    //     });
    const missing: { subject: string; queue: string }[] = [];
    for (const subject in this.queueGroups) {
      // subscribe on backend to all subscriptions we think we should have that
      // the server does not have
      if (!subs.has(subject)) {
        missing.push({
          subject,
          queue: this.queueGroups[subject],
        });
      }
    }
    let stable = true;
    if (missing.length > 0) {
      stable = false;
      const resp = await this.conn
        .timeout(timeout)
        .emitWithAck("subscribe", missing);
      // some subscription could fail due to permissions changes, e.g., user got
      // removed from a project.
      for (let i = 0; i < missing.length; i++) {
        if (resp[i].error) {
          const sub = this.subs[missing[i].subject];
          if (sub != null) {
            sub.close(true);
          }
        }
      }
    }
    const extra: { subject: string }[] = [];
    // subs is a Set, so iterate with for...of (for...in would visit nothing)
    for (const subject of subs) {
      if (this.queueGroups[subject] == null) {
        // server thinks we're subscribed but we do not think so, so cancel that
        extra.push({ subject });
      }
    }
    if (extra.length > 0) {
      await this.conn.timeout(timeout).emitWithAck("unsubscribe", extra);
      stable = false;
    }
    return stable;
  };

  numSubscriptions = () => Object.keys(this.queueGroups).length;

  private getSubscriptions = async (
    timeout = DEFAULT_REQUEST_TIMEOUT,
  ): Promise<Set<string>> => {
    const subs = await this.conn
      .timeout(timeout)
      .emitWithAck("subscriptions", null);
    return new Set(subs);
  };

  // returns EventEmitter that emits 'message', mesg: Message
  private subscriptionEmitter = (
    subject: string,
    {
      closeWhenOffCalled,
      queue,
      confirm,
      timeout,
    }: {
      // if true, when the off method of the event emitter is called, then
      // the entire subscription is closed. This is very useful when we wrap the
      // EventEmitter in an async iterator.
      closeWhenOffCalled?: boolean;

      // the queue group -- if not given, then one is randomly assigned.
      queue?: string;

      // confirm -- get confirmation back from server that subscription was created
      confirm?: boolean;

      // how long to wait to confirm creation of the subscription;
      // only explicitly *used* when confirm=true, but always must be set.
      timeout?: number;
    } = {},
  ): { sub: SubscriptionEmitter; promise? } => {
    // Having timeout set at all is absolutely critical because if the connection
    // goes down while making the subscription, having some timeout causes
    // socketio to throw an error, which avoids a huge potential subscription
    // leak. We set this by default to DEFAULT_SUBSCRIPTION_TIMEOUT.
    if (!timeout) {
      timeout = DEFAULT_SUBSCRIPTION_TIMEOUT;
    }
    if (this.isClosed()) {
      throw Error("closed");
    }
    if (!isValidSubject(subject)) {
      throw Error(`invalid subscribe subject '${subject}'`);
    }
    if (this.permissionError.sub.has(subject)) {
      const message = this.permissionError.sub.get(subject)!;
      logger.debug(message);
      throw new ConatError(message, { code: 403 });
    }
    let sub = this.subs[subject];
    if (sub != null) {
      if (queue && this.queueGroups[subject] != queue) {
        throw Error(
          `client can only have one queue group subscription for a given subject -- subject='${subject}', queue='${queue}'`,
        );
      }
      sub.refCount += 1;
      return { sub, promise: undefined };
    }
    if (this.queueGroups[subject] != null) {
      throw Error(`already subscribed to '${subject}'`);
    }
    if (!queue) {
      queue = randomId();
    }
    this.queueGroups[subject] = queue;
    sub = new SubscriptionEmitter({
      client: this,
      subject,
      closeWhenOffCalled,
    });
    this.subs[subject] = sub;
    this.stats.subs++;
    let promise;
    if (confirm) {
      const f = async () => {
        let response;
        try {
          if (timeout) {
            response = await this.conn
              .timeout(timeout)
              .emitWithAck("subscribe", { subject, queue });
          } else {
            // this should never be used -- see above
            response = await this.conn.emitWithAck("subscribe", {
              subject,
              queue,
            });
          }
        } catch (err) {
          throw toConatError(err);
        }
        if (response?.error) {
          throw new ConatError(response.error, { code: response.code });
        }
        return response;
      };
      promise = f();
    } else {
      this.conn.emit("subscribe", { subject, queue });
      promise = undefined;
    }
    sub.once("closed", () => {
      if (this.isClosed()) {
        return;
      }
      this.conn.emit("unsubscribe", { subject });
      delete this.queueGroups[subject];
      if (this.subs[subject] != null) {
        this.stats.subs--;
        delete this.subs[subject];
      }
    });
    return { sub, promise };
  };
944
945
private subscriptionIterator = (
946
sub,
947
opts?: SubscriptionOptions,
948
): Subscription => {
949
// @ts-ignore
950
const iter = new EventIterator<Message>(sub, "message", {
951
idle: opts?.maxWait,
952
limit: opts?.mesgLimit,
953
map: (args) => args[0],
954
});
955
return iter;
956
};

  subscribeSync = (
    subject: string,
    opts?: SubscriptionOptions,
  ): Subscription => {
    const { sub } = this.subscriptionEmitter(subject, {
      confirm: false,
      closeWhenOffCalled: true,
      queue: opts?.queue,
    });
    return this.subscriptionIterator(sub, opts);
  };

  subscribe = async (
    subject: string,
    opts?: SubscriptionOptions,
  ): Promise<Subscription> => {
    await this.waitUntilSignedIn();
    const { sub, promise } = this.subscriptionEmitter(subject, {
      confirm: true,
      closeWhenOffCalled: true,
      queue: opts?.queue,
      timeout: opts?.timeout,
    });
    try {
      await promise;
    } catch (err) {
      sub.close();
      throw err;
    }
    return this.subscriptionIterator(sub, opts);
  };

  sub = this.subscribe;

  /*
  A service is a subscription with a function to respond to requests by name.
  Call service with an implementation:

     service = await client1.service('arith', {mul : async (a,b)=>a*b, add : async (a,b)=>a+b})

  Use the service (call returns a proxy synchronously, so do not await it):

     arith = client2.call('arith')
     await arith.mul(2,3)
     await arith.add(2,3)

  By default there is a single queue group '0', so if you create multiple services on various
  computers, then requests are load balanced across them automatically.  Explicitly set
  a random queue group (or something else) and use callMany if you don't want this behavior.

  Close the service when done:

     service.close();

  See backend/conat/test/core/services.test.ts for a tested and working example
  that involves typescript and shows how to use wildcard subjects and get the
  specific subject used for a call, using the fact that `this` is bound to the
  calling mesg.
  */
  service: <T = any>(
    subject: string,
    impl: T,
    opts?: SubscriptionOptions,
  ) => Promise<Subscription> = async (subject, impl, opts) => {
    const sub = await this.subscribe(subject, {
      ...opts,
      queue: opts?.queue ?? "0",
    });
    const respond = async (mesg: Message) => {
      try {
        const [name, args] = mesg.data;
        // call impl[name], but with 'this' set to the object {subject:...},
        // so inside the service, it is possible to know what subject was used
        // in the request, in case subject is a wildcard subject.
        //       const result = await impl[name].apply(
        //         { subject: mesg.subject },
        //         ...args,
        //       );
        //       mesg.respondSync(result);
        let f = impl[name];
        if (f == null) {
          throw Error(`${name} not defined`);
        }
        const result = await f.apply(mesg, args);
        // use await mesg.respond so waitForInterest is on, which is almost always
        // good for services.
        await mesg.respond(result);
      } catch (err) {
        await mesg.respond(null, {
          noThrow: true, // we're not catching this one
          headers: { error: `${err}` },
        });
      }
    };
    const loop = async () => {
      // todo -- param to set max number of responses at once.
      for await (const mesg of sub) {
        respond(mesg);
      }
    };
    loop();
    return sub;
  };

  // Call a service as defined above.
  call<T = any>(subject: string, opts?: PublishOptions): T {
    const call = async (name: string, args: any[]) => {
      const resp = await this.request(subject, [name, args], opts);
      if (resp.headers?.error) {
        throw Error(`${resp.headers.error}`);
      } else {
        return resp.data;
      }
    };

    return new Proxy(
      {},
      {
        get: (_, name) => {
          if (typeof name !== "string") {
            return undefined;
          }
          return async (...args) => await call(name, args);
        },
      },
    ) as T;
  }
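
/*
The Proxy returned by call() turns any property access into a [name, args]
request, and the service side looks the name up in its implementation. Below
is a minimal, self-contained sketch of that pattern. It is not the conat API:
dispatch, makeCaller, Arith and the synchronous send callback are hypothetical
stand-ins for service(), call() and this.request(), which are asynchronous.

```typescript
// Hypothetical stand-in for the service side: look up the handler by name
// and apply it to the argument list.
type Impl = Record<string, (...args: any[]) => unknown>;

function dispatch(impl: Impl, [name, args]: [string, any[]]): unknown {
  const f = impl[name];
  if (f == null) {
    throw new Error(`${name} not defined`);
  }
  return f(...args);
}

// Client side: every string property of the proxy becomes a function that
// packs its arguments as [name, args] and hands them to `send`.
function makeCaller<T>(send: (payload: [string, any[]]) => unknown): T {
  return new Proxy(
    {},
    {
      get: (_target, name) => {
        if (typeof name !== "string") {
          return undefined;
        }
        return (...args: any[]) => send([name, args]);
      },
    },
  ) as T;
}

interface Arith {
  add(a: number, b: number): number;
  mul(a: number, b: number): number;
}

const arithImpl: Impl = {
  add: (a: number, b: number) => a + b,
  mul: (a: number, b: number) => a * b,
};
const arith = makeCaller<Arith>((payload) => dispatch(arithImpl, payload));
const sum = arith.add(2, 3);
const product = arith.mul(2, 3);
```

Since such a proxy answers every string property, including "then", the proxy
itself should not be awaited; only the calls made through it return results.
*/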

  callMany<T = any>(subject: string, opts?: RequestManyOptions): T {
    const maxWait = opts?.maxWait ? opts?.maxWait : DEFAULT_REQUEST_TIMEOUT;
    const self = this;
    async function* callMany(name: string, args: any[]) {
      const sub = await self.requestMany(subject, [name, args], {
        ...opts,
        maxWait,
      });
      for await (const resp of sub) {
        if (resp.headers?.error) {
          yield new ConatError(`${resp.headers.error}`, {
            code: resp.headers.code,
          });
        } else {
          yield resp.data;
        }
      }
    }

    return new Proxy(
      {},
      {
        get: (_, name) => {
          if (typeof name !== "string") {
            return undefined;
          }
          return async (...args) => await callMany(name, args);
        },
      },
    ) as T;
  }

  publishSync = (
    subject: string,
    mesg,
    opts?: PublishOptions,
  ): { bytes: number } => {
    if (this.isClosed()) {
      // already closed
      return { bytes: 0 };
    }
    // must NOT confirm
    return this._publish(subject, mesg, { ...opts, confirm: false });
  };

  publish = async (
    subject: string,
    mesg,
    opts: PublishOptions = {},
  ): Promise<{
    // bytes encoded (doesn't count some extra wrapping)
    bytes: number;
    // count is the number of matching subscriptions
    // that the server *sent* this message to since the server knows about them.
    // However, there's no guarantee that the subscribers actually exist
    // **right now** or received these messages.
    count: number;
  }> => {
    try {
      if (this.isClosed()) {
        // already closed
        return { bytes: 0, count: 0 };
      }
      await this.waitUntilSignedIn();
      const start = Date.now();
      const { bytes, getCount, promise } = this._publish(subject, mesg, {
        ...opts,
        confirm: true,
      });
      await promise;
      let count = getCount?.()!;

      if (
        opts.waitForInterest &&
        count != null &&
        count == 0 &&
        !this.isClosed() &&
        (opts.timeout == null || Date.now() - start <= opts.timeout)
      ) {
        let timeout = opts.timeout ?? DEFAULT_WAIT_FOR_INTEREST_TIMEOUT;
        await this.waitForInterest(subject, {
          timeout: timeout ? timeout - (Date.now() - start) : undefined,
        });
        if (this.isClosed()) {
          return { bytes, count };
        }
        const elapsed = Date.now() - start;
        timeout -= elapsed;
        // the client is still open and there is interest
        if (timeout <= 500) {
          // but... not enough time left to try again even if there is interest,
          // i.e., will fail anyways due to network latency
          return { bytes, count };
        }
        const { getCount, promise } = this._publish(subject, mesg, {
          ...opts,
          timeout,
          confirm: true,
        });
        await promise;
        count = getCount?.()!;
      }
      return { bytes, count };
    } catch (err) {
      if (opts.noThrow) {
        return { bytes: 0, count: 0 };
      } else {
        throw err;
      }
    }
  };
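
/*
When waitForInterest is set, publish() only sends a second time if enough of
the overall timeout is left after waiting. The arithmetic can be sketched as
a small pure function. retryBudget and its parameter names are hypothetical
and exist only for this illustration; the 500 ms floor mirrors the check in
publish() above.

```typescript
// Return how many milliseconds a second publish attempt may use, or
// undefined when too little time is left for the retry to be worthwhile.
function retryBudget(
  timeoutMs: number,
  elapsedMs: number,
  minRemainingMs: number = 500,
): number | undefined {
  const remaining = timeoutMs - elapsedMs;
  return remaining <= minRemainingMs ? undefined : remaining;
}

// 1200 ms were spent waiting for interest, so 8800 ms remain for the retry.
const plenty = retryBudget(10000, 1200);
// Only 300 ms remain, which is below the floor, so no retry is attempted.
const tooLate = retryBudget(10000, 9700);
```
*/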

  private _publish = (
    subject: string,
    mesg,
    {
      headers,
      raw,
      encoding = DEFAULT_ENCODING,
      confirm,
      timeout = DEFAULT_PUBLISH_TIMEOUT,
      noThrow,
    }: PublishOptions & { confirm?: boolean } = {},
  ) => {
    if (this.isClosed()) {
      return { bytes: 0 };
    }
    if (!isValidSubjectWithoutWildcards(subject)) {
      throw Error(`invalid publish subject ${subject}`);
    }
    if (this.permissionError.pub.has(subject)) {
      const message = this.permissionError.pub.get(subject)!;
      logger.debug(message);
      throw new ConatError(message, { code: 403 });
    }
    raw = raw ?? encode({ encoding, mesg });
    this.stats.send.messages += 1;
    this.stats.send.bytes += raw.length;

    // default to 1MB is safe since it's at least that big.
    const chunkSize = Math.max(
      1000,
      (this.info?.max_payload ?? 1e6) - MAX_HEADER_SIZE,
    );
    let seq = 0;
    let id = randomId();
    const promises: any[] = [];
    let count = 0;
    for (let i = 0; i < raw.length; i += chunkSize) {
      // !!FOR TESTING ONLY!!
      // if (Math.random() <= 0.01) {
      //   console.log("simulating a chunk drop", { subject, seq });
      //   seq += 1;
      //   continue;
      // }
      const done = i + chunkSize >= raw.length ? 1 : 0;
      const v: any[] = [
        subject,
        id,
        seq,
        done,
        encoding,
        raw.slice(i, i + chunkSize),
        // position v[6] is used for clusters
      ];
      if (done && headers) {
        v.push(headers);
      }
      if (confirm) {
        const f = async () => {
          if (timeout) {
            let response;
            try {
              response = await this.conn
                .timeout(timeout)
                .emitWithAck("publish", v);
            } catch (err) {
              throw toConatError(err);
            }
            // check for a server-reported error outside the try block, so that
            // it keeps its own code instead of being rewrapped by toConatError
            if (response?.error) {
              throw new ConatError(response.error, { code: response.code });
            }
            return response;
          } else {
            return await this.conn.emitWithAck("publish", v);
          }
        };
        const promise = (async () => {
          try {
            const response = await f();
            count = Math.max(count, response.count ?? 0);
          } catch (err) {
            if (!noThrow) {
              throw err;
            }
          }
        })();
        promises.push(promise);
      } else {
        this.conn.emit("publish", v);
      }
      seq += 1;
    }
    if (confirm) {
      return {
        bytes: raw.length,
        getCount: () => count,
        promise: Promise.all(promises),
      };
    }
    return { bytes: raw.length };
  };
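
/*
_publish() splits the encoded payload into pieces of at most chunkSize bytes
and tags each piece with a sequence number and a done flag. The loop can be
sketched on its own as follows. SketchChunk and splitIntoChunks are
hypothetical names for this illustration, and the sketch leaves out the
subject, encoding and headers that the real chunks also carry.

```typescript
interface SketchChunk {
  id: string;
  seq: number;
  done: 0 | 1;
  buffer: Uint8Array;
}

// Cut raw into consecutive slices; only the last slice has done = 1.
// Note that an empty payload produces no chunks at all with this loop.
function splitIntoChunks(
  id: string,
  raw: Uint8Array,
  chunkSize: number,
): SketchChunk[] {
  if (chunkSize <= 0) {
    throw new Error("chunkSize must be positive");
  }
  const chunks: SketchChunk[] = [];
  let seq = 0;
  for (let i = 0; i < raw.length; i += chunkSize) {
    const done = i + chunkSize >= raw.length ? 1 : 0;
    chunks.push({ id, seq, done, buffer: raw.slice(i, i + chunkSize) });
    seq += 1;
  }
  return chunks;
}

// Ten bytes with a chunk size of four give slices of 4, 4 and 2 bytes.
const samplePayload = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
const pieces = splitIntoChunks("msg-1", samplePayload, 4);
```
*/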

  pub = this.publish;

  request = async (
    subject: string,
    mesg: any,
    { timeout = DEFAULT_REQUEST_TIMEOUT, ...options }: PublishOptions = {},
  ): Promise<Message> => {
    if (timeout <= 0) {
      throw Error("timeout must be positive");
    }
    const inbox = await this.getInbox();
    const inboxSubject = this.temporaryInboxSubject();
    const sub = new EventIterator<Message>(inbox, inboxSubject, {
      idle: timeout,
      limit: 1,
      map: (args) => args[0],
    });

    const opts = {
      ...options,
      timeout,
      headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
    };
    const { count } = await this.publish(subject, mesg, opts);
    if (!count) {
      sub.stop();
      // if you hit this, consider using the option waitForInterest:true
      throw new ConatError(`request -- no subscribers matching '${subject}'`, {
        code: 503,
      });
    }
    for await (const resp of sub) {
      sub.stop();
      return resp;
    }
    sub.stop();
    throw new ConatError("timeout", { code: 408 });
  };

  // NOTE: Using requestMany returns a Subscription sub, and
  // you can call sub.close().  However, the sender doesn't
  // know that this happened and the messages are still going
  // to your inbox.  Similarly if you set a maxWait, the
  // subscription just ends at that point, but the server
  // sending messages doesn't know.  This is a shortcoming of the
  // pub/sub model.  You must decide entirely based on your
  // own application protocol how to terminate.
  requestMany = async (
    subject: string,
    mesg: any,
    { maxMessages, maxWait, ...options }: RequestManyOptions = {},
  ): Promise<Subscription> => {
    if (maxMessages != null && maxMessages <= 0) {
      throw Error("maxMessages must be positive");
    }
    if (maxWait != null && maxWait <= 0) {
      throw Error("maxWait must be positive");
    }
    const inbox = await this.getInbox();
    const inboxSubject = this.temporaryInboxSubject();
    const sub = new EventIterator<Message>(inbox, inboxSubject, {
      idle: maxWait,
      limit: maxMessages,
      map: (args) => args[0],
    });
    const { count } = await this.publish(subject, mesg, {
      ...options,
      headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
    });
    if (!count) {
      sub.stop();
      throw new ConatError(
        `requestMany -- no subscribers matching ${subject}`,
        { code: 503 },
      );
    }
    return sub;
  };

  // watch: this is mainly for debugging and interactive use.
  watch = (
    subject: string,
    cb = (x) => console.log(`${new Date()}: ${x.subject}:`, x.data, x.headers),
    opts?: SubscriptionOptions,
  ) => {
    const sub = this.subscribeSync(subject, opts);
    const f = async () => {
      for await (const x of sub) {
        cb(x);
      }
    };
    f();
    return sub;
  };

  sync = {
    dkv: async <T,>(opts: DKVOptions): Promise<DKV<T>> =>
      await dkv<T>({ ...opts, client: this }),
    akv: <T,>(opts: DKVOptions): AKV<T> => akv<T>({ ...opts, client: this }),
    dko: async <T,>(opts: DKVOptions): Promise<DKO<T>> =>
      await dko<T>({ ...opts, client: this }),
    dstream: async <T,>(opts: DStreamOptions): Promise<DStream<T>> =>
      await dstream<T>({ ...opts, client: this }),
    astream: <T,>(opts: DStreamOptions): AStream<T> =>
      astream<T>({ ...opts, client: this }),
    synctable: async (opts: SyncTableOptions): Promise<ConatSyncTable> =>
      await createSyncTable({ ...opts, client: this }),
  };

  socket = {
    listen: (
      subject: string,
      opts?: SocketConfiguration & { id?: string },
    ): ConatSocketServer => {
      if (this.state == "closed") {
        throw Error("closed");
      }
      if (this.sockets.servers[subject] !== undefined) {
        throw Error(
          `there can be at most one socket server per client listening on a subject (subject='${subject}')`,
        );
      }
      const server = new ConatSocketServer({
        subject,
        role: "server",
        client: this,
        // ok to use this.id as default since can have at most
        // one server per client for a given subject
        id: this.id,
        ...opts,
      });
      this.sockets.servers[subject] = server;
      server.once("closed", () => {
        delete this.sockets.servers[subject];
      });
      return server;
    },

    connect: (
      subject: string,
      opts?: SocketConfiguration & { id?: string },
    ): ConatSocketClient => {
      if (this.state == "closed") {
        throw Error("closed");
      }
      const id = opts?.id ?? randomId();
      const client = new ConatSocketClient({
        subject,
        role: "client",
        client: this,
        id,
        ...opts,
      });
      if (this.sockets.clients[subject] === undefined) {
        this.sockets.clients[subject] = { [id]: client };
      } else {
        this.sockets.clients[subject][id] = client;
      }
      client.once("closed", () => {
        const v = this.sockets.clients[subject];
        if (v != null) {
          delete v[id];
          if (isEmpty(v)) {
            delete this.sockets.clients[subject];
          }
        }
      });
      return client;
    },
  };

  private disconnectAllSockets = () => {
    if (this.state == "closed") {
      return;
    }
    for (const subject in this.sockets.servers) {
      this.sockets.servers[subject].disconnect();
    }
    for (const subject in this.sockets.clients) {
      for (const id in this.sockets.clients[subject]) {
        this.sockets.clients[subject][id].disconnect();
      }
    }
  };

  private closeAllSockets = () => {
    for (const subject in this.sockets.servers) {
      this.sockets.servers[subject].close();
    }
    for (const subject in this.sockets.clients) {
      for (const id in this.sockets.clients[subject]) {
        this.sockets.clients[subject][id].close();
      }
    }
  };

  message = (mesg, options?) => messageData(mesg, options);

  bench = {
    publish: async (n: number = 1000, subject = "bench"): Promise<number> => {
      const t0 = Date.now();
      console.log(`publishing ${n} messages to`, { subject });
      for (let i = 0; i < n - 1; i++) {
        this.publishSync(subject, null);
      }
      // then send one final message and wait for an ack.
      // since messages are in order, we know that all other
      // messages were delivered to the server.
      const { count } = await this.publish(subject, null);
      console.log("listeners: ", count);
      const t1 = Date.now();
      const rate = Math.round((n / (t1 - t0)) * 1000);
      console.log(rate, "messages per second delivered");
      return rate;
    },

    subscribe: async (n: number = 1000, subject = "bench"): Promise<number> => {
      const sub = await this.subscribe(subject);
      // send the data
      for (let i = 0; i < n; i++) {
        this.publishSync(subject, null);
      }
      const t0 = Date.now();
      let i = 0;
      for await (const _ of sub) {
        i += 1;
        if (i >= n) {
          break;
        }
      }
      const t1 = Date.now();
      return Math.round((n / (t1 - t0)) * 1000);
    },
  };
}

interface PublishOptions {
  headers?: Headers;
  // if encoding is given, it specifies the encoding used to encode the message
  encoding?: DataEncoding;
  // if raw is given, then it is assumed to be the raw binary
  // encoded message (using encoding) and any mesg parameter
  // is *IGNORED*.
  raw?;

  // timeout used when publishing a message and awaiting a response.
  timeout?: number;

  // waitForInterest -- if publishing async so it's possible to tell whether or not
  // there were any recipients, and there were NO recipients, it will wait until
  // there is a recipient and send again.  This does NOT use polling, but instead
  // uses a cluster aware and fully event based primitive in the server.
  // There is thus only a speed penalty doing this on failure and never
  // on success.  Note that waitForInterest always has a timeout, defaulting
  // to DEFAULT_WAIT_FOR_INTEREST_TIMEOUT if above timeout not given.
  waitForInterest?: boolean;

  // noThrow -- if set and publishing would throw an exception, the error is
  // instead silently dropped and { bytes: 0, count: 0 } is returned.
  // Use this where you might want to use publishSync, but still want
  // to ensure there is interest; however, it's not important to know
  // if there was an error sending or that sending worked.
  noThrow?: boolean;
}

interface RequestManyOptions extends PublishOptions {
  maxWait?: number;
  maxMessages?: number;
}

export function encode({
  encoding,
  mesg,
}: {
  encoding: DataEncoding;
  mesg: any;
}) {
  if (encoding == DataEncoding.MsgPack) {
    return msgpack.encode(mesg, MSGPACK_ENCODER_OPTIONS);
  } else if (encoding == DataEncoding.JsonCodec) {
    return jsonEncoder(mesg);
  } else {
    throw Error(`unknown encoding ${encoding}`);
  }
}

export function decode({
  encoding,
  data,
}: {
  encoding: DataEncoding;
  data;
}): any {
  if (encoding == DataEncoding.MsgPack) {
    return msgpack.decode(data);
  } else if (encoding == DataEncoding.JsonCodec) {
    return jsonDecoder(data);
  } else {
    throw Error(`unknown encoding ${encoding}`);
  }
}

let textEncoder: TextEncoder | undefined = undefined;
let textDecoder: TextDecoder | undefined = undefined;

function jsonEncoder(obj: any) {
  if (textEncoder === undefined) {
    textEncoder = new TextEncoder();
  }
  return textEncoder.encode(JSON.stringify(obj));
}

function jsonDecoder(data: Buffer): any {
  if (textDecoder === undefined) {
    textDecoder = new TextDecoder();
  }
  return JSON.parse(textDecoder.decode(data));
}
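
/*
The JSON codec is JSON.stringify plus TextEncoder on the way out and
TextDecoder plus JSON.parse on the way in, so values that JSON cannot
represent natively do not survive a round trip unchanged. A small standalone
sketch of the same idea follows; jsonEncodeSketch and jsonDecodeSketch are
hypothetical names that mirror jsonEncoder and jsonDecoder above.

```typescript
const sketchEncoder = new TextEncoder();
const sketchDecoder = new TextDecoder();

function jsonEncodeSketch(obj: unknown): Uint8Array {
  return sketchEncoder.encode(JSON.stringify(obj));
}

function jsonDecodeSketch(data: Uint8Array): any {
  return JSON.parse(sketchDecoder.decode(data));
}

const when = new Date(0);
const roundTripped = jsonDecodeSketch(jsonEncodeSketch({ n: 1, when }));
// roundTripped.n is still the number 1, but roundTripped.when is now the
// ISO string "1970-01-01T00:00:00.000Z" rather than a Date object.
```
*/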

interface Chunk {
  id: string;
  seq: number;
  done: number;
  buffer: Buffer;
  headers?: any;
}

// if an incoming message has chunks at least this old
// we give up on it and discard all of them.  This avoids
// memory leaks when a chunk is dropped.
const MAX_CHUNK_TIME = 2 * 60000;

class SubscriptionEmitter extends EventEmitter {
  private incoming: { [id: string]: (Partial<Chunk> & { time: number })[] } =
    {};
  private client: Client;
  private closeWhenOffCalled?: boolean;
  private subject: string;
  public refCount: number = 1;

  constructor({ client, subject, closeWhenOffCalled }) {
    super();
    this.client = client;
    this.subject = subject;
    this.client.conn.on(subject, this.handle);
    this.closeWhenOffCalled = closeWhenOffCalled;
    this.dropOldLoop();
  }

  close = (force?) => {
    this.refCount -= 1;
    // console.log("SubscriptionEmitter.close - refCount =", this.refCount, this.subject);
    if (this.client == null || (!force && this.refCount > 0)) {
      return;
    }
    this.emit("closed");
    this.client.conn.removeListener(this.subject, this.handle);
    // @ts-ignore
    delete this.incoming;
    // @ts-ignore
    delete this.client;
    // @ts-ignore
    delete this.subject;
    // @ts-ignore
    delete this.closeWhenOffCalled;
    this.removeAllListeners();
  };

  off(a, b) {
    super.off(a, b);
    if (this.closeWhenOffCalled) {
      this.close();
    }
    return this;
  }

  private handle = ({ subject, data }) => {
    if (this.client == null) {
      return;
    }
    const [id, seq, done, encoding, buffer, headers] = data;
    // console.log({ id, seq, done, encoding, buffer, headers });
    const chunk = { seq, done, encoding, buffer, headers };
    const { incoming } = this;
    if (incoming[id] == null) {
      if (seq != 0) {
        // part of a dropped message -- by definition this should just
        // silently happen and be handled via application level encodings
        // elsewhere
        console.log(
          `WARNING: drop packet from ${this.subject} -- first message has wrong seq`,
          { seq },
        );
        return;
      }
      incoming[id] = [];
    } else {
      const prev = incoming[id].slice(-1)[0].seq ?? -1;
      if (prev + 1 != seq) {
        console.log(
          `WARNING: drop packet from ${this.subject} -- seq number wrong`,
          { prev, seq },
        );
        // part of message was dropped -- discard everything
        delete incoming[id];
        return;
      }
    }
    incoming[id].push({ ...chunk, time: Date.now() });
    if (chunk.done) {
      // console.log("assembling ", incoming[id].length, "chunks");
      const chunks = incoming[id].map((x) => x.buffer!);
      // TESTING ONLY!!
      // This is not necessary due to the above checks as messages arrive.
      // for (let i = 0; i < incoming[id].length; i++) {
      //   if (incoming[id][i]?.seq != i) {
      //     console.log(`WARNING: bug -- invalid chunk data! -- ${subject}`);
      //     throw Error("bug -- invalid chunk data!");
      //   }
      // }
      const raw = concatArrayBuffers(chunks);

      // TESTING ONLY!!
      // try {
      //   decode({ encoding, data: raw });
      // } catch (err) {
      //   console.log(`ERROR - invalid data ${subject}`, incoming[id], err);
      // }

      delete incoming[id];
      const mesg = new Message({
        encoding,
        raw,
        headers,
        client: this.client,
        subject,
      });
      this.emit("message", mesg);
      this.client.recvStats(raw.byteLength);
    }
  };

  dropOldLoop = async () => {
    while (this.incoming != null) {
      const cutoff = Date.now() - MAX_CHUNK_TIME;
      for (const id in this.incoming) {
        const chunks = this.incoming[id];
        if (chunks.length > 0 && chunks[0].time <= cutoff) {
          console.log(
            `WARNING: drop partial message from ${this.subject} due to timeout`,
          );
          delete this.incoming[id];
        }
      }
      await delay(MAX_CHUNK_TIME / 2);
    }
  };
}
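
/*
The handle() method above reassembles chunks per message id and discards a
partial message as soon as a sequence number is missing. The same bookkeeping
can be sketched without the emitter or socket plumbing. IncomingChunk and
Reassembler are hypothetical names for this illustration, and the sketch
omits the encoding, headers and age-based cleanup of the real class.

```typescript
interface IncomingChunk {
  id: string;
  seq: number;
  done: number;
  buffer: Uint8Array;
}

class Reassembler {
  private incoming: { [id: string]: IncomingChunk[] } = {};

  // Returns the full payload when the final chunk arrives, else undefined.
  push(chunk: IncomingChunk): Uint8Array | undefined {
    const parts = this.incoming[chunk.id];
    if (parts == null) {
      if (chunk.seq != 0) {
        return undefined; // tail of a message whose start was never seen
      }
      this.incoming[chunk.id] = [chunk];
    } else {
      const prev = parts[parts.length - 1].seq;
      if (prev + 1 != chunk.seq) {
        delete this.incoming[chunk.id]; // gap: discard the partial message
        return undefined;
      }
      parts.push(chunk);
    }
    if (!chunk.done) {
      return undefined;
    }
    const all = this.incoming[chunk.id];
    delete this.incoming[chunk.id];
    const total = all.reduce((sum, c) => sum + c.buffer.byteLength, 0);
    const result = new Uint8Array(total);
    let offset = 0;
    for (const c of all) {
      result.set(c.buffer, offset);
      offset += c.buffer.byteLength;
    }
    return result;
  }
}

const reassembler = new Reassembler();
const first = reassembler.push({ id: "a", seq: 0, done: 0, buffer: new Uint8Array([1, 2]) });
const whole = reassembler.push({ id: "a", seq: 1, done: 1, buffer: new Uint8Array([3]) });
// Message "b" skips seq 1, so it is dropped rather than delivered corrupted.
reassembler.push({ id: "b", seq: 0, done: 0, buffer: new Uint8Array([9]) });
const gap = reassembler.push({ id: "b", seq: 2, done: 1, buffer: new Uint8Array([9]) });
```
*/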

function concatArrayBuffers(buffers) {
  if (buffers.length == 1) {
    return buffers[0];
  }
  if (Buffer.isBuffer(buffers[0])) {
    return Buffer.concat(buffers);
  }
  // browser fallback
  const totalLength = buffers.reduce((sum, buf) => sum + buf.byteLength, 0);
  const result = new Uint8Array(totalLength);
  let offset = 0;
  for (const buf of buffers) {
    result.set(new Uint8Array(buf), offset);
    offset += buf.byteLength;
  }

  return result.buffer;
}

export type Headers = { [key: string]: JSONValue };

export class MessageData<T = any> {
  public readonly encoding: DataEncoding;
  public readonly raw;
  public readonly headers?: Headers;

  constructor({ encoding, raw, headers }) {
    this.encoding = encoding;
    this.raw = raw;
    this.headers = headers;
  }

  get data(): T {
    return decode({ encoding: this.encoding, data: this.raw });
  }

  get length(): number {
    // raw is binary data so it's the closest thing we have to the
    // size of this message.  It would also make sense to include
    // the headers, but JSON'ing them would be expensive, so we don't.
    return this.raw.length;
  }
}

export class Message<T = any> extends MessageData<T> {
  private client: Client;
  public readonly subject;

  constructor({ encoding, raw, headers, client, subject }) {
    super({ encoding, raw, headers });
    this.client = client;
    this.subject = subject;
  }

  isRequest = (): boolean => !!this.headers?.[REPLY_HEADER];

  private respondSubject = () => {
    const subject = this.headers?.[REPLY_HEADER];
    if (!subject) {
      console.log(
        `WARNING: respond -- message to '${this.subject}' is not a request`,
      );
      return;
    }
    return `${subject}`;
  };

  respondSync = (mesg, opts?: PublishOptions): { bytes: number } => {
    const subject = this.respondSubject();
    if (!subject) return { bytes: 0 };
    return this.client.publishSync(subject, mesg, opts);
  };

  respond = async (
    mesg,
    opts: PublishOptions = {},
  ): Promise<{ bytes: number; count: number }> => {
    const subject = this.respondSubject();
    if (!subject) {
      return { bytes: 0, count: 0 };
    }
    return await this.client.publish(subject, mesg, {
      // we *always* wait for interest for async respond, since
      // it is by far the most likely situation where it will be needed, due
      // to inboxes when users first sign in.
      waitForInterest: true,
      ...opts,
    });
  };
}

export function messageData(
  mesg,
  { headers, raw, encoding = DEFAULT_ENCODING }: PublishOptions = {},
) {
  return new MessageData({
    encoding,
    raw: raw ?? encode({ encoding, mesg }),
    headers,
  });
}

export type Subscription = EventIterator<Message>;

export class ConatError extends Error {
  code: string | number;
  constructor(mesg: string, { code }) {
    super(mesg);
    this.code = code;
  }
}

function isEmpty(obj: object): boolean {
  for (const _x in obj) {
    return false;
  }
  return true;
}

function toConatError(socketIoError) {
  // only errors are "disconnected" and a timeout
  const e = `${socketIoError}`;
  if (e.includes("disconnected")) {
    return e;
  } else {
    return new ConatError(`timeout - ${e}`, {
      code: 408,
    });
  }
}