Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
emscripten-core
GitHub Repository: emscripten-core/emscripten
Path: blob/main/src/lib/libpipefs.js
6178 views
1
/**
2
* @license
3
* Copyright 2017 The Emscripten Authors
4
* SPDX-License-Identifier: MIT
5
*/
6
7
addToLibrary({
8
$PIPEFS__postset: () => addAtInit('PIPEFS.root = FS.mount(PIPEFS, {}, null);'),
9
$PIPEFS__deps: ['$FS'],
10
$PIPEFS: {
11
BUCKET_BUFFER_SIZE: 1024 * 8, // 8KiB Buffer
12
mount(mount) {
13
// Do not pollute the real root directory or its child nodes with pipes
14
// Looks like it is OK to create another pseudo-root node not linked to the FS.root hierarchy this way
15
return FS.createNode(null, '/', {{{ cDefs.S_IFDIR }}} | 0o777, 0);
16
},
17
createPipe() {
18
var pipe = {
19
buckets: [],
20
// refcnt 2 because pipe has a read end and a write end. We need to be
21
// able to read from the read end after write end is closed.
22
refcnt : 2,
23
timestamp: new Date(),
24
#if PTHREADS || ASYNCIFY
25
readableHandlers: [],
26
registerReadableHandler: (callback) => {
27
callback.registerCleanupFunc(() => {
28
const i = pipe.readableHandlers.indexOf(callback);
29
if (i !== -1) pipe.readableHandlers.splice(i, 1);
30
});
31
pipe.readableHandlers.push(callback);
32
},
33
notifyReadableHandlers: () => {
34
while (pipe.readableHandlers.length > 0) {
35
const cb = pipe.readableHandlers.shift();
36
if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
37
}
38
pipe.readableHandlers = [];
39
}
40
#endif
41
};
42
43
pipe.buckets.push({
44
buffer: new Uint8Array(PIPEFS.BUCKET_BUFFER_SIZE),
45
offset: 0,
46
roffset: 0
47
});
48
49
var rName = PIPEFS.nextname();
50
var wName = PIPEFS.nextname();
51
var rNode = FS.createNode(PIPEFS.root, rName, {{{ cDefs.S_IFIFO }}}, 0);
52
var wNode = FS.createNode(PIPEFS.root, wName, {{{ cDefs.S_IFIFO }}}, 0);
53
54
rNode.pipe = pipe;
55
wNode.pipe = pipe;
56
57
var readableStream = FS.createStream({
58
path: rName,
59
node: rNode,
60
flags: {{{ cDefs.O_RDONLY }}},
61
seekable: false,
62
stream_ops: PIPEFS.stream_ops
63
});
64
rNode.stream = readableStream;
65
66
var writableStream = FS.createStream({
67
path: wName,
68
node: wNode,
69
flags: {{{ cDefs.O_WRONLY }}},
70
seekable: false,
71
stream_ops: PIPEFS.stream_ops
72
});
73
wNode.stream = writableStream;
74
75
return {
76
readable_fd: readableStream.fd,
77
writable_fd: writableStream.fd
78
};
79
},
80
stream_ops: {
81
getattr(stream) {
82
var node = stream.node;
83
var timestamp = node.pipe.timestamp;
84
return {
85
dev: 14,
86
ino: node.id,
87
mode: 0o10600,
88
nlink: 1,
89
uid: 0,
90
gid: 0,
91
rdev: 0,
92
size: 0,
93
atime: timestamp,
94
mtime: timestamp,
95
ctime: timestamp,
96
blksize: 4096,
97
blocks: 0,
98
};
99
},
100
poll(stream, timeout, notifyCallback) {
101
var pipe = stream.node.pipe;
102
103
if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) {
104
return ({{{ cDefs.POLLWRNORM }}} | {{{ cDefs.POLLOUT }}});
105
}
106
for (var bucket of pipe.buckets) {
107
if (bucket.offset - bucket.roffset > 0) {
108
return ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
109
}
110
}
111
112
#if PTHREADS || ASYNCIFY
113
if (notifyCallback) pipe.registerReadableHandler(notifyCallback);
114
#endif
115
return 0;
116
},
117
dup(stream) {
118
stream.node.pipe.refcnt++;
119
},
120
ioctl(stream, request, varargs) {
121
return {{{ cDefs.EINVAL }}};
122
},
123
fsync(stream) {
124
return {{{ cDefs.EINVAL }}};
125
},
126
read(stream, buffer, offset, length, position /* ignored */) {
127
var pipe = stream.node.pipe;
128
var currentLength = 0;
129
130
for (var bucket of pipe.buckets) {
131
currentLength += bucket.offset - bucket.roffset;
132
}
133
134
#if ASSERTIONS && !(MEMORY64 && MAXIMUM_MEMORY > FOUR_GB)
135
#if PTHREADS
136
assert(buffer instanceof ArrayBuffer || buffer instanceof SharedArrayBuffer || ArrayBuffer.isView(buffer));
137
#else
138
assert(buffer instanceof ArrayBuffer || ArrayBuffer.isView(buffer));
139
#endif
140
#endif
141
var data = buffer.subarray(offset, offset + length);
142
143
if (length <= 0) {
144
return 0;
145
}
146
if (currentLength == 0) {
147
// Behave as if the read end is always non-blocking
148
throw new FS.ErrnoError({{{ cDefs.EAGAIN }}});
149
}
150
var toRead = Math.min(currentLength, length);
151
152
var totalRead = toRead;
153
var toRemove = 0;
154
155
for (var bucket of pipe.buckets) {
156
var bucketSize = bucket.offset - bucket.roffset;
157
158
if (toRead <= bucketSize) {
159
var tmpSlice = bucket.buffer.subarray(bucket.roffset, bucket.offset);
160
if (toRead < bucketSize) {
161
tmpSlice = tmpSlice.subarray(0, toRead);
162
bucket.roffset += toRead;
163
} else {
164
toRemove++;
165
}
166
data.set(tmpSlice);
167
break;
168
} else {
169
var tmpSlice = bucket.buffer.subarray(bucket.roffset, bucket.offset);
170
data.set(tmpSlice);
171
data = data.subarray(tmpSlice.byteLength);
172
toRead -= tmpSlice.byteLength;
173
toRemove++;
174
}
175
}
176
177
if (toRemove && toRemove == pipe.buckets.length) {
178
// Do not generate excessive garbage in use cases such as
179
// write several bytes, read everything, write several bytes, read everything...
180
toRemove--;
181
pipe.buckets[toRemove].offset = 0;
182
pipe.buckets[toRemove].roffset = 0;
183
}
184
185
pipe.buckets.splice(0, toRemove);
186
187
return totalRead;
188
},
189
write(stream, buffer, offset, length, position /* ignored */) {
190
var pipe = stream.node.pipe;
191
192
#if ASSERTIONS && !(MEMORY64 && MAXIMUM_MEMORY > FOUR_GB)
193
#if PTHREADS
194
assert(buffer instanceof ArrayBuffer || buffer instanceof SharedArrayBuffer || ArrayBuffer.isView(buffer));
195
#else
196
assert(buffer instanceof ArrayBuffer || ArrayBuffer.isView(buffer));
197
#endif
198
#endif
199
var data = buffer.subarray(offset, offset + length);
200
201
var dataLen = data.byteLength;
202
if (dataLen <= 0) {
203
return 0;
204
}
205
206
var currBucket = null;
207
208
if (pipe.buckets.length == 0) {
209
currBucket = {
210
buffer: new Uint8Array(PIPEFS.BUCKET_BUFFER_SIZE),
211
offset: 0,
212
roffset: 0
213
};
214
pipe.buckets.push(currBucket);
215
} else {
216
currBucket = pipe.buckets[pipe.buckets.length - 1];
217
}
218
219
#if ASSERTIONS
220
assert(currBucket.offset <= PIPEFS.BUCKET_BUFFER_SIZE);
221
#endif
222
223
var freeBytesInCurrBuffer = PIPEFS.BUCKET_BUFFER_SIZE - currBucket.offset;
224
if (freeBytesInCurrBuffer >= dataLen) {
225
currBucket.buffer.set(data, currBucket.offset);
226
currBucket.offset += dataLen;
227
#if PTHREADS || ASYNCIFY
228
pipe.notifyReadableHandlers();
229
#endif
230
return dataLen;
231
} else if (freeBytesInCurrBuffer > 0) {
232
currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset);
233
currBucket.offset += freeBytesInCurrBuffer;
234
data = data.subarray(freeBytesInCurrBuffer, data.byteLength);
235
}
236
237
var numBuckets = (data.byteLength / PIPEFS.BUCKET_BUFFER_SIZE) | 0;
238
var remElements = data.byteLength % PIPEFS.BUCKET_BUFFER_SIZE;
239
240
for (var i = 0; i < numBuckets; i++) {
241
var newBucket = {
242
buffer: new Uint8Array(PIPEFS.BUCKET_BUFFER_SIZE),
243
offset: PIPEFS.BUCKET_BUFFER_SIZE,
244
roffset: 0
245
};
246
pipe.buckets.push(newBucket);
247
newBucket.buffer.set(data.subarray(0, PIPEFS.BUCKET_BUFFER_SIZE));
248
data = data.subarray(PIPEFS.BUCKET_BUFFER_SIZE, data.byteLength);
249
}
250
251
if (remElements > 0) {
252
var newBucket = {
253
buffer: new Uint8Array(PIPEFS.BUCKET_BUFFER_SIZE),
254
offset: data.byteLength,
255
roffset: 0
256
};
257
pipe.buckets.push(newBucket);
258
newBucket.buffer.set(data);
259
}
260
261
#if PTHREADS || ASYNCIFY
262
pipe.notifyReadableHandlers();
263
#endif
264
return dataLen;
265
},
266
close(stream) {
267
var pipe = stream.node.pipe;
268
pipe.refcnt--;
269
if (pipe.refcnt === 0) {
270
pipe.buckets = null;
271
}
272
}
273
},
274
nextname() {
275
if (!PIPEFS.nextname.current) {
276
PIPEFS.nextname.current = 0;
277
}
278
return 'pipe[' + (PIPEFS.nextname.current++) + ']';
279
},
280
},
281
});
282
283