Path: blob/master/src/packages/backend/conat/test/cluster/cluster-sticky-state.test.ts
1712 views
/*
Integration test for sticky queue-group routing state in a conat cluster.

The tests in the describe block run in order and share state:
  1. grow the default cluster to 2 nodes;
  2. create `count` sticky subscriptions on each of two clients and publish
     once per subject, so a sticky choice is recorded on servers[0];
  3. check that the choice is stored on servers[0] only, and eventually
     shows up in servers[1]'s link to servers[0];
  4. check delivery always reaches exactly one subscriber, from every node,
     before and after adding more nodes;
  5. check that unjoining servers[0] from servers[1] leaves the sticky
     entries in servers[1].sticky.
*/

import {
  before,
  after,
  defaultCluster as servers,
  waitForConsistentState,
  wait,
  addNodeToDefaultCluster,
} from "@cocalc/backend/conat/test/setup";
import { STICKY_QUEUE_GROUP } from "@cocalc/conat/core/client";
import { randomId } from "@cocalc/conat/names";

// Every sticky-state key created by this test file starts with this prefix.
const SUBJECT_PREFIX = "subject.";

/** Returns the keys of a sticky-state map that were created by this test file. */
function subjectKeys(sticky: object): string[] {
  return Object.keys(sticky).filter((key) => key.startsWith(SUBJECT_PREFIX));
}

beforeAll(before);

