/*
Streaming write over Conat to a project or compute server.

This is a key component to support user uploads while being memory efficient,
by streaming the write. It uses conat to efficiently stream writes of files to
any compute server or project that is connected to conat.

INSTRUCTIONS:

Import writeFile:

    import { writeFile } from "@cocalc/conat/files/write";

Now you can write a given path to a project (or compute_server) as
simply as this:

    const stream = createReadStream('a file')
    await writeFile({stream, project_id, compute_server_id, path, maxWait})

- Here stream can be any readable stream, not necessarily a stream made from
  a file. E.g., you could use a PassThrough and explicitly write to it with
  write calls.

- maxWait is a time in ms after which, if the file isn't fully written,
  everything is cleaned up and there is an error (default: 10 minutes).


HOW THIS WORKS:

Here's how this works from the side of the compute server (or project):

- We start a request/response conat server, subscribed to the project's
  "files:write" subject (see createServer).
- There's one kind of message it accepts, with data {name, path, maxWait},
  meaning "use streaming read to get {path} from the read server called
  {name}, and write it to disk at {path}".
  The sender of that message should set a long timeout (e.g., 10 minutes).
- It uses the streaming read functionality (in read.ts) to download the file
  and writes it to disk chunk by chunk, respecting the write stream's
  backpressure.
- When everything has been flushed it responds
  {status:"success", bytes, chunks}; on failure it responds
  {status:"error", error:"message..."}. Exactly one response is sent.

Here's how it works from the side of whoever is sending the file:

- Start a read server (named by a random id) that can send {path}.
- Send a request saying "we are making {path} available to you at {name}",
  with timeout maxWait.
- Wait for the final response, which only arrives once the whole file has
  been written (or writing failed).
- Whatever happens (success, error, or timeout), close the read server and
  clean up. We're done.


DEVELOPMENT:

See src/packages/backend/conat/test/files/write.test.ts for unit tests.

~/cocalc/src/packages/backend$ node

require('@cocalc/backend/conat'); a = require('@cocalc/conat/files/write');

project_id = '00847397-d6a8-4cb0-96a8-6ef64ac3e6cf'; compute_server_id = 0; await a.createServer({project_id,compute_server_id,createWriteStream:require('fs').createWriteStream});

stream=require('fs').createReadStream('env.ts');
await a.writeFile({stream, project_id, compute_server_id, path:'/tmp/a.ts'})

*/

import { conat } from "@cocalc/conat/client";
import { randomId } from "@cocalc/conat/names";
import {
  close as closeReadService,
  createServer as createReadServer,
  readFile,
} from "./read";
import { projectSubject } from "@cocalc/conat/names";
import { type Subscription } from "@cocalc/conat/core/client";
import { type Readable } from "node:stream";
import { once } from "node:events";
import { getLogger } from "@cocalc/conat/client";

const logger = getLogger("conat:files:write");

// The conat subject the write server for a given project/compute server
// listens on.
function getWriteSubject({ project_id, compute_server_id }) {
  return projectSubject({
    project_id,
    compute_server_id,
    service: "files:write",
  });
}

// Active write-server subscriptions in this process, keyed by subject.
const subs: { [subject: string]: Subscription } = {};

/**
 * Stop the write server for the given project / compute server, if one was
 * started in this process by createServer. Does nothing otherwise.
 */
export async function close({ project_id, compute_server_id }) {
  const subject = getWriteSubject({ project_id, compute_server_id });
  const sub = subs[subject];
  if (sub == null) {
    return;
  }
  delete subs[subject];
  await sub.drain();
}

/**
 * Start the write server for a project / compute server. Calling it again for
 * the same target while a server is running is a no-op.
 *
 * @param createWriteStream returns a writable stream for writing the specified
 *   path to disk; it can be an async function. The stream is sent the custom
 *   events "remove" (on failure) and "rename" (after a successful, fully
 *   flushed write) — presumably so implementations can write to a temporary
 *   file and delete/move it; confirm against the implementations.
 */
export async function createServer({
  project_id,
  compute_server_id,
  createWriteStream,
}: {
  project_id: string;
  compute_server_id: number;
  createWriteStream: (path: string) => any;
}) {
  const subject = getWriteSubject({ project_id, compute_server_id });
  logger.debug("createServer", { subject });
  if (subs[subject] != null) {
    return;
  }
  const cn = await conat();
  const sub = await cn.subscribe(subject);
  if (subs[subject] != null) {
    // A concurrent createServer call registered a subscription while we were
    // awaiting. Keep only one subscriber, so that messages are not handled
    // twice and no subscription leaks past close().
    await sub.drain();
    return;
  }
  subs[subject] = sub;
  listen({ sub, createWriteStream, project_id, compute_server_id }).catch(
    (err) => logger.debug("listen: ERROR", { subject, err }),
  );
}

