Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/sync-fs/lib/index.ts
Views: 687
/*
 *  This file is part of CoCalc: Copyright © 2020 Sagemath, Inc.
 *  License: MS-RSL – see LICENSE.md for details
 */

import {
  copyFile,
  mkdir,
  open,
  rename,
  rm,
  stat,
  writeFile,
} from "fs/promises";
import { basename, dirname, join } from "path";
import type { FilesystemState /*FilesystemStatePatch*/ } from "./types";
import { execa, mtimeDirTree, parseCommonPrefixes, remove } from "./util";
import { toCompressedJSON } from "./compressed-json";
import SyncClient from "@cocalc/sync-client/lib/index";
import { encodeIntToUUID } from "@cocalc/util/compute/manager";
import getLogger from "@cocalc/backend/logger";
import { apiCall } from "@cocalc/api-client";
import mkdirp from "mkdirp";
import { throttle } from "lodash";
import { trunc_middle } from "@cocalc/util/misc";
import getListing from "@cocalc/backend/get-listing";
import { executeCode } from "@cocalc/backend/execute-code";
import { delete_files } from "@cocalc/backend/files/delete-files";
import { move_files } from "@cocalc/backend/files/move-files";
import { rename_file } from "@cocalc/backend/files/rename-file";
import ensureContainingDirectoryExists from "@cocalc/backend/misc/ensure-containing-directory-exists";

// Hidden top-level directories that are always bind mounted from the data
// directory (see bindMountExcludes), independent of the `exclude` option.
const EXPLICIT_HIDDEN_EXCLUDES = [".cache", ".local"];

const log = getLogger("sync-fs:index").debug;

// How often (in ms) we re-register with the project as a sync target.
const REGISTER_INTERVAL_MS = 30000;

/**
 * Create a SyncFS instance for the given options.
 *
 * This only constructs the object; call `init()` on the result to mount the
 * filesystem and start the sync loop.
 */
export default function syncFS(opts: Options) {
  log("syncFS: ", opts);
  return new SyncFS(opts);
}

// "init"   -- only while the constructor is running
// "ready"  -- idle; a sync may be started
// "sync"   -- a sync is currently in progress
// "closed" -- close() was called; no further syncs happen
type State = "init" | "ready" | "sync" | "closed";

interface Options {
  lower: string;
  upper: string;
  mount: string;
  project_id: string;
  compute_server_id: number;
  // sync at most every this many seconds
  syncIntervalMin?: number;
  // but up to this long if there is no activity (exponential backoff)
  syncIntervalMax?: number;
  // list of top-level directory names that are excluded from sync.
  // do not use wildcards.
  // RECOMMEND: hidden files in HOME should be excluded, which you can do by including "./*"
  // ALSO: if you have "~" or "." in the exclude array, then sync is completely disabled.
  exclude?: string[];
  readTrackingFile?: string;
  tar: { send; get };
  compression?: "lz4"; // default 'lz4'
  data?: string; // absolute path to data directory (default: /data)
  role;
}

const UNIONFS = ".unionfs-fuse";
// Do not make this too short, since every time it happens, the project has to
// do a find scan, which can take some resources!
const DEFAULT_SYNC_INTERVAL_MIN_S = 10;
// no idea what this *should* be.
const DEFAULT_SYNC_INTERVAL_MAX_S = 30;

// if sync fails this many times in a row, then we pause syncing until the user
// explicitly re-enables it.  We have to do this, since the failure mode could
// result in massive bandwidth usage.
const MAX_FAILURES_IN_A_ROW = 3;

/**
 * Manages a unionfs-fuse mount (`upper` read-write over `lower` read-only,
 * mounted at `mount`) and periodically synchronizes it with the project via
 * the project's `syncFS` api and the supplied `tar.send` / `tar.get`
 * functions.  It also answers file-operation requests (listing, exec,
 * delete, move, rename, copy) received over the project websocket.
 */
class SyncFS {
  private state: State = "init";
  private lower: string;
  private upper: string;
  private mount: string;
  private data: string;
  private project_id: string;
  private compute_server_id: number;
  // current delay between syncs, in seconds; moves between
  // syncIntervalMin and syncIntervalMax.
  private syncInterval: number;
  private registerToSyncInterval?: ReturnType<typeof setInterval>;
  private syncIntervalMin: number;
  private syncIntervalMax: number;
  private exclude: string[];
  private readTrackingFile?: string;
  // <lower>/.compute-servers/<compute_server_id>
  private scratch: string;
  // file under scratch where the most recent sync error is written
  private error_txt: string;
  private tar: { send; get };
  // number of failures in a row to sync.
  private numFails = 0;

