Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/core/server.ts
1710 views
1
/*

Just try it out, start up node.js in this directory and:

    s = require('@cocalc/conat/core/server').init({port:4567, getUser:()=>{return {hub_id:'hub'}}})
    c = s.client();
    c.watch('foo')
    c2 = s.client();
    c2.pub('foo', 'bar')

To connect from another terminal:

    c = require('@cocalc/conat/core/client').connect({address:"http://localhost:4567"})
    c.state
    // 'connected'


cd packages/server


   s = await require('@cocalc/server/conat/socketio').initConatServer()

   s0 = await require('@cocalc/server/conat/socketio').initConatServer({port:3000}); 0


---

*/
29
30
import type { ConnectionStats, ServerInfo } from "./types";
31
import {
32
isValidSubject,
33
isValidSubjectWithoutWildcards,
34
} from "@cocalc/conat/util";
35
import { Server } from "socket.io";
36
import { delay } from "awaiting";
37
import {
38
ConatError,
39
connect,
40
Client,
41
type ClientOptions,
42
MAX_INTEREST_TIMEOUT,
43
STICKY_QUEUE_GROUP,
44
} from "./client";
45
import {
46
RESOURCE,
47
MAX_CONNECTIONS_PER_USER,
48
MAX_CONNECTIONS,
49
MAX_PAYLOAD,
50
MAX_SUBSCRIPTIONS_PER_CLIENT,
51
MAX_SUBSCRIPTIONS_PER_HUB,
52
} from "./constants";
53
import { Patterns } from "./patterns";
54
import { is_array } from "@cocalc/util/misc";
55
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
56
import { once, until } from "@cocalc/util/async-utils";
57
import {
58
clusterLink,
59
type ClusterLink,
60
clusterStreams,
61
type ClusterStreams,
62
trimClusterStreams,
63
createClusterPersistServer,
64
Sticky,
65
Interest,
66
hashInterest,
67
hashSticky,
68
} from "./cluster";
69
import { type ConatSocketServer } from "@cocalc/conat/socket";
70
import { throttle } from "lodash";
71
import { getLogger } from "@cocalc/conat/client";
72
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
73
import { type SysConatServer, sysApiSubject, sysApi } from "./sys";
74
import { forkedConatServer } from "./start-server";
75
import { stickyChoice } from "./sticky";
76
import { EventEmitter } from "events";
77
import { Metrics } from "../types";
78
79
// Logger shared by everything in this module.
const logger = getLogger("conat:core:server");

// Default values for Options.autoscanInterval and Options.longAutoscanInterval
// (milliseconds).
const DEFAULT_AUTOSCAN_INTERVAL = 15_000;
const DEFAULT_LONG_AUTOSCAN_INTERVAL = 60_000;

// When a cluster node stays disconnected for this many milliseconds it is
// unjoined, which frees the streams that track its state (and hence memory).
// A node that comes back must explicitly join the cluster again.  The point
// is to avoid leaking RAM when nodes are removed on purpose, and to avoid a
// log cluttered with constant reconnect errors.  Supercluster nodes are never
// automatically forgotten.
const DEFAULT_FORGET_CLUSTER_NODE_INTERVAL = 30 * 60_000; // 30 minutes

// Set to true to enable verbose per-message logging in this file.
const DEBUG = false;
94
95
/**
 * One change to the interest (subscription) state: a socket.io room either
 * starts ("add") or stops ("delete") being subscribed to a subject.
 */
export interface InterestUpdate {
  // whether the subscription is being created or removed
  op: "add" | "delete";
  // the subject (pattern) being subscribed to
  subject: string;
  // queue group; supplied when adding, omitted when deleting
  queue?: string;
  // the socket.io room that receives matching messages
  room: string;
}
101
102
/**
 * One sticky routing assignment: for the given subscription pattern,
 * messages with this concrete subject are routed to `target`.
 */
export interface StickyUpdate {
  pattern: string;
  subject: string;
  // the chosen target (a socket.io room string)
  target: string;
}
107
108
/**
 * A single entry destined for the cluster streams; it may carry an interest
 * change, a sticky assignment, or both.
 */
interface Update {
  interest?: InterestUpdate;
  sticky?: StickyUpdate;
}
112
113
/**
 * Create a new Conat server configured by `opts`.
 * Thin factory around the ConatServer constructor.
 */
export function init(opts: Options) {
  const server = new ConatServer(opts);
  return server;
}
116
117
/**
 * Authenticates a newly connected socket and resolves to the user object.
 * It is expected to throw if authentication fails.  When the server has a
 * system account password configured, the available system accounts are
 * passed as the second argument.
 */
export type UserFunction = (
  socket,
  systemAccounts?: { [cookieName: string]: { password: string; user: any } },
) => Promise<any>;
121
122
/**
 * Authorization hook: resolves to true if `user` may publish ("pub") or
 * subscribe ("sub") to `subject`.
 */
export type AllowFunction = (opts: {
  type: "pub" | "sub";
  user: any;
  subject: string;
}) => Promise<boolean>;
127
128
/**
 * Configuration accepted by the ConatServer constructor / init().
 * Every field is optional; defaults are applied in the constructor.
 */
export interface Options {
  // identifier of this server; defaults to "0"
  id?: string;
  // existing http server to attach socket.io to; if not given, the
  // server listens on `port` instead
  httpServer?;
  // port to listen on when no httpServer is given; defaults to 3000
  port?: number;
  // socket.io path; defaults to "/conat"
  path?: string;
  // authentication hook; if omitted there is no auth at all
  getUser?: UserFunction;
  // authorization hook; if omitted everything is allowed
  isAllowed?: AllowFunction;
  maxSubscriptionsPerClient?: number;
  maxSubscriptionsPerHub?: number;
  systemAccountPassword?: string;
  // if true, use https when creating an internal client.
  ssl?: boolean;

  // WARNING: **superclusters are NOT fully implemented yet.**
  //
  // if clusterName is set, enable clustering.  Each node
  // in the cluster must have a different name.  systemAccountPassword
  // must also be set.  This only has an impact when the id is '0'.
  // This publishes interest state in a stream, so uses more resources.
  clusterName?: string;

  // autoscanInterval = shortest interval when cluster will automatically
  // be scanned for new nodes.  It may be longer if there's no activity.
  // Set to 0 to disable autoscan, which is very useful for unit tests.
  // Defaults to DEFAULT_AUTOSCAN_INTERVAL = 15_000 = 15 seconds.
  autoscanInterval?: number;
  // Defaults to DEFAULT_LONG_AUTOSCAN_INTERVAL = 60_000 = 60 seconds.
  longAutoscanInterval?: number;
  // Defaults to DEFAULT_FORGET_CLUSTER_NODE_INTERVAL (30 minutes).
  forgetClusterNodeInterval?: number;

  // if localClusterSize >=2 (and clusterName, etc. configured above),
  // creates a local cluster by spawning child processes with
  // ports the above port +1, +2, etc.
  localClusterSize?: number;

