Path: blob/master/src/packages/backend/conat/test/cluster/pubsub.test.ts
5886 views
/*1pnpm test `pwd`/pubsub.test.ts2*/34import {5before,6after,7server,8addNodeToDefaultCluster,9wait,10delay,11} from "@cocalc/backend/conat/test/setup";1213beforeAll(before);1415describe("the most basic pub/sub test with a 2-node cluster", () => {16let client0, server1, client1;17it("add another node to cluster", async () => {18client0 = server.client();19server1 = await addNodeToDefaultCluster();20client1 = server1.client();21});2223let sub;24it("subscribe", async () => {25sub = await client0.subscribe("cocalc");26});2728// commenting this out, since it is impossible to robustly test in all cases,29// since sometmes the subscription info can move very quickly and this publish30// could be slow!31// it("publish -- message is initially dropped with no receiver because interest doesn't propogate instantly", async () => {32// const { count } = await client1.publish("cocalc", "hi");33// expect(count).toBe(0);34// });3536it("publish after waiting for interest -- this works", async () => {37await client1.waitForInterest("cocalc");38const { count, bytes } = await client1.publish("cocalc", "hi");39expect(count).toBe(1);40expect(bytes).toBe(3);41});4243it("receive", async () => {44const { value } = await sub.next();45expect(value.data).toBe("hi");46});4748it("clean up", () => {49sub.close();50});51});5253describe("same basic test, but in the other direction", () => {54let client0, client1;55it("get the clients", async () => {56client0 = server.client();57// @ts-ignore58client1 = Object.values(server.clusterLinks[server.clusterName])[0].client;59});6061let sub;62it("subscribe", async () => {63sub = await client1.subscribe("conat");64});6566it("publish after waiting for interest -- this works", async () => {67await client0.waitForInterest("conat");68const { count, bytes } = await client0.publish("conat", "hi");69expect(count).toBe(1);70expect(bytes).toBe(3);71});7273it("receive", async () => {74const { value } = await sub.next();75expect(value.data).toBe("hi");76});7778it("clean up", () => {79sub.close();80});81});8283describe("three nodes and two subscribers with different queue group on distinct nodes", () => {84let client0, client1, client2, server2;85it("get the clients", async () => {86client0 = server.client();87// @ts-ignore88client1 = Object.values(server.clusterLinks[server.clusterName])[0].client;89server2 = await addNodeToDefaultCluster();90client2 = server2.client();91});9293let sub0, sub1;94it("subscribes from two nodes", async () => {95sub0 = await client0.subscribe("sage", { queue: "0" });96sub1 = await client1.subscribe("sage", { queue: "1" });97});9899it("only way to be sure to get both is to just try -- different queue group so eventually there will be two receivers", async () => {100await wait({101until: async () => {102const { count } = await client2.publish("sage", "hi");103return count == 2;104},105});106// and check both got something107await sub0.next();108await sub1.next();109});110111it("subscribe from two nodes but with same queue group so only one subscriber will get it", async () => {112sub0 = await client0.subscribe("math", { queue: "0" });113sub1 = await client1.subscribe("math", { queue: "0" });114await client2.waitForInterest("math");115await delay(250);116const { count } = await client2.publish("math", "hi");117expect(count).toBe(1);118});119});120121afterAll(after);122123124