Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/cluster/cluster-sticky.test.ts
1712 views
1
import {
2
before,
3
after,
4
wait,
5
waitForConsistentState,
6
} from "@cocalc/backend/conat/test/setup";
7
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
8
import { type ConatServer } from "@cocalc/conat/core/server";
9
import { createClusterNode } from "./util";
10
11
beforeAll(before);
12
13
describe("create cluster of two nodes, and verify that *sticky* subs properly work", () => {
14
let server1: ConatServer, server2: ConatServer, client1, client1b, client2;
15
it("create two distinct servers and link them", async () => {
16
({ server: server1, client: client1 } = await createClusterNode({
17
clusterName: "cluster0",
18
id: "1",
19
autoscanInterval: 100,
20
longAutoscanInterval: 1000,
21
}));
22
client1b = server1.client();
23
({ server: server2, client: client2 } = await createClusterNode({
24
clusterName: "cluster0",
25
id: "2",
26
autoscanInterval: 100,
27
longAutoscanInterval: 1000,
28
}));
29
await server1.join(server2.address());
30
await server2.join(server1.address());
31
});
32
33
let sub1, sub1b;
34
let recv1 = 0,
35
recv1b = 0;
36
const subject = "5077.org";
37
it("make two subscriptions with the same sticky queue group", async () => {
38
sub1 = await client1.sub(subject, { queue: STICKY_QUEUE_GROUP });
39
(async () => {
40
for await (const _ of sub1) {
41
recv1++;
42
}
43
})();
44
sub1b = await client1b.sub(subject, { queue: STICKY_QUEUE_GROUP });
45
(async () => {
46
for await (const _ of sub1b) {
47
recv1b++;
48
}
49
})();
50
});
51
52
let count = 50;
53
it("send messages and note they all go to the same target -- first the easy sanity check all on the same node", async () => {
54
await client1.waitForInterest(subject);
55
for (let i = 0; i < count; i++) {
56
await client1.publish(subject, "hi");
57
}
58
await wait({ until: () => recv1 + recv1b >= count });
59
expect(recv1 + recv1b).toEqual(count);
60
expect(recv1 * recv1b).toEqual(0);
61
});
62
63
it("send messages and note they all go to the same target -- next the hard case across the cluster", async () => {
64
// NOTE: if we do this test without waiting for consistent state, it will definitely
65
// fail sometimes, since server2 literally doesn't know enough about the servers yet,
66
// so has to make a different choice. Services of course must account for the fact that
67
// for the first moments of their existence, sticky routing can't work.
68
await waitForConsistentState([server1, server2]);
69
70
await client2.waitForInterest(subject);
71
for (let i = 0; i < count; i++) {
72
await client2.publish(subject, "hi");
73
}
74
await wait({ until: () => recv1 + recv1b >= 2 * count });
75
expect(recv1 + recv1b).toEqual(2 * count);
76
expect(recv1 * recv1b).toEqual(0);
77
});
78
79
let server3, client3;
80
it("add a third node", async () => {
81
const { client, server } = await createClusterNode({
82
clusterName: "cluster0",
83
autoscanInterval: 100,
84
longAutoscanInterval: 5000,
85
id: "3",
86
});
87
server3 = server;
88
client3 = client;
89
await server1.join(server.address());
90
});
91
92
it("waits for consistent state -- this verifies, e.g., that sticky state is equal", async () => {
93
await waitForConsistentState([server1, server2, server3]);
94
});
95
96
it("client connected to server3 also routes properly", async () => {
97
const total = recv1 + recv1b;
98
await client3.waitForInterest(subject);
99
for (let i = 0; i < count; i++) {
100
await client3.publish(subject, "hi");
101
}
102
await wait({ until: () => recv1 + recv1b >= total + count });
103
expect(recv1 + recv1b).toEqual(total + count);
104
expect(recv1 * recv1b).toEqual(0);
105
});
106
107
it("client1 and client2 still route properly", async () => {
108
const total = recv1 + recv1b;
109
for (let i = 0; i < count; i++) {
110
await client1.publish(subject, "hi");
111
await client2.publish(subject, "hi");
112
}
113
await wait({ until: () => recv1 + recv1b >= total + 2 * count });
114
expect(recv1 + recv1b).toEqual(total + 2 * count);
115
expect(recv1 * recv1b).toEqual(0);
116
});
117
});
118
119
afterAll(after);
120
121