Path: blob/master/src/packages/backend/conat/test/sync/limits.test.ts
1712 views
/*1Testing the limits.23DEVELOPMENT:45pnpm test ./limits.test.ts67*/89import { dkv as createDkv } from "@cocalc/backend/conat/sync";10import { dstream as createDstream } from "@cocalc/backend/conat/sync";11import { delay } from "awaiting";12import { once } from "@cocalc/util/async-utils";13import {14before,15after,16wait,17connect,18client,19} from "@cocalc/backend/conat/test/setup";2021beforeAll(before);2223jest.setTimeout(15000);24describe("create a dkv with limit on the total number of keys, and confirm auto-delete works", () => {25let kv;26const name = `test-${Math.random()}`;2728it("creates the dkv", async () => {29kv = await createDkv({ client, name, config: { max_msgs: 2 } });30expect(kv.getAll()).toEqual({});31});3233it("adds 2 keys, then a third, and sees first is gone", async () => {34kv.a = 10;35kv.b = 20;36expect(kv.a).toEqual(10);37expect(kv.b).toEqual(20);38kv.c = 30;39expect(kv.c).toEqual(30);40// have to wait until it's all saved and acknowledged before enforcing limit41if (!kv.isStable()) {42await once(kv, "stable");43}44// next change is the enforcement happening45if (kv.has("a")) {46await once(kv, "change", 500);47}48// and confirm it49expect(kv.a).toBe(undefined);50expect(kv.getAll()).toEqual({ b: 20, c: 30 });51});5253it("closes the kv", async () => {54await kv.clear();55await kv.close();56});57});5859describe("create a dkv with limit on age of keys, and confirm auto-delete works", () => {60let kv;61const name = `test-${Math.random()}`;6263it("creates the dkv", async () => {64kv = await createDkv({ client, name, config: { max_age: 50 } });65expect(kv.getAll()).toEqual({});66});6768it("adds 2 keys, then a third, and sees first two are gone due to aging out", async () => {69kv.a = 10;70kv.b = 20;71expect(kv.a).toEqual(10);72expect(kv.b).toEqual(20);73await kv.save();74await kv.config();75await delay(50);76await kv.config();77await delay(10);78expect(kv.has("a")).toBe(false);79expect(kv.has("b")).toBe(false);80});8182it("closes the kv", async () => {83await kv.clear();84await kv.close();85});86});8788describe("create a dkv with limit on total bytes of keys, and confirm auto-delete works", () => {89let kv;90const name = `test-${Math.random()}`;9192it("creates the dkv", async () => {93kv = await createDkv({ client, name, config: { max_bytes: 100 } });94expect(kv.getAll()).toEqual({});95});9697it("adds a key, then a second, and sees first one is gone due to bytes", async () => {98kv.a = "x".repeat(50);99kv.b = "x".repeat(55);100expect(kv.getAll()).toEqual({ a: "x".repeat(50), b: "x".repeat(55) });101await kv.save();102expect(kv.has("b")).toBe(true);103await wait({104until: async () => {105await kv.config();106return !kv.has("a");107},108});109expect(kv.getAll()).toEqual({ b: "x".repeat(55) });110});111112it("closes the kv", async () => {113await kv.clear();114await kv.close();115});116});117118describe("create a dkv with limit on max_msg_size, and confirm writing small messages works but writing a big one result in a 'reject' event", () => {119let kv;120const name = `test-${Math.random()}`;121122it("creates the dkv", async () => {123kv = await createDkv({ client, name, config: { max_msg_size: 100 } });124expect(kv.getAll()).toEqual({});125});126127it("adds a key, then a second big one results in a 'reject' event", async () => {128const rejects: { key: string; value: string }[] = [];129kv.once("reject", (x) => {130rejects.push(x);131});132kv.a = "x".repeat(50);133await kv.save();134kv.b = "x".repeat(150);135await kv.save();136expect(rejects).toEqual([{ key: "b", value: "x".repeat(150) }]);137expect(kv.has("b")).toBe(false);138});139140it("closes the kv", async () => {141await kv.clear();142await kv.close();143});144});145146describe("create a dstream with limit on the total number of messages, and confirm max_msgs, max_age works", () => {147let s, s2;148const name = `test-${Math.random()}`;149150it("creates the dstream and another with a different client", async () => {151s = await createDstream({ client, name, config: { max_msgs: 2 } });152s2 = await createDstream({153client: connect(),154name,155config: { max_msgs: 2 },156noCache: true,157});158expect(s.get()).toEqual([]);159expect((await s.config()).max_msgs).toBe(2);160expect((await s2.config()).max_msgs).toBe(2);161});162163it("push 2 messages, then a third, and see first is gone and that this is reflected on both clients", async () => {164expect((await s.config()).max_msgs).toBe(2);165expect((await s2.config()).max_msgs).toBe(2);166s.push("a");167s.push("b");168await wait({ until: () => s.length == 2 && s2.length == 2 });169expect(s2.get()).toEqual(["a", "b"]);170s.push("c");171await wait({172until: () =>173s.get(0) != "a" &&174s.get(1) == "c" &&175s2.get(0) != "a" &&176s2.get(1) == "c",177});178expect(s.getAll()).toEqual(["b", "c"]);179expect(s2.getAll()).toEqual(["b", "c"]);180181// also check limits ar enforced if we close, then open new one:182await s.close();183s = await createDstream({ client, name, config: { max_msgs: 2 } });184expect(s.getAll()).toEqual(["b", "c"]);185186await s.config({ max_msgs: -1 });187});188189it("verifies that max_age works", async () => {190await s.save();191expect(s.hasUnsavedChanges()).toBe(false);192await delay(1100);193// this "new" should be much newer than 1s194s.push("new");195await s.config({ max_age: 1000 }); // anything older than 1s should be deleted196await wait({ until: () => s.length == 1 });197expect(s.getAll()).toEqual(["new"]);198await s.config({ max_age: -1 });199});200201it("verifies that ttl works by writing a message and making sure it gets automatically deleted soon", async () => {202const conf = await s.config();203expect(conf.allow_msg_ttl).toBe(false);204const conf2 = await s.config({ max_age: -1, allow_msg_ttl: true });205expect(conf2.allow_msg_ttl).toBe(true);206207s.publish("ttl-message", { ttl: 50 });208expect(s.length).toBe(2);209await s.save();210await wait({211until: async () => {212await s.config();213return s.length == 1;214},215});216expect(s.length).toBe(1);217expect(s.get()).toEqual(["new"]);218});219220it("verifies that max_bytes works -- publishing something too large causes everything to end up gone", async () => {221const conf = await s.config({ max_bytes: 100 });222expect(conf.max_bytes).toBe(100);223s.publish("x".repeat(1000));224await s.config();225await wait({ until: () => s.length == 0 });226expect(s.length).toBe(0);227});228229it("max_bytes -- publish something then another thing that causes the first to get deleted", async () => {230s.publish("x".repeat(75));231s.publish("y".repeat(90));232await wait({233until: async () => {234await s.config();235return s.length == 1;236},237});238expect(s.get()).toEqual(["y".repeat(90)]);239await s.config({ max_bytes: -1 });240});241242it("verifies that max_msg_size rejects messages that are too big", async () => {243await s.config({ max_msg_size: 100 });244expect((await s.config()).max_msg_size).toBe(100);245s.publish("x".repeat(70));246await expect(async () => {247await s.stream.publish("x".repeat(150));248}).rejects.toThrow("max_msg_size");249await s.config({ max_msg_size: 200 });250s.publish("x".repeat(150));251await s.config({ max_msg_size: -1 });252expect((await s.config()).max_msg_size).toBe(-1);253});254255it("closes the stream", async () => {256await s.close();257await s2.close();258});259});260261describe("create a dstream with limit on max_age, and confirm auto-delete works", () => {262let s;263const name = `test-${Math.random()}`;264265it("creates the dstream", async () => {266s = await createDstream({ client, name, config: { max_age: 50 } });267});268269it("push a message, then another and see first disappears", async () => {270s.push({ a: 10 });271await delay(75);272s.push({ b: 20 });273expect(s.get()).toEqual([{ a: 10 }, { b: 20 }]);274await wait({275until: async () => {276await s.config();277return s.length == 1;278},279});280expect(s.getAll()).toEqual([{ b: 20 }]);281});282283it("closes the stream", async () => {284await s.delete({ all: true });285await s.close();286});287});288289describe("create a dstream with limit on max_bytes, and confirm auto-delete works", () => {290let s;291const name = `test-${Math.random()}`;292293it("creates the dstream", async () => {294// note: 60 and not 40 due to slack for headers295s = await createDstream({ client, name, config: { max_bytes: 60 } });296});297298it("push a message, then another and see first disappears", async () => {299s.push("x".repeat(40));300s.push("x".repeat(45));301s.push("x");302if (!s.isStable()) {303await once(s, "stable");304}305expect(s.getAll()).toEqual(["x".repeat(45), "x"]);306});307308it("closes the stream", async () => {309await s.delete({ all: true });310await s.close();311});312});313314describe("create a dstream with limit on max_msg_size, and confirm auto-delete works", () => {315let s;316const name = `test-${Math.random()}`;317318it("creates the dstream", async () => {319s = await createDstream({ client, name, config: { max_msg_size: 50 } });320});321322it("push a message, then another and see first disappears", async () => {323const rejects: any[] = [];324s.on("reject", ({ mesg }) => {325rejects.push(mesg);326});327s.push("x".repeat(40));328s.push("y".repeat(60)); // silently vanishes (well a reject event is emitted)329s.push("x");330await wait({331until: async () => {332await s.config();333return s.length == 2;334},335});336expect(s.getAll()).toEqual(["x".repeat(40), "x"]);337expect(rejects).toEqual(["y".repeat(60)]);338});339340it("closes the stream", async () => {341await s.close();342});343});344345describe("test discard_policy 'new' where writes are rejected rather than old data being deleted, for max_bytes and max_msgs", () => {346let s;347const name = `test-${Math.random()}`;348349it("creates the dstream", async () => {350s = await createDstream({351client,352name,353// we can write at most 300 bytes and 3 messages. beyond that we354// get reject events.355config: {356max_bytes: 300,357max_msgs: 3,358discard_policy: "new",359desc: { example: "config" },360},361});362const rejects: any[] = [];363s.on("reject", ({ mesg }) => {364rejects.push(mesg);365});366s.publish("x");367s.publish("y");368s.publish("w");369s.publish("foo");370371await wait({372until: async () => {373await s.config();374return rejects.length == 1;375},376});377expect(s.getAll()).toEqual(["x", "y", "w"]);378expect(rejects).toEqual(["foo"]);379380s.publish("x".repeat(299));381await wait({382until: async () => {383await s.config();384return rejects.length == 2;385},386});387expect(s.getAll()).toEqual(["x", "y", "w"]);388expect(rejects).toEqual(["foo", "x".repeat(299)]);389});390391it("check the config is persisted", async () => {392const lastConfig = await s.config();393s.close();394s = await createDstream({395client,396name,397noCache: true,398});399const config = await s.config();400expect(lastConfig).toEqual(config);401expect(lastConfig.desc).toEqual({ example: "config" });402});403404it("closes the stream", async () => {405s.close();406});407});408409describe("test rate limiting", () => {410let s;411const name = `test-${Math.random()}`;412413it("creates the dstream", async () => {414s = await createDstream({415client,416name,417// we can write at most 300 bytes and 3 messages. beyond that we418// get reject events.419config: {420max_bytes_per_second: 300,421max_msgs_per_second: 3,422discard_policy: "new",423},424});425const rejects: any[] = [];426s.on("reject", ({ mesg }) => {427rejects.push(mesg);428});429});430431it("closes the stream", async () => {432await s.close();433});434});435436import { EPHEMERAL_MAX_BYTES } from "@cocalc/conat/persist/storage";437describe(`ephemeral streams always have a hard cap of ${EPHEMERAL_MAX_BYTES} on max_bytes `, () => {438let s;439it("creates a non-ephemeral dstream and checks no automatic max_bytes set", async () => {440const s1 = await createDstream({441client,442name: "test-NON-ephemeral",443ephemeral: false,444});445expect((await s1.config()).max_bytes).toBe(-1);446s1.close();447});448449it("creates an ephemeral dstream and checks max bytes automatically set", async () => {450s = await createDstream({451client,452name: "test-ephemeral",453ephemeral: true,454});455expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES);456});457458it("trying to set larger doesn't work", async () => {459expect(460(await s.config({ max_bytes: 2 * EPHEMERAL_MAX_BYTES })).max_bytes,461).toBe(EPHEMERAL_MAX_BYTES);462expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES);463});464465it("setting it smaller is allowed", async () => {466expect(467(await s.config({ max_bytes: EPHEMERAL_MAX_BYTES / 2 })).max_bytes,468).toBe(EPHEMERAL_MAX_BYTES / 2);469expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES / 2);470});471});472473afterAll(after);474475476