react / wstein / node_modules / browserify / node_modules / module-deps / node_modules / stream-combiner2 / node_modules / through2 / node_modules / readable-stream / lib / _stream_writable.js
80580 views// Copyright Joyent, Inc. and other Node contributors.1//2// Permission is hereby granted, free of charge, to any person obtaining a3// copy of this software and associated documentation files (the4// "Software"), to deal in the Software without restriction, including5// without limitation the rights to use, copy, modify, merge, publish,6// distribute, sublicense, and/or sell copies of the Software, and to permit7// persons to whom the Software is furnished to do so, subject to the8// following conditions:9//10// The above copyright notice and this permission notice shall be included11// in all copies or substantial portions of the Software.12//13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF15// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN16// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,17// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR18// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE19// USE OR OTHER DEALINGS IN THE SOFTWARE.2021// A bit simpler than readable streams.22// Implement an async ._write(chunk, cb), and it'll handle all23// the drain event emission and buffering.2425module.exports = Writable;2627/*<replacement>*/28var Buffer = require('buffer').Buffer;29/*</replacement>*/3031Writable.WritableState = WritableState;323334/*<replacement>*/35var util = require('core-util-is');36util.inherits = require('inherits');37/*</replacement>*/3839var Stream = require('stream');4041util.inherits(Writable, Stream);4243function WriteReq(chunk, encoding, cb) {44this.chunk = chunk;45this.encoding = encoding;46this.callback = cb;47}4849function WritableState(options, stream) {50options = options || {};5152// the point at which write() starts returning false53// Note: 0 is a valid value, means that we always return false if54// the entire buffer is not flushed immediately on write()55var hwm = options.highWaterMark;56this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;5758// object stream flag to indicate whether or not this stream59// contains buffers or objects.60this.objectMode = !!options.objectMode;6162// cast to ints.63this.highWaterMark = ~~this.highWaterMark;6465this.needDrain = false;66// at the start of calling end()67this.ending = false;68// when end() has been called, and returned69this.ended = false;70// when 'finish' is emitted71this.finished = false;7273// should we decode strings into buffers before passing to _write?74// this is here so that some node-core streams can optimize string75// handling at a lower level.76var noDecode = options.decodeStrings === false;77this.decodeStrings = !noDecode;7879// Crypto is kind of old and crusty. Historically, its default string80// encoding is 'binary' so we have to make this configurable.81// Everything else in the universe uses 'utf8', though.82this.defaultEncoding = options.defaultEncoding || 'utf8';8384// not an actual buffer we keep track of, but a measurement85// of how much we're waiting to get pushed to some underlying86// socket or file.87this.length = 0;8889// a flag to see when we're in the middle of a write.90this.writing = false;9192// a flag to be able to tell if the onwrite cb is called immediately,93// or on a later tick. We set this to true at first, becuase any94// actions that shouldn't happen until "later" should generally also95// not happen before the first write call.96this.sync = true;9798// a flag to know if we're processing previously buffered items, which99// may call the _write() callback in the same tick, so that we don't100// end up in an overlapped onwrite situation.101this.bufferProcessing = false;102103// the callback that's passed to _write(chunk,cb)104this.onwrite = function(er) {105onwrite(stream, er);106};107108// the callback that the user supplies to write(chunk,encoding,cb)109this.writecb = null;110111// the amount that is being written when _write is called.112this.writelen = 0;113114this.buffer = [];115116// True if the error was already emitted and should not be thrown again117this.errorEmitted = false;118}119120function Writable(options) {121var Duplex = require('./_stream_duplex');122123// Writable ctor is applied to Duplexes, though they're not124// instanceof Writable, they're instanceof Readable.125if (!(this instanceof Writable) && !(this instanceof Duplex))126return new Writable(options);127128this._writableState = new WritableState(options, this);129130// legacy.131this.writable = true;132133Stream.call(this);134}135136// Otherwise people can pipe Writable streams, which is just wrong.137Writable.prototype.pipe = function() {138this.emit('error', new Error('Cannot pipe. Not readable.'));139};140141142function writeAfterEnd(stream, state, cb) {143var er = new Error('write after end');144// TODO: defer error events consistently everywhere, not just the cb145stream.emit('error', er);146process.nextTick(function() {147cb(er);148});149}150151// If we get something that is not a buffer, string, null, or undefined,152// and we're not in objectMode, then that's an error.153// Otherwise stream chunks are all considered to be of length=1, and the154// watermarks determine how many objects to keep in the buffer, rather than155// how many bytes or characters.156function validChunk(stream, state, chunk, cb) {157var valid = true;158if (!Buffer.isBuffer(chunk) &&159'string' !== typeof chunk &&160chunk !== null &&161chunk !== undefined &&162!state.objectMode) {163var er = new TypeError('Invalid non-string/buffer chunk');164stream.emit('error', er);165process.nextTick(function() {166cb(er);167});168valid = false;169}170return valid;171}172173Writable.prototype.write = function(chunk, encoding, cb) {174var state = this._writableState;175var ret = false;176177if (typeof encoding === 'function') {178cb = encoding;179encoding = null;180}181182if (Buffer.isBuffer(chunk))183encoding = 'buffer';184else if (!encoding)185encoding = state.defaultEncoding;186187if (typeof cb !== 'function')188cb = function() {};189190if (state.ended)191writeAfterEnd(this, state, cb);192else if (validChunk(this, state, chunk, cb))193ret = writeOrBuffer(this, state, chunk, encoding, cb);194195return ret;196};197198function decodeChunk(state, chunk, encoding) {199if (!state.objectMode &&200state.decodeStrings !== false &&201typeof chunk === 'string') {202chunk = new Buffer(chunk, encoding);203}204return chunk;205}206207// if we're already writing something, then just put this208// in the queue, and wait our turn. Otherwise, call _write209// If we return false, then we need a drain event, so set that flag.210function writeOrBuffer(stream, state, chunk, encoding, cb) {211chunk = decodeChunk(state, chunk, encoding);212if (Buffer.isBuffer(chunk))213encoding = 'buffer';214var len = state.objectMode ? 1 : chunk.length;215216state.length += len;217218var ret = state.length < state.highWaterMark;219// we must ensure that previous needDrain will not be reset to false.220if (!ret)221state.needDrain = true;222223if (state.writing)224state.buffer.push(new WriteReq(chunk, encoding, cb));225else226doWrite(stream, state, len, chunk, encoding, cb);227228return ret;229}230231function doWrite(stream, state, len, chunk, encoding, cb) {232state.writelen = len;233state.writecb = cb;234state.writing = true;235state.sync = true;236stream._write(chunk, encoding, state.onwrite);237state.sync = false;238}239240function onwriteError(stream, state, sync, er, cb) {241if (sync)242process.nextTick(function() {243cb(er);244});245else246cb(er);247248stream._writableState.errorEmitted = true;249stream.emit('error', er);250}251252function onwriteStateUpdate(state) {253state.writing = false;254state.writecb = null;255state.length -= state.writelen;256state.writelen = 0;257}258259function onwrite(stream, er) {260var state = stream._writableState;261var sync = state.sync;262var cb = state.writecb;263264onwriteStateUpdate(state);265266if (er)267onwriteError(stream, state, sync, er, cb);268else {269// Check if we're actually ready to finish, but don't emit yet270var finished = needFinish(stream, state);271272if (!finished && !state.bufferProcessing && state.buffer.length)273clearBuffer(stream, state);274275if (sync) {276process.nextTick(function() {277afterWrite(stream, state, finished, cb);278});279} else {280afterWrite(stream, state, finished, cb);281}282}283}284285function afterWrite(stream, state, finished, cb) {286if (!finished)287onwriteDrain(stream, state);288cb();289if (finished)290finishMaybe(stream, state);291}292293// Must force callback to be called on nextTick, so that we don't294// emit 'drain' before the write() consumer gets the 'false' return295// value, and has a chance to attach a 'drain' listener.296function onwriteDrain(stream, state) {297if (state.length === 0 && state.needDrain) {298state.needDrain = false;299stream.emit('drain');300}301}302303304// if there's something in the buffer waiting, then process it305function clearBuffer(stream, state) {306state.bufferProcessing = true;307308for (var c = 0; c < state.buffer.length; c++) {309var entry = state.buffer[c];310var chunk = entry.chunk;311var encoding = entry.encoding;312var cb = entry.callback;313var len = state.objectMode ? 1 : chunk.length;314315doWrite(stream, state, len, chunk, encoding, cb);316317// if we didn't call the onwrite immediately, then318// it means that we need to wait until it does.319// also, that means that the chunk and cb are currently320// being processed, so move the buffer counter past them.321if (state.writing) {322c++;323break;324}325}326327state.bufferProcessing = false;328if (c < state.buffer.length)329state.buffer = state.buffer.slice(c);330else331state.buffer.length = 0;332}333334Writable.prototype._write = function(chunk, encoding, cb) {335cb(new Error('not implemented'));336};337338Writable.prototype.end = function(chunk, encoding, cb) {339var state = this._writableState;340341if (typeof chunk === 'function') {342cb = chunk;343chunk = null;344encoding = null;345} else if (typeof encoding === 'function') {346cb = encoding;347encoding = null;348}349350if (typeof chunk !== 'undefined' && chunk !== null)351this.write(chunk, encoding);352353// ignore unnecessary end() calls.354if (!state.ending && !state.finished)355endWritable(this, state, cb);356};357358359function needFinish(stream, state) {360return (state.ending &&361state.length === 0 &&362!state.finished &&363!state.writing);364}365366function finishMaybe(stream, state) {367var need = needFinish(stream, state);368if (need) {369state.finished = true;370stream.emit('finish');371}372return need;373}374375function endWritable(stream, state, cb) {376state.ending = true;377finishMaybe(stream, state);378if (cb) {379if (state.finished)380process.nextTick(cb);381else382stream.once('finish', cb);383}384state.ended = true;385}386387388