Path: blob/master/src/packages/backend/conat/test/cluster/cluster-basics.test.ts
1712 views
/*
Basic tests of cluster support: the interest stream of a single cluster
node, links between nodes, routing of publish and request/reply across
links, trimming of the interest stream, joining via the sys api, node
discovery via scan, and the clusterIpAddress option.

pnpm test `pwd`/cluster-basics.test.ts

*/

import {
  before,
  after,
  delay,
  once,
  wait,
  waitForConsistentState,
} from "@cocalc/backend/conat/test/setup";
import {
  clusterLink,
  clusterStreams,
  clusterService,
  trimClusterStreams,
} from "@cocalc/conat/core/cluster";
import { createClusterNode } from "./util";
import type { Client } from "@cocalc/conat/core/client";
import { sysApi } from "@cocalc/conat/core/sys";

beforeAll(before);

jest.setTimeout(20000);

// Single node: watch the interest stream and a cluster link track the
// subscriptions made by a client of that node.
describe("create a cluster enabled socketio server and test that the streams update as they should", () => {
  let server, client;
  it("create a server with cluster support enabled", async () => {
    ({ server, client } = await createClusterNode({
      clusterName: "cluster0",
      id: "0",
    }));

    expect(server.isHealthy()).toBe(true);
  });

  let streams;
  it("get the interest stream via our client. There MUST be at least two persist subjects in there, since they were needed to even create the interest stream.", async () => {
    streams = await clusterStreams({
      ...server.options,
      client,
    });
    const service = clusterService(server.options);
    await wait({
      until: () => {
        const v = streams.interest.getAll();
        expect(service).toContain(server.options.clusterName);
        const persistUpdates = v.filter((update) =>
          update.subject.startsWith(service),
        );
        // keep polling until at least two matching updates have arrived
        if (persistUpdates.length <= 1) {
          return false;
        }
        expect(persistUpdates.length).toBeGreaterThan(1);
        return true;
      },
    });
  });

  it("subscribe and see update appear in the stream; close sub and see delete appear", async () => {
    const sub = await client.subscribe("foo");
    // loop on "change" events until exactly one update for "foo" is present
    while (true) {
      const v = streams.interest.getAll().filter((x) => x.subject === "foo");
      if (v.length === 1) {
        expect(v[0]).toEqual(
          expect.objectContaining({ op: "add", subject: "foo" }),
        );
        break;
      }
      await once(streams.interest, "change");
    }
    sub.close();
    // after closing, a second update for "foo" should appear: the delete
    while (true) {
      const v = streams.interest.getAll().filter((x) => x.subject === "foo");
      if (v.length === 2) {
        expect(v[1]).toEqual(
          expect.objectContaining({ op: "delete", subject: "foo" }),
        );
        break;
      }
      await once(streams.interest, "change");
    }
  });

  let link;
  it("get access to the same stream, but via a cluster link, and note that it is identical to the one in the server -- keeping these pattern objects sync'd is the point of the link", async () => {
    link = await clusterLink(
      server.address(),
      server.options.systemAccountPassword,
    );
    await wait({
      until: () => {
        return (
          Object.keys(server.interest.serialize().patterns).length ===
          Object.keys(link.interest.serialize().patterns).length
        );
      },
    });
    expect(server.interest.serialize().patterns).toEqual(
      link.interest.serialize().patterns,
    );
  });

  it("creates a sub and see this reflected in the patterns", async () => {
    const sub = await client.subscribe("foo");
    await wait({
      until: () => link.interest.serialize().patterns["foo"] !== undefined,
    });
    // equal after making the subscription to foo
    expect(server.interest.serialize()).toEqual(link.interest.serialize());

    const { patterns } = link.interest.serialize();
    expect(patterns["foo"] != undefined).toBe(true);

    sub.close();
    await wait({
      until: () => link.interest.serialize().patterns["foo"] === undefined,
    });
    // Re-read the link state instead of reusing the `patterns` snapshot taken
    // before the unsubscribe, so this assertion checks the current state.
    expect(link.interest.serialize().patterns["foo"] === undefined).toBe(true);

    // still identical
    expect(server.interest.serialize()).toEqual(link.interest.serialize());
  });

  const count = 50;
  it(`make ${count} more subscriptions and see this reflected in the link`, async () => {
    const v: any[] = [];
    for (let i = 0; i < count; i++) {
      v.push(await client.subscribe(`foo.${i}`));
    }

    // the last subscription made is used as the marker that the link caught up
    await wait({
      until: () =>
        link.interest.serialize().patterns[`foo.${count - 1}`] !== undefined,
    });

    expect(server.interest.serialize()).toEqual(link.interest.serialize());

    // and unsubscribe
    for (let i = 0; i < count; i++) {
      v[i].close();
    }
    await wait({
      until: () =>
        link.interest.serialize().patterns[`foo.${count - 1}`] === undefined,
    });

    expect(server.interest.serialize()).toEqual(link.interest.serialize());
  });

  it("a new link has correct state, despite the activity", async () => {
    const link2 = await clusterLink(
      server.address(),
      server.options.systemAccountPassword,
    );
    await wait({
      until: () => {
        return (
          Object.keys(server.interest.serialize().patterns).length ===
          // @ts-ignore
          Object.keys(link2.interest.serialize().patterns).length
        );
      },
    });
    expect(server.interest.serialize().patterns).toEqual(
      // @ts-ignore
      link2.interest.serialize().patterns,
    );
    link2.close();
  });
});