  private client: SyncClient;

  // handle of the pending syncLoop timer, if any
  private timeout?: ReturnType<typeof setTimeout>;
  private websocket?;

  /**
   * Stores the options, creates the SyncClient, and wraps `tar` so that
   * "-I lz4" is prepended to both the create and extract argument lists
   * when compression is "lz4".
   *
   * @throws Error if `compression` is set to anything other than "lz4".
   */
  constructor({
    lower,
    upper,
    mount,
    project_id,
    compute_server_id,
    syncIntervalMin = DEFAULT_SYNC_INTERVAL_MIN_S,
    syncIntervalMax = DEFAULT_SYNC_INTERVAL_MAX_S,
    exclude = [],
    readTrackingFile,
    tar,
    compression = "lz4",
    data = "/data",
    role,
  }: Options) {
    this.lower = lower;
    this.upper = upper;
    this.mount = mount;
    this.data = data;
    this.project_id = project_id;
    this.compute_server_id = compute_server_id;
    this.exclude = exclude;
    this.syncInterval = syncIntervalMin;
    this.syncIntervalMin = syncIntervalMin;
    this.syncIntervalMax = syncIntervalMax;
    this.readTrackingFile = readTrackingFile;
    this.scratch = join(
      this.lower,
      ".compute-servers",
      `${this.compute_server_id}`,
    );
    this.client = new SyncClient({
      project_id: this.project_id,
      client_id: encodeIntToUUID(this.compute_server_id),
      role,
    });
    this.state = "ready";
    this.error_txt = join(this.scratch, "error.txt");
    if (!compression) {
      this.tar = tar;
    } else if (compression == "lz4") {
      const alter = (v) => ["-I", "lz4"].concat(v);
      this.tar = {
        send: async ({ createArgs, extractArgs, HOME }) => {
          createArgs = alter(createArgs);
          extractArgs = alter(extractArgs);
          await tar.send({ createArgs, extractArgs, HOME });
        },
        get: async ({ createArgs, extractArgs, HOME }) => {
          createArgs = alter(createArgs);
          extractArgs = alter(extractArgs);
          await tar.get({ createArgs, extractArgs, HOME });
        },
      };
    } else {
      throw Error(`invalid compression: '${compression}'`);
    }
  }

  /**
   * Mount the union filesystem and the excluded directories, create the
   * scratch directory, remove any stale error file, install the websocket
   * request handler, and run the first iteration of the sync loop (which
   * then re-schedules itself).
   */
  init = async () => {
    await this.mountUnionFS();
    await this.bindMountExcludes();
    await this.makeScratchDir();
    try {
      await rm(this.error_txt);
    } catch (_) {
      // best effort: the error file usually does not exist
    }
    await this.initSyncRequestHandler();
    await this.syncLoop();
  };

  /**
   * Stop syncing and unmount everything.  Safe to call more than once.
   * Failures to unmount are logged, not thrown.
   */
  close = async () => {
    log("close");
    if (this.state == "closed") {
      return;
    }
    this.state = "closed";
    if (this.timeout != null) {
      clearTimeout(this.timeout);
      delete this.timeout;
    }
    if (this.registerToSyncInterval) {
      clearInterval(this.registerToSyncInterval);
      delete this.registerToSyncInterval;
    }
    const args = ["-uz", this.mount];
    log("fusermount", args.join(" "));
    try {
      await execa("fusermount", args);
    } catch (err) {
      log("fusermount fail -- ", err);
    }
    try {
      await this.unmountExcludes();
    } catch (err) {
      log("unmountExcludes fail -- ", err);
    }
    this.websocket?.removeListener("data", this.handleApiRequest);
    this.websocket?.removeListener("state", this.registerToSync);
  };

  // True once close() has been called.  This is a method rather than an
  // inline comparison so that TypeScript's control-flow narrowing of
  // this.state does not reject the check after an await.
  private isClosed(): boolean {
    return this.state == "closed";
  }

