Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
80536 views
1
var Duplex = require('readable-stream').Duplex;
2
var Readable = require('readable-stream').Readable;
3
var Pass = require('readable-stream').PassThrough;
4
var inherits = require('inherits');
5
var isArray = require('isarray');
6
var indexof = require('indexof');
7
var wrap = require('readable-wrap');
8
9
var nextTick = typeof setImmediate !== 'undefined'
10
? setImmediate : process.nextTick
11
;
12
13
module.exports = Pipeline;
14
inherits(Pipeline, Duplex);
15
16
module.exports.obj = function (streams, opts) {
17
if (!opts && !isArray(streams)) {
18
opts = streams;
19
streams = [];
20
}
21
if (!streams) streams = [];
22
if (!opts) opts = {};
23
opts.objectMode = true;
24
return new Pipeline(streams, opts);
25
};
26
27
function Pipeline (streams, opts) {
28
if (!(this instanceof Pipeline)) return new Pipeline(streams, opts);
29
if (!opts && !isArray(streams)) {
30
opts = streams;
31
streams = [];
32
}
33
if (!streams) streams = [];
34
if (!opts) opts = {};
35
Duplex.call(this, opts);
36
37
var self = this;
38
this._options = opts;
39
this._wrapOptions = { objectMode: opts.objectMode !== false };
40
this._streams = [];
41
42
this.splice.apply(this, [ 0, 0 ].concat(streams));
43
44
this.once('finish', function () {
45
self._streams[0].end();
46
});
47
}
48
49
Pipeline.prototype._read = function () {
50
var self = this;
51
this._notEmpty();
52
53
var r = this._streams[this._streams.length-1];
54
var buf, reads = 0;
55
while ((buf = r.read()) !== null) {
56
Duplex.prototype.push.call(this, buf);
57
reads ++;
58
}
59
if (reads === 0) {
60
var onreadable = function () {
61
r.removeListener('readable', onreadable);
62
self.removeListener('_mutate', onreadable);
63
self._read()
64
};
65
r.once('readable', onreadable);
66
self.once('_mutate', onreadable);
67
}
68
};
69
70
Pipeline.prototype._write = function (buf, enc, next) {
71
this._notEmpty();
72
this._streams[0]._write(buf, enc, next);
73
};
74
75
Pipeline.prototype._notEmpty = function () {
76
var self = this;
77
if (this._streams.length > 0) return;
78
var stream = new Pass(this._options);
79
stream.once('end', function () {
80
var ix = indexof(self._streams, stream);
81
if (ix >= 0 && ix === self._streams.length - 1) {
82
Duplex.prototype.push.call(self, null);
83
}
84
});
85
this._streams.push(stream);
86
this.length = this._streams.length;
87
};
88
89
Pipeline.prototype.push = function (stream) {
90
var args = [ this._streams.length, 0 ].concat([].slice.call(arguments));
91
this.splice.apply(this, args);
92
return this._streams.length;
93
};
94
95
Pipeline.prototype.pop = function () {
96
return this.splice(this._streams.length-1,1)[0];
97
};
98
99
Pipeline.prototype.shift = function () {
100
return this.splice(0,1)[0];
101
};
102
103
Pipeline.prototype.unshift = function () {
104
this.splice.apply(this, [0,0].concat([].slice.call(arguments)));
105
return this._streams.length;
106
};
107
108
Pipeline.prototype.splice = function (start, removeLen) {
109
var self = this;
110
var len = this._streams.length;
111
start = start < 0 ? len - start : start;
112
if (removeLen === undefined) removeLen = len - start;
113
removeLen = Math.max(0, Math.min(len - start, removeLen));
114
115
for (var i = start; i < start + removeLen; i++) {
116
if (self._streams[i-1]) {
117
self._streams[i-1].unpipe(self._streams[i]);
118
}
119
}
120
if (self._streams[i-1] && self._streams[i]) {
121
self._streams[i-1].unpipe(self._streams[i]);
122
}
123
var end = i;
124
125
var reps = [], args = arguments;
126
for (var j = 2; j < args.length; j++) (function (stream) {
127
if (isArray(stream)) {
128
stream = new Pipeline(stream, self._options);
129
}
130
stream.on('error', function (err) {
131
err.stream = this;
132
self.emit('error', err);
133
});
134
stream = self._wrapStream(stream);
135
stream.once('end', function () {
136
var ix = indexof(self._streams, stream);
137
if (ix >= 0 && ix === self._streams.length - 1) {
138
Duplex.prototype.push.call(self, null);
139
}
140
});
141
reps.push(stream);
142
})(arguments[j]);
143
144
for (var i = 0; i < reps.length - 1; i++) {
145
reps[i].pipe(reps[i+1]);
146
}
147
148
if (reps.length && self._streams[end]) {
149
reps[reps.length-1].pipe(self._streams[end]);
150
}
151
if (reps[0] && self._streams[start-1]) {
152
self._streams[start-1].pipe(reps[0]);
153
}
154
155
var sargs = [start,removeLen].concat(reps);
156
var removed = self._streams.splice.apply(self._streams, sargs);
157
158
this.emit('_mutate');
159
this.length = this._streams.length;
160
return removed;
161
};
162
163
Pipeline.prototype.get = function () {
164
if (arguments.length === 0) return undefined;
165
166
var base = this;
167
for (var i = 0; i < arguments.length; i++) {
168
var index = arguments[i];
169
if (index < 0) {
170
base = base._streams[base._streams.length + index];
171
}
172
else {
173
base = base._streams[index];
174
}
175
if (!base) return undefined;
176
}
177
return base;
178
};
179
180
Pipeline.prototype.indexOf = function (stream) {
181
return indexof(this._streams, stream);
182
};
183
184
Pipeline.prototype._wrapStream = function (stream) {
185
if (typeof stream.read === 'function') return stream;
186
var w = wrap(stream, this._wrapOptions);
187
w._write = function (buf, enc, next) {
188
if (stream.write(buf) === false) {
189
stream.once('drain', next);
190
}
191
else nextTick(next);
192
};
193
return w;
194
};
195
196