Path: blob/master/src/packages/backend/conat/test/setup.ts
5750 views
import getPort from "@cocalc/backend/get-port";1import { type Client } from "@cocalc/conat/core/client";2import {3init as createConatServer,4type Options,5type ConatServer,6} from "@cocalc/conat/core/server";7import getLogger from "@cocalc/backend/logger";8import { setConatClient } from "@cocalc/conat/client";9import { server as createPersistServer } from "@cocalc/backend/conat/persist";10import { syncFiles } from "@cocalc/conat/persist/context";11import { mkdtemp, rm } from "node:fs/promises";12import { tmpdir } from "node:os";13import { join } from "path";14import { wait } from "@cocalc/backend/conat/test/util";15import { delay } from "awaiting";16export { setDefaultTimeouts } from "@cocalc/conat/core/client";17export { setDefaultSocketTimeouts } from "@cocalc/conat/socket/util";18export { setDefaultReconnectDelay } from "@cocalc/conat/persist/client";19import { once } from "@cocalc/util/async-utils";20import { until } from "@cocalc/util/async-utils";21import { randomId } from "@cocalc/conat/names";22import { isEqual } from "lodash";2324export { wait, delay, once };2526const logger = getLogger("conat:test:setup");2728export const path = "/conat";2930export async function initConatServer(31options: Partial<Options> = {},32): Promise<ConatServer> {33logger.debug("init");34if (!options?.port) {35const port = await getPort();36options = { ...options, port };37}3839const server = createConatServer(options);40if (server.clusterName == "default") {41defaultCluster.push(server);42}43if (server.state != "ready") {44await once(server, "ready");45}46return server;47}4849export let tempDir;50export let server: any = null;51export let persistServer: any = null;5253let nodeNumber = 0;54function getNodeId() {55return `node-${nodeNumber++}`;56}5758export async function createServer(opts?) {59return await initConatServer({60port: await getPort(),61path,62clusterName: "default",63id: opts?.id ?? getNodeId(),64systemAccountPassword: "secret",65...opts,66});67}6869// add another node to the cluster -- this is still in the same process (not forked), which70// is generally good since you can console.log from it, faster, etc.71// this does connect the new node to all existing nodes.72export const defaultCluster: ConatServer[] = [];73export async function addNodeToDefaultCluster(): Promise<ConatServer> {74const port = await getPort();75const node = await initConatServer({76port,77path,78clusterName: "default",79id: getNodeId(),80systemAccountPassword: "secret",81});82for (const s of defaultCluster) {83await s.join(node.address());84await node.join(s.address());85}86return node;87}8889export async function createConatCluster(n: number, opts?) {90const clusterName = opts?.clusterName ?? `cluster-${randomId()}`;91const systemAccountPassword = opts?.systemAccountPassword ?? randomId();92const servers: { [id: string]: ConatServer } = {};93for (let i = 0; i < n; i++) {94const id = `node-${i}`;95servers[id] = await createServer({96systemAccountPassword,97clusterName,98id,99autoscanInterval: 0,100...opts,101});102}103// join every server to every other server104const v: any[] = [];105for (let i = 0; i < n; i++) {106for (let j = 0; j < n; j++) {107if (i != j) {108v.push(109servers[`node-${i}`].join(110`http://localhost:${servers[`node-${j}`].options.port}`,111),112);113}114}115}116await Promise.all(v);117return servers;118}119120export async function restartServer() {121const port = server.options.port;122await server.close();123await delay(250);124server = await createServer({ port });125}126127export async function restartPersistServer() {128await persistServer.close();129client = connect();130persistServer = createPersistServer({ client });131}132133// one pre-made client134export let client;135export async function before(136opts: { archive?: string; backup?: string; archiveInterval?: number } = {},137) {138// syncFiles and tempDir define where the persist server persists data.139tempDir = await mkdtemp(join(tmpdir(), "conat-test"));140syncFiles.local = join(tempDir, "local");141if (opts.archive) {142syncFiles.archive = join(tempDir, "archive");143}144if (opts.archiveInterval) {145syncFiles.archiveInterval = opts.archiveInterval;146}147if (opts.backup) {148syncFiles.backup = join(tempDir, "backup");149}150151server = await createServer();152client = connect();153persistServer = createPersistServer({ client });154setConatClient({155conat: connect,156getLogger,157});158}159160const clients: Client[] = [];161export function connect(opts?): Client {162const cn = server.client({ noCache: true, path: "/", ...opts });163clients.push(cn);164return cn;165}166167// Given a list of servers that are all connected together in a common168// cluster, wait until they all have a consistent view of the interest.169// I.e., the interest object for servers[i] is the same as what every170// other thinks it is.171export async function waitForConsistentState(172servers: ConatServer[],173timeout = 10000,174): Promise<void> {175if (servers.length <= 1) {176return;177}178// @ts-ignore179const clusterName = servers[0].clusterName;180if (!clusterName) {181throw Error("not a cluster");182}183const ids = new Set<string>([servers[0].id]);184for (let i = 1; i < servers.length; i++) {185// @ts-ignore186if (servers[i].clusterName != clusterName) {187throw Error("all servers must be in the same cluster");188}189ids.add(servers[i].id);190}191192if (ids.size != servers.length) {193throw Error(194`all servers must have distinct ids -- ${JSON.stringify(servers.map((x) => x.id))}`,195);196}197198const start = Date.now();199await until(200() => {201for (let i = 0; i < servers.length; i++) {202if (servers[i].state == "closed") {203return true;204}205// now look at everybody else's view of servers[i].206// @ts-ignore207const a = servers[i].interest.serialize().patterns;208const hashServer = servers[i].hash();209for (let j = 0; j < servers.length; j++) {210if (i != j) {211// @ts-ignore212const link = servers[j].clusterLinks[clusterName]?.[servers[i].id];213if (link == null) {214if (Date.now() - start > 3000) {215console.log(`node ${j} is not connected to node ${i}`);216}217return false;218}219const hashLink = link.hash();220const x = link.interest.serialize().patterns;221const showInfo = () => {222for (const type of ["interest"]) {223console.log(224`server stream ${type}: `,225hashServer[type],226// @ts-ignore227servers[i].clusterStreams[type].stream.client.id,228// @ts-ignore229servers[i].clusterStreams[type].stream.storage.path,230// @ts-ignore231servers[i].clusterStreams[type].seqs(),232// @ts-ignore233//servers[i].clusterStreams.interest.getAll(),234);235236console.log(237`link stream ${type}: `,238hashLink[type],239// @ts-ignore240link.streams[type].stream.client.id,241// @ts-ignore242link.streams[type].stream.storage.path,243// @ts-ignore244link.streams[type].seqs(),245// @ts-ignore246//link.streams.interest.getAll(),247);248}249console.log("waitForConsistentState", {250i,251j,252serverInterest: a,253linkInterest: x,254});255};256if (!isEqual(hashServer, hashLink)) {257if (Date.now() - start > 3000) {258console.log("hashes are not equal");259// likely going to fail260showInfo();261}262return false;263}264if (!isEqual(a, x) /*|| !isEqual(b, y) */) {265// @ts-ignore266const seqs0 = servers[i].clusterStreams.interest.seqs();267const seqs1 = link.streams.interest.seqs();268if (269!isEqual(270seqs0.slice(0, seqs1.length),271seqs1.slice(0, seqs0.length),272)273) {274showInfo();275throw Error(`inconsistent initial sequences`);276}277278if (Date.now() - start > 3000) {279// likely going to fail280showInfo();281}282283// not yet equal284return false;285}286}287}288}289return true;290},291{ timeout },292);293}294295export async function after() {296persistServer?.close();297await rm(tempDir, { force: true, recursive: true });298try {299server?.close();300} catch {}301for (const cn of clients) {302try {303cn.close();304} catch {}305}306}307308process.once("exit", () => {309after();310});311312["SIGINT", "SIGTERM", "SIGQUIT"].forEach((sig) => {313process.once(sig, () => {314process.exit();315});316});317318319