describe("ensure sticky state sync and use is working properly", () => {
  let clients: any;
  it("2-node cluster", async () => {
    await addNodeToDefaultCluster();
    expect(servers.length).toBe(2);
    await waitForConsistentState(servers);
    clients = servers.map((x) => x.client());
  });

  const count = 25;
  // subs0[i] is subscribed via clients[1], subs1[i] via clients[0];
  // both use the pattern `subject.${i}.*`.
  const subs0: any[] = [];
  const subs1: any[] = [];
  it(`create ${count} distinct sticky subscriptions and send one message to each to create sticky routing state on servers[0]`, async () => {
    // NOTE(review): these two extra clients are never referenced by index
    // below (only clients[0] and clients[1] are) -- confirm whether they are
    // needed.
    clients.push(servers[0].client());
    clients.push(servers[1].client());
    for (let i = 0; i < count; i++) {
      subs0.push(
        await clients[1].subscribe(`subject.${i}.*`, {
          queue: STICKY_QUEUE_GROUP,
        }),
      );
      // wait so above subscription is known to *both* servers:
      // @ts-ignore
      await servers[0].waitForInterest(
        `subject.${i}.0`,
        5000,
        clients[0].conn.id,
      );
      subs1.push(
        await clients[0].subscribe(`subject.${i}.*`, {
          queue: STICKY_QUEUE_GROUP,
        }),
      );
      // publishing causes a choice to be made and saved on servers[0]
      await clients[0].publish(`subject.${i}.foo`, "hello");
      expect(servers[0].sticky[`subject.${i}.*`]).not.toBe(undefined);
      // but no choice on servers[1]
      expect(servers[1].sticky[`subject.${i}.*`]).toBe(undefined);
    }
  });

  // 0 if subs0[0] received the first message for subject.0.*, 1 if subs1[0]
  // did; undefined until the test below has run.
  let chosen: number | undefined;
  it("see which subscription got chosen for subject.0.* -- this is useful later", async () => {
    const p0 = async () => {
      await subs0[0].next();
      return 0;
    };
    const p1 = async () => {
      await subs1[0].next();
      return 1;
    };
    // Whichever subscription yields a message first is the chosen one.
    chosen = await Promise.race([p0(), p1()]);
  });

  it(`sticky on servers[0] should have ${count} entries starting in "subject".`, async () => {
    expect(subjectKeys(servers[0].sticky).length).toBe(count);
  });

  it(`sticky on servers[1] should have no entries starting in "subject".`, async () => {
    expect(subjectKeys(servers[1].sticky).length).toBe(0);
  });

  it(`servers[1]'s link to servers[0] should *eventually* have ${count} entries starting in "subject."`, async () => {
    // @ts-ignore
    const link = servers[1].clusterLinksByAddress[servers[0].address()];
    let v: string[] = [];
    await wait({
      until: () => {
        v = subjectKeys(link.sticky);
        return v.length === count;
      },
    });
    expect(v.length).toBe(count);
  });

  it("send message from clients[1] to each subject", async () => {
    for (let i = 0; i < count; i++) {
      await clients[1].publish(`subject.${i}.foo`);
    }
  });

  // Sometimes this fails under very heavy load.
  // It's not a good test, because it probably hits some timeouts sometimes, and
  // it is testing internal structure/optimizations, not behavior.
  // Note also that minimizing sticky state computation is just an optimization so even if this test were failing
  // due to a bug, it might just mean things are slightly slower.
  //   it(`sticky on servers[1] should STILL have no entries starting in "subject", since no choices had to be made`, async () => {
  //     const v = Object.keys(servers[1].sticky).filter((s) =>
  //       s.startsWith("subject."),
  //     );
  //     expect(v.length).toBe(0);
  //   });

  /**
   * Publishes one message to `subject.0.<random>` from a client of every
   * server and checks that each publish reports exactly one recipient and
   * that every message arrives on the previously chosen subscription.
   */
  async function deliveryTest() {
    const sub = chosen === 0 ? subs0[0] : subs1[0];

    // clear up the subscription (we sent it stuff above): publish a unique
    // sentinel and discard everything up to and including it.
    const sentinel = randomId();
    await clients[0].publish("subject.0.foo", sentinel);
    while (true) {
      const { value } = await sub.next();
      if (value.data === sentinel) {
        break;
      }
    }
    for (const server of servers) {
      // we randomize the last segment to verify that it is NOT used
      // as input to the sticky routing choice.
      const { count } = await server
        .client()
        .publish(`subject.0.${randomId()}`, "delivery-test");
      expect(count).toBe(1);
    }
    const ids = new Set<string>();
    for (let i = 0; i < servers.length; i++) {
      // one of the subs will receive it and one will hang forever (which is fine)
      const { value } = await sub.next();
      expect(value.data).toBe("delivery-test");
      ids.add(value.client.id);
    }
    // all messages must go to the SAME subscriber, since sticky
    expect(ids.size).toBe(1);
  }

  it("publish from every node to subject.0.foo", deliveryTest);

  const count2 = 5;
  it(`add ${count2} more nodes to the cluster should be reaonably fast and not blow up in a feedback loop`, async () => {
    for (let i = 0; i < count2; i++) {
      await addNodeToDefaultCluster();
    }
  });

  it("wait until cluster is consistent", async () => {
    await waitForConsistentState(servers);
  });

  it("double check the links have the sticky state", () => {
    for (const server of servers.slice(1)) {
      // @ts-ignore
      const link = server.clusterLinksByAddress[servers[0].address()];
      expect(subjectKeys(link.sticky).length).toBe(count);
    }
  });

  it(
    "in bigger, cluster, publish from every node to subject.0.foo",
    deliveryTest,
  );

  it("listen on > and note that it doesn't impact the count", async () => {
    const sub = await clients[0].subscribe(">");
    try {
      for (let i = 0; i < servers.length; i++) {
        const { count } = await servers[i]
          .client()
          .publish("subject.0.foo", "hi");
        expect(count).toBe(1);
      }
    } finally {
      // Always drop the wildcard subscription, even when an assertion above
      // throws, so it cannot leak into the following test.
      sub.close();
    }
  });

  it("unjoining servers[0] from servers[1] should transfer the sticky state to servers[1]", async () => {
    await servers[1].unjoin({ address: servers[0].address() });
    expect(subjectKeys(servers[1].sticky).length).toBe(count);
  });
});

afterAll(after);