// Two nodes joined to each other: publish and request/reply must be routed
// from a client of one node to a subscriber on the other node.
describe("create a cluster with two distinct servers and send a message from one client to another via a link, and also use request/reply", () => {
  let server1, server2, client1, client2;
  it("create two distinct servers with cluster support enabled", async () => {
    ({ server: server1, client: client1 } = await createClusterNode({
      clusterName: "cluster1",
      systemAccountPassword: "squeamish",
      id: "1",
    }));
    ({ server: server2, client: client2 } = await createClusterNode({
      clusterName: "cluster1",
      systemAccountPassword: "ossifrage",
      id: "2",
    }));
  });

  it("link them", async () => {
    await delay(500);
    await server1.join(server2.address());
    await server2.join(server1.address());
  });

  it("wait for consistent state", async () => {
    await waitForConsistentState([server1, server2], 10000);
  });

  it("tests that server-side waitForInterestOnThisNode can be aborted", async () => {
    const controller = new AbortController();
    // 90000 is far longer than the test runs, so only the abort can end the wait
    const w = server2.waitForInterestOnThisNode(
      "no-interest",
      90000,
      client2.conn.id,
      controller.signal,
    );
    await delay(150);
    controller.abort();
    expect(await w).toBe(false);
  });

  it("tests that server-side waitForInterest can be aborted", async () => {
    const controller = new AbortController();
    const w = server2.waitForInterest(
      "no-interest",
      90000,
      client2.conn.id,
      controller.signal,
    );
    // give it some time to get going (otherwise test is too easy)
    await delay(150);
    controller.abort();
    expect(await w).toBe(false);
  });

  // message payload used for the publish and request tests below
  const N =
    "114381625757888867669235779976146612010218296721242362562561842935706935245733897830597123563958705058989075147599290026879543541";

  let sub;

  it("creates a subscription on client1, then publish to it from client2, thus using routing over the link", async () => {
    sub = await client1.subscribe("rsa");

    const x = await client2.publish("rsa", N);
    // interest hasn't propagated from one cluster to another yet:
    expect(x.count).toBe(0);

    await client2.waitForInterest("rsa");

    const y = await client2.publish("rsa", N);
    expect(y.count).toBe(1);

    const { value } = await sub.next();
    expect(value.data).toBe(N);
  });

  it("test request/reply between clusters", async () => {
    // deliberately not awaited yet: the reply is sent further down
    const req = client2.request("rsa", N);
    const { value } = await sub.next();
    expect(value.data).toBe(N);
    await wait({
      until: async () => {
        // ensure response gets received -- it's possible for sub to be visible to client2
        // slightly before the inbox for client2 is visible, in which case client2
        // would never get a response and timeout.
        const { count } = await value.respond(
          "3490529510847650949147849619903898133417764638493387843990820577 × 32769132993266709549961988190834461413177642967992942539798288533",
        );
        return count > 0;
      },
    });
    const response = await req;
    expect(response.data).toContain("×");
  });

  it("remove the links", async () => {
    const sub = await client1.subscribe("x");
    await server1.unjoin({ id: "2" });
    await server2.unjoin({ id: "1" });
    // the subscriber is on client1's node, so only client1's publish reaches it
    const { count } = await client1.publish("x", "hello");
    expect(count).toBe(1);
    const { count: count2 } = await client2.publish("x", "hello");
    expect(count2).toBe(0);
    sub.close();
  });
});

