import { conat } from "@cocalc/conat/client";
import { projectSubject } from "@cocalc/conat/names";
import { type Subscription } from "@cocalc/conat/core/client";
import { delay } from "awaiting";
import { getLogger } from "@cocalc/conat/client";
// Logger for this module; every log call below goes through it.
const logger = getLogger("conat:files:read");
// Registry of the subscriptions created by createServer, keyed by the subject
// string returned from getSubject. close() looks entries up here and removes them.
let subs: { [name: string]: Subscription } = {};
/**
 * Shut down the file-read service previously registered by createServer for
 * the given project / compute server / name.
 *
 * Does nothing when no subscription is registered under the computed subject.
 * Otherwise the subscription is removed from the registry and then drained.
 */
export async function close({ project_id, compute_server_id, name = "" }) {
  const key = getSubject({ project_id, compute_server_id, name });
  const active = subs[key];
  if (active == null) {
    // Nothing registered under this subject.
    return;
  }
  // Remove the entry first so it is already gone while drain() is pending.
  delete subs[key];
  await active.drain();
}
/**
 * Build the subject on which the file-read service for a project / compute
 * server listens. The service name is "files:read" followed by `name`
 * (a null or undefined `name` contributes nothing).
 */
function getSubject({ project_id, compute_server_id, name = "" }) {
  // The default only covers undefined; `??` additionally maps null to "".
  const suffix = name ?? "";
  const service = `files:read${suffix}`;
  return projectSubject({ project_id, compute_server_id, service });
}
/**
 * Start a file-read service for the given project / compute server / name.
 *
 * Subscribes to the subject computed by getSubject, records the subscription
 * in `subs` (so close() can find it) and starts handling incoming requests in
 * the background. The returned promise resolves once the subscription exists;
 * it does not wait for the request loop to finish.
 *
 * @param createReadStream - called as createReadStream(path, options) for each
 *   request; its result is consumed with `for await` to produce the file data.
 * @param project_id - project the service belongs to.
 * @param compute_server_id - compute server the service belongs to.
 * @param name - optional suffix appended to the "files:read" service name.
 */
export async function createServer({
  createReadStream,
  project_id,
  compute_server_id,
  name = "",
}) {
  const subject = getSubject({
    project_id,
    compute_server_id,
    name,
  });
  logger.debug("createServer", { subject });
  const cn = await conat();
  const sub = await cn.subscribe(subject);
  subs[subject] = sub;
  // The request loop runs in the background and is intentionally not awaited.
  // Attach a handler so a failure of the subscription iterator is logged
  // instead of surfacing as an unhandled promise rejection.
  listen({ sub, createReadStream }).catch((err) => {
    logger.debug("createServer: listen loop failed", { subject, err });
  });
}
/**
 * Consume requests from the subscription until it ends, starting a handler
 * for each one. Handlers are not awaited, so several requests can be served
 * at the same time.
 */
async function listen({ sub, createReadStream }) {
  for await (const request of sub) {
    // Fire-and-forget: do not block the loop on a single transfer.
    void handleMessage(request, createReadStream);
  }
}
/**
 * Serve a single read request: stream the file data, then send a final
 * message whose headers contain `done: true`. If anything fails, a message
 * whose headers contain `error` (the stringified error) is sent instead.
 *
 * This function is started without being awaited, so it must never reject;
 * a failure while reporting the error is therefore logged and swallowed.
 */
async function handleMessage(mesg, createReadStream) {
  logger.debug("handleMessage", mesg.subject);
  try {
    await sendData(mesg, createReadStream);
    await mesg.respond(null, { headers: { done: true } });
  } catch (err) {
    logger.debug("handleMessage: ERROR", err);
    try {
      mesg.respondSync(null, { headers: { error: `${err}` } });
    } catch (respondErr) {
      // Reporting the error can itself fail; nothing more can be done for
      // this request, so just record it.
      logger.debug(
        "handleMessage: unable to report error to requester",
        respondErr,
      );
    }
  }
}
// 4194304 = 4 * 1024 * 1024 bytes. Used both as the highWaterMark passed to
// createReadStream and as the buffered-byte threshold at which sendData emits
// a message, so a single message can be larger than this value.
const CHUNK_SIZE = 4194304;
// Argument to delay() after each message sent by sendData
// (presumably milliseconds -- confirm against the "awaiting" delay helper).
const CHUNK_INTERVAL = 250;
/**
 * Build the response options carrying the sequence number of a data message,
 * i.e. `{ headers: { seq } }`.
 */
