react / wstein / node_modules / browserify / node_modules / readable-stream / lib / _stream_readable.js
80537 views// Copyright Joyent, Inc. and other Node contributors.1//2// Permission is hereby granted, free of charge, to any person obtaining a3// copy of this software and associated documentation files (the4// "Software"), to deal in the Software without restriction, including5// without limitation the rights to use, copy, modify, merge, publish,6// distribute, sublicense, and/or sell copies of the Software, and to permit7// persons to whom the Software is furnished to do so, subject to the8// following conditions:9//10// The above copyright notice and this permission notice shall be included11// in all copies or substantial portions of the Software.12//13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF15// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN16// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,17// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR18// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE19// USE OR OTHER DEALINGS IN THE SOFTWARE.2021module.exports = Readable;2223/*<replacement>*/24var isArray = require('isarray');25/*</replacement>*/262728/*<replacement>*/29var Buffer = require('buffer').Buffer;30/*</replacement>*/3132Readable.ReadableState = ReadableState;3334var EE = require('events').EventEmitter;3536/*<replacement>*/37if (!EE.listenerCount) EE.listenerCount = function(emitter, type) {38return emitter.listeners(type).length;39};40/*</replacement>*/4142var Stream = require('stream');4344/*<replacement>*/45var util = require('core-util-is');46util.inherits = require('inherits');47/*</replacement>*/4849var StringDecoder;505152/*<replacement>*/53var debug = require('util');54if (debug && debug.debuglog) {55debug = debug.debuglog('stream');56} else {57debug = function () {};58}59/*</replacement>*/606162util.inherits(Readable, Stream);6364function ReadableState(options, stream) {65var Duplex = require('./_stream_duplex');6667options = options || {};6869// the point at which it stops calling _read() to fill the buffer70// Note: 0 is a valid value, means "don't call _read preemptively ever"71var hwm = options.highWaterMark;72var defaultHwm = options.objectMode ? 16 : 16 * 1024;73this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;7475// cast to ints.76this.highWaterMark = ~~this.highWaterMark;7778this.buffer = [];79this.length = 0;80this.pipes = null;81this.pipesCount = 0;82this.flowing = null;83this.ended = false;84this.endEmitted = false;85this.reading = false;8687// a flag to be able to tell if the onwrite cb is called immediately,88// or on a later tick. We set this to true at first, because any89// actions that shouldn't happen until "later" should generally also90// not happen before the first write call.91this.sync = true;9293// whenever we return null, then we set a flag to say94// that we're awaiting a 'readable' event emission.95this.needReadable = false;96this.emittedReadable = false;97this.readableListening = false;9899100// object stream flag. Used to make read(n) ignore n and to101// make all the buffer merging and length checks go away102this.objectMode = !!options.objectMode;103104if (stream instanceof Duplex)105this.objectMode = this.objectMode || !!options.readableObjectMode;106107// Crypto is kind of old and crusty. Historically, its default string108// encoding is 'binary' so we have to make this configurable.109// Everything else in the universe uses 'utf8', though.110this.defaultEncoding = options.defaultEncoding || 'utf8';111112// when piping, we only care about 'readable' events that happen113// after read()ing all the bytes and not getting any pushback.114this.ranOut = false;115116// the number of writers that are awaiting a drain event in .pipe()s117this.awaitDrain = 0;118119// if true, a maybeReadMore has been scheduled120this.readingMore = false;121122this.decoder = null;123this.encoding = null;124if (options.encoding) {125if (!StringDecoder)126StringDecoder = require('string_decoder/').StringDecoder;127this.decoder = new StringDecoder(options.encoding);128this.encoding = options.encoding;129}130}131132function Readable(options) {133var Duplex = require('./_stream_duplex');134135if (!(this instanceof Readable))136return new Readable(options);137138this._readableState = new ReadableState(options, this);139140// legacy141this.readable = true;142143Stream.call(this);144}145146// Manually shove something into the read() buffer.147// This returns true if the highWaterMark has not been hit yet,148// similar to how Writable.write() returns true if you should149// write() some more.150Readable.prototype.push = function(chunk, encoding) {151var state = this._readableState;152153if (util.isString(chunk) && !state.objectMode) {154encoding = encoding || state.defaultEncoding;155if (encoding !== state.encoding) {156chunk = new Buffer(chunk, encoding);157encoding = '';158}159}160161return readableAddChunk(this, state, chunk, encoding, false);162};163164// Unshift should *always* be something directly out of read()165Readable.prototype.unshift = function(chunk) {166var state = this._readableState;167return readableAddChunk(this, state, chunk, '', true);168};169170function readableAddChunk(stream, state, chunk, encoding, addToFront) {171var er = chunkInvalid(state, chunk);172if (er) {173stream.emit('error', er);174} else if (util.isNullOrUndefined(chunk)) {175state.reading = false;176if (!state.ended)177onEofChunk(stream, state);178} else if (state.objectMode || chunk && chunk.length > 0) {179if (state.ended && !addToFront) {180var e = new Error('stream.push() after EOF');181stream.emit('error', e);182} else if (state.endEmitted && addToFront) {183var e = new Error('stream.unshift() after end event');184stream.emit('error', e);185} else {186if (state.decoder && !addToFront && !encoding)187chunk = state.decoder.write(chunk);188189if (!addToFront)190state.reading = false;191192// if we want the data now, just emit it.193if (state.flowing && state.length === 0 && !state.sync) {194stream.emit('data', chunk);195stream.read(0);196} else {197// update the buffer info.198state.length += state.objectMode ? 1 : chunk.length;199if (addToFront)200state.buffer.unshift(chunk);201else202state.buffer.push(chunk);203204if (state.needReadable)205emitReadable(stream);206}207208maybeReadMore(stream, state);209}210} else if (!addToFront) {211state.reading = false;212}213214return needMoreData(state);215}216217218219// if it's past the high water mark, we can push in some more.220// Also, if we have no data yet, we can stand some221// more bytes. This is to work around cases where hwm=0,222// such as the repl. Also, if the push() triggered a223// readable event, and the user called read(largeNumber) such that224// needReadable was set, then we ought to push more, so that another225// 'readable' event will be triggered.226function needMoreData(state) {227return !state.ended &&228(state.needReadable ||229state.length < state.highWaterMark ||230state.length === 0);231}232233// backwards compatibility.234Readable.prototype.setEncoding = function(enc) {235if (!StringDecoder)236StringDecoder = require('string_decoder/').StringDecoder;237this._readableState.decoder = new StringDecoder(enc);238this._readableState.encoding = enc;239return this;240};241242// Don't raise the hwm > 128MB243var MAX_HWM = 0x800000;244function roundUpToNextPowerOf2(n) {245if (n >= MAX_HWM) {246n = MAX_HWM;247} else {248// Get the next highest power of 2249n--;250for (var p = 1; p < 32; p <<= 1) n |= n >> p;251n++;252}253return n;254}255256function howMuchToRead(n, state) {257if (state.length === 0 && state.ended)258return 0;259260if (state.objectMode)261return n === 0 ? 0 : 1;262263if (isNaN(n) || util.isNull(n)) {264// only flow one buffer at a time265if (state.flowing && state.buffer.length)266return state.buffer[0].length;267else268return state.length;269}270271if (n <= 0)272return 0;273274// If we're asking for more than the target buffer level,275// then raise the water mark. Bump up to the next highest276// power of 2, to prevent increasing it excessively in tiny277// amounts.278if (n > state.highWaterMark)279state.highWaterMark = roundUpToNextPowerOf2(n);280281// don't have that much. return null, unless we've ended.282if (n > state.length) {283if (!state.ended) {284state.needReadable = true;285return 0;286} else287return state.length;288}289290return n;291}292293// you can override either this method, or the async _read(n) below.294Readable.prototype.read = function(n) {295debug('read', n);296var state = this._readableState;297var nOrig = n;298299if (!util.isNumber(n) || n > 0)300state.emittedReadable = false;301302// if we're doing read(0) to trigger a readable event, but we303// already have a bunch of data in the buffer, then just trigger304// the 'readable' event and move on.305if (n === 0 &&306state.needReadable &&307(state.length >= state.highWaterMark || state.ended)) {308debug('read: emitReadable', state.length, state.ended);309if (state.length === 0 && state.ended)310endReadable(this);311else312emitReadable(this);313return null;314}315316n = howMuchToRead(n, state);317318// if we've ended, and we're now clear, then finish it up.319if (n === 0 && state.ended) {320if (state.length === 0)321endReadable(this);322return null;323}324325// All the actual chunk generation logic needs to be326// *below* the call to _read. The reason is that in certain327// synthetic stream cases, such as passthrough streams, _read328// may be a completely synchronous operation which may change329// the state of the read buffer, providing enough data when330// before there was *not* enough.331//332// So, the steps are:333// 1. Figure out what the state of things will be after we do334// a read from the buffer.335//336// 2. If that resulting state will trigger a _read, then call _read.337// Note that this may be asynchronous, or synchronous. Yes, it is338// deeply ugly to write APIs this way, but that still doesn't mean339// that the Readable class should behave improperly, as streams are340// designed to be sync/async agnostic.341// Take note if the _read call is sync or async (ie, if the read call342// has returned yet), so that we know whether or not it's safe to emit343// 'readable' etc.344//345// 3. Actually pull the requested chunks out of the buffer and return.346347// if we need a readable event, then we need to do some reading.348var doRead = state.needReadable;349debug('need readable', doRead);350351// if we currently have less than the highWaterMark, then also read some352if (state.length === 0 || state.length - n < state.highWaterMark) {353doRead = true;354debug('length less than watermark', doRead);355}356357// however, if we've ended, then there's no point, and if we're already358// reading, then it's unnecessary.359if (state.ended || state.reading) {360doRead = false;361debug('reading or ended', doRead);362}363364if (doRead) {365debug('do read');366state.reading = true;367state.sync = true;368// if the length is currently zero, then we *need* a readable event.369if (state.length === 0)370state.needReadable = true;371// call internal read method372this._read(state.highWaterMark);373state.sync = false;374}375376// If _read pushed data synchronously, then `reading` will be false,377// and we need to re-evaluate how much data we can return to the user.378if (doRead && !state.reading)379n = howMuchToRead(nOrig, state);380381var ret;382if (n > 0)383ret = fromList(n, state);384else385ret = null;386387if (util.isNull(ret)) {388state.needReadable = true;389n = 0;390}391392state.length -= n;393394// If we have nothing in the buffer, then we want to know395// as soon as we *do* get something into the buffer.396if (state.length === 0 && !state.ended)397state.needReadable = true;398399// If we tried to read() past the EOF, then emit end on the next tick.400if (nOrig !== n && state.ended && state.length === 0)401endReadable(this);402403if (!util.isNull(ret))404this.emit('data', ret);405406return ret;407};408409function chunkInvalid(state, chunk) {410var er = null;411if (!util.isBuffer(chunk) &&412!util.isString(chunk) &&413!util.isNullOrUndefined(chunk) &&414!state.objectMode) {415er = new TypeError('Invalid non-string/buffer chunk');416}417return er;418}419420421function onEofChunk(stream, state) {422if (state.decoder && !state.ended) {423var chunk = state.decoder.end();424if (chunk && chunk.length) {425state.buffer.push(chunk);426state.length += state.objectMode ? 1 : chunk.length;427}428}429state.ended = true;430431// emit 'readable' now to make sure it gets picked up.432emitReadable(stream);433}434435// Don't emit readable right away in sync mode, because this can trigger436// another read() call => stack overflow. This way, it might trigger437// a nextTick recursion warning, but that's not so bad.438function emitReadable(stream) {439var state = stream._readableState;440state.needReadable = false;441if (!state.emittedReadable) {442debug('emitReadable', state.flowing);443state.emittedReadable = true;444if (state.sync)445process.nextTick(function() {446emitReadable_(stream);447});448else449emitReadable_(stream);450}451}452453function emitReadable_(stream) {454debug('emit readable');455stream.emit('readable');456flow(stream);457}458459460// at this point, the user has presumably seen the 'readable' event,461// and called read() to consume some data. that may have triggered462// in turn another _read(n) call, in which case reading = true if463// it's in progress.464// However, if we're not ended, or reading, and the length < hwm,465// then go ahead and try to read some more preemptively.466function maybeReadMore(stream, state) {467if (!state.readingMore) {468state.readingMore = true;469process.nextTick(function() {470maybeReadMore_(stream, state);471});472}473}474475function maybeReadMore_(stream, state) {476var len = state.length;477while (!state.reading && !state.flowing && !state.ended &&478state.length < state.highWaterMark) {479debug('maybeReadMore read 0');480stream.read(0);481if (len === state.length)482// didn't get any data, stop spinning.483break;484else485len = state.length;486}487state.readingMore = false;488}489490// abstract method. to be overridden in specific implementation classes.491// call cb(er, data) where data is <= n in length.492// for virtual (non-string, non-buffer) streams, "length" is somewhat493// arbitrary, and perhaps not very meaningful.494Readable.prototype._read = function(n) {495this.emit('error', new Error('not implemented'));496};497498Readable.prototype.pipe = function(dest, pipeOpts) {499var src = this;500var state = this._readableState;501502switch (state.pipesCount) {503case 0:504state.pipes = dest;505break;506case 1:507state.pipes = [state.pipes, dest];508break;509default:510state.pipes.push(dest);511break;512}513state.pipesCount += 1;514debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);515516var doEnd = (!pipeOpts || pipeOpts.end !== false) &&517dest !== process.stdout &&518dest !== process.stderr;519520var endFn = doEnd ? onend : cleanup;521if (state.endEmitted)522process.nextTick(endFn);523else524src.once('end', endFn);525526dest.on('unpipe', onunpipe);527function onunpipe(readable) {528debug('onunpipe');529if (readable === src) {530cleanup();531}532}533534function onend() {535debug('onend');536dest.end();537}538539// when the dest drains, it reduces the awaitDrain counter540// on the source. This would be more elegant with a .once()541// handler in flow(), but adding and removing repeatedly is542// too slow.543var ondrain = pipeOnDrain(src);544dest.on('drain', ondrain);545546function cleanup() {547debug('cleanup');548// cleanup event handlers once the pipe is broken549dest.removeListener('close', onclose);550dest.removeListener('finish', onfinish);551dest.removeListener('drain', ondrain);552dest.removeListener('error', onerror);553dest.removeListener('unpipe', onunpipe);554src.removeListener('end', onend);555src.removeListener('end', cleanup);556src.removeListener('data', ondata);557558// if the reader is waiting for a drain event from this559// specific writer, then it would cause it to never start560// flowing again.561// So, if this is awaiting a drain, then we just call it now.562// If we don't know, then assume that we are waiting for one.563if (state.awaitDrain &&564(!dest._writableState || dest._writableState.needDrain))565ondrain();566}567568src.on('data', ondata);569function ondata(chunk) {570debug('ondata');571var ret = dest.write(chunk);572if (false === ret) {573debug('false write response, pause',574src._readableState.awaitDrain);575src._readableState.awaitDrain++;576src.pause();577}578}579580// if the dest has an error, then stop piping into it.581// however, don't suppress the throwing behavior for this.582function onerror(er) {583debug('onerror', er);584unpipe();585dest.removeListener('error', onerror);586if (EE.listenerCount(dest, 'error') === 0)587dest.emit('error', er);588}589// This is a brutally ugly hack to make sure that our error handler590// is attached before any userland ones. NEVER DO THIS.591if (!dest._events || !dest._events.error)592dest.on('error', onerror);593else if (isArray(dest._events.error))594dest._events.error.unshift(onerror);595else596dest._events.error = [onerror, dest._events.error];597598599600// Both close and finish should trigger unpipe, but only once.601function onclose() {602dest.removeListener('finish', onfinish);603unpipe();604}605dest.once('close', onclose);606function onfinish() {607debug('onfinish');608dest.removeListener('close', onclose);609unpipe();610}611dest.once('finish', onfinish);612613function unpipe() {614debug('unpipe');615src.unpipe(dest);616}617618// tell the dest that it's being piped to619dest.emit('pipe', src);620621// start the flow if it hasn't been started already.622if (!state.flowing) {623debug('pipe resume');624src.resume();625}626627return dest;628};629630function pipeOnDrain(src) {631return function() {632var state = src._readableState;633debug('pipeOnDrain', state.awaitDrain);634if (state.awaitDrain)635state.awaitDrain--;636if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {637state.flowing = true;638flow(src);639}640};641}642643644Readable.prototype.unpipe = function(dest) {645var state = this._readableState;646647// if we're not piping anywhere, then do nothing.648if (state.pipesCount === 0)649return this;650651// just one destination. most common case.652if (state.pipesCount === 1) {653// passed in one, but it's not the right one.654if (dest && dest !== state.pipes)655return this;656657if (!dest)658dest = state.pipes;659660// got a match.661state.pipes = null;662state.pipesCount = 0;663state.flowing = false;664if (dest)665dest.emit('unpipe', this);666return this;667}668669// slow case. multiple pipe destinations.670671if (!dest) {672// remove all.673var dests = state.pipes;674var len = state.pipesCount;675state.pipes = null;676state.pipesCount = 0;677state.flowing = false;678679for (var i = 0; i < len; i++)680dests[i].emit('unpipe', this);681return this;682}683684// try to find the right one.685var i = indexOf(state.pipes, dest);686if (i === -1)687return this;688689state.pipes.splice(i, 1);690state.pipesCount -= 1;691if (state.pipesCount === 1)692state.pipes = state.pipes[0];693694dest.emit('unpipe', this);695696return this;697};698699// set up data events if they are asked for700// Ensure readable listeners eventually get something701Readable.prototype.on = function(ev, fn) {702var res = Stream.prototype.on.call(this, ev, fn);703704// If listening to data, and it has not explicitly been paused,705// then call resume to start the flow of data on the next tick.706if (ev === 'data' && false !== this._readableState.flowing) {707this.resume();708}709710if (ev === 'readable' && this.readable) {711var state = this._readableState;712if (!state.readableListening) {713state.readableListening = true;714state.emittedReadable = false;715state.needReadable = true;716if (!state.reading) {717var self = this;718process.nextTick(function() {719debug('readable nexttick read 0');720self.read(0);721});722} else if (state.length) {723emitReadable(this, state);724}725}726}727728return res;729};730Readable.prototype.addListener = Readable.prototype.on;731732// pause() and resume() are remnants of the legacy readable stream API733// If the user uses them, then switch into old mode.734Readable.prototype.resume = function() {735var state = this._readableState;736if (!state.flowing) {737debug('resume');738state.flowing = true;739if (!state.reading) {740debug('resume read 0');741this.read(0);742}743resume(this, state);744}745return this;746};747748function resume(stream, state) {749if (!state.resumeScheduled) {750state.resumeScheduled = true;751process.nextTick(function() {752resume_(stream, state);753});754}755}756757function resume_(stream, state) {758state.resumeScheduled = false;759stream.emit('resume');760flow(stream);761if (state.flowing && !state.reading)762stream.read(0);763}764765Readable.prototype.pause = function() {766debug('call pause flowing=%j', this._readableState.flowing);767if (false !== this._readableState.flowing) {768debug('pause');769this._readableState.flowing = false;770this.emit('pause');771}772return this;773};774775function flow(stream) {776var state = stream._readableState;777debug('flow', state.flowing);778if (state.flowing) {779do {780var chunk = stream.read();781} while (null !== chunk && state.flowing);782}783}784785// wrap an old-style stream as the async data source.786// This is *not* part of the readable stream interface.787// It is an ugly unfortunate mess of history.788Readable.prototype.wrap = function(stream) {789var state = this._readableState;790var paused = false;791792var self = this;793stream.on('end', function() {794debug('wrapped end');795if (state.decoder && !state.ended) {796var chunk = state.decoder.end();797if (chunk && chunk.length)798self.push(chunk);799}800801self.push(null);802});803804stream.on('data', function(chunk) {805debug('wrapped data');806if (state.decoder)807chunk = state.decoder.write(chunk);808if (!chunk || !state.objectMode && !chunk.length)809return;810811var ret = self.push(chunk);812if (!ret) {813paused = true;814stream.pause();815}816});817818// proxy all the other methods.819// important when wrapping filters and duplexes.820for (var i in stream) {821if (util.isFunction(stream[i]) && util.isUndefined(this[i])) {822this[i] = function(method) { return function() {823return stream[method].apply(stream, arguments);824}}(i);825}826}827828// proxy certain important events.829var events = ['error', 'close', 'destroy', 'pause', 'resume'];830forEach(events, function(ev) {831stream.on(ev, self.emit.bind(self, ev));832});833834// when we try to consume some more bytes, simply unpause the835// underlying stream.836self._read = function(n) {837debug('wrapped _read', n);838if (paused) {839paused = false;840stream.resume();841}842};843844return self;845};846847848849// exposed for testing purposes only.850Readable._fromList = fromList;851852// Pluck off n bytes from an array of buffers.853// Length is the combined lengths of all the buffers in the list.854function fromList(n, state) {855var list = state.buffer;856var length = state.length;857var stringMode = !!state.decoder;858var objectMode = !!state.objectMode;859var ret;860861// nothing in the list, definitely empty.862if (list.length === 0)863return null;864865if (length === 0)866ret = null;867else if (objectMode)868ret = list.shift();869else if (!n || n >= length) {870// read it all, truncate the array.871if (stringMode)872ret = list.join('');873else874ret = Buffer.concat(list, length);875list.length = 0;876} else {877// read just some of it.878if (n < list[0].length) {879// just take a part of the first list item.880// slice is the same for buffers and strings.881var buf = list[0];882ret = buf.slice(0, n);883list[0] = buf.slice(n);884} else if (n === list[0].length) {885// first list is a perfect match886ret = list.shift();887} else {888// complex case.889// we have enough to cover it, but it spans past the first buffer.890if (stringMode)891ret = '';892else893ret = new Buffer(n);894895var c = 0;896for (var i = 0, l = list.length; i < l && c < n; i++) {897var buf = list[0];898var cpy = Math.min(n - c, buf.length);899900if (stringMode)901ret += buf.slice(0, cpy);902else903buf.copy(ret, c, 0, cpy);904905if (cpy < buf.length)906list[0] = buf.slice(cpy);907else908list.shift();909910c += cpy;911}912}913}914915return ret;916}917918function endReadable(stream) {919var state = stream._readableState;920921// If we get here before consuming all the bytes, then that is a922// bug in node. Should never happen.923if (state.length > 0)924throw new Error('endReadable called on non-empty stream');925926if (!state.endEmitted) {927state.ended = true;928process.nextTick(function() {929// Check that we didn't get one last unshift.930if (!state.endEmitted && state.length === 0) {931state.endEmitted = true;932stream.readable = false;933stream.emit('end');934}935});936}937}938939function forEach (xs, f) {940for (var i = 0, l = xs.length; i < l; i++) {941f(xs[i], i);942}943}944945function indexOf (xs, x) {946for (var i = 0, l = xs.length; i < l; i++) {947if (xs[i] === x) return i;948}949return -1;950}951952953