Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/backend/conat/test/persist/backup-archive.test.ts
1712 views
1
/*
Testing automatic tiered storage and backup persistence functionality.
*/

import {
  after,
  before,
  client,
  delay,
  wait,
} from "@cocalc/backend/conat/test/setup";
import { messageData } from "@cocalc/conat/core/client";
import { stream } from "@cocalc/conat/persist/client";
import { syncFiles } from "@cocalc/conat/persist/context";
import { openPaths } from "@cocalc/conat/persist/storage";
import sqlite from "better-sqlite3";
import { pathExists } from "fs-extra";
import * as fs from "fs/promises";
import { join } from "path";
beforeAll(async () => {
  // Start the test persist server with tiered storage enabled: an archive
  // directory, a backup directory, and a short archive sync interval (ms)
  // so the tests below don't have to wait long for archive writes.
  await before({ archive: "archive", backup: "backup", archiveInterval: 250 });
});
describe("create persist server that also saves data to an archive folder and a backup folder", () => {
26
it("verify that archive, backup and archiveInterval are all configured", async () => {
27
expect(syncFiles.archive).toContain("archive");
28
expect(syncFiles.archiveInterval).toBeGreaterThan(0);
29
expect(syncFiles.backup).toContain("backup");
30
});
31
32
async function waitUntilClosed() {
33
await wait({
34
until: () => {
35
return !openPaths.has(join(syncFiles.local, "hub/foo"));
36
},
37
});
38
}
39
40
let s1;
41
it("create a new stream", async () => {
42
s1 = stream({
43
client,
44
user: { hub_id: "x" },
45
storage: { path: "hub/foo" },
46
});
47
await s1.set({
48
key: "my-key-1",
49
messageData: messageData("one"),
50
});
51
});
52
53
let local, archive, backup;
54
it(`wait, then there is an updated archive file too`, async () => {
55
((local = join(syncFiles.local, "hub/foo.db")),
56
(archive = join(syncFiles.archive, "hub/foo.db")),
57
(backup = join(syncFiles.backup, "hub/foo.db")),
58
expect(await pathExists(local)).toBe(true));
59
// gets created initially
60
expect(await pathExists(archive)).toBe(true);
61
// backup should only exist when stream is closed
62
expect(await pathExists(backup)).toBe(false);
63
64
// timestamp before another write
65
const stats = await fs.stat(archive);
66
67
await s1.set({
68
key: "my-key-2",
69
messageData: messageData("two"),
70
});
71
// now wait to ensure archive gets written
72
73
await delay(syncFiles.archiveInterval + 100);
74
expect(await pathExists(archive)).toBe(true);
75
const stats2 = await fs.stat(archive);
76
expect(stats2.mtimeMs).not.toEqual(stats.mtimeMs);
77
});
78
79
it("close the stream and see that the backup and archive are both written, even though we didn't wait the full archive interval", async () => {
80
s1.close();
81
const t = Date.now();
82
await wait({
83
until: async () => await pathExists(backup),
84
});
85
expect(Date.now() - t).toBeLessThan(syncFiles.archiveInterval);
86
expect(await pathExists(backup)).toBe(true);
87
// at this point the actual sqlite3 database should be closed
88
});
89
90
it("the backup, archive, and local files should all be identical as sqlite database", async () => {
91
// they are not the same as files though so we need some care to compare them.
92
expect(await serialize(local)).toEqual(await serialize(backup));
93
expect(await serialize(archive)).toEqual(await serialize(backup));
94
});
95
96
it("delete the local copy and open stream, the data is still available", async () => {
97
await fs.unlink(local);
98
99
s1 = stream({
100
client,
101
user: { hub_id: "x" },
102
storage: { path: "hub/foo" },
103
});
104
const mesg = await s1.get({ key: "my-key-1" });
105
expect(mesg.data).toBe("one");
106
107
await s1.set({
108
key: "my-key-3",
109
messageData: messageData("three"),
110
});
111
112
s1.close();
113
await waitUntilClosed();
114
});
115
116
it("delete the archive copy and open stream, the data is still available because local is used", async () => {
117
await fs.unlink(archive);
118
119
s1 = stream({
120
client,
121
user: { hub_id: "x" },
122
storage: { path: "hub/foo" },
123
});
124
const mesg = await s1.get({ key: "my-key-3" });
125
expect(mesg.data).toBe("three");
126
127
s1.close();
128
await waitUntilClosed();
129
});
130
131
it("all should identical again sqlite database", async () => {
132
// they are not the same as files though so we need some care to compare them.
133
expect(await serialize(local)).toEqual(await serialize(backup));
134
expect(await serialize(archive)).toEqual(await serialize(backup));
135
});
136
137
it("if both archive and local exist and local is newer, it is used", async () => {
138
// grab copy of local
139
const copy = local + ".copy";
140
await fs.copyFile(local, copy);
141
142
s1 = stream({
143
client,
144
user: { hub_id: "x" },
145
storage: { path: "hub/foo" },
146
});
147
await s1.set({
148
key: "my-key-4",
149
messageData: messageData("four"),
150
});
151
fs.unlink(backup);
152
s1.close();
153
await wait({
154
until: async () => await pathExists(backup),
155
});
156
157
// ensure the old copy of local is the newer one by making archive old
158
await fs.copyFile(copy, local);
159
await fs.utimes(
160
archive,
161
Date.now() / 1000 - 100_000,
162
Date.now() / 1000 - 100_000,
163
);
164
s1 = stream({
165
client,
166
user: { hub_id: "x" },
167
storage: { path: "hub/foo" },
168
});
169
expect((await s1.get({ key: "my-key-4" }))?.data).toEqual(undefined);
170
171
s1.close();
172
await waitUntilClosed();
173
});
174
175
it("if both archive and local exist and archive is newer, then archive is used", async () => {
176
// grab copy of archive
177
const copy = archive + ".copy";
178
await fs.copyFile(archive, copy);
179
180
s1 = stream({
181
client,
182
user: { hub_id: "x" },
183
storage: { path: "hub/foo" },
184
});
185
await s1.set({
186
key: "my-key-5",
187
messageData: messageData("five"),
188
});
189
s1.close();
190
await waitUntilClosed();
191
192
// ensure the old copy of archive is the newer one by making local old
193
await fs.copyFile(copy, archive);
194
await fs.utimes(
195
local,
196
Date.now() / 1000 - 100_000,
197
Date.now() / 1000 - 100_000,
198
);
199
s1 = stream({
200
client,
201
user: { hub_id: "x" },
202
storage: { path: "hub/foo" },
203
});
204
expect((await s1.get({ key: "my-key-5" }))?.data).toEqual(undefined);
205
206
s1.close();
207
await waitUntilClosed();
208
});
209
210
it("another check all are equal now", async () => {
211
//console.log("checking equality");
212
expect(await serialize(local)).toEqual(await serialize(backup));
213
expect(await serialize(archive)).toEqual(await serialize(backup));
214
});
215
216
it("deletes local and archive but not backup -- data is NOT available", async () => {
217
await fs.unlink(local);
218
await fs.unlink(archive);
219
s1 = stream({
220
client,
221
user: { hub_id: "x" },
222
storage: { path: "hub/foo" },
223
});
224
expect((await s1.get({ key: "my-key-1" }))?.data).toEqual(undefined);
225
});
226
});
227
228
async function serialize(path: string): Promise<string> {
229
while (true) {
230
const db = new sqlite(path);
231
try {
232
const x = JSON.stringify({
233
messages: db.prepare("select * from messages").all(),
234
config: db.prepare("select * from config").all(),
235
});
236
db.close();
237
return x;
238
} catch (err) {
239
console.log(err);
240
}
241
await delay(50);
242
}
243
}
244
245
afterAll(async () => {
  // Await teardown so jest doesn't finish before the test server shuts
  // down (the counterpart `before()` is awaited in beforeAll; the original
  // fired `after()` without awaiting it).
  await after();
});