Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
80536 views
1
var Stream = require('stream')
2
3
// through
4
//
5
// a stream that does nothing but re-emit the input.
6
// useful for aggregating a series of changing (but not ending) streams into one stream.
7
8
// Publish `through` as the module's export (keeping the local `exports`
// alias pointing at it), and attach it to itself so that
// `require(...).through` resolves to the same factory.
module.exports = through
exports = through
through.through = through
10
11
// Create a stream that is both readable and writable.
12
13
/**
 * Create a stream that is both readable and writable and that hands every
 * written chunk to `write`, which decides what (if anything) to emit by
 * calling `this.queue(data)`.
 *
 * Queued data is buffered while the stream is paused and flushed on
 * `resume()`. Queueing `null` marks the end of the readable side: once the
 * marker is reached, 'end' is emitted and further `queue()` calls are ignored.
 *
 * @param {function(*)} [write] Called with `this` bound to the receiver of
 *   `stream.write` for every chunk. Defaults to re-queueing the chunk as is.
 * @param {function()} [end] Called with `this` bound to the stream when
 *   `stream.end()` is invoked. Defaults to queueing `null` (the end marker).
 * @param {Object} [opts]
 * @param {boolean} [opts.autoDestroy] Unless explicitly `false`, the stream
 *   destroys itself once both its readable and writable sides are finished.
 * @returns {Stream} The new stream. `queue`/`push`, `end`, `destroy`,
 *   `pause` and `resume` all return the stream so calls can be chained.
 */
function through (write, end, opts) {
  // Default handlers turn the stream into a plain pass-through.
  write = write || function (data) { this.queue(data) }
  end = end || function () { this.queue(null) }

  // `ended`  - end() has been called, or the stream has been destroyed.
  // `_ended` - the `null` end marker has been queued.
  var ended = false, destroyed = false, buffer = [], _ended = false
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

  stream.autoDestroy = !(opts && opts.autoDestroy === false)

  // Returns false while paused so that writers can apply back-pressure.
  stream.write = function (data) {
    write.call(this, data)
    return !stream.paused
  }

  // Emit buffered chunks until the buffer is empty or a listener pauses us.
  function drain () {
    while (buffer.length && !stream.paused) {
      var data = buffer.shift()
      if (null === data)
        return stream.emit('end')
      else
        stream.emit('data', data)
    }
  }

  stream.queue = stream.push = function (data) {
    // Nothing may be queued after the end marker.
    if (_ended) return stream
    if (data === null) _ended = true
    buffer.push(data)
    drain()
    return stream
  }

  // This is registered as the first 'end' listener, so destroy must be
  // deferred to the next tick to make sure it runs after any stream piped
  // from here has handled 'end'.
  // This is only a problem if 'end' is not emitted synchronously.
  // A nicer way to do this would be to make sure this is the last listener
  // for 'end'.
  stream.on('end', function () {
    stream.readable = false
    if (!stream.writable && stream.autoDestroy)
      process.nextTick(function () {
        stream.destroy()
      })
  })

  function _end () {
    stream.writable = false
    end.call(stream)
    // If 'end' was already emitted synchronously, both sides are done.
    if (!stream.readable && stream.autoDestroy)
      stream.destroy()
  }

  stream.end = function (data) {
    // Repeated calls are no-ops, but still return the stream for chaining.
    if (ended) return stream
    ended = true
    if (arguments.length) stream.write(data)
    _end() // will emit or queue
    return stream
  }

  stream.destroy = function () {
    // Repeated calls are no-ops, but still return the stream for chaining.
    if (destroyed) return stream
    destroyed = true
    ended = true
    buffer.length = 0 // drop anything that was still waiting to be emitted
    stream.writable = stream.readable = false
    stream.emit('close')
    return stream
  }

  stream.pause = function () {
    // Always return the stream, even when it was already paused.
    stream.paused = true
    return stream
  }

  stream.resume = function () {
    if (stream.paused) {
      stream.paused = false
      stream.emit('resume')
    }
    drain()
    // May have become paused again, as drain emits 'data'.
    if (!stream.paused)
      stream.emit('drain')
    return stream
  }

  return stream
}
108
109
110