Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/jupyter/blobs/disk.ts
/*
 * This file is part of CoCalc: Copyright © 2023 Sagemath, Inc.
 * License: MS-RSL – see LICENSE.md for details
 */

import { reuseInFlight } from "@cocalc/util/reuse-in-flight";
import LRU from "lru-cache";
import { readFileSync, statSync, writeFileSync } from "node:fs";
import { mkdir, readFile, readdir, stat, unlink } from "node:fs/promises";
import { homedir } from "node:os";
import { join } from "node:path";
import { brotliCompressSync, brotliDecompressSync } from "node:zlib";

import Logger from "@cocalc/backend/logger";
import { envToInt } from "@cocalc/backend/misc/env-to-number";
import { touch } from "@cocalc/backend/misc/touch";
import { sha1 } from "@cocalc/backend/sha1";
import type { BlobStoreInterface } from "@cocalc/jupyter/types/project-interface";
import { BASE64_TYPES } from "./get";

const { debug: D, info: I, warn: W } = Logger("jupyter-blobs:disk");

// the directory where files are stored. by default, in the home directory
// in ~/.cache/cocalc/blobs. The path can be overridden by setting the
// environment variable JUPYTER_BLOBS_DB_DIR.

const BLOB_DIR =
  process.env["JUPYTER_BLOBS_DB_DIR"] ?? join(homedir(), ".cache/cocalc/blobs");

// prune limits, read from the environment; by default at most 100 MB
// of blobs and at most 200 entries
const PRUNE_SIZE_MB = envToInt("JUPYTER_BLOBSTORE_DISK_PRUNE_SIZE_MB", 100);
const PRUNE_ENTRIES = envToInt("JUPYTER_BLOBSTORE_DISK_PRUNE_ENTRIES", 200);

interface FStat {
  mtime: number;
  size: number;
}

// cache stat() results so pruning does not hit the disk for every file
const cache = new LRU<string, FStat>({
  max: 2 * PRUNE_ENTRIES,
});

async function getStats(path: string): Promise<FStat> {
  const ret = cache.get(path);
  if (ret != null) return ret;
  const stats = await stat(path);
  const info = { mtime: stats.mtime.getTime(), size: stats.size };
  cache.set(path, info);
  return info;
}

// The JSON-serialized and compressed structure we store per entry.
interface Data {
  ipynb?: string;
  type?: string;
  data?: string;
}

export class BlobStoreDisk implements BlobStoreInterface {
  private hashLength: number;
  private haveSavedMB: number = 0;
  private haveSavedCount: number = 0;

  constructor() {
    this.prune = reuseInFlight(this.prune.bind(this));
    this.hashLength = sha1("test").length;
  }

  public async init() {
    D(
      `initializing blob store in ${BLOB_DIR} with prune params: size=${PRUNE_SIZE_MB}MB and max entries=${PRUNE_ENTRIES}`
    );
    try {
      await mkdir(BLOB_DIR, { recursive: true });
      // schedule the first prune in 1 minute
      setTimeout(() => this.prune(), 60 * 1000);
      D(`successfully initialized blob store`);
    } catch (err) {
      W(`failed to initialize blob store: ${err}`);
      throw err;
    }
  }

  private async getAllFiles() {
    const files = await readdir(BLOB_DIR);
    return files.filter((file) => file.length === this.hashLength);
  }

  public async delete_all_blobs(): Promise<number> {
    let deletedFiles = 0;
    for (const file of await this.getAllFiles()) {
      deletedFiles += await this.delete(join(BLOB_DIR, file));
    }
    return deletedFiles;
  }

  // we compute the median of all mtimes and delete files older than that.
  // @return the number of deleted files
  private async deleteOldFiles(): Promise<number> {
    const allFiles = await this.getAllFiles();
    if (allFiles.length <= 5) {
      return await this.delete_all_blobs();
    }
    const times: number[] = [];
    for (const fn of allFiles) {
      times.push((await getStats(join(BLOB_DIR, fn))).mtime);
    }
    // sort numerically -- the default Array.sort compares lexicographically
    const sorted = times.sort((a, b) => a - b);
    const median = sorted[Math.floor(sorted.length / 2)];
    const filesToDelete = allFiles.filter(
      (file) => (cache.get(join(BLOB_DIR, file))?.mtime ?? median) < median
    );
    let filesDeleted = 0;
    for (const file of filesToDelete) {
      const path = join(BLOB_DIR, file);
      filesDeleted += await this.delete(path);
    }
    return filesDeleted;
  }

  // NOTE: this is wrapped in a reuseInFlight, so it only runs once at a time
  private async prune() {
    let deletedFiles = 0;
    let numberGood = true;
    let sizeGood = true;

    // we try to prune up to 3 times
    for (let i = 0; i < 3; i++) {
      const allFiles = await this.getAllFiles();
      numberGood = allFiles.length < PRUNE_ENTRIES;
      if (!numberGood) {
        D(`prune: ${allFiles.length} files is too many`);
        deletedFiles += await this.deleteOldFiles();
        continue;
      }

      let totalSize = 0;
      for (const fn of allFiles) {
        const stats = await getStats(join(BLOB_DIR, fn));
        totalSize += stats.size;
        sizeGood = totalSize < PRUNE_SIZE_MB * 1024 * 1024;
        if (!sizeGood) {
          D(`prune: ${Math.round(totalSize / 1024 / 1024)}MB is too much`);
          deletedFiles += await this.deleteOldFiles();
          // the file list is now stale, so restart with a fresh one
          break;
        }
      }

      if (sizeGood && numberGood) {
        D(`prune: deleted ${deletedFiles} files`);
        return;
      }
    }

    // not all good after three tries, so delete everything
    if (!sizeGood || !numberGood) {
      deletedFiles += await this.delete_all_blobs();
      D(`prune/everything: deleted ${deletedFiles} files`);
    }
  }

  public async keys(): Promise<string[]> {
    return await this.getAllFiles();
  }

  // TODO: this is synchronous. Changing it to async would be great,
  // but needs a lot of additional work in the frontend.
  public save(data, type, ipynb?): string {
    const hash = sha1(data);
    const path = join(BLOB_DIR, hash);

    // JSON-serialize the data, type and ipynb, and compress using brotli
    const raw: Data = { data, type, ipynb };
    const ser = brotliCompressSync(JSON.stringify(raw));

    // this replaces the file if it already exists
    writeFileSync(path, ser);

    // add the size of the new file to haveSavedMB
    const stats = statSync(path);
    this.haveSavedMB += stats.size / 1024 / 1024;
    this.haveSavedCount += 1;
    D(
      `Saved ${hash} successfully. haveSavedMB=${this.haveSavedMB}, haveSavedCount=${this.haveSavedCount}`
    );
    this.checkPrune();
    return hash;
  }

  // prune once we have saved more than 20% of either limit since the last prune
  private async checkPrune() {
    if (
      this.haveSavedMB > PRUNE_SIZE_MB / 5 ||
      this.haveSavedCount > PRUNE_ENTRIES / 5
    ) {
      try {
        await this.prune();
        this.haveSavedMB = 0;
        this.haveSavedCount = 0;
      } catch (err) {
        W(`failed to prune: ${err}`);
      }
    }
  }

  private getData(sha1: string): Data | undefined {
    // read the file named by the sha1 hash, decompress it, and return it
    const path = join(BLOB_DIR, sha1);
    try {
      const buf = brotliDecompressSync(readFileSync(path));
      touch(path, false); // we don't wait for this to finish
      return JSON.parse(buf.toString());
    } catch (err) {
      I(`failed to get blob ${sha1}: ${err}`);
      this.delete(path);
      return undefined;
    }
  }

  private async delete(path: string): Promise<0 | 1> {
    try {
      await unlink(path);
      cache.delete(path);
      return 1;
    } catch {}
    return 0;
  }

  public get(sha1: string): Buffer | undefined {
    const row = this.getData(sha1);
    if (row?.data == null) return;
    return this.encodeData(row.data, row.type);
  }

  public get_ipynb(sha1: string): any {
    const row = this.getData(sha1);
    if (row == null) return;
    if (row.ipynb != null) return row.ipynb;
    if (row.data != null) return row.data;
  }

  private encodeData(data: string, type?: string): Buffer {
    if (typeof type === "string" && BASE64_TYPES.includes(type as any)) {
      return Buffer.from(data, "base64");
    } else {
      return Buffer.from(data);
    }
  }

  // Read a file from disk and save it in the blob store.
  // Returns the sha1 hash of the file.
  async readFile(path: string, type: string): Promise<string> {
    return await this.save(await readFile(path), type);
  }
}
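For context, here is a minimal sketch of how this blob store might be driven end to end. It is a hypothetical example, not part of disk.ts: the import specifier "@cocalc/jupyter/blobs/disk" is inferred from the path above, and the base64 payload is a truncated placeholder.

// hypothetical usage sketch -- assumes the workspace package resolves as below
import { BlobStoreDisk } from "@cocalc/jupyter/blobs/disk";

async function demo() {
  const blobs = new BlobStoreDisk();
  await blobs.init(); // creates BLOB_DIR and schedules the first prune

  // save() is synchronous and returns the sha1 hash that names the file;
  // Jupyter delivers image/png outputs as base64 strings, as assumed here
  const pngBase64 = "iVBORw0KGgoAAA..."; // placeholder, truncated
  const hash = blobs.save(pngBase64, "image/png");

  // get() reads and decompresses the stored JSON; since "image/png" is one
  // of the BASE64_TYPES, the payload is decoded back into a binary Buffer
  const buf = blobs.get(hash);
  console.log(`blob ${hash}: ${buf?.length ?? 0} bytes`);
}

demo().catch(console.error);

Because every blob is named by the sha1 hash of its content, saving the same output twice overwrites the same file, so the store deduplicates identical outputs for free and save() never needs to check for collisions.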