// This is basically identical to the previous one, but for a bigger cluster:
const clusterSize = 3;
describe(`a cluster with ${clusterSize} nodes`, () => {
  const servers: any[] = [],
    clients: any[] = [];
  it(`create ${clusterSize} distinct servers with cluster support enabled`, async () => {
    for (let i = 0; i < clusterSize; i++) {
      const { server, client } = await createClusterNode({
        clusterName: "my-cluster",
        id: `node-${i}`,
      });
      expect(server.options.id).toBe(`node-${i}`);
      expect(server.options.clusterName).toBe("my-cluster");
      servers.push(server);
      clients.push(client);
    }
  });

  it("link them all together in a complete digraph", async () => {
    for (let i = 0; i < servers.length; i++) {
      for (let j = 0; j < servers.length; j++) {
        if (i !== j) {
          await servers[i].join(servers[j].address());
          await servers[j].join(servers[i].address());
        }
      }
    }
  });

  it("get addresses and topology", async () => {
    await clients[0].waitUntilSignedIn();
    for (let i = 0; i < clusterSize; i++) {
      const sys = sysApi(clients[i]);
      expect(new Set(await sys.clusterAddresses()).size).toBe(clusterSize);
    }
    const t = await sysApi(clients[0]).clusterTopology();
    expect(Object.keys(t)).toEqual(["my-cluster"]);
    const v = Object.values(t["my-cluster"]);
    // compare as sets: the order of addresses in the topology is not asserted
    expect(new Set(v)).toEqual(
      new Set(servers.map((server) => server.address())),
    );
  });

  let sub;
  it("creates a subscription on clients[0], then observe each other client sees it as existing and can send it a message", async () => {
    sub = await clients[0].subscribe("rsa");
    for (let i = 0; i < clusterSize; i++) {
      await clients[i].waitForInterest("rsa");
      // awaited so a failed publish surfaces here rather than as an unhandled
      // rejection, and so messages are sent in the order asserted below
      await clients[i].publish("rsa", i);
    }
    for (let i = 0; i < clusterSize; i++) {
      const { value } = await sub.next();
      expect(value.data).toBe(i);
    }
  });

  it("check that interest data is *eventually* consistent", async () => {
    await waitForConsistentState(servers);
  });

  it("test request/respond from all participants", async () => {
    await delay(500);
    // start all requests first; they are awaited after the responses are sent
    const v: any[] = [];
    for (let i = 0; i < clusterSize; i++) {
      const req = clients[i].request("rsa", i);
      v.push(req);
    }
    for (let i = 0; i < clusterSize; i++) {
      const { value } = await sub.next();
      expect(value.data).toBe(i);
      const { count } = await value.respond(i + 1);
      expect(count).toBeGreaterThan(0);
    }

    for (let i = 0; i < clusterSize; i++) {
      const r = (await v[i]).data;
      expect(r).toBe(i + 1);
    }
  });
});