  // The sync api listens on the project websocket for requests
  // to do a sync.  There's no response (for now).
  // Project --> ComputeServer: "hey, please do a sync now"
  private initSyncRequestHandler = async () => {
    log("initSyncRequestHandler: installing sync request handler");
    this.websocket = await this.client.project_client.websocket(
      this.project_id,
    );
    this.websocket.on("data", this.handleApiRequest);
    log("initSyncRequestHandler: installed handler");
    this.registerToSync();
    // We use *both* a period interval and websocket state change,
    // since we can't depend on just the state change to always
    // be enough, unfortunately... :-(
    this.registerToSyncInterval = setInterval(
      this.registerToSync,
      REGISTER_INTERVAL_MS,
    );
    this.websocket.on("state", this.registerToSync);
  };

  // Register this compute server with the project as a sync target and
  // report the "filesystem" component as ready.  Used both as the interval
  // callback (no argument, so state defaults to "online") and as the
  // websocket "state" listener; does nothing unless state is "online".
  // Errors are logged and swallowed, since this is retried periodically.
  private registerToSync = async (state = "online") => {
    if (state != "online") return;
    try {
      log("registerToSync: registering");
      const api = await this.client.project_client.api(this.project_id);
      await api.computeServerSyncRegister(this.compute_server_id);
      await apiCall("v2/compute/set-detailed-state", {
        id: this.compute_server_id,
        state: "ready",
        progress: 100,
        name: "filesystem",
        timeout: Math.round(REGISTER_INTERVAL_MS / 1000) + 15,
      });
      log("registerToSync: registered");
    } catch (err) {
      log("registerToSync: ERROR -- ", err);
    }
  };

  // Websocket "data" listener: run the request and, if the message has an
  // id, write back {id, resp} on success or {id, error} on failure.
  // This never throws, since it is invoked as an event listener.
  private handleApiRequest = async (data) => {
    // data may be null/undefined (doApiRequest also guards for that), so
    // read the id defensively once; otherwise the catch block itself could
    // throw and produce an unhandled rejection.
    const id = data?.id;
    try {
      log("handleApiRequest:", { data });
      const resp = await this.doApiRequest(data);
      log("handleApiRequest: ", { resp });
      if (id && this.websocket != null) {
        this.websocket.write({
          id,
          resp,
        });
      }
    } catch (err) {
      // console.trace(err);
      const error = `${err}`;
      if (id && this.websocket != null) {
        log("handleApiRequest: returning error", { event: data?.event, error });
        this.websocket.write({
          id,
          error,
        });
      } else {
        log("handleApiRequest: ignoring error", { event: data?.event, error });
      }
    }
  };

  // Dispatch a single request from the project based on data.event.
  // Throws Error for an unknown event.
  private doApiRequest = async (data) => {
    log("doApiRequest", { data });
    switch (data?.event) {
      case "compute_server_sync_request":
        try {
          if (this.state == "sync") {
            // already in progress
            return;
          }
          if (!this.syncIsDisabled()) {
            await this.sync();
            // only claim success when a sync actually ran
            log("doApiRequest: sync worked");
          }
        } catch (err) {
          log("doApiRequest: sync failed", err);
        }
        return;

      case "copy_from_project_to_compute_server":
      case "copy_from_compute_server_to_project": {
        const extractArgs = ["-x"];
        extractArgs.push("-C");
        extractArgs.push(data.dest ? data.dest : ".");
        const HOME = this.mount;
        for (const { prefix, paths } of parseCommonPrefixes(data.paths)) {
          const createArgs = ["-c", "-C", prefix, ...paths];
          log({ extractArgs, createArgs });
          if (data.event == "copy_from_project_to_compute_server") {
            await this.tar.get({
              createArgs,
              extractArgs,
              HOME,
            });
          } else if (data.event == "copy_from_compute_server_to_project") {
            await this.tar.send({
              createArgs,
              extractArgs,
              HOME,
            });
          } else {
            // impossible
            throw Error(`bug -- invalid event ${data.event}`);
          }
        }
        return;
      }

      case "listing":
        return await getListing(data.path, data.hidden, this.mount);

      case "exec":
        if (data.opts.command == "cc-new-file") {
          // so we don't have to depend on having our cc-new-file script
          // installed.  We just don't support templates on compute server.
          for (const path of data.opts.args ?? []) {
            const target = join(this.mount, path);
            await ensureContainingDirectoryExists(target);
            await writeFile(target, "");
          }
          return { status: 0, stdout: "", stderr: "" };
        }
        return await executeCode({ ...data.opts, home: this.mount });

      case "delete_files":
        return await delete_files(data.paths, this.mount);

      case "move_files":
        return await move_files(
          data.paths,
          data.dest,
          (path) => this.client.set_deleted(path),
          this.mount,
        );
      case "rename_file":
        return await rename_file(
          data.src,
          data.dest,
          (path) => this.client.set_deleted(path),
          this.mount,
        );

      default:
        throw Error(`unknown event '${data?.event}'`);
    }
  };

