Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/core/server.ts
5796 views
1
/*
2
3
Just try it out, start up node.js in this directory and:
4
5
s = require('@cocalc/conat/core/server').init({port:4567, getUser:()=>{return {hub_id:'hub'}}})
6
c = s.client();
7
c.watch('foo')
8
c2 = s.client();
9
c2.pub('foo', 'bar')
10
11
To connect from another terminal:
12
13
c = require('@cocalc/conat/core/client').connect({address:"http://localhost:4567"})
14
c.state
15
// 'connected'
16
17
18
cd packages/server
19
20
21
s = await require('@cocalc/server/conat/socketio').initConatServer()
22
23
s0 = await require('@cocalc/server/conat/socketio').initConatServer({port:3000}); 0
24
25
26
---
27
28
*/
29
30
import type { ConnectionStats, ServerInfo } from "./types";
31
32
import { delay } from "awaiting";
33
import { EventEmitter } from "events";
34
import { throttle } from "lodash";
35
import { Server } from "socket.io";
36
37
import { getClientIpAddress } from "@cocalc/util/get-client-ip-address";
38
import { getLogger } from "@cocalc/conat/client";
39
import { UsageMonitor } from "@cocalc/conat/monitor/usage";
40
import { type ConatSocketServer } from "@cocalc/conat/socket";
41
import {
42
isValidSubject,
43
isValidSubjectWithoutWildcards,
44
} from "@cocalc/conat/util";
45
import { once, until } from "@cocalc/util/async-utils";
46
import { is_array } from "@cocalc/util/misc";
47
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
48
import { Metrics } from "../types";
49
import {
50
Client,
51
type ClientOptions,
52
ConatError,
53
connect,
54
MAX_INTEREST_TIMEOUT,
55
} from "./client";
56
import {
57
clusterLink,
58
type ClusterLink,
59
clusterStreams,
60
type ClusterStreams,
61
createClusterPersistServer,
62
trimClusterStreams,
63
Interest,
64
hashInterest,
65
} from "./cluster";
66
import {
67
MAX_CONNECTIONS,
68
MAX_CONNECTIONS_PER_USER,
69
MAX_PAYLOAD,
70
MAX_SUBSCRIPTIONS_PER_CLIENT,
71
MAX_SUBSCRIPTIONS_PER_HUB,
72
RESOURCE,
73
} from "./constants";
74
import { Patterns } from "./patterns";
75
import { forkedConatServer } from "./start-server";
76
import { sysApi, sysApiSubject, type SysConatServer } from "./sys";
77
78
const logger = getLogger("conat:core:server");
79
80
const DEFAULT_AUTOSCAN_INTERVAL = 15_000;
81
const DEFAULT_LONG_AUTOSCAN_INTERVAL = 60_000;
82
83
// If a cluster node has been disconnected for this long,
84
// unjoin it, thus freeing the streams tracking its state (so memory),
85
// If it comes back it will have to explicitly
86
// join the cluster again. This is primarily to not leak RAM
87
// when nodes are removed on purpose, and also avoid a cluttered log
88
// of constant reconnect errors. Supercluster nodes are never
89
// automatically forgotten.
90
const DEFAULT_FORGET_CLUSTER_NODE_INTERVAL = 30 * 60_000; // 30 minutes by default
91
92
const DEBUG = false;
93
94
export interface InterestUpdate {
95
op: "add" | "delete";
96
subject: string;
97
queue?: string;
98
room: string;
99
}
100
101
interface Update {
102
interest?: InterestUpdate;
103
}
104
105
export function init(opts: Options) {
106
return new ConatServer(opts);
107
}
108
109
export type UserFunction = (
110
socket,
111
systemAccounts?: { [cookieName: string]: { password: string; user: any } },
112
) => Promise<any>;
113
114
export type AllowFunction = (opts: {
115
type: "pub" | "sub";
116
user: any;
117
subject: string;
118
}) => Promise<boolean>;
119
120
export interface Options {
121
id?: string;
122
httpServer?;
123
port?: number;
124
path?: string;
125
getUser?: UserFunction;
126
isAllowed?: AllowFunction;
127
maxSubscriptionsPerClient?: number;
128
maxSubscriptionsPerHub?: number;
129
systemAccountPassword?: string;
130
// if true, use https when creating an internal client.
131
ssl?: boolean;
132
133
// WARNING: **superclusters are NOT fully iplemented yet.**
134
//
135
// if clusterName is set, enable clustering. Each node
136
// in the cluster must have a different name. systemAccountPassword
137
// must also be set. This only has an impact when the id is '0'.
138
// This publishes interest state in a stream, so uses more resources.
139
clusterName?: string;
140
141
// autoscanInterval = shortest interval when cluster will automatically
142
// be scanned for new nodes. It may be longer if there's no activity.
143
// Set to 0 to disable autoscan, which is very useful for unit tests.
144
// Defaults to 10_000 = 10 seconds.
145
autoscanInterval?: number;
146
longAutoscanInterval?: number;
147
forgetClusterNodeInterval?: number;
148
149
// if localClusterSize >=2 (and clusterName, etc. configured above),
150
// creates a local cluster by spawning child processes with
151
// ports the above port +1, +2, etc.
152
localClusterSize?: number;
153
154
// the ip address of this server on the cluster.
155
clusterIpAddress?: string;
156
}
157
158
type State = "init" | "ready" | "closed";
159
160
export class ConatServer extends EventEmitter {
161
public readonly io;
162
public readonly id: string;
163
164
private getUser: UserFunction;
165
private isAllowed: AllowFunction;
166
public readonly options: Partial<Options>;
167
private cluster?: boolean;
168
169
private sockets: { [id: string]: any } = {};
170
private stats: { [id: string]: ConnectionStats } = {};
171
private usage: UsageMonitor;
172
public state: State = "init";
173
174
private subscriptions: { [socketId: string]: Set<string> } = {};
175
public interest: Interest = new Patterns();
176
177
private clusterStreams?: ClusterStreams;
178
private clusterLinks: {
179
[clusterName: string]: { [id: string]: ClusterLink };
180
} = {};
181
private clusterLinksByAddress: { [address: string]: ClusterLink } = {};
182
private clusterPersistServer?: ConatSocketServer;
183
public readonly clusterName?: string;
184
private queuedClusterUpdates: Update[] = [];
185
186
constructor(options: Options) {
187
super();
188
const {
189
httpServer,
190
port = 3000,
191
ssl = false,
192
id = "0",
193
path = "/conat",
194
getUser,
195
isAllowed,
196
maxSubscriptionsPerClient = MAX_SUBSCRIPTIONS_PER_CLIENT,
197
maxSubscriptionsPerHub = MAX_SUBSCRIPTIONS_PER_HUB,
198
systemAccountPassword,
199
clusterName,
200
autoscanInterval = DEFAULT_AUTOSCAN_INTERVAL,
201
longAutoscanInterval = DEFAULT_LONG_AUTOSCAN_INTERVAL,
202
forgetClusterNodeInterval = DEFAULT_FORGET_CLUSTER_NODE_INTERVAL,
203
localClusterSize = 1,
204
clusterIpAddress,
205
} = options;
206
this.clusterName = clusterName;
207
this.options = {
208
port,
209
ssl,
210
id,
211
path,
212
maxSubscriptionsPerClient,
213
maxSubscriptionsPerHub,
214
systemAccountPassword,
215
clusterName,
216
autoscanInterval,
217
longAutoscanInterval,
218
forgetClusterNodeInterval,
219
localClusterSize,
220
clusterIpAddress,
221
};
222
this.cluster = !!id && !!clusterName;
223
this.getUser = async (socket) => {
224
if (getUser == null) {
225
// no auth at all
226
return null;
227
} else {
228
let systemAccounts;
229
if (this.options.systemAccountPassword) {
230
systemAccounts = {
231
sys: {
232
password: this.options.systemAccountPassword,
233
user: { hub_id: "system" },
234
},
235
};
236
} else {
237
systemAccounts = undefined;
238
}
239
return await getUser(socket, systemAccounts);
240
}
241
};
242
this.isAllowed = isAllowed ?? (async () => true);
243
this.id = id;
244
this.log("Starting Conat server...", {
245
id,
246
path,
247
port: this.options.port,
248
httpServer: httpServer ? "httpServer(...)" : undefined,
249
});
250
251
// NOTE: do NOT enable connectionStateRecovery; it seems to cause issues
252
// when restarting the server.
253
let adapter: any = undefined;
254
255
const socketioOptions = {
256
maxHttpBufferSize: MAX_PAYLOAD,
257
path,
258
adapter,
259
// perMessageDeflate is disabled by default in socket.io, but it
260
// seems unclear exactly *why*:
261
// https://github.com/socketio/socket.io/issues/3477#issuecomment-930503313
262
perMessageDeflate: { threshold: 1024 },
263
};
264
this.log(socketioOptions);
265
if (httpServer) {
266
this.io = new Server(httpServer, socketioOptions);
267
} else {
268
this.io = new Server(port, socketioOptions);
269
this.log(`listening on port ${port}`);
270
}
271
this.initUsage();
272
this.io.on("connection", this.handleSocket);
273
this.init();
274
}
275
276
private setState = (state: State) => {
277
if (this.state == state) return;
278
this.emit(state);
279
this.state = state;
280
};
281
282
private isClosed = () => this.state == "closed";
283
284
private init = async () => {
285
if (this.options.systemAccountPassword) {
286
await this.initSystemService();
287
}
288
if (this.clusterName) {
289
await this.initCluster();
290
}
291
this.setState("ready");
292
};
293
294
private initUsage = () => {
295
this.usage = new UsageMonitor({
296
maxPerUser: MAX_CONNECTIONS_PER_USER,
297
max: MAX_CONNECTIONS,
298
resource: RESOURCE,
299
log: (...args) => this.log("usage", ...args),
300
});
301
};
302
303
public getUsage = (): Metrics => {
304
return this.usage.getMetrics();
305
};
306
307
// this is for the Kubernetes health check -- I haven't
308
// thought at all about what to do here, really.
309
// Hopefully experience can teach us.
310
isHealthy = () => {
311
if (this.isClosed()) {
312
return false;
313
}
314
return true;
315
};
316
317
close = async () => {
318
if (this.isClosed()) {
319
return;
320
}
321
this.setState("closed");
322
323
if (this.clusterStreams != null) {
324
for (const name in this.clusterStreams) {
325
this.clusterStreams[name].close();
326
}
327
delete this.clusterStreams;
328
}
329
for (const clusterName in this.clusterLinks) {
330
const link = this.clusterLinks[clusterName];
331
for (const id in link) {
332
link[id].close();
333
delete link[id];
334
}
335
delete this.clusterLinks[clusterName];
336
}
337
for (const address in this.clusterLinksByAddress) {
338
delete this.clusterLinksByAddress[address];
339
}
340
this.clusterPersistServer?.close();
341
delete this.clusterPersistServer;
342
343
await delay(100);
344
await this.io.close();
345
for (const prop of ["interest", "subscriptions", "sockets", "services"]) {
346
delete this[prop];
347
}
348
this.usage?.close();
349
this.interest?.close();
350
this.subscriptions = {};
351
this.stats = {};
352
this.sockets = {};
353
};
354
355
private info = (): ServerInfo => {
356
return {
357
max_payload: MAX_PAYLOAD,
358
id: this.id,
359
clusterName: this.clusterName,
360
};
361
};
362
363
private log = (...args) => {
364
logger.debug("id", this.id, ":", ...args);
365
};
366
367
private unsubscribe = async ({ socket, subject }) => {
368
if (DEBUG) {
369
this.log("unsubscribe ", { id: socket.id, subject });
370
}
371
const room = socketSubjectRoom({ socket, subject });
372
socket.leave(room);
373
await this.updateInterest({ op: "delete", subject, room });
374
};
375
376
////////////////////////////////////
377
// CLUSTER STREAM
378
////////////////////////////////////
379
380
private publishUpdate = (update: Update) => {
381
if (this.clusterStreams == null) {
382
throw Error("streams must be initialized");
383
}
384
const { interest } = update;
385
if (interest !== undefined) {
386
this.clusterStreams.interest.publish(interest);
387
}
388
this.trimClusterStream();
389
};
390
391
private updateClusterStream = (update: Update) => {
392
if (!this.clusterName) return;
393
394
if (this.clusterStreams !== undefined) {
395
this.publishUpdate(update);
396
} else {
397
this.queuedClusterUpdates.push(update);
398
}
399
};
400
401
// throttled because could be expensive -- once a minute it trims
402
// operations that are definitely no longer relevant and are at least
403
// several minutes old. These are ops involving a pattern where
404
// there is no interest in that pattern.
405
private trimClusterStream = throttle(
406
async () => {
407
if (this.clusterStreams !== undefined && this.interest !== undefined) {
408
await trimClusterStreams(
409
this.clusterStreams,
410
{
411
interest: this.interest,
412
links: Object.values(
413
this.clusterLinks?.[this.clusterName ?? ""] ?? {},
414
),
415
},
416
15 * 60000,
417
);
418
}
419
},
420
60000,
421
{ leading: false, trailing: true },
422
);
423
424
///////////////////////////////////////
425
// INTEREST - PATTERNS USERS ARE SUBSCRIBED TO
426
///////////////////////////////////////
427
428
private updateInterest = async (interest: InterestUpdate) => {
429
if (this.isClosed()) return;
430
// publish to the stream
431
this.updateClusterStream({ interest });
432
// update our local state
433
updateInterest(interest, this.interest);
434
};
435
436
///////////////////////////////////////
437
// SUBSCRIBE and PUBLISH
438
///////////////////////////////////////
439
440
private subscribe = async ({ socket, subject, queue, user }) => {
441
if (DEBUG) {
442
this.log("subscribe ", { id: socket.id, subject, queue });
443
}
444
if (typeof queue != "string") {
445
throw Error("queue must be defined");
446
}
447
if (!isValidSubject(subject)) {
448
throw Error("invalid subject");
449
return;
450
}
451
if (!(await this.isAllowed({ user, subject, type: "sub" }))) {
452
const message = `permission denied subscribing to '${subject}' from ${JSON.stringify(
453
user,
454
)}`;
455
this.log(message);
456
throw new ConatError(message, {
457
code: 403,
458
});
459
}
460
let maxSubs;
461
if (user?.hub_id) {
462
maxSubs =
463
this.options.maxSubscriptionsPerHub ?? MAX_SUBSCRIPTIONS_PER_HUB;
464
} else {
465
maxSubs =
466
this.options.maxSubscriptionsPerClient ?? MAX_SUBSCRIPTIONS_PER_CLIENT;
467
}
468
if (maxSubs) {
469
const numSubs = this.subscriptions?.[socket.id]?.size ?? 0;
470
if (numSubs >= maxSubs) {
471
// error 429 == "too many requests"
472
throw new ConatError(
473
`there is a limit of at most ${maxSubs} subscriptions and you currently have ${numSubs} subscriptions -- subscription to '${subject}' denied`,
474
{ code: 429 },
475
);
476
}
477
}
478
const room = socketSubjectRoom({ socket, subject });
479
// critical to await socket.join so we don't advertise that there is
480
// a subscriber before the socket is actually getting messages.
481
await socket.join(room);
482
await this.updateInterest({ op: "add", subject, room, queue });
483
};
484
485
// get all interest in this subject across the cluster (NOT supercluster)
486
// This is a map from node id to array of patterns.
487
private clusterInterest = (subject: string) => {
488
const X: {
489
[pattern: string]: { [queue: string]: { [nodeId: string]: Set<string> } };
490
} = {};
491
for (const pattern of this.interest.matches(subject)) {
492
X[pattern] = {};
493
const g = this.interest.get(pattern)!;
494
for (const queue in g) {
495
X[pattern][queue] = { [this.id]: g[queue] };
496
}
497
}
498
if (this.clusterName == null) {
499
return X;
500
}
501
const thisCluster = this.clusterLinks[this.clusterName];
502
if (thisCluster == null) {
503
return X;
504
}
505
for (const id in thisCluster) {
506
const link = thisCluster[id];
507
if (!link.isConnected()) {
508
continue;
509
}
510
for (const pattern of link.interest.matches(subject)) {
511
if (X[pattern] === undefined) {
512
X[pattern] = {};
513
}
514
const g = link.interest.get(pattern)!;
515
for (const queue in g) {
516
if (X[pattern][queue] === undefined) {
517
X[pattern][queue] = { [id]: g[queue] };
518
} else {
519
X[pattern][queue][id] = g[queue];
520
}
521
}
522
}
523
}
524
return X;
525
};
526
527
private deliver = ({
528
subject,
529
data,
530
targets,
531
}: {
532
subject: string;
533
data: any;
534
targets: { pattern: string; target: string }[];
535
}): number => {
536
// Deliver the messages to these targets, which should all be
537
// connected to this server. This is used for cluster routing only.
538
for (const { pattern, target } of targets) {
539
this.io.to(target).emit(pattern, { subject, data });
540
}
541
return targets.length;
542
};
543
544
private publish = async ({
545
subject,
546
data,
547
from,
548
}: {
549
subject: string;
550
data: any;
551
from: any;
552
}): Promise<number> => {
553
if (!isValidSubjectWithoutWildcards(subject)) {
554
throw Error("invalid subject");
555
}
556
557
if (!(await this.isAllowed({ user: from, subject, type: "pub" }))) {
558
const message = `permission denied publishing to '${subject}' from ${JSON.stringify(
559
from,
560
)}`;
561
this.log(message);
562
throw new ConatError(message, {
563
// this is the http code for permission denied, and having this
564
// set is assumed elsewhere in our code, so don't mess with it!
565
code: 403,
566
});
567
}
568
569
// note -- position 6 of data is special cluster delivery data, to avoid
570
// a message bouncing back and forth in case the interest stream
571
// were slightly out of sync and also so we no exactly where
572
// to deliver the message.
573
const targets = data[6];
574
if (targets != null) {
575
return this.deliver({ subject, data, targets });
576
}
577
578
if (!this.cluster) {
579
// Simpler non-cluster (or no forward) case. We ONLY have to
580
// consider data about this server, and no other nodes.
581
let count = 0;
582
for (const pattern of this.interest.matches(subject)) {
583
const g = this.interest.get(pattern)!;
584
if (DEBUG) {
585
this.log("publishing", { subject, data, g });
586
}
587
// send to exactly one in each queue group
588
for (const queue in g) {
589
const target = this.loadBalance({
590
targets: g[queue],
591
});
592
if (target !== undefined) {
593
this.io.to(target).emit(pattern, { subject, data });
594
if (!isSilentPattern(pattern)) {
595
count++;
596
}
597
}
598
}
599
}
600
return count;
601
}
602
603
// More complicated cluster case, where we have to consider the
604
// entire cluster, or possibly the supercluster.
605
let count = 0;
606
const outsideTargets: {
607
[id: string]: { pattern: string; target: string }[];
608
} = {};
609
610
// const queueGroups: { [pattern: string]: Set<string> } = {};
611
const clusterInterest = this.clusterInterest(subject);
612
for (const pattern in clusterInterest) {
613
const g = clusterInterest[pattern];
614
for (const queue in g) {
615
const t = this.clusterLoadBalance({
616
targets: g[queue],
617
});
618
if (t !== undefined) {
619
const { id, target } = t;
620
if (id == this.id) {
621
// another client of this server
622
this.io.to(target).emit(pattern, { subject, data });
623
if (!isSilentPattern(pattern)) {
624
count++;
625
}
626
} else {
627
// client connected to a different server -- we note this, and
628
// will send everything for each server at once, instead of
629
// potentially sending the same message multiple times for
630
// different patterns.
631
if (outsideTargets[id] == null) {
632
outsideTargets[id] = [{ pattern, target }];
633
} else {
634
outsideTargets[id].push({ pattern, target });
635
}
636
}
637
}
638
}
639
}
640
641
if (!this.clusterName) {
642
throw Error("clusterName must be set");
643
}
644
645
// Send the messages to the outsideTargets. We send the message
646
// along with exactly who it should be delivered to. There is of
647
// course no guarantee that a target doesn't vanish just as we are
648
// sending this...
649
for (const id in outsideTargets) {
650
const link = this.clusterLinks[this.clusterName]?.[id];
651
const data1 = [subject, ...data];
652
// use explicit index since length of data depends on
653
// whether or not there are headers!
654
data1[7] = outsideTargets[id];
655
for (const { pattern } of data1[7]) {
656
if (!isSilentPattern(pattern)) {
657
count++;
658
}
659
}
660
link?.client.conn.emit("publish", data1);
661
}
662
663
//
664
// TODO: Supercluster routing. NOT IMPLEMENTED YET
665
//
666
// // if no matches in local cluster, try the supercluster (if there is one)
667
// if (count == 0) {
668
// // nothing in this cluster, so try other clusters
669
// for (const clusterName in this.clusterLinks) {
670
// if (clusterName == this.clusterName) continue;
671
// const links = this.clusterLinks[clusterName];
672
// for (const id in links) {
673
// const link = links[id];
674
// const count2 = link.publish({ subject, data, queueGroups });
675
// if (count2 > 0) {
676
// count += count2;
677
// // once we publish to any other cluster, we are done.
678
// break;
679
// }
680
// }
681
// }
682
// }
683
684
return count;
685
};
686
687
///////////////////////////////////////
688
// WHO GETS PUBLISHED MESSAGE:
689
///////////////////////////////////////
690
private loadBalance = ({
691
targets,
692
}: {
693
targets: Set<string>;
694
}): string | undefined => {
695
if (targets.size == 0) {
696
return undefined;
697
}
698
return randomChoice(targets);
699
};
700
701
clusterLoadBalance = ({
702
targets: targets0,
703
}: {
704
targets: { [id: string]: Set<string> };
705
}): { id: string; target: string } | undefined => {
706
const targets = new Set<string>();
707
for (const id in targets0) {
708
for (const target of targets0[id]) {
709
targets.add(JSON.stringify({ id, target }));
710
}
711
}
712
const x = this.loadBalance({ targets });
713
if (!x) {
714
return undefined;
715
}
716
return JSON.parse(x);
717
};
718
719
///////////////////////////////////////
720
// MANAGING A CONNECTION FROM A CLIENT SOCKET
721
///////////////////////////////////////
722
private handleSocket = async (socket) => {
723
this.sockets[socket.id] = socket;
724
socket.once("closed", () => {
725
this.log("connection closed", socket.id);
726
delete this.sockets[socket.id];
727
delete this.stats[socket.id];
728
});
729
730
this.stats[socket.id] = {
731
send: { messages: 0, bytes: 0 },
732
recv: { messages: 0, bytes: 0 },
733
subs: 0,
734
connected: Date.now(),
735
address: getAddress(socket),
736
};
737
let user: any = null;
738
let added = false;
739
try {
740
user = await this.getUser(socket);
741
this.usage.add(user);
742
added = true;
743
} catch (err) {
744
// getUser is supposed to throw an error if authentication fails
745
// for any reason
746
// Also, if the connection limit is hit they still connect, but as
747
// the error user who can't do anything (hence not waste resources).
748
user = { error: `${err}`, code: err.code };
749
}
750
this.stats[socket.id].user = user;
751
const id = socket.id;
752
this.log("new connection", { id, user });
753
if (this.subscriptions[id] == null) {
754
this.subscriptions[id] = new Set<string>();
755
}
756
757
this.sendInfo(socket, user);
758
759
socket.on("stats", ({ recv0 }) => {
760
const s = this.stats[socket.id];
761
if (s == null) return;
762
s.recv = recv0;
763
});
764
765
socket.on(
766
"wait-for-interest",
767
async ({ subject, timeout = MAX_INTEREST_TIMEOUT }, respond) => {
768
if (respond == null) {
769
return;
770
}
771
if (!isValidSubjectWithoutWildcards(subject)) {
772
respond({ error: "invalid subject" });
773
return;
774
}
775
if (!(await this.isAllowed({ user, subject, type: "pub" }))) {
776
const message = `permission denied waiting for interest in '${subject}' from ${JSON.stringify(
777
user,
778
)}`;
779
this.log(message);
780
respond({ error: message, code: 403 });
781
}
782
try {
783
respond(await this.waitForInterest(subject, timeout, socket.id));
784
} catch (err) {
785
respond({ error: `${err}` });
786
}
787
},
788
);
789
790
socket.on("publish", async ([subject, ...data], respond) => {
791
if (data?.[2]) {
792
// done
793
this.stats[socket.id].send.messages += 1;
794
}
795
this.stats[socket.id].send.bytes += data[4]?.length ?? 0;
796
this.stats[socket.id].active = Date.now();
797
// this.log(JSON.stringify(this.stats));
798
799
try {
800
const count = await this.publish({ subject, data, from: user });
801
respond?.({ count });
802
} catch (err) {
803
// console.log(this.id, "ERROR", err);
804
if (err.code == 403) {
805
socket.emit("permission", {
806
message: err.message,
807
subject,
808
type: "pub",
809
});
810
}
811
respond?.({ error: `${err}`, code: err.code });
812
}
813
});
814
815
const subscribe = async ({ subject, queue }) => {
816
try {
817
if (this.subscriptions[id].has(subject)) {
818
return { status: "already-added" };
819
}
820
await this.subscribe({ socket, subject, queue, user });
821
this.subscriptions[id].add(subject);
822
this.stats[socket.id].subs += 1;
823
this.stats[socket.id].active = Date.now();
824
return { status: "added" };
825
} catch (err) {
826
if (err.code == 403) {
827
socket.emit("permission", {
828
message: err.message,
829
subject,
830
type: "sub",
831
});
832
}
833
return { error: `${err}`, code: err.code };
834
}
835
};
836
837
socket.on(
838
"subscribe",
839
async (x: { subject; queue } | { subject; queue }[], respond) => {
840
let r;
841
if (is_array(x)) {
842
const v: any[] = [];
843
for (const y of x) {
844
v.push(await subscribe(y));
845
}
846
r = v;
847
} else {
848
r = await subscribe(x);
849
}
850
respond?.(r);
851
},
852
);
853
854
socket.on("subscriptions", (_, respond) => {
855
if (respond == null) {
856
return;
857
}
858
respond(Array.from(this.subscriptions[id]));
859
});
860
861
const unsubscribe = ({ subject }: { subject: string }) => {
862
if (!this.subscriptions[id].has(subject)) {
863
return;
864
}
865
this.unsubscribe({ socket, subject });
866
this.subscriptions[id].delete(subject);
867
this.stats[socket.id].subs -= 1;
868
this.stats[socket.id].active = Date.now();
869
};
870
871
socket.on(
872
"unsubscribe",
873
(x: { subject: string } | { subject: string }[], respond) => {
874
let r;
875
if (is_array(x)) {
876
r = x.map(unsubscribe);
877
} else {
878
r = unsubscribe(x);
879
}
880
respond?.(r);
881
},
882
);
883
884
socket.on("cluster", (respond) => {
885
respond?.(this.clusterAddresses(this.clusterName));
886
});
887
888
socket.on("disconnecting", async () => {
889
this.log("disconnecting", { id, user });
890
delete this.stats[socket.id];
891
if (added) {
892
this.usage.delete(user);
893
}
894
const rooms = Array.from(socket.rooms) as string[];
895
for (const room of rooms) {
896
const subject = getSubjectFromRoom(room);
897
this.unsubscribe({ socket, subject });
898
}
899
delete this.subscriptions[id];
900
});
901
};
902
903
sendInfo = async (socket, user) => {
904
// we send info with an ack because I think sometimes the initial "info"
905
// message gets dropped, leaving a broken hanging connection that never
906
// does anything (until the user explicitly refreshes their browser).
907
// I did see what is probably this in production frequently.
908
try {
909
await until(
910
async () => {
911
if (!socket.conn?.readyState.startsWith("o")) {
912
// logger.debug(`failed to send "info" message to ${socket.id}`);
913
// readyState not defined or not opened or opening, so connection must
914
// have been closed before success.
915
return true;
916
}
917
try {
918
await socket
919
.timeout(7500)
920
.emitWithAck("info", { ...this.info(), user });
921
return true;
922
} catch (err) {
923
// logger.debug(`error sending "info" message to ${socket.id}`, err);
924
return false;
925
}
926
},
927
{ min: 5000, max: 30000, timeout: 120_000 },
928
);
929
} catch {
930
// never ack'd "info" after a few minutes -- could just be an old client,
931
// so don't do anything at this point.
932
}
933
};
934
935
address = () => getServerAddress(this.options);
936
937
// create new client in the same process connected to this server.
938
// This is especially useful for unit testing and is not cached
939
// by default (i.e., multiple calls return distinct clients).
940
client = (options?: ClientOptions): Client => {
941
const address = this.address();
942
this.log("client: connecting to - ", { address });
943
return connect({
944
address,
945
noCache: true,
946
...options,
947
});
948
};
949
950
private initCluster = async () => {
951
if (!this.cluster) {
952
return;
953
}
954
if (!this.id) {
955
throw Error("if cluster is enabled, then the id must be set");
956
}
957
if (!this.clusterName) {
958
throw Error("if cluster is enabled, then the clusterName must be set");
959
}
960
if (!this.options.systemAccountPassword) {
961
throw Error("cluster must have systemAccountPassword set");
962
}
963
964
this.log("enabling cluster support", {
965
id: this.id,
966
clusterName: this.clusterName,
967
});
968
const client = this.client({
969
systemAccountPassword: this.options.systemAccountPassword,
970
});
971
await client.waitUntilSignedIn();
972
// What this does:
973
// - Start a persist server that runs in same process but is just for
974
// use for coordinator cluster nodes.
975
// - Publish interest updates to a dstream.
976
// - Route messages from another cluster to subscribers in this cluster.
977
978
this.log("creating persist server");
979
this.clusterPersistServer = await createClusterPersistServer({
980
client,
981
id: this.id,
982
clusterName: this.clusterName,
983
});
984
this.log("creating cluster streams");
985
this.clusterStreams = await clusterStreams({
986
client,
987
id: this.id,
988
clusterName: this.clusterName,
989
});
990
// add in everything so far in interest (TODO)
991
if (this.queuedClusterUpdates.length > 0) {
992
this.queuedClusterUpdates.map(this.publishUpdate);
993
this.queuedClusterUpdates.length = 0;
994
}
995
this.initAutoscan();
996
await this.initClusterNodes();
997
this.log("cluster successfully initialized");
998
};
999
1000
private initClusterNodes = async () => {
1001
const localClusterSize = this.options.localClusterSize ?? 1;
1002
if (localClusterSize < 2) {
1003
return;
1004
}
1005
// spawn additional servers as separate processes to form a cluster
1006
const port = this.options.port;
1007
if (!port) {
1008
throw Error("bug -- port must be set");
1009
}
1010
const f = async (i: number) => {
1011
const opts = {
1012
path: this.options.path,
1013
ssl: this.options.ssl,
1014
systemAccountPassword: this.options.systemAccountPassword,
1015
clusterName: this.options.clusterName,
1016
autoscanInterval: this.options.autoscanInterval,
1017
longAutoscanInterval: this.options.longAutoscanInterval,
1018
forgetClusterNodeInterval: this.options.forgetClusterNodeInterval,
1019
port: port + i,
1020
id: `${this.options.id}-${i}`,
1021
};
1022
await forkedConatServer(opts);
1023
await this.join(getServerAddress(opts));
1024
};
1025
const v: any[] = [];
1026
for (let i = 1; i < localClusterSize; i++) {
1027
v.push(f(i));
1028
}
1029
await Promise.all(v);
1030
};
1031
1032
private initAutoscan = async () => {
1033
if (!this.options.autoscanInterval) {
1034
this.log("Cluster autoscan is DISABLED.");
1035
return;
1036
}
1037
this.log(`Cluster autoscan interval ${this.options.autoscanInterval}ms`);
1038
let lastCount = 1;
1039
while (!this.isClosed()) {
1040
let x;
1041
try {
1042
x = await this.scan();
1043
if (this.isClosed()) return;
1044
} catch (err) {
1045
// this should never happen unless there is a serious bug (?).
1046
this.log(`WARNING/BUG?: serious problem scanning -- ${err}`);
1047
throw err;
1048
await delay(this.options.longAutoscanInterval);
1049
continue;
1050
}
1051
if (x.errors.length > 0) {
1052
this.log(`WARNING: errors while scanning cluster`, x.errors);
1053
}
1054
if (x.count > 0 || lastCount > 0) {
1055
this.log(
1056
`cluster scan added ${x.count} links -- will scan again in ${this.options.autoscanInterval}`,
1057
);
1058
await delay(this.options.autoscanInterval);
1059
} else {
1060
this.log(
1061
`cluster scan found no new links -- waiting ${this.options.longAutoscanInterval}ms before next scan`,
1062
);
1063
await delay(this.options.longAutoscanInterval);
1064
}
1065
lastCount = x.count;
1066
}
1067
};
1068
1069
private scanSoon = throttle(
1070
async () => {
1071
if (this.isClosed() || !this.options.autoscanInterval) {
1072
return;
1073
}
1074
let x;
1075
try {
1076
x = await this.scan();
1077
} catch (err) {
1078
this.log(
1079
`WARNING/BUG?: scanSoon -- serious problem scanning -- ${err}`,
1080
);
1081
return;
1082
}
1083
if (x.errors.length > 0) {
1084
this.log(
1085
`WARNING: scanSoon -- errors while scanning cluster`,
1086
x.errors,
1087
);
1088
}
1089
},
1090
1091
10_000,
1092
{
1093
leading: true,
1094
trailing: true,
1095
},
1096
);
1097
1098
// Join this node to the cluster that contains a node with the given address.
1099
// - the address obviously must be reachable over the network
1100
// - the systemAccountPassword of this node and the one with the given
1101
// address must be the same.
1102
join = reuseInFlight(
1103
async (
1104
address: string,
1105
{ timeout }: { timeout?: number } = {},
1106
): Promise<ClusterLink> => {
1107
if (!this.options.systemAccountPassword) {
1108
throw Error("systemAccountPassword must be set");
1109
}
1110
logger.debug("join: connecting to ", address);
1111
const link0 = this.clusterLinksByAddress[address];
1112
if (link0 != null) {
1113
logger.debug("join: already connected to ", address);
1114
return link0;
1115
}
1116
try {
1117
const link = await clusterLink(
1118
address,
1119
this.options.systemAccountPassword,
1120
timeout,
1121
);
1122
const { clusterName, id } = link;
1123
if (this.clusterLinks[clusterName] == null) {
1124
this.clusterLinks[clusterName] = {};
1125
}
1126
this.clusterLinks[clusterName][id] = link;
1127
this.clusterLinksByAddress[address] = link;
1128
this.scanSoon();
1129
logger.debug("join: successfully created new connection to ", address);
1130
return link;
1131
} catch (err) {
1132
logger.debug(
1133
"join: FAILED creating a new connection to ",
1134
address,
1135
err,
1136
);
1137
throw err;
1138
}
1139
},
1140
);
1141
1142
unjoin = ({
1143
id,
1144
clusterName,
1145
address,
1146
}: {
1147
clusterName?: string;
1148
id?: string;
1149
address?: string;
1150
}) => {
1151
if (!clusterName && !id && !address) {
1152
throw Error(
1153
"at least one of clusterName, id or address must be specified",
1154
);
1155
}
1156
let link;
1157
if (address) {
1158
link = this.clusterLinksByAddress[address];
1159
} else {
1160
if (!id) {
1161
throw Error("if address is not given then id must be given");
1162
}
1163
const cluster = clusterName ?? this.clusterName;
1164
if (!cluster) {
1165
throw "clusterName must be set";
1166
}
1167
link = this.clusterLinks[cluster]?.[id];
1168
}
1169
if (link === undefined) {
1170
// already gone
1171
return;
1172
}
1173
1174
link.close();
1175
delete this.clusterLinks[link.clusterName][link.id];
1176
delete this.clusterLinksByAddress[link.address];
1177
if (Object.keys(this.clusterLinks[link.clusterName]).length == 0) {
1178
delete this.clusterLinks[link.clusterName];
1179
}
1180
};
1181
1182
private initSystemService = async () => {
1183
if (!this.options.systemAccountPassword) {
1184
throw Error("system service requires system account");
1185
}
1186
this.log("starting service listening on sys...");
1187
const client = this.client({
1188
extraHeaders: { Cookie: `sys=${this.options.systemAccountPassword}` },
1189
});
1190
1191
const stats = async () => {
1192
return { [this.id]: this.stats };
1193
};
1194
const usage = async () => {
1195
return { [this.id]: this.usage.stats() };
1196
};
1197
// user has to explicitly refresh their browser after
1198
// being disconnected this way:
1199
const disconnect = async (ids: string | string[]) => {
1200
if (typeof ids == "string") {
1201
ids = [ids];
1202
}
1203
for (const id of ids) {
1204
this.io.in(id).disconnectSockets();
1205
}
1206
};
1207
1208
const subject = sysApiSubject({ clusterName: this.clusterName });
1209
// services that ALL servers answer, i.e., a single request gets
1210
// answers from all members of the cluster.
1211
await client.service<SysConatServer>(
1212
subject,
1213
{
1214
stats,
1215
usage,
1216
disconnect,
1217
join: () => {
1218
throw Error("wrong service");
1219
},
1220
unjoin: () => {
1221
throw Error("wrong service");
1222
},
1223
clusterTopology: () => {
1224
throw Error("wrong service");
1225
},
1226
clusterAddresses: () => {
1227
throw Error("wrong service");
1228
},
1229
},
1230
{ queue: `${this.clusterName}-${this.id}` },
1231
);
1232
this.log(`successfully started ${subject} service`);
1233
1234
const subject2 = sysApiSubject({
1235
clusterName: this.clusterName,
1236
id: this.id,
1237
});
1238
1239
await client.service<SysConatServer>(subject2, {
1240
stats,
1241
usage,
1242
// user has to explicitly refresh there browser after
1243
// being disconnected this way
1244
disconnect,
1245
join: async (address: string) => {
1246
await this.join(address);
1247
},
1248
unjoin: async (opts: { clusterName?: string; id: string }) => {
1249
await this.unjoin(opts);
1250
},
1251
1252
clusterTopology: async (): Promise<{
1253
// map from id to address
1254
[clusterName: string]: { [id: string]: string };
1255
}> => this.clusterTopology(),
1256
1257
// addresses of all nodes in the (super-)cluster
1258
clusterAddresses: async (clusterName?: string): Promise<string[]> =>
1259
this.clusterAddresses(clusterName),
1260
});
1261
this.log(`successfully started ${subject2} service`);
1262
};
1263
1264
clusterAddresses = (clusterName?: string) => {
1265
const v: string[] = [];
1266
if (!clusterName) {
1267
v.push(this.address());
1268
for (const addr in this.clusterLinksByAddress) {
1269
const link = this.clusterLinksByAddress[addr];
1270
if (link.isConnected()) {
1271
v.push(addr);
1272
}
1273
}
1274
return v;
1275
}
1276
if (clusterName == this.clusterName) {
1277
v.push(this.address());
1278
}
1279
for (const address in this.clusterLinksByAddress) {
1280
if (
1281
this.clusterLinksByAddress[address].isConnected() &&
1282
this.clusterLinksByAddress[address].clusterName == clusterName
1283
) {
1284
v.push(address);
1285
}
1286
}
1287
return v;
1288
};
1289
1290
clusterTopology = (): {
1291
// map from id to address
1292
[clusterName: string]: { [id: string]: string };
1293
} => {
1294
if (!this.clusterName || !this.id) {
1295
throw Error("not in cluster mode");
1296
}
1297
const addresses: { [clusterName: string]: { [id: string]: string } } = {};
1298
for (const clusterName in this.clusterLinks) {
1299
addresses[clusterName] = {};
1300
const C = this.clusterLinks[clusterName];
1301
for (const id in C) {
1302
addresses[clusterName][id] = C[id].address;
1303
}
1304
}
1305
if (addresses[this.clusterName] == null) {
1306
addresses[this.clusterName] = {};
1307
}
1308
addresses[this.clusterName][this.id] = this.address();
1309
return addresses;
1310
};
1311
1312
// scan via any nodes we're connected to for other known nodes in the cluster
1313
// that we're NOT connected to. If every node does this periodically (and the
1314
// cluster isn't constantly changing, then each component of the digraph
1315
// will eventually be a complete graph. In particular, this function returns
1316
// the number of links created (count), so if it returns 0 when called on all nodes, then
1317
// we're done until new nodes are added.
1318
scan = reuseInFlight(async (): Promise<{ count: number; errors: any[] }> => {
1319
if (this.isClosed()) {
1320
return { count: 0, errors: [] };
1321
}
1322
const knownByUs = new Set(this.clusterAddresses(this.clusterName));
1323
const unknownToUs = new Set<string>([]);
1324
const errors: { err: any; desc: string }[] = [];
1325
1326
// in parallel, we use the sys api to call all nodes we know about
1327
// and ask them "heh, what nodes in this cluster do *YOU* know about"?
1328
// if any come back that we don't know about, we add them to unknownToUs.
1329
let count = 0;
1330
1331
const f = async (client) => {
1332
// Guard against race condition: client may be closed during scan
1333
const address = client?.options?.address;
1334
if (!address) {
1335
return;
1336
}
1337
try {
1338
const sys = sysApi(client);
1339
const knownByRemoteNode = new Set(
1340
await sys.clusterAddresses(this.clusterName),
1341
);
1342
if (this.isClosed()) return;
1343
logger.debug(
1344
"scan: remote",
1345
address,
1346
"knows about ",
1347
knownByRemoteNode,
1348
);
1349
for (const addr of knownByRemoteNode) {
1350
if (!knownByUs.has(addr)) {
1351
unknownToUs.add(addr);
1352
}
1353
}
1354
if (!knownByRemoteNode.has(this.address())) {
1355
// we know about them, but they don't know about us, so ask them to link to us.
1356
logger.debug("scan: asking remote ", address, " to link to us");
1357
await sys.join(this.address());
1358
if (this.isClosed()) return;
1359
count += 1;
1360
}
1361
} catch (err) {
1362
errors.push({
1363
err,
1364
desc: `requesting remote ${address} join us`,
1365
});
1366
}
1367
};
1368
1369
if (!this.clusterName) {
1370
throw Error("if cluster is enabled, then the clusterName must be set");
1371
}
1372
1373
await Promise.all(
1374
Object.values(this.clusterLinks[this.clusterName] ?? {})
1375
.filter((link) => {
1376
if (link.isConnected()) {
1377
return true;
1378
} else {
1379
if (
1380
link.howLongDisconnected() >=
1381
(this.options.forgetClusterNodeInterval ??
1382
DEFAULT_FORGET_CLUSTER_NODE_INTERVAL)
1383
) {
1384
// forget about this link
1385
this.unjoin(link);
1386
}
1387
}
1388
})
1389
.map((link) => f(link.client)),
1390
);
1391
if (unknownToUs.size == 0 || this.isClosed()) {
1392
return { count, errors };
1393
}
1394
1395
// Now (in parallel), join with all unknownToUs nodes.
1396
const g = async (address: string) => {
1397
try {
1398
await this.join(address);
1399
count += 1;
1400
} catch (err) {
1401
errors.push({ err, desc: `joining to ${address}` });
1402
}
1403
};
1404
const v = Array.from(unknownToUs).map(g);
1405
1406
await Promise.all(v);
1407
1408
return { count, errors };
1409
});
1410
1411
private waitForInterest = async (
1412
subject: string,
1413
timeout: number,
1414
socketId: string,
1415
signal?: AbortSignal,
1416
): Promise<boolean> => {
1417
if (!this.cluster) {
1418
// not a cluster
1419
return await this.waitForInterestOnThisNode(
1420
subject,
1421
timeout,
1422
socketId,
1423
signal,
1424
);
1425
}
1426
// check if there is already known interest
1427
const links = this.superclusterLinks();
1428
for (const link of links) {
1429
if (link.hasInterest(subject)) {
1430
return true;
1431
}
1432
}
1433
1434
// wait for interest in any node on any cluster
1435
return await this.waitForInterestInLinks(
1436
subject,
1437
timeout,
1438
socketId,
1439
signal,
1440
links,
1441
);
1442
};
1443
1444
private superclusterLinks = (): ClusterLink[] => {
1445
let links: ClusterLink[] = [];
1446
for (const clusterName in this.clusterLinks) {
1447
links = links.concat(Object.values(this.clusterLinks[clusterName]));
1448
}
1449
return links;
1450
};
1451
1452
private waitForInterestInLinks = async (
1453
subject,
1454
timeout,
1455
socketId,
1456
signal,
1457
links: ClusterLink[],
1458
): Promise<boolean> => {
1459
const v: any[] = [];
1460
// we use AbortController etc below so we can cancel waiting once
1461
// we get any interest.
1462
const nothrow = async (f) => {
1463
try {
1464
return await f;
1465
} catch {}
1466
return false;
1467
};
1468
const controller = new AbortController();
1469
const signal2 = controller.signal;
1470
v.push(
1471
nothrow(
1472
this.waitForInterestOnThisNode(subject, timeout, socketId, signal2),
1473
),
1474
);
1475
for (const link of links) {
1476
v.push(nothrow(link.waitForInterest(subject, timeout, signal2)));
1477
}
1478
if (!timeout) {
1479
// with timeout=0 they all immediately answer (so no need to worry about abort/promise)
1480
const w = await Promise.all(v);
1481
for (const x of w) {
1482
if (x) {
1483
return true;
1484
}
1485
}
1486
return false;
1487
}
1488
1489
signal?.addEventListener("abort", () => {
1490
controller.abort();
1491
});
1492
const w = await Promise.race(v);
1493
// cancel all the others.
1494
controller.abort();
1495
return w;
1496
};
1497
1498
private waitForInterestOnThisNode = async (
1499
subject: string,
1500
timeout: number,
1501
socketId: string,
1502
signal?: AbortSignal,
1503
) => {
1504
const matches = this.interest.matches(subject);
1505
if (matches.length > 0 || !timeout) {
1506
// NOTE: we never return the actual matches, since this is a
1507
// potential security vulnerability.
1508
// it could make it very easy to figure out private inboxes, etc.
1509
return matches.length > 0;
1510
}
1511
if (timeout > MAX_INTEREST_TIMEOUT) {
1512
timeout = MAX_INTEREST_TIMEOUT;
1513
}
1514
const start = Date.now();
1515
while (!this.isClosed() && this.sockets[socketId] && !signal?.aborted) {
1516
if (Date.now() - start >= timeout) {
1517
throw Error("timeout");
1518
}
1519
try {
1520
// if signal is set only wait for the change for up to 1 second.
1521
await once(this.interest, "change", signal != null ? 1000 : undefined);
1522
} catch {
1523
continue;
1524
}
1525
if (this.isClosed() || !this.sockets[socketId] || signal?.aborted) {
1526
return false;
1527
}
1528
const hasMatch = this.interest.hasMatch(subject);
1529
if (hasMatch) {
1530
return true;
1531
}
1532
}
1533
return false;
1534
};
1535
1536
hash = (): { interest: number } => {
1537
return {
1538
interest: hashInterest(this.interest),
1539
};
1540
};
1541
}
1542
1543
function getSubjectFromRoom(room: string) {
1544
if (room.startsWith("{")) {
1545
return JSON.parse(room).subject;
1546
} else {
1547
return room;
1548
}
1549
}
1550
1551
function socketSubjectRoom({ socket, subject }) {
1552
return JSON.stringify({ id: socket.id, subject });
1553
}
1554
1555
export function randomChoice(v: Set<string>): string {
1556
if (v.size == 0) {
1557
throw Error("v must have size at least 1");
1558
}
1559
if (v.size == 1) {
1560
for (const x of v) {
1561
return x;
1562
}
1563
}
1564
const w = Array.from(v);
1565
const i = Math.floor(Math.random() * w.length);
1566
return w[i];
1567
}
1568
1569
// See https://socket.io/how-to/get-the-ip-address-of-the-client
1570
function getAddress(socket) {
1571
return getClientIpAddress(socket.handshake) ?? socket.handshake.address;
1572
}
1573
1574
export function updateInterest(update: InterestUpdate, interest: Interest) {
1575
const { op, subject, queue, room } = update;
1576
const groups = interest.get(subject);
1577
if (op == "add") {
1578
if (typeof queue != "string") {
1579
throw Error("queue must not be null for add");
1580
}
1581
if (groups === undefined) {
1582
interest.set(subject, { [queue]: new Set([room]) });
1583
} else if (groups[queue] == null) {
1584
groups[queue] = new Set([room]);
1585
} else {
1586
groups[queue].add(room);
1587
}
1588
} else if (op == "delete") {
1589
if (groups != null) {
1590
let nonempty = false;
1591
for (const queue in groups) {
1592
groups[queue].delete(room);
1593
if (groups[queue].size == 0) {
1594
delete groups[queue];
1595
} else {
1596
nonempty = true;
1597
}
1598
}
1599
if (!nonempty) {
1600
// no interest anymore
1601
interest.delete(subject);
1602
}
1603
}
1604
} else {
1605
throw Error(`invalid op ${op}`);
1606
}
1607
}
1608
1609
function getServerAddress(options: Options) {
1610
const port = options.port;
1611
const path = options.path?.slice(0, -"/conat".length) ?? "";
1612
return `http${options.ssl || port == 443 ? "s" : ""}://${
1613
options.clusterIpAddress ?? "localhost"
1614
}:${port}${path}`;
1615
}
1616
1617
/*
1618
Without this, if an admin subscribed to '>' then suddenly all the algorithms
1619
for responding to messages, sockets, etc., based on "waiting for interest"
1620
would start failing. The following is a hack to allow subscribing to '>'.
1621
Really we need something more general for other admin "wire taps", but
1622
this will have to do for now.
1623
*/
1624
function isSilentPattern(pattern: string): boolean {
1625
return pattern == ">";
1626
}
1627
1628
/*
1629
const watching = new Set(["xyz"]);
1630
let last = Date.now();
1631
function watch(action, { subject, data, id, from }) {
1632
for (const x of watching) {
1633
if (subject.includes(x)) {
1634
console.log(Date.now() - last, new Date(), action, id, {
1635
subject,
1636
data,
1637
from,
1638
});
1639
last = Date.now();
1640
if (data[5]?.["CN-Reply"]) {
1641
watching.add(data[5]["CN-Reply"]);
1642
}
1643
}
1644
}
1645
}
1646
function trace(subject, ...args) {
1647
for (const x of watching) {
1648
if (subject.includes(x)) {
1649
console.log(Date.now() - last, new Date(), subject, ...args);
1650
last = Date.now();
1651
}
1652
}
1653
}
1654
*/
1655
1656