async function listen({
  sub,
  createWriteStream,
  project_id,
  compute_server_id,
}) {
  // NOTE: we just handle as many messages as we get in parallel, so this
  // could be a large number of simultaneous downloads. These are all by
  // authenticated users of the project, and the load is on the project,
  // so I think that makes sense.
  for await (const mesg of sub) {
    handleMessage({ mesg, createWriteStream, project_id, compute_server_id }).catch(
      (err) => logger.debug("handleMessage: unexpected ERROR", err),
    );
  }
}

/**
 * Handle one write request {path, name, maxWait}: stream the file from the
 * read server `name` into createWriteStream(path), then reply exactly once
 * with {status:"success", bytes, chunks} or {status:"error", error}.
 */
async function handleMessage({
  mesg,
  createWriteStream,
  project_id,
  compute_server_id,
}) {
  let error = "";
  let responded = false;
  // Send at most one response per request. A failing respond (e.g. the
  // requester is gone) is logged rather than thrown, since this may run
  // inside a stream "error" listener where a throw would be uncaught.
  const respond = (resp: object) => {
    if (responded) {
      return;
    }
    responded = true;
    try {
      mesg.respondSync(resp);
    } catch (err) {
      logger.debug("handleMessage: failed to respond", err);
    }
  };
  let writeStream: null | Awaited<ReturnType<typeof createWriteStream>> = null;
  try {
    const { path, name, maxWait } = mesg.data;
    logger.debug("handleMessage", { path, name, maxWait });
    const stream = await createWriteStream(path);
    writeStream = stream;
    stream.on("error", (err) => {
      error = `${err}`;
      respond({ error, status: "error" });
      console.warn(`error writing ${path}: ${error}`);
      stream.emit("remove");
    });
    let chunks = 0;
    let bytes = 0;
    for await (const chunk of await readFile({
      project_id,
      compute_server_id,
      name,
      path,
      maxWait,
    })) {
      if (error) {
        stream.end();
        return;
      }
      if (!stream.write(chunk)) {
        // Respect backpressure: otherwise, if the disk is slower than the
        // network, the entire upload ends up buffered in memory. once()
        // rejects if the stream emits "error" while we wait.
        await once(stream, "drain");
      }
      chunks += 1;
      bytes += chunk.length;
      logger.debug("handleMessage -- wrote", { path, name, bytes });
    }
    if (error) {
      // The stream failed after the last chunk; the error listener has
      // already responded and emitted "remove".
      return;
    }
    // Only report success once all data has been flushed, so a late write
    // error can't follow a "success" response.
    await new Promise<void>((resolve, reject) => {
      stream.once("finish", resolve);
      stream.once("error", reject);
      stream.end();
    });
    stream.emit("rename");
    respond({ status: "success", bytes, chunks });
    logger.debug("handleMessage -- SUCCESS", { path, name });
  } catch (err) {
    logger.debug("handleMessage: ERROR", err);
    if (!error) {
      respond({ error: `${err}`, status: "error" });
      writeStream?.emit("remove");
    }
  }
}

export interface WriteFileOptions {
  project_id: string;
  compute_server_id?: number;
  path: string;
  stream: Readable;
  maxWait?: number;
}

/**
 * Stream `stream` to `path` on the given project / compute server.
 *
 * @returns the number of bytes and chunks written by the remote side.
 * @throws Error with the remote error message if the write failed, or if the
 *   request did not complete within maxWait ms.
 */
export async function writeFile({
  project_id,
  compute_server_id = 0,
  path,
  stream,
  maxWait = 1000 * 60 * 10, // 10 minutes
}: WriteFileOptions): Promise<{ bytes: number; chunks: number }> {
  logger.debug("writeFile", { project_id, compute_server_id, path, maxWait });
  const name = randomId();
  try {
    // Start a read server that serves our stream under the random `name`.
    await createReadServer({
      createReadStream: () => stream,
      project_id,
      compute_server_id,
      name,
    });
    // Tell the compute server / project to start reading our file; the
    // response only arrives once the whole file is written (or failed).
    const cn = await conat();
    const resp = await cn.request(
      getWriteSubject({ project_id, compute_server_id }),
      { name, path, maxWait },
      { timeout: maxWait },
    );
    const { error, bytes, chunks } = resp.data;
    if (error) {
      throw Error(error);
    }
    return { bytes, chunks };
  } finally {
    await closeReadService({ project_id, compute_server_id, name });
  }
}