  private mountUnionFS = async () => {
    // NOTE: allow_other is essential to allow bind mounted as root
    // of fast scratch directories into HOME!
    // unionfs-fuse -o allow_other,auto_unmount,nonempty,large_read,cow,max_files=32768 /upper=RW:/home/user=RO /merged
    await execa("unionfs-fuse", [
      "-o",
      "allow_other,auto_unmount,nonempty,large_read,cow,max_files=32768",
      `${this.upper}=RW:${this.lower}=RO`,
      this.mount,
    ]);
  };

  // An exclude entry gets a bind mount only if it is a plain, non-hidden,
  // top-level name: nonempty, not starting with "." or "/", not "~", and
  // containing no "/".
  private shouldMountExclude = (path) => {
    return (
      path &&
      !path.startsWith(".") &&
      !path.startsWith("/") &&
      path != "~" &&
      !path.includes("/")
    );
  };

  // Unmount every bind mount that bindMountExcludes creates: the eligible
  // entries of this.exclude AND the explicit hidden excludes.  Each unmount
  // is best effort; failures are logged as warnings.
  private unmountExcludes = async () => {
    const paths = [
      ...this.exclude.filter((path) => this.shouldMountExclude(path)),
      ...EXPLICIT_HIDDEN_EXCLUDES,
    ];
    for (const path of paths) {
      try {
        const target = join(this.mount, path);
        log("unmountExcludes -- unmounting", { target });
        await execa("sudo", ["umount", target]);
      } catch (err) {
        log("unmountExcludes -- warning ", err);
      }
    }
  };

  private bindMountExcludes = async () => {
    // Setup bind mounts for each excluded directory, e.g.,
    // mount --bind /data/scratch /home/user/scratch
    for (const path of this.exclude) {
      if (this.shouldMountExclude(path)) {
        log("bindMountExcludes -- mounting", { path });
        const source = join(this.data, path);
        const target = join(this.mount, path);
        const upper = join(this.upper, path);
        log("bindMountExcludes -- mounting", { source, target });
        await mkdirp(source);
        // Yes, we have to mkdir in the upper level of the unionfs, because
        // we excluded this path from the websocketfs metadataFile caching.
        await mkdirp(upper);
        await execa("sudo", ["mount", "--bind", source, target]);
      } else {
        log("bindMountExcludes -- skipping", { path });
      }
    }
    // The following are (1) not mounted above due to shouldMountExclude,
    // and (2) always get excluded, and (3) start with . so could conflict
    // with the unionfs upper layer, so we change them:
    for (const path of EXPLICIT_HIDDEN_EXCLUDES) {
      log("bindMountExcludes -- explicit hidden path ", { path });
      const source = join(this.data, `.explicit${path}`);
      const target = join(this.mount, path);
      const upper = join(this.upper, path);
      log("bindMountExcludes -- explicit hidden path", { source, target });
      await mkdirp(source);
      await mkdirp(upper);
      await execa("sudo", ["mount", "--bind", source, target]);
    }
  };

