react / wstein / node_modules / browserify / node_modules / readable-stream / lib / _stream_writable.js
80537 views// Copyright Joyent, Inc. and other Node contributors.1//2// Permission is hereby granted, free of charge, to any person obtaining a3// copy of this software and associated documentation files (the4// "Software"), to deal in the Software without restriction, including5// without limitation the rights to use, copy, modify, merge, publish,6// distribute, sublicense, and/or sell copies of the Software, and to permit7// persons to whom the Software is furnished to do so, subject to the8// following conditions:9//10// The above copyright notice and this permission notice shall be included11// in all copies or substantial portions of the Software.12//13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF15// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN16// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,17// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR18// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE19// USE OR OTHER DEALINGS IN THE SOFTWARE.2021// A bit simpler than readable streams.22// Implement an async ._write(chunk, cb), and it'll handle all23// the drain event emission and buffering.2425module.exports = Writable;2627/*<replacement>*/28var Buffer = require('buffer').Buffer;29/*</replacement>*/3031Writable.WritableState = WritableState;323334/*<replacement>*/35var util = require('core-util-is');36util.inherits = require('inherits');37/*</replacement>*/3839var Stream = require('stream');4041util.inherits(Writable, Stream);4243function WriteReq(chunk, encoding, cb) {44this.chunk = chunk;45this.encoding = encoding;46this.callback = cb;47}4849function WritableState(options, stream) {50var Duplex = require('./_stream_duplex');5152options = options || {};5354// the point at which write() starts returning false55// Note: 0 is a valid value, means that we always return false if56// the entire buffer is not flushed immediately on write()57var hwm = options.highWaterMark;58var defaultHwm = options.objectMode ? 16 : 16 * 1024;59this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;6061// object stream flag to indicate whether or not this stream62// contains buffers or objects.63this.objectMode = !!options.objectMode;6465if (stream instanceof Duplex)66this.objectMode = this.objectMode || !!options.writableObjectMode;6768// cast to ints.69this.highWaterMark = ~~this.highWaterMark;7071this.needDrain = false;72// at the start of calling end()73this.ending = false;74// when end() has been called, and returned75this.ended = false;76// when 'finish' is emitted77this.finished = false;7879// should we decode strings into buffers before passing to _write?80// this is here so that some node-core streams can optimize string81// handling at a lower level.82var noDecode = options.decodeStrings === false;83this.decodeStrings = !noDecode;8485// Crypto is kind of old and crusty. Historically, its default string86// encoding is 'binary' so we have to make this configurable.87// Everything else in the universe uses 'utf8', though.88this.defaultEncoding = options.defaultEncoding || 'utf8';8990// not an actual buffer we keep track of, but a measurement91// of how much we're waiting to get pushed to some underlying92// socket or file.93this.length = 0;9495// a flag to see when we're in the middle of a write.96this.writing = false;9798// when true all writes will be buffered until .uncork() call99this.corked = 0;100101// a flag to be able to tell if the onwrite cb is called immediately,102// or on a later tick. We set this to true at first, because any103// actions that shouldn't happen until "later" should generally also104// not happen before the first write call.105this.sync = true;106107// a flag to know if we're processing previously buffered items, which108// may call the _write() callback in the same tick, so that we don't109// end up in an overlapped onwrite situation.110this.bufferProcessing = false;111112// the callback that's passed to _write(chunk,cb)113this.onwrite = function(er) {114onwrite(stream, er);115};116117// the callback that the user supplies to write(chunk,encoding,cb)118this.writecb = null;119120// the amount that is being written when _write is called.121this.writelen = 0;122123this.buffer = [];124125// number of pending user-supplied write callbacks126// this must be 0 before 'finish' can be emitted127this.pendingcb = 0;128129// emit prefinish if the only thing we're waiting for is _write cbs130// This is relevant for synchronous Transform streams131this.prefinished = false;132133// True if the error was already emitted and should not be thrown again134this.errorEmitted = false;135}136137function Writable(options) {138var Duplex = require('./_stream_duplex');139140// Writable ctor is applied to Duplexes, though they're not141// instanceof Writable, they're instanceof Readable.142if (!(this instanceof Writable) && !(this instanceof Duplex))143return new Writable(options);144145this._writableState = new WritableState(options, this);146147// legacy.148this.writable = true;149150Stream.call(this);151}152153// Otherwise people can pipe Writable streams, which is just wrong.154Writable.prototype.pipe = function() {155this.emit('error', new Error('Cannot pipe. Not readable.'));156};157158159function writeAfterEnd(stream, state, cb) {160var er = new Error('write after end');161// TODO: defer error events consistently everywhere, not just the cb162stream.emit('error', er);163process.nextTick(function() {164cb(er);165});166}167168// If we get something that is not a buffer, string, null, or undefined,169// and we're not in objectMode, then that's an error.170// Otherwise stream chunks are all considered to be of length=1, and the171// watermarks determine how many objects to keep in the buffer, rather than172// how many bytes or characters.173function validChunk(stream, state, chunk, cb) {174var valid = true;175if (!util.isBuffer(chunk) &&176!util.isString(chunk) &&177!util.isNullOrUndefined(chunk) &&178!state.objectMode) {179var er = new TypeError('Invalid non-string/buffer chunk');180stream.emit('error', er);181process.nextTick(function() {182cb(er);183});184valid = false;185}186return valid;187}188189Writable.prototype.write = function(chunk, encoding, cb) {190var state = this._writableState;191var ret = false;192193if (util.isFunction(encoding)) {194cb = encoding;195encoding = null;196}197198if (util.isBuffer(chunk))199encoding = 'buffer';200else if (!encoding)201encoding = state.defaultEncoding;202203if (!util.isFunction(cb))204cb = function() {};205206if (state.ended)207writeAfterEnd(this, state, cb);208else if (validChunk(this, state, chunk, cb)) {209state.pendingcb++;210ret = writeOrBuffer(this, state, chunk, encoding, cb);211}212213return ret;214};215216Writable.prototype.cork = function() {217var state = this._writableState;218219state.corked++;220};221222Writable.prototype.uncork = function() {223var state = this._writableState;224225if (state.corked) {226state.corked--;227228if (!state.writing &&229!state.corked &&230!state.finished &&231!state.bufferProcessing &&232state.buffer.length)233clearBuffer(this, state);234}235};236237function decodeChunk(state, chunk, encoding) {238if (!state.objectMode &&239state.decodeStrings !== false &&240util.isString(chunk)) {241chunk = new Buffer(chunk, encoding);242}243return chunk;244}245246// if we're already writing something, then just put this247// in the queue, and wait our turn. Otherwise, call _write248// If we return false, then we need a drain event, so set that flag.249function writeOrBuffer(stream, state, chunk, encoding, cb) {250chunk = decodeChunk(state, chunk, encoding);251if (util.isBuffer(chunk))252encoding = 'buffer';253var len = state.objectMode ? 1 : chunk.length;254255state.length += len;256257var ret = state.length < state.highWaterMark;258// we must ensure that previous needDrain will not be reset to false.259if (!ret)260state.needDrain = true;261262if (state.writing || state.corked)263state.buffer.push(new WriteReq(chunk, encoding, cb));264else265doWrite(stream, state, false, len, chunk, encoding, cb);266267return ret;268}269270function doWrite(stream, state, writev, len, chunk, encoding, cb) {271state.writelen = len;272state.writecb = cb;273state.writing = true;274state.sync = true;275if (writev)276stream._writev(chunk, state.onwrite);277else278stream._write(chunk, encoding, state.onwrite);279state.sync = false;280}281282function onwriteError(stream, state, sync, er, cb) {283if (sync)284process.nextTick(function() {285state.pendingcb--;286cb(er);287});288else {289state.pendingcb--;290cb(er);291}292293stream._writableState.errorEmitted = true;294stream.emit('error', er);295}296297function onwriteStateUpdate(state) {298state.writing = false;299state.writecb = null;300state.length -= state.writelen;301state.writelen = 0;302}303304function onwrite(stream, er) {305var state = stream._writableState;306var sync = state.sync;307var cb = state.writecb;308309onwriteStateUpdate(state);310311if (er)312onwriteError(stream, state, sync, er, cb);313else {314// Check if we're actually ready to finish, but don't emit yet315var finished = needFinish(stream, state);316317if (!finished &&318!state.corked &&319!state.bufferProcessing &&320state.buffer.length) {321clearBuffer(stream, state);322}323324if (sync) {325process.nextTick(function() {326afterWrite(stream, state, finished, cb);327});328} else {329afterWrite(stream, state, finished, cb);330}331}332}333334function afterWrite(stream, state, finished, cb) {335if (!finished)336onwriteDrain(stream, state);337state.pendingcb--;338cb();339finishMaybe(stream, state);340}341342// Must force callback to be called on nextTick, so that we don't343// emit 'drain' before the write() consumer gets the 'false' return344// value, and has a chance to attach a 'drain' listener.345function onwriteDrain(stream, state) {346if (state.length === 0 && state.needDrain) {347state.needDrain = false;348stream.emit('drain');349}350}351352353// if there's something in the buffer waiting, then process it354function clearBuffer(stream, state) {355state.bufferProcessing = true;356357if (stream._writev && state.buffer.length > 1) {358// Fast case, write everything using _writev()359var cbs = [];360for (var c = 0; c < state.buffer.length; c++)361cbs.push(state.buffer[c].callback);362363// count the one we are adding, as well.364// TODO(isaacs) clean this up365state.pendingcb++;366doWrite(stream, state, true, state.length, state.buffer, '', function(err) {367for (var i = 0; i < cbs.length; i++) {368state.pendingcb--;369cbs[i](err);370}371});372373// Clear buffer374state.buffer = [];375} else {376// Slow case, write chunks one-by-one377for (var c = 0; c < state.buffer.length; c++) {378var entry = state.buffer[c];379var chunk = entry.chunk;380var encoding = entry.encoding;381var cb = entry.callback;382var len = state.objectMode ? 1 : chunk.length;383384doWrite(stream, state, false, len, chunk, encoding, cb);385386// if we didn't call the onwrite immediately, then387// it means that we need to wait until it does.388// also, that means that the chunk and cb are currently389// being processed, so move the buffer counter past them.390if (state.writing) {391c++;392break;393}394}395396if (c < state.buffer.length)397state.buffer = state.buffer.slice(c);398else399state.buffer.length = 0;400}401402state.bufferProcessing = false;403}404405Writable.prototype._write = function(chunk, encoding, cb) {406cb(new Error('not implemented'));407408};409410Writable.prototype._writev = null;411412Writable.prototype.end = function(chunk, encoding, cb) {413var state = this._writableState;414415if (util.isFunction(chunk)) {416cb = chunk;417chunk = null;418encoding = null;419} else if (util.isFunction(encoding)) {420cb = encoding;421encoding = null;422}423424if (!util.isNullOrUndefined(chunk))425this.write(chunk, encoding);426427// .end() fully uncorks428if (state.corked) {429state.corked = 1;430this.uncork();431}432433// ignore unnecessary end() calls.434if (!state.ending && !state.finished)435endWritable(this, state, cb);436};437438439function needFinish(stream, state) {440return (state.ending &&441state.length === 0 &&442!state.finished &&443!state.writing);444}445446function prefinish(stream, state) {447if (!state.prefinished) {448state.prefinished = true;449stream.emit('prefinish');450}451}452453function finishMaybe(stream, state) {454var need = needFinish(stream, state);455if (need) {456if (state.pendingcb === 0) {457prefinish(stream, state);458state.finished = true;459stream.emit('finish');460} else461prefinish(stream, state);462}463return need;464}465466function endWritable(stream, state, cb) {467state.ending = true;468finishMaybe(stream, state);469if (cb) {470if (state.finished)471process.nextTick(cb);472else473stream.once('finish', cb);474}475state.ended = true;476}477478479