Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/cluster/cluster-basics.test.ts
1712 views
1
/*
2
3
pnpm test `pwd`/cluster-basics.test.ts
4
5
*/
6
7
import {
8
before,
9
after,
10
delay,
11
once,
12
wait,
13
waitForConsistentState,
14
} from "@cocalc/backend/conat/test/setup";
15
import {
16
clusterLink,
17
clusterStreams,
18
clusterService,
19
trimClusterStreams,
20
} from "@cocalc/conat/core/cluster";
21
import { createClusterNode } from "./util";
22
import type { Client } from "@cocalc/conat/core/client";
23
import { sysApi } from "@cocalc/conat/core/sys";
24
25
beforeAll(before);
26
27
jest.setTimeout(20000);
28
29
describe("create a cluster enabled socketio server and test that the streams update as they should", () => {
30
let server, client;
31
it("create a server with cluster support enabled", async () => {
32
({ server, client } = await createClusterNode({
33
clusterName: "cluster0",
34
id: "0",
35
}));
36
37
expect(server.isHealthy()).toBe(true);
38
});
39
40
let streams;
41
it("get the interest stream via our client. There MUST be at least two persist subjects in there, since they were needed to even create the interest stream.", async () => {
42
streams = await clusterStreams({
43
...server.options,
44
client,
45
});
46
const service = clusterService(server.options);
47
await wait({
48
until: () => {
49
const v = streams.interest.getAll();
50
expect(service).toContain(server.options.clusterName);
51
const persistUpdates = v.filter((update) =>
52
update.subject.startsWith(service),
53
);
54
if (persistUpdates.length <= 1) {
55
return false;
56
}
57
expect(persistUpdates.length).toBeGreaterThan(1);
58
return true;
59
},
60
});
61
});
62
63
it("subscribe and see update appear in the stream; close sub and see delete appear", async () => {
64
const sub = await client.subscribe("foo");
65
while (true) {
66
const v = streams.interest.getAll().filter((x) => x.subject == "foo");
67
if (v.length == 1) {
68
expect(v[0]).toEqual(
69
expect.objectContaining({ op: "add", subject: "foo" }),
70
);
71
break;
72
}
73
await once(streams.interest, "change");
74
}
75
sub.close();
76
while (true) {
77
const v = streams.interest.getAll().filter((x) => x.subject == "foo");
78
if (v.length == 2) {
79
expect(v[1]).toEqual(
80
expect.objectContaining({ op: "delete", subject: "foo" }),
81
);
82
break;
83
}
84
await once(streams.interest, "change");
85
}
86
});
87
88
let link;
89
it("get access to the same stream, but via a cluster link, and note that it is identical to the one in the server -- keeping these pattern objects sync'd is the point of the link", async () => {
90
link = await clusterLink(
91
server.address(),
92
server.options.systemAccountPassword,
93
);
94
await wait({
95
until: () => {
96
return (
97
Object.keys(server.interest.serialize().patterns).length ==
98
Object.keys(link.interest.serialize().patterns).length
99
);
100
},
101
});
102
expect(server.interest.serialize().patterns).toEqual(
103
link.interest.serialize().patterns,
104
);
105
});
106
107
it("creates a sub and see this reflected in the patterns", async () => {
108
const sub = await client.subscribe("foo");
109
await wait({
110
until: () => link.interest.serialize().patterns["foo"] !== undefined,
111
});
112
// equal after making the subscription to foo
113
expect(server.interest.serialize()).toEqual(link.interest.serialize());
114
115
const { patterns } = link.interest.serialize();
116
expect(patterns["foo"] != undefined).toBe(true);
117
118
sub.close();
119
await wait({
120
until: () => link.interest.serialize().patterns["foo"] === undefined,
121
});
122
expect(patterns["foo"] === undefined).toBe(true);
123
124
// still identical
125
expect(server.interest.serialize()).toEqual(link.interest.serialize());
126
});
127
128
const count = 50;
129
it(`make ${count} more subscriptions and see this reflected in the link`, async () => {
130
const v: any[] = [];
131
for (let i = 0; i < count; i++) {
132
v.push(await client.subscribe(`foo.${i}`));
133
}
134
135
await wait({
136
until: () =>
137
link.interest.serialize().patterns[`foo.${count - 1}`] !== undefined,
138
});
139
140
expect(server.interest.serialize()).toEqual(link.interest.serialize());
141
142
// and unsubscribe
143
for (let i = 0; i < count; i++) {
144
v[i].close();
145
}
146
await wait({
147
until: () =>
148
link.interest.serialize().patterns[`foo.${count - 1}`] === undefined,
149
});
150
151
expect(server.interest.serialize()).toEqual(link.interest.serialize());
152
});
153
154
it("a new link has correct state, despite the activity", async () => {
155
const link2 = await clusterLink(
156
server.address(),
157
server.options.systemAccountPassword,
158
);
159
await wait({
160
until: () => {
161
return (
162
Object.keys(server.interest.serialize().patterns).length ==
163
// @ts-ignore
164
Object.keys(link2.interest.serialize().patterns).length
165
);
166
},
167
});
168
expect(server.interest.serialize().patterns).toEqual(
169
// @ts-ignore
170
link2.interest.serialize().patterns,
171
);
172
link2.close();
173
});
174
});
175
176
describe("create a cluster with two distinct servers and send a message from one client to another via a link, and also use request/reply", () => {
177
let server1, server2, client1, client2;
178
it("create two distinct servers with cluster support enabled", async () => {
179
({ server: server1, client: client1 } = await createClusterNode({
180
clusterName: "cluster1",
181
systemAccountPassword: "squeamish",
182
id: "1",
183
}));
184
({ server: server2, client: client2 } = await createClusterNode({
185
clusterName: "cluster1",
186
systemAccountPassword: "ossifrage",
187
id: "2",
188
}));
189
});
190
191
it("link them", async () => {
192
await delay(500);
193
await server1.join(server2.address());
194
await server2.join(server1.address());
195
});
196
197
it("wait for consistent state", async () => {
198
await waitForConsistentState([server1, server2], 10000);
199
});
200
201
it("tests that server-side waitForInterestOnThisNode can be aborted", async () => {
202
const controller = new AbortController();
203
const w = server2.waitForInterestOnThisNode(
204
"no-interest",
205
90000,
206
client2.conn.id,
207
controller.signal,
208
);
209
await delay(150);
210
controller.abort();
211
expect(await w).toBe(false);
212
});
213
214
it("tests that server-side waitForInterest can be aborted", async () => {
215
const controller = new AbortController();
216
const w = server2.waitForInterest(
217
"no-interest",
218
90000,
219
client2.conn.id,
220
controller.signal,
221
);
222
// give it some time to get going (otherwise test is too easy)
223
await delay(150);
224
controller.abort();
225
expect(await w).toBe(false);
226
});
227
228
const N =
229
"114381625757888867669235779976146612010218296721242362562561842935706935245733897830597123563958705058989075147599290026879543541";
230
231
let sub;
232
233
it("creates a subscription on client1, then publish to it from client2, thus using routing over the link", async () => {
234
sub = await client1.subscribe("rsa");
235
236
const x = await client2.publish("rsa", N);
237
// interest hasn't propogated from one cluster to another yet:
238
expect(x.count).toBe(0);
239
240
await client2.waitForInterest("rsa");
241
242
const y = await client2.publish("rsa", N);
243
expect(y.count).toBe(1);
244
245
const { value } = await sub.next();
246
expect(value.data).toBe(N);
247
});
248
249
it("test request/reply between clusters", async () => {
250
const req = client2.request("rsa", N);
251
const { value } = await sub.next();
252
expect(value.data).toBe(N);
253
await wait({
254
until: async () => {
255
// ensure respons gets received -- it's possible for sub to be visible to client2
256
// slightly before the inbox for client2 is visible, in which case client2
257
// would never get a response and timeout.
258
const { count } = await value.respond(
259
"3490529510847650949147849619903898133417764638493387843990820577 × 32769132993266709549961988190834461413177642967992942539798288533",
260
);
261
return count > 0;
262
},
263
});
264
const response = await req;
265
expect(response.data).toContain("×");
266
});
267
268
it("remove the links", async () => {
269
const sub = await client1.subscribe("x");
270
await server1.unjoin({ id: "2" });
271
await server2.unjoin({ id: "1" });
272
const { count } = await client1.publish("x", "hello");
273
expect(count).toBe(1);
274
const { count: count2 } = await client2.publish("x", "hello");
275
expect(count2).toBe(0);
276
sub.close();
277
});
278
});
279
280
// This is basically identical to the previous one, but for a bigger cluster:
281
const clusterSize = 3;
282
describe(`a cluster with ${clusterSize} nodes`, () => {
283
const servers: any[] = [],
284
clients: any[] = [];
285
it(`create ${clusterSize} distinct servers with cluster support enabled`, async () => {
286
for (let i = 0; i < clusterSize; i++) {
287
const { server, client } = await createClusterNode({
288
clusterName: "my-cluster",
289
id: `node-${i}`,
290
});
291
expect(server.options.id).toBe(`node-${i}`);
292
expect(server.options.clusterName).toBe("my-cluster");
293
servers.push(server);
294
clients.push(client);
295
}
296
});
297
298
it("link them all together in a complete digraph", async () => {
299
for (let i = 0; i < servers.length; i++) {
300
for (let j = 0; j < servers.length; j++) {
301
if (i != j) {
302
await servers[i].join(servers[j].address());
303
await servers[j].join(servers[i].address());
304
}
305
}
306
}
307
});
308
309
it("get addresses and topology", async () => {
310
await clients[0].waitUntilSignedIn();
311
for (let i = 0; i < clusterSize; i++) {
312
const sys = sysApi(clients[i]);
313
expect(new Set(await sys.clusterAddresses()).size).toBe(clusterSize);
314
}
315
const t = await sysApi(clients[0]).clusterTopology();
316
expect(Object.keys(t)).toEqual(["my-cluster"]);
317
const v = Object.values(t["my-cluster"]);
318
expect(new Set(v)).toEqual(
319
new Set(servers.map((server) => server.address())),
320
);
321
});
322
323
let sub;
324
it("creates a subscription on clients[0], then observe each other client sees it as existing and can send it a message", async () => {
325
sub = await clients[0].subscribe("rsa");
326
for (let i = 0; i < clusterSize; i++) {
327
await clients[i].waitForInterest("rsa");
328
clients[i].publish("rsa", i);
329
}
330
for (let i = 0; i < clusterSize; i++) {
331
const { value } = await sub.next();
332
expect(value.data).toBe(i);
333
}
334
});
335
336
it("check that interest data is *eventually* consistent", async () => {
337
await waitForConsistentState(servers);
338
});
339
340
it("test request/respond from all participants", async () => {
341
await delay(500);
342
const v: any[] = [];
343
for (let i = 0; i < clusterSize; i++) {
344
const req = clients[i].request("rsa", i);
345
v.push(req);
346
}
347
for (let i = 0; i < clusterSize; i++) {
348
const { value } = await sub.next();
349
expect(value.data).toBe(i);
350
const { count } = await value.respond(i + 1);
351
expect(count).toBeGreaterThan(0);
352
}
353
354
for (let i = 0; i < clusterSize; i++) {
355
const r = (await v[i]).data;
356
expect(r).toBe(i + 1);
357
}
358
});
359
});
360
361
describe("test trimming the interest stream", () => {
362
let server, client;
363
it("create a cluster server", async () => {
364
({ server, client } = await createClusterNode({
365
id: "0",
366
clusterName: "trim",
367
}));
368
await wait({ until: () => server.clusterStreams != null });
369
});
370
371
let sub;
372
it("subscribes and verifies that trimming does nothing", async () => {
373
sub = await client.sub("389");
374
const { seqsInterest: seqs } = await trimClusterStreams(
375
server.clusterStreams,
376
server,
377
0,
378
);
379
expect(seqs).toEqual([]);
380
});
381
382
it("unsubscribes and verifies that trimming with a 5s maxAge does nothing", async () => {
383
sub.close();
384
await delay(100);
385
const { seqsInterest: seqs } = await trimClusterStreams(
386
server.clusterStreams,
387
server,
388
5000,
389
);
390
expect(seqs).toEqual([]);
391
});
392
393
it(" see that two updates are trimmed when maxAge is 0s", async () => {
394
// have to use wait since don't know how long until
395
// stream actually updated after unsub.
396
let seqs;
397
await wait({
398
until: async () => {
399
seqs = (await trimClusterStreams(server.clusterStreams, server, 0))
400
.seqsInterest;
401
return seqs.length >= 2;
402
},
403
});
404
expect(seqs.length).toBe(2);
405
await delay(1);
406
for (const update of server.clusterStreams.interest.getAll()) {
407
if (update.subject == "389" && update.op == "add") {
408
throw Error("adding 389 should have been removed");
409
}
410
}
411
});
412
});
413
414
describe("join two servers in a cluster using the sys api instead of directly calling join on the server objects", () => {
415
let server1, server2, client1: Client, client2: Client;
416
const systemAccountPassword = "squeamish ossifrage";
417
it("create two distinct servers with cluster support enabled", async () => {
418
({ server: server1, client: client1 } = await createClusterNode({
419
clusterName: "cluster-sys",
420
systemAccountPassword,
421
id: "1",
422
}));
423
({ server: server2, client: client2 } = await createClusterNode({
424
clusterName: "cluster-sys",
425
systemAccountPassword,
426
id: "2",
427
}));
428
});
429
430
let sys1, sys2;
431
it("link them using the sys api", async () => {
432
sys1 = sysApi(client1);
433
expect(await sys1.clusterAddresses()).toEqual([server1.address()]);
434
await sys1.join(server2.address());
435
expect(await sys1.clusterAddresses()).toEqual([
436
server1.address(),
437
server2.address(),
438
]);
439
expect(await sys1.clusterTopology()).toEqual({
440
"cluster-sys": {
441
"1": server1.address(),
442
"2": server2.address(),
443
},
444
});
445
446
sys2 = sysApi(client2);
447
expect(await sys2.clusterAddresses()).toEqual([server2.address()]);
448
await sys2.join(server1.address());
449
expect(await sys2.clusterAddresses()).toEqual([
450
server2.address(),
451
server1.address(),
452
]);
453
expect(await sys1.clusterTopology()).toEqual(await sys2.clusterTopology());
454
});
455
456
let sub;
457
it("verify link worked", async () => {
458
sub = await client1.subscribe("x");
459
await client2.waitForInterest("x");
460
const { count } = await client2.publish("x", "hello");
461
expect(count).toBe(1);
462
const { value } = await sub.next();
463
expect(value.data).toBe("hello");
464
});
465
466
it("remove the links", async () => {
467
await sys1.unjoin({ id: "2" });
468
await sys2.unjoin({ id: "1" });
469
const { count } = await client1.publish("x", "hello");
470
expect(count).toBe(1);
471
const { count: count2 } = await client2.publish("x", "hello");
472
expect(count2).toBe(0);
473
sub.close();
474
475
expect(await sys1.clusterAddresses()).toEqual([server1.address()]);
476
expect(await sys2.clusterAddresses()).toEqual([server2.address()]);
477
expect(await sys1.clusterTopology()).toEqual({
478
"cluster-sys": {
479
"1": server1.address(),
480
},
481
});
482
expect(await sys2.clusterTopology()).toEqual({
483
"cluster-sys": {
484
"2": server2.address(),
485
},
486
});
487
});
488
});
489
490
describe("test automatic node discovery", () => {
491
// create a cluster with 3 nodes just two edges connecting them
492
const nodes: { client; server }[] = [];
493
494
it("create three distinct servers with cluster support enabled", async () => {
495
nodes.push(
496
await createClusterNode({ id: "node0", clusterName: "discovery" }),
497
);
498
nodes.push(
499
await createClusterNode({ id: "node1", clusterName: "discovery" }),
500
);
501
nodes.push(
502
await createClusterNode({ id: "node2", clusterName: "discovery" }),
503
);
504
// different cluster
505
nodes.push(await createClusterNode({ id: "node0", clusterName: "moon" }));
506
});
507
508
it("connect them in the minimal possible way", async () => {
509
await nodes[0].server.join(nodes[1].server.address());
510
await nodes[1].server.join(nodes[2].server.address());
511
512
// plus one to the other cluster
513
await nodes[0].server.join(nodes[3].server.address());
514
515
expect(nodes[0].server.clusterAddresses("discovery").length).toBe(2);
516
expect(nodes[1].server.clusterAddresses("discovery").length).toBe(2);
517
expect(nodes[2].server.clusterAddresses("discovery").length).toBe(1);
518
});
519
520
it("run scan from node0. this results in the following new connections: 1->0, 0->2", async () => {
521
const { count, errors } = await nodes[0].server.scan();
522
expect(count).toBe(2);
523
expect(errors.length).toBe(0);
524
expect(nodes[0].server.clusterAddresses("discovery").length).toBe(3);
525
expect(nodes[1].server.clusterAddresses("discovery").length).toBe(3);
526
expect(nodes[2].server.clusterAddresses("discovery").length).toBe(1);
527
});
528
529
it("run scan from node1. this should result in the following new connections: 2->1 ", async () => {
530
const { count, errors } = await nodes[1].server.scan();
531
expect(count).toBe(1);
532
expect(errors.length).toBe(0);
533
expect(nodes[2].server.clusterAddresses("discovery").length).toBe(2);
534
});
535
536
it("run scan from node2. this should result in the following new connections: 2->0. We now have the complete graph.", async () => {
537
const { count, errors } = await nodes[2].server.scan();
538
expect(count).toBe(1);
539
expect(errors.length).toBe(0);
540
expect(nodes[2].server.clusterAddresses("discovery").length).toBe(3);
541
});
542
543
it("join a new node3 (=nodes[4] due to other cluster node) to node2 and do a scan", async () => {
544
nodes.push(
545
await createClusterNode({ id: "node3", clusterName: "discovery" }),
546
);
547
await nodes[4].server.join(nodes[2].server.address());
548
// only knows about self and node2 initially
549
expect(nodes[4].server.clusterAddresses("discovery").length).toBe(2);
550
551
// new node scans and now it knows about ALL nodes in the cluster
552
expect((await nodes[4].server.scan()).count).toBe(3);
553
expect(nodes[4].server.clusterAddresses("discovery").length).toBe(4);
554
555
// have the other 3 nodes scan so they know all nodes:
556
for (let i = 0; i < 3; i++) {
557
await nodes[i].server.scan();
558
expect(nodes[i].server.clusterAddresses("discovery").length).toBe(4);
559
}
560
});
561
});
562
563
describe("test the clusterIpAddress option", () => {
564
it("create a server without explicit clusterIpAddress", async () => {
565
const { server } = await createClusterNode({
566
clusterName: "cluster0",
567
id: "0",
568
});
569
570
expect(server.options.clusterIpAddress).toBe(undefined);
571
expect(server.address()).toBe(`http://localhost:${server.options.port}`);
572
server.close();
573
});
574
575
it("create a server with explicit clusterIpAddress", async () => {
576
const { server } = await createClusterNode({
577
clusterName: "cluster0",
578
id: "0",
579
clusterIpAddress: "127.0.0.1",
580
});
581
582
expect(server.options.clusterIpAddress).toBe("127.0.0.1");
583
expect(server.address()).toBe(`http://127.0.0.1:${server.options.port}`);
584
server.close();
585
});
586
});
587
588
afterAll(after);
589
590