Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/setup.ts
5750 views
1
import getPort from "@cocalc/backend/get-port";
2
import { type Client } from "@cocalc/conat/core/client";
3
import {
4
init as createConatServer,
5
type Options,
6
type ConatServer,
7
} from "@cocalc/conat/core/server";
8
import getLogger from "@cocalc/backend/logger";
9
import { setConatClient } from "@cocalc/conat/client";
10
import { server as createPersistServer } from "@cocalc/backend/conat/persist";
11
import { syncFiles } from "@cocalc/conat/persist/context";
12
import { mkdtemp, rm } from "node:fs/promises";
13
import { tmpdir } from "node:os";
14
import { join } from "path";
15
import { wait } from "@cocalc/backend/conat/test/util";
16
import { delay } from "awaiting";
17
export { setDefaultTimeouts } from "@cocalc/conat/core/client";
18
export { setDefaultSocketTimeouts } from "@cocalc/conat/socket/util";
19
export { setDefaultReconnectDelay } from "@cocalc/conat/persist/client";
20
import { once } from "@cocalc/util/async-utils";
21
import { until } from "@cocalc/util/async-utils";
22
import { randomId } from "@cocalc/conat/names";
23
import { isEqual } from "lodash";
24
25
// Re-export the helpers imported above (wait, delay, once) so that test files
// can get them from this setup module.
export { wait, delay, once };
26
27
// Logger for this module, created under the name "conat:test:setup".
const logger = getLogger("conat:test:setup");
28
29
// Value passed as the `path` option to the servers started by createServer()
// and addNodeToDefaultCluster().
export const path = "/conat";
30
31
// Start a conat server for tests and resolve once it is ready.
//
// - If options.port is missing (or falsy), a port is obtained from getPort().
// - A server whose clusterName is "default" is also appended to defaultCluster.
// - If the server is not already in the "ready" state, we wait for its
//   "ready" event before returning it.
export async function initConatServer(
  options: Partial<Options> = {},
): Promise<ConatServer> {
  logger.debug("init");

  let serverOptions = options;
  if (!serverOptions?.port) {
    const port = await getPort();
    serverOptions = { ...serverOptions, port };
  }

  const conatServer = createConatServer(serverOptions);
  if (conatServer.clusterName == "default") {
    defaultCluster.push(conatServer);
  }

  if (conatServer.state == "ready") {
    return conatServer;
  }
  await once(conatServer, "ready");
  return conatServer;
}
49
50
// Temporary directory for the current test run: assigned by before() (created
// with mkdtemp under os.tmpdir()) and removed recursively by after().
export let tempDir;
51
// The main test server: assigned by before(), replaced by restartServer(),
// used by connect() to create clients, and closed by after().
export let server: any = null;
52
// The persist server: created by before() via createPersistServer(),
// recreated by restartPersistServer(), and closed by after().
export let persistServer: any = null;
53
54
// Counter backing getNodeId(); incremented once per generated id.
let nodeNumber = 0;

// Return the next id of the form "node-<k>", starting with "node-0".
function getNodeId() {
  const id = `node-${nodeNumber}`;
  nodeNumber += 1;
  return id;
}
58
59
// Start a server in the "default" cluster with the test defaults
// (path, a generated node id, and the system account password "secret").
//
// opts is spread last, so any value supplied by the caller (including port,
// clusterName and systemAccountPassword) overrides the defaults. Note that
// getPort() is called even when the caller supplies a port.
export async function createServer(opts?) {
  const port = await getPort();
  const id = opts?.id ?? getNodeId();
  const serverOptions = {
    port,
    path,
    clusterName: "default",
    id,
    systemAccountPassword: "secret",
    ...opts,
  };
  return await initConatServer(serverOptions);
}
69
70
// Servers that initConatServer() started with clusterName "default".
export const defaultCluster: ConatServer[] = [];