  // the ip address of this server on the cluster.
  clusterIpAddress?: string;
}
165
166
// Lifecycle of a server: it starts in "init", becomes "ready" once
// initialization finishes, and is "closed" after close() is called.
type State = "init" | "ready" | "closed";
167
168
export class ConatServer extends EventEmitter {
169
// the socket.io Server created in the constructor
public readonly io;
// identifier of this server (Options.id, "0" by default)
public readonly id: string;

// authentication and authorization hooks (wrapped/defaulted in the constructor)
private getUser: UserFunction;
private isAllowed: AllowFunction;
// the options in effect, with defaults filled in
public readonly options: Partial<Options>;
// true when both an id and a clusterName are configured
private cluster?: boolean;

// per-connection bookkeeping, all keyed by socket id
private sockets: { [id: string]: any } = {};
private stats: { [id: string]: ConnectionStats } = {};
private usage: UsageMonitor;
public state: State = "init";

// subjects each socket is subscribed to
private subscriptions: { [socketId: string]: Set<string> } = {};
// subscription patterns known to this server
public interest: Interest = new Patterns();
// the target string is JSON.stringify({ id: string; subject: string }),
// which is the socket.io room to send the messages to.
public sticky: Sticky = {};

// cluster state
private clusterStreams?: ClusterStreams;
private clusterLinks: {
  [clusterName: string]: { [id: string]: ClusterLink };
} = {};
private clusterLinksByAddress: { [address: string]: ClusterLink } = {};
private clusterPersistServer?: ConatSocketServer;
public readonly clusterName?: string;
// updates produced before the cluster streams exist
private queuedClusterUpdates: Update[] = [];
196
197
/**
 * Build the server: apply option defaults, wrap the auth hooks, create the
 * socket.io Server (attached to `httpServer` if given, otherwise listening
 * on `port`), start usage monitoring, register the connection handler and
 * kick off asynchronous initialization.
 */
constructor(options: Options) {
  super();
  const {
    httpServer,
    getUser,
    isAllowed,
    id = "0",
    port = 3000,
    path = "/conat",
    ssl = false,
    systemAccountPassword,
    clusterName,
    clusterIpAddress,
    localClusterSize = 1,
    maxSubscriptionsPerClient = MAX_SUBSCRIPTIONS_PER_CLIENT,
    maxSubscriptionsPerHub = MAX_SUBSCRIPTIONS_PER_HUB,
    autoscanInterval = DEFAULT_AUTOSCAN_INTERVAL,
    longAutoscanInterval = DEFAULT_LONG_AUTOSCAN_INTERVAL,
    forgetClusterNodeInterval = DEFAULT_FORGET_CLUSTER_NODE_INTERVAL,
  } = options;

  this.clusterName = clusterName;
  this.options = {
    port,
    ssl,
    id,
    path,
    maxSubscriptionsPerClient,
    maxSubscriptionsPerHub,
    systemAccountPassword,
    clusterName,
    autoscanInterval,
    longAutoscanInterval,
    forgetClusterNodeInterval,
    localClusterSize,
    clusterIpAddress,
  };
  // clustering requires both an id and a cluster name
  this.cluster = !!id && !!clusterName;

  this.getUser = async (socket) => {
    if (getUser == null) {
      // no auth at all
      return null;
    }
    // the system account exists only if a password is configured
    const password = this.options.systemAccountPassword;
    const systemAccounts = password
      ? { sys: { password, user: { hub_id: "system" } } }
      : undefined;
    return await getUser(socket, systemAccounts);
  };
  this.isAllowed = isAllowed ?? (async () => true);
  this.id = id;

  this.log("Starting Conat server...", {
    id,
    path,
    port: this.options.port,
    httpServer: httpServer ? "httpServer(...)" : undefined,
  });

  // NOTE: do NOT enable connectionStateRecovery; it seems to cause issues
  // when restarting the server.
  const adapter: any = undefined;
  const socketioOptions = {
    maxHttpBufferSize: MAX_PAYLOAD,
    path,
    adapter,
    // perMessageDeflate is disabled by default in socket.io, but it
    // seems unclear exactly *why*:
    // https://github.com/socketio/socket.io/issues/3477#issuecomment-930503313
    perMessageDeflate: { threshold: 1024 },
  };
  this.log(socketioOptions);

  if (!httpServer) {
    this.io = new Server(port, socketioOptions);
    this.log(`listening on port ${port}`);
  } else {
    this.io = new Server(httpServer, socketioOptions);
  }

  this.initUsage();
  this.io.on("connection", this.handleSocket);
  this.init();
}
286
287
/**
 * Move the server to `state` and emit an event named after it.
 * Does nothing if the server is already in that state.
 *
 * The state is recorded BEFORE the event is emitted.  EventEmitter invokes
 * listeners synchronously, so a listener that inspects `this.state` (or
 * calls something guarded by isClosed(), e.g. close()) must already see
 * the new state; otherwise a "closed" listener calling close() would
 * re-enter setState and emit again without bound.
 */
private setState = (state: State) => {
  if (this.state == state) return;
  this.state = state;
  this.emit(state);
};
292
293
// True once the server has entered the "closed" state.
private isClosed = () => {
  return this.state == "closed";
};
294
295
/**
 * Asynchronous part of startup: start the system service when a system
 * account password is configured, set up clustering when a cluster name is
 * configured, and then mark the server "ready".
 */
private init = async () => {
  const { systemAccountPassword } = this.options;
  if (systemAccountPassword) {
    await this.initSystemService();
  }

  if (this.clusterName) {
    await this.initCluster();
  }

  this.setState("ready");
};
304
305
// Create the usage monitor that enforces the per-user and global
// connection limits; its log output is routed through this.log.
private initUsage = () => {
  const log = (...args) => this.log("usage", ...args);
  this.usage = new UsageMonitor({
    maxPerUser: MAX_CONNECTIONS_PER_USER,
    max: MAX_CONNECTIONS,
    resource: RESOURCE,
    log,
  });
};
313
314
// Current metrics as reported by the usage monitor.
public getUsage = (): Metrics => this.usage.getMetrics();
317
318
// Used by the Kubernetes health check.  Not much thought has gone into
// what this should verify yet; for now the server counts as healthy
// exactly when it has not been closed.  Hopefully experience can teach us.
isHealthy = () => {
  return !this.isClosed();
};
327
328
/**
 * Shut the server down and release everything it holds.  Idempotent:
 * calling it on an already closed server does nothing.
 *
 * Order of operations:
 *  1. mark the server closed (so concurrent work bails out) and cancel any
 *     pending throttled stream trim so its timer does not outlive the server;
 *  2. close cluster streams, cluster links and the cluster persist server;
 *  3. wait briefly, then close the socket.io server;
 *  4. close the usage monitor and the interest structure, then drop or
 *     reset all per-connection state.
 */
close = async () => {
  if (this.isClosed()) {
    return;
  }
  this.setState("closed");
  // a trailing throttled trim may still be scheduled; it would be a no-op
  // after close, but its timer would linger for up to a minute
  this.trimClusterStream.cancel();

  if (this.clusterStreams != null) {
    for (const name in this.clusterStreams) {
      this.clusterStreams[name].close();
    }
    delete this.clusterStreams;
  }
  for (const clusterName in this.clusterLinks) {
    const link = this.clusterLinks[clusterName];
    for (const id in link) {
      link[id].close();
      delete link[id];
    }
    delete this.clusterLinks[clusterName];
  }
  for (const address in this.clusterLinksByAddress) {
    delete this.clusterLinksByAddress[address];
  }
  this.clusterPersistServer?.close();
  delete this.clusterPersistServer;

  await delay(100);
  await this.io.close();

  this.usage?.close();
  // close interest BEFORE removing the reference to it; previously the
  // property was deleted first, so this close() call never ran
  this.interest?.close();
  for (const prop of ["interest", "services"]) {
    delete this[prop];
  }
  this.sticky = {};
  this.subscriptions = {};
  this.stats = {};
  this.sockets = {};
};
366
367
// Basic facts about this server that are sent to clients in the "info"
// message: maximum payload size, server id and cluster name.
private info = (): ServerInfo => {
  const { id, clusterName } = this;
  return { max_payload: MAX_PAYLOAD, id, clusterName };
};
374
375
// Debug-level logging, with every message prefixed by this server's id.
private log = (...args) => {
  const prefix = ["id", this.id, ":"];
  logger.debug(...prefix, ...args);
};
378
379
/**
 * Remove the subscription of `socket` to `subject`: the socket leaves the
 * corresponding room and the interest state is updated (op "delete").
 */
private unsubscribe = async ({ socket, subject }) => {
  if (DEBUG) {
    const id = socket.id;
    this.log("unsubscribe ", { id, subject });
  }
  const room = socketSubjectRoom({ socket, subject });
  socket.leave(room);
  const change: InterestUpdate = { op: "delete", subject, room };
  await this.updateInterest(change);
};
387
388
////////////////////////////////////
389
// CLUSTER STREAM
390
////////////////////////////////////
391
392
/**
 * Publish the parts of `update` that are present to the matching cluster
 * stream, then schedule a (throttled) trim of the streams.
 * Throws if the cluster streams have not been initialized.
 */
private publishUpdate = (update: Update) => {
  if (this.clusterStreams == null) {
    throw Error("streams must be initialized");
  }
  if (update.interest !== undefined) {
    this.clusterStreams.interest.publish(update.interest);
  }
  if (update.sticky !== undefined) {
    this.clusterStreams.sticky.publish(update.sticky);
  }
  this.trimClusterStream();
};
405
406
/**
 * Send `update` to the cluster streams.  Does nothing when no cluster name
 * is configured; if the streams do not exist yet, the update is queued in
 * queuedClusterUpdates instead of being published.
 */
private updateClusterStream = (update: Update) => {
  if (!this.clusterName) {
    return;
  }
  if (this.clusterStreams === undefined) {
    // streams are not available yet, so remember the update for later
    this.queuedClusterUpdates.push(update);
    return;
  }
  this.publishUpdate(update);
};
415
416
// Trim the cluster streams.  Throttled because it could be expensive: it
// runs at most once a minute (on the trailing edge only) and removes
// operations that are definitely no longer relevant and are at least
// several minutes old, i.e., ops involving a pattern in which there is no
// interest.  Does nothing if the streams, interest or sticky state are gone.
private trimClusterStream = throttle(
  async () => {
    const { clusterStreams, interest, sticky } = this;
    if (
      clusterStreams === undefined ||
      interest === undefined ||
      sticky === undefined
    ) {
      return;
    }
    // links to the other nodes of our own cluster
    const links = Object.values(
      this.clusterLinks?.[this.clusterName ?? ""] ?? {},
    );
    await trimClusterStreams(
      clusterStreams,
      { interest, sticky, links },
      15 * 60000,
    );
  },
  60000,
  { leading: false, trailing: true },
);
443
444
///////////////////////////////////////
445
// INTEREST - PATTERNS USERS ARE SUBSCRIBED TO
446
///////////////////////////////////////
447
448
/**
 * Apply one interest change: first publish it to the cluster stream, then
 * update the local interest/sticky state.  Ignored once the server is closed.
 */
private updateInterest = async (interest: InterestUpdate) => {
  if (!this.isClosed()) {
    // publish to the stream
    this.updateClusterStream({ interest });
    // update our local state
    updateInterest(interest, this.interest, this.sticky);
  }
};
455
456
///////////////////////////////////////
457
// STICKY QUEUE GROUPS
458
///////////////////////////////////////
459
460
/**
 * Record a sticky assignment locally; it is forwarded to the cluster
 * stream only when the local update reports true.
 */
private updateSticky = (sticky: StickyUpdate) => {
  const shouldPublish = updateSticky(sticky, this.sticky);
  if (!shouldPublish) {
    return;
  }
  this.updateClusterStream({ sticky });
};
465
466
/**
 * Look up the sticky target already chosen for (pattern, subject), if any.
 *
 * Without clustering this is simply our own sticky entry.  With clustering,
 * the sticky entries of this node and of every linked node in our cluster
 * are collected, keeping only those that are still valid choices (members
 * of `targets`).
 *
 * If several distinct targets remain -- which can only happen if choices
 * were made while the cluster was in an inconsistent state -- the first in
 * sorted order is returned.  Since the sticky maps are eventually consistent
 * across the cluster, every node then makes the same choice without any
 * communication or coordination between nodes.
 *
 * This choice algorithm is also used in saveNonredundantStickyState below!
 * **Don't change this without changing that!!**
 */
private getStickyTarget = ({
  pattern,
  subject,
  targets: validTargets,
}: {
  pattern: string;
  subject: string;
  targets: Set<string>; // the current valid choices as defined by subscribers known via interest graph
}) => {
  const ours = this.sticky[pattern]?.[subject];
  if (!this.cluster || this.clusterName == null) {
    return ours;
  }

  const candidates = new Set<string>();
  const consider = (choice: string | undefined) => {
    if (choice !== undefined && validTargets.has(choice)) {
      candidates.add(choice);
    }
  };

  consider(ours);
  // next check sticky state of other nodes in the cluster
  const nodes = this.clusterLinks[this.clusterName];
  for (const id in nodes) {
    consider(nodes[id].sticky[pattern]?.[subject]);
  }

  if (candidates.size == 0) {
    return undefined;
  }
  // with a single candidate this is just that candidate
  return Array.from(candidates).sort()[0];
};
518
519
/**
 * When a link is about to be closed (e.g., the node died or is lost), copy
 * its non-redundant sticky state into our own sticky state, so those
 * routing choices aren't lost.  Hopefully only a single node in the cluster
 * stores this info (due to the randomness of removing, it'll be the one
 * that happens to do this first), but if more than one does, it's just
 * less efficient.
 *
 * Links belonging to a different cluster are ignored.
 *
 * The decision mirrors the choice rule of getStickyTarget (smallest target
 * in sorted order wins), so the two must be changed together.
 */
private saveNonredundantStickyState = (link: ClusterLink) => {
  if (link.clusterName != this.clusterName) {
    // only worry about nodes in the same cluster
    return;
  }
  const cluster = this.clusterLinks[this.clusterName];
  // id of the node that is going away; it must be skipped when asking
  // what the *remaining* nodes know.  (Captured here so that loop
  // variables below cannot shadow it.)
  const departingId = link.id;

  // True if saving pattern/subject -> target on this node is pointless.
  const isRedundant = (
    pattern: string,
    subject: string,
    target: string,
  ): boolean => {
    // smallest competing target seen so far, starting with our own
    let t = this.sticky[pattern]?.[subject];
    if (t == target) {
      // we already have it -- so it is redundant
      return true;
    }

    for (const id in cluster) {
      if (id == departingId) {
        continue;
      }
      const s = cluster[id].sticky[pattern]?.[subject];
      if (s !== undefined) {
        if (s == target) {
          // someone else has it, so saving it again would be redundant
          return true;
        }
        if (t === undefined || s < t) {
          t = s;
        }
      }
    }
    // nobody else has this mapping... but maybe it's not used
    // due to other conflicting ones?
    if (t !== undefined && t < target) {
      // target wouldn't be used since there's conflicting ones that are smaller
      return true;
    }
    // target *would* be used, but nobody else knows it, so we probably
    // must save it.  Make sure the pattern is still of interest first.
    if (this.interest.hasPattern(pattern)) {
      // we need it!
      return false;
    }
    for (const id in cluster) {
      if (id == departingId) {
        continue;
      }
      if (cluster[id].interest.hasPattern(pattern)) {
        return false;
      }
    }
    // nothing in the remaining cluster is subscribed to this pattern, so
    // no point in preserving this sticky routing info
    return true;
  };

  // link.sticky is { [pattern: string]: { [subject: string]: string } }
  for (const pattern in link.sticky) {
    const bySubject = link.sticky[pattern];
    for (const subject in bySubject) {
      const target = bySubject[subject];
      if (!isRedundant(pattern, subject, target)) {
        // we save the assignment
        this.updateSticky({ pattern, subject, target });
      }
    }
  }
};
593
594
///////////////////////////////////////
595
// SUBSCRIBE and PUBLISH
596
///////////////////////////////////////
597
598
/**
 * Subscribe `socket` to `subject` in queue group `queue` on behalf of `user`.
 *
 * Throws:
 *  - Error if `queue` is not a string or `subject` is not a valid subject;
 *  - ConatError with code 403 if isAllowed denies the subscription;
 *  - ConatError with code 429 if the socket has reached its subscription
 *    limit (hub users and other clients have separate limits).
 *
 * On success the socket has joined the room for the subject and the
 * interest state has been updated (op "add").
 */
private subscribe = async ({ socket, subject, queue, user }) => {
  if (DEBUG) {
    this.log("subscribe ", { id: socket.id, subject, queue });
  }
  if (typeof queue != "string") {
    throw Error("queue must be defined");
  }
  if (!isValidSubject(subject)) {
    throw Error("invalid subject");
  }

  const permitted = await this.isAllowed({ user, subject, type: "sub" });
  if (!permitted) {
    const message = `permission denied subscribing to '${subject}' from ${JSON.stringify(
      user,
    )}`;
    this.log(message);
    throw new ConatError(message, {
      code: 403,
    });
  }

  // hubs get their own (separately configured) limit
  const maxSubs = user?.hub_id
    ? (this.options.maxSubscriptionsPerHub ?? MAX_SUBSCRIPTIONS_PER_HUB)
    : (this.options.maxSubscriptionsPerClient ??
      MAX_SUBSCRIPTIONS_PER_CLIENT);
  if (maxSubs) {
    const numSubs = this.subscriptions?.[socket.id]?.size ?? 0;
    if (numSubs >= maxSubs) {
      // error 429 == "too many requests"
      throw new ConatError(
        `there is a limit of at most ${maxSubs} subscriptions and you currently have ${numSubs} subscriptions -- subscription to '${subject}' denied`,
        { code: 429 },
      );
    }
  }

  const room = socketSubjectRoom({ socket, subject });
  // critical to await socket.join so we don't advertise that there is
  // a subscriber before the socket is actually getting messages.
  await socket.join(room);
  await this.updateInterest({ op: "add", subject, room, queue });
};
642
643
// Get all interest in this subject across the cluster (NOT supercluster).
// The result maps   pattern -> queue group -> node id -> set of targets,
// covering this node and every *connected* link of our own cluster.
// Without a cluster name (or without links) only this node is included.
private clusterInterest = (subject: string) => {
  const result: {
    [pattern: string]: { [queue: string]: { [nodeId: string]: Set<string> } };
  } = {};

  // interest registered directly on this node
  for (const pattern of this.interest.matches(subject)) {
    const groups = this.interest.get(pattern)!;
    const byQueue = (result[pattern] = {});
    for (const queue in groups) {
      byQueue[queue] = { [this.id]: groups[queue] };
    }
  }

  const links =
    this.clusterName == null ? undefined : this.clusterLinks[this.clusterName];
  if (links == null) {
    return result;
  }

  // interest known via the other nodes of our cluster
  for (const id in links) {
    const link = links[id];
    if (!link.isConnected()) {
      continue;
    }
    for (const pattern of link.interest.matches(subject)) {
      const byQueue = result[pattern] ?? (result[pattern] = {});
      const groups = link.interest.get(pattern)!;
      for (const queue in groups) {
        const byNode = byQueue[queue] ?? (byQueue[queue] = {});
        byNode[id] = groups[queue];
      }
    }
  }
  return result;
};
684
685
/**
 * Deliver the message to these targets, which should all be connected to
 * this server.  This is used for cluster routing only.
 *
 * Returns the number of targets the message was emitted to.
 */
private deliver = ({
  subject,
  data,
  targets,
}: {
  subject: string;
  data: any;
  targets: { pattern: string; target: string }[];
}): number => {
  targets.forEach(({ pattern, target }) => {
    this.io.to(target).emit(pattern, { subject, data });
  });
  return targets.length;
};
701
702
/**
 * Publish `data` on `subject` on behalf of user `from`.
 *
 * Throws Error for an invalid subject (wildcards are not allowed) and a
 * ConatError with code 403 if isAllowed denies publishing.
 *
 * Returns the number of recipients, where patterns for which
 * isSilentPattern is true are not counted.  Three cases:
 *  - data[6] is set: the message was routed here by another cluster node
 *    with an explicit target list, so it is delivered exactly to those;
 *  - no clustering: one target per queue group of each matching local
 *    pattern is chosen;
 *  - clustering: one target per queue group is chosen across the whole
 *    cluster; local ones are emitted directly and remote ones are batched
 *    per node and forwarded over the cluster link.
 */
private publish = async ({
  subject,
  data,
  from,
}: {
  subject: string;
  data: any;
  from: any;
}): Promise<number> => {
  if (!isValidSubjectWithoutWildcards(subject)) {
    throw Error("invalid subject");
  }

  const permitted = await this.isAllowed({
    user: from,
    subject,
    type: "pub",
  });
  if (!permitted) {
    const message = `permission denied publishing to '${subject}' from ${JSON.stringify(
      from,
    )}`;
    this.log(message);
    throw new ConatError(message, {
      // this is the http code for permission denied, and having this
      // set is assumed elsewhere in our code, so don't mess with it!
      code: 403,
    });
  }

  // note -- position 6 of data is special cluster delivery data, to avoid
  // a message bouncing back and forth in case the interest stream
  // were slightly out of sync, and also so we know exactly where
  // to deliver the message.
  const explicitTargets = data[6];
  if (explicitTargets != null) {
    return this.deliver({ subject, data, targets: explicitTargets });
  }

  let count = 0;
  // emit to a room on this server, counting it unless the pattern is silent
  const sendLocal = (pattern: string, room: string) => {
    this.io.to(room).emit(pattern, { subject, data });
    if (!isSilentPattern(pattern)) {
      count++;
    }
  };

  if (!this.cluster) {
    // Simpler non-cluster (or no forward) case.  We ONLY have to
    // consider data about this server, and no other nodes.
    for (const pattern of this.interest.matches(subject)) {
      const g = this.interest.get(pattern)!;
      if (DEBUG) {
        this.log("publishing", { subject, data, g });
      }
      // send to exactly one in each queue group
      for (const queue in g) {
        const room = this.loadBalance({
          pattern,
          subject,
          queue,
          targets: g[queue],
        });
        if (room !== undefined) {
          sendLocal(pattern, room);
        }
      }
    }
    return count;
  }

  // More complicated cluster case, where we have to consider the
  // entire cluster, or possibly the supercluster.
  const remote: {
    [id: string]: { pattern: string; target: string }[];
  } = {};
  const interest = this.clusterInterest(subject);
  for (const pattern in interest) {
    const groups = interest[pattern];
    for (const queue in groups) {
      const choice = this.clusterLoadBalance({
        pattern,
        subject,
        queue,
        targets: groups[queue],
      });
      if (choice === undefined) {
        continue;
      }
      const { id, target } = choice;
      if (id == this.id) {
        // another client of this server
        sendLocal(pattern, target);
      } else {
        // client connected to a different server -- we note this, and
        // will send everything for each server at once, instead of
        // potentially sending the same message multiple times for
        // different patterns.
        (remote[id] ?? (remote[id] = [])).push({ pattern, target });
      }
    }
  }

  if (!this.clusterName) {
    throw Error("clusterName must be set");
  }

  // Send the messages to the remote nodes.  We send the message
  // along with exactly who it should be delivered to.  There is of
  // course no guarantee that a target doesn't vanish just as we are
  // sending this...
  for (const id in remote) {
    const link = this.clusterLinks[this.clusterName]?.[id];
    const data1 = [subject, ...data];
    // use explicit index since length of data depends on
    // whether or not there are headers!
    data1[7] = remote[id];
    for (const { pattern } of remote[id]) {
      if (!isSilentPattern(pattern)) {
        count++;
      }
    }
    link?.client.conn.emit("publish", data1);
  }

  //
  // TODO: Supercluster routing.  NOT IMPLEMENTED YET
  //
  // Sketch: if count == 0 (no matches in the local cluster), try the links
  // of every *other* cluster in this.clusterLinks, publishing via
  // link.publish({ subject, data, queueGroups }) and stopping as soon as
  // one of them reports a positive count (adding it to count).

  return count;
};
850
851
///////////////////////////////////////
852
// WHO GETS PUBLISHED MESSAGE:
853
///////////////////////////////////////
854
/**
 * Choose which one of `targets` receives a message on `subject`.
 * Returns undefined if there are no targets.  For the sticky queue group
 * the choice is delegated to stickyChoice (using this server's sticky
 * state); for every other queue group it is random.
 */
private loadBalance = ({
  pattern,
  subject,
  queue,
  targets,
}: {
  pattern: string;
  subject: string;
  queue: string;
  targets: Set<string>;
}): string | undefined => {
  if (targets.size == 0) {
    return undefined;
  }
  if (queue != STICKY_QUEUE_GROUP) {
    return randomChoice(targets);
  }
  const { updateSticky, getStickyTarget } = this;
  return stickyChoice({
    pattern,
    subject,
    targets,
    updateSticky,
    getStickyTarget,
  });
};
880
881
/**
 * Cluster-wide version of loadBalance.  `targets` maps node id to the set
 * of targets on that node.  Each (id, target) pair is encoded as a JSON
 * string so that the ordinary loadBalance can choose among them; the
 * chosen string is decoded back into { id, target }.
 * Returns undefined if nothing was chosen.
 */
clusterLoadBalance = ({
  pattern,
  subject,
  queue,
  targets: targetsByNode,
}: {
  pattern: string;
  subject: string;
  queue: string;
  targets: { [id: string]: Set<string> };
}): { id: string; target: string } | undefined => {
  const encoded = new Set<string>();
  for (const id in targetsByNode) {
    targetsByNode[id].forEach((target) => {
      encoded.add(JSON.stringify({ id, target }));
    });
  }
  const chosen = this.loadBalance({
    pattern,
    subject,
    queue,
    targets: encoded,
  });
  return chosen ? JSON.parse(chosen) : undefined;
};
904
905
///////////////////////////////////////
906
// MANAGING A CONNECTION FROM A CLIENT SOCKET
907
///////////////////////////////////////
908
/**
 * Handle a new socket.io connection.
 *
 * Registers the socket, authenticates it via getUser (a failure -- including
 * hitting the connection limit -- does NOT reject the connection; instead
 * the socket gets an "error user" who can't do anything), sends the "info"
 * message, and installs the handlers for the client protocol events:
 * "stats", "wait-for-interest", "publish", "subscribe", "subscriptions",
 * "unsubscribe", "cluster" and "disconnecting".
 */
private handleSocket = async (socket) => {
  this.sockets[socket.id] = socket;
  // socket.io server sockets emit "disconnect" (they never emit "closed"),
  // so this is the event that must be used to release our reference to
  // the socket; otherwise this.sockets grows without bound.
  socket.once("disconnect", () => {
    this.log("connection closed", socket.id);
    delete this.sockets[socket.id];
    delete this.stats[socket.id];
  });

  this.stats[socket.id] = {
    send: { messages: 0, bytes: 0 },
    recv: { messages: 0, bytes: 0 },
    subs: 0,
    connected: Date.now(),
    address: getAddress(socket),
  };
  let user: any = null;
  // whether this connection was counted by the usage monitor, so that it
  // is only un-counted on disconnect if it was counted in the first place
  let added = false;
  try {
    user = await this.getUser(socket);
    this.usage.add(user);
    added = true;
  } catch (err) {
    // getUser is supposed to throw an error if authentication fails
    // for any reason.
    // Also, if the connection limit is hit they still connect, but as
    // the error user who can't do anything (hence not waste resources).
    user = { error: `${err}`, code: err.code };
  }
  this.stats[socket.id].user = user;
  const id = socket.id;
  this.log("new connection", { id, user });
  if (this.subscriptions[id] == null) {
    this.subscriptions[id] = new Set<string>();
  }

  this.sendInfo(socket, user);

  // client reports its own receive statistics
  socket.on("stats", ({ recv0 }) => {
    const s = this.stats[socket.id];
    if (s == null) return;
    s.recv = recv0;
  });

  socket.on(
    "wait-for-interest",
    async ({ subject, timeout = MAX_INTEREST_TIMEOUT }, respond) => {
      if (respond == null) {
        return;
      }
      if (!isValidSubjectWithoutWildcards(subject)) {
        respond({ error: "invalid subject" });
        return;
      }
      if (!(await this.isAllowed({ user, subject, type: "pub" }))) {
        const message = `permission denied waiting for interest in '${subject}' from ${JSON.stringify(
          user,
        )}`;
        this.log(message);
        respond({ error: message, code: 403 });
        // must stop here: otherwise we would wait for interest anyway and
        // respond a second time to a user who is not allowed to know
        return;
      }
      try {
        respond(await this.waitForInterest(subject, timeout, socket.id));
      } catch (err) {
        respond({ error: `${err}` });
      }
    },
  );

  socket.on("publish", async ([subject, ...data], respond) => {
    if (data?.[2]) {
      // done
      this.stats[socket.id].send.messages += 1;
    }
    this.stats[socket.id].send.bytes += data[4]?.length ?? 0;
    this.stats[socket.id].active = Date.now();

    try {
      const count = await this.publish({ subject, data, from: user });
      respond?.({ count });
    } catch (err) {
      if (err.code == 403) {
        // additionally notify the client via a dedicated event
        socket.emit("permission", {
          message: err.message,
          subject,
          type: "pub",
        });
      }
      respond?.({ error: `${err}`, code: err.code });
    }
  });

  // Subscribe this socket to one subject; never throws -- failures are
  // reported in the returned object.
  const subscribe = async ({ subject, queue }) => {
    try {
      if (this.subscriptions[id].has(subject)) {
        return { status: "already-added" };
      }
      await this.subscribe({ socket, subject, queue, user });
      this.subscriptions[id].add(subject);
      this.stats[socket.id].subs += 1;
      this.stats[socket.id].active = Date.now();
      return { status: "added" };
    } catch (err) {
      if (err.code == 403) {
        socket.emit("permission", {
          message: err.message,
          subject,
          type: "sub",
        });
      }
      return { error: `${err}`, code: err.code };
    }
  };

  // accepts either a single request or an array of them; arrays are
  // processed one at a time, in order
  socket.on(
    "subscribe",
    async (x: { subject; queue } | { subject; queue }[], respond) => {
      let r;
      if (is_array(x)) {
        const v: any[] = [];
        for (const y of x) {
          v.push(await subscribe(y));
        }
        r = v;
      } else {
        r = await subscribe(x);
      }
      respond?.(r);
    },
  );

  socket.on("subscriptions", (_, respond) => {
    if (respond == null) {
      return;
    }
    respond(Array.from(this.subscriptions[id]));
  });

  // Remove one subscription of this socket (no-op if not subscribed).
  const unsubscribe = ({ subject }: { subject: string }) => {
    if (!this.subscriptions[id].has(subject)) {
      return;
    }
    this.unsubscribe({ socket, subject });
    this.subscriptions[id].delete(subject);
    this.stats[socket.id].subs -= 1;
    this.stats[socket.id].active = Date.now();
  };

  socket.on(
    "unsubscribe",
    (x: { subject: string } | { subject: string }[], respond) => {
      let r;
      if (is_array(x)) {
        r = x.map(unsubscribe);
      } else {
        r = unsubscribe(x);
      }
      respond?.(r);
    },
  );

  socket.on("cluster", (respond) => {
    respond?.(this.clusterAddresses(this.clusterName));
  });

  // "disconnecting" fires while socket.rooms is still populated, which is
  // what lets us remove the interest for every room the socket was in.
  socket.on("disconnecting", async () => {
    this.log("disconnecting", { id, user });
    delete this.stats[socket.id];
    if (added) {
      this.usage.delete(user);
    }
    const rooms = Array.from(socket.rooms) as string[];
    for (const room of rooms) {
      const subject = getSubjectFromRoom(room);
      this.unsubscribe({ socket, subject });
    }
    delete this.subscriptions[id];
  });
};
1088
1089
/**
 * Send the "info" message (server info plus the authenticated user) to a
 * newly connected socket, retrying until it is acknowledged.
 *
 * We send info with an ack because sometimes the initial "info" message
 * seems to get dropped, leaving a broken hanging connection that never
 * does anything (until the user explicitly refreshes their browser).
 * This was probably seen frequently in production.
 */
sendInfo = async (socket, user) => {
  // One attempt.  Resolves to true when no further attempt is needed.
  const attempt = async (): Promise<boolean> => {
    if (!socket.conn?.readyState.startsWith("o")) {
      // readyState not defined or not opened or opening, so connection must
      // have been closed before success.
      return true;
    }
    try {
      await socket
        .timeout(7500)
        .emitWithAck("info", { ...this.info(), user });
    } catch {
      // no ack in time (or emit failed) -- try again later
      return false;
    }
    return true;
  };

  try {
    await until(attempt, { min: 5000, max: 30000, timeout: 120_000 });
  } catch {
    // never ack'd "info" after a few minutes -- could just be an old client,
    // so don't do anything at this point.
  }
};
1120
1121
// Address of this server, computed from its options; used by client() and
// when asking other nodes to link to us.
address = () => {
  return getServerAddress(this.options);
};
1122
1123
// Create a new client, in this same process, that is connected to this
// server.  This is especially useful for unit testing.  noCache is set
// unless the caller overrides it in options, so by default multiple calls
// return distinct clients.
client = (options?: ClientOptions): Client => {
  const address = this.address();
  this.log("client: connecting to - ", { address });
  const connectOptions = { address, noCache: true, ...options };
  return connect(connectOptions);
};
1135
1136
// Enable cluster support for this node (a no-op when this.cluster is not set).
//
// What this does:
//   - Start a persist server that runs in the same process but is just for
//     use for coordinating cluster nodes.
//   - Create the cluster streams and publish any updates that were queued
//     before the streams existed.
//   - Start the autoscan loop and spawn any extra local cluster nodes.
//
// Throws if id, clusterName or systemAccountPassword is not set.
private initCluster = async () => {
  if (!this.cluster) {
    return;
  }

  // required configuration
  if (!this.id) {
    throw Error("if cluster is enabled, then the id must be set");
  }
  if (!this.clusterName) {
    throw Error("if cluster is enabled, then the clusterName must be set");
  }
  if (!this.options.systemAccountPassword) {
    throw Error("cluster must have systemAccountPassword set");
  }

  this.log("enabling cluster support", {
    id: this.id,
    clusterName: this.clusterName,
  });

  // system-account client connected to this very server
  const client = this.client({
    systemAccountPassword: this.options.systemAccountPassword,
  });
  await client.waitUntilSignedIn();

  this.log("creating persist server");
  this.clusterPersistServer = await createClusterPersistServer({
    client,
    id: this.id,
    clusterName: this.clusterName,
  });

  this.log("creating cluster streams");
  this.clusterStreams = await clusterStreams({
    client,
    id: this.id,
    clusterName: this.clusterName,
  });

  // add in everything so far in interest (TODO)
  this.queuedClusterUpdates.forEach(this.publishUpdate);
  this.queuedClusterUpdates.length = 0;

  // the autoscan loop runs in the background; it is deliberately not awaited.
  void this.initAutoscan();
  await this.initClusterNodes();
  this.log("cluster successfully initialized");
};
1185
1186
// When options.localClusterSize is at least 2, spawn localClusterSize - 1
// additional servers as separate processes and join each of them, so that
// together with this node they form a cluster.  Node number i listens on
// port + i and has id `${id}-${i}`.  All nodes are started in parallel.
private initClusterNodes = async () => {
  const size = this.options.localClusterSize ?? 1;
  if (size < 2) {
    return;
  }
  const basePort = this.options.port;
  if (!basePort) {
    throw Error("bug -- port must be set");
  }

  const spawnAndJoin = async (offset: number) => {
    const nodeOptions = {
      path: this.options.path,
      ssl: this.options.ssl,
      systemAccountPassword: this.options.systemAccountPassword,
      clusterName: this.options.clusterName,
      autoscanInterval: this.options.autoscanInterval,
      longAutoscanInterval: this.options.longAutoscanInterval,
      forgetClusterNodeInterval: this.options.forgetClusterNodeInterval,
      port: basePort + offset,
      id: `${this.options.id}-${offset}`,
    };
    await forkedConatServer(nodeOptions);
    await this.join(getServerAddress(nodeOptions));
  };

  const pending: any[] = [];
  let offset = 1;
  while (offset < size) {
    pending.push(spawnAndJoin(offset));
    offset += 1;
  }
  await Promise.all(pending);
};
1217
1218
// Background loop that periodically calls scan() so this node discovers and
// links to the other nodes of the cluster.  Does nothing (beyond logging) if
// options.autoscanInterval is not set.
//
// The loop runs until the server is closed.  After a scan it waits
// autoscanInterval if this scan or the previous one added links, and
// longAutoscanInterval otherwise.
//
// A scan that throws is logged and retried after longAutoscanInterval.  It is
// NOT rethrown: this function is started without being awaited, so a throw
// here would surface as an unhandled promise rejection and permanently stop
// autoscanning.
private initAutoscan = async () => {
  if (!this.options.autoscanInterval) {
    this.log("Cluster autoscan is DISABLED.");
    return;
  }
  this.log(`Cluster autoscan interval ${this.options.autoscanInterval}ms`);
  // start at 1 so that the first pass always uses the short interval
  let lastCount = 1;
  while (!this.isClosed()) {
    let x;
    try {
      x = await this.scan();
      if (this.isClosed()) return;
    } catch (err) {
      // this should never happen unless there is a serious bug (?).
      this.log(`WARNING/BUG?: serious problem scanning -- ${err}`);
      await delay(this.options.longAutoscanInterval);
      continue;
    }
    if (x.errors.length > 0) {
      this.log(`WARNING: errors while scanning cluster`, x.errors);
    }
    if (x.count > 0 || lastCount > 0) {
      this.log(
        `cluster scan added ${x.count} links -- will scan again in ${this.options.autoscanInterval}`,
      );
      await delay(this.options.autoscanInterval);
    } else {
      this.log(
        `cluster scan found no new links -- waiting ${this.options.longAutoscanInterval}ms before next scan`,
      );
      await delay(this.options.longAutoscanInterval);
    }
    lastCount = x.count;
  }
};
1254
1255
// Throttled, one-off cluster scan (at most one call per 10 seconds, on both
// the leading and trailing edge).  Does nothing if the server is closed or
// autoscan is disabled.  Problems are only logged, never thrown.
private scanSoon = throttle(
  async () => {
    const enabled = !this.isClosed() && this.options.autoscanInterval;
    if (!enabled) {
      return;
    }
    let result;
    try {
      result = await this.scan();
    } catch (err) {
      this.log(`WARNING/BUG?: scanSoon -- serious problem scanning -- ${err}`);
      return;
    }
    const { errors } = result;
    if (errors.length > 0) {
      this.log(`WARNING: scanSoon -- errors while scanning cluster`, errors);
    }
  },
  10_000,
  { leading: true, trailing: true },
);
1283
1284
// Join this node to the cluster that contains a node with the given address.
//   - the address obviously must be reachable over the network
//   - the systemAccountPassword of this node and the one with the given
//     address must be the same.
// If there is already a link for this address, it is returned as is.
// Otherwise a new link is created, recorded both by (clusterName, id) and by
// address, and a throttled scan is triggered.  Failures are logged and
// rethrown.
join = reuseInFlight(
  async (
    address: string,
    { timeout }: { timeout?: number } = {},
  ): Promise<ClusterLink> => {
    const password = this.options.systemAccountPassword;
    if (!password) {
      throw Error("systemAccountPassword must be set");
    }
    logger.debug("join: connecting to ", address);

    const existing = this.clusterLinksByAddress[address];
    if (existing != null) {
      logger.debug("join: already connected to ", address);
      return existing;
    }

    let link: ClusterLink;
    try {
      link = await clusterLink(address, password, timeout);
      const { clusterName, id } = link;
      const linksInCluster =
        this.clusterLinks[clusterName] ?? (this.clusterLinks[clusterName] = {});
      linksInCluster[id] = link;
      this.clusterLinksByAddress[address] = link;
      this.scanSoon();
      logger.debug("join: successfully created new connection to ", address);
    } catch (err) {
      logger.debug("join: FAILED creating a new connection to ", address, err);
      throw err;
    }
    return link;
  },
);
1327
1328
// Remove the link to another node and forget about it.
//
// The link is identified either by address, or by id together with an
// optional clusterName (defaulting to this node's own clusterName).
// If no matching link exists this is a no-op.
//
// Throws an Error if none of clusterName, id, address is given, if address is
// missing and id is not given, or if no cluster name can be determined.
unjoin = ({
  id,
  clusterName,
  address,
}: {
  clusterName?: string;
  id?: string;
  address?: string;
}) => {
  if (!clusterName && !id && !address) {
    throw Error("at least one of clusterName, id or address must be specified");
  }
  let link;
  if (address) {
    link = this.clusterLinksByAddress[address];
  } else {
    if (!id) {
      throw Error("if address is not given then id must be given");
    }
    const cluster = clusterName ?? this.clusterName;
    if (!cluster) {
      // always throw a real Error (never a bare string), so that callers get
      // a stack trace and `instanceof Error` checks work.
      throw Error("clusterName must be set");
    }
    link = this.clusterLinks[cluster]?.[id];
  }
  if (link == null) {
    // already gone
    return;
  }

  // keep sticky state from this link before it is closed
  this.saveNonredundantStickyState(link);
  link.close();

  const linksInCluster = this.clusterLinks[link.clusterName];
  if (linksInCluster != null) {
    delete linksInCluster[link.id];
    if (Object.keys(linksInCluster).length == 0) {
      // that was the last link of its cluster
      delete this.clusterLinks[link.clusterName];
    }
  }
  delete this.clusterLinksByAddress[link.address];
};
1368
1369
// Start the two "sys" api services of this server, using a client that
// authenticates with the system account password via a cookie:
//
//   1. a cluster-wide subject, with a queue group unique to this node, so
//      that ALL servers answer, i.e., a single request gets answers from all
//      members of the cluster.  Only stats, usage and disconnect are served
//      there; everything else throws "wrong service".
//   2. a subject specific to this node (clusterName + id), which serves the
//      full api.
//
// Throws if systemAccountPassword is not set.
private initSystemService = async () => {
  const password = this.options.systemAccountPassword;
  if (!password) {
    throw Error("system service requires system account");
  }
  this.log("starting service listening on sys...");
  const client = this.client({
    extraHeaders: { Cookie: `sys=${password}` },
  });

  // handlers shared by both services; results are keyed by this node's id
  const stats = async () => ({ [this.id]: this.stats });
  const usage = async () => ({ [this.id]: this.usage.stats() });
  // user has to explicitly refresh their browser after
  // being disconnected this way:
  const disconnect = async (ids: string | string[]) => {
    const targets = typeof ids == "string" ? [ids] : ids;
    for (const id of targets) {
      this.io.in(id).disconnectSockets();
    }
  };
  // placeholder for calls that only make sense on the node specific service
  const wrongService = () => {
    throw Error("wrong service");
  };

  const clusterWideSubject = sysApiSubject({ clusterName: this.clusterName });
  await client.service<SysConatServer>(
    clusterWideSubject,
    {
      stats,
      usage,
      disconnect,
      join: wrongService,
      unjoin: wrongService,
      clusterTopology: wrongService,
      clusterAddresses: wrongService,
    },
    { queue: `${this.clusterName}-${this.id}` },
  );
  this.log(`successfully started ${clusterWideSubject} service`);

  const nodeSubject = sysApiSubject({
    clusterName: this.clusterName,
    id: this.id,
  });
  await client.service<SysConatServer>(nodeSubject, {
    stats,
    usage,
    disconnect,
    join: async (address: string) => {
      await this.join(address);
    },
    unjoin: async (opts: { clusterName?: string; id: string }) => {
      await this.unjoin(opts);
    },

    // map from cluster name to (map from id to address)
    clusterTopology: async (): Promise<{
      [clusterName: string]: { [id: string]: string };
    }> => this.clusterTopology(),

    // addresses of all nodes in the (super-)cluster
    clusterAddresses: async (clusterName?: string): Promise<string[]> =>
      this.clusterAddresses(clusterName),
  });
  this.log(`successfully started ${nodeSubject} service`);
};
1450
1451
// Addresses of the nodes we know about and are currently connected to.
//   - no clusterName: this node plus every connected link, in any cluster;
//   - clusterName given: only connected links in that cluster, plus this
//     node if it belongs to that cluster.
clusterAddresses = (clusterName?: string) => {
  const addresses: string[] = [];
  const anyCluster = !clusterName;
  if (anyCluster || clusterName == this.clusterName) {
    addresses.push(this.address());
  }
  for (const [address, link] of Object.entries(this.clusterLinksByAddress)) {
    if (!link.isConnected()) {
      continue;
    }
    if (anyCluster || link.clusterName == clusterName) {
      addresses.push(address);
    }
  }
  return addresses;
};
1476
1477
// Returns, for each cluster we have links to, a map from node id to node
// address.  This node itself is always included under its own cluster name.
// Throws if this server has no clusterName or no id.
clusterTopology = (): {
  // map from id to address
  [clusterName: string]: { [id: string]: string };
} => {
  const ownCluster = this.clusterName;
  const ownId = this.id;
  if (!ownCluster || !ownId) {
    throw Error("not in cluster mode");
  }
  const topology: { [clusterName: string]: { [id: string]: string } } = {};
  for (const [clusterName, linksById] of Object.entries(this.clusterLinks)) {
    const addressById: { [id: string]: string } = {};
    for (const [id, link] of Object.entries(linksById)) {
      addressById[id] = link.address;
    }
    topology[clusterName] = addressById;
  }
  // make sure our own cluster is present, then add ourselves to it
  const ownEntry = topology[ownCluster] ?? (topology[ownCluster] = {});
  ownEntry[ownId] = this.address();
  return topology;
};
1498
1499
// Scan, via the nodes of our cluster that we're connected to, for other known
// nodes in the cluster that we're NOT connected to, and link to them.  If
// every node does this periodically (and the cluster isn't constantly
// changing), then each component of the digraph will eventually be a complete
// graph.  In particular, this function returns the number of links created
// (count), so if it returns 0 when called on all nodes, then we're done until
// new nodes are added.  Problems with individual nodes are collected in
// errors instead of being thrown.
scan = reuseInFlight(async (): Promise<{ count: number; errors: any[] }> => {
  if (this.isClosed()) {
    return { count: 0, errors: [] };
  }
  const knownByUs = new Set(this.clusterAddresses(this.clusterName));
  const unknownToUs = new Set<string>([]);
  const errors: { err: any; desc: string }[] = [];
  let count = 0;

  // Use the sys api to ask one remote node "what nodes in this cluster do
  // *YOU* know about?".  Anything we don't know about goes in unknownToUs.
  // If the remote doesn't know about us, ask it to link to us.
  const askRemote = async (client) => {
    try {
      const sys = sysApi(client);
      const knownByRemoteNode = new Set(
        await sys.clusterAddresses(this.clusterName),
      );
      if (this.isClosed()) return;
      logger.debug(
        "scan: remote",
        client.options.address,
        "knows about ",
        knownByRemoteNode,
      );
      for (const address of knownByRemoteNode) {
        if (!knownByUs.has(address)) {
          unknownToUs.add(address);
        }
      }
      if (knownByRemoteNode.has(this.address())) {
        return;
      }
      // we know about them, but they don't know about us, so ask them to link to us.
      logger.debug(
        "scan: asking remote ",
        client.options.address,
        " to link to us",
      );
      await sys.join(this.address());
      if (this.isClosed()) return;
      count += 1;
    } catch (err) {
      errors.push({
        err,
        desc: `requesting remote ${client.options.address} join us`,
      });
    }
  };

  if (!this.clusterName) {
    throw Error("if cluster is enabled, then the clusterName must be set");
  }

  // Split our links into the ones that are connected (and get queried) and
  // the ones that have been disconnected for too long (and get forgotten).
  const connected: ClusterLink[] = [];
  for (const link of Object.values(this.clusterLinks[this.clusterName] ?? {})) {
    if (link.isConnected()) {
      connected.push(link);
      continue;
    }
    const forgetAfter =
      this.options.forgetClusterNodeInterval ??
      DEFAULT_FORGET_CLUSTER_NODE_INTERVAL;
    if (link.howLongDisconnected() >= forgetAfter) {
      // forget about this link
      this.unjoin(link);
    }
  }
  // query all connected nodes in parallel
  await Promise.all(connected.map((link) => askRemote(link.client)));

  if (unknownToUs.size == 0 || this.isClosed()) {
    return { count, errors };
  }

  // Now (in parallel), join with all unknownToUs nodes.
  const joinUnknown = async (address: string) => {
    try {
      await this.join(address);
      count += 1;
    } catch (err) {
      errors.push({ err, desc: `joining to ${address}` });
    }
  };
  await Promise.all([...unknownToUs].map(joinUnknown));

  return { count, errors };
});
1596
1597
// Resolves to true once there is interest in the subject.  Without cluster
// support only this node is consulted; with it, any linked node of any
// cluster counts as well.
private waitForInterest = async (
  subject: string,
  timeout: number,
  socketId: string,
  signal?: AbortSignal,
): Promise<boolean> => {
  if (!this.cluster) {
    // not a cluster
    return await this.waitForInterestOnThisNode(
      subject,
      timeout,
      socketId,
      signal,
    );
  }

  // check if there is already known interest
  const links = this.superclusterLinks();
  if (links.some((link) => link.hasInterest(subject))) {
    return true;
  }

  // wait for interest in any node on any cluster
  return await this.waitForInterestInLinks(
    subject,
    timeout,
    socketId,
    signal,
    links,
  );
};
1629
1630
// All links to other nodes, across every cluster we have links to, as a
// single new flat array.
private superclusterLinks = (): ClusterLink[] => {
  return Object.values(this.clusterLinks).flatMap((linksById) =>
    Object.values(linksById),
  );
};
1637
1638
// Wait for interest in the subject on this node or on any of the given links.
//
// With a falsy timeout every waiter answers immediately, and the result is
// true exactly when at least one of them reported interest.  Otherwise the
// result is that of the first waiter to finish, after which all remaining
// waiters are cancelled through an internal AbortController.  A waiter that
// throws counts as "no interest" (false).
//
// NOTE(review): since a failed waiter resolves to false, the race below can
// finish with false while other waiters are still pending -- confirm that
// this is intended.
//
// If the optional external signal is (or becomes) aborted, all waiters are
// cancelled as well.  The listener on the external signal is always removed
// again, so a long-lived signal does not accumulate listeners.
private waitForInterestInLinks = async (
  subject,
  timeout,
  socketId,
  signal,
  links: ClusterLink[],
): Promise<boolean> => {
  // turn a rejected promise into false
  const nothrow = async (f) => {
    try {
      return await f;
    } catch {}
    return false;
  };
  // we use AbortController etc below so we can cancel waiting once
  // we get any interest.
  const controller = new AbortController();
  const signal2 = controller.signal;
  const v: any[] = [
    nothrow(
      this.waitForInterestOnThisNode(subject, timeout, socketId, signal2),
    ),
    ...links.map((link) =>
      nothrow(link.waitForInterest(subject, timeout, signal2)),
    ),
  ];

  if (!timeout) {
    // with timeout=0 they all immediately answer (so no need to worry about abort/promise)
    const w = await Promise.all(v);
    return w.some((x) => !!x);
  }

  const abortAll = () => {
    controller.abort();
  };
  if (signal?.aborted) {
    // an "abort" event never fires again for a signal that is already aborted
    abortAll();
  } else {
    signal?.addEventListener("abort", abortAll, { once: true });
  }
  try {
    return await Promise.race(v);
  } finally {
    signal?.removeEventListener("abort", abortAll);
    // cancel all the others.
    controller.abort();
  }
};
1683
1684
// Wait until some subscription on this node matches the subject.
//   - Answers immediately if there already is a match, or if timeout is
//     falsy (then the answer is simply whether there is a match right now).
//   - Otherwise re-checks each time this.interest emits "change", for at
//     most min(timeout, MAX_INTEREST_TIMEOUT) ms, and then throws
//     Error("timeout").
//   - Returns false if the server is closed, the socket with the given id
//     is gone, or the signal is aborted.
private waitForInterestOnThisNode = async (
  subject: string,
  timeout: number,
  socketId: string,
  signal?: AbortSignal,
) => {
  // NOTE: we never return the actual matches, since this is a
  // potential security vulnerability.
  // it could make it very easy to figure out private inboxes, etc.
  const alreadyInterested = this.interest.matches(subject).length > 0;
  if (alreadyInterested || !timeout) {
    return alreadyInterested;
  }

  const limit = timeout > MAX_INTEREST_TIMEOUT ? MAX_INTEREST_TIMEOUT : timeout;
  const deadline = Date.now() + limit;
  // is it still worth waiting, i.e., server open, socket around, not aborted?
  const worthWaiting = () =>
    !this.isClosed() && this.sockets[socketId] && !signal?.aborted;

  while (worthWaiting()) {
    if (Date.now() >= deadline) {
      throw Error("timeout");
    }
    try {
      // if signal is set only wait for the change for up to 1 second.
      await once(this.interest, "change", signal != null ? 1000 : undefined);
    } catch {
      // waiting for the change failed, so go around and re-check the
      // loop condition and the deadline
      continue;
    }
    if (!worthWaiting()) {
      return false;
    }
    if (this.interest.hasMatch(subject)) {
      return true;
    }
  }
  return false;
};
1721
1722
// Hashes of the current interest and sticky state of this server.
hash = (): { interest: number; sticky: number } => {
  const interest = hashInterest(this.interest);
  const sticky = hashSticky(this.sticky);
  return { interest, sticky };
};
1728
}
1729
1730
// A room name is either a plain subject, or a JSON object (hence starting
// with "{") that has a subject field -- see socketSubjectRoom.  Either way,
// return the subject.
function getSubjectFromRoom(room: string) {
  const isJsonRoom = room.startsWith("{");
  return isJsonRoom ? JSON.parse(room).subject : room;
}
1737
1738
// Name of the room for one specific socket and subject: a JSON string
// holding the socket's id and the subject.
function socketSubjectRoom({ socket, subject }) {
  const room = { id: socket.id, subject };
  return JSON.stringify(room);
}
1741
1742
/**
 * Return a uniformly random element of a nonempty set.
 *
 * @throws Error if the set is empty.
 */
