Path: blob/master/src/packages/backend/conat/test/sync/dstream.test.ts
1712 views
/*1Testing basic ops with *persistent* dstreams.23DEVELOPMENT:45pnpm test ./dstream.test.ts67*/89import { createDstream as create } from "./util";10import { dstream as createDstream } from "@cocalc/backend/conat/sync";11import { once } from "@cocalc/util/async-utils";12import { connect, before, after, wait } from "@cocalc/backend/conat/test/setup";1314beforeAll(before);1516jest.setTimeout(20000);1718describe("create a dstream and do some basic operations", () => {19let s;2021it("creates stream", async () => {22s = await create();23});2425it("starts out empty", () => {26expect(s.getAll()).toEqual([]);27expect(s.length).toEqual(0);28});2930const mesg = { stdout: "hello" };31it("publishes a message to the stream and confirms it is there", () => {32s.push(mesg);33expect(s.getAll()).toEqual([mesg]);34expect(s.length).toEqual(1);35expect(s[0]).toEqual(mesg);36});3738it("verifies that unsaved changes works properly", async () => {39expect(s.hasUnsavedChanges()).toBe(true);40expect(s.unsavedChanges()).toEqual([mesg]);41await s.save();42expect(s.hasUnsavedChanges()).toBe(false);43expect(s.unsavedChanges()).toEqual([]);44});4546it("confirm persistence: closes and re-opens stream and confirms message is still there", async () => {47const name = s.name;48await s.save();49// close s:50await s.close();51// using s fails52expect(s.getAll).toThrow("closed");53// create new stream with same name54const t = await createDstream({ name });55// ensure it is NOT just from the cache56expect(s === t).toBe(false);57// make sure it has our message58expect(t.getAll()).toEqual([mesg]);59});60});6162describe("create two dstreams and observe sync between them", () => {63const name = `test-${Math.random()}`;64let s1, s2;65it("creates two distinct dstream objects s1 and s2 with the same name", async () => {66s1 = await createDstream({ name, noAutosave: true, noCache: true });67s2 = await createDstream({ name, noAutosave: true, noCache: true });68// definitely distinct69expect(s1 === s2).toBe(false);70});7172it("writes to s1 and observes s2 doesn't see anything until we save", async () => {73s1.push("hello");74expect(s1[0]).toEqual("hello");75expect(s2.length).toEqual(0);76s1.save();77await once(s2, "change");78expect(s2[0]).toEqual("hello");79expect(s2.getAll()).toEqual(["hello"]);80});8182it("now write to s2 and save and see that reflected in s1", async () => {83s2.push("hi from s2");84s2.save();85while (s1[1] != "hi from s2") {86await once(s1, "change");87}88expect(s1[1]).toEqual("hi from s2");89});9091it("write to s1 and s2 and save at the same time and see some 'random choice' of order gets imposed by the server", async () => {92s1.push("s1");93s2.push("s2");94// our changes are reflected locally95expect(s1.getAll()).toEqual(["hello", "hi from s2", "s1"]);96expect(s2.getAll()).toEqual(["hello", "hi from s2", "s2"]);97// now kick off the two saves *in parallel*98s1.save();99s2.save();100await wait({101until: () => {102return s1.length == 4 && s2.length == 4;103},104});105expect(s1.getAll()).toEqual(s2.getAll());106expect(new Set(s1.getAll())).toEqual(107new Set(["hello", "hi from s2", "s1", "s2"]),108);109});110});111112describe("get sequence number and time of message", () => {113let s;114115it("creates stream and write message", async () => {116s = await create();117s.push("hello");118});119120it("sequence number is initialized undefined because it is server assigned ", async () => {121const n = s.seq(0);122expect(n).toBe(undefined);123});124125it("time also undefined because it is server assigned ", async () => {126const t = s.time(0);127expect(t).toBe(undefined);128});129130it("save and get server assigned sequence number", async () => {131s.save();132await once(s, "change");133const n = s.seq(0);134expect(n).toBeGreaterThan(0);135});136137it("seqs gives all sequence numbers", () => {138expect(s.seqs()).toEqual([s.seq(0)]);139});140141it("get server assigned time", async () => {142const t = s.time(0);143// since testing on the same machine as server, these times should be close:144expect(t.getTime() - Date.now()).toBeLessThan(5000);145});146147it("publish another message and get next server number is bigger", async () => {148const n = s.seq(0);149s.push("there");150await s.save();151const m = s.seq(1);152expect(m).toBeGreaterThan(n);153expect(s.seqs()).toEqual([n, m]);154});155156it("and time is bigger", async () => {157if (s.time(1) == null) {158await once(s, "change");159}160expect(s.time(0).getTime()).toBeLessThan(s.time(1).getTime());161});162});163164describe("closing also saves by default, but not if autosave is off", () => {165let s;166const name = `test-${Math.random()}`;167168it("creates stream and write a message", async () => {169// noAutosave: false is the default:170s = await createDstream({ name, noAutosave: false });171s.push(389);172});173174it("closes then opens and message is there, since autosave is on", async () => {175await s.close();176const t = await createDstream({ name });177expect(t[0]).toEqual(389);178});179180it("make another stream with autosave off, and close which causes LOSS OF DATA", async () => {181const name = `test-${Math.random()}`;182const s = await createDstream({ name, noAutosave: true });183s.push(389);184s.close();185const t = await createDstream({ name, noAutosave: true });186// data is gone forever!187expect(t.length).toBe(0);188});189});190191describe("testing start_seq", () => {192const name = `test-${Math.random()}`;193let seq;194it("creates a stream and adds 3 messages, noting their assigned sequence numbers", async () => {195const s = await createDstream({ name, noAutosave: true });196s.push(1, 2, 3);197expect(s.getAll()).toEqual([1, 2, 3]);198// save, thus getting sequence numbers199s.save();200while (s.seq(2) == null) {201s.save();202await once(s, "change");203}204seq = [s.seq(0), s.seq(1), s.seq(2)];205// tests partly that these are integers...206const n = seq.reduce((a, b) => a + b, 0);207expect(typeof n).toBe("number");208expect(n).toBeGreaterThan(2);209await s.close();210});211212let s;213it("it opens the stream but starting with the last sequence number, so only one message", async () => {214s = await createDstream({215name,216noAutosave: true,217start_seq: seq[2],218});219expect(s.length).toBe(1);220expect(s.getAll()).toEqual([3]);221expect(s.start_seq).toEqual(seq[2]);222});223224it("it then pulls in the previous message, so now two messages are loaded", async () => {225await s.load({ start_seq: seq[1] });226expect(s.length).toBe(2);227expect(s.getAll()).toEqual([2, 3]);228expect(s.start_seq).toEqual(seq[1]);229});230231it("a bigger example involving loading older messages", async () => {232for (let i = 4; i < 100; i++) {233s.push(i);234}235await s.save();236const last = s.seq(s.length - 1);237const mid = s.seq(s.length - 50);238await s.close();239s = await createDstream({240name,241noAutosave: true,242start_seq: last,243});244expect(s.length).toBe(1);245expect(s.getAll()).toEqual([99]);246expect(s.start_seq).toEqual(last);247248await s.load({ start_seq: mid });249expect(s.length).toEqual(50);250expect(s.start_seq).toEqual(mid);251for (let i = 0; i < 50; i++) {252expect(s.get(i)).toBe(i + 50);253}254255await s.load({ start_seq: 0 });256for (let i = 0; i < 99; i++) {257expect(s.get(i)).toBe(i + 1);258}259});260});261262describe("a little bit of a stress test", () => {263const name = `test-${Math.random()}`;264const count = 100;265let s;266it(`creates a stream and pushes ${count} messages`, async () => {267s = await createDstream({268name,269noAutosave: true,270});271for (let i = 0; i < count; i++) {272s.push({ i });273}274expect(s.length).toBe(count);275// NOTE: warning -- this is **MUCH SLOWER**, e.g., 10x slower,276// running under jest, hence why count is small.277await s.save();278expect(s.length).toBe(count);279});280});281282describe("dstream typescript test", () => {283it("creates stream", async () => {284const name = `test-${Math.random()}`;285const s = await createDstream<string>({ name });286287// write a message with the correct type288s.push("foo");289290// wrong type -- no way to test this, but if you uncomment291// this you should get a typescript error:292293// s.push({ foo: "bar" });294});295});296297describe("ensure there isn't a really obvious subscription leak", () => {298let client;299300it("create a client, which initially has only one subscription (the inbox)", async () => {301client = connect();302await client.getInbox();303expect(client.numSubscriptions()).toBe(1);304});305306const count = 100;307it(`creates and closes ${count} streams and checks there is no leak`, async () => {308const before = client.numSubscriptions();309// create310const a: any = [];311for (let i = 0; i < count; i++) {312a[i] = await createDstream({313name: `${Math.random()}`,314});315}316for (let i = 0; i < count; i++) {317await a[i].close();318}319const after = client.numSubscriptions();320expect(after).toBe(before);321322// also check count on server went down.323expect((await client.getSubscriptions()).size).toBe(before);324});325326it("does another leak test, but with a publish operation each time", async () => {327const before = client.numSubscriptions();328// create329const a: any = [];330for (let i = 0; i < count; i++) {331a[i] = await createDstream({332name: `${Math.random()}`,333noAutosave: true,334});335a[i].publish(i);336await a[i].save();337}338for (let i = 0; i < count; i++) {339await a[i].close();340}341const after = client.numSubscriptions();342expect(after).toBe(before);343});344});345346describe("test delete of messages from stream", () => {347let client1, client2, s1, s2;348const name = "test-delete";349it("create two clients", async () => {350client1 = connect();351client2 = connect();352s1 = await createDstream({353client: client1,354name,355noAutosave: true,356noCache: true,357});358s2 = await createDstream({359client: client2,360name,361noAutosave: true,362noCache: true,363});364});365366it("writes message one, confirm seen by other, then delete and confirm works", async () => {367s1.push("hello");368await s1.save();369await wait({ until: () => s2.length > 0 });370s1.delete({ all: true });371await wait({ until: () => s2.length == 0 && s1.length == 0 });372});373374it("same delete test as above but with a few more items and delete on s2 instead", async () => {375for (let i = 0; i < 10; i++) {376s1.push(i);377}378await s1.save();379await wait({ until: () => s2.length == 10 });380s2.delete({ all: true });381await wait({ until: () => s2.length == 0 && s1.length == 0 });382});383384it("delete specific index", async () => {385s1.push("x", "y", "z");386await s1.save();387await wait({ until: () => s2.length == 3 });388s2.delete({ last_index: 1 });389await wait({ until: () => s2.length == 1 && s1.length == 1 });390expect(s1.get()).toEqual(["z"]);391});392393it("delete specific seq number", async () => {394s1.push("x", "y");395await s1.save();396expect(s1.get()).toEqual(["z", "x", "y"]);397const seq = s1.seq(1);398const { seqs } = await s1.delete({ seq });399expect(seqs).toEqual([seq]);400await wait({ until: () => s2.length == 2 && s1.length == 2 });401expect(s1.get()).toEqual(["z", "y"]);402});403404it("delete up to a sequence number", async () => {405s1.push("x", "y");406await s1.save();407expect(s1.get()).toEqual(["z", "y", "x", "y"]);408const seq = s1.seq(1);409const { seqs } = await s1.delete({ last_seq: seq });410expect(seqs.length).toBe(2);411expect(seqs[1]).toBe(seq);412await wait({ until: () => s1.length == 2 });413expect(s1.get()).toEqual(["x", "y"]);414});415416it("delete an array of sequence numbers", async () => {417s1.push("x", "y", "z");418await s1.save();419const v = await s1.getAll();420const seqs0 = [s1.seq(0), s1.seq(2)];421const { seqs } = await s1.delete({ seqs: seqs0 });422expect(seqs).toEqual(seqs0);423await wait({ until: () => s1.length == v.length - 2 });424expect(s1.getAll()).toEqual([v[1]].concat(v.slice(3)));425});426});427428afterAll(after);429430431