// trimClusterStreams is called with (streams, server, maxAge); the tests below
// check which interest-stream sequence numbers it reports as trimmed.
describe("test trimming the interest stream", () => {
  let server, client;
  it("create a cluster server", async () => {
    ({ server, client } = await createClusterNode({
      id: "0",
      clusterName: "trim",
    }));
    await wait({ until: () => server.clusterStreams != null });
  });

  let sub;
  it("subscribes and verifies that trimming does nothing", async () => {
    sub = await client.sub("389");
    const { seqsInterest: seqs } = await trimClusterStreams(
      server.clusterStreams,
      server,
      0,
    );
    expect(seqs).toEqual([]);
  });

  it("unsubscribes and verifies that trimming with a 5s maxAge does nothing", async () => {
    sub.close();
    await delay(100);
    const { seqsInterest: seqs } = await trimClusterStreams(
      server.clusterStreams,
      server,
      5000,
    );
    expect(seqs).toEqual([]);
  });

  it(" see that two updates are trimmed when maxAge is 0s", async () => {
    // have to use wait since don't know how long until
    // stream actually updated after unsub.
    let seqs;
    await wait({
      until: async () => {
        seqs = (await trimClusterStreams(server.clusterStreams, server, 0))
          .seqsInterest;
        return seqs.length >= 2;
      },
    });
    expect(seqs.length).toBe(2);
    await delay(1);
    for (const update of server.clusterStreams.interest.getAll()) {
      if (update.subject === "389" && update.op === "add") {
        throw Error("adding 389 should have been removed");
      }
    }
  });
});

// Same join/unjoin flow as the two-server suite above, but driven through
// sysApi(client) rather than the server objects.
describe("join two servers in a cluster using the sys api instead of directly calling join on the server objects", () => {
  let server1, server2, client1: Client, client2: Client;
  const systemAccountPassword = "squeamish ossifrage";
  it("create two distinct servers with cluster support enabled", async () => {
    ({ server: server1, client: client1 } = await createClusterNode({
      clusterName: "cluster-sys",
      systemAccountPassword,
      id: "1",
    }));
    ({ server: server2, client: client2 } = await createClusterNode({
      clusterName: "cluster-sys",
      systemAccountPassword,
      id: "2",
    }));
  });

  let sys1, sys2;
  it("link them using the sys api", async () => {
    sys1 = sysApi(client1);
    expect(await sys1.clusterAddresses()).toEqual([server1.address()]);
    await sys1.join(server2.address());
    expect(await sys1.clusterAddresses()).toEqual([
      server1.address(),
      server2.address(),
    ]);
    expect(await sys1.clusterTopology()).toEqual({
      "cluster-sys": {
        "1": server1.address(),
        "2": server2.address(),
      },
    });

    sys2 = sysApi(client2);
    expect(await sys2.clusterAddresses()).toEqual([server2.address()]);
    await sys2.join(server1.address());
    expect(await sys2.clusterAddresses()).toEqual([
      server2.address(),
      server1.address(),
    ]);
    expect(await sys1.clusterTopology()).toEqual(await sys2.clusterTopology());
  });

  let sub;
  it("verify link worked", async () => {
    sub = await client1.subscribe("x");
    await client2.waitForInterest("x");
    const { count } = await client2.publish("x", "hello");
    expect(count).toBe(1);
    const { value } = await sub.next();
    expect(value.data).toBe("hello");
  });

  it("remove the links", async () => {
    await sys1.unjoin({ id: "2" });
    await sys2.unjoin({ id: "1" });
    const { count } = await client1.publish("x", "hello");
    expect(count).toBe(1);
    const { count: count2 } = await client2.publish("x", "hello");
    expect(count2).toBe(0);
    sub.close();

    // after unjoining, each node reports only itself
    expect(await sys1.clusterAddresses()).toEqual([server1.address()]);
    expect(await sys2.clusterAddresses()).toEqual([server2.address()]);
    expect(await sys1.clusterTopology()).toEqual({
      "cluster-sys": {
        "1": server1.address(),
      },
    });
    expect(await sys2.clusterTopology()).toEqual({
      "cluster-sys": {
        "2": server2.address(),
      },
    });
  });
});

