Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/core/client.ts
1710 views
1
/*
2
core/client.s -- core conat client
3
4
This is a client that has a similar API to NATS / Socket.io, but is much,
5
much better in so many ways:
6
7
- It has global pub/sub just like with NATS. This uses the server to
8
rebroadcast messages, and for authentication.
9
Better than NATS: Authentication is done for a subject *as
10
needed* instead of at connection time.
11
12
- Message can be arbitrarily large and they are *automatically* divided
13
into chunks and reassembled. Better than both NATS and socket.io.
14
15
- There are multiple supported ways of encoding messages, and
16
no coordination is required with the server or other clients! E.g.,
17
one message can be sent with one encoding and the next with a different
18
encoding and that's fine.
19
- MsgPack: https://msgpack.org/ -- a very compact encoding that handles
20
dates nicely and small numbers efficiently. This also works
21
well with binary Buffer objects, which is nice.
22
- JsonCodec: uses JSON.stringify and TextEncoder. This does not work
23
with Buffer or Date and is less compact, but can be very fast.
24
25
- One BIG DIFFERENCE from Nats is that when a message is sent the sender
26
can optionally find out how many clients received it. They can also wait
27
until there is interest and send the message again. This is automated so
28
it's very easy to use, and it makes writing distributed services without
29
race conditions making them broken temporarily much easier. HOWEVER,
30
there is one caveat -- if as an admin you create a "tap", i.e., you
31
subscribe to all messages matching some pattern just to see what's going
32
on, then currently that counts in the delivery count and interest, and that
33
would then cause these race conditions to happen again. E.g., a user
34
signs in, subscribes to their INBOX, sends a request, and gets a response
35
to that inbox, but does this all quickly, and in a cluster, the server doesn't
36
see the inbox yet, so it fails. As a workaround, subscriptions to the
37
subject pattern '>' are invisible, so you can always tap into '>' for debugging
38
purposes. TODO: implement a general way of making an invisible subscriber that
39
doesn't count.
40
41
42
THE CORE API
43
44
This section contains the crucial information you have to know to build a distributed
45
system using Conat. It's our take on the NATS primitives (it's not exactly the
46
same, but it is close). It's basically a symmetrical pub/sub/reqest/respond model
47
for messaging on which you can build distributed systems. The tricky part, which
48
NATS.js gets wrong (in my opinion), is implementing this in a way that is robust
49
and scalable, in terms for authentication, real world browser connectivity and
50
so on. Our approach is to use proven mature technology like socket.io and sqlite,
51
instead of writing everything from scratch.
52
53
Clients: We view all clients as plugged into a common "dial tone",
54
except for optional permissions that are configured when starting the server.
55
The methods you call on the client to build everything are:
56
57
- subscribe, subscribeSync - subscribe to a subject which returns an
58
async iterator over all messages that match the subject published by
59
anyone with permission to do so. If you provide the same optional
60
queue parameter for multiple subscribers, then one subscriber in each queue
61
group receives each message. The async form of this functino confirms
62
the subscription was created before returning. If a client creates multiple
63
subscriptions at the same time, the queue group must be the same.
64
Subscriptions are guaranteed to stay valid until the client ends them;
65
they do not stop working due to client or server reconnects or restarts.
66
(If you need more subscriptions with different queue groups, make another
67
client object.)
68
69
- publish, publishSync - publish to a subject. The async version returns
70
a count of the number of recipients, whereas the sync version is
71
fire-and-forget.
72
**There is no a priori size limit on messages since chunking
73
is automatic. However, we have to impose some limit, but
74
it can be much larger than the socketio message size limit.**
75
76
- request - send a message to a subject, and if there is at least one
77
subscriber listening, it may respond. If there are no subscribers,
78
it throws a 503 error. To create a microservice, subscribe
79
to a subject pattern and called mesg.respond(...) on each message you
80
receive.
81
82
- requestMany - send a message to a subject, and receive many
83
messages in reply. Typically you end the response stream by sending
84
a null message, but what you do is up to you. This is very useful
85
for streaming arbitrarily large data, long running changefeeds, LLM
86
responses, etc.
87
88
89
Messages: A message mesg is:
90
91
- Data:
92
- subject - the subject the message was sent to
93
- encoding - usually MessagePack
94
- raw - encoded binary data
95
- headers - a JSON-able Javascript object.
96
97
- Methods:
98
- data: this is a property, so if you do mesg.data, then it decodes raw
99
and returns the resulting Javascript object.
100
- respond, respondSync: if REPLY_HEADER is set, calling this publishes a
101
respond message to the original sender of the message.
102
103
104
Persistence:
105
106
We also implement persistent streams, where you can also set a key. This can
107
be used to build the analogue of Jetstream's streams and kv stores. The object
108
store isn't necessary since there is no limit on message size. Conat's persistent
109
streams are compressed by default and backed by individual sqlite files, which
110
makes them very memory efficient and it is easy to tier storage to cloud storage.
111
112
UNIT TESTS: See packages/server/conat/test/core
113
114
MISC NOTES:
115
116
NOTE: There is a socketio msgpack parser, but it just doesn't
117
work at all, which is weird. Also, I think it's impossible to
118
do the sort of chunking we want at the level of a socket.io
119
parser -- it's just not possible in that the encoding. We customize
120
things purely client side without using a parser, and get a much
121
simpler and better result, inspired by how NATS approaches things
122
with opaque messages.
123
124
125
SUBSCRIPTION ROBUSTNESS: When you call client.subscribe(...) you get back an async iterator.
126
It ONLY ends when you explicitly do the standard ways of terminating
127
such an iterator, including calling .close() on it. It is a MAJOR BUG
128
if it were to terminate for any other reason. In particular, the subscription
129
MUST NEVER throw an error or silently end when the connection is dropped
130
then resumed, or the server is restarted, or the client connects to
131
a different server! These situations can, of course, result in missing
132
some messages, but that's understood. There are no guarantees at all with
133
a subscription that every message is received. Finally, any time a client
134
disconnects and reconnects, the client ensures that all subscriptions
135
exist for it on the server via a sync process.
136
137
Subscription robustness is a major difference with NATS.js, which would
138
mysteriously terminate subscriptions for a variety of reasons, meaning that any
139
code using subscriptions had to be wrapped in ugly complexity to be
140
usable in production.
141
142
INTEREST AWARENESS: In Conat there is a cluster-aware event driving way to
143
wait for interest in a subject. This is an extremely useful extension to
144
NATS functionality, since it makes it much easier to dynamically setup
145
a client and a server and exchange messages without having to poll and fail
146
potentially a few times. This makes certain operations involving complicated
147
steps behind the scenes -- upload a file, open a file to edit with sync, etc. --
148
feel more responsive.
149
150
USAGE:
151
152
The following should mostly work to interactively play around with this
153
code and develop it. It's NOT automatically tested and depends on your
154
environment though, so may break. See the unit tests in
155
156
packages/server/conat/test/core/
157
158
for something that definitely works perfectly.
159
160
161
For developing at the command line, cd to packages/backend, then in node:
162
163
c = require('@cocalc/backend/conat/conat').connect()
164
165
or
166
167
c = require('@cocalc/conat/core/client').connect('http://localhost:3000')
168
169
c.watch('a')
170
171
s = await c.subscribe('a'); for await (const x of s) { console.log(x.length)}
172
173
// in another console
174
175
c = require('@cocalc/backend/conat/conat').connect()
176
c.publish('a', 'hello there')
177
178
// in browser (right now)
179
180
cc.conat.conat()
181
182
// client server:
183
184
s = await c.subscribe('eval'); for await(const x of s) { x.respond(eval(x.data)) }
185
186
then in another console
187
188
f = async () => (await c.request('eval', '2+3')).data
189
await f()
190
191
t = Date.now(); for(i=0;i<1000;i++) { await f()} ; Date.now()-t
192
193
// slower, but won't silently fail due to errors, etc.
194
195
f2 = async () => (await c.request('eval', '2+3', {confirm:true})).data
196
197
Wildcard subject:
198
199
200
c = require('@cocalc/conat/core/client').connect(); c.watch('a.*');
201
202
203
c = require('@cocalc/conat/core/client').connect(); c.publish('a.x', 'foo')
204
205
206
Testing disconnect
207
208
c.sub('>')
209
c.conn.io.engine.close();0;
210
211
other:
212
213
a=0; setInterval(()=>c.pub('a',a++), 250)
214
215
*/
216
217
import {
218
connect as connectToSocketIO,
219
type SocketOptions,
220
type ManagerOptions,
221
} from "socket.io-client";
222
import { EventIterator } from "@cocalc/util/event-iterator";
223
import type { ConnectionStats, ServerInfo } from "./types";
224
import * as msgpack from "@msgpack/msgpack";
225
import { randomId } from "@cocalc/conat/names";
226
import type { JSONValue } from "@cocalc/util/types";
227
import { EventEmitter } from "events";
228
import {
229
isValidSubject,
230
isValidSubjectWithoutWildcards,
231
} from "@cocalc/conat/util";
232
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
233
import { once, until } from "@cocalc/util/async-utils";
234
import { delay } from "awaiting";
235
import { getLogger } from "@cocalc/conat/client";
236
import { refCacheSync } from "@cocalc/util/refcache";
237
import { join } from "path";
238
import { dko, type DKO } from "@cocalc/conat/sync/dko";
239
import { dkv, type DKVOptions, type DKV } from "@cocalc/conat/sync/dkv";
240
import {
241
dstream,
242
type DStreamOptions,
243
type DStream,
244
} from "@cocalc/conat/sync/dstream";
245
import { akv, type AKV } from "@cocalc/conat/sync/akv";
246
import { astream, type AStream } from "@cocalc/conat/sync/astream";
247
import TTL from "@isaacs/ttlcache";
248
import {
249
ConatSocketServer,
250
ConatSocketClient,
251
ServerSocket,
252
type SocketConfiguration,
253
} from "@cocalc/conat/socket";
254
export { type ConatSocketServer, ConatSocketClient, ServerSocket };
255
import {
256
type SyncTableOptions,
257
type ConatSyncTable,
258
createSyncTable,
259
} from "@cocalc/conat/sync/synctable";
260
261
export const MAX_INTEREST_TIMEOUT = 90_000;
262
263
const DEFAULT_WAIT_FOR_INTEREST_TIMEOUT = 30_000;
264
265
const MSGPACK_ENCODER_OPTIONS = {
266
// ignoreUndefined is critical so database queries work properly, and
267
// also we have a lot of api calls with tons of wasted undefined values.
268
ignoreUndefined: true,
269
};
270
271
export const STICKY_QUEUE_GROUP = "sticky";
272
273
export const DEFAULT_SOCKETIO_CLIENT_OPTIONS = {
274
// A major problem if we allow long polling is that we must always use at most
275
// half the chunk size... because there is no way to know if recipients will be
276
// using long polling to RECEIVE messages. Not insurmountable.
277
transports: ["websocket"],
278
rememberUpgrade: true,
279
280
// nodejs specific for project/compute server in some settings
281
rejectUnauthorized: false,
282
283
reconnection: true,
284
reconnectionDelay: process.env.COCALC_TEST_MODE ? 50 : 500,
285
reconnectionDelayMax: process.env.COCALC_TEST_MODE ? 500 : 15000,
286
reconnectionAttempts: 9999999999, // infinite
287
};
288
289
type State = "disconnected" | "connected" | "closed";
290
291
const logger = getLogger("core/client");
292
293
interface Options {
294
// address = the address of a cocalc server, including the base url, e.g.,
295
//
296
// https://cocalc.com
297
//
298
// or for a dev server running locally with a base url:
299
//
300
// http://localhost:4043/3fa218e5-7196-4020-8b30-e2127847cc4f/port/5002
301
//
302
// The socketio path is always /conat (after the base url) and is set automatically.
303
//
304
address?: string;
305
inboxPrefix?: string;
306
systemAccountPassword?: string;
307
}
308
309
export type ClientOptions = Options & {
310
noCache?: boolean;
311
} & Partial<SocketOptions> &
312
Partial<ManagerOptions>;
313
314
const INBOX_PREFIX = "_INBOX";
315
const REPLY_HEADER = "CN-Reply";
316
const MAX_HEADER_SIZE = 100000;
317
318
const STATS_LOOP = 5000;
319
320
// fairly long since this is to avoid leaks, not for responsiveness in the UI.
321
export const DEFAULT_SUBSCRIPTION_TIMEOUT = 60_000;
322
323
// long so servers don't get DOS's on startup, etc. Also, we use interest-based
324
// checks when publish and request fail, so we're not depending on these to
325
// fail as part of the normal startup process for anything.
326
export let DEFAULT_REQUEST_TIMEOUT = 30_000;
327
export let DEFAULT_PUBLISH_TIMEOUT = 30_000;
328
329
export function setDefaultTimeouts({
330
request = DEFAULT_REQUEST_TIMEOUT,
331
publish = DEFAULT_PUBLISH_TIMEOUT,
332
}: {
333
request?: number;
334
publish?: number;
335
}) {
336
DEFAULT_REQUEST_TIMEOUT = request;
337
DEFAULT_PUBLISH_TIMEOUT = publish;
338
}
339
340
export enum DataEncoding {
341
MsgPack = 0,
342
JsonCodec = 1,
343
}
344
345
interface SubscriptionOptions {
346
maxWait?: number;
347
mesgLimit?: number;
348
queue?: string;
349
// sticky: when a choice from a queue group is made, the same choice is always made
350
// in the future for any message with subject matching subject with the last segment
351
// replaced by a *, until the target for the choice goes away. Setting this just
352
// sets the queue option to the constant string STICKY_QUEUE_GROUP.
353
//
354
// Examples of two subjects matching "except possibly last segments" are
355
// - foo.bar.lJcBSieLn
356
// - foo.bar.ZzsDC376ge
357
//
358
// You can put anything random in the last segment and all messages
359
// that match foo.bar.* get the same choice from the queue group.
360
// The idea is that *when* the message with subject foo.bar.lJcBSieLn gets
361
// sent, the backend server selects a target from the queue group to receive
362
// that message. It remembers the choice, and so long as that target is subscribed,
363
// it sends any message matching foo.bar.* to that same target.
364
// This is used in our implementation of persistent socket connections that
365
// are built on pub/sub.
366
367
// The underlying implementation uses (1) consistent hashing and (2) a stream
368
// to sync state of the servers.
369
//
370
// If the members of the sticky queue group have been stable for a while (e.g., a minute)
371
// then all servers in a local cluster have the same list of subscribers in the sticky
372
// queue group, and using consistent hashing any server will make the same choice of
373
// where to route messages. If the members of the sticky queue group are dynamically changing,
374
// it is possible for an inconsistent choice to be made. If this happens, it will be fixed
375
// within a few seconds. During that time, it's possible a virtual conat socket
376
// connection could get created then destroyed a few seconds laters, due to the sticky
377
// assignment changing.
378
//
379
// The following isn't implemented yet -- one idea. Another idea would be the stickiness
380
// is local to a cluster. Not sure!
381
// Regarding a *supercluster*, if no choice has already been made in any cluster for
382
// a given subject (except last segment), then a choice is made using consistent hashing
383
// from the subscribers in the *nearest* cluster that has at least one subscriber.
384
// If choices are made simultaneously across the supercluster, then it is likely that
385
// they are inconsistent. As soon as these choices are visible, they are resolved, with
386
// the tie broken using lexicographic sort of the "socketio room" (this is a
387
// random string that is used to send messages to a subscriber).
388
389
sticky?: boolean;
390
respond?: Function;
391
// timeout to create the subscription -- this may wait *until* you connect before
392
// it starts ticking.
393
timeout?: number;
394
}
395
396
// WARNING! This is the default and you can't just change it!
397
// Yes, for specific messages you can, but in general DO NOT. The reason is because, e.g.,
398
// JSON will turn Dates into strings, and we no longer fix that. So unless you modify the
399
// JsonCodec to handle Date's properly, don't change this!!
400
const DEFAULT_ENCODING = DataEncoding.MsgPack;
401
402
function cocalcServerToSocketioAddress(url: string): {
403
address: string;
404
path: string;
405
} {
406
const u = new URL(url, "http://dummy.org");
407
const address = u.origin;
408
const path = join(u.pathname, "conat");
409
return { address, path };
410
}
411
412
const cache = refCacheSync<ClientOptions, Client>({
413
name: "conat-client",
414
createObject: (opts: ClientOptions) => {
415
return new Client(opts);
416
},
417
});
418
419
export function connect(opts: ClientOptions = {}) {
420
//console.trace("connect", opts);
421
if (!opts.address) {
422
const x = cache.one();
423
if (x != null) {
424
return x;
425
}
426
}
427
return cache(opts);
428
}
429
430
// Get any cached client, if there is one; otherwise make one
431
// with default options.
432
export function getClient() {
433
return cache.one() ?? connect();
434
}
435
436
export class Client extends EventEmitter {
437
public conn: ReturnType<typeof connectToSocketIO>;
438
// queueGroups is a map from subject to the queue group for the subscription to that subject
439
private queueGroups: { [subject: string]: string } = {};
440
private subs: { [subject: string]: SubscriptionEmitter } = {};
441
private sockets: {
442
// all socket servers created using this Client
443
servers: { [subject: string]: ConatSocketServer };
444
// all client connections created using this Client.
445
clients: { [subject: string]: { [id: string]: ConatSocketClient } };
446
} = { servers: {}, clients: {} };
447
public readonly options: ClientOptions;
448
private inboxSubject: string;
449
private inbox?: EventEmitter;
450
private permissionError = {
451
pub: new TTL<string, string>({ ttl: 1000 * 60 }),
452
sub: new TTL<string, string>({ ttl: 1000 * 60 }),
453
};
454
public info: ServerInfo | undefined = undefined;
455
// total number of
456
public readonly stats: ConnectionStats & {
457
recv0: { messages: number; bytes: number };
458
} = {
459
send: { messages: 0, bytes: 0 },
460
recv: { messages: 0, bytes: 0 },
461
// recv0 = count since last connect
462
recv0: { messages: 0, bytes: 0 },
463
subs: 0,
464
};
465
466
public readonly id: string = randomId();
467
public state: State = "disconnected";
468
469
constructor(options: ClientOptions) {
470
super();
471
if (!options.address) {
472
if (!process.env.CONAT_SERVER) {
473
throw Error(
474
"Must specificy address or set CONAT_SERVER environment variable",
475
);
476
}
477
options = { ...options, address: process.env.CONAT_SERVER };
478
}
479
this.options = options;
480
this.setMaxListeners(1000);
481
482
// for socket.io the address has no base url
483
const { address, path } = cocalcServerToSocketioAddress(
484
this.options.address!,
485
);
486
logger.debug(`Conat: Connecting to ${this.options.address}...`);
487
// if (options.extraHeaders == null) {
488
// console.trace("WARNING: no auth set");
489
// }
490
this.conn = connectToSocketIO(address, {
491
...DEFAULT_SOCKETIO_CLIENT_OPTIONS,
492
// it is necessary to manually managed reconnects due to a bugs
493
// in socketio that has stumped their devs
494
// -- https://github.com/socketio/socket.io/issues/5197
495
// So no matter what options are set, we never use socketio's
496
// reconnection logic. if options.reconnection is true or
497
// not given, then we implement (in this file) reconnect ourselves.
498
// The browser frontend explicit sets options.reconnection false
499
// and uses its own logic.
500
...options,
501
...(options.systemAccountPassword
502
? {
503
extraHeaders: {
504
...options.extraHeaders,
505
Cookie: `sys=${options.systemAccountPassword}`,
506
},
507
}
508
: undefined),
509
path,
510
reconnection: true,
511
});
512
513
this.conn.on("info", (info, ack) => {
514
if (typeof ack == "function") {
515
ack();
516
}
517
const firstTime = this.info == null;
518
this.info = info;
519
this.emit("info", info);
520
setTimeout(this.syncSubscriptions, firstTime ? 3000 : 0);
521
});
522
this.conn.on("permission", ({ message, type, subject }) => {
523
logger.debug(message);
524
this.permissionError[type]?.set(subject, message);
525
});
526
this.conn.on("connect", async () => {
527
logger.debug(`Conat: Connected to ${this.options.address}`);
528
if (this.conn.connected) {
529
this.setState("connected");
530
}
531
});
532
this.conn.io.on("error", (...args) => {
533
logger.debug(
534
`Conat: Error connecting to ${this.options.address} -- `,
535
...args,
536
);
537
});
538
this.conn.on("disconnect", async () => {
539
if (this.isClosed()) {
540
return;
541
}
542
this.stats.recv0 = { messages: 0, bytes: 0 }; // reset on disconnect
543
this.setState("disconnected");
544
this.disconnectAllSockets();
545
});
546
this.conn.io.connect();
547
this.initInbox();
548
this.statsLoop();
549
}
550
551
cluster = async () => {
552
return await this.conn.timeout(10000).emitWithAck("cluster");
553
};
554
555
disconnect = () => {
556
if (this.isClosed()) {
557
return;
558
}
559
this.disconnectAllSockets();
560
// @ts-ignore
561
setTimeout(() => this.conn.io.disconnect(), 1);
562
};
563
564
// this has NO timeout by default
565
waitUntilSignedIn = reuseInFlight(
566
async ({ timeout }: { timeout?: number } = {}) => {
567
// not "signed in" if --
568
// - not connected, or
569
// - no info at all (which gets sent on sign in)
570
// - or the user is {error:....}, which is what happens when sign in fails
571
// e.g., do to an expired cookie
572
if (
573
this.info == null ||
574
this.state != "connected" ||
575
this.info?.user?.error
576
) {
577
await once(this, "info", timeout);
578
}
579
if (
580
this.info == null ||
581
this.state != "connected" ||
582
this.info?.user?.error
583
) {
584
throw Error("failed to sign in");
585
}
586
},
587
);
588
589
private statsLoop = async () => {
590
await until(
591
async () => {
592
if (this.isClosed()) {
593
return true;
594
}
595
try {
596
await this.waitUntilConnected();
597
if (this.isClosed()) {
598
return true;
599
}
600
this.conn.emit("stats", { recv0: this.stats.recv0 });
601
} catch {}
602
return false;
603
},
604
{ start: STATS_LOOP, max: STATS_LOOP },
605
);
606
};
607
608
interest = async (subject: string): Promise<boolean> => {
609
return await this.waitForInterest(subject, { timeout: 0 });
610
};
611
612
waitForInterest = async (
613
subject: string,
614
{
615
timeout = MAX_INTEREST_TIMEOUT,
616
}: {
617
timeout?: number;
618
} = {},
619
) => {
620
if (!isValidSubjectWithoutWildcards(subject)) {
621
throw Error(
622
`subject ${subject} must be a valid subject without wildcards`,
623
);
624
}
625
timeout = Math.min(timeout, MAX_INTEREST_TIMEOUT);
626
try {
627
const response = await this.conn
628
.timeout(timeout ? timeout : 10000)
629
.emitWithAck("wait-for-interest", { subject, timeout });
630
return response;
631
} catch (err) {
632
throw toConatError(err);
633
}
634
};
635
636
recvStats = (bytes: number) => {
637
this.stats.recv.messages += 1;
638
this.stats.recv.bytes += bytes;
639
this.stats.recv0.messages += 1;
640
this.stats.recv0.bytes += bytes;
641
};
642
643
// There should usually be no reason to call this because socket.io
644
// is so good at abstracting this away. It's useful for unit testing.
645
waitUntilConnected = reuseInFlight(async () => {
646
if (this.conn.connected) {
647
return;
648
}
649
// @ts-ignore
650
await once(this.conn, "connect");
651
});
652
653
waitUntilReady = reuseInFlight(async () => {
654
await this.waitUntilSignedIn();
655
await this.waitUntilConnected();
656
});
657
658
private setState = (state: State) => {
659
if (this.isClosed() || this.state == state) {
660
return;
661
}
662
this.state = state;
663
this.emit(state);
664
};
665
666
private temporaryInboxSubject = () => {
667
if (!this.inboxSubject) {
668
throw Error("inbox not setup properly");
669
}
670
return `${this.inboxSubject}.${randomId()}`;
671
};
672
673
private getInbox = reuseInFlight(async (): Promise<EventEmitter> => {
674
if (this.inbox == null) {
675
if (this.isClosed()) {
676
throw Error("closed");
677
}
678
await once(this, "inbox");
679
}
680
if (this.inbox == null) {
681
throw Error("bug");
682
}
683
return this.inbox;
684
});
685
686
private initInbox = async () => {
687
// For request/respond instead of setting up one
688
// inbox *every time there is a request*, we setup a single
689
// inbox once and for all for all responses. We listen for
690
// everything to inbox...Prefix.* and emit it via this.inbox.
691
// The request sender then listens on this.inbox for the response.
692
// We *could* use a regular subscription for each request,
693
// but (1) that massively increases the load on the server for
694
// every single request (having to create and destroy subscriptions)
695
// and (2) there is a race condition between creating that subscription
696
// and getting the response; it's fine with one server, but with
697
// multiple servers solving the race condition would slow everything down
698
// due to having to wait for so many acknowledgements. Instead, we
699
// remove all those problems by just using a single inbox subscription.
700
const inboxPrefix = this.options.inboxPrefix ?? INBOX_PREFIX;
701
if (!inboxPrefix.startsWith(INBOX_PREFIX)) {
702
throw Error(`custom inboxPrefix must start with '${INBOX_PREFIX}'`);
703
}
704
this.inboxSubject = `${inboxPrefix}.${randomId()}`;
705
let sub;
706
await until(
707
async () => {
708
try {
709
await this.waitUntilSignedIn();
710
sub = await this.subscribe(this.inboxSubject + ".*");
711
return true;
712
} catch (err) {
713
if (this.isClosed()) {
714
return true;
715
}
716
// this should only fail due to permissions issues, at which point
717
// request can't work, but pub/sub can.
718
if (!process.env.COCALC_TEST_MODE) {
719
console.log(`WARNING: inbox not available -- ${err}`);
720
}
721
}
722
return false;
723
},
724
{ start: 1000, max: 15000 },
725
);
726
if (this.isClosed()) {
727
return;
728
}
729
730
this.inbox = new EventEmitter();
731
(async () => {
732
for await (const mesg of sub) {
733
if (this.inbox == null) {
734
return;
735
}
736
this.inbox.emit(mesg.subject, mesg);
737
}
738
})();
739
this.emit("inbox", this.inboxSubject);
740
};
741
742
private isClosed = () => {
743
return this.state == "closed";
744
};
745
746
close = () => {
747
if (this.isClosed()) {
748
return;
749
}
750
this.setState("closed");
751
this.removeAllListeners();
752
this.closeAllSockets();
753
// @ts-ignore
754
delete this.sockets;
755
for (const subject in this.queueGroups) {
756
this.conn.emit("unsubscribe", { subject });
757
delete this.queueGroups[subject];
758
}
759
for (const sub of Object.values(this.subs)) {
760
sub.refCount = 0;
761
sub.close();
762
// @ts-ignore
763
delete this.subs;
764
}
765
// @ts-ignore
766
delete this.queueGroups;
767
// @ts-ignore
768
delete this.inboxSubject;
769
delete this.inbox;
770
// @ts-ignore
771
delete this.options;
772
// @ts-ignore
773
delete this.info;
774
// @ts-ignore
775
delete this.permissionError;
776
777
try {
778
this.conn.close();
779
} catch {}
780
};
781
782
private syncSubscriptions = reuseInFlight(async () => {
783
let fails = 0;
784
await until(
785
async () => {
786
if (this.isClosed()) return true;
787
try {
788
if (this.info == null) {
789
// no point in trying until we are signed in and connected
790
await once(this, "info");
791
}
792
if (this.isClosed()) return true;
793
await this.waitUntilConnected();
794
if (this.isClosed()) return true;
795
const stable = await this.syncSubscriptions0(10000);
796
if (stable) {
797
return true;
798
}
799
} catch (err) {
800
fails++;
801
if (fails >= 3) {
802
console.log(
803
`WARNING: failed to sync subscriptions ${fails} times -- ${err}`,
804
);
805
}
806
}
807
return false;
808
},
809
{ start: 1000, max: 15000 },
810
);
811
});
812
813
// syncSubscriptions0 ensures that we're subscribed on server
814
// to what we think we're subscribed to, or throws an error.
815
private syncSubscriptions0 = async (timeout: number): Promise<boolean> => {
816
if (this.isClosed()) return true;
817
if (this.info == null) {
818
throw Error("not signed in");
819
}
820
const subs = await this.getSubscriptions(timeout);
821
// console.log("syncSubscriptions", {
822
// server: subs,
823
// client: Object.keys(this.queueGroups),
824
// });
825
const missing: { subject: string; queue: string }[] = [];
826
for (const subject in this.queueGroups) {
827
// subscribe on backend to all subscriptions we think we should have that
828
// the server does not have
829
if (!subs.has(subject)) {
830
missing.push({
831
subject,
832
queue: this.queueGroups[subject],
833
});
834
}
835
}
836
let stable = true;
837
if (missing.length > 0) {
838
stable = false;
839
const resp = await this.conn
840
.timeout(timeout)
841
.emitWithAck("subscribe", missing);
842
// some subscription could fail due to permissions changes, e.g., user got
843
// removed from a project.
844
for (let i = 0; i < missing.length; i++) {
845
if (resp[i].error) {
846
const sub = this.subs[missing[i].subject];
847
if (sub != null) {
848
sub.close(true);
849
}
850
}
851
}
852
}
853
const extra: { subject: string }[] = [];
854
for (const subject in subs) {
855
if (this.queueGroups[subject] != null) {
856
// server thinks we're subscribed but we do not think so, so cancel that
857
extra.push({ subject });
858
}
859
}
860
if (extra.length > 0) {
861
await this.conn.timeout(timeout).emitWithAck("unsubscribe", extra);
862
stable = false;
863
}
864
return stable;
865
};
866
867
numSubscriptions = () => Object.keys(this.queueGroups).length;
868
869
private getSubscriptions = async (
870
timeout = DEFAULT_REQUEST_TIMEOUT,
871
): Promise<Set<string>> => {
872
const subs = await this.conn
873
.timeout(timeout)
874
.emitWithAck("subscriptions", null);
875
return new Set(subs);
876
};
877
878
// returns EventEmitter that emits 'message', mesg: Message
879
private subscriptionEmitter = (
880
subject: string,
881
{
882
closeWhenOffCalled,
883
queue,
884
sticky,
885
confirm,
886
timeout,
887
}: {
888
// if true, when the off method of the event emitter is called, then
889
// the entire subscription is closed. This is very useful when we wrap the
890
// EvenEmitter in an async iterator.
891
closeWhenOffCalled?: boolean;
892
893
// the queue group -- if not given, then one is randomly assigned.
894
queue?: string;
895
896
// if true, sets queue to "sticky"
897
sticky?: boolean;
898
899
// confirm -- get confirmation back from server that subscription was created
900
confirm?: boolean;
901
902
// how long to wait to confirm creation of the subscription;
903
// only explicitly *used* when confirm=true, but always must be set.
904
timeout?: number;
905
} = {},
906
): { sub: SubscriptionEmitter; promise? } => {
907
// Having timeout set at all is absolutely critical because if the connection
908
// goes down while making the subscription, having some timeout causes
909
// socketio to throw an error, which avoids a huge potential subscription
910
// leak. We set this by default to DEFAULT_SUBSCRIPTION_TIMEOUT.
911
if (!timeout) {
912
timeout = DEFAULT_SUBSCRIPTION_TIMEOUT;
913
}
914
if (this.isClosed()) {
915
throw Error("closed");
916
}
917
if (!isValidSubject(subject)) {
918
throw Error(`invalid subscribe subject '${subject}'`);
919
}
920
if (this.permissionError.sub.has(subject)) {
921
const message = this.permissionError.sub.get(subject)!;
922
logger.debug(message);
923
throw new ConatError(message, { code: 403 });
924
}
925
if (sticky) {
926
if (queue) {
927
throw Error("must not specify queue group if sticky is true");
928
}
929
queue = STICKY_QUEUE_GROUP;
930
}
931
let sub = this.subs[subject];
932
if (sub != null) {
933
if (queue && this.queueGroups[subject] != queue) {
934
throw Error(
935
`client can only have one queue group subscription for a given subject -- subject='${subject}', queue='${queue}'`,
936
);
937
}
938
if (queue == STICKY_QUEUE_GROUP) {
939
throw Error(
940
`can only have one sticky subscription with given subject pattern per client -- subject='${subject}'`,
941
);
942
}
943
sub.refCount += 1;
944
return { sub, promise: undefined };
945
}
946
if (this.queueGroups[subject] != null) {
947
throw Error(`already subscribed to '${subject}'`);
948
}
949
if (!queue) {
950
queue = randomId();
951
}
952
this.queueGroups[subject] = queue;
953
sub = new SubscriptionEmitter({
954
client: this,
955
subject,
956
closeWhenOffCalled,
957
});
958
this.subs[subject] = sub;
959
this.stats.subs++;
960
let promise;
961
if (confirm) {
962
const f = async () => {
963
let response;
964
try {
965
if (timeout) {
966
response = await this.conn
967
.timeout(timeout)
968
.emitWithAck("subscribe", { subject, queue });
969
} else {
970
// this should never be used -- see above
971
response = await this.conn.emitWithAck("subscribe", {
972
subject,
973
queue,
974
});
975
}
976
} catch (err) {
977
throw toConatError(err);
978
}
979
if (response?.error) {
980
throw new ConatError(response.error, { code: response.code });
981
}
982
return response;
983
};
984
promise = f();
985
} else {
986
this.conn.emit("subscribe", { subject, queue });
987
promise = undefined;
988
}
989
sub.once("closed", () => {
990
if (this.isClosed()) {
991
return;
992
}
993
this.conn.emit("unsubscribe", { subject });
994
delete this.queueGroups[subject];
995
if (this.subs[subject] != null) {
996
this.stats.subs--;
997
delete this.subs[subject];
998
}
999
});
1000
return { sub, promise };
1001
};
1002
1003
private subscriptionIterator = (
1004
sub,
1005
opts?: SubscriptionOptions,
1006
): Subscription => {
1007
// @ts-ignore
1008
const iter = new EventIterator<Message>(sub, "message", {
1009
idle: opts?.maxWait,
1010
limit: opts?.mesgLimit,
1011
map: (args) => args[0],
1012
});
1013
return iter;
1014
};
1015
1016
subscribeSync = (
1017
subject: string,
1018
opts?: SubscriptionOptions,
1019
): Subscription => {
1020
const { sub } = this.subscriptionEmitter(subject, {
1021
confirm: false,
1022
closeWhenOffCalled: true,
1023
sticky: opts?.sticky,
1024
queue: opts?.queue,
1025
});
1026
return this.subscriptionIterator(sub, opts);
1027
};
1028
1029
subscribe = async (
1030
subject: string,
1031
opts?: SubscriptionOptions,
1032
): Promise<Subscription> => {
1033
await this.waitUntilSignedIn();
1034
const { sub, promise } = this.subscriptionEmitter(subject, {
1035
confirm: true,
1036
closeWhenOffCalled: true,
1037
queue: opts?.queue,
1038
sticky: opts?.sticky,
1039
timeout: opts?.timeout,
1040
});
1041
try {
1042
await promise;
1043
} catch (err) {
1044
sub.close();
1045
throw err;
1046
}
1047
return this.subscriptionIterator(sub, opts);
1048
};
1049
1050
sub = this.subscribe;
1051
1052
/*
1053
A service is a subscription with a function to respond to requests by name.
1054
Call service with an implementation:
1055
1056
service = await client1.service('arith', {mul : async (a,b)=>{a*b}, add : async (a,b)=>a+b})
1057
1058
Use the service:
1059
1060
arith = await client2.call('arith')
1061
await arith.mul(2,3)
1062
await arith.add(2,3)
1063
1064
There's by default a single queue group '0', so if you create multiple services on various
1065
computers, then requests are load balanced across them automatically. Explicitly set
1066
a random queue group (or something else) and use callMany if you don't want this behavior.
1067
1068
Close the service when done:
1069
1070
service.close();
1071
1072
See backend/conat/test/core/services.test.ts for a tested and working example
1073
that involves typescript and shows how to use wildcard subjects and get the
1074
specific subject used for a call by using that this is bound to the calling mesg.
1075
*/
1076
service: <T = any>(
1077
subject: string,
1078
impl: T,
1079
opts?: SubscriptionOptions,
1080
) => Promise<Subscription> = async (subject, impl, opts) => {
1081
const sub = await this.subscribe(subject, {
1082
...opts,
1083
queue: opts?.queue ?? "0",
1084
});
1085
const respond = async (mesg: Message) => {
1086
try {
1087
const [name, args] = mesg.data;
1088
// call impl[name], but with 'this' set to the object {subject:...},
1089
// so inside the service, it is possible to know what subject was used
1090
// in the request, in case subject is a wildcard subject.
1091
// const result = await impl[name].apply(
1092
// { subject: mesg.subject },
1093
// ...args,
1094
// );
1095
// const result = await impl[name].apply(
1096
// { subject: mesg.subject },
1097
// ...args,
1098
// );
1099
// mesg.respondSync(result);
1100
let f = impl[name];
1101
if (f == null) {
1102
throw Error(`${name} not defined`);
1103
}
1104
const result = await f.apply(mesg, args);
1105
// use await mesg.respond so waitForInterest is on, which is almost always
1106
// good for services.
1107
await mesg.respond(result);
1108
} catch (err) {
1109
await mesg.respond(null, {
1110
noThrow: true, // we're not catching this one
1111
headers: { error: `${err}` },
1112
});
1113
}
1114
};
1115
const loop = async () => {
1116
// todo -- param to set max number of responses at once.
1117
for await (const mesg of sub) {
1118
respond(mesg);
1119
}
1120
};
1121
loop();
1122
return sub;
1123
};
1124
1125
// Call a service as defined above.
1126
call<T = any>(subject: string, opts?: PublishOptions): T {
1127
const call = async (name: string, args: any[]) => {
1128
const resp = await this.request(subject, [name, args], opts);
1129
if (resp.headers?.error) {
1130
throw Error(`${resp.headers.error}`);
1131
} else {
1132
return resp.data;
1133
}
1134
};
1135
1136
return new Proxy(
1137
{},
1138
{
1139
get: (_, name) => {
1140
if (typeof name !== "string") {
1141
return undefined;
1142
}
1143
return async (...args) => await call(name, args);
1144
},
1145
},
1146
) as T;
1147
}
1148
1149
callMany<T = any>(subject: string, opts?: RequestManyOptions): T {
1150
const maxWait = opts?.maxWait ? opts?.maxWait : DEFAULT_REQUEST_TIMEOUT;
1151
const self = this;
1152
async function* callMany(name: string, args: any[]) {
1153
const sub = await self.requestMany(subject, [name, args], {
1154
...opts,
1155
maxWait,
1156
});
1157
for await (const resp of sub) {
1158
if (resp.headers?.error) {
1159
yield new ConatError(`${resp.headers.error}`, {
1160
code: resp.headers.code,
1161
});
1162
} else {
1163
yield resp.data;
1164
}
1165
}
1166
}
1167
1168
return new Proxy(
1169
{},
1170
{
1171
get: (_, name) => {
1172
if (typeof name !== "string") {
1173
return undefined;
1174
}
1175
return async (...args) => await callMany(name, args);
1176
},
1177
},
1178
) as T;
1179
}
1180
1181
publishSync = (
1182
subject: string,
1183
mesg,
1184
opts?: PublishOptions,
1185
): { bytes: number } => {
1186
if (this.isClosed()) {
1187
// already closed
1188
return { bytes: 0 };
1189
}
1190
// must NOT confirm
1191
return this._publish(subject, mesg, { ...opts, confirm: false });
1192
};
1193
1194
publish = async (
1195
subject: string,
1196
mesg,
1197
opts: PublishOptions = {},
1198
): Promise<{
1199
// bytes encoded (doesn't count some extra wrapping)
1200
bytes: number;
1201
// count is the number of matching subscriptions
1202
// that the server *sent* this message to since the server knows about them.
1203
// However, there's no guaranteee that the subscribers actually exist
1204
// **right now** or received these messages.
1205
count: number;
1206
}> => {
1207
if (this.isClosed()) {
1208
// already closed
1209
return { bytes: 0, count: 0 };
1210
}
1211
await this.waitUntilSignedIn();
1212
const start = Date.now();
1213
const { bytes, getCount, promise } = this._publish(subject, mesg, {
1214
...opts,
1215
confirm: true,
1216
});
1217
await promise;
1218
let count = getCount?.()!;
1219
1220
if (
1221
opts.waitForInterest &&
1222
count != null &&
1223
count == 0 &&
1224
!this.isClosed() &&
1225
(opts.timeout == null || Date.now() - start <= opts.timeout)
1226
) {
1227
let timeout = opts.timeout ?? DEFAULT_WAIT_FOR_INTEREST_TIMEOUT;
1228
await this.waitForInterest(subject, {
1229
timeout: timeout ? timeout - (Date.now() - start) : undefined,
1230
});
1231
if (this.isClosed()) {
1232
return { bytes, count };
1233
}
1234
const elapsed = Date.now() - start;
1235
timeout -= elapsed;
1236
// client and there is interest
1237
if (timeout <= 500) {
1238
// but... not enough time left to try again even if there is interest,
1239
// i.e., will fail anyways due to network latency
1240
return { bytes, count };
1241
}
1242
const { getCount, promise } = this._publish(subject, mesg, {
1243
...opts,
1244
timeout,
1245
confirm: true,
1246
});
1247
await promise;
1248
count = getCount?.()!;
1249
}
1250
1251
return { bytes, count };
1252
};
1253
1254
private _publish = (
1255
subject: string,
1256
mesg,
1257
{
1258
headers,
1259
raw,
1260
encoding = DEFAULT_ENCODING,
1261
confirm,
1262
timeout = DEFAULT_PUBLISH_TIMEOUT,
1263
noThrow,
1264
}: PublishOptions & { confirm?: boolean } = {},
1265
) => {
1266
if (this.isClosed()) {
1267
return { bytes: 0 };
1268
}
1269
if (!isValidSubjectWithoutWildcards(subject)) {
1270
throw Error(`invalid publish subject ${subject}`);
1271
}
1272
if (this.permissionError.pub.has(subject)) {
1273
const message = this.permissionError.pub.get(subject)!;
1274
logger.debug(message);
1275
throw new ConatError(message, { code: 403 });
1276
}
1277
raw = raw ?? encode({ encoding, mesg });
1278
this.stats.send.messages += 1;
1279
this.stats.send.bytes += raw.length;
1280
1281
// default to 1MB is safe since it's at least that big.
1282
const chunkSize = Math.max(
1283
1000,
1284
(this.info?.max_payload ?? 1e6) - MAX_HEADER_SIZE,
1285
);
1286
let seq = 0;
1287
let id = randomId();
1288
const promises: any[] = [];
1289
let count = 0;
1290
for (let i = 0; i < raw.length; i += chunkSize) {
1291
// !!FOR TESTING ONLY!!
1292
// if (Math.random() <= 0.01) {
1293
// console.log("simulating a chunk drop", { subject, seq });
1294
// seq += 1;
1295
// continue;
1296
// }
1297
const done = i + chunkSize >= raw.length ? 1 : 0;
1298
const v: any[] = [
1299
subject,
1300
id,
1301
seq,
1302
done,
1303
encoding,
1304
raw.slice(i, i + chunkSize),
1305
// position v[6] is used for clusters
1306
];
1307
if (done && headers) {
1308
v.push(headers);
1309
}
1310
if (confirm) {
1311
const f = async () => {
1312
if (timeout) {
1313
try {
1314
const response = await this.conn
1315
.timeout(timeout)
1316
.emitWithAck("publish", v);
1317
if (response?.error) {
1318
throw new ConatError(response.error, { code: response.code });
1319
} else {
1320
return response;
1321
}
1322
} catch (err) {
1323
throw toConatError(err);
1324
}
1325
} else {
1326
return await this.conn.emitWithAck("publish", v);
1327
}
1328
};
1329
const promise = (async () => {
1330
try {
1331
const response = await f();
1332
count = Math.max(count, response.count ?? 0);
1333
} catch (err) {
1334
if (!noThrow) {
1335
throw err;
1336
}
1337
}
1338
})();
1339
promises.push(promise);
1340
} else {
1341
this.conn.emit("publish", v);
1342
}
1343
seq += 1;
1344
}
1345
if (confirm) {
1346
return {
1347
bytes: raw.length,
1348
getCount: () => count,
1349
promise: Promise.all(promises),
1350
};
1351
}
1352
return { bytes: raw.length };
1353
};
1354
1355
pub = this.publish;
1356
1357
request = async (
1358
subject: string,
1359
mesg: any,
1360
{ timeout = DEFAULT_REQUEST_TIMEOUT, ...options }: PublishOptions = {},
1361
): Promise<Message> => {
1362
if (timeout <= 0) {
1363
throw Error("timeout must be positive");
1364
}
1365
const inbox = await this.getInbox();
1366
const inboxSubject = this.temporaryInboxSubject();
1367
const sub = new EventIterator<Message>(inbox, inboxSubject, {
1368
idle: timeout,
1369
limit: 1,
1370
map: (args) => args[0],
1371
});
1372
1373
const opts = {
1374
...options,
1375
timeout,
1376
headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
1377
};
1378
const { count } = await this.publish(subject, mesg, opts);
1379
if (!count) {
1380
sub.stop();
1381
// if you hit this, consider using the option waitForInterest:true
1382
throw new ConatError(`request -- no subscribers matching '${subject}'`, {
1383
code: 503,
1384
});
1385
}
1386
for await (const resp of sub) {
1387
sub.stop();
1388
return resp;
1389
}
1390
sub.stop();
1391
throw new ConatError("timeout", { code: 408 });
1392
};
1393
1394
// NOTE: Using requestMany returns a Subscription sub, and
1395
// you can call sub.close(). However, the sender doesn't
1396
// know that this happened and the messages are still going
1397
// to your inbox. Similarly if you set a maxWait, the
1398
// subscription just ends at that point, but the server
1399
// sending messages doesn't know. This is a shortcoming the
1400
// pub/sub model. You must decide entirely based on your
1401
// own application protocol how to terminate.
1402
requestMany = async (
1403
subject: string,
1404
mesg: any,
1405
{ maxMessages, maxWait, ...options }: RequestManyOptions = {},
1406
): Promise<Subscription> => {
1407
if (maxMessages != null && maxMessages <= 0) {
1408
throw Error("maxMessages must be positive");
1409
}
1410
if (maxWait != null && maxWait <= 0) {
1411
throw Error("maxWait must be positive");
1412
}
1413
const inbox = await this.getInbox();
1414
const inboxSubject = this.temporaryInboxSubject();
1415
const sub = new EventIterator<Message>(inbox, inboxSubject, {
1416
idle: maxWait,
1417
limit: maxMessages,
1418
map: (args) => args[0],
1419
});
1420
const { count } = await this.publish(subject, mesg, {
1421
...options,
1422
headers: { ...options?.headers, [REPLY_HEADER]: inboxSubject },
1423
});
1424
if (!count) {
1425
sub.stop();
1426
throw new ConatError(
1427
`requestMany -- no subscribers matching ${subject}`,
1428
{ code: 503 },
1429
);
1430
}
1431
return sub;
1432
};
1433
1434
// watch: this is mainly for debugging and interactive use.
1435
watch = (
1436
subject: string,
1437
cb = (x) => console.log(`${new Date()}: ${x.subject}:`, x.data, x.headers),
1438
opts?: SubscriptionOptions,
1439
) => {
1440
const sub = this.subscribeSync(subject, opts);
1441
const f = async () => {
1442
for await (const x of sub) {
1443
cb(x);
1444
}
1445
};
1446
f();
1447
return sub;
1448
};
1449
1450
sync = {
1451
dkv: async <T,>(opts: DKVOptions): Promise<DKV<T>> =>
1452
await dkv<T>({ ...opts, client: this }),
1453
akv: <T,>(opts: DKVOptions): AKV<T> => akv<T>({ ...opts, client: this }),
1454
dko: async <T,>(opts: DKVOptions): Promise<DKO<T>> =>
1455
await dko<T>({ ...opts, client: this }),
1456
dstream: async <T,>(opts: DStreamOptions): Promise<DStream<T>> =>
1457
await dstream<T>({ ...opts, client: this }),
1458
astream: <T,>(opts: DStreamOptions): AStream<T> =>
1459
astream<T>({ ...opts, client: this }),
1460
synctable: async (opts: SyncTableOptions): Promise<ConatSyncTable> =>
1461
await createSyncTable({ ...opts, client: this }),
1462
};
1463
1464
socket = {
1465
listen: (
1466
subject: string,
1467
opts?: SocketConfiguration,
1468
): ConatSocketServer => {
1469
if (this.state == "closed") {
1470
throw Error("closed");
1471
}
1472
if (this.sockets.servers[subject] !== undefined) {
1473
throw Error(
1474
`there can be at most one socket server per client listening on a subject (subject='${subject}')`,
1475
);
1476
}
1477
const server = new ConatSocketServer({
1478
subject,
1479
role: "server",
1480
client: this,
1481
id: this.id,
1482
...opts,
1483
});
1484
this.sockets.servers[subject] = server;
1485
server.once("closed", () => {
1486
delete this.sockets.servers[subject];
1487
});
1488
return server;
1489
},
1490
1491
connect: (
1492
subject: string,
1493
opts?: SocketConfiguration,
1494
): ConatSocketClient => {
1495
if (this.state == "closed") {
1496
throw Error("closed");
1497
}
1498
const id = randomId();
1499
const client = new ConatSocketClient({
1500
subject,
1501
role: "client",
1502
client: this,
1503
id,
1504
...opts,
1505
});
1506
if (this.sockets.clients[subject] === undefined) {
1507
this.sockets.clients[subject] = { [id]: client };
1508
} else {
1509
this.sockets.clients[subject][id] = client;
1510
}
1511
client.once("closed", () => {
1512
const v = this.sockets.clients[subject];
1513
if (v != null) {
1514
delete v[id];
1515
if (isEmpty(v)) {
1516
delete this.sockets.clients[subject];
1517
}
1518
}
1519
});
1520
return client;
1521
},
1522
};
1523
1524
private disconnectAllSockets = () => {
1525
if (this.state == "closed") {
1526
return;
1527
}
1528
for (const subject in this.sockets.servers) {
1529
this.sockets.servers[subject].disconnect();
1530
}
1531
for (const subject in this.sockets.clients) {
1532
for (const id in this.sockets.clients[subject]) {
1533
this.sockets.clients[subject][id].disconnect();
1534
}
1535
}
1536
};
1537
1538
private closeAllSockets = () => {
1539
for (const subject in this.sockets.servers) {
1540
this.sockets.servers[subject].close();
1541
}
1542
for (const subject in this.sockets.clients) {
1543
for (const id in this.sockets.clients[subject]) {
1544
this.sockets.clients[subject][id].close();
1545
}
1546
}
1547
};
1548
1549
message = (mesg, options?) => messageData(mesg, options);
1550
1551
bench = {
1552
publish: async (n: number = 1000, subject = "bench"): Promise<number> => {
1553
const t0 = Date.now();
1554
console.log(`publishing ${n} messages to`, { subject });
1555
for (let i = 0; i < n - 1; i++) {
1556
this.publishSync(subject, null);
1557
}
1558
// then send one final message and wait for an ack.
1559
// since messages are in order, we know that all other
1560
// messages were delivered to the server.
1561
const { count } = await this.publish(subject, null);
1562
console.log("listeners: ", count);
1563
const t1 = Date.now();
1564
const rate = Math.round((n / (t1 - t0)) * 1000);
1565
console.log(rate, "messages per second delivered");
1566
return rate;
1567
},
1568
1569
subscribe: async (n: number = 1000, subject = "bench"): Promise<number> => {
1570
const sub = await this.subscribe(subject);
1571
// send the data
1572
for (let i = 0; i < n; i++) {
1573
this.publishSync(subject, null);
1574
}
1575
const t0 = Date.now();
1576
let i = 0;
1577
for await (const _ of sub) {
1578
i += 1;
1579
if (i >= n) {
1580
break;
1581
}
1582
}
1583
const t1 = Date.now();
1584
return Math.round((n / (t1 - t0)) * 1000);
1585
},
1586
};
1587
}
1588
1589
interface PublishOptions {
1590
headers?: Headers;
1591
// if encoding is given, it specifies the encoding used to encode the message
1592
encoding?: DataEncoding;
1593
// if raw is given, then it is assumed to be the raw binary
1594
// encoded message (using encoding) and any mesg parameter
1595
// is *IGNORED*.
1596
raw?;
1597
1598
// timeout used when publishing a message and awaiting a response.
1599
timeout?: number;
1600
1601
// waitForInterest -- if publishing async so its possible to tell whether or not
1602
// there were any recipients, and there were NO recipients, it will wait until
1603
// there is a recipient and send again. This does NOT use polling, but instead
1604
// uses a cluster aware and fully event based primitive in the server.
1605
// There is thus only a speed penality doing this on failure and never
1606
// on success. Note that waitForInterest always has a timeout, defaulting
1607
// to DEFAULT_WAIT_FOR_INTEREST_TIMEOUT if above timeout not given.
1608
waitForInterest?: boolean;
1609
1610
// noThrow -- if set and publishing would throw an exception, it is
1611
// instead silently dropped and undefined is returned instead.
1612
// Use this where you might want to use publishSync, but still want
1613
// to ensure there is interest; however, it's not important to know
1614
// if there was an error sending.
1615
noThrow?: boolean;
1616
}
1617
1618
interface RequestManyOptions extends PublishOptions {
1619
maxWait?: number;
1620
maxMessages?: number;
1621
}
1622
1623
export function encode({
1624
encoding,
1625
mesg,
1626
}: {
1627
encoding: DataEncoding;
1628
mesg: any;
1629
}) {
1630
if (encoding == DataEncoding.MsgPack) {
1631
return msgpack.encode(mesg, MSGPACK_ENCODER_OPTIONS);
1632
} else if (encoding == DataEncoding.JsonCodec) {
1633
return jsonEncoder(mesg);
1634
} else {
1635
throw Error(`unknown encoding ${encoding}`);
1636
}
1637
}
1638
1639
export function decode({
1640
encoding,
1641
data,
1642
}: {
1643
encoding: DataEncoding;
1644
data;
1645
}): any {
1646
if (encoding == DataEncoding.MsgPack) {
1647
return msgpack.decode(data);
1648
} else if (encoding == DataEncoding.JsonCodec) {
1649
return jsonDecoder(data);
1650
} else {
1651
throw Error(`unknown encoding ${encoding}`);
1652
}
1653
}
1654
1655
let textEncoder: TextEncoder | undefined = undefined;
1656
let textDecoder: TextDecoder | undefined = undefined;
1657
1658
function jsonEncoder(obj: any) {
1659
if (textEncoder === undefined) {
1660
textEncoder = new TextEncoder();
1661
}
1662
return textEncoder.encode(JSON.stringify(obj));
1663
}
1664
1665
function jsonDecoder(data: Buffer): any {
1666
if (textDecoder === undefined) {
1667
textDecoder = new TextDecoder();
1668
}
1669
return JSON.parse(textDecoder.decode(data));
1670
}
1671
1672
interface Chunk {
1673
id: string;
1674
seq: number;
1675
done: number;
1676
buffer: Buffer;
1677
headers?: any;
1678
}
1679
1680
// if an incoming message has chunks at least this old
1681
// we give up on it and discard all of them. This avoids
1682
// memory leaks when a chunk is dropped.
1683
const MAX_CHUNK_TIME = 2 * 60000;
1684
1685
class SubscriptionEmitter extends EventEmitter {
1686
private incoming: { [id: string]: (Partial<Chunk> & { time: number })[] } =
1687
{};
1688
private client: Client;
1689
private closeWhenOffCalled?: boolean;
1690
private subject: string;
1691
public refCount: number = 1;
1692
1693
constructor({ client, subject, closeWhenOffCalled }) {
1694
super();
1695
this.client = client;
1696
this.subject = subject;
1697
this.client.conn.on(subject, this.handle);
1698
this.closeWhenOffCalled = closeWhenOffCalled;
1699
this.dropOldLoop();
1700
}
1701
1702
close = (force?) => {
1703
this.refCount -= 1;
1704
// console.log("SubscriptionEmitter.close - refCount =", this.refCount, this.subject);
1705
if (this.client == null || (!force && this.refCount > 0)) {
1706
return;
1707
}
1708
this.emit("closed");
1709
this.client.conn.removeListener(this.subject, this.handle);
1710
// @ts-ignore
1711
delete this.incoming;
1712
// @ts-ignore
1713
delete this.client;
1714
// @ts-ignore
1715
delete this.subject;
1716
// @ts-ignore
1717
delete this.closeWhenOffCalled;
1718
this.removeAllListeners();
1719
};
1720
1721
off(a, b) {
1722
super.off(a, b);
1723
if (this.closeWhenOffCalled) {
1724
this.close();
1725
}
1726
return this;
1727
}
1728
1729
private handle = ({ subject, data }) => {
1730
if (this.client == null) {
1731
return;
1732
}
1733
const [id, seq, done, encoding, buffer, headers] = data;
1734
// console.log({ id, seq, done, encoding, buffer, headers });
1735
const chunk = { seq, done, encoding, buffer, headers };
1736
const { incoming } = this;
1737
if (incoming[id] == null) {
1738
if (seq != 0) {
1739
// part of a dropped message -- by definition this should just
1740
// silently happen and be handled via application level encodings
1741
// elsewhere
1742
console.log(
1743
`WARNING: drop packet from ${this.subject} -- first message has wrong seq`,
1744
{ seq },
1745
);
1746
return;
1747
}
1748
incoming[id] = [];
1749
} else {
1750
const prev = incoming[id].slice(-1)[0].seq ?? -1;
1751
if (prev + 1 != seq) {
1752
console.log(
1753
`WARNING: drop packet from ${this.subject} -- seq number wrong`,
1754
{ prev, seq },
1755
);
1756
// part of message was dropped -- discard everything
1757
delete incoming[id];
1758
return;
1759
}
1760
}
1761
incoming[id].push({ ...chunk, time: Date.now() });
1762
if (chunk.done) {
1763
// console.log("assembling ", incoming[id].length, "chunks");
1764
const chunks = incoming[id].map((x) => x.buffer!);
1765
// TESTING ONLY!!
1766
// This is not necessary due to the above checks as messages arrive.
1767
// for (let i = 0; i < incoming[id].length; i++) {
1768
// if (incoming[id][i]?.seq != i) {
1769
// console.log(`WARNING: bug -- invalid chunk data! -- ${subject}`);
1770
// throw Error("bug -- invalid chunk data!");
1771
// }
1772
// }
1773
const raw = concatArrayBuffers(chunks);
1774
1775
// TESTING ONLY!!
1776
// try {
1777
// decode({ encoding, data: raw });
1778
// } catch (err) {
1779
// console.log(`ERROR - invalid data ${subject}`, incoming[id], err);
1780
// }
1781
1782
delete incoming[id];
1783
const mesg = new Message({
1784
encoding,
1785
raw,
1786
headers,
1787
client: this.client,
1788
subject,
1789
});
1790
this.emit("message", mesg);
1791
this.client.recvStats(raw.byteLength);
1792
}
1793
};
1794
1795
dropOldLoop = async () => {
1796
while (this.incoming != null) {
1797
const cutoff = Date.now() - MAX_CHUNK_TIME;
1798
for (const id in this.incoming) {
1799
const chunks = this.incoming[id];
1800
if (chunks.length > 0 && chunks[0].time <= cutoff) {
1801
console.log(
1802
`WARNING: drop partial message from ${this.subject} due to timeout`,
1803
);
1804
delete this.incoming[id];
1805
}
1806
}
1807
await delay(MAX_CHUNK_TIME / 2);
1808
}
1809
};
1810
}
1811
1812
function concatArrayBuffers(buffers) {
1813
if (buffers.length == 1) {
1814
return buffers[0];
1815
}
1816
if (Buffer.isBuffer(buffers[0])) {
1817
return Buffer.concat(buffers);
1818
}
1819
// browser fallback
1820
const totalLength = buffers.reduce((sum, buf) => sum + buf.byteLength, 0);
1821
const result = new Uint8Array(totalLength);
1822
let offset = 0;
1823
for (const buf of buffers) {
1824
result.set(new Uint8Array(buf), offset);
1825
offset += buf.byteLength;
1826
}
1827
1828
return result.buffer;
1829
}
1830
1831
export type Headers = { [key: string]: JSONValue };
1832
1833
export class MessageData<T = any> {
1834
public readonly encoding: DataEncoding;
1835
public readonly raw;
1836
public readonly headers?: Headers;
1837
1838
constructor({ encoding, raw, headers }) {
1839
this.encoding = encoding;
1840
this.raw = raw;
1841
this.headers = headers;
1842
}
1843
1844
get data(): T {
1845
return decode({ encoding: this.encoding, data: this.raw });
1846
}
1847
1848
get length(): number {
1849
// raw is binary data so it's the closest thing we have to the
1850
// size of this message. It would also make sense to include
1851
// the headers, but JSON'ing them would be expensive, so we don't.
1852
return this.raw.length;
1853
}
1854
}
1855
1856
export class Message<T = any> extends MessageData<T> {
1857
private client: Client;
1858
public readonly subject;
1859
1860
constructor({ encoding, raw, headers, client, subject }) {
1861
super({ encoding, raw, headers });
1862
this.client = client;
1863
this.subject = subject;
1864
}
1865
1866
isRequest = (): boolean => !!this.headers?.[REPLY_HEADER];
1867
1868
private respondSubject = () => {
1869
const subject = this.headers?.[REPLY_HEADER];
1870
if (!subject) {
1871
console.log(
1872
`WARNING: respond -- message to '${this.subject}' is not a request`,
1873
);
1874
return;
1875
}
1876
return `${subject}`;
1877
};
1878
1879
respondSync = (mesg, opts?: PublishOptions): { bytes: number } => {
1880
const subject = this.respondSubject();
1881
if (!subject) return { bytes: 0 };
1882
return this.client.publishSync(subject, mesg, opts);
1883
};
1884
1885
respond = async (
1886
mesg,
1887
opts: PublishOptions = {},
1888
): Promise<{ bytes: number; count: number }> => {
1889
const subject = this.respondSubject();
1890
if (!subject) {
1891
return { bytes: 0, count: 0 };
1892
}
1893
return await this.client.publish(subject, mesg, {
1894
// we *always* wait for interest for sync respond, since
1895
// it is by far the most likely situation where it wil be needed, due
1896
// to inboxes when users first sign in.
1897
waitForInterest: true,
1898
...opts,
1899
});
1900
};
1901
}
1902
1903
export function messageData(
1904
mesg,
1905
{ headers, raw, encoding = DEFAULT_ENCODING }: PublishOptions = {},
1906
) {
1907
return new MessageData({
1908
encoding,
1909
raw: raw ?? encode({ encoding, mesg }),
1910
headers,
1911
});
1912
}
1913
1914
export type Subscription = EventIterator<Message>;
1915
1916
export class ConatError extends Error {
1917
code: string | number;
1918
constructor(mesg: string, { code }) {
1919
super(mesg);
1920
this.code = code;
1921
}
1922
}
1923
1924
function isEmpty(obj: object): boolean {
1925
for (const _x in obj) {
1926
return false;
1927
}
1928
return true;
1929
}
1930
1931
function toConatError(socketIoError) {
1932
// only errors are "disconnected" and a timeout
1933
const e = `${socketIoError}`;
1934
if (e.includes("disconnected")) {
1935
return e;
1936
} else {
1937
return new ConatError(`timeout - ${e}`, {
1938
code: 408,
1939
});
1940
}
1941
}
1942
1943