/*

pnpm test `pwd`/cluster.test.ts

*/
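// These tests exercise a conat cluster with more than one persist server:
// streams are routed to persist servers via sticky queue groups, and when one
// persist server is closed, clients should fail over to a remaining one
// without losing data.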
import { server as createPersistServer } from "@cocalc/backend/conat/persist";
import {
  after,
  before,
  server,
  addNodeToDefaultCluster,
  once,
  delay,
  persistServer as persistServer0,
  wait,
  setDefaultTimeouts,
  setDefaultSocketTimeouts,
  setDefaultReconnectDelay,
  waitForConsistentState,
} from "../setup";
import { uuid } from "@cocalc/util/misc";

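// If we can create this many streams without reaching every persist server,
// we conclude that sticky queue group routing is broken and fail the test.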
const BROKEN_THRESH = 30;

beforeAll(async () => {
  await before();
  // this speeds up the automatic failover tests a lot.
  setDefaultTimeouts({ request: 1000, publish: 1000 });
  setDefaultSocketTimeouts({
    command: 1000,
    keepAlive: 2000,
    keepAliveTimeout: 1000,
  });
  setDefaultReconnectDelay(1);
});

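// The automatic failover test near the end can take close to 10s (see its
// comment below), hence the generous jest timeout.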
jest.setTimeout(10000);
describe("test using multiple persist servers in a cluster", () => {
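  // client0 is connected to the original node (server); client1 is connected
  // to the node added below (server1).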
  let client0, server1, client1;
  it("add another node", async () => {
    client0 = server.client();
    server1 = await addNodeToDefaultCluster();
    client1 = server1.client();
  });

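  // persistServer0 (imported from ../setup) serves the original node; here we
  // start a second persist server using a client connected to server1.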
  let persistServer1;
  it("add a second persist server connected to server1", async () => {
    persistServer1 = createPersistServer({ client: client1 });
    await once(persistServer1, "ready");
    expect(persistServer1.state).toBe("ready");
    await waitForConsistentState([server, server1]);
  });

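  // New streams get assigned to a persist server via a sticky queue group, so
  // which server handles a given stream is effectively random; creating
  // streams in a loop should quickly produce at least one connection to each
  // server.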
it("make streams until there is at least one connection to each persist server -- this must happen quickly at random due to how sticky queue groups work", async () => {
56
const v: any[] = [];
57
// baseline - no sockets
58
await wait({
59
until: () =>
60
Object.keys(persistServer0.sockets).length == 0 &&
61
Object.keys(persistServer1.sockets).length == 0,
62
});
63
while (
64
Object.keys(persistServer0.sockets).length == 0 ||
65
Object.keys(persistServer1.sockets).length == 0
66
) {
67
const s = await client1.sync.dstream({
68
project_id: uuid(),
69
name: "foo",
70
sync: true,
71
});
72
// this helps give time for the persist server added above to be known
73
await delay(50);
74
v.push(s);
75
if (v.length > BROKEN_THRESH) {
76
throw Error("sticky queue groups are clearly not working properly");
77
}
78
}
79
v.map((x) => x.close());
80
});
81
82
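  // Same exercise via client0, but this time each stream records and publishes
  // its project_id so later tests can confirm no data is lost after a persist
  // server is removed.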
  const project_ids: string[] = [];
  it("same test as above, but with client connected to server0", async () => {
    // baseline
    await wait({
      until: () =>
        Object.keys(persistServer0.sockets).length == 0 &&
        Object.keys(persistServer1.sockets).length == 0,
    });

    const v: any[] = [];
    while (
      Object.keys(persistServer0.sockets).length == 0 ||
      Object.keys(persistServer1.sockets).length == 0
    ) {
      const project_id = uuid();
      project_ids.push(project_id);
      const s = await client0.sync.dstream({
        project_id,
        name: "foo",
        sync: true,
      });
      v.push(s);
      s.publish(project_id);
      await s.save();
      if (v.length > BROKEN_THRESH) {
        throw Error("sticky queue groups are clearly not working properly");
      }
    }
    v.map((x) => x.close());
  });

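  // Open each stream through both nodes (writing via client1, reading via
  // client0) so the failover test at the end can check that writes through one
  // node still reach the other.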
  const openStreams0: any[] = [];
  const openStreams1: any[] = [];
  it("create more streams connected to both servers to use both", async () => {
    // wait for all the sockets to close in order to not mess up other tests
    await wait({
      until: () =>
        Object.keys(persistServer0.sockets).length == 0 &&
        Object.keys(persistServer1.sockets).length == 0,
    });

    while (
      Object.keys(persistServer0.sockets).length == 0 ||
      Object.keys(persistServer1.sockets).length == 0
    ) {
      const project_id = uuid();
      const s = await client1.sync.dstream({
        project_id,
        name: "foo",
        noCache: true,
        sync: true,
      });
      s.publish("x");
      await s.save();
      openStreams0.push(s);
      if (openStreams0.length > BROKEN_THRESH) {
        throw Error("sticky queue groups are clearly not working properly");
      }
      const t = await client0.sync.dstream({
        project_id,
        name: "foo",
        noCache: true,
        sync: true,
      });
      expect(t.getAll()).toEqual(["x"]);
      openStreams1.push(t);
    }
    expect(openStreams0.length).toBeGreaterThan(0);
  });

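  // Close one of the two persist servers; the remaining one has to take over,
  // which the following tests verify.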
it("remove one persist server", async () => {
153
persistServer1.close();
154
});
155
156
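  // The streams created via client0 earlier each published their project_id;
  // reopening them now must return that value even though persistServer1 is
  // gone.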
it("creating / opening streams we made above still work with no data lost", async () => {
157
for (const project_id of project_ids) {
158
const s = await client0.sync.dstream({
159
project_id,
160
name: "foo",
161
noCache: true,
162
sync: true,
163
});
164
expect(await s.getAll()).toEqual([project_id]);
165
s.close();
166
}
167
expect(Object.keys(persistServer1.sockets).length).toEqual(0);
168
});
169
170
  // this can definitely take a long time (e.g., ~10s), as it involves automatic failover.
  it("checks automatic failover works: the streams connected to both servers that we created above must keep working, despite at least one of them having had its persist server closed", async () => {
    for (let i = 0; i < openStreams0.length; i++) {
      const stream0 = openStreams0[i];
      stream0.publish("y");
      await stream0.save();
      expect(stream0.hasUnsavedChanges()).toBe(false);

      const stream1 = openStreams1[i];
      expect(stream0.opts.project_id).toEqual(stream1.opts.project_id);
      await wait({
        until: async () => {
          return stream1.length >= 2;
        },
        timeout: 5000,
        start: 1000,
      });
      expect(stream1.length).toBe(2);
    }
  });
});

afterAll(after);