Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/sync/dstream.test.ts
1712 views
1
/*
2
Testing basic ops with *persistent* dstreams.
3
4
DEVELOPMENT:
5
6
pnpm test ./dstream.test.ts
7
8
*/
9
10
import { createDstream as create } from "./util";
11
import { dstream as createDstream } from "@cocalc/backend/conat/sync";
12
import { once } from "@cocalc/util/async-utils";
13
import { connect, before, after, wait } from "@cocalc/backend/conat/test/setup";
14
15
beforeAll(before);
16
17
jest.setTimeout(20000);
18
19
describe("create a dstream and do some basic operations", () => {
20
let s;
21
22
it("creates stream", async () => {
23
s = await create();
24
});
25
26
it("starts out empty", () => {
27
expect(s.getAll()).toEqual([]);
28
expect(s.length).toEqual(0);
29
});
30
31
const mesg = { stdout: "hello" };
32
it("publishes a message to the stream and confirms it is there", () => {
33
s.push(mesg);
34
expect(s.getAll()).toEqual([mesg]);
35
expect(s.length).toEqual(1);
36
expect(s[0]).toEqual(mesg);
37
});
38
39
it("verifies that unsaved changes works properly", async () => {
40
expect(s.hasUnsavedChanges()).toBe(true);
41
expect(s.unsavedChanges()).toEqual([mesg]);
42
await s.save();
43
expect(s.hasUnsavedChanges()).toBe(false);
44
expect(s.unsavedChanges()).toEqual([]);
45
});
46
47
it("confirm persistence: closes and re-opens stream and confirms message is still there", async () => {
48
const name = s.name;
49
await s.save();
50
// close s:
51
await s.close();
52
// using s fails
53
expect(s.getAll).toThrow("closed");
54
// create new stream with same name
55
const t = await createDstream({ name });
56
// ensure it is NOT just from the cache
57
expect(s === t).toBe(false);
58
// make sure it has our message
59
expect(t.getAll()).toEqual([mesg]);
60
});
61
});
62
63
describe("create two dstreams and observe sync between them", () => {
64
const name = `test-${Math.random()}`;
65
let s1, s2;
66
it("creates two distinct dstream objects s1 and s2 with the same name", async () => {
67
s1 = await createDstream({ name, noAutosave: true, noCache: true });
68
s2 = await createDstream({ name, noAutosave: true, noCache: true });
69
// definitely distinct
70
expect(s1 === s2).toBe(false);
71
});
72
73
it("writes to s1 and observes s2 doesn't see anything until we save", async () => {
74
s1.push("hello");
75
expect(s1[0]).toEqual("hello");
76
expect(s2.length).toEqual(0);
77
s1.save();
78
await once(s2, "change");
79
expect(s2[0]).toEqual("hello");
80
expect(s2.getAll()).toEqual(["hello"]);
81
});
82
83
it("now write to s2 and save and see that reflected in s1", async () => {
84
s2.push("hi from s2");
85
s2.save();
86
while (s1[1] != "hi from s2") {
87
await once(s1, "change");
88
}
89
expect(s1[1]).toEqual("hi from s2");
90
});
91
92
it("write to s1 and s2 and save at the same time and see some 'random choice' of order gets imposed by the server", async () => {
93
s1.push("s1");
94
s2.push("s2");
95
// our changes are reflected locally
96
expect(s1.getAll()).toEqual(["hello", "hi from s2", "s1"]);
97
expect(s2.getAll()).toEqual(["hello", "hi from s2", "s2"]);
98
// now kick off the two saves *in parallel*
99
s1.save();
100
s2.save();
101
await wait({
102
until: () => {
103
return s1.length == 4 && s2.length == 4;
104
},
105
});
106
expect(s1.getAll()).toEqual(s2.getAll());
107
expect(new Set(s1.getAll())).toEqual(
108
new Set(["hello", "hi from s2", "s1", "s2"]),
109
);
110
});
111
});
112
113
describe("get sequence number and time of message", () => {
114
let s;
115
116
it("creates stream and write message", async () => {
117
s = await create();
118
s.push("hello");
119
});
120
121
it("sequence number is initialized undefined because it is server assigned ", async () => {
122
const n = s.seq(0);
123
expect(n).toBe(undefined);
124
});
125
126
it("time also undefined because it is server assigned ", async () => {
127
const t = s.time(0);
128
expect(t).toBe(undefined);
129
});
130
131
it("save and get server assigned sequence number", async () => {
132
s.save();
133
await once(s, "change");
134
const n = s.seq(0);
135
expect(n).toBeGreaterThan(0);
136
});
137
138
it("seqs gives all sequence numbers", () => {
139
expect(s.seqs()).toEqual([s.seq(0)]);
140
});
141
142
it("get server assigned time", async () => {
143
const t = s.time(0);
144
// since testing on the same machine as server, these times should be close:
145
expect(t.getTime() - Date.now()).toBeLessThan(5000);
146
});
147
148
it("publish another message and get next server number is bigger", async () => {
149
const n = s.seq(0);
150
s.push("there");
151
await s.save();
152
const m = s.seq(1);
153
expect(m).toBeGreaterThan(n);
154
expect(s.seqs()).toEqual([n, m]);
155
});
156
157
it("and time is bigger", async () => {
158
if (s.time(1) == null) {
159
await once(s, "change");
160
}
161
expect(s.time(0).getTime()).toBeLessThan(s.time(1).getTime());
162
});
163
});
164
165
describe("closing also saves by default, but not if autosave is off", () => {
166
let s;
167
const name = `test-${Math.random()}`;
168
169
it("creates stream and write a message", async () => {
170
// noAutosave: false is the default:
171
s = await createDstream({ name, noAutosave: false });
172
s.push(389);
173
});
174
175
it("closes then opens and message is there, since autosave is on", async () => {
176
await s.close();
177
const t = await createDstream({ name });
178
expect(t[0]).toEqual(389);
179
});
180
181
it("make another stream with autosave off, and close which causes LOSS OF DATA", async () => {
182
const name = `test-${Math.random()}`;
183
const s = await createDstream({ name, noAutosave: true });
184
s.push(389);
185
s.close();
186
const t = await createDstream({ name, noAutosave: true });
187
// data is gone forever!
188
expect(t.length).toBe(0);
189
});
190
});
191
192
describe("testing start_seq", () => {
193
const name = `test-${Math.random()}`;
194
let seq;
195
it("creates a stream and adds 3 messages, noting their assigned sequence numbers", async () => {
196
const s = await createDstream({ name, noAutosave: true });
197
s.push(1, 2, 3);
198
expect(s.getAll()).toEqual([1, 2, 3]);
199
// save, thus getting sequence numbers
200
s.save();
201
while (s.seq(2) == null) {
202
s.save();
203
await once(s, "change");
204
}
205
seq = [s.seq(0), s.seq(1), s.seq(2)];
206
// tests partly that these are integers...
207
const n = seq.reduce((a, b) => a + b, 0);
208
expect(typeof n).toBe("number");
209
expect(n).toBeGreaterThan(2);
210
await s.close();
211
});
212
213
let s;
214
it("it opens the stream but starting with the last sequence number, so only one message", async () => {
215
s = await createDstream({
216
name,
217
noAutosave: true,
218
start_seq: seq[2],
219
});
220
expect(s.length).toBe(1);
221
expect(s.getAll()).toEqual([3]);
222
expect(s.start_seq).toEqual(seq[2]);
223
});
224
225
it("it then pulls in the previous message, so now two messages are loaded", async () => {
226
await s.load({ start_seq: seq[1] });
227
expect(s.length).toBe(2);
228
expect(s.getAll()).toEqual([2, 3]);
229
expect(s.start_seq).toEqual(seq[1]);
230
});
231
232
it("a bigger example involving loading older messages", async () => {
233
for (let i = 4; i < 100; i++) {
234
s.push(i);
235
}
236
await s.save();
237
const last = s.seq(s.length - 1);
238
const mid = s.seq(s.length - 50);
239
await s.close();
240
s = await createDstream({
241
name,
242
noAutosave: true,
243
start_seq: last,
244
});
245
expect(s.length).toBe(1);
246
expect(s.getAll()).toEqual([99]);
247
expect(s.start_seq).toEqual(last);
248
249
await s.load({ start_seq: mid });
250
expect(s.length).toEqual(50);
251
expect(s.start_seq).toEqual(mid);
252
for (let i = 0; i < 50; i++) {
253
expect(s.get(i)).toBe(i + 50);
254
}
255
256
await s.load({ start_seq: 0 });
257
for (let i = 0; i < 99; i++) {
258
expect(s.get(i)).toBe(i + 1);
259
}
260
});
261
});
262
263
describe("a little bit of a stress test", () => {
264
const name = `test-${Math.random()}`;
265
const count = 100;
266
let s;
267
it(`creates a stream and pushes ${count} messages`, async () => {
268
s = await createDstream({
269
name,
270
noAutosave: true,
271
});
272
for (let i = 0; i < count; i++) {
273
s.push({ i });
274
}
275
expect(s.length).toBe(count);
276
// NOTE: warning -- this is **MUCH SLOWER**, e.g., 10x slower,
277
// running under jest, hence why count is small.
278
await s.save();
279
expect(s.length).toBe(count);
280
});
281
});
282
283
describe("dstream typescript test", () => {
284
it("creates stream", async () => {
285
const name = `test-${Math.random()}`;
286
const s = await createDstream<string>({ name });
287
288
// write a message with the correct type
289
s.push("foo");
290
291
// wrong type -- no way to test this, but if you uncomment
292
// this you should get a typescript error:
293
294
// s.push({ foo: "bar" });
295
});
296
});
297
298
describe("ensure there isn't a really obvious subscription leak", () => {
299
let client;
300
301
it("create a client, which initially has only one subscription (the inbox)", async () => {
302
client = connect();
303
await client.getInbox();
304
expect(client.numSubscriptions()).toBe(1);
305
});
306
307
const count = 100;
308
it(`creates and closes ${count} streams and checks there is no leak`, async () => {
309
const before = client.numSubscriptions();
310
// create
311
const a: any = [];
312
for (let i = 0; i < count; i++) {
313
a[i] = await createDstream({
314
name: `${Math.random()}`,
315
});
316
}
317
for (let i = 0; i < count; i++) {
318
await a[i].close();
319
}
320
const after = client.numSubscriptions();
321
expect(after).toBe(before);
322
323
// also check count on server went down.
324
expect((await client.getSubscriptions()).size).toBe(before);
325
});
326
327
it("does another leak test, but with a publish operation each time", async () => {
328
const before = client.numSubscriptions();
329
// create
330
const a: any = [];
331
for (let i = 0; i < count; i++) {
332
a[i] = await createDstream({
333
name: `${Math.random()}`,
334
noAutosave: true,
335
});
336
a[i].publish(i);
337
await a[i].save();
338
}
339
for (let i = 0; i < count; i++) {
340
await a[i].close();
341
}
342
const after = client.numSubscriptions();
343
expect(after).toBe(before);
344
});
345
});
346
347
describe("test delete of messages from stream", () => {
348
let client1, client2, s1, s2;
349
const name = "test-delete";
350
it("create two clients", async () => {
351
client1 = connect();
352
client2 = connect();
353
s1 = await createDstream({
354
client: client1,
355
name,
356
noAutosave: true,
357
noCache: true,
358
});
359
s2 = await createDstream({
360
client: client2,
361
name,
362
noAutosave: true,
363
noCache: true,
364
});
365
});
366
367
it("writes message one, confirm seen by other, then delete and confirm works", async () => {
368
s1.push("hello");
369
await s1.save();
370
await wait({ until: () => s2.length > 0 });
371
s1.delete({ all: true });
372
await wait({ until: () => s2.length == 0 && s1.length == 0 });
373
});
374
375
it("same delete test as above but with a few more items and delete on s2 instead", async () => {
376
for (let i = 0; i < 10; i++) {
377
s1.push(i);
378
}
379
await s1.save();
380
await wait({ until: () => s2.length == 10 });
381
s2.delete({ all: true });
382
await wait({ until: () => s2.length == 0 && s1.length == 0 });
383
});
384
385
it("delete specific index", async () => {
386
s1.push("x", "y", "z");
387
await s1.save();
388
await wait({ until: () => s2.length == 3 });
389
s2.delete({ last_index: 1 });
390
await wait({ until: () => s2.length == 1 && s1.length == 1 });
391
expect(s1.get()).toEqual(["z"]);
392
});
393
394
it("delete specific seq number", async () => {
395
s1.push("x", "y");
396
await s1.save();
397
expect(s1.get()).toEqual(["z", "x", "y"]);
398
const seq = s1.seq(1);
399
const { seqs } = await s1.delete({ seq });
400
expect(seqs).toEqual([seq]);
401
await wait({ until: () => s2.length == 2 && s1.length == 2 });
402
expect(s1.get()).toEqual(["z", "y"]);
403
});
404
405
it("delete up to a sequence number", async () => {
406
s1.push("x", "y");
407
await s1.save();
408
expect(s1.get()).toEqual(["z", "y", "x", "y"]);
409
const seq = s1.seq(1);
410
const { seqs } = await s1.delete({ last_seq: seq });
411
expect(seqs.length).toBe(2);
412
expect(seqs[1]).toBe(seq);
413
await wait({ until: () => s1.length == 2 });
414
expect(s1.get()).toEqual(["x", "y"]);
415
});
416
417
it("delete an array of sequence numbers", async () => {
418
s1.push("x", "y", "z");
419
await s1.save();
420
const v = await s1.getAll();
421
const seqs0 = [s1.seq(0), s1.seq(2)];
422
const { seqs } = await s1.delete({ seqs: seqs0 });
423
expect(seqs).toEqual(seqs0);
424
await wait({ until: () => s1.length == v.length - 2 });
425
expect(s1.getAll()).toEqual([v[1]].concat(v.slice(3)));
426
});
427
});
428
429
afterAll(after);
430
431