react / wstein / node_modules / browserify / node_modules / JSONStream / node_modules / through / index.js
var Stream = require('stream')

// through
//
// A stream that does nothing but re-emit its input.
// Useful for aggregating a series of changing (but not ending) streams
// into one stream.

exports = module.exports = through
through.through = through

/**
 * Create a stream that is both readable and writable.
 *
 * @param {Function} [write] Called as `write.call(stream, data)` for every
 *   `stream.write(data)`. Defaults to re-queueing the data unchanged.
 * @param {Function} [end] Called as `end.call(stream)` the first time
 *   `stream.end()` is invoked. Defaults to queueing `null`, which makes the
 *   stream emit 'end'.
 * @param {Object} [opts]
 * @param {boolean} [opts.autoDestroy] Pass `false` to stop the stream from
 *   destroying itself once it is neither readable nor writable.
 * @returns {Stream} The new stream. `queue`/`push`, `end`, `destroy`, `pause`
 *   and `resume` all return the stream itself, so calls can be chained.
 */
function through (write, end, opts) {
  write = write || function (data) { this.queue(data) }
  end = end || function () { this.queue(null) }

  // `ended` guards end(); `_ended` records that `null` has been queued;
  // `destroyed` guards destroy(); `buffer` holds queued, not yet emitted data.
  var ended = false, destroyed = false, buffer = [], _ended = false
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

  // stream.autoPause = !(opts && opts.autoPause === false)
  stream.autoDestroy = !(opts && opts.autoDestroy === false)

  // Hand the data to the user's `write`; the return value is false while
  // the stream is paused.
  stream.write = function (data) {
    write.call(this, data)
    return !stream.paused
  }

  // Emit buffered items until the buffer is empty or the stream is paused.
  // A buffered `null` is turned into an 'end' event and stops the loop.
  function drain () {
    while (buffer.length && !stream.paused) {
      var data = buffer.shift()
      if (null === data)
        return stream.emit('end')
      else
        stream.emit('data', data)
    }
  }

  // Buffer `data` and try to emit it right away. Queueing `null` marks the
  // end of the data; anything queued after that is ignored.
  stream.queue = stream.push = function (data) {
    if (_ended) return stream
    if (data === null) _ended = true
    buffer.push(data)
    drain()
    return stream
  }

  //this will be registered as the first 'end' listener
  //must call destroy next tick, to make sure we're after any
  //stream piped from here.
  //this is only a problem if end is not emitted synchronously.
  //a nicer way to do this is to make sure this is the last listener for 'end'

  stream.on('end', function () {
    stream.readable = false
    if (!stream.writable && stream.autoDestroy)
      process.nextTick(function () {
        stream.destroy()
      })
  })

  // Close the writable side, run the user's `end`, and destroy immediately
  // if 'end' was already emitted (the stream is no longer readable).
  function _end () {
    stream.writable = false
    end.call(stream)
    if (!stream.readable && stream.autoDestroy)
      stream.destroy()
  }

  // End the writable side, optionally writing one last chunk first.
  // Only the first call has any effect; every call returns the stream.
  stream.end = function (data) {
    if (ended) return stream
    ended = true
    if (arguments.length) stream.write(data)
    _end() // will emit or queue
    return stream
  }

  // Drop anything still buffered, mark the stream closed in both directions
  // and emit 'close'. Only the first call has any effect.
  stream.destroy = function () {
    if (destroyed) return stream
    destroyed = true
    ended = true
    buffer.length = 0
    stream.writable = stream.readable = false
    stream.emit('close')
    return stream
  }

  // Stop emitting 'data'; queued items stay in the buffer until resume().
  stream.pause = function () {
    if (stream.paused) return stream
    stream.paused = true
    return stream
  }

  // Leave the paused state (emitting 'resume' if it was paused), flush the
  // buffer, then emit 'drain' unless a listener paused the stream again.
  stream.resume = function () {
    if (stream.paused) {
      stream.paused = false
      stream.emit('resume')
    }
    drain()
    //may have become paused again,
    //as drain emits 'data'.
    if (!stream.paused)
      stream.emit('drain')
    return stream
  }
  return stream
}