  /**
   * Run one sync now.
   *
   * On success the failure counter is reset and a "ready" state is
   * reported.  On failure the counter is incremented, an "error" state is
   * reported, the message is written to the error file, and an Error
   * carrying that message is thrown.
   *
   * @throws Error if a sync is already in progress, if the state is not
   *   "ready", or if the sync itself fails.
   */
  public sync = async () => {
    if (this.state == "sync") {
      throw Error("sync currently in progress");
    }
    if (this.state != "ready") {
      throw Error(
        `can only sync when state is ready but state is "${this.state}"`,
      );
    }
    log("sync: doing a sync");
    const t0 = Date.now();
    try {
      this.state = "sync";
      await this.__doSync();
      this.numFails = 0; // it worked
      this.reportState({
        state: "ready",
        progress: 100,
        timeout: 3 + this.syncInterval,
      });
    } catch (err) {
      this.numFails += 1;
      // Tolerate a thrown value that is not an Error (or is nullish), so the
      // error reporting below always runs.
      const message = trunc_middle(`${err?.message ?? err}`, 500);
      let extra;
      if (this.numFails >= MAX_FAILURES_IN_A_ROW) {
        extra = `XXX Sync failed ${MAX_FAILURES_IN_A_ROW} in a row.  FIX THE PROBLEM, THEN CLEAR THIS ERROR TO RESUME SYNC. -- ${message}`;
      } else {
        extra = `XXX Sync failed ${this.numFails} times in a row with -- ${message}`;
      }
      // extra here sets visible error state that the user sees.
      this.reportState({ state: "error", extra, timeout: 60, progress: 0 });
      await this.logSyncError(extra);
      throw Error(extra);
    } finally {
      // close() may have run while we were syncing; don't resurrect the state.
      if (this.state != ("closed" as State)) {
        this.state = "ready";
      }
      log("sync - done, time=", (Date.now() - t0) / 1000);
    }
  };

  // Sync is completely disabled when "~" or "." is in the exclude list.
  private syncIsDisabled = () => {
    if (this.exclude.includes("~") || this.exclude.includes(".")) {
      log("syncLoop: '~' or '.' is included in excludes, so we never sync");
      return true;
    }
    return false;
  };

  // One iteration of the periodic sync; re-schedules itself via setTimeout
  // until close() is called.
  private syncLoop = async () => {
    if (this.isClosed()) {
      // close() already cleared the timer; do not start again.
      return;
    }
    if (this.syncIsDisabled()) {
      const wait = 1000 * 60;
      log(`syncLoop -- sleeping ${wait / 1000} seconds...`);
      this.timeout = setTimeout(this.syncLoop, wait);
      return;
    }
    const t0 = Date.now();
    if (this.state == "ready") {
      log("syncLoop: ready");
      try {
        if (this.numFails >= MAX_FAILURES_IN_A_ROW) {
          // TODO: get the current error message and if cleared do sync.  Otherwise:
          const detailedState = await this.getDetailedState();
          if (
            detailedState &&
            (!detailedState.extra || detailedState.state != "error")
          ) {
            log("syncLoop: resuming sync since error was cleared");
            this.numFails = 0;
            await this.sync();
          } else {
            log(
              `syncLoop: not syncing due to failing ${this.numFails} times in a row. Will restart when error message is cleared.`,
            );
          }
        } else {
          await this.sync();
        }
      } catch (err) {
        // This might happen if there is a lot of filesystem activity,
        // which changes things during the sync.
        // NOTE: the error message can be VERY long, including
        // all the output filenames.
        log(err?.message ?? err);
        // In case of error, we aggressively back off to reduce impact.
        this.syncInterval = Math.min(
          this.syncIntervalMax,
          1.5 * this.syncInterval,
        );
      }
    } else {
      log("sync: skipping since state = ", this.state);
    }
    if (this.isClosed()) {
      // close() was called while we were awaiting above.  Its clearTimeout
      // ran before this point, so re-arming the timer here would keep the
      // loop alive forever.
      return;
    }
    // We always wait as long as the last sync took plus the
    // next interval. This way if sync is taking a long time
    // due to huge files or load, we spread it out, up to a point,
    // which is maybe a good idea.   If sync is fast, it's fine
    // to do it frequently.
    const wait = Math.min(
      this.syncIntervalMax * 1000,
      this.syncInterval * 1000 + (Date.now() - t0),
    );
    log(`syncLoop -- sleeping ${wait / 1000} seconds...`);
    this.timeout = setTimeout(this.syncLoop, wait);
  };

  private makeScratchDir = async () => {
    await mkdir(this.scratch, { recursive: true });
  };

  // Write the error message to the error file; failure to do so is only logged.
  private logSyncError = async (mesg: string) => {
    try {
      await writeFile(this.error_txt, mesg);
    } catch (err) {
      log(`UNABLE to log sync err -- ${err}`);
    }
  };

