CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Sign Up · Sign In
sagemathinc

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/jupyter/blobs/disk.ts
Views: 687
1
/*
2
* This file is part of CoCalc: Copyright © 2023 Sagemath, Inc.
3
* License: MS-RSL – see LICENSE.md for details
4
*/
5
6
import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
7
import LRU from "lru-cache";
8
import { readFileSync, statSync, writeFileSync } from "node:fs";
9
import { mkdir, readFile, readdir, stat, unlink } from "node:fs/promises";
10
import { homedir } from "node:os";
11
import { join } from "node:path";
12
import { brotliCompressSync, brotliDecompressSync } from "node:zlib";
13
14
import Logger from "@cocalc/backend/logger";
15
import { envToInt } from "@cocalc/backend/misc/env-to-number";
16
import { touch } from "@cocalc/backend/misc/touch";
17
import { sha1 } from "@cocalc/backend/sha1";
18
import type { BlobStoreInterface } from "@cocalc/jupyter/types/project-interface";
19
import { BASE64_TYPES } from "./get";
20
21
// Short aliases for this module's debug/info/warn loggers.
const { debug: D, info: I, warn: W } = Logger("jupyter-blobs:disk");

// Directory where blob files are stored. Defaults to ~/.cache/cocalc/blobs;
// can be overridden via the JUPYTER_BLOBS_DB_DIR environment variable.
const BLOB_DIR =
  process.env["JUPYTER_BLOBS_DB_DIR"] ?? join(homedir(), ".cache/cocalc/blobs");

