Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.
Path: blob/master/src/packages/sync-fs/lib/handle-api-call.ts
Views: 687
/* This runs in the project and handles api calls from computer servers.12It mainly handles a persistent connection from the file system container,3and supports functions including moving files, syncing, executing code,4etc.5*/67import { fromCompressedJSON } from "./compressed-json";8import getLogger from "@cocalc/backend/logger";9import type { FilesystemState } from "./types";10import { metadataFile, mtimeDirTree, remove, writeFileLz4 } from "./util";11import { join } from "path";12import { mkdir, rename, readFile, writeFile } from "fs/promises";13import type { MesgSyncFSOptions } from "@cocalc/comm/websocket/types";14import { sha1 } from "@cocalc/backend/sha1";15//import type { Spark } from "primus";16type Spark = any; // for now1718const log = getLogger("sync-fs:handle-api-call").debug;1920const CLOCK_THRESH_MS = 5 * 1000;2122export default async function handleApiCall({23computeStateJson,24exclude = [],25compute_server_id,26now, // time in ms since epoch on compute server27}: MesgSyncFSOptions) {28log("handleApiCall");2930let computeState;31if (computeStateJson) {32computeState = fromCompressedJSON(await readFile(computeStateJson));33} else {34throw Error("not implemented");35}36if (!process.env.HOME) {37throw Error("HOME must be defined");38}39// This can also happen if the network connection breaks for a bit, e.g., when40// restarting the project.41const clockSkew = Math.abs((now ?? 0) - Date.now());42if (clockSkew >= CLOCK_THRESH_MS) {43throw Error(44`Compute server sync time is off by ${clockSkew}ms, which exceeds the ${CLOCK_THRESH_MS}ms threshhold. 
Try again and possibly double check your clock settings.`,45);46}4748const meta = await metadataFile({ path: process.env.HOME, exclude });49const projectState = await getProjectState(meta, exclude);5051const {52removeFromCompute,53removeFromProject,54copyFromProject,55copyFromCompute,56} = getOperations({ computeState, projectState });5758if (removeFromProject.length > 0) {59await remove(removeFromProject, process.env.HOME);60}6162await writeMetadataFile({ compute_server_id, meta });6364return {65removeFromCompute,66copyFromCompute,67copyFromProjectTar:68copyFromProject.length > 069? await createCopyFromProjectTar(copyFromProject, compute_server_id)70: undefined,71};72}7374let lastMetadataFileHash: { [compute_server_id: number]: string } = {};75async function writeMetadataFile({ compute_server_id, meta }) {76let start = Date.now();77const hash = sha1(meta);78const path = join(getStateDir(compute_server_id), "meta");79const tmp = join(path, ".meta.lz4");80const target = join(path, "meta.lz4");81if (hash == lastMetadataFileHash[compute_server_id]) {82log(83`writeMetadataFile: not writing "${target}" since hash didn't change. Hash time =`,84Date.now() - start,85"ms",86);87return;88}89lastMetadataFileHash[compute_server_id] = hash;90await mkdir(path, { recursive: true });91await writeFileLz4(tmp, meta);92// ensure this is atomic93await rename(tmp, target);94log(95`writeMetadataFile: wrote out "${target}" atomically. 
Total time =`,96Date.now() - start,97"ms",98);99}100101function getStateDir(compute_server_id): string {102if (!process.env.HOME) {103throw Error("HOME env var must be set");104}105return join(process.env.HOME, ".compute-servers", `${compute_server_id}`);106}107108// This is the path to a file with the names109// of the files to copy via tar, separated by NULL.110// **This is not an actual tarball.**111// We use NULL instead of newline so that filenames112// with newlines in them work, and this should be processed113// with tar using the --null option.114async function createCopyFromProjectTar(115paths: string[],116compute_server_id: number,117): Promise<string> {118if (!process.env.HOME) {119throw Error("HOME must be defined");120}121const stateDir = getStateDir(compute_server_id);122await mkdir(stateDir, { recursive: true });123const target = join(stateDir, "copy-from-project");124await writeFile(target, paths.join("\0"));125const i = target.lastIndexOf(stateDir);126return target.slice(i);127}128129// we have to use separate cache/state for each exclude list, unfortunately. in practice,130// they should often be similar or the same, because people will rarely customize this (?).131let lastProjectState: { [exclude: string]: FilesystemState } = {};132async function getProjectState(meta, exclude): Promise<FilesystemState> {133const now = Math.floor(Date.now() / 1000); // in integers seconds134const key = JSON.stringify(exclude);135const lastState = lastProjectState[key] ?? {};136137if (!process.env.HOME) {138throw Error("HOME must be defined");139}140const projectState = await mtimeDirTree({141path: process.env.HOME,142exclude,143metadataFile: meta,144});145146// figure out what got deleted in the project147for (const path in lastState) {148if (projectState[path] === undefined) {149// it is currently deleted. If it was already marked deleted at a point in time,150// just stay with that time. 
If now, consider it deleted now (negative sign means "deleted").151// NOTE: it's impossible to know exactly when path was actually deleted.152projectState[path] = lastState[path] < 0 ? lastState[path] : -now;153}154}155156lastProjectState[key] = projectState;157158// // this is for DEBUGING ONLY!159// await writeFile(160// join(process.env.HOME, ".compute-servers", "project-state.json"),161// JSON.stringify(projectState),162// );163164return projectState;165}166167function getOperations({ computeState, projectState }): {168removeFromCompute: string[];169removeFromProject: string[];170copyFromProject: string[];171copyFromCompute: string[];172} {173const removeFromCompute: string[] = [];174const removeFromProject: string[] = [];175const copyFromProject: string[] = [];176const copyFromCompute: string[] = [];177178const handlePath = (path) => {179const projectMtime = projectState[path];180const computeMtime = computeState[path];181if (projectMtime == computeMtime) {182// definitely nothing to do183return;184}185if (projectMtime !== undefined && computeMtime === undefined) {186// file is NOT stored on compute server, so no need to worry about it187return;188}189// something must be done! 
What:190if (projectMtime === undefined) {191if (computeMtime < 0) {192// it's supposed to be deleted and it's gone, so nothing to do.193return;194}195// it's definitely NOT on the project but it is on the compute server, so we need it.196copyFromCompute.push(path);197return;198}199200// now both projectMtime and computeMtime are defined and different201// We use >= instead of > so that ties are broken in favor of the project,202// which is an arbitrary but consistent choice.203if (Math.abs(projectMtime) >= Math.abs(computeMtime)) {204// project version is newer205if (projectMtime > 0) {206// it was edited later on the project207copyFromProject.push(path);208} else {209// it was deleted from the project, so now need to delete on compute210removeFromCompute.push(path);211}212return;213} else {214// compute version is newer215if (computeMtime > 0) {216// edited on compute later217copyFromCompute.push(path);218} else {219// deleted on compute, so now also need to delete in project220removeFromProject.push(path);221}222}223};224225for (const path in projectState) {226handlePath(path);227}228for (const path in computeState) {229if (projectState[path] === undefined) {230// NOT already handled above231handlePath(path);232}233}234235return {236removeFromCompute,237removeFromProject,238copyFromProject,239copyFromCompute,240};241}242243const sparks: { [compute_server_id: number]: Spark } = {};244245export async function handleComputeServerSyncRegister(246{ compute_server_id },247spark,248) {249log("handleComputeServerSyncRegister -- registering ", {250compute_server_id,251spark_id: spark.id,252});253// save the connection so we can send a sync_request message later, and also handle the api254// calls for copying files back and forth, etc.255sparks[compute_server_id] = spark;256const remove = () => {257if (sparks[compute_server_id]?.id == spark.id) {258log(259"handleComputeServerSyncRegister: removing compute server connection due to disconnect -- ",260{ compute_server_id, 
spark_id: spark.id },261);262// the spark connection currently cached is this263// one, so we remove it. It could be replaced by264// a new one, in which case we better not remove it.265delete sparks[compute_server_id];266}267};268spark.on("end", remove);269spark.on("close", remove);270}271272// User has requested that compute_server_id273// do sync right now via the browser websocket api.274export async function handleSyncFsRequestCall({ compute_server_id }) {275const spark = sparks[compute_server_id];276if (spark != null) {277log("handleSyncFsRequestCall: success");278spark.write({ event: "compute_server_sync_request" });279return { status: "ok" };280} else {281log("handleSyncFsRequestCall: fail");282throw Error(`no connection to compute server -- please start it or restart it`);283//throw Error("no connection to compute server");284}285}286287function callComputeServerApi(288compute_server_id,289mesg,290timeoutMs = 30000,291compute = false,292): Promise<any> {293const spark = compute294? 
computeSparks[compute_server_id]295: sparks[compute_server_id];296if (spark == null) {297log("callComputeServerApi: no connection");298throw Error(299`no connection to compute server -- please start or restart it`,300);301}302return new Promise((resolve, reject) => {303const id = Math.random();304spark.write({ ...mesg, id });305306const handler = (data) => {307if (data?.id == id) {308spark.removeListener("data", handler);309clearTimeout(timeout);310if (data.error) {311reject(Error(data.error));312} else {313resolve(data.resp);314}315}316};317spark.addListener("data", handler);318319const timeout = setTimeout(() => {320spark.removeListener("data", handler);321reject(Error(`timeout -- ${timeoutMs}ms`));322}, timeoutMs);323});324}325326export async function handleCopy(opts: {327event: string;328compute_server_id: number;329paths: string[];330dest?: string;331timeout?: number;332}) {333log("handleCopy: ", opts);334const mesg = { event: opts.event, paths: opts.paths, dest: opts.dest };335return await callComputeServerApi(336opts.compute_server_id,337mesg,338(opts.timeout ?? 30) * 1000,339);340}341342export async function handleSyncFsGetListing({343path,344hidden,345compute_server_id,346}) {347log("handleSyncFsGetListing: ", { path, hidden, compute_server_id });348const mesg = { event: "listing", path, hidden };349return await callComputeServerApi(compute_server_id, mesg, 15000);350}351352export async function handleComputeServerFilesystemExec(opts) {353const { compute_server_id } = opts;354log("handleComputeServerFilesystemExec: ", opts);355const mesg = { event: "exec", opts };356return await callComputeServerApi(357compute_server_id,358mesg,359(opts.timeout ?? 
10) * 1000,360);361}362363export async function handleComputeServerDeleteFiles({364compute_server_id,365paths,366}) {367log("handleComputeServerDeleteFiles: ", { compute_server_id, paths });368const mesg = { event: "delete_files", paths };369return await callComputeServerApi(compute_server_id, mesg, 60 * 1000);370}371372export async function handleComputeServerRenameFile({373compute_server_id,374src,375dest,376}) {377log("handleComputeServerRenameFile: ", { compute_server_id, src, dest });378const mesg = { event: "rename_file", src, dest };379return await callComputeServerApi(compute_server_id, mesg, 60 * 1000);380}381382export async function handleComputeServerMoveFiles({383compute_server_id,384paths,385dest,386}) {387log("handleComputeServerMoveFiles: ", { compute_server_id, paths, dest });388const mesg = { event: "move_files", paths, dest };389return await callComputeServerApi(compute_server_id, mesg, 60 * 1000);390}391392/*393Similar but for compute instead of filesystem:394*/395396const computeSparks: { [compute_server_id: number]: Spark } = {};397398export async function handleComputeServerComputeRegister(399{ compute_server_id },400spark,401) {402log("handleComputeServerComputeRegister -- registering ", {403compute_server_id,404spark_id: spark.id,405});406// save the connection so we can send a sync_request message later, and also handle the api407// calls for copying files back and forth, etc.408computeSparks[compute_server_id] = spark;409const remove = () => {410if (computeSparks[compute_server_id]?.id == spark.id) {411log(412"handleComputeServerComputeRegister: removing compute server connection due to disconnect -- ",413{ compute_server_id, spark_id: spark.id },414);415// the spark connection currently cached is this416// one, so we remove it. 
It could be replaced by417// a new one, in which case we better not remove it.418delete computeSparks[compute_server_id];419}420};421spark.on("end", remove);422spark.on("close", remove);423}424425export async function handleComputeServerComputeExec(opts) {426const { compute_server_id } = opts;427log("handleComputeServerComputeExec: ", opts);428const mesg = { event: "exec", opts };429return await callComputeServerApi(430compute_server_id,431mesg,432(opts.timeout ?? 10) * 1000,433true,434);435}436437438