Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/setup.ts
1712 views
1
import getPort from "@cocalc/backend/get-port";
2
import { type Client } from "@cocalc/conat/core/client";
3
import {
4
init as createConatServer,
5
type Options,
6
type ConatServer,
7
} from "@cocalc/conat/core/server";
8
import getLogger from "@cocalc/backend/logger";
9
import { setConatClient } from "@cocalc/conat/client";
10
import { server as createPersistServer } from "@cocalc/backend/conat/persist";
11
import { syncFiles } from "@cocalc/conat/persist/context";
12
import { mkdtemp, rm } from "node:fs/promises";
13
import { tmpdir } from "node:os";
14
import { join } from "path";
15
import { wait } from "@cocalc/backend/conat/test/util";
16
import { delay } from "awaiting";
17
export { setDefaultTimeouts } from "@cocalc/conat/core/client";
18
export { setDefaultSocketTimeouts } from "@cocalc/conat/socket/util";
19
export { setDefaultReconnectDelay } from "@cocalc/conat/persist/client";
20
import { once } from "@cocalc/util/async-utils";
21
import { until } from "@cocalc/util/async-utils";
22
import { randomId } from "@cocalc/conat/names";
23
import { isEqual } from "lodash";
24
25
export { wait, delay, once };
26
27
// Debug logger used by the helpers in this test-setup module.
const logger = getLogger("conat:test:setup");

// The `path` option handed to the conat servers created by createServer()
// and addNodeToDefaultCluster() below.
export const path = "/conat";
30
31
/**
 * Create a conat server and wait until it reports that it is ready.
 *
 * @param options - server options; when no (truthy) `port` is given, one is
 *   obtained from getPort().
 * @returns the server, once its state is "ready".
 *
 * Side effect: a server whose clusterName is "default" is also appended to
 * the module-level `defaultCluster` list.
 */
export async function initConatServer(
  options: Partial<Options> = {},
): Promise<ConatServer> {
  logger.debug("init");

  // Only allocate a port when the caller did not supply one.
  const resolved = options?.port
    ? options
    : { ...options, port: await getPort() };

  const created = createConatServer(resolved);
  if (created.clusterName == "default") {
    defaultCluster.push(created);
  }

  // The server may already be ready; otherwise wait for its "ready" event.
  if (created.state != "ready") {
    await once(created, "ready");
  }
  return created;
}
49
50
// Module-level state shared by the helpers in this file.

// Temporary directory created by before() and removed by after().
export let tempDir;

// The conat server created by before() (replaced by restartServer()).
export let server: any = null;

// The persist server created by before() (replaced by restartPersistServer()).
export let persistServer: any = null;
53
54
// Counter backing getNodeId(); incremented once per generated id.
let nodeNumber = 0;

/**
 * Return the next sequential node id: "node-0", "node-1", ...
 */
function getNodeId() {
  const id = `node-${nodeNumber}`;
  nodeNumber += 1;
  return id;
}
58
59
/**
 * Create a conat server in the "default" cluster with test defaults
 * (shared `path`, password "secret", a fresh node id).
 *
 * @param opts - optional overrides; they are spread last, so any key present
 *   in `opts` wins over the defaults.
 * @returns the ready server (see initConatServer).
 */
export async function createServer(opts?) {
  return await initConatServer({
    // Only ask for a free port when the caller did not supply one (e.g.
    // restartServer passes the old port); previously a port was always
    // allocated and then discarded.  Mirrors how `id` is handled below.
    port: opts?.port ?? (await getPort()),
    path,
    clusterName: "default",
    id: opts?.id ?? getNodeId(),
    systemAccountPassword: "secret",
    ...opts,
  });
}
69
70
// Every server created with clusterName "default" (initConatServer appends
// them here).
export const defaultCluster: ConatServer[] = [];

/**
 * Add another node to the default cluster.  The node runs in the same
 * process (not forked), which is generally good: you can console.log from
 * it, it is faster, etc.
 *
 * The new node is connected to all existing nodes of the default cluster,
 * in both directions.
 *
 * @returns the newly created node.
 */
export async function addNodeToDefaultCluster(): Promise<ConatServer> {
  const port = await getPort();
  const node = await initConatServer({
    port,
    path,
    clusterName: "default",
    id: getNodeId(),
    systemAccountPassword: "secret",
  });
  for (const s of defaultCluster) {
    // initConatServer has already pushed `node` into defaultCluster, so skip
    // it here -- otherwise the node would be joined to itself.
    if (s === node) {
      continue;
    }
    await s.join(node.address());
    await node.join(s.address());
  }
  return node;
}
89
90
/**
 * Create `n` servers named "node-0" ... "node-(n-1)" in one cluster and join
 * every server to every other server.
 *
 * @param n - number of servers to create.
 * @param opts - optional server options; `clusterName` and
 *   `systemAccountPassword` default to random values shared by all nodes.
 * @returns a map from node id to server.
 */
