/*
Read a file from a project/compute server via an async generator, so it is
memory efficient.

This is a conat service that uses requestMany: it takes a filename path as
input and streams all the binary data from that path.

We use headers to add sequence numbers to the response messages.

This is useful to implement:

- an http server for downloading any file, even large ones.
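  As a concrete illustration, here is a rough sketch of such an http endpoint.
  It is NOT part of this module: the Express app, the route, and the error
  handling are hypothetical, and project_id is assumed to come from the
  request context.

    app.get("/download", async (req, res) => {
      try {
        for await (const chunk of await readFile({
          project_id,
          compute_server_id: 0,
          path: req.query.path as string,
        })) {
          // each chunk is a Buffer of roughly CHUNK_SIZE bytes (the last may be smaller)
          res.write(chunk);
        }
        res.end();
      } catch (err) {
        // abort the response if the read fails or times out
        res.destroy(err as Error);
      }
    });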

IDEAS:

- we could also implement a version of this that takes a directory
  as input, runs compressed tar on it, and pipes the output into
  response messages. We could then implement streaming download of
  a tarball of a directory tree, or also copying a directory tree from
  one place to another (without using rsync). I've already done this
  over a websocket for compute servers, so we could just copy that code.
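  A minimal sketch of that idea (hypothetical, not implemented here): swap in a
  createReadStream-compatible function that spawns tar and returns its stdout,
  which is an async-iterable stream of Buffer chunks, then pass it to
  createServer:

    import { spawn } from "node:child_process";

    function createTarStream(path: string) {
      // stream a gzipped tarball of the directory at `path`; the extra
      // options argument that sendData passes is simply ignored
      return spawn("tar", ["-cz", "-C", path, "."]).stdout;
    }

    // createServer({ project_id, compute_server_id, createReadStream: createTarStream });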

DEVELOPMENT:

See src/packages/backend/conat/test/files/read.test.ts for unit tests.

~/cocalc/src/packages/backend$ node

require('@cocalc/backend/conat'); a = require('@cocalc/conat/files/read'); a.createServer({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf',compute_server_id:0,createReadStream:require('fs').createReadStream})

for await (const chunk of await a.readFile({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf',compute_server_id:0,path:'/tmp/a'})) { console.log({chunk}); }

for await (const chunk of await a.readFile({project_id:'00847397-d6a8-4cb0-96a8-6ef64ac3e6cf',compute_server_id:0,path:'/projects/6b851643-360e-435e-b87e-f9a6ab64a8b1/cocalc/.git/objects/pack/pack-771f7fe4ee855601463be070cf9fb9afc91f84ac.pack'})) { console.log({chunk}); }

*/

import { conat } from "@cocalc/conat/client";
import { projectSubject } from "@cocalc/conat/names";
import { type Subscription } from "@cocalc/conat/core/client";
import { delay } from "awaiting";
import { getLogger } from "@cocalc/conat/client";

const logger = getLogger("conat:files:read");

let subs: { [name: string]: Subscription } = {};

export async function close({ project_id, compute_server_id, name = "" }) {
  const subject = getSubject({ project_id, compute_server_id, name });
  if (subs[subject] == null) {
    return;
  }
  const sub = subs[subject];
  delete subs[subject];
  await sub.drain();
}

function getSubject({ project_id, compute_server_id, name = "" }) {
  return projectSubject({
    project_id,
    compute_server_id,
    service: `files:read${name ?? ""}`,
  });
}

export async function createServer({
  createReadStream,
  project_id,
  compute_server_id,
  name = "",
}) {
  const subject = getSubject({
    project_id,
    compute_server_id,
    name,
  });
  logger.debug("createServer", { subject });
  const cn = await conat();
  const sub = await cn.subscribe(subject);
  subs[subject] = sub;
  listen({ sub, createReadStream });
}

async function listen({ sub, createReadStream }) {
  // NOTE: we just handle as many messages as we get in parallel, so this
  // could be a large number of simultaneous downloads. These are all by
  // authenticated users of the project, and the load is on the project,
  // so I think that makes sense.
  for await (const mesg of sub) {
    handleMessage(mesg, createReadStream);
  }
}

async function handleMessage(mesg, createReadStream) {
  logger.debug("handleMessage", mesg.subject);
  try {
    await sendData(mesg, createReadStream);
    await mesg.respond(null, { headers: { done: true } });
  } catch (err) {
    logger.debug("handleMessage: ERROR", err);
    mesg.respondSync(null, { headers: { error: `${err}` } });
  }
}

// 4MB -- chunks may be slightly bigger
const CHUNK_SIZE = 4194304;
// pause (in ms) between sending chunks -- see sendChunks below
const CHUNK_INTERVAL = 250;

function getSeqHeader(seq) {
  return { headers: { seq } };
}

async function sendData(mesg, createReadStream) {
  const { path } = mesg.data;
  logger.debug("sendData: starting", { path });
  let seq = 0;
  const chunks: Buffer[] = [];
  let size = 0;
  const sendChunks = async () => {
    // Not only is waiting for the response useful to make sure somebody is listening,
    // we also use await here partly to space out the messages to avoid saturating
    // the websocket connection, since doing so would break everything
    // (heartbeats, etc.) and disconnect us when transferring a large file.
    seq += 1;
    logger.debug("sendData: sending", { path, seq });
    const data = Buffer.concat(chunks);
    const { count } = await mesg.respond(data, getSeqHeader(seq));
    if (count == 0) {
      logger.debug("sendData: nobody is listening");
      // nobody is listening, so don't waste effort sending...
      throw Error("receiver is gone");
    }
    size = 0;
    chunks.length = 0;
    // Delay a little just to give other messages a chance, so we don't get disconnected,
    // e.g., due to lack of heartbeats. Also, this reduces the load on conat-router.
    await delay(CHUNK_INTERVAL);
  };

  for await (let chunk of createReadStream(path, {
    highWaterMark: CHUNK_SIZE,
  })) {
    chunks.push(chunk);
    size += chunk.length;
    if (size >= CHUNK_SIZE) {
      // we have a full chunk's worth of data -- send it
      await sendChunks();
    }
  }
  if (size > 0) {
    // send any remaining data
    await sendChunks();
  }
  logger.debug("sendData: done", { path }, "successfully sent", seq, "chunks");
}

export interface ReadFileOptions {
  project_id: string;
  compute_server_id?: number;
  path: string;
  name?: string;
  maxWait?: number;
}

export async function* readFile({
  project_id,
  compute_server_id = 0,
  path,
  name = "",
  maxWait = 1000 * 60 * 10, // 10 minutes
}: ReadFileOptions) {
  logger.debug("readFile", { project_id, compute_server_id, path });
  const cn = await conat();
  const subject = getSubject({
    project_id,
    compute_server_id,
    name,
  });
  const v: any = [];
  let seq = 0;
  let bytes = 0;
  for await (const resp of await cn.requestMany(
    subject,
    { path },
    {
      // waitForInterest is extremely important because of the timing
      // of how readFile gets used by writeFile in write.ts.
      waitForInterest: true,
      maxWait,
    },
  )) {
    if (resp.headers == null) {
      continue;
    }
    if (resp.headers.error) {
      throw Error(`${resp.headers.error}`);
    }
    if (resp.headers.done) {
      return;
    }
    if (resp.headers.seq) {
      const next = resp.headers.seq as number;
      bytes = resp.data.length;
      // console.log("received seq", { seq: next, bytes });
      if (next != seq + 1) {
        throw Error(`lost data: seq=${seq}, next=${next}`);
      }
      seq = next;
    }
    yield resp.data;
  }
  if (bytes != 0) {
    throw Error("truncated");
  }
  // console.log("done!");
  return v;
}
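
/*
Usage sketch (hypothetical, for illustration only): save a remote file to local
disk by piping the chunks from readFile into a write stream. The function name,
paths, and project_id here are made up.

  import { createWriteStream } from "node:fs";

  async function download(project_id: string, remotePath: string, localPath: string) {
    const out = createWriteStream(localPath);
    for await (const chunk of await readFile({ project_id, path: remotePath })) {
      // respect backpressure from the local filesystem
      if (!out.write(chunk)) {
        await new Promise((resolve) => out.once("drain", resolve));
      }
    }
    out.end();
  }
*/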