Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
80547 views
1
var Readable = require('readable-stream').Readable;
2
3
// wrap an old-style stream as the async data source.
4
// This is *not* part of the readable stream interface.
5
// It is an ugly unfortunate mess of history.
6
module.exports = wrap;
7
module.exports.obj = function (stream, opts) {
8
if (!opts) opts = {};
9
opts.objectMode = true;
10
return wrap(stream, opts);
11
};
12
13
function wrap (stream, opts) {
14
var self = new Readable(opts)
15
var state = self._readableState;
16
var paused = false;
17
18
stream.on('end', function() {
19
if (state.decoder && !state.ended) {
20
var chunk = state.decoder.end();
21
if (chunk && chunk.length)
22
self.push(chunk);
23
}
24
25
self.push(null);
26
});
27
28
stream.on('data', function(chunk) {
29
if (state.decoder)
30
chunk = state.decoder.write(chunk);
31
32
// don't skip over falsy values in objectMode
33
//if (state.objectMode && util.isNullOrUndefined(chunk))
34
if (state.objectMode && (chunk === null || chunk === undefined))
35
return
36
else if (!state.objectMode && (!chunk || !chunk.length))
37
return;
38
39
var ret = self.push(chunk);
40
if (!ret) {
41
paused = true;
42
stream.pause();
43
}
44
});
45
46
// proxy all the other methods.
47
// important when wrapping filters and duplexes.
48
for (var i in stream) {
49
if (typeof stream[i] === 'function' &&
50
typeof self[i] === 'undefined') {
51
self[i] = function(method) { return function() {
52
return stream[method].apply(stream, arguments);
53
}}(i);
54
}
55
}
56
57
// proxy certain important events.
58
var events = ['error', 'close', 'destroy', 'pause', 'resume'];
59
for (var i = 0; i < events.length; i++) (function (ev) {
60
stream.on(ev, function () {
61
var args = [ ev ].concat([].slice.call(arguments));
62
self.emit.apply(self, args);
63
})
64
})(events[i]);
65
66
// when we try to consume some more bytes, simply unpause the
67
// underlying stream.
68
self._read = function(n) {
69
if (paused) {
70
paused = false;
71
stream.resume();
72
}
73
};
74
75
return self;
76
};
77
78