// Pruning thresholds, both overridable via environment variables:
// maximum total size on disk in MB (default 100) ...
const PRUNE_SIZE_MB = envToInt("JUPYTER_BLOBSTORE_DISK_PRUNE_SIZE_MB", 100);
// ... and maximum number of blob files kept (default 200).
const PRUNE_ENTRIES = envToInt("JUPYTER_BLOBSTORE_DISK_PRUNE_ENTRIES", 200);
33
34
interface FStat {
35
mtime: number;
36
size: number;
37
}
38
39
const cache = new LRU<string, FStat>({
40
max: 2 * PRUNE_ENTRIES,
41
});
42
43
async function getStats(path: string): Promise<FStat> {
44
const ret = cache.get(path);
45
if (ret != null) return ret;
46
const stats = await stat(path);
47
const info = { mtime: stats.mtime.getTime(), size: stats.size };
48
cache.set(path, info);
49
return info;
50
}
51
52
// The JSON-serialized and brotli-compressed structure stored per entry.
interface Data {
  // optional representation of the blob as used in the ipynb file
  ipynb?: string;
  // type of the data; if in BASE64_TYPES, `data` is base64-encoded
  type?: string;
  // the blob payload itself
  data?: string;
}
58
59
export class BlobStoreDisk implements BlobStoreInterface {
60
private hashLength: number;
61
private haveSavedMB: number = 0;
62
private haveSavedCount: number = 0;
63
64
constructor() {
65
this.prune = reuseInFlight(this.prune.bind(this));
66
this.hashLength = sha1("test").length;
67
}
68
69
public async init() {
70
D(
71
`initializing blob store in ${BLOB_DIR} with prune params: size=${PRUNE_SIZE_MB}MB and max entries=${PRUNE_ENTRIES}`
72
);
73
try {
74
await mkdir(BLOB_DIR, { recursive: true });
75
// call this.prune in 1 minute
76
setTimeout(() => this.prune(), 60 * 1000);
77
D(`successfully initialized blob store`);
78
} catch (err) {
79
W(`failed to initialize blob store: ${err}`);
80
throw err;
81
}
82
}
83
84
private async getAllFiles() {
85
const files = await readdir(BLOB_DIR);
86
return files.filter((file) => file.length === this.hashLength);
87
}
88
89
public async delete_all_blobs(): Promise<number> {
90
let deletedFiles = 0;
91
for (const file of await this.getAllFiles()) {
92
deletedFiles += await this.delete(join(BLOB_DIR, file));
93
}
94
return deletedFiles;
95
}
96
97
// we compute the median of all mtimes and delete files older than that.
98
// @return the number of deleted files
99
private async deleteOldFiles(): Promise<number> {
100
const allFiles = await this.getAllFiles();
101
if (allFiles.length <= 5) {
102
return await this.delete_all_blobs();
103
}
104
const times: number[] = [];
105
for (const fn of allFiles) {
106
times.push((await getStats(join(BLOB_DIR, fn))).mtime);
107
}
108
const sorted = times.sort();
109
const median = sorted[Math.floor(sorted.length / 2)];
110
const filesToDelete = allFiles.filter(
111
(file) => (cache.get(join(BLOB_DIR, file))?.mtime ?? median) < median
112
);
113
let filesDeleted = 0;
114
for (const file of filesToDelete) {
115
const path = join(BLOB_DIR, file);
116
filesDeleted += await this.delete(path);
117
}
118
return filesDeleted;
119
}
120
121
// NOTE: this is wrapped in a reuseInFlight, so it only runs once at a time
122
private async prune() {
123
let deletedFiles = 0;
124
let numberGood = true;
125
let sizeGood = true;
126
127
// for up to 3 times we try to prune
128
for (let i = 0; i < 3; i++) {
129
const allFiles = await this.getAllFiles();
130
numberGood = allFiles.length < PRUNE_ENTRIES;
131
if (!numberGood) {
132
D(`prune: ${allFiles.length} are too many files`);
133
deletedFiles += await this.deleteOldFiles();
134
continue;
135
}
136
137
let totalSize = 0;
138
for (const fn of allFiles) {
139
const stats = await getStats(join(BLOB_DIR, fn));
140
totalSize += stats.size;
141
sizeGood = totalSize < PRUNE_SIZE_MB * 1024 * 1024;
142
if (!sizeGood) {
143
D(`prune: ${totalSize}mb is too much size`);
144
deletedFiles += await this.deleteOldFiles();
145
continue;
146
}
147
}
148
149
if (sizeGood && numberGood) {
150
D(`prune: deleted ${deletedFiles} files`);
151
return;
152
}
153
}
154
155
// not all good after three tries, so delete everything
156
if (!sizeGood || !numberGood) {
157
deletedFiles += await this.delete_all_blobs();
158
D(`prune/everything: deleted ${deletedFiles} files`);
159
}
160
}
161
162
public async keys(): Promise<string[]> {
163
return await this.getAllFiles();
164
}
165
166
// TODO: this is synchroneous.
167
// Changing it to async would be great, but needs a lot of additional work in the frontend.
168
public save(data, type, ipynb?): string {
169
const hash = sha1(data);
170
const path = join(BLOB_DIR, hash);
171
172
// JSON serialize the data, type and ipynb and compress using brotliCompress
173
const raw: Data = { data, type, ipynb };
174
const ser = brotliCompressSync(JSON.stringify(raw));
175
176
// replaces the file if it already exists
177
writeFileSync(path, ser);
178
179
// add size of path to haveSavedMB
180
const stats = statSync(path);
181
this.haveSavedMB += stats.size / 1024 / 1024;
182
this.haveSavedCount += 1;
183
D(
184
`Saved ${hash} successfully. haveSavedMB=${this.haveSavedMB}, haveSavedCount=${this.haveSavedCount}`
185
);
186
this.checkPrune();
187
return hash;
188
}
189
190
// prune, if we are at most 20% over
191
private async checkPrune() {
192
if (
193
this.haveSavedMB > PRUNE_SIZE_MB / 5 ||
194
this.haveSavedCount > PRUNE_ENTRIES / 5
195
) {
196
try {
197
await this.prune();
198
this.haveSavedMB = 0;
199
this.haveSavedCount = 0;
200
} catch (err) {
201
W(`failed to prune: ${err}`);
202
}
203
}
204
}
205
206
private getData(sha1: string): Data | undefined {
207
// read the sha1 named file, decrompess it, and return it
208
const path = join(BLOB_DIR, sha1);
209
try {
210
const buf = brotliDecompressSync(readFileSync(path));
211
touch(path, false); // we don't wait for this to finish
212
return JSON.parse(buf.toString());
213
} catch (err) {
214
I(`failed to get blob ${sha1}: ${err}`);
215
this.delete(path);
216
return undefined;
217
}
218
}
219
220
private async delete(path: string): Promise<0 | 1> {
221
try {
222
await unlink(path);
223
cache.delete(path);
224
return 1;
225
} catch {}
226
return 0;
227
}
228
229
public get(sha1: string): Buffer | undefined {
230
const row = this.getData(sha1);
231
if (row?.data == null) return;
232
return this.encodeData(row.data, row.type);
233
}
234
235
public get_ipynb(sha1: string): any {
236
const row = this.getData(sha1);
237
if (row == null) return;
238
if (row.ipynb != null) return row.ipynb;
239
if (row.data != null) return row.data;
240
}
241
242
private encodeData(data: string, type?: string): Buffer {
243
if (typeof type === "string" && BASE64_TYPES.includes(type as any)) {
244
return Buffer.from(data, "base64");
245
} else {
246
return Buffer.from(data);
247
}
248
}
249
250
// Read a file from disk and save it in the database.
251
// Returns the sha1 hash of the file.
252
async readFile(path: string, type: string): Promise<string> {
253
return await this.save(await readFile(path), type);
254
}
255
}
256
257