Introduction to Stream #18

39Er opened this issue Jun 29, 2017 · 0 comments

Stream is a core Node.js module. It inherits from EventEmitter, underlies many other modules, and is involved in almost every I/O operation.
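
Because streams inherit from EventEmitter, the usual on()/emit() API is available on them. A minimal sketch of that relationship follows; the 'custom' event name and the no-op read/write implementations are illustrative assumptions, not part of the original notes.

```javascript
const { EventEmitter } = require('events');
const { Readable, Writable } = require('stream');

// No-op implementations, supplied only so both streams are well-formed.
const rs = new Readable({ read() {} });
const ws = new Writable({ write(chunk, encoding, callback) { callback(); } });

const readableIsEmitter = rs instanceof EventEmitter;
const writableIsEmitter = ws instanceof EventEmitter;
console.log(readableIsEmitter, writableIsEmitter);

// A hypothetical 'custom' event, handled like any other emitter event.
let received = null;
rs.on('custom', (value) => { received = value; });
rs.emit('custom', 42);
console.log(received);
```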

Streams come in four types: Readable, Writable, Duplex, and Transform.

The Stream source code shows the specific types:

const Stream = module.exports = require('internal/streams/legacy');

Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;
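
The relationships implied by this listing can be checked from user code. The sketch below only uses the public stream module; the `checks` object is a name introduced here for illustration.

```javascript
const stream = require('stream');
const { Readable, Duplex, Transform, PassThrough } = stream;

const checks = {
  // The backwards-compatibility alias from the listing above.
  streamAlias: stream.Stream === stream,
  // A Duplex is also a Readable.
  duplexIsReadable: new Duplex() instanceof Readable,
  // A Transform is a kind of Duplex.
  transformIsDuplex: new Transform() instanceof Duplex,
  // A PassThrough is a kind of Transform.
  passThroughIsTransform: new PassThrough() instanceof Transform,
};

console.log(checks);
```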

1. Readable

Common Readable streams include:

  • http response (on the client)
  • http request (on the server)
  • fs.ReadStream
  • zlib
  • crypto
  • net.Socket
  • child process stdout and stderr
  • process.stdin

new stream.Readable([options])

    options <Object>

        highWaterMark <number>: The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Defaults to 16384 (16kb), or 16 for objectMode streams.

        encoding <string>: If specified, then buffers will be decoded to strings using the specified encoding. Defaults to null

        objectMode <boolean>: Whether this stream should behave as a stream of objects. Meaning that stream.read(n) returns a single value instead of a Buffer of size n. Defaults to false

        read <Function> Implementation for the stream._read() method.

        destroy <Function> Implementation for the stream._destroy() method.
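
A minimal sketch of how some of these options affect what read() returns. The sample data and variable names are hypothetical; only the objectMode, highWaterMark, encoding, and read options from the list above are used.

```javascript
const { Readable } = require('stream');

// Hypothetical sample data for the object-mode stream.
const items = [{ id: 1 }, { id: 2 }, { id: 3 }];
let next = 0;

// objectMode: read() returns one pushed value at a time instead of a Buffer.
const objects = new Readable({
  objectMode: true,
  highWaterMark: 2, // counted in objects, not bytes, in object mode
  read() {
    this.push(next < items.length ? items[next++] : null);
  },
});
const firstObject = objects.read();

// encoding: buffers are decoded to strings before being returned.
const text = new Readable({
  encoding: 'utf8',
  read() {
    this.push(Buffer.from('hello'));
    this.push(null);
  },
});
const decoded = text.read();

console.log(firstObject, typeof decoded, decoded);
```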

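A Readable stream operates in one of two modes:

  • flowing: data is read and delivered as quickly as possible. If there is no event listener and no pipe() to direct the data, the data may be lost.
  • paused: the default mode. stream.read() must be called explicitly to obtain data.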
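
In paused mode, read() can also take a size, which makes the "pull" nature of this mode easy to see. A small sketch, assuming the data has already been pushed and the stream ended before reading:

```javascript
const { Readable } = require('stream');

// No-op read(): the data is pushed manually below.
const rs = new Readable({ read() {} });
rs.push('hello stream');
rs.push(null); // no more data

// Pull exactly 5 bytes, then whatever is left in the buffer.
const head = rs.read(5).toString();
const rest = rs.read().toString();

console.log(JSON.stringify(head), JSON.stringify(rest));
```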

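A stream can be switched to flowing mode in any of the following ways:

  1. Add a listener for the data event
  2. Call resume()
  3. Call pipe()

A stream can be switched back to paused mode in either of the following ways:

  1. If there are no pipe destinations, call pause()
  2. If there are pipe destinations, remove all listeners for the data event and remove every pipe destination with unpipe()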
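
The mode switches can be observed through the readable.readableFlowing property, which is null before any consumer is attached, true in flowing mode, and false after an explicit pause. A sketch, assuming a Node.js version that exposes this property; the `states` array is introduced here for illustration.

```javascript
const { Readable } = require('stream');

const rs = new Readable({ read() {} });
const states = [];

states.push(rs.readableFlowing); // no consumer attached yet
rs.on('data', () => {});         // adding a data listener starts the flow
states.push(rs.readableFlowing);
rs.pause();                      // explicit pause
states.push(rs.readableFlowing);
rs.resume();                     // back to flowing
states.push(rs.readableFlowing);

console.log(states);
```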

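Paused-mode example:

var Readable = require('stream').Readable;
// A no-op read() is supplied because the data is pushed manually below.
var rs = new Readable({ read: function() {} });

rs.on('readable', function() {
  // In paused mode, data must be pulled explicitly with read().
  var chunk = rs.read();
  console.log('get data:', chunk ? chunk.toString() : null);
});

rs.on('end', function() {
  console.log('stream end');
});

rs.push('hello stream');
rs.push('hello alex');
rs.push(null);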

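Flowing-mode example:

var Readable = require('stream').Readable;
// A no-op read() is supplied because the data is pushed manually below.
var rs = new Readable({ read: function() {} });

// Adding a data listener switches the stream to flowing mode.
rs.on('data', function(chunk) {
  console.log('get data:', chunk.toString());
});

rs.on('end', function() {
  console.log('stream end');
});

rs.push('hello stream');
rs.push('hello alex');
rs.push(null);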

In practice, a custom Readable stream class usually produces its data by defining the _read method. Consider the following example:

var Readable = require('stream').Readable;

function MyReadable(data, options) {
  if (!(this instanceof MyReadable)) {
    return new MyReadable(data, options);
  }
  Readable.call(this, options);
  this.data = data || [];
  this.index = 0;
}

// Inherit from Readable without using the deprecated __proto__ accessor.
Object.setPrototypeOf(MyReadable.prototype, Readable.prototype);

MyReadable.prototype._read = function() {
  if (this.index >= this.data.length) {
    // No more data: signal the end of the stream.
    this.push(null);
  } else {
    // Simulate a slow source by delivering one item per second.
    setTimeout(function() {
      this.push(this.data[this.index++]);
    }.bind(this), 1000);
  }
};

var data = ['California Dreaming', 'Hotel California', 'Californication'];
var rs = new MyReadable(data);

rs.on('data', function(chunk) {
  console.log('get data:', chunk.toString());
});

To implement a Readable, implement the _read() method:

const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
    // ...
  }
  _read(size) {
    // ...
  }
}

Use readable.push(chunk[, encoding]) to write data into the internal buffer.
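
The return value of push() reports whether more data may be pushed, and the optional encoding argument controls how a string chunk is converted to bytes. A sketch with a deliberately tiny highWaterMark; the variable names are illustrative.

```javascript
const { Readable } = require('stream');

// A 4-byte buffer limit makes the backpressure signal easy to trigger.
const rs = new Readable({ highWaterMark: 4, read() {} });
const firstPush = rs.push('ab');  // 2 bytes buffered, still below the limit
const secondPush = rs.push('cd'); // 4 bytes buffered, limit reached

// The encoding argument: the hex string '6869' is stored as the bytes of 'hi'.
const hex = new Readable({ read() {} });
hex.push('6869', 'hex');
hex.push(null);
const decoded = hex.read().toString();

console.log(firstPush, secondPush, decoded);
```

When push() returns false, a well-behaved source stops pushing until _read() is called again.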

2. Writable

Common Writable streams include:

  • http request (on the client)
  • http response (on the server)
  • fs.WriteStream
  • zlib
  • crypto
  • net.Socket
  • child process stdin
  • process.stdout
  • process.stderr

Constructor: new stream.Writable([options])

    options <Object>

        highWaterMark <number> Buffer level when stream.write() starts returning false. Defaults to 16384 (16kb), or 16 for objectMode streams.

        decodeStrings <boolean> Whether or not to decode strings into Buffers before passing them to stream._write(). Defaults to true

        objectMode <boolean> Whether or not the stream.write(anyObj) is a valid operation. When set, it becomes possible to write JavaScript values other than string, Buffer or Uint8Array if supported by the stream implementation. Defaults to false

        write <Function> Implementation for the stream._write() method.

        writev <Function> Implementation for the stream._writev() method.

        destroy <Function> Implementation for the stream._destroy() method.

        final <Function> Implementation for the stream._final() method.
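
A sketch of two of these options in action: highWaterMark decides when write() starts returning false, and decodeStrings: false leaves string chunks as strings. The tiny limit and the `seenTypes` array are assumptions made for illustration.

```javascript
const { Writable } = require('stream');

const seenTypes = [];
const ws = new Writable({
  highWaterMark: 4,     // tiny limit so backpressure is easy to trigger
  decodeStrings: false, // pass strings to write() without converting to Buffer
  write(chunk, encoding, callback) {
    seenTypes.push(typeof chunk);
    callback();
  },
});

const smallWrite = ws.write('ab');     // 2 bytes, below the limit
const largeWrite = ws.write('abcdef'); // 6 bytes, at or above the limit

// 'drain' signals that it is safe to resume writing.
ws.on('drain', () => console.log('drained'));

console.log(smallWrite, largeWrite, seenTypes);
```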

When creating a Writable stream, we need to implement its _write() method. Consider the following example:

var Writable = require('stream').Writable;

var ws = new Writable();

// Called once per chunk; call cb() when the chunk has been handled.
ws._write = function(chunk, encoding, cb) {
  console.log(chunk.toString());
  cb();
};

ws.on('finish', function() {
  console.log('on finish');
});

ws.write('hello world');
ws.write('hello alex');
ws.end();

Implementing a Writable:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor
    super(options);
    // ...
  }
  _write(chunk, encoding, callback) {
    // ...
    callback();
  }
}
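
As a more complete sketch of a Writable subclass, the hypothetical ChunkCollector below stores every chunk in _write() and builds a summary in _final(), which runs after end() is called and before 'finish' is emitted. The class and property names are assumptions for illustration.

```javascript
const { Writable } = require('stream');

class ChunkCollector extends Writable {
  constructor(options) {
    super(options);
    this.chunks = [];
  }

  _write(chunk, encoding, callback) {
    this.chunks.push(chunk.toString());
    callback(); // signal that this chunk has been handled
  }

  _final(callback) {
    // Runs once all writes are done, before 'finish' is emitted.
    this.summary = this.chunks.join(' | ');
    callback();
  }
}

const collector = new ChunkCollector();
collector.on('finish', () => console.log('finished:', collector.summary));

collector.write('hello world');
collector.write('hello alex');
collector.end();
```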

3. Duplex

Duplex implements both Readable and Writable.

new stream.Duplex(options)

    options <Object> Passed to both Writable and Readable constructors. Also has the following fields:

        allowHalfOpen <boolean> Defaults to true. If set to false, then the stream will automatically end the readable side when the writable side ends and vice versa.

        readableObjectMode <boolean> Defaults to false. Sets objectMode for readable side of the stream. Has no effect if objectMode is true.

        writableObjectMode <boolean> Defaults to false. Sets objectMode for writable side of the stream. Has no effect if objectMode is true.
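
A sketch of per-side object mode: the hypothetical `serializer` below accepts JavaScript objects on its writable side (writableObjectMode: true) while its readable side still produces bytes. Pushing from write() is a choice made for this example, not the only way to connect the two sides.

```javascript
const { Duplex } = require('stream');

const serializer = new Duplex({
  writableObjectMode: true, // objects in
  // readableObjectMode stays false: bytes out
  read() {},                // data is pushed from write() below
  write(obj, encoding, callback) {
    this.push(JSON.stringify(obj) + '\n');
    callback();
  },
  final(callback) {
    this.push(null);        // end the readable side when writing ends
    callback();
  },
});

serializer.write({ a: 1 });
const line = serializer.read().toString();
console.log(JSON.stringify(line));
serializer.end();
```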

Common Duplex streams include:

  • zlib
  • crypto
  • net.Socket

To implement a Duplex stream, implement both its _read() and _write() methods.

The following example implements a simple duplex stream:

var util = require('util'),
    stream = require('stream'),
    Readable = stream.Readable,
    Duplex = stream.Duplex;

function MyReadable(options) {
  if (!(this instanceof MyReadable)) {
    return new MyReadable(options);
  }
  Readable.call(this, options);
  this._cur = 1;
  this._max = 20;
}

util.inherits(MyReadable, Readable);

MyReadable.prototype._read = function() {
  if (this._cur > this._max) {
    this.push(null);
  } else {
    this.push('' + this._cur++);
  }
}

function MyDuplex(options) {
  if (!(this instanceof MyDuplex)) {
    return new MyDuplex(options);
  }
  Duplex.call(this, options);
  this._data = [];
}

util.inherits(MyDuplex, Duplex);

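MyDuplex.prototype._read = function() {
  if (this._data.length) {
    this.push(this._data.shift());
    this.push('\n');
  } else {
    // Nothing is queued, so end the readable side. This simple example
    // assumes the writes have been queued before the first read happens.
    this.push(null);
  }
};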

MyDuplex.prototype._write = function(chunk, encoding, cb) {
  console.log('write data:', chunk.toString());
  this._data.push(chunk);
  cb();
};

var rs = new MyReadable(),
    ds = new MyDuplex();

rs.pipe(ds).pipe(process.stdout);

4. Transform

Transform is a special kind of Duplex stream that transforms data; that is, its output is computed from its input according to some rule. Common Transform streams include:

  • zlib
  • crypto

new stream.Transform([options])

    options <Object> Passed to both Writable and Readable constructors. Also has the following fields:

        transform <Function> Implementation for the stream._transform() method.

        flush <Function> Implementation for the stream._flush() method.
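
A sketch of the transform and flush options together: the hypothetical `counter` stream passes data through unchanged, counts the bytes it sees, and appends a summary in flush(), which runs once after all written data has been transformed.

```javascript
const { Transform } = require('stream');

let byteCount = 0;
const counter = new Transform({
  transform(chunk, encoding, callback) {
    byteCount += chunk.length;
    callback(null, chunk); // pass the chunk through unchanged
  },
  flush(callback) {
    // Emit one trailing chunk before the readable side ends.
    this.push('\n[' + byteCount + ' bytes]');
    callback();
  },
});

let output = '';
counter.on('data', (chunk) => { output += chunk.toString(); });
counter.on('end', () => console.log(output));

counter.write('abc');
counter.end('de');
```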

To implement a Transform stream, implement its _transform() method.

The following example implements a simple transform stream:

var util = require('util'),
    stream = require('stream'),
    Readable = stream.Readable,
    Transform = stream.Transform;

function MyReadable(options) {
  if (!(this instanceof MyReadable)) {
    return new MyReadable(options);
  }
  Readable.call(this, options);
  this._cur = 1;
  this._max = 20;
}

util.inherits(MyReadable, Readable);

MyReadable.prototype._read = function() {
  if (this._cur > this._max) {
    this.push(null);
  } else {
    this.push('' + this._cur++);
  }
}

function MyTransform(options) {
  if (!(this instanceof MyTransform)) {
    return new MyTransform(options);
  }
  Transform.call(this, options);
}

util.inherits(MyTransform, Transform);

MyTransform.prototype._transform = function(chunk, encoding, cb) {
  var val = Number(chunk.toString());
  this.push('' + val * 2 + '\n');
  cb();
};

var rs = new MyReadable(),
    ts = new MyTransform();

rs.pipe(ts).pipe(process.stdout);

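5. Differences between Duplex and Transform

Although a Duplex has both a readable side and a writable side, the two are relatively independent. In a Transform, data written to the writable side goes through some processing and then automatically becomes available on the readable side.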
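
The difference can be sketched side by side. In the hypothetical `duplex` below, what is written has no effect on what is read; in the hypothetical `upper` transform, the readable side yields a processed version of what was written.

```javascript
const { Duplex, Transform } = require('stream');

// Duplex: the two sides are independent of each other.
const written = [];
const duplex = new Duplex({
  read() {
    this.push('from readable side');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    callback();
  },
});
duplex.write('to writable side');
const duplexOut = duplex.read().toString();

// Transform: the output is derived from the input.
const upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});
upper.write('to writable side');
const transformOut = upper.read().toString();

console.log(written, duplexOut, transformOut);
```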

@39Er 39Er added the node.js label Jun 29, 2017