  // Save current state to database; useful to inform user as to what is going on.
  // We throttle this, because if you call it, then immediately call it again,
  // two different hub servers basically gets two different stats at the same time,
  // and which state is saved to the database is pretty random!  By spacing this out
  // by 1.5s, such a problem is vastly less likely.
  private reportState = throttle(
    async (opts: { state; extra?; timeout?; progress? }) => {
      log("reportState", opts);
      try {
        await apiCall("v2/compute/set-detailed-state", {
          id: this.compute_server_id,
          name: "filesystem-sync",
          ...opts,
        });
      } catch (err) {
        log("reportState: WARNING -- ", err);
      }
    },
    1500,
    { leading: true, trailing: true },
  );

  // Fetch the "filesystem-sync" detailed state that reportState writes.
  private getDetailedState = async () => {
    return await apiCall("v2/compute/get-detailed-state", {
      id: this.compute_server_id,
      name: "filesystem-sync",
    });
  };

  // ONLY call this from this.sync!
  // Steps: compute the local state, write it (compressed) under lower, ask
  // the project what to do via api.syncFS, then apply the answer: remove
  // whiteouts and files, send files, receive files.  Finally adjust
  // syncInterval and update read tracking.
  private __doSync = async () => {
    log("doSync");
    this.reportState({ state: "get-compute-state", progress: 0, timeout: 10 });
    await this.makeScratchDir();
    const api = await this.client.project_client.api(this.project_id);
    const { computeState, whiteouts } = await this.getComputeState();
    // log("doSync", computeState, whiteouts);
    const computeStateJson = join(
      ".compute-servers",
      `${this.compute_server_id}`,
      "compute-state.json.lz4",
    );
    await writeFile(
      join(this.lower, computeStateJson),
      toCompressedJSON(computeState),
    );
    this.reportState({
      state: "send-state-to-project",
      progress: 20,
      timeout: 10,
    });
    const { removeFromCompute, copyFromCompute, copyFromProjectTar } =
      await api.syncFS({
        computeStateJson,
        exclude: this.exclude,
        compute_server_id: this.compute_server_id,
        now: Date.now(),
      });

    // log("doSync", { removeFromCompute, copyFromCompute, copyFromProjectTar });
    let isActive = false;
    if (whiteouts.length > 0) {
      isActive = true;
      await remove(whiteouts, join(this.upper, UNIONFS));
    }
    // NOTE: the parentheses matter, since ?? binds less tightly than >.
    if ((removeFromCompute?.length ?? 0) > 0) {
      isActive = true;
      await remove(removeFromCompute, this.upper);
    }
    if ((copyFromCompute?.length ?? 0) > 0) {
      isActive = true;
      this.reportState({
        state: `send-${copyFromCompute?.length ?? 0}-files-to-project`,
        progress: 50,
      });
      await this.sendFiles(copyFromCompute);
    }
    if (copyFromProjectTar) {
      isActive = true;
      this.reportState({
        state: "receive-files-from-project",
        progress: 70,
      });
      await this.receiveFiles(copyFromProjectTar);
    }
    log("DONE receiving files from project as part of sync");

    if (isActive) {
      this.syncInterval = this.syncIntervalMin;
    } else {
      // exponential backoff when not active
      this.syncInterval = Math.min(
        this.syncIntervalMax,
        1.3 * this.syncInterval,
      );
    }
    await this.updateReadTracking();
  };

  // private getComputeStatePatch = async (
  //   lastState: FilesystemState,
  // ): Promise<FilesystemStatePatch> => {
  //   // todo -- whiteouts?
  //   const { computeState: newState } = await this.getComputeState();
  //   return makePatch(lastState, newState);
  // };

