Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
80580 views
1
// Copyright Joyent, Inc. and other Node contributors.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the
5
// "Software"), to deal in the Software without restriction, including
6
// without limitation the rights to use, copy, modify, merge, publish,
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
8
// persons to whom the Software is furnished to do so, subject to the
9
// following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included
12
// in all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22
// A bit simpler than readable streams.
23
// Implement an async ._write(chunk, cb), and it'll handle all
24
// the drain event emission and buffering.
25
26
module.exports = Writable;
27
28
/*<replacement>*/
29
var Buffer = require('buffer').Buffer;
30
/*</replacement>*/
31
32
Writable.WritableState = WritableState;
33
34
35
/*<replacement>*/
36
var util = require('core-util-is');
37
util.inherits = require('inherits');
38
/*</replacement>*/
39
40
var Stream = require('stream');
41
42
util.inherits(Writable, Stream);
43
44
function WriteReq(chunk, encoding, cb) {
45
this.chunk = chunk;
46
this.encoding = encoding;
47
this.callback = cb;
48
}
49
50
function WritableState(options, stream) {
51
options = options || {};
52
53
// the point at which write() starts returning false
54
// Note: 0 is a valid value, means that we always return false if
55
// the entire buffer is not flushed immediately on write()
56
var hwm = options.highWaterMark;
57
this.highWaterMark = (hwm || hwm === 0) ? hwm : 16 * 1024;
58
59
// object stream flag to indicate whether or not this stream
60
// contains buffers or objects.
61
this.objectMode = !!options.objectMode;
62
63
// cast to ints.
64
this.highWaterMark = ~~this.highWaterMark;
65
66
this.needDrain = false;
67
// at the start of calling end()
68
this.ending = false;
69
// when end() has been called, and returned
70
this.ended = false;
71
// when 'finish' is emitted
72
this.finished = false;
73
74
// should we decode strings into buffers before passing to _write?
75
// this is here so that some node-core streams can optimize string
76
// handling at a lower level.
77
var noDecode = options.decodeStrings === false;
78
this.decodeStrings = !noDecode;
79
80
// Crypto is kind of old and crusty. Historically, its default string
81
// encoding is 'binary' so we have to make this configurable.
82
// Everything else in the universe uses 'utf8', though.
83
this.defaultEncoding = options.defaultEncoding || 'utf8';
84
85
// not an actual buffer we keep track of, but a measurement
86
// of how much we're waiting to get pushed to some underlying
87
// socket or file.
88
this.length = 0;
89
90
// a flag to see when we're in the middle of a write.
91
this.writing = false;
92
93
// a flag to be able to tell if the onwrite cb is called immediately,
94
// or on a later tick. We set this to true at first, becuase any
95
// actions that shouldn't happen until "later" should generally also
96
// not happen before the first write call.
97
this.sync = true;
98
99
// a flag to know if we're processing previously buffered items, which
100
// may call the _write() callback in the same tick, so that we don't
101
// end up in an overlapped onwrite situation.
102
this.bufferProcessing = false;
103
104
// the callback that's passed to _write(chunk,cb)
105
this.onwrite = function(er) {
106
onwrite(stream, er);
107
};
108
109
// the callback that the user supplies to write(chunk,encoding,cb)
110
this.writecb = null;
111
112
// the amount that is being written when _write is called.
113
this.writelen = 0;
114
115
this.buffer = [];
116
117
// True if the error was already emitted and should not be thrown again
118
this.errorEmitted = false;
119
}
120
121
function Writable(options) {
122
var Duplex = require('./_stream_duplex');
123
124
// Writable ctor is applied to Duplexes, though they're not
125
// instanceof Writable, they're instanceof Readable.
126
if (!(this instanceof Writable) && !(this instanceof Duplex))
127
return new Writable(options);
128
129
this._writableState = new WritableState(options, this);
130
131
// legacy.
132
this.writable = true;
133
134
Stream.call(this);
135
}
136
137
// Otherwise people can pipe Writable streams, which is just wrong.
138
Writable.prototype.pipe = function() {
139
this.emit('error', new Error('Cannot pipe. Not readable.'));
140
};
141
142
143
function writeAfterEnd(stream, state, cb) {
144
var er = new Error('write after end');
145
// TODO: defer error events consistently everywhere, not just the cb
146
stream.emit('error', er);
147
process.nextTick(function() {
148
cb(er);
149
});
150
}
151
152
// If we get something that is not a buffer, string, null, or undefined,
153
// and we're not in objectMode, then that's an error.
154
// Otherwise stream chunks are all considered to be of length=1, and the
155
// watermarks determine how many objects to keep in the buffer, rather than
156
// how many bytes or characters.
157
function validChunk(stream, state, chunk, cb) {
158
var valid = true;
159
if (!Buffer.isBuffer(chunk) &&
160
'string' !== typeof chunk &&
161
chunk !== null &&
162
chunk !== undefined &&
163
!state.objectMode) {
164
var er = new TypeError('Invalid non-string/buffer chunk');
165
stream.emit('error', er);
166
process.nextTick(function() {
167
cb(er);
168
});
169
valid = false;
170
}
171
return valid;
172
}
173
174
Writable.prototype.write = function(chunk, encoding, cb) {
175
var state = this._writableState;
176
var ret = false;
177
178
if (typeof encoding === 'function') {
179
cb = encoding;
180
encoding = null;
181
}
182
183
if (Buffer.isBuffer(chunk))
184
encoding = 'buffer';
185
else if (!encoding)
186
encoding = state.defaultEncoding;
187
188
if (typeof cb !== 'function')
189
cb = function() {};
190
191
if (state.ended)
192
writeAfterEnd(this, state, cb);
193
else if (validChunk(this, state, chunk, cb))
194
ret = writeOrBuffer(this, state, chunk, encoding, cb);
195
196
return ret;
197
};
198
199
function decodeChunk(state, chunk, encoding) {
200
if (!state.objectMode &&
201
state.decodeStrings !== false &&
202
typeof chunk === 'string') {
203
chunk = new Buffer(chunk, encoding);
204
}
205
return chunk;
206
}
207
208
// if we're already writing something, then just put this
209
// in the queue, and wait our turn. Otherwise, call _write
210
// If we return false, then we need a drain event, so set that flag.
211
function writeOrBuffer(stream, state, chunk, encoding, cb) {
212
chunk = decodeChunk(state, chunk, encoding);
213
if (Buffer.isBuffer(chunk))
214
encoding = 'buffer';
215
var len = state.objectMode ? 1 : chunk.length;
216
217
state.length += len;
218
219
var ret = state.length < state.highWaterMark;
220
// we must ensure that previous needDrain will not be reset to false.
221
if (!ret)
222
state.needDrain = true;
223
224
if (state.writing)
225
state.buffer.push(new WriteReq(chunk, encoding, cb));
226
else
227
doWrite(stream, state, len, chunk, encoding, cb);
228
229
return ret;
230
}
231
232
function doWrite(stream, state, len, chunk, encoding, cb) {
233
state.writelen = len;
234
state.writecb = cb;
235
state.writing = true;
236
state.sync = true;
237
stream._write(chunk, encoding, state.onwrite);
238
state.sync = false;
239
}
240
241
function onwriteError(stream, state, sync, er, cb) {
242
if (sync)
243
process.nextTick(function() {
244
cb(er);
245
});
246
else
247
cb(er);
248
249
stream._writableState.errorEmitted = true;
250
stream.emit('error', er);
251
}
252
253
function onwriteStateUpdate(state) {
254
state.writing = false;
255
state.writecb = null;
256
state.length -= state.writelen;
257
state.writelen = 0;
258
}
259
260
function onwrite(stream, er) {
261
var state = stream._writableState;
262
var sync = state.sync;
263
var cb = state.writecb;
264
265
onwriteStateUpdate(state);
266
267
if (er)
268
onwriteError(stream, state, sync, er, cb);
269
else {
270
// Check if we're actually ready to finish, but don't emit yet
271
var finished = needFinish(stream, state);
272
273
if (!finished && !state.bufferProcessing && state.buffer.length)
274
clearBuffer(stream, state);
275
276
if (sync) {
277
process.nextTick(function() {
278
afterWrite(stream, state, finished, cb);
279
});
280
} else {
281
afterWrite(stream, state, finished, cb);
282
}
283
}
284
}
285
286
function afterWrite(stream, state, finished, cb) {
287
if (!finished)
288
onwriteDrain(stream, state);
289
cb();
290
if (finished)
291
finishMaybe(stream, state);
292
}
293
294
// Must force callback to be called on nextTick, so that we don't
295
// emit 'drain' before the write() consumer gets the 'false' return
296
// value, and has a chance to attach a 'drain' listener.
297
function onwriteDrain(stream, state) {
298
if (state.length === 0 && state.needDrain) {
299
state.needDrain = false;
300
stream.emit('drain');
301
}
302
}
303
304
305
// if there's something in the buffer waiting, then process it
306
function clearBuffer(stream, state) {
307
state.bufferProcessing = true;
308
309
for (var c = 0; c < state.buffer.length; c++) {
310
var entry = state.buffer[c];
311
var chunk = entry.chunk;
312
var encoding = entry.encoding;
313
var cb = entry.callback;
314
var len = state.objectMode ? 1 : chunk.length;
315
316
doWrite(stream, state, len, chunk, encoding, cb);
317
318
// if we didn't call the onwrite immediately, then
319
// it means that we need to wait until it does.
320
// also, that means that the chunk and cb are currently
321
// being processed, so move the buffer counter past them.
322
if (state.writing) {
323
c++;
324
break;
325
}
326
}
327
328
state.bufferProcessing = false;
329
if (c < state.buffer.length)
330
state.buffer = state.buffer.slice(c);
331
else
332
state.buffer.length = 0;
333
}
334
335
Writable.prototype._write = function(chunk, encoding, cb) {
336
cb(new Error('not implemented'));
337
};
338
339
Writable.prototype.end = function(chunk, encoding, cb) {
340
var state = this._writableState;
341
342
if (typeof chunk === 'function') {
343
cb = chunk;
344
chunk = null;
345
encoding = null;
346
} else if (typeof encoding === 'function') {
347
cb = encoding;
348
encoding = null;
349
}
350
351
if (typeof chunk !== 'undefined' && chunk !== null)
352
this.write(chunk, encoding);
353
354
// ignore unnecessary end() calls.
355
if (!state.ending && !state.finished)
356
endWritable(this, state, cb);
357
};
358
359
360
function needFinish(stream, state) {
361
return (state.ending &&
362
state.length === 0 &&
363
!state.finished &&
364
!state.writing);
365
}
366
367
function finishMaybe(stream, state) {
368
var need = needFinish(stream, state);
369
if (need) {
370
state.finished = true;
371
stream.emit('finish');
372
}
373
return need;
374
}
375
376
function endWritable(stream, state, cb) {
377
state.ending = true;
378
finishMaybe(stream, state);
379
if (cb) {
380
if (state.finished)
381
process.nextTick(cb);
382
else
383
stream.once('finish', cb);
384
}
385
state.ended = true;
386
}
387
388