Path: blob/master/src/packages/backend/conat/test/cluster/pubsub.test.ts
1712 views
/*
Pub/sub tests that publish and subscribe through different nodes of the
default test cluster.

pnpm test `pwd`/pubsub.test.ts
*/

import {
  before,
  after,
  server,
  addNodeToDefaultCluster,
  wait,
  delay,
  waitForConsistentState,
  defaultCluster,
} from "@cocalc/backend/conat/test/setup";
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";

beforeAll(before);

// Subscribe via the original server's client and publish via the client of a
// freshly added node.
describe("the most basic pub/sub test with a 2-node cluster", () => {
  let client0, server1, client1;
  it("add another node to cluster", async () => {
    client0 = server.client();
    server1 = await addNodeToDefaultCluster();
    client1 = server1.client();
  });

  let sub;
  it("subscribe", async () => {
    sub = await client0.subscribe("cocalc");
  });

  // commenting this out, since it is impossible to robustly test in all cases,
  // since sometimes the subscription info can move very quickly and this publish
  // could be slow!
  //   it("publish -- message is initially dropped with no receiver because interest doesn't propagate instantly", async () => {
  //     const { count } = await client1.publish("cocalc", "hi");
  //     expect(count).toBe(0);
  //   });

  it("publish after waiting for interest -- this works", async () => {
    // Block until client1 sees interest in the subject before publishing, so
    // the message is not sent while there is still no known receiver.
    await client1.waitForInterest("cocalc");
    const { count, bytes } = await client1.publish("cocalc", "hi");
    expect(count).toBe(1);
    expect(bytes).toBe(3);
  });

  it("receive", async () => {
    const { value } = await sub.next();
    expect(value.data).toBe("hi");
  });

  it("clean up", () => {
    sub.close();
  });
});

// Same flow with the roles swapped: subscribe through the cluster link's
// client and publish through the original server's client.
describe("same basic test, but in the other direction", () => {
  let client0, client1;
  it("get the clients", async () => {
    client0 = server.client();
    // client1 is the client held by the first cluster link that the original
    // server keeps under its own cluster name.
    // @ts-ignore
    client1 = Object.values(server.clusterLinks[server.clusterName])[0].client;
  });

  let sub;
  it("subscribe", async () => {
    sub = await client1.subscribe("conat");
  });

  it("publish after waiting for interest -- this works", async () => {
    await client0.waitForInterest("conat");
    const { count, bytes } = await client0.publish("conat", "hi");
    expect(count).toBe(1);
    expect(bytes).toBe(3);
  });

  it("receive", async () => {
    const { value } = await sub.next();
    expect(value.data).toBe("hi");
  });

  it("clean up", () => {
    sub.close();
  });
});

// Queue-group behaviour with subscribers on different nodes and a publisher
// on a third node.
describe("three nodes and two subscribers with different queue group on distinct nodes", () => {
  let client0, client1, client2, server2;

  // Every subscription created in this suite is recorded here so that the
  // final "clean up" step can close all of them, including ones whose
  // variables are no longer reachable.
  const subs: { close: () => void }[] = [];

  it("get the clients", async () => {
    client0 = server.client();
    // @ts-ignore
    client1 = Object.values(server.clusterLinks[server.clusterName])[0].client;
    server2 = await addNodeToDefaultCluster();
    client2 = server2.client();
  });

  let sub0, sub1;
  it("subscribes from two nodes", async () => {
    sub0 = await client0.subscribe("sage", { queue: "0" });
    sub1 = await client1.subscribe("sage", { queue: "1" });
    subs.push(sub0, sub1);
  });

  it("only way to be sure to get both is to just try -- different queue group so eventually there will be two receivers", async () => {
    // Keep publishing until a single publish reports two receivers.
    await wait({
      until: async () => {
        const { count } = await client2.publish("sage", "hi");
        return count === 2;
      },
    });
    // and check both got something
    await sub0.next();
    await sub1.next();
  });

  it("subscribe from two nodes but with same queue group so only one subscriber will get it", async () => {
    const mathSub0 = await client0.subscribe("math", { queue: "0" });
    const mathSub1 = await client1.subscribe("math", { queue: "0" });
    subs.push(mathSub0, mathSub1);
    await client2.waitForInterest("math");
    // Extra pause after interest is first seen, before publishing.
    // NOTE(review): presumably this gives the second subscription time to
    // reach client2's node as well -- confirm.
    await delay(250);
    const { count } = await client2.publish("math", "hi");
    expect(count).toBe(1);
  });

  it("make sticky subs from each node, and see that all messages go to the same receiver, no matter which node we send from", async () => {
    // One sticky subscription per node, each with a background loop that
    // counts the messages delivered to it.
    const sticky0 = await client0.subscribe("sticky", {
      queue: STICKY_QUEUE_GROUP,
    });
    subs.push(sticky0);
    let n0 = 0;
    void (async () => {
      for await (const _ of sticky0) {
        n0++;
      }
    })();

    const sticky1 = await client1.subscribe("sticky", {
      queue: STICKY_QUEUE_GROUP,
    });
    subs.push(sticky1);
    let n1 = 0;
    void (async () => {
      for await (const _ of sticky1) {
        n1++;
      }
    })();

    const sticky2 = await client2.subscribe("sticky", {
      queue: STICKY_QUEUE_GROUP,
    });
    subs.push(sticky2);
    let n2 = 0;
    void (async () => {
      for await (const _ of sticky2) {
        n2++;
      }
    })();

    await client2.waitForInterest("sticky");

    // Publish 20 messages from each of the three nodes; every publish must
    // report exactly one receiver.
    for (let i = 0; i < 20; i++) {
      const { count } = await client2.publish("sticky", null);
      expect(count).toBe(1);
    }
    for (let i = 0; i < 20; i++) {
      const { count } = await client1.publish("sticky", null);
      expect(count).toBe(1);
    }
    for (let i = 0; i < 20; i++) {
      const { count } = await client0.publish("sticky", null);
      expect(count).toBe(1);
    }
    await wait({ until: () => n0 + n1 + n2 === 60 });
    // All pairwise products being zero means at most one counter is nonzero,
    // i.e. a single subscriber received all 60 messages.
    expect(n0 * n1).toBe(0);
    expect(n0 * n2).toBe(0);
    expect(n1 * n2).toBe(0);
    expect(n0 + n1 + n2).toBe(60);

    await waitForConsistentState(defaultCluster);

    // add another node and test
    const server3 = await addNodeToDefaultCluster();
    const client3 = server3.client();
    await waitForConsistentState(defaultCluster);
    for (let i = 0; i < 20; i++) {
      const { count } = await client3.publish("sticky", null);
      expect(count).toBe(1);
    }
    await wait({ until: () => n0 + n1 + n2 === 80 });
    expect(n0 * n1).toBe(0);
    expect(n0 * n2).toBe(0);
    expect(n1 * n2).toBe(0);
    expect(n0 + n1 + n2).toBe(80);
  });

  it("clean up", () => {
    // Close everything this suite subscribed to, matching the clean up steps
    // of the suites above. Closing the sticky subscriptions is expected to
    // end their background counting loops as well.
    for (const sub of subs) {
      sub.close();
    }
  });
});

afterAll(after);