// Add another node to the default cluster. The node is started in this same
// process (not forked), which is generally convenient: you can console.log
// from it, it is faster, etc. The new node is joined with every server in
// defaultCluster, in both directions.
//
// NOTE(review): initConatServer() appends servers whose clusterName is
// "default" to defaultCluster, so by the time the loop below runs the list
// presumably already contains `node` itself -- confirm that join() tolerates
// being given the server's own address.
export async function addNodeToDefaultCluster(): Promise<ConatServer> {
  const node = await initConatServer({
    port: await getPort(),
    path,
    clusterName: "default",
    id: getNodeId(),
    systemAccountPassword: "secret",
  });

  for (const member of defaultCluster) {
    await member.join(node.address());
    await node.join(member.address());
  }
  return node;
}
89
90
// Create a cluster of n servers with ids "node-0", ..., "node-<n-1>" and join
// every server to every other server.
//
// Unless given in opts, the cluster name is `cluster-<random id>` and the
// system account password is a random id. Each server is created with
// autoscanInterval 0; opts is spread last, so its values override the
// per-node settings passed to createServer().
//
// Returns an object mapping "node-<i>" to the corresponding server.
export async function createConatCluster(n: number, opts?) {
  const clusterName = opts?.clusterName ?? `cluster-${randomId()}`;
  const systemAccountPassword = opts?.systemAccountPassword ?? randomId();

  const servers: { [id: string]: ConatServer } = {};
  const nodeIds: string[] = [];
  for (let k = 0; k < n; k++) {
    const id = `node-${k}`;
    nodeIds.push(id);
    servers[id] = await createServer({
      systemAccountPassword,
      clusterName,
      id,
      autoscanInterval: 0,
      ...opts,
    });
  }

  // Start all pairwise joins (each ordered pair of distinct nodes), then wait
  // for all of them together.
  const pendingJoins: unknown[] = [];
  for (const from of nodeIds) {
    for (const to of nodeIds) {
      if (from === to) {
        continue;
      }
      const address = `http://localhost:${servers[to].options.port}`;
      pendingJoins.push(servers[from].join(address));
    }
  }
  await Promise.all(pendingJoins);

  return servers;
}
120
121
// Close the main test server, wait 250ms, and start a new one on the same
// port; the module-level `server` is replaced by the new instance.
export async function restartServer() {
  const { port } = server.options;
  await server.close();
  await delay(250);
  server = await createServer({ port });
}
127
128
// Close the persist server, then create a new client (which also replaces the
// module-level `client`) and a new persist server that uses it.
export async function restartPersistServer() {
  await persistServer.close();
  const freshClient = connect();
  client = freshClient;
  persistServer = createPersistServer({ client: freshClient });
}
133
134
// One pre-made client; assigned by before() and replaced by
// restartPersistServer().
export let client;

