Path: blob/master/src/packages/backend/conat/test/persist/cluster2.test.ts
1712 views
/*

pnpm test `pwd`/cluster2.test.ts

*/

import { server as createPersistServer } from "@cocalc/backend/conat/persist";
import {
  after,
  before,
  server,
  addNodeToDefaultCluster,
  once,
  persistServer as persistServer0,
  waitForConsistentState,
  wait,
} from "../setup";
import { uuid } from "@cocalc/util/misc";

// Upper bound on how many streams the last test opens while waiting for both
// persist servers to receive a socket. Exceeding it fails the test instead of
// letting the loop run forever.
const BROKEN_THRESH = 30;

beforeAll(before);

// Exercises a two-node cluster in which each node has its own persist server:
// a second node and a second persist server are added, then streams are opened
// from clients of both nodes and checked to share the same data.
describe("test using multiple persist servers in a cluster", () => {
  // client0 talks to the default `server` from ../setup; server1/client1 are
  // the node added to the cluster below and a client connected to it.
  let client0, server1, client1;
  it("add another node", async () => {
    client0 = server.client();
    server1 = await addNodeToDefaultCluster();
    client1 = server1.client();
  });

  it("wait until both servers in the cluster have the same state", async () => {
    await waitForConsistentState([server, server1], 5000);
  });

  // Second persist server, created on top of client1 (i.e. attached to server1).
  let persistServer1;
  it("add a second persist server connected to server1", async () => {
    persistServer1 = createPersistServer({ client: client1 });
    await once(persistServer1, "ready");
    expect(persistServer1.state).toBe("ready");
  });

  it("wait until both servers in the cluster have the same state", async () => {
    await waitForConsistentState([server, server1]);
  });

  // Streams opened via client1 (…0) and via client0 (…1). References are kept
  // so the streams opened during the test are still reachable afterwards.
  const openStreamsConnectedToBothServers0: any[] = [];
  const openStreamsConnectedToBothServers1: any[] = [];
  it("create more streams connected to both servers, in each case illustrating as well that the stream works from both clients", async () => {
    // Precondition: neither persist server has any socket yet.
    expect(Object.keys(persistServer0.sockets).length).toBe(0);
    expect(Object.keys(persistServer1.sockets).length).toBe(0);
    // create random streams, until we get at least one new one connected to each
    // of the two persist servers
    while (
      Object.keys(persistServer0.sockets).length === 0 ||
      Object.keys(persistServer1.sockets).length === 0
    ) {
      // A fresh random project id per iteration gives a brand-new stream.
      const project_id = uuid();
      const s = await client1.sync.dstream({
        project_id,
        name: "foo",
        noCache: true,
        // we make these ephemeral so there's no possibility of communication via the filesystem
        // in case different persist servers were used!
        ephemeral: true,
      });
      s.publish("x");
      await s.save();
      openStreamsConnectedToBothServers0.push(s);
      // Bail out rather than loop forever if one persist server never gets a socket.
      if (openStreamsConnectedToBothServers0.length > BROKEN_THRESH) {
        throw new Error("sticky queue groups are clearly not working properly");
      }
      // Open the same stream through the other node's client.
      const t = await client0.sync.dstream({
        project_id,
        name: "foo",
        noCache: true,
        ephemeral: true,
      });
      openStreamsConnectedToBothServers1.push(t);

      // What s saved must be visible through t ...
      expect(t.getAll()).toEqual(["x"]);
      t.publish("y");
      await t.save();

      // ... and what t saved must eventually show up in s.
      await wait({ until: () => s.length === 2 });
      expect(s.getAll()).toEqual(["x", "y"]);
    }
    // NOTE(review): presumably both streams of one project attach to the same
    // persist server, so at least two iterations are needed before both servers
    // have a socket -- confirm against the persist server routing.
    expect(openStreamsConnectedToBothServers0.length).toBeGreaterThan(1);
    expect(openStreamsConnectedToBothServers1.length).toBeGreaterThan(1);
  });
});

afterAll(after);