Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
sagemathinc
GitHub Repository: sagemathinc/cocalc
Path: blob/master/src/packages/conat/files/write.ts
1710 views
1
/*
Streaming write over Conat to a project or compute server.

This is a key component to support user uploads, while being memory efficient
by streaming the write. Basically it uses conat to support efficiently doing
streaming writes of files to any compute server or project that is somehow
connected to conat.

INSTRUCTIONS:

Import writeFile:

    import { writeFile } from "@cocalc/conat/files/write";

Now you can write a given path to a project (or compute_server) as
simply as this:

    const stream = createReadStream('a file')
    await writeFile({stream, project_id, compute_server_id, path, maxWait})

- Here stream can be any readable stream, not necessarily a stream made using
  a file.  E.g., you could use PassThrough and explicitly write to it by
  write calls.

- maxWait is a time in ms after which, if the file isn't fully written,
  everything is cleaned up and there is an error.

HOW THIS WORKS:

Here's how this works from the side of the compute server:

- We start a request/response conat server on the compute server:
  - There's one message it accepts, which is:
    "Use streaming download to get {path} from [subject]."
    The sender of that message should set a long timeout (e.g., 10 minutes).
  - It uses the streaming read functionality (in read.ts) to download and write
    to disk the file {path}.
  - When done it responds {status:"success"} or {status:'error', error:'message...'}

Here's how it works from the side of whoever is sending the file:

- Start a read server at [subject] that can send {path}.
- Send a request saying "we are making {path} available to you at [subject]."
- Get back "ok" or an error.  On error (or timeout), close the read server.
- Serve {path} exactly once using the server.  When finished sending {path},
  close it and clean up.  We're done.

DEVELOPMENT:

See src/packages/backend/conat/test/files/write.test.ts for unit tests.

~/cocalc/src/packages/backend$ node

    require('@cocalc/backend/conat'); a = require('@cocalc/conat/files/write');

    project_id = '00847397-d6a8-4cb0-96a8-6ef64ac3e6cf'; compute_server_id = 0; await a.createServer({project_id,compute_server_id,createWriteStream:require('fs').createWriteStream});

    stream=require('fs').createReadStream('env.ts');
    await a.writeFile({stream, project_id, compute_server_id, path:'/tmp/a.ts'})

*/
66
import { once } from "node:events";
import { type Readable } from "node:stream";

import { conat, getLogger } from "@cocalc/conat/client";
import { type Subscription } from "@cocalc/conat/core/client";
import { projectSubject, randomId } from "@cocalc/conat/names";
import {
  close as closeReadService,
  createServer as createReadServer,
  readFile,
} from "./read";
const logger = getLogger("conat:files:write");
78
79
function getWriteSubject({ project_id, compute_server_id }) {
80
return projectSubject({
81
project_id,
82
compute_server_id,
83
service: "files:write",
84
});
85
}
86
87
// Active write-service subscriptions, keyed by conat subject (one per
// project / compute server pair).
let subs: { [name: string]: Subscription } = {};