export async function createConatCluster(n: number, opts?) {
  const clusterName = opts?.clusterName ?? `cluster-${randomId()}`;
  const systemAccountPassword = opts?.systemAccountPassword ?? randomId();

  const nodeIds: string[] = [];
  for (let i = 0; i < n; i++) {
    nodeIds.push(`node-${i}`);
  }

  // Servers are created one after another.
  const servers: { [id: string]: ConatServer } = {};
  for (const id of nodeIds) {
    servers[id] = await createServer({
      systemAccountPassword,
      clusterName,
      id,
      autoscanInterval: 0,
      ...opts,
    });
  }

  // Start every pairwise join (in both directions), then wait for all of them.
  const joins: any[] = [];
  for (const from of nodeIds) {
    for (const to of nodeIds) {
      if (from == to) {
        continue;
      }
      const address = `http://localhost:${servers[to].options.port}`;
      joins.push(servers[from].join(address));
    }
  }
  await Promise.all(joins);
  return servers;
}
120
121
/**
 * Close the module-level `server`, pause for 250ms, and replace it with a
 * new server created on the same port.
 */
export async function restartServer() {
  const { port } = server.options;
  await server.close();
  await delay(250);
  const replacement = await createServer({ port });
  server = replacement;
}
127
128
/**
 * Close the current persist server, then start a new one on a freshly
 * connected client.  The module-level `client` is replaced by that new
 * client as well.
 */
export async function restartPersistServer() {
  await persistServer.close();
  const freshClient = connect();
  client = freshClient;
  persistServer = createPersistServer({ client: freshClient });
}
133
134
// One pre-made client; assigned by before() and restartPersistServer().
export let client;

/**
 * Set up the shared test environment: a temp directory for persisted data,
 * a conat server, a client, and a persist server.
 *
 * @param opts.archive - when truthy, an "archive" directory inside the temp
 *   directory is configured on syncFiles.
 * @param opts.backup - when truthy, a "backup" directory inside the temp
 *   directory is configured on syncFiles.
 * @param opts.archiveInterval - when truthy, copied to
 *   syncFiles.archiveInterval.
 */
export async function before(
  opts: { archive?: string; backup?: string; archiveInterval?: number } = {},
) {
  const { archive, archiveInterval, backup } = opts;

  // syncFiles and tempDir define where the persist server persists data.
  tempDir = await mkdtemp(join(tmpdir(), "conat-test"));
  syncFiles.local = join(tempDir, "local");
  if (archive) {
    syncFiles.archive = join(tempDir, "archive");
  }
  if (archiveInterval) {
    syncFiles.archiveInterval = archiveInterval;
  }
  if (backup) {
    syncFiles.backup = join(tempDir, "backup");
  }

  server = await createServer();
  client = connect();
  persistServer = createPersistServer({ client });

  // Register connect() as the conat client factory.
  setConatClient({ conat: connect, getLogger });
}
160
161
// Every client handed out by connect(), so after() can close them all.
const clients: Client[] = [];

/**
 * Create a new, uncached client of the module-level `server`.
 *
 * @param opts - extra client options; they override the defaults
 *   `noCache: true` and `path: "/"`.
 * @returns the new client, which is also recorded in `clients`.
 */
export function connect(opts?): Client {
  const clientOptions = { noCache: true, path: "/", ...opts };
  const connection = server.client(clientOptions);
  clients.push(connection);
  return connection;
}
167
168
/**
 * Given a list of servers that are all connected together in a common
 * cluster, wait until they all have a consistent view of the interest:
 * for each servers[i], every other server's cluster link to it must have
 * the same hash and the same interest patterns as servers[i] itself.
 *
 * The check is a predicate handed to `until` together with `timeout`
 * (presumably it is re-run until it returns true or the timeout passes --
 * see @cocalc/util/async-utils).  The predicate also returns true as soon
 * as it meets a server whose state is "closed".
 *
 * @param servers - the cluster nodes; with 0 or 1 servers this returns
 *   immediately.
 * @param timeout - passed through to `until` (default 10000).
 * @throws Error if the first server has no clusterName, if the servers are
 *   not all in the same cluster, if two servers share an id, or if the
 *   interest sequence numbers of a server and a link disagree on their
 *   common prefix.
 */