export function randomChoice(v: Set<string>): string {
  if (v.size == 0) {
    throw Error("v must have size at least 1");
  }
  if (v.size == 1) {
    // only one possibility, so no need to copy the set or use randomness
    const [only] = v;
    return only;
  }
  const items = [...v];
  return items[Math.floor(Math.random() * items.length)];
}
1755
1756
// Best guess at the ip address of the client behind a socket.
// See https://socket.io/how-to/get-the-ip-address-of-the-client
//
// Checked in this order:
//   1. the "for=" directive in the first entry of the "forwarded" header,
//   2. the first entry of the "x-forwarded-for" header,
//   3. the "cf-connecting-ip" and then the "fastly-client-ip" header,
//   4. the address from the socket.io handshake.
function getAddress(socket) {
  const headers = socket.handshake.headers;

  const forwarded = headers["forwarded"];
  if (forwarded) {
    const forDirective = forwarded
      .split(",")[0]
      .split(";")
      .find((directive) => directive.startsWith("for="));
    if (forDirective !== undefined) {
      return forDirective.substring("for=".length);
    }
  }

  const firstForwardedFor = headers["x-forwarded-for"]?.split(",")?.[0];
  if (firstForwardedFor) {
    return firstForwardedFor;
  }

  const cdnHeader = ["cf-connecting-ip", "fastly-client-ip"].find(
    (name) => headers[name],
  );
  if (cdnHeader !== undefined) {
    return headers[cdnHeader];
  }

  return socket.handshake.address;
}
1780
1781
/**
 * Apply one interest update, mutating `interest` (and possibly `sticky`).
 *
 * - op "add": record `room` under `queue` for `subject`, creating the entry
 *   for the subject and/or queue when missing.
 * - op "delete": remove `room` from every queue of `subject` and drop queues
 *   that became empty.  When no queue is left, the subject is removed from
 *   `interest` and its entry is deleted from `sticky`.
 *
 * @throws Error for an "add" whose queue is not a string, or for any other op.
 */
