Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Avatar for KuCalc : devops.
Download
50659 views
1
//filter will reemit the data if cb(err,pass) pass is truthy
2
// reduce is more tricky
3
// maybe we want to group the reductions or emit progress updates occasionally
4
// the most basic reduce just emits one 'data' event after it has received 'end'
5
6
7
var Stream = require('stream').Stream
8
, es = exports
9
10
es.Stream = Stream //re-export Stream from core
11
12
// writable stream, collects all events into an array
13
// and calls back when 'end' occurs
14
// mainly I'm using this to test the other functions
15
16
// writeArray (done)
//
// Build a write-only stream that remembers every chunk written to it and,
// once end() is called, passes the collected chunks to done(null, chunks).
//
// Throws if done is not a function.
es.writeArray = function (done) {
  if (typeof done !== 'function') {
    throw new Error('function writeArray (done): done must be function')
  }

  var collected = []
  var sink = new Stream()

  //keep each chunk in arrival order
  sink.write = function (chunk) {
    collected.push(chunk)
  }

  //node-style callback: no error, then everything that was written
  sink.end = function () {
    done(null, collected)
  }

  sink.writable = true
  sink.readable = false

  return sink
}
32
33
//return a Stream that emits the elements of an array, one 'data' event each,
34
//respecting pause() and resume()
35
36
// readArray (array)
//
// Return a readable stream that emits each element of `array` as a 'data'
// event, starting on the next tick, and then emits 'end' exactly once.
// pause() stops emission after the current element; resume() continues
// from where it left off.
//
// Throws if `array` is not an array.
es.readArray = function (array) {
  if (!Array.isArray(array))
    throw new Error('event-stream.read expects an array')

  var stream = new Stream()
    , i = 0
    , paused = false
    , ended = false

  stream.readable = true
  stream.writable = false

  stream.resume = function () {
    //once 'end' has gone out there is nothing left to resume
    if (ended) return
    paused = false
    var l = array.length
    while (i < l && !paused) {
      stream.emit('data', array[i++])
    }
    //a listener may have called resume() re-entrantly and finished the
    //array already, so check the flag again before ending
    if (i == l && !ended) {
      ended = true
      stream.emit('end')
      stream.readable = false
    }
  }

  //start flowing on the next tick so the caller can attach listeners first
  process.nextTick(stream.resume)

  stream.pause = function () {
    paused = true
  }

  return stream
}
62
63
//
64
// readable (asyncFunction)
65
// return a stream that calls an async function while the stream is not paused.
66
//
67
// the function must take: (count, callback) {...
68
//
69
// readable (func, continueOnError)
//
// Return a readable stream driven by an async function. While the stream is
// neither paused nor ended, func is called as
//
//   func.call(stream, count, callback)
//
// where count starts at 0 and increases by one per call. Calling
// callback(err) emits 'error'; callback(null, data) emits 'data'; a callback
// with fewer than two arguments emits nothing. Only one call to func is in
// flight at a time.
//
// After an error the stream also emits 'end', unless continueOnError is
// truthy, in which case reading carries on. An exception thrown
// synchronously inside func is treated the same way as an async error.
//
// Throws if func is not a function.
es.readable = function (func, continueOnError) {
  if ('function' !== typeof func)
    throw new Error('event-stream.readable expects async function')

  var stream = new Stream()
    , i = 0
    , paused = false
    , ended = false
    , reading = false

  stream.readable = true
  stream.writable = false

  //func (or anyone else) may emit 'end' on the stream to stop the reads
  stream.on('end', function () { ended = true })

  //report an error, and finish the stream unless told to continue
  function fail (err) {
    stream.emit('error', err)
    if (!continueOnError) stream.emit('end')
  }

  function get (err, data) {
    if (err)
      fail(err)
    else if (arguments.length > 1)
      stream.emit('data', data)

    //schedule the next read; the guards make redundant schedules harmless
    process.nextTick(function () {
      if (ended || paused || reading) return
      reading = true
      try {
        func.call(stream, i++, function () {
          reading = false
          get.apply(null, arguments)
        })
      } catch (err) {
        //clear the in-flight flag, otherwise no further read could start
        reading = false
        fail(err)
        get()
      }
    })
  }

  stream.resume = function () {
    paused = false
    get()
  }

  process.nextTick(get)

  stream.pause = function () {
    paused = true
  }

  return stream
}
116
117
118
//create an event stream and apply function to each .write
119
//emitting each response as data
120
//unless it's an empty callback
121
122
// map (mapper)
//
// Return a through stream that calls mapper(...writtenArgs, callback) for
// every write().
//
//   callback(err)            emits 'error' with the callback's arguments
//   callback(null, a, b...)  emits 'data' with a, b...
//   callback()               emits nothing (the input is dropped)
//
// write() returns whatever mapper returns; if that is exactly false the
// stream counts as paused and emits 'drain' once every input has been
// answered. end() may be given a final chunk, which is written first; 'end'
// is emitted once all inputs have been answered.
es.map = function (mapper) {
  var stream = new Stream()
    , inputs = 0
    , outputs = 0
    , ended = false
    , paused = false

  stream.writable = true
  stream.readable = true

  stream.write = function () {
    inputs ++
    var args = [].slice.call(arguments)
      , inNext = false

    function next (err) {
      inNext = true
      outputs ++
      var args = [].slice.call(arguments)
      if(err) {
        args.unshift('error')
        return inNext = false, stream.emit.apply(stream, args)
      }
      args.shift() //drop err

      if (args.length){
        args.unshift('data')
        stream.emit.apply(stream, args)
      }
      if(inputs == outputs) {
        if(paused) stream.emit('drain') //written all the incoming events
        paused = false
        if(ended)
          stream.end()
      }
      inNext = false
    }
    args.push(next)

    try {
      //catch sync errors and handle them like async errors
      var written = mapper.apply(null,args)
      if(written === false) paused = true
      return written
    } catch (err) {
      //if the callback has been called synchronously, and the error
      //has occurred in a listener, throw it again.
      if(inNext)
        throw err
      next(err)
      return true
    }
  }

  stream.end = function () {
    var args = [].slice.call(arguments)
    //if end was called with args, write it,
    ended = true //write will emit 'end' if ended is true
    if(args.length)
      return stream.write.apply(stream, args)
    else if (inputs == outputs) //wait for processing
      stream.emit('end')
  }

  return stream
}
187
188
//
189
// map sync
190
//
191
192
// mapSync (sync)
//
// Wrap a synchronous function as an es.map stream: each write calls
// sync(...writtenArgs) and its return value is handed to the map callback
// as the result.
es.mapSync = function (sync) {
  return es.map(function () {
    var params = Array.prototype.slice.call(arguments)
    //es.map always appends its callback as the last argument
    var done = params.pop()
    var result = sync.apply(null, params)
    done(null, result)
  })
}
201
202
//
203
// log just print out what is coming through the stream, for debugging
204
//
205
206
// log (name)
//
// Return a pass-through stream that prints everything written to it with
// console.error and then re-emits it unchanged. If name is given it is
// printed as a prefix; it is never added to the data passed downstream.
es.log = function (name) {
  return es.map(function () {
    var args = [].slice.call(arguments)
      , cb = args.pop()
      //prefix a copy, so the label is printed but not forwarded
      , logged = name ? [name].concat(args) : args
    console.error.apply(console, logged)
    args.unshift(null) //no error
    cb.apply(null, args)
  })
}
216
217
//
218
// combine multiple streams together so that they act as a single stream
219
//
220
221
// pipe (stream1, stream2, ...)   (also exported as connect)
//
// Pipe each stream into the next and return a single stream that writes
// into the first and reads from the last. 'error' events from any of the
// streams are re-emitted on the returned stream.
//
// With exactly one stream, that stream is returned untouched.
// Throws if called with no streams.
es.pipe = es.connect = function () {
  var streams = [].slice.call(arguments)

  //validate before touching any stream, so an empty call gets the
  //intended error and a lone stream gets no extra listeners
  if (!streams.length)
    throw new Error('connect called with empty args')
  if (streams.length == 1)
    return streams[0]

  var first = streams[0]
    , last = streams[streams.length - 1]
    , thepipe = es.duplex(first, last)

  //pipe all the streams together
  for (var i = 0; i < streams.length - 1; i++) {
    streams[i].pipe(streams[i + 1])
  }

  function onerror () {
    var args = [].slice.call(arguments)
    args.unshift('error')
    thepipe.emit.apply(thepipe, args)
  }

  streams.forEach(function (stream) {
    stream.on('error', onerror)
  })

  return thepipe
}
256
257
//
258
// child -- pipe through a child process
259
//
260
261
// child (child)
//
// Present an object with stdin and stdout streams as one duplex stream:
// writes go to child.stdin, data is read from child.stdout.
es.child = function (child) {
  var input = child.stdin
    , output = child.stdout

  return es.duplex(input, output)
}
266
267
//
268
// duplex -- pipe into one stream and out another
269
//
270
271
// duplex (writer, reader)
//
// Return one stream that writes into `writer` and reads from `reader`.
//
//   writable / readable   live views of writer.writable / reader.readable
//   write, end, close     forwarded to writer
//   pause, resume         emitted on the returned stream, then forwarded to
//                         reader (or emitted on reader if it lacks the method)
//   'data', 'close'       re-emitted from reader
//   'end'                 re-emitted from reader, at most once
es.duplex = function (writer, reader) {
  var thepipe = new Stream()
    , slice = Array.prototype.slice
    , endSeen = false

  //expose a property of `source` as a getter on the returned stream
  function mirror (name, source) {
    Object.defineProperty(thepipe, name, {
      get: function () { return source[name] },
      enumerable: true,
      configurable: true
    })
  }

  //calling the method on the returned stream calls it on writer
  function toWriter (method) {
    thepipe[method] = function () {
      return writer[method].apply(writer, arguments)
    }
  }

  //announce pause/resume locally, then pass the request on to reader
  function flowControl (method) {
    thepipe[method] = function () {
      thepipe.emit(method)
      if (!reader[method]) {
        reader.emit(method)
        return
      }
      return reader[method].apply(reader, arguments)
    }
  }

  //repeat one of reader's events on the returned stream
  function relay (event) {
    reader.on(event, function () {
      var payload = [event].concat(slice.call(arguments))
      thepipe.emit.apply(thepipe, payload)
    })
  }

  mirror('writable', writer)
  mirror('readable', reader)

  toWriter('write')
  toWriter('end')
  toWriter('close')

  flowControl('resume')
  flowControl('pause')

  relay('data')
  relay('close')

  //only emit end once
  reader.on('end', function () {
    if (endSeen) return
    endSeen = true
    var payload = ['end'].concat(slice.call(arguments))
    thepipe.emit.apply(thepipe, payload)
  })

  return thepipe
}
312
313
// split (matcher)
//
// Return a writable stream that cuts incoming text into chunks, each ending
// with the separator, and emits every chunk as 'data'.
//
// matcher: separator to split on; defaults to '\n'. It is coerced to a
//          string and may be longer than one character (e.g. '\r\n'); a
//          separator that straddles two writes is still found.
//
// Chunks keep their trailing separator. On end() whatever is left over is
// emitted as a final 'data' event (even if it is the empty string),
// followed by 'end'.
es.split = function (matcher) {
  var stream = new Stream()
    , soFar = ''
    , sep = String(matcher || '\n') || '\n'

  stream.writable = true

  stream.write = function (buffer) {
    //soFar never holds a complete separator between writes, so a new match
    //can start no earlier than sep.length - 1 characters before the new text
    var from = Math.max(0, soFar.length - sep.length + 1)
      , at
    soFar += buffer.toString()
    at = soFar.indexOf(sep, from)
    while (at !== -1) {
      var cut = at + sep.length
        , piece = soFar.slice(0, cut)
      //drop the chunk from the buffer before listeners run
      soFar = soFar.slice(cut)
      stream.emit('data', piece)
      at = soFar.indexOf(sep)
    }
  }

  stream.end = function () {
    stream.emit('data', soFar)
    stream.emit('end')
  }

  return stream
}
344
345
//
346
// gate
347
//
348
// while the gate is shut(), buffer incoming.
349
//
350
// if gate is open() stream like normal.
351
//
352
// currently, when opened, this will emit all data unless it is shut again
353
// if downstream pauses it will still write, i'd like to make it respect pause,
354
// but i'll need a test case first.
355
356
// gate (shut)
//
// Return a through stream with a gate. While the gate is shut, writes are
// queued and write() returns false; while it is open, writes are emitted
// as 'data' straight away.
//
// shut: pass false to start open; any other value (or none) starts shut.
//
// Extra methods on the returned stream:
//   isShut()  current state of the gate
//   shut()    close the gate
//   open()    open the gate and flush the queue
//
// 'drain' is emitted after every flush attempt. 'end' is emitted exactly
// once: when end() has been called, the queue is empty and, if anything had
// to be flushed, the gate is open.
es.gate = function (shut) {
  var stream = new Stream()
    , queue = []
    , ended = false
    , endEmitted = false

  shut = (shut === false ? false : true) //default to shut

  stream.writable = true
  stream.readable = true

  stream.isShut = function () { return shut }
  stream.shut = function () { shut = true }
  stream.open = function () { shut = false; maybe() }

  //end() and a later open() can both reach this point, so emit only once
  function finish () {
    if (endEmitted) return
    endEmitted = true
    stream.emit('end')
  }

  function maybe () {
    //a listener may shut the gate again mid-flush, so re-check every time
    while (queue.length && !shut) {
      var args = queue.shift()
      args.unshift('data')
      stream.emit.apply(stream, args)
    }
    stream.emit('drain')
    if (ended && !shut)
      finish()
  }

  stream.write = function () {
    var args = [].slice.call(arguments)

    queue.push(args)
    if (shut) return false //pause up stream pipes

    maybe()
  }

  stream.end = function () {
    ended = true
    if (!queue.length)
      finish()
  }

  return stream
}
402
403
//
404
// parse
405
//
406
407
// parse ()
//
// Return a stream that converts each written chunk to a string, parses it
// as JSON and emits the parsed value.
es.parse = function () {
  function fromJSON (chunk) {
    var text = chunk.toString()
    return JSON.parse(text)
  }

  return es.mapSync(fromJSON)
}
412
//
413
// stringify
414
//
415
416
// stringify ()
//
// Return a stream that emits each written value as its JSON text followed
// by a newline.
es.stringify = function () {
  function toLine (value) {
    var json = JSON.stringify(value)
    return json + '\n'
  }

  return es.mapSync(toLine)
}
421
422
//
423
// helper to make your module into a unix pipe
424
// simply add
425
//
426
// if(!module.parent)
427
// require('event-stream').pipeable(asyncFunctionOrStreams)
428
//
429
// asyncFunctionOrStreams may be one or more Streams or if it is a function,
430
// it will be automatically wrapped in es.map
431
//
432
// then pipe stuff into it from the command line!
433
//
434
// curl registry.npmjs.org/event-stream | node hello-pipeable.js | grep whatever
435
//
436
// etc!
437
//
438
// also, start pipeable running as a server!
439
//
440
// > node hello-pipeable.js --port 44444
441
//
442
443
// setup (args)
//
// Turn the arguments given to pipeable into a fresh list of streams.
//
// Each entry is either a factory function, which is called with no
// arguments, or a ready-made stream, which is used as it is. If a factory
// returns a function, that function is wrapped with es.map; anything else
// it returns is used directly.
var setup = function (args) {
  return args.map(function (f) {
    //only call factories; a stream passed directly is not callable
    var x = 'function' === typeof f ? f() : f
    if ('function' === typeof x)
      return es.map(x)
    return x
  })
}
451
452
// pipeable (asyncFunctionOrStreams...)
//
// Run the given streams as a command line tool. Options are read from the
// command line via optimist:
//
//   -h, --help   print usage to stderr and do nothing else
//   --port       if set, start a server instead of reading stdin
//   --host       server host, 'localhost' by default
//   --protocol   module whose createServer is used, 'http' by default
//
// Without --port: stdin is split with es.split(), passed through the
// streams and written to stdout; the connected stream is returned.
// With --port: every connection is split with es.split(), passed through a
// fresh set of streams and written back to the response (or to the
// connection itself when the server supplies no separate output stream).
es.pipeable = function () {
  var opts = require('optimist').argv
  var args = [].slice.call(arguments)

  if(opts.h || opts.help) {
    var name = process.argv[1]
    console.error([
      'Usage:',
      '',
      'node ' + name + ' [options]',
      ' --port PORT turn this stream into a server',
      ' --host HOST host of server (localhost is default)',
      ' --protocol protocol http|net will require(protocol).createServer(...',
      ' --help display this message',
      '',
      ' if --port is not set, will stream input from stdin',
      '',
      'also, pipe from or to files:',
      '',
      ' node '+name+ ' < file #pipe from file into this stream',
      ' node '+name+ ' < infile > outfile #pipe from infile through this stream into outfile',
      '',
    ].join('\n'))

  } else if (!opts.port) {
    var streams = setup(args)
    streams.unshift(es.split())
    streams.push(process.stdout)
    var c = es.connect.apply(null, streams)
    //process.openStdin() is deprecated; it resumed and returned process.stdin
    process.stdin.resume()
    process.stdin.pipe(c)
    return c

  } else {

    opts.host = opts.host || 'localhost'
    opts.protocol = opts.protocol || 'http'

    //NOTE(review): the module name comes straight from the command line;
    //the usage text only documents http and net
    var protocol = require(opts.protocol)

    var server = protocol.createServer(function (instream, outstream) {
      //build a new set of streams for every connection
      var streams = setup(args)
      streams.unshift(es.split())
      streams.unshift(instream)
      streams.push(outstream || instream)
      es.pipe.apply(null, streams)
    })

    server.listen(opts.port, opts.host)

    console.error(process.argv[1] +' is listening for "' + opts.protocol + '" on ' + opts.host + ':' + opts.port)
  }
}
505
506