码迷,mamicode.com
首页 > 其他好文 > 详细

kumavis/obj-multiplex

时间:2018-11-07 17:11:17      阅读:186      评论:0      收藏:0      [点我收藏+]

标签:super   must   rto   https   timeout   col   eth   ini   written   

https://github.com/kumavis/obj-multiplex

 

obj-multiplex多路复用

simple stream multiplexing for objectMode

其实就是一个多路复用流能够使用name来区分各个子流,以达到一个parent流下其实有多个子流在运行,可以通过多个子流来读入写出数据,效率更高。而且parent流结束了,则所有子流也会被销毁

usage

// create multiplexer
const mux = new ObjMultiplex()

// setup substreams
const streamA = mux.createStream(hello)
const streamB = mux.createStream(world)

// pipe over transport (and back)
mux.pipe(transport).pipe(mux)

// send values over the substreams
streamA.write({ thisIsAn: object })
streamA.write(123)

// or pipe together normally
streamB.pipe(evilAiBrain).pipe(streamB)

 

 obj-multiplex/index.js

const { Duplex } = require(readable-stream)
const endOfStream = require(end-of-stream)//看博客mafintosh/end-of-stream
const once = require(once)
const noop = () => {}

const IGNORE_SUBSTREAM = {}


class ObjectMultiplex extends Duplex {

  constructor(_opts = {}) {
    const opts = Object.assign({}, _opts, {
      objectMode: true,//流可传各类形式数据
    })
    super(opts)//生成这个流

    this._substreams = {}
  }

  createStream (name) {//就是创建两个流,一个是这个流,另一个是parent是这个流的一个子流,并返回子流
    // validate name
    if (!name) throw new Error(ObjectMultiplex - name must not be empty)//name不能为空
    if (this._substreams[name]) throw new Error(ObjectMultiplex - Substream for name "${name}" already exists)//name不能重复

    // create substream
    const substream = new Substream({ parent: this, name: name })
    this._substreams[name] = substream

    // listen for parent stream to end
    anyStreamEnd(this, (err) => {//定义当parent流结束,则相应的所有子流也要被销毁
      substream.destroy(err)//substream被destroy,如果出错返回的错误信息即err
    })

    return substream
  }

  // ignore streams (dont display orphaned data warning)
  ignoreStream (name) {//就是将之前创建的name的子流的内容清空
    // validate name
    if (!name) throw new Error(ObjectMultiplex - name must not be empty)
    if (this._substreams[name]) throw new Error(ObjectMultiplex - Substream for name "${name}" already exists)
    // set
    this._substreams[name] = IGNORE_SUBSTREAM
  }

  // stream plumbing
  //下面就是parent流能够做的一系列读写操作
  _read () {}

  _write(chunk, encoding, callback) {//当调用 writable.write(chunk) 时,数据会被缓冲在可写流中。
    // parse message,就是当parent流write时,将根据其传入的name来决定该数据是写到哪个子流上的
    const name = chunk.name
    const data = chunk.data
    if (!name) {//name不能为空,否则不知道是哪个子流
      console.warn(`ObjectMultiplex - malformed chunk without name "${chunk}"`)
      return callback()
    }

    // get corresponding substream
    const substream = this._substreams[name]//然后根据name得到子流
    if (!substream) {//如果为空则warn
      console.warn(`ObjectMultiplex - orphaned data for stream "${name}"`)
      return callback()
    }

    // push data into substream
    if (substream !== IGNORE_SUBSTREAM) {//只有当子流不为{}时,才将data压入
      substream.push(data)//当调用 stream.push(chunk) 时,数据会被缓冲在可读流中。 如果流的消费者没有调用 stream.read(),则数据会保留在内部队列中直到被消费
    }

    callback()
  }//_write

}//class


class Substream extends Duplex {

  constructor ({ parent, name }) {
    super({
      objectMode: true,
    })

    this._parent = parent
    this._name = name
  }

  _read () {}//读入的操作即Duplex的定义

  _write (chunk, enc, callback) {//当子流被写入时,其实是将数据压入流parent中
    this._parent.push({
      name: this._name,
      data: chunk,
    })
    callback()//然后调用回调函数
  }

}

module.exports = ObjectMultiplex

// util

function anyStreamEnd(stream, _cb) {//就是当stream结束的时候就会调用cb回调函数
  const cb = once(_cb)
  endOfStream(stream, { readable: false }, cb)
  endOfStream(stream, { writable: false }, cb)
}

 

通过测试学习该库的使用:

obj-multiplex/test/index.js

const test = require(tape)
const once = require(once)
const { PassThrough, Transform } = require(readable-stream)//PassThrough本质也是Transform流,是最简单的Transform流,只是将数据从此处传过
// a passthrough stream.
// basically just the most minimal sort of Transform stream.
// Every written chunk gets output as-is
const endOfStream = require(end-of-stream) const pump = require(pump) const ObjMultiplex = require(../index.js) test(basic - string, (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() bufferToEnd(outStream, (err, results) => { t.error(err, should not error) t.deepEqual(results, [haay, wuurl], results should match) t.end() }) // pass in messages inStream.write(haay) inStream.write(wuurl) // simulate disconnect setTimeout(() => inTransport.destroy()) }) test(basic - obj, (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() bufferToEnd(outStream, (err, results) => { t.error(err, should not error) t.deepEqual(results, [{ message: haay }, { message: wuurl }], results should match) t.end() }) // pass in messages inStream.write({ message: haay }) inStream.write({ message: wuurl }) // simulate disconnect setTimeout(() => inTransport.destroy()) }) test(roundtrip, (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() const doubler = new Transform({ objectMode: true, transform (chunk, end, callback) {//对流内数据进行*2计算 // console.log(‘doubler!‘, chunk) const result = chunk * 2 callback(null, result) } }) pump(//即将从outStream处得到的数据进行*2处理后再传回outStream outStream, doubler, outStream ) bufferToEnd(inStream, (err, results) => { t.error(err, should not error) t.deepEqual(results, [20, 24], results should match) t.end() }) // pass in messages inStream.write(10) inStream.write(12) // simulate disconnect setTimeout(() => outTransport.destroy(), 100) }) // util function basicTestSetup() { // setup multiplex and Transport const inMux = new ObjMultiplex()//定义了两个parent流 const outMux = new ObjMultiplex() const inTransport = new PassThrough({ objectMode: true }) const outTransport = new PassThrough({ objectMode: true }) // setup substreams const inStream = inMux.createStream(hello)//分别在两个parent流中各自定义一个name为hello的子流 const outStream = outMux.createStream(hello) pump(//形成一个pipe流 inMux, inTransport, outMux, outTransport, inMux ) return { inTransport, outTransport, inMux, outMux, inStream, outStream, } } function bufferToEnd(stream, callback) { const results = [] endOfStream(stream, (err) => callback(err, results))//定义了流结束后的回调 stream.on(data, (chunk) => results.push(chunk))//并监听data事件,用于将数据压入流 }

 



kumavis/obj-multiplex

标签:super   must   rto   https   timeout   col   eth   ini   written   

原文地址:https://www.cnblogs.com/wanghui-garcia/p/9885481.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!