function getSeqHeader(seq) {
  const headers = { seq };
  return { headers };
}
/**
 * Stream the file named by `mesg.data.path` back to the requester.
 *
 * Data read from createReadStream is buffered until at least CHUNK_SIZE bytes
 * are pending, then sent as one message tagged with an increasing `seq`
 * header (starting at 1). Any remainder is sent after the stream ends.
 *
 * @throws Error("receiver is gone") when a message is delivered to nobody
 *   (the respond call reports a count of 0).
 */
async function sendData(mesg, createReadStream) {
  const { path } = mesg.data;
  logger.debug("sendData: starting", { path });

  let seq = 0;
  let pending: Buffer[] = [];
  let pendingBytes = 0;

  // Send everything buffered so far as a single message, then pause briefly.
  async function flush(): Promise<void> {
    seq++;
    logger.debug("sendData: sending", { path, seq });
    const payload = Buffer.concat(pending);
    const { count } = await mesg.respond(payload, getSeqHeader(seq));
    if (count == 0) {
      logger.debug("sendData: nobody is listening");
      throw Error("receiver is gone");
    }
    pending = [];
    pendingBytes = 0;
    await delay(CHUNK_INTERVAL);
  }

  const stream = createReadStream(path, { highWaterMark: CHUNK_SIZE });
  for await (const piece of stream) {
    pending.push(piece);
    pendingBytes += piece.length;
    if (pendingBytes < CHUNK_SIZE) {
      // Keep accumulating until the threshold is reached.
      continue;
    }
    await flush();
  }

  // Send whatever is left over once the stream is exhausted.
  if (pendingBytes > 0) {
    await flush();
  }
  logger.debug("sendData: done", { path }, "successfully sent ", seq, "chunks");
}
/** Options accepted by readFile. */
export interface ReadFileOptions {
/** Project whose file-read service is addressed; used to build the subject. */
project_id: string;
/** Compute server whose service is addressed; readFile defaults this to 0. */
compute_server_id?: number;
/** Path of the file to read; sent to the service as the request payload. */
path: string;
/** Suffix appended to the "files:read" service name; readFile defaults this to "". */
name?: string;
/**
 * Passed through to requestMany as `maxWait`; readFile defaults this to
 * 1000 * 60 * 10 (presumably milliseconds, i.e. 10 minutes -- confirm
 * against requestMany).
 */
maxWait?: number;
}
/**
 * Read a file from the file-read service of a project / compute server,
 * yielding its contents piece by piece as the service sends them.
 *
 * The service answers with data messages carrying an increasing `seq` header
 * (starting at 1), followed by a final message whose headers contain `done`,
 * or by a message whose headers contain `error`.
 *
 * @throws Error with the service's message when an `error` header is received.
 * @throws Error("lost data: ...") when a `seq` header is not exactly one more
 *   than the previous one.
 * @throws Error("truncated") when the response stream ends without a `done`
 *   message. This includes the case where no data arrived at all, which
 *   would otherwise be indistinguishable from successfully reading an empty
 *   file.
 */
export async function* readFile({
  project_id,
  compute_server_id = 0,
  path,
  name = "",
  maxWait = 1000 * 60 * 10,
}: ReadFileOptions) {
  logger.debug("readFile", { project_id, compute_server_id, path });
  const cn = await conat();
  const subject = getSubject({
    project_id,
    compute_server_id,
    name,
  });
  // Sequence number of the last data message accepted; 0 means none yet.
  let seq = 0;
  for await (const resp of await cn.requestMany(
    subject,
    { path },
    {
      waitForInterest: true,
      maxWait,
    },
  )) {
    if (resp.headers == null) {
      continue;
    }
    if (resp.headers.error) {
      throw Error(`${resp.headers.error}`);
    }
    if (resp.headers.done) {
      // Normal completion: the service confirmed everything was sent.
      return;
    }
    if (resp.headers.seq) {
      const next = resp.headers.seq as number;
      if (next != seq + 1) {
        throw Error(`lost data: seq=${seq}, next=${next}`);
      }
      seq = next;
    }
    yield resp.data;
  }
  // The stream ended without the `done` marker, so the transfer is incomplete
  // no matter how much (if any) data was received.
  throw Error("truncated");
}