export function updateInterest(
  update: InterestUpdate,
  interest: Interest,
  sticky: Sticky,
) {
  const { op, subject, queue, room } = update;
  const groups = interest.get(subject);

  if (op != "add" && op != "delete") {
    throw Error(`invalid op ${op}`);
  }

  if (op == "add") {
    if (typeof queue != "string") {
      throw Error("queue must not be null for add");
    }
    if (groups === undefined) {
      // first interest in this subject
      interest.set(subject, { [queue]: new Set([room]) });
      return;
    }
    const rooms = groups[queue];
    if (rooms == null) {
      // first room in this queue group
      groups[queue] = new Set([room]);
    } else {
      rooms.add(room);
    }
    return;
  }

  // op is "delete"
  if (groups == null) {
    return;
  }
  for (const name of Object.keys(groups)) {
    const rooms = groups[name];
    rooms.delete(room);
    if (rooms.size == 0) {
      delete groups[name];
    }
  }
  if (Object.keys(groups).length == 0) {
    // no interest anymore
    interest.delete(subject);
    delete sticky[subject];
  }
}
1820
1821
/**
 * Record in `sticky` that `subject` (under `pattern`) is routed to `target`,
 * creating the entry for the pattern when missing.
 *
 * @returns true if this update actually causes a change to sticky
 */
