Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
80537 views
1
// Copyright Joyent, Inc. and other Node contributors.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the
5
// "Software"), to deal in the Software without restriction, including
6
// without limitation the rights to use, copy, modify, merge, publish,
7
// distribute, sublicense, and/or sell copies of the Software, and to permit
8
// persons to whom the Software is furnished to do so, subject to the
9
// following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included
12
// in all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22
// A bit simpler than readable streams.
23
// Implement an async ._write(chunk, cb), and it'll handle all
24
// the drain event emission and buffering.
25
26
module.exports = Writable;
27
28
/*<replacement>*/
29
var Buffer = require('buffer').Buffer;
30
/*</replacement>*/
31
32
Writable.WritableState = WritableState;
33
34
35
/*<replacement>*/
36
var util = require('core-util-is');
37
util.inherits = require('inherits');
38
/*</replacement>*/
39
40
var Stream = require('stream');
41
42
util.inherits(Writable, Stream);
43
44
function WriteReq(chunk, encoding, cb) {
45
this.chunk = chunk;
46
this.encoding = encoding;
47
this.callback = cb;
48
}
49
50
function WritableState(options, stream) {
51
var Duplex = require('./_stream_duplex');
52
53
options = options || {};
54
55
// the point at which write() starts returning false
56
// Note: 0 is a valid value, means that we always return false if
57
// the entire buffer is not flushed immediately on write()
58
var hwm = options.highWaterMark;
59
var defaultHwm = options.objectMode ? 16 : 16 * 1024;
60
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
61
62
// object stream flag to indicate whether or not this stream
63
// contains buffers or objects.
64
this.objectMode = !!options.objectMode;
65
66
if (stream instanceof Duplex)
67
this.objectMode = this.objectMode || !!options.writableObjectMode;
68
69
// cast to ints.
70
this.highWaterMark = ~~this.highWaterMark;
71
72
this.needDrain = false;
73
// at the start of calling end()
74
this.ending = false;
75
// when end() has been called, and returned
76
this.ended = false;
77
// when 'finish' is emitted
78
this.finished = false;
79
80
// should we decode strings into buffers before passing to _write?
81
// this is here so that some node-core streams can optimize string
82
// handling at a lower level.
83
var noDecode = options.decodeStrings === false;
84
this.decodeStrings = !noDecode;
85
86
// Crypto is kind of old and crusty. Historically, its default string
87
// encoding is 'binary' so we have to make this configurable.
88
// Everything else in the universe uses 'utf8', though.
89
this.defaultEncoding = options.defaultEncoding || 'utf8';
90
91
// not an actual buffer we keep track of, but a measurement
92
// of how much we're waiting to get pushed to some underlying
93
// socket or file.
94
this.length = 0;
95
96
// a flag to see when we're in the middle of a write.
97
this.writing = false;
98
99
// when true all writes will be buffered until .uncork() call
100
this.corked = 0;
101
102
// a flag to be able to tell if the onwrite cb is called immediately,
103
// or on a later tick. We set this to true at first, because any
104
// actions that shouldn't happen until "later" should generally also
105
// not happen before the first write call.
106
this.sync = true;
107
108
// a flag to know if we're processing previously buffered items, which
109
// may call the _write() callback in the same tick, so that we don't
110
// end up in an overlapped onwrite situation.
111
this.bufferProcessing = false;
112
113
// the callback that's passed to _write(chunk,cb)
114
this.onwrite = function(er) {
115
onwrite(stream, er);
116
};
117
118
// the callback that the user supplies to write(chunk,encoding,cb)
119
this.writecb = null;
120
121
// the amount that is being written when _write is called.
122
this.writelen = 0;
123
124
this.buffer = [];
125
126
// number of pending user-supplied write callbacks
127
// this must be 0 before 'finish' can be emitted
128
this.pendingcb = 0;
129
130
// emit prefinish if the only thing we're waiting for is _write cbs
131
// This is relevant for synchronous Transform streams
132
this.prefinished = false;
133
134
// True if the error was already emitted and should not be thrown again
135
this.errorEmitted = false;
136
}
137
138
function Writable(options) {
139
var Duplex = require('./_stream_duplex');
140
141
// Writable ctor is applied to Duplexes, though they're not
142
// instanceof Writable, they're instanceof Readable.
143
if (!(this instanceof Writable) && !(this instanceof Duplex))
144
return new Writable(options);
145
146
this._writableState = new WritableState(options, this);
147
148
// legacy.
149
this.writable = true;
150
151
Stream.call(this);
152
}
153
154
// Otherwise people can pipe Writable streams, which is just wrong.
155
Writable.prototype.pipe = function() {
156
this.emit('error', new Error('Cannot pipe. Not readable.'));
157
};
158
159
160
function writeAfterEnd(stream, state, cb) {
161
var er = new Error('write after end');
162
// TODO: defer error events consistently everywhere, not just the cb
163
stream.emit('error', er);
164
process.nextTick(function() {
165
cb(er);
166
});
167
}
168
169
// If we get something that is not a buffer, string, null, or undefined,
170
// and we're not in objectMode, then that's an error.
171
// Otherwise stream chunks are all considered to be of length=1, and the
172
// watermarks determine how many objects to keep in the buffer, rather than
173
// how many bytes or characters.
174
function validChunk(stream, state, chunk, cb) {
175
var valid = true;
176
if (!util.isBuffer(chunk) &&
177
!util.isString(chunk) &&
178
!util.isNullOrUndefined(chunk) &&
179
!state.objectMode) {
180
var er = new TypeError('Invalid non-string/buffer chunk');
181
stream.emit('error', er);
182
process.nextTick(function() {
183
cb(er);
184
});
185
valid = false;
186
}
187
return valid;
188
}
189
190
Writable.prototype.write = function(chunk, encoding, cb) {
191
var state = this._writableState;
192
var ret = false;
193
194
if (util.isFunction(encoding)) {
195
cb = encoding;
196
encoding = null;
197
}
198
199
if (util.isBuffer(chunk))
200
encoding = 'buffer';
201
else if (!encoding)
202
encoding = state.defaultEncoding;
203
204
if (!util.isFunction(cb))
205
cb = function() {};
206
207
if (state.ended)
208
writeAfterEnd(this, state, cb);
209
else if (validChunk(this, state, chunk, cb)) {
210
state.pendingcb++;
211
ret = writeOrBuffer(this, state, chunk, encoding, cb);
212
}
213
214
return ret;
215
};
216
217
Writable.prototype.cork = function() {
218
var state = this._writableState;
219
220
state.corked++;
221
};
222
223
Writable.prototype.uncork = function() {
224
var state = this._writableState;
225
226
if (state.corked) {
227
state.corked--;
228
229
if (!state.writing &&
230
!state.corked &&
231
!state.finished &&
232
!state.bufferProcessing &&
233
state.buffer.length)
234
clearBuffer(this, state);
235
}
236
};
237
238
function decodeChunk(state, chunk, encoding) {
239
if (!state.objectMode &&
240
state.decodeStrings !== false &&
241
util.isString(chunk)) {
242
chunk = new Buffer(chunk, encoding);
243
}
244
return chunk;
245
}
246
247
// if we're already writing something, then just put this
248
// in the queue, and wait our turn. Otherwise, call _write
249
// If we return false, then we need a drain event, so set that flag.
250
function writeOrBuffer(stream, state, chunk, encoding, cb) {
251
chunk = decodeChunk(state, chunk, encoding);
252
if (util.isBuffer(chunk))
253
encoding = 'buffer';
254
var len = state.objectMode ? 1 : chunk.length;
255
256
state.length += len;
257
258
var ret = state.length < state.highWaterMark;
259
// we must ensure that previous needDrain will not be reset to false.
260
if (!ret)
261
state.needDrain = true;
262
263
if (state.writing || state.corked)
264
state.buffer.push(new WriteReq(chunk, encoding, cb));
265
else
266
doWrite(stream, state, false, len, chunk, encoding, cb);
267
268
return ret;
269
}
270
271
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
272
state.writelen = len;
273
state.writecb = cb;
274
state.writing = true;
275
state.sync = true;
276
if (writev)
277
stream._writev(chunk, state.onwrite);
278
else
279
stream._write(chunk, encoding, state.onwrite);
280
state.sync = false;
281
}
282
283
function onwriteError(stream, state, sync, er, cb) {
284
if (sync)
285
process.nextTick(function() {
286
state.pendingcb--;
287
cb(er);
288
});
289
else {
290
state.pendingcb--;
291
cb(er);
292
}
293
294
stream._writableState.errorEmitted = true;
295
stream.emit('error', er);
296
}
297
298
function onwriteStateUpdate(state) {
299
state.writing = false;
300
state.writecb = null;
301
state.length -= state.writelen;
302
state.writelen = 0;
303
}
304
305
function onwrite(stream, er) {
306
var state = stream._writableState;
307
var sync = state.sync;
308
var cb = state.writecb;
309
310
onwriteStateUpdate(state);
311
312
if (er)
313
onwriteError(stream, state, sync, er, cb);
314
else {
315
// Check if we're actually ready to finish, but don't emit yet
316
var finished = needFinish(stream, state);
317
318
if (!finished &&
319
!state.corked &&
320
!state.bufferProcessing &&
321
state.buffer.length) {
322
clearBuffer(stream, state);
323
}
324
325
if (sync) {
326
process.nextTick(function() {
327
afterWrite(stream, state, finished, cb);
328
});
329
} else {
330
afterWrite(stream, state, finished, cb);
331
}
332
}
333
}
334
335
function afterWrite(stream, state, finished, cb) {
336
if (!finished)
337
onwriteDrain(stream, state);
338
state.pendingcb--;
339
cb();
340
finishMaybe(stream, state);
341
}
342
343
// Must force callback to be called on nextTick, so that we don't
344
// emit 'drain' before the write() consumer gets the 'false' return
345
// value, and has a chance to attach a 'drain' listener.
346
function onwriteDrain(stream, state) {
347
if (state.length === 0 && state.needDrain) {
348
state.needDrain = false;
349
stream.emit('drain');
350
}
351
}
352
353
354
// if there's something in the buffer waiting, then process it
355
function clearBuffer(stream, state) {
356
state.bufferProcessing = true;
357
358
if (stream._writev && state.buffer.length > 1) {
359
// Fast case, write everything using _writev()
360
var cbs = [];
361
for (var c = 0; c < state.buffer.length; c++)
362
cbs.push(state.buffer[c].callback);
363
364
// count the one we are adding, as well.
365
// TODO(isaacs) clean this up
366
state.pendingcb++;
367
doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
368
for (var i = 0; i < cbs.length; i++) {
369
state.pendingcb--;
370
cbs[i](err);
371
}
372
});
373
374
// Clear buffer
375
state.buffer = [];
376
} else {
377
// Slow case, write chunks one-by-one
378
for (var c = 0; c < state.buffer.length; c++) {
379
var entry = state.buffer[c];
380
var chunk = entry.chunk;
381
var encoding = entry.encoding;
382
var cb = entry.callback;
383
var len = state.objectMode ? 1 : chunk.length;
384
385
doWrite(stream, state, false, len, chunk, encoding, cb);
386
387
// if we didn't call the onwrite immediately, then
388
// it means that we need to wait until it does.
389
// also, that means that the chunk and cb are currently
390
// being processed, so move the buffer counter past them.
391
if (state.writing) {
392
c++;
393
break;
394
}
395
}
396
397
if (c < state.buffer.length)
398
state.buffer = state.buffer.slice(c);
399
else
400
state.buffer.length = 0;
401
}
402
403
state.bufferProcessing = false;
404
}
405
406
Writable.prototype._write = function(chunk, encoding, cb) {
407
cb(new Error('not implemented'));
408
409
};
410
411
Writable.prototype._writev = null;
412
413
Writable.prototype.end = function(chunk, encoding, cb) {
414
var state = this._writableState;
415
416
if (util.isFunction(chunk)) {
417
cb = chunk;
418
chunk = null;
419
encoding = null;
420
} else if (util.isFunction(encoding)) {
421
cb = encoding;
422
encoding = null;
423
}
424
425
if (!util.isNullOrUndefined(chunk))
426
this.write(chunk, encoding);
427
428
// .end() fully uncorks
429
if (state.corked) {
430
state.corked = 1;
431
this.uncork();
432
}
433
434
// ignore unnecessary end() calls.
435
if (!state.ending && !state.finished)
436
endWritable(this, state, cb);
437
};
438
439
440
function needFinish(stream, state) {
441
return (state.ending &&
442
state.length === 0 &&
443
!state.finished &&
444
!state.writing);
445
}
446
447
function prefinish(stream, state) {
448
if (!state.prefinished) {
449
state.prefinished = true;
450
stream.emit('prefinish');
451
}
452
}
453
454
function finishMaybe(stream, state) {
455
var need = needFinish(stream, state);
456
if (need) {
457
if (state.pendingcb === 0) {
458
prefinish(stream, state);
459
state.finished = true;
460
stream.emit('finish');
461
} else
462
prefinish(stream, state);
463
}
464
return need;
465
}
466
467
function endWritable(stream, state, cb) {
468
state.ending = true;
469
finishMaybe(stream, state);
470
if (cb) {
471
if (state.finished)
472
process.nextTick(cb);
473
else
474
stream.once('finish', cb);
475
}
476
state.ended = true;
477
}
478
479