// Path: blob/master/src/packages/backend/conat/test/cluster/cluster-sticky.test.ts
// 1712 views
/*
Cluster test for *sticky* queue-group subscriptions.

Two subscribers join the same subject using STICKY_QUEUE_GROUP, both created
from server1.  Messages are then published from clients attached to each node
of the cluster, and every test asserts that:

  - all published messages were received (recv1 + recv1b equals the expected
    running total), and
  - only one of the two subscribers ever received anything
    (recv1 * recv1b === 0, i.e. at least one counter is still zero).

The tests share state (servers, clients, counters) and depend on running in
the order they are declared.
*/

import {
  before,
  after,
  wait,
  waitForConsistentState,
} from "@cocalc/backend/conat/test/setup";
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
import { type ConatServer } from "@cocalc/conat/core/server";
import { createClusterNode } from "./util";

beforeAll(before);

describe("create cluster of two nodes, and verify that *sticky* subs properly work", () => {
  let server1: ConatServer, server2: ConatServer, client1, client1b, client2;
  it("create two distinct servers and link them", async () => {
    ({ server: server1, client: client1 } = await createClusterNode({
      clusterName: "cluster0",
      id: "1",
      autoscanInterval: 100,
      longAutoscanInterval: 1000,
    }));
    // A second, independent client attached to the same server as client1.
    client1b = server1.client();
    ({ server: server2, client: client2 } = await createClusterNode({
      clusterName: "cluster0",
      id: "2",
      autoscanInterval: 100,
      longAutoscanInterval: 1000,
    }));
    // Link the two servers in both directions.
    await server1.join(server2.address());
    await server2.join(server1.address());
  });

  let sub1, sub1b;
  // Number of messages received so far by sub1 and sub1b, respectively.
  // These accumulate across all of the tests below.
  let recv1 = 0,
    recv1b = 0;
  const subject = "5077.org";
  it("make two subscriptions with the same sticky queue group", async () => {
    sub1 = await client1.sub(subject, { queue: STICKY_QUEUE_GROUP });
    // Intentionally not awaited: this loop keeps consuming in the background
    // for as long as the subscription yields messages.
    void (async () => {
      for await (const _ of sub1) {
        recv1++;
      }
    })();
    sub1b = await client1b.sub(subject, { queue: STICKY_QUEUE_GROUP });
    // Same background consumer for the second subscription.
    void (async () => {
      for await (const _ of sub1b) {
        recv1b++;
      }
    })();
  });

  // Number of messages each client publishes per round.
  const count = 50;
  it("send messages and note they all go to the same target -- first the easy sanity check all on the same node", async () => {
    await client1.waitForInterest(subject);
    for (let i = 0; i < count; i++) {
      await client1.publish(subject, "hi");
    }
    await wait({ until: () => recv1 + recv1b >= count });
    expect(recv1 + recv1b).toEqual(count);
    // The product is zero only if one of the subscribers received nothing,
    // i.e., every message was delivered to the same subscriber.
    expect(recv1 * recv1b).toEqual(0);
  });

  it("send messages and note they all go to the same target -- next the hard case across the cluster", async () => {
    // NOTE: if we do this test without waiting for consistent state, it will definitely
    // fail sometimes, since server2 literally doesn't know enough about the servers yet,
    // so has to make a different choice. Services of course must account for the fact that
    // for the first moments of their existence, sticky routing can't work.
    await waitForConsistentState([server1, server2]);

    await client2.waitForInterest(subject);
    for (let i = 0; i < count; i++) {
      await client2.publish(subject, "hi");
    }
    // The counters are cumulative, so after this second round the expected
    // total is 2 * count.
    await wait({ until: () => recv1 + recv1b >= 2 * count });
    expect(recv1 + recv1b).toEqual(2 * count);
    expect(recv1 * recv1b).toEqual(0);
  });

  let server3: ConatServer, client3;
  it("add a third node", async () => {
    const { client, server } = await createClusterNode({
      clusterName: "cluster0",
      autoscanInterval: 100,
      longAutoscanInterval: 5000,
      id: "3",
    });
    server3 = server;
    client3 = client;
    // Only server1 is explicitly joined to the new node here; the next test
    // waits until all three servers report a consistent state.
    await server1.join(server.address());
  });

  it("waits for consistent state -- this verifies, e.g., that sticky state is equal", async () => {
    await waitForConsistentState([server1, server2, server3]);
  });

  it("client connected to server3 also routes properly", async () => {
    // Snapshot the running total so the assertions below are relative to it.
    const total = recv1 + recv1b;
    await client3.waitForInterest(subject);
    for (let i = 0; i < count; i++) {
      await client3.publish(subject, "hi");
    }
    await wait({ until: () => recv1 + recv1b >= total + count });
    expect(recv1 + recv1b).toEqual(total + count);
    expect(recv1 * recv1b).toEqual(0);
  });

  it("client1 and client2 still route properly", async () => {
    const total = recv1 + recv1b;
    // Interleave publishes from the two clients; each sends `count`
    // messages, so 2 * count new messages are expected in total.
    for (let i = 0; i < count; i++) {
      await client1.publish(subject, "hi");
      await client2.publish(subject, "hi");
    }
    await wait({ until: () => recv1 + recv1b >= total + 2 * count });
    expect(recv1 + recv1b).toEqual(total + 2 * count);
    expect(recv1 * recv1b).toEqual(0);
  });
});

afterAll(after);