Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/cluster/pubsub.test.ts
5886 views
1
/*
2
pnpm test `pwd`/pubsub.test.ts
3
*/
4
5
import {
6
before,
7
after,
8
server,
9
addNodeToDefaultCluster,
10
wait,
11
delay,
12
} from "@cocalc/backend/conat/test/setup";
13
14
beforeAll(before);
15
16
describe("the most basic pub/sub test with a 2-node cluster", () => {
17
let client0, server1, client1;
18
it("add another node to cluster", async () => {
19
client0 = server.client();
20
server1 = await addNodeToDefaultCluster();
21
client1 = server1.client();
22
});
23
24
let sub;
25
it("subscribe", async () => {
26
sub = await client0.subscribe("cocalc");
27
});
28
29
// commenting this out, since it is impossible to robustly test in all cases,
30
// since sometmes the subscription info can move very quickly and this publish
31
// could be slow!
32
// it("publish -- message is initially dropped with no receiver because interest doesn't propogate instantly", async () => {
33
// const { count } = await client1.publish("cocalc", "hi");
34
// expect(count).toBe(0);
35
// });
36
37
it("publish after waiting for interest -- this works", async () => {
38
await client1.waitForInterest("cocalc");
39
const { count, bytes } = await client1.publish("cocalc", "hi");
40
expect(count).toBe(1);
41
expect(bytes).toBe(3);
42
});
43
44
it("receive", async () => {
45
const { value } = await sub.next();
46
expect(value.data).toBe("hi");
47
});
48
49
it("clean up", () => {
50
sub.close();
51
});
52
});
53
54
describe("same basic test, but in the other direction", () => {
55
let client0, client1;
56
it("get the clients", async () => {
57
client0 = server.client();
58
// @ts-ignore
59
client1 = Object.values(server.clusterLinks[server.clusterName])[0].client;
60
});
61
62
let sub;
63
it("subscribe", async () => {
64
sub = await client1.subscribe("conat");
65
});
66
67
it("publish after waiting for interest -- this works", async () => {
68
await client0.waitForInterest("conat");
69
const { count, bytes } = await client0.publish("conat", "hi");
70
expect(count).toBe(1);
71
expect(bytes).toBe(3);
72
});
73
74
it("receive", async () => {
75
const { value } = await sub.next();
76
expect(value.data).toBe("hi");
77
});
78
79
it("clean up", () => {
80
sub.close();
81
});
82
});
83
84
describe("three nodes and two subscribers with different queue group on distinct nodes", () => {
85
let client0, client1, client2, server2;
86
it("get the clients", async () => {
87
client0 = server.client();
88
// @ts-ignore
89
client1 = Object.values(server.clusterLinks[server.clusterName])[0].client;
90
server2 = await addNodeToDefaultCluster();
91
client2 = server2.client();
92
});
93
94
let sub0, sub1;
95
it("subscribes from two nodes", async () => {
96
sub0 = await client0.subscribe("sage", { queue: "0" });
97
sub1 = await client1.subscribe("sage", { queue: "1" });
98
});
99
100
it("only way to be sure to get both is to just try -- different queue group so eventually there will be two receivers", async () => {
101
await wait({
102
until: async () => {
103
const { count } = await client2.publish("sage", "hi");
104
return count == 2;
105
},
106
});
107
// and check both got something
108
await sub0.next();
109
await sub1.next();
110
});
111
112
it("subscribe from two nodes but with same queue group so only one subscriber will get it", async () => {
113
sub0 = await client0.subscribe("math", { queue: "0" });
114
sub1 = await client1.subscribe("math", { queue: "0" });
115
await client2.waitForInterest("math");
116
await delay(250);
117
const { count } = await client2.publish("math", "hi");
118
expect(count).toBe(1);
119
});
120
});
121
122
afterAll(after);
123
124