// Path: src/packages/backend/conat/test/persist/cluster.test.ts
/*12pnpm test `pwd`/cluster.test.ts34*/56import { server as createPersistServer } from "@cocalc/backend/conat/persist";7import {8after,9before,10server,11addNodeToDefaultCluster,12once,13delay,14persistServer as persistServer0,15wait,16setDefaultTimeouts,17setDefaultSocketTimeouts,18setDefaultReconnectDelay,19waitForConsistentState,20} from "../setup";21import { uuid } from "@cocalc/util/misc";2223const BROKEN_THRESH = 30;2425beforeAll(async () => {26await before();27// this speeds up the automatic failover tests a lot.28setDefaultTimeouts({ request: 1000, publish: 1000 });29setDefaultSocketTimeouts({30command: 1000,31keepAlive: 2000,32keepAliveTimeout: 1000,33});34setDefaultReconnectDelay(1);35});3637jest.setTimeout(10000);38describe("test using multiple persist servers in a cluster", () => {39let client0, server1, client1;40it("add another node", async () => {41client0 = server.client();42server1 = await addNodeToDefaultCluster();43client1 = server1.client();44});4546let persistServer1;47it("add a second persist server connected to server1", async () => {48persistServer1 = createPersistServer({ client: client1 });49await once(persistServer1, "ready");50expect(persistServer1.state).toBe("ready");51await waitForConsistentState([server, server1]);52});5354it("make streams until there is at least one connection to each persist server -- this must happen quickly at random due to how sticky queue groups work", async () => {55const v: any[] = [];56// baseline - no sockets57await wait({58until: () =>59Object.keys(persistServer0.sockets).length == 0 &&60Object.keys(persistServer1.sockets).length == 0,61});62while (63Object.keys(persistServer0.sockets).length == 0 ||64Object.keys(persistServer1.sockets).length == 065) {66const s = await client1.sync.dstream({67project_id: uuid(),68name: "foo",69sync: true,70});71// this helps give time for the persist server added above to be known72await delay(50);73v.push(s);74if (v.length > BROKEN_THRESH) {75throw Error("sticky queue 
groups are clearly not working properly");76}77}78v.map((x) => x.close());79});8081const project_ids: string[] = [];82it("same test as above, but with client connected to server0", async () => {83// baseline84await wait({85until: () =>86Object.keys(persistServer0.sockets).length == 0 &&87Object.keys(persistServer1.sockets).length == 0,88});8990const v: any[] = [];91while (92Object.keys(persistServer0.sockets).length == 0 ||93Object.keys(persistServer1.sockets).length == 094) {95const project_id = uuid();96project_ids.push(project_id);97const s = await client0.sync.dstream({98project_id,99name: "foo",100sync: true,101});102v.push(s);103s.publish(project_id);104await s.save();105if (v.length > BROKEN_THRESH) {106throw Error("sticky queue groups are clearly not working properly");107}108}109v.map((x) => x.close());110});111112const openStreams0: any[] = [];113const openStreams1: any[] = [];114it("create more streams connected to both servers to use both", async () => {115// wait for all the sockets to close in order to not mess up other tests116await wait({117until: () =>118Object.keys(persistServer0.sockets).length == 0 ||119Object.keys(persistServer1.sockets).length == 0,120});121122while (123Object.keys(persistServer0.sockets).length == 0 ||124Object.keys(persistServer1.sockets).length == 0125) {126const project_id = uuid();127const s = await client1.sync.dstream({128project_id,129name: "foo",130noCache: true,131sync: true,132});133s.publish("x");134await s.save();135openStreams0.push(s);136if (openStreams0.length > BROKEN_THRESH) {137throw Error("sticky queue groups are clearly not working properly");138}139const t = await client0.sync.dstream({140project_id,141name: "foo",142noCache: true,143sync: true,144});145expect(t.getAll()).toEqual(["x"]);146openStreams1.push(t);147}148expect(openStreams0.length).toBeGreaterThan(0);149});150151it("remove one persist server", async () => {152persistServer1.close();153});154155it("creating / opening streams we made above 
still work with no data lost", async () => {156for (const project_id of project_ids) {157const s = await client0.sync.dstream({158project_id,159name: "foo",160noCache: true,161sync: true,162});163expect(await s.getAll()).toEqual([project_id]);164s.close();165}166expect(Object.keys(persistServer1.sockets).length).toEqual(0);167});168169// this can definitely take a long time (e.g., ~10s), as it involves automatic failover.170it("Checks automatic failover works: the streams connected to both servers we created above must keep working, despite at least one of them having its persist server get closed.", async () => {171for (let i = 0; i < openStreams0.length; i++) {172const stream0 = openStreams0[i];173stream0.publish("y");174await stream0.save();175expect(stream0.hasUnsavedChanges()).toBe(false);176177const stream1 = openStreams1[i];178expect(stream0.opts.project_id).toEqual(stream1.opts.project_id);179await wait({180until: async () => {181return stream1.length >= 2;182},183timeout: 5000,184start: 1000,185});186expect(stream1.length).toBe(2);187}188});189});190191afterAll(after);192193194