describe("test automatic node discovery", () => {
  // create a cluster with 3 nodes just two edges connecting them
  const nodes: { client; server }[] = [];

  it("create three distinct servers with cluster support enabled", async () => {
    nodes.push(
      await createClusterNode({ id: "node0", clusterName: "discovery" }),
    );
    nodes.push(
      await createClusterNode({ id: "node1", clusterName: "discovery" }),
    );
    nodes.push(
      await createClusterNode({ id: "node2", clusterName: "discovery" }),
    );
    // different cluster
    nodes.push(await createClusterNode({ id: "node0", clusterName: "moon" }));
  });

  it("connect them in the minimal possible way", async () => {
    await nodes[0].server.join(nodes[1].server.address());
    await nodes[1].server.join(nodes[2].server.address());

    // plus one to the other cluster
    await nodes[0].server.join(nodes[3].server.address());

    expect(nodes[0].server.clusterAddresses("discovery").length).toBe(2);
    expect(nodes[1].server.clusterAddresses("discovery").length).toBe(2);
    expect(nodes[2].server.clusterAddresses("discovery").length).toBe(1);
  });

  it("run scan from node0. this results in the following new connections: 1->0, 0->2", async () => {
    const { count, errors } = await nodes[0].server.scan();
    expect(count).toBe(2);
    expect(errors.length).toBe(0);
    expect(nodes[0].server.clusterAddresses("discovery").length).toBe(3);
    expect(nodes[1].server.clusterAddresses("discovery").length).toBe(3);
    expect(nodes[2].server.clusterAddresses("discovery").length).toBe(1);
  });

  it("run scan from node1. this should result in the following new connections: 2->1 ", async () => {
    const { count, errors } = await nodes[1].server.scan();
    expect(count).toBe(1);
    expect(errors.length).toBe(0);
    expect(nodes[2].server.clusterAddresses("discovery").length).toBe(2);
  });

  it("run scan from node2. this should result in the following new connections: 2->0. We now have the complete graph.", async () => {
    const { count, errors } = await nodes[2].server.scan();
    expect(count).toBe(1);
    expect(errors.length).toBe(0);
    expect(nodes[2].server.clusterAddresses("discovery").length).toBe(3);
  });

  it("join a new node3 (=nodes[4] due to other cluster node) to node2 and do a scan", async () => {
    nodes.push(
      await createClusterNode({ id: "node3", clusterName: "discovery" }),
    );
    await nodes[4].server.join(nodes[2].server.address());
    // only knows about self and node2 initially
    expect(nodes[4].server.clusterAddresses("discovery").length).toBe(2);

    // new node scans and now it knows about ALL nodes in the cluster
    expect((await nodes[4].server.scan()).count).toBe(3);
    expect(nodes[4].server.clusterAddresses("discovery").length).toBe(4);

    // have the other 3 nodes scan so they know all nodes:
    for (let i = 0; i < 3; i++) {
      await nodes[i].server.scan();
      expect(nodes[i].server.clusterAddresses("discovery").length).toBe(4);
    }
  });
});

describe("test the clusterIpAddress option", () => {
  it("create a server without explicit clusterIpAddress", async () => {
    const { server } = await createClusterNode({
      clusterName: "cluster0",
      id: "0",
    });

    // close the server even if an assertion fails, so it is not leaked
    try {
      expect(server.options.clusterIpAddress).toBe(undefined);
      expect(server.address()).toBe(`http://localhost:${server.options.port}`);
    } finally {
      server.close();
    }
  });

  it("create a server with explicit clusterIpAddress", async () => {
    const { server } = await createClusterNode({
      clusterName: "cluster0",
      id: "0",
      clusterIpAddress: "127.0.0.1",
    });

    // close the server even if an assertion fails, so it is not leaked
    try {
      expect(server.options.clusterIpAddress).toBe("127.0.0.1");
      expect(server.address()).toBe(`http://127.0.0.1:${server.options.port}`);
    } finally {
      server.close();
    }
  });
});

afterAll(after);