// Set up the shared test environment: a fresh temp directory for persisted
// data, the main server, a client, and a persist server. Also registers
// connect and getLogger via setConatClient().
//
// opts.archive and opts.backup are only used as flags: when truthy, the
// corresponding syncFiles path is set to a subdirectory of the temp directory.
// opts.archiveInterval, when truthy, is copied to syncFiles.archiveInterval.
export async function before(
  opts: { archive?: string; backup?: string; archiveInterval?: number } = {},
) {
  // syncFiles and tempDir define where the persist server persists data.
  tempDir = await mkdtemp(join(tmpdir(), "conat-test"));
  const { archive, archiveInterval, backup } = opts;

  syncFiles.local = join(tempDir, "local");
  if (archive) {
    syncFiles.archive = join(tempDir, "archive");
  }
  if (archiveInterval) {
    syncFiles.archiveInterval = archiveInterval;
  }
  if (backup) {
    syncFiles.backup = join(tempDir, "backup");
  }

  server = await createServer();
  client = connect();
  persistServer = createPersistServer({ client });
  setConatClient({ conat: connect, getLogger });
}
160
161
// Every client created by connect(); after() closes each of them.
const clients: Client[] = [];
162
// Create a new client of the main test server and remember it in `clients`
// so that after() can close it. Defaults are noCache: true and path: "/";
// opts is spread last and therefore overrides them.
export function connect(opts?): Client {
  const clientOptions = { noCache: true, path: "/", ...opts };
  const cn = server.client(clientOptions);
  clients.push(cn);
  return cn;
}
167
168
// Given a list of servers that are all connected together in a common
// cluster, wait until they all have a consistent view of the interest.
// I.e., the interest of servers[i] (and its hash) is the same as what every
// other server's link to servers[i] reports.
//
// - Returns immediately for zero or one server.
// - Throws if the first server has no cluster name, if the servers are not
//   all in the same cluster, or if their ids are not distinct.
// - The check passed to until() also succeeds as soon as it reaches a server
//   whose state is "closed".
// - Once more than 3 seconds have passed, diagnostics are printed with
//   console.log on each failed check.
// - Throws "inconsistent initial sequences" if the interest patterns differ
//   and the common prefix of the two interest stream seqs() lists differs.
export async function waitForConsistentState(
  servers: ConatServer[],
  timeout = 10000,
): Promise<void> {
  if (servers.length <= 1) {
    return;
  }

  const [first, ...others] = servers;
  // @ts-ignore
  const clusterName = first.clusterName;
  if (!clusterName) {
    throw Error("not a cluster");
  }
  const ids = new Set<string>([first.id]);
  for (const other of others) {
    // @ts-ignore
    if (other.clusterName != clusterName) {
      throw Error("all servers must be in the same cluster");
    }
    ids.add(other.id);
  }
  if (ids.size != servers.length) {
    throw Error(
      `all servers must have distinct ids -- ${JSON.stringify(servers.map((x) => x.id))}`,
    );
  }

  const start = Date.now();
  // true once we have been waiting for more than 3 seconds, at which point
  // the wait is likely going to fail and diagnostics become worthwhile.
  const overdue = () => Date.now() - start > 3000;

  await until(
    () => {
      for (let i = 0; i < servers.length; i++) {
        const source = servers[i];
        if (source.state == "closed") {
          return true;
        }
        // now look at everybody else's view of servers[i].
        // @ts-ignore
        const sourcePatterns = source.interest.serialize().patterns;
        const sourceHash = source.hash();

        for (let j = 0; j < servers.length; j++) {
          if (i == j) {
            continue;
          }
          // @ts-ignore
          const link = servers[j].clusterLinks[clusterName]?.[source.id];
          if (link == null) {
            if (overdue()) {
              console.log(`node ${j} is not connected to node ${i}`);
            }
            return false;
          }
          const linkHash = link.hash();
          const linkPatterns = link.interest.serialize().patterns;

          // Print the state of both sides of the (i, j) comparison.
          const showInfo = () => {
            for (const type of ["interest"]) {
              console.log(
                `server stream ${type}: `,
                sourceHash[type],
                // @ts-ignore
                source.clusterStreams[type].stream.client.id,
                // @ts-ignore
                source.clusterStreams[type].stream.storage.path,
                // @ts-ignore
                source.clusterStreams[type].seqs(),
              );

              console.log(
                `link stream ${type}: `,
                linkHash[type],
                // @ts-ignore
                link.streams[type].stream.client.id,
                // @ts-ignore
                link.streams[type].stream.storage.path,
                // @ts-ignore
                link.streams[type].seqs(),
              );
            }
            console.log("waitForConsistentState", {
              i,
              j,
              serverInterest: sourcePatterns,
              linkInterest: linkPatterns,
            });
          };

          if (!isEqual(sourceHash, linkHash)) {
            if (overdue()) {
              console.log("hashes are not equal");
              showInfo();
            }
            return false;
          }

          if (isEqual(sourcePatterns, linkPatterns)) {
            continue;
          }

          // Patterns differ: the two seqs lists must at least agree on
          // their common prefix.
          // @ts-ignore
          const sourceSeqs = source.clusterStreams.interest.seqs();
          const linkSeqs = link.streams.interest.seqs();
          const samePrefix = isEqual(
            sourceSeqs.slice(0, linkSeqs.length),
            linkSeqs.slice(0, sourceSeqs.length),
          );
          if (!samePrefix) {
            showInfo();
            throw Error(`inconsistent initial sequences`);
          }

          if (overdue()) {
            showInfo();
          }
          // not yet equal
          return false;
        }
      }
      return true;
    },
    { timeout },
  );
}
295
296
// Tear down what before() and connect() created: close the persist server,
// delete the temp directory, then close the main server and every client
// created by connect().
//
// - The temp directory is only removed if tempDir was actually assigned;
//   calling after() without a prior before() (e.g. from the process "exit"
//   hook) would otherwise pass an undefined path to rm(), which rejects and
//   used to skip closing the server and clients.
// - The server and clients are closed in a `finally`, so they are closed even
//   if removing the temp directory fails; the rm error still propagates.
// - Errors from closing the server or a client are deliberately ignored:
//   teardown is best-effort and they may already be closed.
export async function after() {
  persistServer?.close();
  try {
    if (tempDir != null) {
      await rm(tempDir, { force: true, recursive: true });
    }
  } finally {
    try {
      server?.close();
    } catch {}
    for (const cn of clients) {
      try {
        cn.close();
      } catch {}
    }
  }
}
308
309
// Run after() once when the process exits. after() is async and its promise
// is not awaited here, so only the part of it that runs before its first
// `await` is certain to execute.
process.once("exit", () => void after());
312
313
// On the first SIGINT, SIGTERM or SIGQUIT, call process.exit(), which in turn
// fires the "exit" event.
for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"]) {
  process.once(sig, () => process.exit());
}
318
319