Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/cluster/cluster-sticky-state.test.ts
1712 views
1
import {
2
before,
3
after,
4
defaultCluster as servers,
5
waitForConsistentState,
6
wait,
7
addNodeToDefaultCluster,
8
} from "@cocalc/backend/conat/test/setup";
9
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
10
import { randomId } from "@cocalc/conat/names";
11
12
// Register the shared `before` hook imported from the conat test setup module
// so it runs once before any test in this file.
beforeAll(before);
13
14
describe("ensure sticky state sync and use is working properly", () => {
15
// Clients obtained from the cluster nodes, in the same order as `servers`
// (clients[i] comes from servers[i].client()). Assigned by the first test.
let clients: any;
it("2-node cluster", async () => {
  // Add one node to the default cluster and confirm there are now two.
  await addNodeToDefaultCluster();
  expect(servers.length).toBe(2);

  // Only create clients after waitForConsistentState has resolved.
  await waitForConsistentState(servers);
  clients = servers.map((server) => server.client());
});
22
23
// Number of distinct `subject.<i>.*` patterns exercised by this suite.
const count = 25;
// subs0[i] is the subscription to `subject.<i>.*` made through clients[1];
// subs1[i] is the one made through clients[0].
const subs0: any[] = [];
const subs1: any[] = [];
it(`create ${count} distinct sticky subscriptions and send one message to each to create sticky routing state on servers[0]`, async () => {
  // NOTE(review): `clients` already has one entry per server from the previous
  // test and only clients[0] / clients[1] are indexed in this file, so these two
  // extra entries look unused -- confirm before removing.
  clients.push(servers[0].client());
  clients.push(servers[1].client());

  for (let i = 0; i < count; i++) {
    const pattern = `subject.${i}.*`;
    const probeSubject = `subject.${i}.0`;

    const remoteSub = await clients[1].subscribe(pattern, {
      queue: STICKY_QUEUE_GROUP,
    });
    subs0.push(remoteSub);

    // wait so above subscription is known to *both* servers:
    // @ts-ignore
    await servers[0].waitForInterest(probeSubject, 5000, clients[0].conn.id);

    const localSub = await clients[0].subscribe(pattern, {
      queue: STICKY_QUEUE_GROUP,
    });
    subs1.push(localSub);

    // publishing causes a choice to be made and saved on servers[0]
    await clients[0].publish(`subject.${i}.foo`, "hello");
    expect(servers[0].sticky[pattern]).not.toBe(undefined);
    // but no choice on servers[1]
    expect(servers[1].sticky[pattern]).toBe(undefined);
  }
});
54
55
// 0 if subs0[0] received the first message for subject.0.*, 1 if subs1[0] did.
// Set by the test below and read by deliveryTest.
let chosen;
it("see which subscription got chosen for subject.0.* -- this is useful later", async () => {
  // Resolves to `label` as soon as `sub` yields its next message.
  const labelOnNext = async (
    sub: { next: () => Promise<unknown> },
    label: number,
  ) => {
    await sub.next();
    return label;
  };

  // Whichever subscription yields first is the one the sticky choice picked;
  // the other pending next() call is simply left unresolved.
  chosen = await Promise.race([
    labelOnNext(subs0[0], 0),
    labelOnNext(subs1[0], 1),
  ]);
});
67
68
it(`sticky on servers[0] should have ${count} entries starting in "subject".`, async () => {
  // Count only the sticky keys that belong to this suite's subject.* patterns.
  let matching = 0;
  for (const key of Object.keys(servers[0].sticky)) {
    if (key.startsWith("subject.")) {
      matching += 1;
    }
  }
  expect(matching).toBe(count);
});
74
75
it(`sticky on servers[1] should have no entries starting in "subject".`, async () => {
  // Tally the keys of servers[1].sticky that start with "subject."; none are expected.
  const matching = Object.keys(servers[1].sticky).reduce(
    (total, key) => (key.startsWith("subject.") ? total + 1 : total),
    0,
  );
  expect(matching).toBe(0);
});
81
82
it(`servers[1]'s link to servers[0] should *eventually* have ${count} entries starting in "subject."`, async () => {
  // @ts-ignore
  const link = servers[1].clusterLinksByAddress[servers[0].address()];

  // Keys of the link's sticky table that belong to this suite.
  const subjectKeys = () =>
    Object.keys(link.sticky).filter((key) => key.startsWith("subject."));

  // Re-read the link's sticky table on each `until` call until all entries appear.
  let latest;
  await wait({
    until: () => {
      latest = subjectKeys();
      return latest.length === count;
    },
  });
  expect(latest.length).toBe(count);
});
94
95
it("send message from clients[1] to each subject", async () => {
  // Publish one message per subject, strictly in sequence, through clients[1]
  // (the client obtained from servers[1]).
  let i = 0;
  while (i < count) {
    await clients[1].publish(`subject.${i}.foo`);
    i += 1;
  }
});
100
101
// Sometimes this fails under very heavy load.
102
// It's not a good test, because it probably hits some timeouts sometimes, and
103
// it is testing internal structure/optimizations, not behavior.
104
// Note also that minimizing sticky state computation is just an optimization so even if this test were failing
105
// due to a bug, it might just mean things are slightly slower.
106
// it(`sticky on servers[1] should STILL have no entries starting in "subject", since no choices had to be made`, async () => {
107
// const v = Object.keys(servers[1].sticky).filter((s) =>
108
// s.startsWith("subject."),
109
// );
110
// expect(v.length).toBe(0);
111
// });
112
113
/**
 * Publishes to `subject.0.<random>` once from a fresh client of every server
 * and checks that each publish reports exactly one recipient and that all of
 * the messages show up on the subscription recorded in `chosen`.
 *
 * Relies on the suite-level `chosen`, `subs0`, `subs1`, `clients` and `servers`.
 */
async function deliveryTest() {
  const sub = (chosen == 0 ? subs0 : subs1)[0];

  // clear up the subscription (we sent it stuff above): publish a unique
  // sentinel and discard everything up to and including it.
  const sentinel = randomId();
  await clients[0].publish("subject.0.foo", sentinel);
  let drained;
  do {
    drained = (await sub.next()).value;
  } while (drained.data != sentinel);

  for (const server of servers) {
    // we randomize the last segment to verify that it is NOT used
    // as input to the sticky routing choice.
    const publisher = server.client();
    const ack = await publisher.publish(
      `subject.0.${randomId()}`,
      "delivery-test",
    );
    expect(ack.count).toBe(1);
  }

  // Read one delivery per publish above from the chosen subscription; the
  // other subscription for this pattern is not read here.
  const ids = new Set<string>();
  for (let received = 0; received < servers.length; received += 1) {
    const { value } = await sub.next();
    expect(value.data).toBe("delivery-test");
    ids.add(value.client.id);
  }
  // all messages must go to the SAME subscriber, since sticky
  expect(ids.size).toBe(1);
}

it("publish from every node to subject.0.foo", deliveryTest);
145
146
// How many additional nodes to add to the cluster in the test below.
const count2 = 5;
it(`add ${count2} more nodes to the cluster should be reaonably fast and not blow up in a feedback loop`, async () => {
  // Add the nodes one after another, waiting for each call to finish.
  let added = 0;
  while (added < count2) {
    await addNodeToDefaultCluster();
    added += 1;
  }
});

it("wait until cluster is consistent", async () => {
  // Later tests run only after waitForConsistentState resolves for the enlarged cluster.
  await waitForConsistentState(servers);
});
156
157
it("double check the links have the sticky state", () => {
  // Every server except servers[0] is checked through its link to servers[0].
  for (let n = 1; n < servers.length; n += 1) {
    const server = servers[n];
    // @ts-ignore
    const link = server.clusterLinksByAddress[servers[0].address()];
    const subjectKeys = Object.keys(link.sticky).filter((key) =>
      key.startsWith("subject."),
    );
    expect(subjectKeys.length).toBe(count);
  }
});

// Same delivery check as before, now with the additional nodes in the cluster.
it(
  "in bigger, cluster, publish from every node to subject.0.foo",
  deliveryTest,
);
172
173
it("listen on > and note that it doesn't impact the count", async () => {
  // NOTE(review): ">" is presumably the match-everything subject pattern
  // (NATS-style) -- confirm against the conat subject documentation.
  const sub = await clients[0].subscribe(">");
  try {
    for (const server of servers) {
      // The recipient count reported by publish must still be 1 even though
      // the extra subscription above is open.
      const { count: recipients } = await server
        .client()
        .publish("subject.0.foo", "hi");
      expect(recipients).toBe(1);
    }
  } finally {
    // Always close the extra subscription, even when an expectation above
    // throws, so it is not left open for the tests that follow.
    sub.close();
  }
});
183
184
it("unjoining servers[0] from servers[1] should transfer the sticky state to servers[1]", async () => {
  const departing = servers[0].address();
  await servers[1].unjoin({ address: departing });

  // After the unjoin, servers[1]'s own sticky table is expected to hold every
  // subject.* entry.
  const ownSubjectKeys = Object.keys(servers[1].sticky).filter((key) =>
    key.startsWith("subject."),
  );
  expect(ownSubjectKeys.length).toBe(count);
});
191
});
192
193
// Register the shared `after` hook imported from the conat test setup module
// so it runs once after every test in this file has finished.
afterAll(after);
194
195