  private getComputeState = async (): Promise<{
    computeState: FilesystemState;
    whiteouts: string[];
  }> => {
    // Create the map from all paths in upper (both directories and files and whiteouts),
    // except ones excluded from sync, to the ctime for the path (or negative mtime
    // for deleted paths):  {[path:string]:mtime of last change to file metadata}
    const whiteLen = "_HIDDEN~".length;
    const computeState = await mtimeDirTree({
      path: this.upper,
      exclude: this.exclude,
    });
    const whiteouts: string[] = [];
    const unionfs = join(this.upper, UNIONFS);
    const mtimes = await mtimeDirTree({
      path: unionfs,
      exclude: [],
    });
    for (const path in mtimes) {
      const mtime = mtimes[path];
      if (path.endsWith("_HIDDEN~")) {
        // p is the path that the whiteout marks as deleted
        const p = path.slice(0, -whiteLen);
        whiteouts.push(path);
        if ((await stat(join(unionfs, path))).isDirectory()) {
          whiteouts.push(p);
        }
        computeState[p] = -mtime;
      }
    }

    return { computeState, whiteouts };
  };

  // Write the null-separated list of files into the scratch directory and
  // have tar.send create an archive from that list.
  private sendFiles = async (files: string[]) => {
    const target = join(this.scratch, "copy-to-project");
    log("sendFiles: sending ", files.length, "files listed in ", target);
    const file = await open(target, "w");
    try {
      await file.write(files.join("\0"));
    } finally {
      // always release the handle, even if the write fails
      await file.close();
    }
    const createArgs = [
      "-c",
      "--null",
      "--no-recursion",
      "--verbatim-files-from",
      "--files-from",
      target,
    ];
    const extractArgs = ["--delay-directory-restore", "-x"];
    await this.tar.send({ createArgs, extractArgs });
    log("sendFiles: ", files.length, "sent");
  };

  // pathToFileList is the path to a file in the file system
  // in the project that has the names of the files to copy to
  // the compute server.
  private receiveFiles = async (pathToFileList: string) => {
    log("receiveFiles: getting files in from project -- ", pathToFileList);
    // this runs in the project
    const createArgs = [
      "-c",
      "--null",
      "--no-recursion",
      "--verbatim-files-from",
      "--files-from",
      pathToFileList,
    ];
    // this runs here
    const extractArgs = ["--delay-directory-restore", "-x"];
    await this.tar.get({
      createArgs,
      extractArgs,
    });
    log("receiveFiles: files in ", pathToFileList, "received from project");
  };

  private updateReadTracking = async () => {
    if (!this.readTrackingFile) {
      return;
    }
    // 1. Move the read tracking file to the project.  We do a move, so atomic
    //    and new writes go to a new file and nothing is missed.
    // 2. Call tar.get to grab the files.
    // NOTE: read tracking isn't triggered on any files that were copied over,
    // since unionfs reads those from the local cache (stat doesn't count), so
    // we don't have to filter those out.

    // We make any errors below WARNINGS that do not throw an exception, because
    // this is an optimization, not critical for sync, and each time we do it,
    // things are reset.
    const readTrackingOnProject = join(
      ".compute-servers",
      `${this.compute_server_id}`,
      "read-tracking",
    );
    this.reportState({
      state: "cache-files-from-project",
      progress: 80,
    });
    try {
      try {
        // move the file; first locally, then copy across devices, then delete.
        // This is to make the initial mv atomic so we don't miss anything.
        const tmp = join(
          dirname(this.readTrackingFile),
          `.${basename(this.readTrackingFile)}.tmp`,
        );
        await rename(this.readTrackingFile, tmp); // should be atomic
        await copyFile(tmp, join(this.lower, readTrackingOnProject));
        await rm(tmp);
      } catch (err) {
        if (err.code == "ENOENT") {
          log(
            `updateReadTracking -- no read tracking file '${this.readTrackingFile}'`,
          );
          return;
        }
        // this could be harmless, e.g., the file doesn't exist yet
        log(
          `updateReadTracking -- issue moving tracking file '${this.readTrackingFile}'`,
          err,
        );
        return;
      }
      const createArgs = [
        "-c",
        "--null",
        "--no-recursion",
        "--verbatim-files-from",
        "--files-from",
        readTrackingOnProject,
      ];
      const extractArgs = ["--keep-newer-files", "-x"];
      log("updateReadTracking:", "tar", createArgs.join(" "));
      try {
        await this.tar.get({ createArgs, extractArgs });
      } catch (err) {
        log(
          `updateReadTracking -- issue extracting tracking file '${this.readTrackingFile}'`,
          err,
        );
        return;
      }
    } finally {
      this.reportState({
        state: "cache-files-from-project",
        progress: 85,
      });
    }
  };
}