Path: blob/master/src/packages/backend/conat/test/setup.ts
1712 views
import getPort from "@cocalc/backend/get-port";1import { type Client } from "@cocalc/conat/core/client";2import {3init as createConatServer,4type Options,5type ConatServer,6} from "@cocalc/conat/core/server";7import getLogger from "@cocalc/backend/logger";8import { setConatClient } from "@cocalc/conat/client";9import { server as createPersistServer } from "@cocalc/backend/conat/persist";10import { syncFiles } from "@cocalc/conat/persist/context";11import { mkdtemp, rm } from "node:fs/promises";12import { tmpdir } from "node:os";13import { join } from "path";14import { wait } from "@cocalc/backend/conat/test/util";15import { delay } from "awaiting";16export { setDefaultTimeouts } from "@cocalc/conat/core/client";17export { setDefaultSocketTimeouts } from "@cocalc/conat/socket/util";18export { setDefaultReconnectDelay } from "@cocalc/conat/persist/client";19import { once } from "@cocalc/util/async-utils";20import { until } from "@cocalc/util/async-utils";21import { randomId } from "@cocalc/conat/names";22import { isEqual } from "lodash";2324export { wait, delay, once };2526const logger = getLogger("conat:test:setup");2728export const path = "/conat";2930export async function initConatServer(31options: Partial<Options> = {},32): Promise<ConatServer> {33logger.debug("init");34if (!options?.port) {35const port = await getPort();36options = { ...options, port };37}3839const server = createConatServer(options);40if (server.clusterName == "default") {41defaultCluster.push(server);42}43if (server.state != "ready") {44await once(server, "ready");45}46return server;47}4849export let tempDir;50export let server: any = null;51export let persistServer: any = null;5253let nodeNumber = 0;54function getNodeId() {55return `node-${nodeNumber++}`;56}5758export async function createServer(opts?) {59return await initConatServer({60port: await getPort(),61path,62clusterName: "default",63id: opts?.id ?? getNodeId(),64systemAccountPassword: "secret",65...opts,66});67}6869// add another node to the cluster -- this is still in the same process (not forked), which70// is generally good since you can console.log from it, faster, etc.71// this does connect the new node to all existing nodes.72export const defaultCluster: ConatServer[] = [];73export async function addNodeToDefaultCluster(): Promise<ConatServer> {74const port = await getPort();75const node = await initConatServer({76port,77path,78clusterName: "default",79id: getNodeId(),80systemAccountPassword: "secret",81});82for (const s of defaultCluster) {83await s.join(node.address());84await node.join(s.address());85}86return node;87}8889export async function createConatCluster(n: number, opts?) {90const clusterName = opts?.clusterName ?? `cluster-${randomId()}`;91const systemAccountPassword = opts?.systemAccountPassword ?? randomId();92const servers: { [id: string]: ConatServer } = {};93for (let i = 0; i < n; i++) {94const id = `node-${i}`;95servers[id] = await createServer({96systemAccountPassword,97clusterName,98id,99autoscanInterval: 0,100...opts,101});102}103// join every server to every other server104const v: any[] = [];105for (let i = 0; i < n; i++) {106for (let j = 0; j < n; j++) {107if (i != j) {108v.push(109servers[`node-${i}`].join(110`http://localhost:${servers[`node-${j}`].options.port}`,111),112);113}114}115}116await Promise.all(v);117return servers;118}119120export async function restartServer() {121const port = server.options.port;122await server.close();123await delay(250);124server = await createServer({ port });125}126127export async function restartPersistServer() {128await persistServer.close();129client = connect();130persistServer = createPersistServer({ client });131}132133// one pre-made client134export let client;135export async function before(136opts: { archive?: string; backup?: string; archiveInterval?: number } = {},137) {138// syncFiles and tempDir define where the persist server persists data.139tempDir = await mkdtemp(join(tmpdir(), "conat-test"));140syncFiles.local = join(tempDir, "local");141if (opts.archive) {142syncFiles.archive = join(tempDir, "archive");143}144if (opts.archiveInterval) {145syncFiles.archiveInterval = opts.archiveInterval;146}147if (opts.backup) {148syncFiles.backup = join(tempDir, "backup");149}150151server = await createServer();152client = connect();153persistServer = createPersistServer({ client });154setConatClient({155conat: connect,156getLogger,157});158}159160const clients: Client[] = [];161export function connect(opts?): Client {162const cn = server.client({ noCache: true, path: "/", ...opts });163clients.push(cn);164return cn;165}166167// Given a list of servers that are all connected together in a common168// cluster, wait until they all have a consistent view of the interest.169// I.e., the interest object for servers[i] is the same as what every170// other thinks it is.171export async function waitForConsistentState(172servers: ConatServer[],173timeout = 10000,174): Promise<void> {175if (servers.length <= 1) {176return;177}178// @ts-ignore179const clusterName = servers[0].clusterName;180if (!clusterName) {181throw Error("not a cluster");182}183const ids = new Set<string>([servers[0].id]);184for (let i = 1; i < servers.length; i++) {185// @ts-ignore186if (servers[i].clusterName != clusterName) {187throw Error("all servers must be in the same cluster");188}189ids.add(servers[i].id);190}191192if (ids.size != servers.length) {193throw Error(194`all servers must have distinct ids -- ${JSON.stringify(servers.map((x) => x.id))}`,195);196}197198const start = Date.now();199await until(200() => {201for (let i = 0; i < servers.length; i++) {202if (servers[i].state == "closed") {203return true;204}205// now look at everybody else's view of servers[i].206// @ts-ignore207const a = servers[i].interest.serialize().patterns;208const b = servers[i].sticky;209const hashServer = servers[i].hash();210for (let j = 0; j < servers.length; j++) {211if (i != j) {212// @ts-ignore213const link = servers[j].clusterLinks[clusterName]?.[servers[i].id];214if (link == null) {215if (Date.now() - start > 3000) {216console.log(`node ${j} is not connected to node ${i}`);217}218return false;219}220const hashLink = link.hash();221const x = link.interest.serialize().patterns;222const y = link.sticky;223const showInfo = () => {224for (const type of ["interest", "sticky"]) {225console.log(226`server stream ${type}: `,227hashServer[type],228// @ts-ignore229servers[i].clusterStreams[type].stream.client.id,230// @ts-ignore231servers[i].clusterStreams[type].stream.storage.path,232// @ts-ignore233servers[i].clusterStreams[type].seqs(),234// @ts-ignore235//servers[i].clusterStreams.interest.getAll(),236);237238console.log(239`link stream ${type}: `,240hashLink[type],241// @ts-ignore242link.streams[type].stream.client.id,243// @ts-ignore244link.streams[type].stream.storage.path,245// @ts-ignore246link.streams[type].seqs(),247// @ts-ignore248//link.streams.interest.getAll(),249);250}251console.log("waitForConsistentState", {252i,253j,254serverInterest: a,255linkInterest: x,256serverSticky: b,257linkSticky: y,258});259};260if (!isEqual(hashServer, hashLink)) {261if (Date.now() - start > 3000) {262console.log("hashes are not equal");263// likely going to fail264showInfo();265}266return false;267}268if (!isEqual(a, x) /*|| !isEqual(b, y) */) {269// @ts-ignore270const seqs0 = servers[i].clusterStreams.interest.seqs();271const seqs1 = link.streams.interest.seqs();272if (273!isEqual(274seqs0.slice(0, seqs1.length),275seqs1.slice(0, seqs0.length),276)277) {278showInfo();279throw Error(`inconsistent initial sequences`);280}281282if (Date.now() - start > 3000) {283// likely going to fail284showInfo();285}286287// not yet equal288return false;289}290}291}292}293return true;294},295{ timeout },296);297}298299export async function after() {300persistServer?.close();301await rm(tempDir, { force: true, recursive: true });302try {303server?.close();304} catch {}305for (const cn of clients) {306try {307cn.close();308} catch {}309}310}311312process.once("exit", () => {313after();314});315316["SIGINT", "SIGTERM", "SIGQUIT"].forEach((sig) => {317process.once(sig, () => {318process.exit();319});320});321322323