// Shut down the write service for the given project / compute server,
// if it is running; any in-flight messages are drained first.
export async function close({ project_id, compute_server_id }) {
  const subject = getWriteSubject({ project_id, compute_server_id });
  const sub = subs[subject];
  if (sub == null) {
    // service not running -- nothing to do
    return;
  }
  delete subs[subject];
  await sub.drain();
}
97
98
export async function createServer({
99
project_id,
100
compute_server_id,
101
createWriteStream,
102
}: {
103
project_id: string;
104
compute_server_id: number;
105
// createWriteStream returns a writeable stream
106
// for writing the specified path to disk. It
107
// can be an async function.
108
createWriteStream: (path: string) => any;
109
}) {
110
const subject = getWriteSubject({ project_id, compute_server_id });
111
logger.debug("createServer", { subject });
112
let sub = subs[subject];
113
if (sub != null) {
114
return;
115
}
116
const cn = await conat();
117
sub = await cn.subscribe(subject);
118
subs[subject] = sub;
119
listen({ sub, createWriteStream, project_id, compute_server_id });
120
}
121
122
async function listen({
123
sub,
124
createWriteStream,
125
project_id,
126
compute_server_id,
127
}) {
128
// NOTE: we just handle as many messages as we get in parallel, so this
129
// could be a large number of simultaneous downloads. These are all by
130
// authenticated users of the project, and the load is on the project,
131
// so I think that makes sense.
132
for await (const mesg of sub) {
133
handleMessage({ mesg, createWriteStream, project_id, compute_server_id });
134
}
135
}
136
137
async function handleMessage({
138
mesg,
139
createWriteStream,
140
project_id,
141
compute_server_id,
142
}) {
143
let error = "";
144
let writeStream: null | Awaited<ReturnType<typeof createWriteStream>> = null;
145
try {
146
const { path, name, maxWait } = mesg.data;
147
logger.debug("handleMessage", { path, name, maxWait });
148
writeStream = await createWriteStream(path);
149
// console.log("created writeStream");
150
writeStream.on("error", (err) => {
151
error = `${err}`;
152
mesg.respondSync({ error, status: "error" });
153
console.warn(`error writing ${path}: ${error}`);
154
writeStream.emit("remove");
155
});
156
let chunks = 0;
157
let bytes = 0;
158
for await (const chunk of await readFile({
159
project_id,
160
compute_server_id,
161
name,
162
path,
163
maxWait,
164
})) {
165
if (error) {
166
// console.log("error", error);
167
writeStream.end();
168
return;
169
}
170
writeStream.write(chunk);
171
chunks += 1;
172
bytes += chunk.length;
173
logger.debug("handleMessage -- wrote", { path, name, bytes });
174
// console.log("wrote ", bytes);
175
}
176
writeStream.end();
177
writeStream.emit("rename");
178
mesg.respondSync({ status: "success", bytes, chunks });
179
logger.debug("handleMessage -- SUCCESS", { path, name });
180
} catch (err) {
181
logger.debug("handleMessage: ERROR", err);
182
if (!error) {
183
mesg.respondSync({ error: `${err}`, status: "error" });
184
writeStream?.emit("remove");
185
}
186
}
187
}
188
189
// Options accepted by writeFile below.
export interface WriteFileOptions {
  // project to write the file to
  project_id: string;
  // optional; writeFile defaults it to 0 -- presumably 0 targets the
  // project itself rather than a compute server; confirm against callers
  compute_server_id?: number;
  // path on disk to write
  path: string;
  // source of the file's contents; any readable stream works, not just
  // one made from a file (e.g., a PassThrough you write to explicitly)
  stream: Readable;
  // time in ms after which, if the file isn't fully written, everything
  // is cleaned up and there is an error (writeFile defaults to 10 min)
  maxWait?: number;
}
196
197
export async function writeFile({
198
project_id,
199
compute_server_id = 0,
200
path,
201
stream,
202
maxWait = 1000 * 60 * 10, // 10 minutes
203
}): Promise<{ bytes: number; chunks: number }> {
204
logger.debug("writeFile", { project_id, compute_server_id, path, maxWait });
205
const name = randomId();
206
try {
207
function createReadStream() {
208
return stream;
209
}
210
// start read server
211
await createReadServer({
212
createReadStream,
213
project_id,
214
compute_server_id,
215
name,
216
});
217
// tell compute server / project to start reading our file.
218
const cn = await conat();
219
const resp = await cn.request(
220
getWriteSubject({ project_id, compute_server_id }),
221
{ name, path, maxWait },
222
{ timeout: maxWait },
223
);
224
const { error, bytes, chunks } = resp.data;
225
if (error) {
226
throw Error(error);
227
}
228
return { bytes, chunks };
229
} finally {
230
await closeReadService({ project_id, compute_server_id, name });
231
}
232
}
233
234