Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/sync/limits.test.ts
1712 views
1
/*
2
Testing the limits.
3
4
DEVELOPMENT:
5
6
pnpm test ./limits.test.ts
7
8
*/
9
10
import { dkv as createDkv } from "@cocalc/backend/conat/sync";
11
import { dstream as createDstream } from "@cocalc/backend/conat/sync";
12
import { delay } from "awaiting";
13
import { once } from "@cocalc/util/async-utils";
14
import {
15
before,
16
after,
17
wait,
18
connect,
19
client,
20
} from "@cocalc/backend/conat/test/setup";
21
22
beforeAll(before);
23
24
jest.setTimeout(15000);
25
describe("create a dkv with limit on the total number of keys, and confirm auto-delete works", () => {
26
let kv;
27
const name = `test-${Math.random()}`;
28
29
it("creates the dkv", async () => {
30
kv = await createDkv({ client, name, config: { max_msgs: 2 } });
31
expect(kv.getAll()).toEqual({});
32
});
33
34
it("adds 2 keys, then a third, and sees first is gone", async () => {
35
kv.a = 10;
36
kv.b = 20;
37
expect(kv.a).toEqual(10);
38
expect(kv.b).toEqual(20);
39
kv.c = 30;
40
expect(kv.c).toEqual(30);
41
// have to wait until it's all saved and acknowledged before enforcing limit
42
if (!kv.isStable()) {
43
await once(kv, "stable");
44
}
45
// next change is the enforcement happening
46
if (kv.has("a")) {
47
await once(kv, "change", 500);
48
}
49
// and confirm it
50
expect(kv.a).toBe(undefined);
51
expect(kv.getAll()).toEqual({ b: 20, c: 30 });
52
});
53
54
it("closes the kv", async () => {
55
await kv.clear();
56
await kv.close();
57
});
58
});
59
60
describe("create a dkv with limit on age of keys, and confirm auto-delete works", () => {
61
let kv;
62
const name = `test-${Math.random()}`;
63
64
it("creates the dkv", async () => {
65
kv = await createDkv({ client, name, config: { max_age: 50 } });
66
expect(kv.getAll()).toEqual({});
67
});
68
69
it("adds 2 keys, then a third, and sees first two are gone due to aging out", async () => {
70
kv.a = 10;
71
kv.b = 20;
72
expect(kv.a).toEqual(10);
73
expect(kv.b).toEqual(20);
74
await kv.save();
75
await kv.config();
76
await delay(50);
77
await kv.config();
78
await delay(10);
79
expect(kv.has("a")).toBe(false);
80
expect(kv.has("b")).toBe(false);
81
});
82
83
it("closes the kv", async () => {
84
await kv.clear();
85
await kv.close();
86
});
87
});
88
89
describe("create a dkv with limit on total bytes of keys, and confirm auto-delete works", () => {
90
let kv;
91
const name = `test-${Math.random()}`;
92
93
it("creates the dkv", async () => {
94
kv = await createDkv({ client, name, config: { max_bytes: 100 } });
95
expect(kv.getAll()).toEqual({});
96
});
97
98
it("adds a key, then a second, and sees first one is gone due to bytes", async () => {
99
kv.a = "x".repeat(50);
100
kv.b = "x".repeat(55);
101
expect(kv.getAll()).toEqual({ a: "x".repeat(50), b: "x".repeat(55) });
102
await kv.save();
103
expect(kv.has("b")).toBe(true);
104
await wait({
105
until: async () => {
106
await kv.config();
107
return !kv.has("a");
108
},
109
});
110
expect(kv.getAll()).toEqual({ b: "x".repeat(55) });
111
});
112
113
it("closes the kv", async () => {
114
await kv.clear();
115
await kv.close();
116
});
117
});
118
119
describe("create a dkv with limit on max_msg_size, and confirm writing small messages works but writing a big one result in a 'reject' event", () => {
120
let kv;
121
const name = `test-${Math.random()}`;
122
123
it("creates the dkv", async () => {
124
kv = await createDkv({ client, name, config: { max_msg_size: 100 } });
125
expect(kv.getAll()).toEqual({});
126
});
127
128
it("adds a key, then a second big one results in a 'reject' event", async () => {
129
const rejects: { key: string; value: string }[] = [];
130
kv.once("reject", (x) => {
131
rejects.push(x);
132
});
133
kv.a = "x".repeat(50);
134
await kv.save();
135
kv.b = "x".repeat(150);
136
await kv.save();
137
expect(rejects).toEqual([{ key: "b", value: "x".repeat(150) }]);
138
expect(kv.has("b")).toBe(false);
139
});
140
141
it("closes the kv", async () => {
142
await kv.clear();
143
await kv.close();
144
});
145
});
146
147
describe("create a dstream with limit on the total number of messages, and confirm max_msgs, max_age works", () => {
148
let s, s2;
149
const name = `test-${Math.random()}`;
150
151
it("creates the dstream and another with a different client", async () => {
152
s = await createDstream({ client, name, config: { max_msgs: 2 } });
153
s2 = await createDstream({
154
client: connect(),
155
name,
156
config: { max_msgs: 2 },
157
noCache: true,
158
});
159
expect(s.get()).toEqual([]);
160
expect((await s.config()).max_msgs).toBe(2);
161
expect((await s2.config()).max_msgs).toBe(2);
162
});
163
164
it("push 2 messages, then a third, and see first is gone and that this is reflected on both clients", async () => {
165
expect((await s.config()).max_msgs).toBe(2);
166
expect((await s2.config()).max_msgs).toBe(2);
167
s.push("a");
168
s.push("b");
169
await wait({ until: () => s.length == 2 && s2.length == 2 });
170
expect(s2.get()).toEqual(["a", "b"]);
171
s.push("c");
172
await wait({
173
until: () =>
174
s.get(0) != "a" &&
175
s.get(1) == "c" &&
176
s2.get(0) != "a" &&
177
s2.get(1) == "c",
178
});
179
expect(s.getAll()).toEqual(["b", "c"]);
180
expect(s2.getAll()).toEqual(["b", "c"]);
181
182
// also check limits ar enforced if we close, then open new one:
183
await s.close();
184
s = await createDstream({ client, name, config: { max_msgs: 2 } });
185
expect(s.getAll()).toEqual(["b", "c"]);
186
187
await s.config({ max_msgs: -1 });
188
});
189
190
it("verifies that max_age works", async () => {
191
await s.save();
192
expect(s.hasUnsavedChanges()).toBe(false);
193
await delay(1100);
194
// this "new" should be much newer than 1s
195
s.push("new");
196
await s.config({ max_age: 1000 }); // anything older than 1s should be deleted
197
await wait({ until: () => s.length == 1 });
198
expect(s.getAll()).toEqual(["new"]);
199
await s.config({ max_age: -1 });
200
});
201
202
it("verifies that ttl works by writing a message and making sure it gets automatically deleted soon", async () => {
203
const conf = await s.config();
204
expect(conf.allow_msg_ttl).toBe(false);
205
const conf2 = await s.config({ max_age: -1, allow_msg_ttl: true });
206
expect(conf2.allow_msg_ttl).toBe(true);
207
208
s.publish("ttl-message", { ttl: 50 });
209
expect(s.length).toBe(2);
210
await s.save();
211
await wait({
212
until: async () => {
213
await s.config();
214
return s.length == 1;
215
},
216
});
217
expect(s.length).toBe(1);
218
expect(s.get()).toEqual(["new"]);
219
});
220
221
it("verifies that max_bytes works -- publishing something too large causes everything to end up gone", async () => {
222
const conf = await s.config({ max_bytes: 100 });
223
expect(conf.max_bytes).toBe(100);
224
s.publish("x".repeat(1000));
225
await s.config();
226
await wait({ until: () => s.length == 0 });
227
expect(s.length).toBe(0);
228
});
229
230
it("max_bytes -- publish something then another thing that causes the first to get deleted", async () => {
231
s.publish("x".repeat(75));
232
s.publish("y".repeat(90));
233
await wait({
234
until: async () => {
235
await s.config();
236
return s.length == 1;
237
},
238
});
239
expect(s.get()).toEqual(["y".repeat(90)]);
240
await s.config({ max_bytes: -1 });
241
});
242
243
it("verifies that max_msg_size rejects messages that are too big", async () => {
244
await s.config({ max_msg_size: 100 });
245
expect((await s.config()).max_msg_size).toBe(100);
246
s.publish("x".repeat(70));
247
await expect(async () => {
248
await s.stream.publish("x".repeat(150));
249
}).rejects.toThrow("max_msg_size");
250
await s.config({ max_msg_size: 200 });
251
s.publish("x".repeat(150));
252
await s.config({ max_msg_size: -1 });
253
expect((await s.config()).max_msg_size).toBe(-1);
254
});
255
256
it("closes the stream", async () => {
257
await s.close();
258
await s2.close();
259
});
260
});
261
262
describe("create a dstream with limit on max_age, and confirm auto-delete works", () => {
263
let s;
264
const name = `test-${Math.random()}`;
265
266
it("creates the dstream", async () => {
267
s = await createDstream({ client, name, config: { max_age: 50 } });
268
});
269
270
it("push a message, then another and see first disappears", async () => {
271
s.push({ a: 10 });
272
await delay(75);
273
s.push({ b: 20 });
274
expect(s.get()).toEqual([{ a: 10 }, { b: 20 }]);
275
await wait({
276
until: async () => {
277
await s.config();
278
return s.length == 1;
279
},
280
});
281
expect(s.getAll()).toEqual([{ b: 20 }]);
282
});
283
284
it("closes the stream", async () => {
285
await s.delete({ all: true });
286
await s.close();
287
});
288
});
289
290
describe("create a dstream with limit on max_bytes, and confirm auto-delete works", () => {
291
let s;
292
const name = `test-${Math.random()}`;
293
294
it("creates the dstream", async () => {
295
// note: 60 and not 40 due to slack for headers
296
s = await createDstream({ client, name, config: { max_bytes: 60 } });
297
});
298
299
it("push a message, then another and see first disappears", async () => {
300
s.push("x".repeat(40));
301
s.push("x".repeat(45));
302
s.push("x");
303
if (!s.isStable()) {
304
await once(s, "stable");
305
}
306
expect(s.getAll()).toEqual(["x".repeat(45), "x"]);
307
});
308
309
it("closes the stream", async () => {
310
await s.delete({ all: true });
311
await s.close();
312
});
313
});
314
315
describe("create a dstream with limit on max_msg_size, and confirm auto-delete works", () => {
316
let s;
317
const name = `test-${Math.random()}`;
318
319
it("creates the dstream", async () => {
320
s = await createDstream({ client, name, config: { max_msg_size: 50 } });
321
});
322
323
it("push a message, then another and see first disappears", async () => {
324
const rejects: any[] = [];
325
s.on("reject", ({ mesg }) => {
326
rejects.push(mesg);
327
});
328
s.push("x".repeat(40));
329
s.push("y".repeat(60)); // silently vanishes (well a reject event is emitted)
330
s.push("x");
331
await wait({
332
until: async () => {
333
await s.config();
334
return s.length == 2;
335
},
336
});
337
expect(s.getAll()).toEqual(["x".repeat(40), "x"]);
338
expect(rejects).toEqual(["y".repeat(60)]);
339
});
340
341
it("closes the stream", async () => {
342
await s.close();
343
});
344
});
345
346
describe("test discard_policy 'new' where writes are rejected rather than old data being deleted, for max_bytes and max_msgs", () => {
347
let s;
348
const name = `test-${Math.random()}`;
349
350
it("creates the dstream", async () => {
351
s = await createDstream({
352
client,
353
name,
354
// we can write at most 300 bytes and 3 messages. beyond that we
355
// get reject events.
356
config: {
357
max_bytes: 300,
358
max_msgs: 3,
359
discard_policy: "new",
360
desc: { example: "config" },
361
},
362
});
363
const rejects: any[] = [];
364
s.on("reject", ({ mesg }) => {
365
rejects.push(mesg);
366
});
367
s.publish("x");
368
s.publish("y");
369
s.publish("w");
370
s.publish("foo");
371
372
await wait({
373
until: async () => {
374
await s.config();
375
return rejects.length == 1;
376
},
377
});
378
expect(s.getAll()).toEqual(["x", "y", "w"]);
379
expect(rejects).toEqual(["foo"]);
380
381
s.publish("x".repeat(299));
382
await wait({
383
until: async () => {
384
await s.config();
385
return rejects.length == 2;
386
},
387
});
388
expect(s.getAll()).toEqual(["x", "y", "w"]);
389
expect(rejects).toEqual(["foo", "x".repeat(299)]);
390
});
391
392
it("check the config is persisted", async () => {
393
const lastConfig = await s.config();
394
s.close();
395
s = await createDstream({
396
client,
397
name,
398
noCache: true,
399
});
400
const config = await s.config();
401
expect(lastConfig).toEqual(config);
402
expect(lastConfig.desc).toEqual({ example: "config" });
403
});
404
405
it("closes the stream", async () => {
406
s.close();
407
});
408
});
409
410
describe("test rate limiting", () => {
411
let s;
412
const name = `test-${Math.random()}`;
413
414
it("creates the dstream", async () => {
415
s = await createDstream({
416
client,
417
name,
418
// we can write at most 300 bytes and 3 messages. beyond that we
419
// get reject events.
420
config: {
421
max_bytes_per_second: 300,
422
max_msgs_per_second: 3,
423
discard_policy: "new",
424
},
425
});
426
const rejects: any[] = [];
427
s.on("reject", ({ mesg }) => {
428
rejects.push(mesg);
429
});
430
});
431
432
it("closes the stream", async () => {
433
await s.close();
434
});
435
});
436
437
import { EPHEMERAL_MAX_BYTES } from "@cocalc/conat/persist/storage";
438
describe(`ephemeral streams always have a hard cap of ${EPHEMERAL_MAX_BYTES} on max_bytes `, () => {
439
let s;
440
it("creates a non-ephemeral dstream and checks no automatic max_bytes set", async () => {
441
const s1 = await createDstream({
442
client,
443
name: "test-NON-ephemeral",
444
ephemeral: false,
445
});
446
expect((await s1.config()).max_bytes).toBe(-1);
447
s1.close();
448
});
449
450
it("creates an ephemeral dstream and checks max bytes automatically set", async () => {
451
s = await createDstream({
452
client,
453
name: "test-ephemeral",
454
ephemeral: true,
455
});
456
expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES);
457
});
458
459
it("trying to set larger doesn't work", async () => {
460
expect(
461
(await s.config({ max_bytes: 2 * EPHEMERAL_MAX_BYTES })).max_bytes,
462
).toBe(EPHEMERAL_MAX_BYTES);
463
expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES);
464
});
465
466
it("setting it smaller is allowed", async () => {
467
expect(
468
(await s.config({ max_bytes: EPHEMERAL_MAX_BYTES / 2 })).max_bytes,
469
).toBe(EPHEMERAL_MAX_BYTES / 2);
470
expect((await s.config()).max_bytes).toBe(EPHEMERAL_MAX_BYTES / 2);
471
});
472
});
473
474
afterAll(after);
475
476