export function updateSticky(update: StickyUpdate, sticky: Sticky): boolean {
  const { pattern, subject, target } = update;
  let targets = sticky[pattern];
  if (targets === undefined) {
    targets = {};
    sticky[pattern] = targets;
  }
  const changed = targets[subject] != target;
  if (changed) {
    targets[subject] = target;
  }
  return changed;
}
1833
1834
// Build the address of a server from its options:
//   - https if ssl is set or the port is 443, otherwise http;
//   - host is clusterIpAddress, defaulting to "localhost";
//   - the path with its last "/conat".length characters removed.
function getServerAddress(options: Options) {
  const { port } = options;
  const secure = options.ssl || port == 443;
  const protocol = secure ? "https" : "http";
  const host = options.clusterIpAddress ?? "localhost";
  const basePath = options.path?.slice(0, -"/conat".length) ?? "";
  return `${protocol}://${host}:${port}${basePath}`;
}
1841
1842
/*
A "silent" pattern is one that must not count as interest in a subject.

Without this, if an admin subscribed to '>' then suddenly all the algorithms
for responding to messages, sockets, etc., based on "waiting for interest"
would start failing.  Treating '>' specially is a hack that makes it possible
to subscribe to '>'.  Really something more general is needed for other admin
"wire taps", but this will have to do for now.
*/
function isSilentPattern(pattern: string): boolean {
  const catchAll = ">";
  return pattern == catchAll;
}
1852
1853
/*
1854
const watching = new Set(["xyz"]);
1855
let last = Date.now();
1856
function watch(action, { subject, data, id, from }) {
1857
for (const x of watching) {
1858
if (subject.includes(x)) {
1859
console.log(Date.now() - last, new Date(), action, id, {
1860
subject,
1861
data,
1862
from,
1863
});
1864
last = Date.now();
1865
if (data[5]?.["CN-Reply"]) {
1866
watching.add(data[5]["CN-Reply"]);
1867
}
1868
}
1869
}
1870
}
1871
function trace(subject, ...args) {
1872
for (const x of watching) {
1873
if (subject.includes(x)) {
1874
console.log(Date.now() - last, new Date(), subject, ...args);
1875
last = Date.now();
1876
}
1877
}
1878
}
1879
*/
1880
1881