123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- // filter will re-emit the data if cb(err, pass) is called with a truthy pass
- // reduce is more tricky:
- // maybe we want to group the reductions or emit progress updates occasionally.
- // the most basic reduce just emits one 'data' event after it has received 'end'
// Core Stream base class plus the stream helper modules this file re-exports.
var Stream = require('stream').Stream
var es = exports
var through = require('through')
var from = require('from')
var duplex = require('duplexer')
var map = require('map-stream')
var pause = require('pause-stream')
var split = require('split')
var pipeline = require('stream-combiner')

// Deferral primitive: setImmediate when the runtime has it, otherwise process.nextTick.
var immediately = global.setImmediate || process.nextTick

// Public API: re-export Stream from core along with the helper modules.
es.Stream = Stream
es.through = through
es.from = from
es.duplex = duplex
es.map = map
es.pause = pause
es.split = split

// pipe, connect and pipeline are three names for the same combiner.
es.pipe = pipeline
es.connect = pipeline
es.pipeline = pipeline
//
// merge / concat
//
// Combine multiple streams into a single stream. Every chunk written by a
// source is re-emitted as 'data' on the returned stream, and 'end' is
// emitted only once, after every source has emitted 'end'.
//
// The sources may be passed as separate arguments or as one array.
// With no sources at all, 'end' is emitted on the next tick.
//
// destroy() on the returned stream calls destroy() on every source that has one.
//
es.concat = // alias of es.merge
es.merge = function (/*streams...*/) {
  var toMerge = [].slice.call(arguments)
  // Array.isArray also recognises arrays created in another realm, which
  // `instanceof Array` does not. The array is copied so that later changes
  // made by the caller cannot alter the number of 'end' events we wait for.
  if (toMerge.length === 1 && Array.isArray(toMerge[0])) {
    toMerge = [].slice.call(toMerge[0])
  }

  var stream = new Stream()
  stream.setMaxListeners(0) // allow adding more than 11 streams
  var endCount = 0
  stream.writable = stream.readable = true

  if (toMerge.length) {
    toMerge.forEach(function (e) {
      // end: false, because the merged stream must outlive each single source
      e.pipe(stream, {end: false})
      var ended = false
      e.on('end', function () {
        // count each source once, even if it emits 'end' repeatedly
        if (ended) return
        ended = true
        endCount++
        if (endCount === toMerge.length)
          stream.emit('end')
      })
    })
  } else {
    process.nextTick(function () {
      stream.emit('end')
    })
  }

  stream.write = function (data) {
    this.emit('data', data)
  }
  stream.destroy = function () {
    toMerge.forEach(function (e) {
      if (e.destroy) e.destroy()
    })
  }
  return stream
}
//
// writeArray (done)
//
// Writable stream that collects every written chunk into an array and
// calls done(null, array) when end() is called.
// If the stream is destroyed before it ended, done is called with an
// Error('destroyed before end') and whatever was collected so far.
//
// done is invoked at most once: repeated end() / destroy() calls, or an
// end() after destroy(), are ignored.
//
// Throws if done is not a function.
//
es.writeArray = function (done) {
  if ('function' !== typeof done)
    throw new Error('function writeArray (done): done must be function')

  var a = new Stream()
    , array = []
    , isDone = false

  a.writable = true
  a.readable = false

  a.write = function (l) {
    array.push(l)
  }
  a.end = function () {
    if (isDone) return // already reported, never call back twice
    isDone = true
    done(null, array)
  }
  a.destroy = function () {
    a.writable = a.readable = false
    if (isDone) return
    isDone = true
    done(new Error('destroyed before end'), array)
  }
  return a
}
//
// readArray (array)
//
// Return a readable stream that emits each element of `array` as a 'data'
// event, followed by a single 'end', respecting pause() and resume().
//
// The stream starts flowing on the next tick. If pause() is called before
// that tick, it stays paused until resume() is called explicitly.
// destroy() stops any further 'data' / 'end' events and emits 'close'.
//
// Throws if `array` is not an array.
//
es.readArray = function (array) {
  var stream = new Stream()
    , i = 0
    , paused = false
    , ended = false

  stream.readable = true
  stream.writable = false

  if (!Array.isArray(array))
    throw new Error('event-stream.read expects an array')

  stream.resume = function () {
    if (ended) return
    paused = false
    var l = array.length
    // a 'data' listener may call pause() or destroy(), so re-check on every element
    while (i < l && !paused && !ended) {
      stream.emit('data', array[i++])
    }
    if (i === l && !ended) {
      ended = true
      stream.readable = false
      stream.emit('end')
    }
  }
  stream.pause = function () {
    paused = true
  }
  stream.destroy = function () {
    ended = true
    stream.emit('close')
  }

  // Start flowing automatically, but do not override a pause() issued
  // between creation and this tick.
  process.nextTick(function () {
    if (!paused) stream.resume()
  })
  return stream
}
//
// readable (asyncFunction, continueOnError)
//
// Return a stream that calls an async function while the stream is not
// paused and has not ended.
//
// The function is called as func.call(stream, count, callback), where
// count is the number of previous calls. callback(err) emits 'error',
// callback(null, data) emits 'data'; the next call is then scheduled.
// The stream ends when 'end' is emitted on it (e.g. this.emit('end')).
//
// After an error the stream emits 'end', unless continueOnError is truthy,
// in which case reading continues. A synchronous throw from func is
// handled the same way as an error passed to the callback.
//
// Throws if func is not a function.
//
es.readable =
function (func, continueOnError) {
  var stream = new Stream()
    , i = 0
    , paused = false
    , ended = false
    , reading = false

  stream.readable = true
  stream.writable = false

  if ('function' !== typeof func)
    throw new Error('event-stream.readable expects async function')

  stream.on('end', function () { ended = true })

  function get (err, data) {
    if (err) {
      stream.emit('error', err)
      if (!continueOnError) stream.emit('end')
    } else if (arguments.length > 1)
      stream.emit('data', data)

    immediately(function () {
      if (ended || paused || reading) return
      var calledBack = false
      reading = true
      try {
        func.call(stream, i++, function () {
          calledBack = true
          reading = false
          get.apply(null, arguments)
        })
      } catch (err) {
        stream.emit('error', err)
        // If the callback already ran, it has scheduled the next read itself.
        if (calledBack) return
        // Otherwise release the `reading` latch, or the stream stalls forever.
        reading = false
        if (continueOnError) get()
        else if (!ended) stream.emit('end')
      }
    })
  }

  stream.resume = function () {
    paused = false
    get()
  }
  process.nextTick(get)
  stream.pause = function () {
    paused = true
  }
  stream.destroy = function () {
    // do not emit 'end' a second time on a stream that already ended
    if (!ended) stream.emit('end')
    stream.emit('close')
    ended = true
  }
  return stream
}
//
// map sync
//
// Through stream that passes every chunk to the synchronous function
// `sync` and re-emits the return value as 'data'.
// A return value of undefined drops the chunk; an exception thrown by
// `sync` is emitted as 'error'.
//
es.mapSync = function (sync) {
  function write (chunk) {
    var result
    try {
      result = sync(chunk)
    } catch (failure) {
      return this.emit('error', failure)
    }
    if (result === undefined) return
    this.emit('data', result)
  }
  return es.through(write)
}
//
// log (name)
//
// Debugging aid: through stream that prints every chunk with
// console.error (prefixed with `name` when one is given) and then
// re-emits the chunk unchanged.
//
es.log = function (name) {
  return es.through(function (data) {
    if (name) console.error(name, data)
    else console.error(data)
    this.emit('data', data)
  })
}
//
// child -- pipe through a child process
//
// Wrap a child process as one duplex stream built from the child's
// stdin and stdout.
//
es.child = function (child) {
  var input = child.stdin
  var output = child.stdout
  return es.duplex(input, output)
}
//
// parse (options)
//
// Through stream that JSON.parses every chunk and emits the result.
// Must be used after es.split() to ensure that each chunk represents a line:
//   source.pipe(es.split()).pipe(es.parse())
//
// Empty chunks and chunks containing only whitespace (for instance the
// '\r' left over from a blank CRLF line) are ignored.
// A chunk that fails to parse is emitted as 'error' when options.error is
// truthy; otherwise it is only reported through console.error.
//
es.parse = function (options) {
  var emitError = !!(options ? options.error : false)
  return es.through(function (data) {
    if (!data) return // ignore empty lines

    var text = data.toString()
    // JSON.parse throws on blank input, so skip whitespace-only lines here
    if (text.trim() === '') return

    var obj
    try {
      obj = JSON.parse(text)
    } catch (err) {
      if (emitError)
        return this.emit('error', err)
      return console.error(err, 'attempting to parse:', data)
    }
    if (obj !== undefined)
      this.emit('data', obj)
  })
}
//
// stringify
//
// Through stream that turns every chunk into one line of JSON text
// (JSON.stringify output followed by '\n'). Buffers are converted to
// strings before being stringified.
//
es.stringify = function () {
  var BufferCtor = require('buffer').Buffer
  function toLine (chunk) {
    var value = chunk
    if (BufferCtor.isBuffer(chunk)) value = chunk.toString()
    return JSON.stringify(value) + '\n'
  }
  return es.mapSync(toLine)
}
//
// replace (from, to)
//
// Replace a string within a stream: the input is split on `from` and the
// resulting pieces are joined back together with `to`.
//
// warn: probably not optimal, intended for smallish inputs such as
// relatively small json files.
//
es.replace = function (from, to) {
  var splitter = es.split(from)
  var joiner = es.join(to)
  return es.pipeline(splitter, joiner)
}
//
// join (str)
//
// Emit `str` between consecutive chunks, just like Array#join.
//
// Legacy api: when called with a function, behaves as es.wait(callback);
// this is still supported for legacy reasons.
//
es.join = function (str) {
  if (typeof str === 'function')
    return es.wait(str) //legacy api

  var emittedAny = false
  function write (chunk) {
    // the joiner goes in front of every chunk except the very first
    if (emittedAny)
      this.emit('data', str)
    emittedAny = true
    this.emit('data', chunk)
    return true
  }
  return es.through(write)
}
//
// wait (callback)
//
// Buffer every chunk until the input ends, then emit the whole body as a
// single 'data' event followed by 'end'. If the first chunk is a Buffer
// the chunks are combined with Buffer.concat, otherwise they are joined
// as strings. The optional callback is called as callback(null, body)
// after 'end' has been emitted.
//
es.wait = function (callback) {
  var chunks = []

  function collect (chunk) {
    chunks.push(chunk)
  }

  function finish () {
    var body
    if (Buffer.isBuffer(chunks[0])) body = Buffer.concat(chunks)
    else body = chunks.join('')
    this.emit('data', body)
    this.emit('end')
    if (callback) callback(null, body)
  }

  return es.through(collect, finish)
}
//
// pipeable -- deprecated, always throws.
//
es.pipeable = function () {
  var message = '[EVENT-STREAM] es.pipeable is deprecated'
  throw new Error(message)
}
|