Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/cluster/pubsub.test.ts
1712 views
1
/*
2
pnpm test `pwd`/pubsub.test.ts
3
*/
4
5
import {
6
before,
7
after,
8
server,
9
addNodeToDefaultCluster,
10
wait,
11
delay,
12
waitForConsistentState,
13
defaultCluster,
14
} from "@cocalc/backend/conat/test/setup";
15
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
16
17
// Register the shared `before` hook (imported from the conat test setup module)
// to run once before any test in this file.
// NOTE(review): presumably this starts the default test server/cluster that
// `server` and `defaultCluster` refer to — confirm in test/setup.
beforeAll(before);
18
19
describe("the most basic pub/sub test with a 2-node cluster", () => {
  // Handles shared by the steps below. The steps are sequential: each `it`
  // relies on the state left behind by the previous one.
  let firstNodeClient, secondNodeClient, subscription;

  it("add another node to cluster", async () => {
    firstNodeClient = server.client();
    const secondNode = await addNodeToDefaultCluster();
    secondNodeClient = secondNode.client();
  });

  it("subscribe", async () => {
    subscription = await firstNodeClient.subscribe("cocalc");
  });

  // Disabled: this is impossible to test robustly in all cases, since
  // sometimes the subscription info can move very quickly and this publish
  // could be slow!
  // it("publish -- message is initially dropped with no receiver because interest doesn't propagate instantly", async () => {
  //   const { count } = await secondNodeClient.publish("cocalc", "hi");
  //   expect(count).toBe(0);
  // });

  it("publish after waiting for interest -- this works", async () => {
    // Block until the second node knows about the subscriber on the first node.
    await secondNodeClient.waitForInterest("cocalc");
    const delivery = await secondNodeClient.publish("cocalc", "hi");
    expect(delivery.count).toBe(1);
    expect(delivery.bytes).toBe(3);
  });

  it("receive", async () => {
    const received = (await subscription.next()).value;
    expect(received.data).toBe("hi");
  });

  it("clean up", () => {
    subscription.close();
  });
});
56
57
describe("same basic test, but in the other direction", () => {
  // Handles shared by the sequential steps below.
  let firstNodeClient, linkedNodeClient, subscription;

  it("get the clients", async () => {
    firstNodeClient = server.client();
    // Reuse the client held by the first cluster link of the default server.
    // @ts-ignore
    linkedNodeClient = Object.values(server.clusterLinks[server.clusterName])[0].client;
  });

  it("subscribe", async () => {
    subscription = await linkedNodeClient.subscribe("conat");
  });

  it("publish after waiting for interest -- this works", async () => {
    // Block until the publishing side has learned about the subscriber.
    await firstNodeClient.waitForInterest("conat");
    const delivery = await firstNodeClient.publish("conat", "hi");
    expect(delivery.count).toBe(1);
    expect(delivery.bytes).toBe(3);
  });

  it("receive", async () => {
    const received = (await subscription.next()).value;
    expect(received.data).toBe("hi");
  });

  it("clean up", () => {
    subscription.close();
  });
});
86
87
describe("three nodes and two subscribers with different queue group on distinct nodes", () => {
  // Number of messages each node publishes in the sticky-queue test.
  const STICKY_BATCH_SIZE = 20;

  /** The part of a client this block needs in order to publish and inspect delivery. */
  interface Publisher {
    publish(subject: string, mesg: unknown): Promise<{ count: number }>;
  }

  /** Running total of messages observed on one subscription. */
  interface Tally {
    received: number;
  }

  /**
   * Consume `sub` in the background and count every message it yields.
   *
   * @param sub subscription to drain
   * @returns a tally object whose `received` field grows as messages arrive
   */
  function startCounting(sub: AsyncIterable<unknown>): Tally {
    const tally: Tally = { received: 0 };
    // Deliberately not awaited: the loop only finishes once the subscription ends.
    void (async () => {
      for await (const _ of sub) {
        tally.received++;
      }
    })();
    return tally;
  }

  /**
   * Publish STICKY_BATCH_SIZE messages to "sticky" from `client`, asserting
   * that each publish reports exactly one receiver.
   */
  async function publishStickyBatch(client: Publisher): Promise<void> {
    for (let i = 0; i < STICKY_BATCH_SIZE; i++) {
      const { count } = await client.publish("sticky", null);
      expect(count).toBe(1);
    }
  }

  /**
   * Wait until `total` messages have been counted overall, then assert that
   * they all arrived at one and the same subscription.
   */
  async function expectSingleReceiver(
    tallies: readonly Tally[],
    total: number,
  ): Promise<void> {
    const counts = () => tallies.map((tally) => tally.received);
    await wait({
      until: () => counts().reduce((sum, n) => sum + n, 0) === total,
    });
    // Exactly one subscription saw traffic, and it saw all of it.
    expect(counts().filter((n) => n > 0)).toEqual([total]);
  }

  let client0, client1, client2;
  it("get the clients", async () => {
    client0 = server.client();
    // @ts-ignore
    client1 = Object.values(server.clusterLinks[server.clusterName])[0].client;
    const server2 = await addNodeToDefaultCluster();
    client2 = server2.client();
  });

  let sub0, sub1;
  it("subscribes from two nodes", async () => {
    sub0 = await client0.subscribe("sage", { queue: "0" });
    sub1 = await client1.subscribe("sage", { queue: "1" });
  });

  it("only way to be sure to get both is to just try -- different queue group so eventually there will be two receivers", async () => {
    try {
      // Keep publishing until the publishing node reports both receivers.
      await wait({
        until: async () => {
          const { count } = await client2.publish("sage", "hi");
          return count === 2;
        },
      });
      // and check both got something
      await sub0.next();
      await sub1.next();
    } finally {
      // Nothing else reads the "sage" subscriptions, so release them here
      // rather than leaving them open for the rest of the file.
      sub0.close();
      sub1.close();
    }
  });

  it("subscribe from two nodes but with same queue group so only one subscriber will get it", async () => {
    const mathSub0 = await client0.subscribe("math", { queue: "0" });
    const mathSub1 = await client1.subscribe("math", { queue: "0" });
    try {
      await client2.waitForInterest("math");
      // Extra pause after interest is seen, before publishing.
      // NOTE(review): presumably gives the second subscription time to
      // propagate as well — confirm.
      await delay(250);
      const { count } = await client2.publish("math", "hi");
      expect(count).toBe(1);
    } finally {
      mathSub0.close();
      mathSub1.close();
    }
  });

  it("make sticky subs from each node, and see that all messages go to the same receiver, no matter which node we send from", async () => {
    const subs: { close(): void }[] = [];
    const tallies: Tally[] = [];
    try {
      // Subscribe one node at a time (node 0, then 1, then 2) and start
      // counting on each subscription as soon as it exists.
      for (const client of [client0, client1, client2]) {
        const sub = await client.subscribe("sticky", {
          queue: STICKY_QUEUE_GROUP,
        });
        subs.push(sub);
        tallies.push(startCounting(sub));
      }

      await client2.waitForInterest("sticky");

      // Publish from every node in turn; the receiver must not change.
      for (const client of [client2, client1, client0]) {
        await publishStickyBatch(client);
      }
      await expectSingleReceiver(tallies, 3 * STICKY_BATCH_SIZE);

      await waitForConsistentState(defaultCluster);

      // add another node and test
      const server3 = await addNodeToDefaultCluster();
      const client3 = server3.client();
      await waitForConsistentState(defaultCluster);
      await publishStickyBatch(client3);
      await expectSingleReceiver(tallies, 4 * STICKY_BATCH_SIZE);
    } finally {
      // Closing the subscriptions also lets the background counting loops end.
      for (const sub of subs) {
        sub.close();
      }
    }
  });
});
192
193
// Register the shared `after` hook (imported from the conat test setup module)
// to run once after every test in this file has finished.
// NOTE(review): presumably this shuts down the servers/clients created during
// the run — confirm in test/setup.
afterAll(after);
194
195