export async function waitForConsistentState(
  servers: ConatServer[],
  timeout = 10000,
): Promise<void> {
  if (servers.length <= 1) {
    return;
  }

  const [first, ...rest] = servers;
  // @ts-ignore
  const clusterName = first.clusterName;
  if (!clusterName) {
    throw Error("not a cluster");
  }
  const ids = new Set<string>([first.id]);
  for (const other of rest) {
    // @ts-ignore
    if (other.clusterName != clusterName) {
      throw Error("all servers must be in the same cluster");
    }
    ids.add(other.id);
  }
  if (ids.size != servers.length) {
    throw Error(
      `all servers must have distinct ids -- ${JSON.stringify(servers.map((x) => x.id))}`,
    );
  }

  const start = Date.now();
  // Diagnostics are only printed once we have been waiting for over 3s,
  // i.e., when the wait is likely going to fail.
  const waitedTooLong = () => Date.now() - start > 3000;

  const isConsistent = () => {
    for (let i = 0; i < servers.length; i++) {
      const node = servers[i];
      if (node.state == "closed") {
        return true;
      }
      // now look at everybody else's view of servers[i].
      // @ts-ignore
      const nodeInterest = node.interest.serialize().patterns;
      const nodeSticky = node.sticky;
      const hashServer = node.hash();

      for (let j = 0; j < servers.length; j++) {
        if (i == j) {
          continue;
        }
        // @ts-ignore
        const link = servers[j].clusterLinks[clusterName]?.[node.id];
        if (link == null) {
          if (waitedTooLong()) {
            console.log(`node ${j} is not connected to node ${i}`);
          }
          return false;
        }
        const hashLink = link.hash();
        const linkInterest = link.interest.serialize().patterns;
        const linkSticky = link.sticky;

        // Dump both sides of the (i, j) comparison for debugging.
        const showInfo = () => {
          for (const type of ["interest", "sticky"]) {
            console.log(
              `server stream ${type}: `,
              hashServer[type],
              // @ts-ignore
              node.clusterStreams[type].stream.client.id,
              // @ts-ignore
              node.clusterStreams[type].stream.storage.path,
              // @ts-ignore
              node.clusterStreams[type].seqs(),
            );

            console.log(
              `link stream ${type}: `,
              hashLink[type],
              // @ts-ignore
              link.streams[type].stream.client.id,
              // @ts-ignore
              link.streams[type].stream.storage.path,
              // @ts-ignore
              link.streams[type].seqs(),
            );
          }
          console.log("waitForConsistentState", {
            i,
            j,
            serverInterest: nodeInterest,
            linkInterest: linkInterest,
            serverSticky: nodeSticky,
            linkSticky: linkSticky,
          });
        };

        if (!isEqual(hashServer, hashLink)) {
          if (waitedTooLong()) {
            console.log("hashes are not equal");
            // likely going to fail
            showInfo();
          }
          return false;
        }

        // Only the interest patterns are compared here (not sticky).
        if (isEqual(nodeInterest, linkInterest)) {
          continue;
        }

        // The two sequence lists must at least agree on their common prefix.
        // @ts-ignore
        const seqs0 = node.clusterStreams.interest.seqs();
        // @ts-ignore
        const seqs1 = link.streams.interest.seqs();
        const prefix0 = seqs0.slice(0, seqs1.length);
        const prefix1 = seqs1.slice(0, seqs0.length);
        if (!isEqual(prefix0, prefix1)) {
          showInfo();
          throw Error(`inconsistent initial sequences`);
        }

        if (waitedTooLong()) {
          // likely going to fail
          showInfo();
        }

        // not yet equal
        return false;
      }
    }
    return true;
  };

  await until(isConsistent, { timeout });
}
299
300
/**
 * Tear down what before() created: close the persist server, remove the
 * temp directory, then close the conat server and every client created via
 * connect().
 *
 * Closing the server and clients is best-effort (errors are ignored) and
 * now happens even if closing the persist server or removing the temp
 * directory fails; such a failure is still propagated to the caller.
 */
export async function after() {
  try {
    persistServer?.close();
    // tempDir is only set by before(); rm() rejects on an undefined path,
    // so skip removal when there is nothing to remove.
    if (tempDir != null) {
      await rm(tempDir, { force: true, recursive: true });
    }
  } finally {
    try {
      server?.close();
    } catch {}
    for (const cn of clients) {
      try {
        cn.close();
      } catch {}
    }
  }
}
312
313
// Best-effort cleanup when the process exits.  NOTE: "exit" listeners can
// only do synchronous work, so only the part of after() that runs before
// its first `await` is guaranteed to execute here; tests should still call
// after() themselves.  The returned promise is explicitly handled so a
// failed cleanup never surfaces as an unhandled rejection.
process.once("exit", () => {
  after().catch(() => {});
});

// Turn termination signals into a normal process.exit(), so that the "exit"
// listener above runs.
["SIGINT", "SIGTERM", "SIGQUIT"].forEach((sig) => {
  process.once(sig, () => {
    process.exit();
  });
});
322
323