diff --git a/.npmignore b/.npmignore
index 265ff739e0..6d270c6ccb 100644
--- a/.npmignore
+++ b/.npmignore
@@ -6,3 +6,4 @@ zlib.js
diff --git a/README.md b/README.md
index 9095c97598..9be2adb158 100644
--- a/README.md
+++ b/README.md
@@ -16,7 +16,9 @@ npm install --save readable-stream
***Node-core streams for userland***
This package is a mirror of the Streams2 and Streams3 implementations in
-Node-core, including [documentation](doc/stream.md).
+Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v7.1.0/docs/api/).
If you want to guarantee a stable streams base, regardless of what version of
Node you, or the users of your libraries are using, use **readable-stream** *only* and avoid the *"stream"* module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html).
diff --git a/build/build.js b/build/build.js
index a764841493..a54eae8181 100644
--- a/build/build.js
+++ b/build/build.js
@@ -12,12 +12,11 @@ const hyperquest = require('hyperzip')(require('hyperdirect'))
, nodeVersionRegexString = '\\d+\\.\\d+\\.\\d+'
, usageVersionRegex = RegExp('^' + nodeVersionRegexString + '$')
, readmeVersionRegex =
- RegExp('(Node-core v)' + nodeVersionRegexString, 'g')
+ RegExp('((?:Node-core )|(?:https\:\/\/nodejs\.org\/dist\/)v)' + nodeVersionRegexString, 'g')
, readmePath = path.join(__dirname, '..', 'README.md')
, files = require('./files')
, testReplace = require('./test-replacements')
- , docReplace = require('./doc-replacements')
, srcurlpfx = `https://raw.githubusercontent.com/nodejs/node/v${nodeVersion}/`
, libsrcurl = srcurlpfx + 'lib/'
@@ -25,8 +24,6 @@ const hyperquest = require('hyperzip')(require('hyperdirect'))
, testlisturl = `https://github.com/nodejs/node/tree/v${nodeVersion}/test/parallel`
, libourroot = path.join(__dirname, '../lib/')
, testourroot = path.join(__dirname, '../test/parallel/')
- , docurlpfx = `https://raw.githubusercontent.com/nodejs/node/v${nodeVersion}/doc/api/`
- , docourroot = path.join(__dirname, '../doc')
if (!usageVersionRegex.test(nodeVersion)) {
@@ -68,7 +65,14 @@ function processFile (inputLoc, out, replacements) {
+function deleteOldTests(){
+ const files = fs.readdirSync(path.join(__dirname, '..', 'test', 'parallel'));
+ for (let file of files) {
+ let name = path.join(__dirname, '..', 'test', 'parallel', file);
+ console.log('removing', name);
+ fs.unlinkSync(name);
+ }
function processLibFile (file) {
var replacements = files[file]
, url = libsrcurl + file
@@ -94,6 +98,10 @@ function processTestFile (file) {
+// delete the current contents of test/parallel so if node removes any tests
+// they are removed here
// Discover, grab and process all test-stream* files on nodejs/node
@@ -110,8 +118,6 @@ hyperquest(testlisturl).pipe(bl(function (err, data) {
-processFile(docurlpfx + 'stream.md', path.join(docourroot, 'stream.md'), docReplace)
// Grab the nodejs/node test/common.js
diff --git a/build/doc-replacements.js b/build/doc-replacements.js
deleted file mode 100644
index 684333c9c7..0000000000
--- a/build/doc-replacements.js
+++ /dev/null
@@ -1,6 +0,0 @@
-module.exports = [
- [
- /\]([\:|\(]\W?)([^\#]\w+\.html(?:#\w+))/g,
- `]$1https://nodejs.org/docs/v${process.argv[2]}/api/$2`
- ]
diff --git a/build/files.js b/build/files.js
index 47c21345d6..955cc168d7 100644
--- a/build/files.js
+++ b/build/files.js
@@ -27,9 +27,13 @@ const headRegexp = /(^module.exports = \w+;?)/m
// The browser build ends up with a circular dependency, so the require is
// done lazily, but cached.
+ , addDuplexDec = [
+ headRegexp
+ , '$1\n\n/**/\nvar Duplex;\n/**/\n'
+ ]
, addDuplexRequire = [
/^(function (?:Writable|Readable)(?:State)?.*{)/gm
- , 'var Duplex;\n$1\n Duplex = Duplex || require(\'./_stream_duplex\');\n'
+ , '\n$1\n Duplex = Duplex || require(\'./_stream_duplex\');\n'
, altForEachImplReplacement = require('./common-replacements').altForEachImplReplacement
@@ -199,6 +203,7 @@ module.exports['_stream_passthrough.js'] = [
module.exports['_stream_readable.js'] = [
+ , addDuplexDec
, requireReplacement
, instanceofReplacement
, altForEachImplReplacement
@@ -232,6 +237,7 @@ module.exports['_stream_transform.js'] = [
module.exports['_stream_writable.js'] = [
+ , addDuplexDec
, requireReplacement
, instanceofReplacement
, utilReplacement
diff --git a/build/test-replacements.js b/build/test-replacements.js
index d6ec87c430..48d42be70d 100644
--- a/build/test-replacements.js
+++ b/build/test-replacements.js
@@ -174,6 +174,14 @@ module.exports['common.js'] = [
, [
'{now: function (){}}'
+ /exports\.enoughTestCpu/,
+ '//exports.enoughTestCpu'
+ /exports\.buildType/,
+ '//exports.buildType'
@@ -284,3 +292,9 @@ module.exports['test-stream-writev.js'] = [
+module.exports['test-stream2-readable-empty-buffer-no-eof.js'] = [
+ [
+ /case 3:\n(\s+)setImmediate\(r\.read\.bind\(r, 0\)\);/,
+ 'case 3:\n$1setTimeout(r.read.bind(r, 0), 50);'
+ ]
diff --git a/doc/stream.md b/doc/stream.md
deleted file mode 100644
index 619efedbb1..0000000000
--- a/doc/stream.md
+++ /dev/null
@@ -1,2014 +0,0 @@
-# Stream
-> Stability: 2 - Stable
-A stream is an abstract interface for working with streaming data in Node.js.
-The `stream` module provides a base API that makes it easy to build objects
-that implement the stream interface.
-There are many stream objects provided by Node.js. For instance, a
-[request to an HTTP server][http-incoming-message] and [`process.stdout`][]
-are both stream instances.
-Streams can be readable, writable, or both. All streams are instances of
-The `stream` module can be accessed using:
-const stream = require('stream');
-While it is important for all Node.js users to understand how streams work,
-the `stream` module itself is most useful for developers that are creating new
-types of stream instances. Developer's who are primarily *consuming* stream
-objects will rarely (if ever) have need to use the `stream` module directly.
-## Organization of this Document
-This document is divided into two primary sections with a third section for
-additional notes. The first section explains the elements of the stream API that
-are required to *use* streams within an application. The second section explains
-the elements of the API that are required to *implement* new types of streams.
-## Types of Streams
-There are four fundamental stream types within Node.js:
-* [Readable][] - streams from which data can be read (for example
- [`fs.createReadStream()`][]).
-* [Writable][] - streams to which data can be written (for example
- [`fs.createWriteStream()`][]).
-* [Duplex][] - streams that are both Readable and Writable (for example
- [`net.Socket`][]).
-* [Transform][] - Duplex streams that can modify or transform the data as it
- is written and read (for example [`zlib.createDeflate()`][]).
-### Object Mode
-All streams created by Node.js APIs operate exclusively on strings and `Buffer`
-objects. It is possible, however, for stream implementations to work with other
-types of JavaScript values (with the exception of `null`, which serves a special
-purpose within streams). Such streams are considered to operate in "object
-Stream instances are switched into object mode using the `objectMode` option
-when the stream is created. Attempting to switch an existing stream into
-object mode is not safe.
-### Buffering
-Both [Writable][] and [Readable][] streams will store data in an internal
-buffer that can be retrieved using `writable._writableState.getBuffer()` or
-`readable._readableState.buffer`, respectively.
-The amount of data potentially buffered depends on the `highWaterMark` option
-passed into the streams constructor. For normal streams, the `highWaterMark`
-option specifies a total number of bytes. For streams operating in object mode,
-the `highWaterMark` specifies a total number of objects.
-Data is buffered in Readable streams when the implementation calls
-[`stream.push(chunk)`][stream-push]. If the consumer of the Stream does not
-call [`stream.read()`][stream-read], the data will sit in the internal
-queue until it is consumed.
-Once the total size of the internal read buffer reaches the threshold specified
-by `highWaterMark`, the stream will temporarily stop reading data from the
-underlying resource until the data currently buffered can be consumed (that is,
-the stream will stop calling the internal `readable._read()` method that is
-used to fill the read buffer).
-Data is buffered in Writable streams when the
-[`writable.write(chunk)`][stream-write] method is called repeatedly. While the
-total size of the internal write buffer is below the threshold set by
-`highWaterMark`, calls to `writable.write()` will return `true`. Once
-the size of the internal buffer reaches or exceeds the `highWaterMark`, `false`
-will be returned.
-A key goal of the `stream` API, particularly the [`stream.pipe()`] method,
-is to limit the buffering of data to acceptable levels such that sources and
-destinations of differing speeds will not overwhelm the available memory.
-Because [Duplex][] and [Transform][] streams are both Readable and Writable,
-each maintain *two* separate internal buffers used for reading and writing,
-allowing each side to operate independently of the other while maintaining an
-appropriate and efficient flow of data. For example, [`net.Socket`][] instances
-are [Duplex][] streams whose Readable side allows consumption of data received
-*from* the socket and whose Writable side allows writing data *to* the socket.
-Because data may be written to the socket at a faster or slower rate than data
-is received, it is important for each side to operate (and buffer) independently
-of the other.
-## API for Stream Consumers
-Almost all Node.js applications, no matter how simple, use streams in some
-manner. The following is an example of using streams in a Node.js application
-that implements an HTTP server:
-const http = require('http');
-const server = http.createServer( (req, res) => {
- // req is an http.IncomingMessage, which is a Readable Stream
- // res is an http.ServerResponse, which is a Writable Stream
- let body = '';
- // Get the data as utf8 strings.
- // If an encoding is not set, Buffer objects will be received.
- req.setEncoding('utf8');
- // Readable streams emit 'data' events once a listener is added
- req.on('data', (chunk) => {
- body += chunk;
- });
- // the end event indicates that the entire body has been received
- req.on('end', () => {
- try {
- const data = JSON.parse(body);
- // write back something interesting to the user:
- res.write(typeof data);
- res.end();
- } catch (er) {
- // uh oh! bad json!
- res.statusCode = 400;
- return res.end(`error: ${er.message}`);
- }
- });
-// $ curl localhost:1337 -d '{}'
-// object
-// $ curl localhost:1337 -d '"foo"'
-// string
-// $ curl localhost:1337 -d 'not json'
-// error: Unexpected token o
-[Writable][] streams (such as `res` in the example) expose methods such as
-`write()` and `end()` that are used to write data onto the stream.
-[Readable][] streams use the [`EventEmitter`][] API for notifying application
-code when data is available to be read off the stream. That available data can
-be read from the stream in multiple ways.
-Both [Writable][] and [Readable][] streams use the [`EventEmitter`][] API in
-various ways to communicate the current state of the stream.
-[Duplex][] and [Transform][] streams are both [Writable][] and [Readable][].
-Applications that are either writing data to or consuming data from a stream
-are not required to implement the stream interfaces directly and will generally
-have no reason to call `require('stream')`.
-Developers wishing to implement new types of streams should refer to the
-section [API for Stream Implementers][].
-### Writable Streams
-Writable streams are an abstraction for a *destination* to which data is
-Examples of [Writable][] streams include:
-* [HTTP requests, on the client][]
-* [HTTP responses, on the server][]
-* [fs write streams][]
-* [zlib streams][zlib]
-* [crypto streams][crypto]
-* [TCP sockets][]
-* [child process stdin][]
-* [`process.stdout`][], [`process.stderr`][]
-*Note*: Some of these examples are actually [Duplex][] streams that implement
-the [Writable][] interface.
-All [Writable][] streams implement the interface defined by the
-`stream.Writable` class.
-While specific instances of [Writable][] streams may differ in various ways,
-all Writable streams follow the same fundamental usage pattern as illustrated
-in the example below:
-const myStream = getWritableStreamSomehow();
-myStream.write('some data');
-myStream.write('some more data');
-myStream.end('done writing data');
-#### Class: stream.Writable
-##### Event: 'close'
-The `'close'` event is emitted when the stream and any of its underlying
-resources (a file descriptor, for example) have been closed. The event indicates
-that no more events will be emitted, and no further computation will occur.
-Not all Writable streams will emit the `'close'` event.
-##### Event: 'drain'
-If a call to [`stream.write(chunk)`][stream-write] returns `false`, the
-`'drain'` event will be emitted when it is appropriate to resume writing data
-to the stream.
-// Write the data to the supplied writable stream one million times.
-// Be attentive to back-pressure.
-function writeOneMillionTimes(writer, data, encoding, callback) {
- let i = 1000000;
- write();
- function write() {
- var ok = true;
- do {
- i--;
- if (i === 0) {
- // last time!
- writer.write(data, encoding, callback);
- } else {
- // see if we should continue, or wait
- // don't pass the callback, because we're not done yet.
- ok = writer.write(data, encoding);
- }
- } while (i > 0 && ok);
- if (i > 0) {
- // had to stop early!
- // write some more once it drains
- writer.once('drain', write);
- }
- }
-##### Event: 'error'
-* {Error}
-The `'error'` event is emitted if an error occurred while writing or piping
-data. The listener callback is passed a single `Error` argument when called.
-*Note*: The stream is not closed when the `'error'` event is emitted.
-##### Event: 'finish'
-The `'finish'` event is emitted after the [`stream.end()`][stream-end] method
-has been called, and all data has been flushed to the underlying system.
-const writer = getWritableStreamSomehow();
-for (var i = 0; i < 100; i ++) {
- writer.write('hello, #${i}!\n');
-writer.end('This is the end\n');
-writer.on('finish', () => {
- console.error('All writes are now complete.');
-##### Event: 'pipe'
-* `src` {stream.Readable} source stream that is piping to this writable
-The `'pipe'` event is emitted when the [`stream.pipe()`][] method is called on
-a readable stream, adding this writable to its set of destinations.
-const writer = getWritableStreamSomehow();
-const reader = getReadableStreamSomehow();
-writer.on('pipe', (src) => {
- console.error('something is piping into the writer');
- assert.equal(src, reader);
-##### Event: 'unpipe'
-* `src` {[Readable][] Stream} The source stream that
- [unpiped][`stream.unpipe()`] this writable
-The `'unpipe'` event is emitted when the [`stream.unpipe()`][] method is called
-on a [Readable][] stream, removing this [Writable][] from its set of
-const writer = getWritableStreamSomehow();
-const reader = getReadableStreamSomehow();
-writer.on('unpipe', (src) => {
- console.error('Something has stopped piping into the writer.');
- assert.equal(src, reader);
-##### writable.cork()
-The `writable.cork()` method forces all written data to be buffered in memory.
-The buffered data will be flushed when either the [`stream.uncork()`][] or
-[`stream.end()`][stream-end] methods are called.
-The primary intent of `writable.cork()` is to avoid a situation where writing
-many small chunks of data to a stream do not cause a backup in the internal
-buffer that would have an adverse impact on performance. In such situations,
-implementations that implement the `writable._writev()` method can perform
-buffered writes in a more optimized manner.
-##### writable.end([chunk][, encoding][, callback])
-* `chunk` {String|Buffer|any} Optional data to write. For streams not operating
- in object mode, `chunk` must be a string or a `Buffer`. For object mode
- streams, `chunk` may be any JavaScript value other than `null`.
-* `encoding` {String} The encoding, if `chunk` is a String
-* `callback` {Function} Optional callback for when the stream is finished
-Calling the `writable.end()` method signals that no more data will be written
-to the [Writable][]. The optional `chunk` and `encoding` arguments allow one
-final additional chunk of data to be written immediately before closing the
-stream. If provided, the optional `callback` function is attached as a listener
-for the [`'finish'`][] event.
-Calling the [`stream.write()`][stream-write] method after calling
-[`stream.end()`][stream-end] will raise an error.
-// write 'hello, ' and then end with 'world!'
-const file = fs.createWriteStream('example.txt');
-file.write('hello, ');
-// writing more now is not allowed!
-##### writable.setDefaultEncoding(encoding)
-* `encoding` {String} The new default encoding
-* Return: `this`
-The `writable.setDefaultEncoding()` method sets the default `encoding` for a
-[Writable][] stream.
-##### writable.uncork()
-The `writable.uncork()` method flushes all data buffered since
-[`stream.cork()`][] was called.
-When using `writable.cork()` and `writable.uncork()` to manage the buffering
-of writes to a stream, it is recommended that calls to `writable.uncork()` be
-deferred using `process.nextTick()`. Doing so allows batching of all
-`writable.write()` calls that occur within a given Node.js event loop phase.
-stream.write('some ');
-stream.write('data ');
-process.nextTick(() => stream.uncork());
-If the `writable.cork()` method is called multiple times on a stream, the same
-number of calls to `writable.uncork()` must be called to flush the buffered
-stream.write('some ');
-stream.write('data ');
-process.nextTick(() => {
- stream.uncork();
- // The data will not be flushed until uncork() is called a second time.
- stream.uncork();
-##### writable.write(chunk[, encoding][, callback])
-* `chunk` {String|Buffer} The data to write
-* `encoding` {String} The encoding, if `chunk` is a String
-* `callback` {Function} Callback for when this chunk of data is flushed
-* Returns: {Boolean} `false` if the stream wishes for the calling code to
- wait for the `'drain'` event to be emitted before continuing to write
- additional data; otherwise `true`.
-The `writable.write()` method writes some data to the stream, and calls the
-supplied `callback` once the data has been fully handled. If an error
-occurs, the `callback` *may or may not* be called with the error as its
-first argument. To reliably detect write errors, add a listener for the
-`'error'` event.
-The return value indicates whether the written `chunk` was buffered internally
-and the buffer has exceeded the `highWaterMark` configured when the stream was
-created. If `false` is returned, further attempts to write data to the stream
-should be paused until the [`'drain'`][] event is emitted.
-A Writable stream in object mode will always ignore the `encoding` argument.
-### Readable Streams
-Readable streams are an abstraction for a *source* from which data is
-Examples of Readable streams include:
-* [HTTP responses, on the client][http-incoming-message]
-* [HTTP requests, on the server][http-incoming-message]
-* [fs read streams][]
-* [zlib streams][zlib]
-* [crypto streams][crypto]
-* [TCP sockets][]
-* [child process stdout and stderr][]
-* [`process.stdin`][]
-All [Readable][] streams implement the interface defined by the
-`stream.Readable` class.
-#### Two Modes
-Readable streams effectively operate in one of two modes: flowing and paused.
-When in flowing mode, data is read from the underlying system automatically
-and provided to an application as quickly as possible using events via the
-[`EventEmitter`][] interface.
-In paused mode, the [`stream.read()`][stream-read] method must be called
-explicitly to read chunks of data from the stream.
-All [Readable][] streams begin in paused mode but can be switched to flowing
-mode in one of the following ways:
-* Adding a [`'data'`][] event handler.
-* Calling the [`stream.resume()`][stream-resume] method.
-* Calling the [`stream.pipe()`][] method to send the data to a [Writable][].
-The Readable can switch back to paused mode using one of the following:
-* If there are no pipe destinations, by calling the
- [`stream.pause()`][stream-pause] method.
-* If there are pipe destinations, by removing any [`'data'`][] event
- handlers, and removing all pipe destinations by calling the
- [`stream.unpipe()`][] method.
-The important concept to remember is that a Readable will not generate data
-until a mechanism for either consuming or ignoring that data is provided. If
-the consuming mechanism is disabled or taken away, the Readable will *attempt*
-to stop generating the data.
-*Note*: For backwards compatibility reasons, removing [`'data'`][] event
-handlers will **not** automatically pause the stream. Also, if there are piped
-destinations, then calling [`stream.pause()`][stream-pause] will not guarantee
-that the stream will *remain* paused once those destinations drain and ask for
-more data.
-*Note*: If a [Readable][] is switched into flowing mode and there are no
-consumers available handle the data, that data will be lost. This can occur,
-for instance, when the `readable.resume()` method is called without a listener
-attached to the `'data'` event, or when a `'data'` event handler is removed
-from the stream.
-#### Three States
-The "two modes" of operation for a Readable stream are a simplified abstraction
-for the more complicated internal state management that is happening within the
-Readable stream implementation.
-Specifically, at any given point in time, every Readable is in one of three
-possible states:
-* `readable._readableState.flowing = null`
-* `readable._readableState.flowing = false`
-* `readable._readableState.flowing = true`
-When `readable._readableState.flowing` is `null`, no mechanism for consuming the
-streams data is provided so the stream will not generate its data.
-Attaching a listener for the `'data'` event, calling the `readable.pipe()`
-method, or calling the `readable.resume()` method will switch
-`readable._readableState.flowing` to `true`, causing the Readable to begin
-actively emitting events as data is generated.
-Calling `readable.pause()`, `readable.unpipe()`, or receiving "back pressure"
-will cause the `readable._readableState.flowing` to be set as `false`,
-temporarily halting the flowing of events but *not* halting the generation of
-While `readable._readableState.flowing` is `false`, data may be accumulating
-within the streams internal buffer.
-#### Choose One
-The Readable stream API evolved across multiple Node.js versions and provides
-multiple methods of consuming stream data. In general, developers should choose
-*one* of the methods of consuming data and *should never* use multiple methods
-to consume data from a single stream.
-Use of the `readable.pipe()` method is recommended for most users as it has been
-implemented to provide the easiest way of consuming stream data. Developers that
-require more fine-grained control over the transfer and generation of data can
-use the [`EventEmitter`][] and `readable.pause()`/`readable.resume()` APIs.
-#### Class: stream.Readable
-##### Event: 'close'
-The `'close'` event is emitted when the stream and any of its underlying
-resources (a file descriptor, for example) have been closed. The event indicates
-that no more events will be emitted, and no further computation will occur.
-Not all [Readable][] streams will emit the `'close'` event.
-##### Event: 'data'
-* `chunk` {Buffer|String|any} The chunk of data. For streams that are not
- operating in object mode, the chunk will be either a string or `Buffer`.
- For streams that are in object mode, the chunk can be any JavaScript value
- other than `null`.
-The `'data'` event is emitted whenever the stream is relinquishing ownership of
-a chunk of data to a consumer. This may occur whenever the stream is switched
-in flowing mode by calling `readable.pipe()`, `readable.resume()`, or by
-attaching a listener callback to the `'data'` event. The `'data'` event will
-also be emitted whenever the `readable.read()` method is called and a chunk of
-data is available to be returned.
-Attaching a `'data'` event listener to a stream that has not been explicitly
-paused will switch the stream into flowing mode. Data will then be passed as
-soon as it is available.
-The listener callback will be passed the chunk of data as a string if a default
-encoding has been specified for the stream using the
-`readable.setEncoding()` method; otherwise the data will be passed as a
-const readable = getReadableStreamSomehow();
-readable.on('data', (chunk) => {
- console.log(`Received ${chunk.length} bytes of data.`);
-##### Event: 'end'
-The `'end'` event is emitted when there is no more data to be consumed from
-the stream.
-*Note*: The `'end'` event **will not be emitted** unless the data is
-completely consumed. This can be accomplished by switching the stream into
-flowing mode, or by calling [`stream.read()`][stream-read] repeatedly until
-all data has been consumed.
-const readable = getReadableStreamSomehow();
-readable.on('data', (chunk) => {
- console.log(`Received ${chunk.length} bytes of data.`);
-readable.on('end', () => {
- console.log('There will be no more data.');
-##### Event: 'error'
-* {Error}
-The `'error'` event may be emitted by a Readable implementation at any time.
-Typically, this may occur if the underlying stream in unable to generate data
-due to an underlying internal failure, or when a stream implementation attempts
-to push an invalid chunk of data.
-The listener callback will be passed a single `Error` object.
-##### Event: 'readable'
-The `'readable'` event is emitted when there is data available to be read from
-the stream. In some cases, attaching a listener for the `'readable'` event will
-cause some amount of data to be read into an internal buffer.
-const readable = getReadableStreamSomehow();
-readable.on('readable', () => {
- // there is some data to read now
-The `'readable'` event will also be emitted once the end of the stream data
-has been reached but before the `'end'` event is emitted.
-Effectively, the `'readable'` event indicates that the stream has new
-information: either new data is available or the end of the stream has been
-reached. In the former case, [`stream.read()`][stream-read] will return the
-available data. In the latter case, [`stream.read()`][stream-read] will return
-`null`. For instance, in the following example, `foo.txt` is an empty file:
-const fs = require('fs');
-const rr = fs.createReadStream('foo.txt');
-rr.on('readable', () => {
- console.log('readable:', rr.read());
-rr.on('end', () => {
- console.log('end');
-The output of running this script is:
-$ node test.js
-readable: null
-*Note*: In general, the `readable.pipe()` and `'data'` event mechanisms are
-preferred over the use of the `'readable'` event.
-##### readable.isPaused()
-* Return: {Boolean}
-The `readable.isPaused()` method returns the current operating state of the
-Readable. This is used primarily by the mechanism that underlies the
-`readable.pipe()` method. In most typical cases, there will be no reason to
-use this method directly.
-const readable = new stream.Readable
-readable.isPaused() // === false
-readable.isPaused() // === true
-readable.isPaused() // === false
-##### readable.pause()
-* Return: `this`
-The `readable.pause()` method will cause a stream in flowing mode to stop
-emitting [`'data'`][] events, switching out of flowing mode. Any data that
-becomes available will remain in the internal buffer.
-const readable = getReadableStreamSomehow();
-readable.on('data', (chunk) => {
- console.log(`Received ${chunk.length} bytes of data.`);
- readable.pause();
- console.log('There will be no additional data for 1 second.');
- setTimeout(() => {
- console.log('Now data will start flowing again.');
- readable.resume();
- }, 1000);
-##### readable.pipe(destination[, options])
-* `destination` {stream.Writable} The destination for writing data
-* `options` {Object} Pipe options
- * `end` {Boolean} End the writer when the reader ends. Defaults to `true`.
-The `readable.pipe()` method attaches a [Writable][] stream to the `readable`,
-causing it to switch automatically into flowing mode and push all of its data
-to the attached [Writable][]. The flow of data will be automatically managed so
-that the destination Writable stream is not overwhelmed by a faster Readable
-The following example pipes all of the data from the `readable` into a file
-named `file.txt`:
-const readable = getReadableStreamSomehow();
-const writable = fs.createWriteStream('file.txt');
-// All the data from readable goes into 'file.txt'
-It is possible to attach multiple Writable streams to a single Readable stream.
-The `readable.pipe()` method returns a reference to the *destination* stream
-making it possible to set up chains of piped streams:
-const r = fs.createReadStream('file.txt');
-const z = zlib.createGzip();
-const w = fs.createWriteStream('file.txt.gz');
-By default, [`stream.end()`][stream-end] is called on the destination Writable
-stream when the source Readable stream emits [`'end'`][], so that the
-destination is no longer writable. To disable this default behavior, the `end`
-option can be passed as `false`, causing the destination stream to remain open,
-as illustrated in the following example:
-reader.pipe(writer, { end: false });
-reader.on('end', () => {
- writer.end('Goodbye\n');
-One important caveat is that if the Readable stream emits an error during
-processing, the Writable destination *is not closed* automatically. If an
-error occurs, it will be necessary to *manually* close each stream in order
-to prevent memory leaks.
-*Note*: The [`process.stderr`][] and [`process.stdout`][] Writable streams are
-never closed until the Node.js process exits, regardless of the specified
-##### readable.read([size])
-* `size` {Number} Optional argument to specify how much data to read.
-* Return {String|Buffer|Null}
-The `readable.read()` method pulls some data out of the internal buffer and
-returns it. If no data available to be read, `null` is returned. By default,
-the data will be returned as a `Buffer` object unless an encoding has been
-specified using the `readable.setEncoding()` method or the stream is operating
-in object mode.
-The optional `size` argument specifies a specific number of bytes to read. If
-`size` bytes are not available to be read, `null` will be returned *unless*
-the stream has ended, in which case all of the data remaining in the internal
-buffer will be returned (*even if it exceeds `size` bytes*).
-If the `size` argument is not specified, all of the data contained in the
-internal buffer will be returned.
-The `readable.read()` method should only be called on Readable streams operating
-in paused mode. In flowing mode, `readable.read()` is called automatically until
-the internal buffer is fully drained.
-const readable = getReadableStreamSomehow();
-readable.on('readable', () => {
- var chunk;
- while (null !== (chunk = readable.read())) {
- console.log(`Received ${chunk.length} bytes of data.`);
- }
-In general, it is recommended that developers avoid the use of the `'readable'`
-event and the `readable.read()` method in favor of using either
-`readable.pipe()` or the `'data'` event.
-A Readable stream in object mode will always return a single item from
-a call to [`readable.read(size)`][stream-read], regardless of the value of the
-`size` argument.
-*Note:* If the `readable.read()` method returns a chunk of data, a `'data'`
-event will also be emitted.
-*Note*: Calling [`stream.read([size])`][stream-read] after the [`'end'`][]
-event has been emitted will return `null`. No runtime error will be raised.
-##### readable.resume()
-* Return: `this`
-The `readable.resume()` method causes an explicitly paused Readable stream to
-resume emitting [`'data'`][] events, switching the stream into flowing mode.
-The `readable.resume()` method can be used to fully consume the data from a
-stream without actually processing any of that data as illustrated in the
-following example:
- .resume()
- .on('end', () => {
- console.log('Reached the end, but did not read anything.');
- });
-##### readable.setEncoding(encoding)
-* `encoding` {String} The encoding to use.
-* Return: `this`
-The `readable.setEncoding()` method sets the default character encoding for
-data read from the Readable stream.
-Setting an encoding causes the stream data
-to be returned as string of the specified encoding rather than as `Buffer`
-objects. For instance, calling `readable.setEncoding('utf8')` will cause the
-output data will be interpreted as UTF-8 data, and passed as strings. Calling
-`readable.setEncoding('hex')` will cause the data to be encoded in hexadecimal
-string format.
-The Readable stream will properly handle multi-byte characters delivered through
-the stream that would otherwise become improperly decoded if simply pulled from
-the stream as `Buffer` objects.
-Encoding can be disabled by calling `readable.setEncoding(null)`. This approach
-is useful when working with binary data or with large multi-byte strings spread
-out over multiple chunks.
-const readable = getReadableStreamSomehow();
-readable.on('data', (chunk) => {
- assert.equal(typeof chunk, 'string');
- console.log('got %d characters of string data', chunk.length);
-##### readable.unpipe([destination])
-* `destination` {stream.Writable} Optional specific stream to unpipe
-The `readable.unpipe()` method detaches a Writable stream previously attached
-using the [`stream.pipe()`][] method.
-If the `destination` is not specified, then *all* pipes are detached.
-If the `destination` is specified, but no pipe is set up for it, then
-the method does nothing.
-const readable = getReadableStreamSomehow();
-const writable = fs.createWriteStream('file.txt');
-// All the data from readable goes into 'file.txt',
-// but only for the first second
-setTimeout(() => {
- console.log('Stop writing to file.txt');
- readable.unpipe(writable);
- console.log('Manually close the file stream');
- writable.end();
-}, 1000);
-##### readable.unshift(chunk)
-* `chunk` {Buffer|String} Chunk of data to unshift onto the read queue
-The `readable.unshift()` method pushes a chunk of data back into the internal
-buffer. This is useful in certain situations where a stream is being consumed by
-code that needs to "un-consume" some amount of data that it has optimistically
-pulled out of the source, so that the data can be passed on to some other party.
-*Note*: The `stream.unshift(chunk)` method cannot be called after the
-[`'end'`][] event has been emitted or a runtime error will be thrown.
-Developers using `stream.unshift()` often should consider switching to
-use of a [Transform][] stream instead. See the [API for Stream Implementers][]
-section for more information.
-// Pull off a header delimited by \n\n
-// use unshift() if we get too much
-// Call the callback with (error, header, stream)
-const StringDecoder = require('string_decoder').StringDecoder;
-function parseHeader(stream, callback) {
- stream.on('error', callback);
- stream.on('readable', onReadable);
- const decoder = new StringDecoder('utf8');
- var header = '';
- function onReadable() {
- var chunk;
- while (null !== (chunk = stream.read())) {
- var str = decoder.write(chunk);
- if (str.match(/\n\n/)) {
- // found the header boundary
- var split = str.split(/\n\n/);
- header += split.shift();
- const remaining = split.join('\n\n');
- const buf = Buffer.from(remaining, 'utf8');
- stream.removeListener('error', callback);
- // set the readable listener before unshifting
- stream.removeListener('readable', onReadable);
- if (buf.length)
- stream.unshift(buf);
- // now the body of the message can be read from the stream.
- callback(null, header, stream);
- } else {
- // still reading the header.
- header += str;
- }
- }
- }
-*Note*: Unlike [`stream.push(chunk)`][stream-push], `stream.unshift(chunk)`
-will not end the reading process by resetting the internal reading state of the
-stream. This can cause unexpected results if `readable.unshift()` is called
-during a read (i.e. from within a [`stream._read()`][stream-_read]
-implementation on a custom stream). Following the call to `readable.unshift()`
-with an immediate [`stream.push('')`][stream-push] will reset the reading state
-appropriately, however it is best to simply avoid calling `readable.unshift()`
-while in the process of performing a read.
-##### readable.wrap(stream)
-* `stream` {Stream} An "old style" readable stream
-Versions of Node.js prior to v0.10 had streams that did not implement the
-entire `stream` module API as it is currently defined. (See [Compatibility][]
-for more information.)
-When using an older Node.js library that emits [`'data'`][] events and has a
-[`stream.pause()`][stream-pause] method that is advisory only, the
-`readable.wrap()` method can be used to create a [Readable][] stream that uses
-the old stream as its data source.
-It will rarely be necessary to use `readable.wrap()` but the method has been
-provided as a convenience for interacting with older Node.js applications and
-For example:
-const OldReader = require('./old-api-module.js').OldReader;
-const Readable = require('stream').Readable;
-const oreader = new OldReader;
-const myReader = new Readable().wrap(oreader);
-myReader.on('readable', () => {
- myReader.read(); // etc.
-### Duplex and Transform Streams
-#### Class: stream.Duplex
-Duplex streams are streams that implement both the [Readable][] and
-[Writable][] interfaces.
-Examples of Duplex streams include:
-* [TCP sockets][]
-* [zlib streams][zlib]
-* [crypto streams][crypto]
-#### Class: stream.Transform
-Transform streams are [Duplex][] streams where the output is in some way
-related to the input. Like all [Duplex][] streams, Transform streams
-implement both the [Readable][] and [Writable][] interfaces.
-Examples of Transform streams include:
-* [zlib streams][zlib]
-* [crypto streams][crypto]
-## API for Stream Implementers
-The `stream` module API has been designed to make it possible to easily
-implement streams using JavaScript's prototypal inheritance model.
-First, a stream developer would declare a new JavaScript class that extends one
-of the four basic stream classes (`stream.Writable`, `stream.Readable`,
-`stream.Duplex`, or `stream.Transform`), making sure the call the appropriate
-parent class constructor:
-const Writable = require('stream').Writable;
-class MyWritable extends Writable {
- constructor(options) {
- super(options);
- }
-The new stream class must then implement one or more specific methods, depending
-on the type of stream being created, as detailed in the chart below:
- Use-case
- |
- Class
- |
- Method(s) to implement
- |
- Reading only
- |
- [Readable](#stream_class_stream_readable)
- |
- [_read][stream-_read]
- |
- Writing only
- |
- [Writable](#stream_class_stream_writable)
- |
- [_write][stream-_write] , [_writev][stream-_writev]
- |
- Reading and writing
- |
- [Duplex](#stream_class_stream_duplex)
- |
- [_read][stream-_read] , [_write][stream-_write] , [_writev][stream-_writev]
- |
- Operate on written data, then read the result
- |
- [Transform](#stream_class_stream_transform)
- |
- [_transform][stream-_transform] , [_flush][stream-_flush]
- |
-*Note*: The implementation code for a stream should *never* call the "public"
-methods of a stream that are intended for use by consumers (as described in
-the [API for Stream Consumers][] section). Doing so may lead to adverse
-side effects in application code consuming the stream.
-### Simplified Construction
-For many simple cases, it is possible to construct a stream without relying on
-inheritance. This can be accomplished by directly creating instances of the
-`stream.Writable`, `stream.Readable`, `stream.Duplex` or `stream.Transform`
-objects and passing appropriate methods as constructor options.
-For example:
-const Writable = require('stream').Writable;
-const myWritable = new Writable({
- write(chunk, encoding, callback) {
- // ...
- }
-### Implementing a Writable Stream
-The `stream.Writable` class is extended to implement a [Writable][] stream.
-Custom Writable streams *must* call the `new stream.Writable([options])`
-constructor and implement the `writable._write()` method. The
-`writable._writev()` method *may* also be implemented.
-#### Constructor: new stream.Writable([options])
-* `options` {Object}
- * `highWaterMark` {Number} Buffer level when
- [`stream.write()`][stream-write] starts returning `false`. Defaults to
- `16384` (16kb), or `16` for `objectMode` streams.
- * `decodeStrings` {Boolean} Whether or not to decode strings into
- Buffers before passing them to [`stream._write()`][stream-_write].
- Defaults to `true`
- * `objectMode` {Boolean} Whether or not the
- [`stream.write(anyObj)`][stream-write] is a valid operation. When set,
- it becomes possible to write JavaScript values other than string or
- `Buffer` if supported by the stream implementation. Defaults to `false`
- * `write` {Function} Implementation for the
- [`stream._write()`][stream-_write] method.
- * `writev` {Function} Implementation for the
- [`stream._writev()`][stream-_writev] method.
-For example:
-const Writable = require('stream').Writable;
-class MyWritable extends Writable {
- constructor(options) {
- // Calls the stream.Writable() constructor
- super(options);
- }
-Or, when using pre-ES6 style constructors:
-const Writable = require('stream').Writable;
-const util = require('util');
-function MyWritable(options) {
- if (!(this instanceof MyWritable))
- return new MyWritable(options);
- Writable.call(this, options);
-util.inherits(MyWritable, Writable);
-Or, using the Simplified Constructor approach:
-const Writable = require('stream').Writable;
-const myWritable = new Writable({
- write(chunk, encoding, callback) {
- // ...
- },
- writev(chunks, callback) {
- // ...
- }
-#### writable.\_write(chunk, encoding, callback)
-* `chunk` {Buffer|String} The chunk to be written. Will **always**
- be a buffer unless the `decodeStrings` option was set to `false`.
-* `encoding` {String} If the chunk is a string, then `encoding` is the
- character encoding of that string. If chunk is a `Buffer`, or if the
- stream is operating in object mode, `encoding` may be ignored.
-* `callback` {Function} Call this function (optionally with an error
- argument) when processing is complete for the supplied chunk.
-All Writable stream implementations must provide a
-[`writable._write()`][stream-_write] method to send data to the underlying
-*Note*: [Transform][] streams provide their own implementation of the
-*Note*: **This function MUST NOT be called by application code directly.** It
-should be implemented by child classes, and called only by the internal Writable
-class methods only.
-The `callback` method must be called to signal either that the write completed
-successfully or failed with an error. The first argument passed to the
-`callback` must be the `Error` object if the call failed or `null` if the
-write succeeded.
-It is important to note that all calls to `writable.write()` that occur between
-the time `writable._write()` is called and the `callback` is called will cause
-the written data to be buffered. Once the `callback` is invoked, the stream will
-emit a [`'drain'`][] event. If a stream implementation is capable of processing
-multiple chunks of data at once, the `writable._writev()` method should be
-If the `decodeStrings` property is set in the constructor options, then
-`chunk` may be a string rather than a Buffer, and `encoding` will
-indicate the character encoding of the string. This is to support
-implementations that have an optimized handling for certain string
-data encodings. If the `decodeStrings` property is explicitly set to `false`,
-the `encoding` argument can be safely ignored, and `chunk` will always be a
-The `writable._write()` method is prefixed with an underscore because it is
-internal to the class that defines it, and should never be called directly by
-user programs.
-#### writable.\_writev(chunks, callback)
-* `chunks` {Array} The chunks to be written. Each chunk has following
- format: `{ chunk: ..., encoding: ... }`.
-* `callback` {Function} A callback function (optionally with an error
- argument) to be invoked when processing is complete for the supplied chunks.
-*Note*: **This function MUST NOT be called by application code directly.** It
-should be implemented by child classes, and called only by the internal Writable
-class methods only.
-The `writable._writev()` method may be implemented in addition to
-`writable._write()` in stream implementations that are capable of processing
-multiple chunks of data at once. If implemented, the method will be called with
-all chunks of data currently buffered in the write queue.
-The `writable._writev()` method is prefixed with an underscore because it is
-internal to the class that defines it, and should never be called directly by
-user programs.
-#### Errors While Writing
-It is recommended that errors occurring during the processing of the
-`writable._write()` and `writable._writev()` methods are reported by invoking
-the callback and passing the error as the first argument. This will cause an
-`'error'` event to be emitted by the Writable. Throwing an Error from within
-`writable._write()` can result in expected and inconsistent behavior depending
-on how the stream is being used. Using the callback ensures consistent and
-predictable handling of errors.
-const Writable = require('stream').Writable;
-const myWritable = new Writable({
- write(chunk, encoding, callback) {
- if (chunk.toString().indexOf('a') >= 0) {
- callback(new Error('chunk is invalid'));
- } else {
- callback();
- }
- }
-#### An Example Writable Stream
-The following illustrates a rather simplistic (and somewhat pointless) custom
-Writable stream implementation. While this specific Writable stream instance
-is not of any real particular usefulness, the example illustrates each of the
-required elements of a custom [Writable][] stream instance:
-const Writable = require('stream').Writable;
-class MyWritable extends Writable {
- constructor(options) {
- super(options);
- }
- _write(chunk, encoding, callback) {
- if (chunk.toString().indexOf('a') >= 0) {
- callback(new Error('chunk is invalid'));
- } else {
- callback();
- }
- }
-### Implementing a Readable Stream
-The `stream.Readable` class is extended to implement a [Readable][] stream.
-Custom Readable streams *must* call the `new stream.Readable([options])`
-constructor and implement the `readable._read()` method.
-#### new stream.Readable([options])
-* `options` {Object}
- * `highWaterMark` {Number} The maximum number of bytes to store in
- the internal buffer before ceasing to read from the underlying
- resource. Defaults to `16384` (16kb), or `16` for `objectMode` streams
- * `encoding` {String} If specified, then buffers will be decoded to
- strings using the specified encoding. Defaults to `null`
- * `objectMode` {Boolean} Whether this stream should behave
- as a stream of objects. Meaning that [`stream.read(n)`][stream-read] returns
- a single value instead of a Buffer of size n. Defaults to `false`
- * `read` {Function} Implementation for the [`stream._read()`][stream-_read]
- method.
-For example:
-const Readable = require('stream').Readable;
-class MyReadable extends Readable {
- constructor(options) {
- // Calls the stream.Readable(options) constructor
- super(options);
- }
-Or, when using pre-ES6 style constructors:
-const Readable = require('stream').Readable;
-const util = require('util');
-function MyReadable(options) {
- if (!(this instanceof MyReadable))
- return new MyReadable(options);
- Readable.call(this, options);
-util.inherits(MyReadable, Readable);
-Or, using the Simplified Constructor approach:
-const Readable = require('stream').Readable;
-const myReadable = new Readable({
- read(size) {
- // ...
- }
-#### readable.\_read(size)
-* `size` {Number} Number of bytes to read asynchronously
-*Note*: **This function MUST NOT be called by application code directly.** It
-should be implemented by child classes, and called only by the internal Readable
-class methods only.
-All Readable stream implementations must provide an implementation of the
-`readable._read()` method to fetch data from the underlying resource.
-When `readable._read()` is called, if data is available from the resource, the
-implementation should begin pushing that data into the read queue using the
-[`this.push(dataChunk)`][stream-push] method. `_read()` should continue reading
-from the resource and pushing data until `readable.push()` returns `false`. Only
-when `_read()` is called again after it has stopped should it resume pushing
-additional data onto the queue.
-*Note*: Once the `readable._read()` method has been called, it will not be
-called again until the [`readable.push()`][stream-push] method is called.
-The `size` argument is advisory. For implementations where a "read" is a
-single operation that returns data can use the `size` argument to determine how
-much data to fetch. Other implementations may ignore this argument and simply
-provide data whenever it becomes available. There is no need to "wait" until
-`size` bytes are available before calling [`stream.push(chunk)`][stream-push].
-The `readable._read()` method is prefixed with an underscore because it is
-internal to the class that defines it, and should never be called directly by
-user programs.
-#### readable.push(chunk[, encoding])
-* `chunk` {Buffer|Null|String} Chunk of data to push into the read queue
-* `encoding` {String} Encoding of String chunks. Must be a valid
- Buffer encoding, such as `'utf8'` or `'ascii'`
-* Returns {Boolean} `true` if additional chunks of data may continued to be
- pushed; `false` otherwise.
-When `chunk` is a `Buffer` or `string`, the `chunk` of data will be added to the
-internal queue for users of the stream to consume. Passing `chunk` as `null`
-signals the end of the stream (EOF), after which no more data can be written.
-When the Readable is operating in paused mode, the data added with
-`readable.push()` can be read out by calling the
-[`readable.read()`][stream-read] method when the [`'readable'`][] event is
-When the Readable is operating in flowing mode, the data added with
-`readable.push()` will be delivered by emitting a `'data'` event.
-The `readable.push()` method is designed to be as flexible as possible. For
-example, when wrapping a lower-level source that provides some form of
-pause/resume mechanism, and a data callback, the low-level source can be wrapped
-by the custom Readable instance as illustrated in the following example:
-// source is an object with readStop() and readStart() methods,
-// and an `ondata` member that gets called when it has data, and
-// an `onend` member that gets called when the data is over.
-class SourceWrapper extends Readable {
- constructor(options) {
- super(options);
- this._source = getLowlevelSourceObject();
- // Every time there's data, push it into the internal buffer.
- this._source.ondata = (chunk) => {
- // if push() returns false, then stop reading from source
- if (!this.push(chunk))
- this._source.readStop();
- };
- // When the source ends, push the EOF-signaling `null` chunk
- this._source.onend = () => {
- this.push(null);
- };
- }
- // _read will be called when the stream wants to pull more data in
- // the advisory size argument is ignored in this case.
- _read(size) {
- this._source.readStart();
- }
-*Note*: The `readable.push()` method is intended be called only by Readable
-Implementers, and only from within the `readable._read()` method.
-#### Errors While Reading
-It is recommended that errors occurring during the processing of the
-`readable._read()` method are emitted using the `'error'` event rather than
-being thrown. Throwing an Error from within `readable._read()` can result in
-expected and inconsistent behavior depending on whether the stream is operating
-in flowing or paused mode. Using the `'error'` event ensures consistent and
-predictable handling of errors.
-const Readable = require('stream').Readable;
-const myReadable = new Readable({
- read(size) {
- if (checkSomeErrorCondition()) {
- process.nextTick(() => this.emit('error', err));
- return;
- }
- // do some work
- }
-#### An Example Counting Stream
-The following is a basic example of a Readable stream that emits the numerals
-from 1 to 1,000,000 in ascending order, and then ends.
-const Readable = require('stream').Readable;
-class Counter extends Readable {
- constructor(opt) {
- super(opt);
- this._max = 1000000;
- this._index = 1;
- }
- _read() {
- var i = this._index++;
- if (i > this._max)
- this.push(null);
- else {
- var str = '' + i;
- var buf = Buffer.from(str, 'ascii');
- this.push(buf);
- }
- }
-### Implementing a Duplex Stream
-A [Duplex][] stream is one that implements both [Readable][] and [Writable][],
-such as a TCP socket connection.
-Because JavaScript does not have support for multiple inheritance, the
-`stream.Duplex` class is extended to implement a [Duplex][] stream (as opposed
-to extending the `stream.Readable` *and* `stream.Writable` classes).
-*Note*: The `stream.Duplex` class prototypically inherits from `stream.Readable`
-and parasitically from `stream.Writable`, but `instanceof` will work properly
-for both base classes due to overriding [`Symbol.hasInstance`][]
-on `stream.Writable`.
-Custom Duplex streams *must* call the `new stream.Duplex([options])`
-constructor and implement *both* the `readable._read()` and
-`writable._write()` methods.
-#### new stream.Duplex(options)
-* `options` {Object} Passed to both Writable and Readable
- constructors. Also has the following fields:
- * `allowHalfOpen` {Boolean} Defaults to `true`. If set to `false`, then
- the stream will automatically end the readable side when the
- writable side ends and vice versa.
- * `readableObjectMode` {Boolean} Defaults to `false`. Sets `objectMode`
- for readable side of the stream. Has no effect if `objectMode`
- is `true`.
- * `writableObjectMode` {Boolean} Defaults to `false`. Sets `objectMode`
- for writable side of the stream. Has no effect if `objectMode`
- is `true`.
-For example:
-const Duplex = require('stream').Duplex;
-class MyDuplex extends Duplex {
- constructor(options) {
- super(options);
- }
-Or, when using pre-ES6 style constructors:
-const Duplex = require('stream').Duplex;
-const util = require('util');
-function MyDuplex(options) {
- if (!(this instanceof MyDuplex))
- return new MyDuplex(options);
- Duplex.call(this, options);
-util.inherits(MyDuplex, Duplex);
-Or, using the Simplified Constructor approach:
-const Duplex = require('stream').Duplex;
-const myDuplex = new Duplex({
- read(size) {
- // ...
- },
- write(chunk, encoding, callback) {
- // ...
- }
-#### An Example Duplex Stream
-The following illustrates a simple example of a Duplex stream that wraps a
-hypothetical lower-level source object to which data can be written, and
-from which data can be read, albeit using an API that is not compatible with
-Node.js streams.
-The following illustrates a simple example of a Duplex stream that buffers
-incoming written data via the [Writable][] interface that is read back out
-via the [Readable][] interface.
-const Duplex = require('stream').Duplex;
-const kSource = Symbol('source');
-class MyDuplex extends Duplex {
- constructor(source, options) {
- super(options);
- this[kSource] = source;
- }
- _write(chunk, encoding, callback) {
- // The underlying source only deals with strings
- if (Buffer.isBuffer(chunk))
- chunk = chunk.toString();
- this[kSource].writeSomeData(chunk);
- callback();
- }
- _read(size) {
- this[kSource].fetchSomeData(size, (data, encoding) => {
- this.push(Buffer.from(data, encoding));
- });
- }
-The most important aspect of a Duplex stream is that the Readable and Writable
-sides operate independently of one another despite co-existing within a single
-object instance.
-#### Object Mode Duplex Streams
-For Duplex streams, `objectMode` can be set exclusively for either the Readable
-or Writable side using the `readableObjectMode` and `writableObjectMode` options
-In the following example, for instance, a new Transform stream (which is a
-type of [Duplex][] stream) is created that has an object mode Writable side
-that accepts JavaScript numbers that are converted to hexadecimal strings on
-the Readable side.
-const Transform = require('stream').Transform;
-// All Transform streams are also Duplex Streams
-const myTransform = new Transform({
- writableObjectMode: true,
- transform(chunk, encoding, callback) {
- // Coerce the chunk to a number if necessary
- chunk |= 0;
- // Transform the chunk into something else.
- const data = chunk.toString(16);
- // Push the data onto the readable queue.
- callback(null, '0'.repeat(data.length % 2) + data);
- }
-myTransform.on('data', (chunk) => console.log(chunk));
- // Prints: 01
- // Prints: 0a
- // Prints: 64
-### Implementing a Transform Stream
-A [Transform][] stream is a [Duplex][] stream where the output is computed
-in some way from the input. Examples include [zlib][] streams or [crypto][]
-streams that compress, encrypt, or decrypt data.
-*Note*: There is no requirement that the output be the same size as the input,
-the same number of chunks, or arrive at the same time. For example, a
-Hash stream will only ever have a single chunk of output which is
-provided when the input is ended. A `zlib` stream will produce output
-that is either much smaller or much larger than its input.
-The `stream.Transform` class is extended to implement a [Transform][] stream.
-The `stream.Transform` class prototypically inherits from `stream.Duplex` and
-implements its own versions of the `writable._write()` and `readable._read()`
-methods. Custom Transform implementations *must* implement the
-[`transform._transform()`][stream-_transform] method and *may* also implement
-the [`transform._flush()`][stream-_flush] method.
-*Note*: Care must be taken when using Transform streams in that data written
-to the stream can cause the Writable side of the stream to become paused if
-the output on the Readable side is not consumed.
-#### new stream.Transform([options])
-* `options` {Object} Passed to both Writable and Readable
- constructors. Also has the following fields:
- * `transform` {Function} Implementation for the
- [`stream._transform()`][stream-_transform] method.
- * `flush` {Function} Implementation for the [`stream._flush()`][stream-_flush]
- method.
-For example:
-const Transform = require('stream').Transform;
-class MyTransform extends Transform {
- constructor(options) {
- super(options);
- }
-Or, when using pre-ES6 style constructors:
-const Transform = require('stream').Transform;
-const util = require('util');
-function MyTransform(options) {
- if (!(this instanceof MyTransform))
- return new MyTransform(options);
- Transform.call(this, options);
-util.inherits(MyTransform, Transform);
-Or, using the Simplified Constructor approach:
-const Transform = require('stream').Transform;
-const myTransform = new Transform({
- transform(chunk, encoding, callback) {
- // ...
- }
-#### Events: 'finish' and 'end'
-The [`'finish'`][] and [`'end'`][] events are from the `stream.Writable`
-and `stream.Readable` classes, respectively. The `'finish'` event is emitted
-after [`stream.end()`][stream-end] is called and all chunks have been processed
-by [`stream._transform()`][stream-_transform]. The `'end'` event is emitted
-after all data has been output, which occurs after the callback in
-[`transform._flush()`][stream-_flush] has been called.
-#### transform.\_flush(callback)
-* `callback` {Function} A callback function (optionally with an error
- argument and data) to be called when remaining data has been flushed.
-*Note*: **This function MUST NOT be called by application code directly.** It
-should be implemented by child classes, and called only by the internal Readable
-class methods only.
-In some cases, a transform operation may need to emit an additional bit of
-data at the end of the stream. For example, a `zlib` compression stream will
-store an amount of internal state used to optimally compress the output. When
-the stream ends, however, that additional data needs to be flushed so that the
-compressed data will be complete.
-Custom [Transform][] implementations *may* implement the `transform._flush()`
-method. This will be called when there is no more written data to be consumed,
-but before the [`'end'`][] event is emitted signaling the end of the
-[Readable][] stream.
-Within the `transform._flush()` implementation, the `readable.push()` method
-may be called zero or more times, as appropriate. The `callback` function must
-be called when the flush operation is complete.
-The `transform._flush()` method is prefixed with an underscore because it is
-internal to the class that defines it, and should never be called directly by
-user programs.
-#### transform.\_transform(chunk, encoding, callback)
-* `chunk` {Buffer|String} The chunk to be transformed. Will **always**
- be a buffer unless the `decodeStrings` option was set to `false`.
-* `encoding` {String} If the chunk is a string, then this is the
- encoding type. If chunk is a buffer, then this is the special
- value - 'buffer', ignore it in this case.
-* `callback` {Function} A callback function (optionally with an error
- argument and data) to be called after the supplied `chunk` has been
- processed.
-*Note*: **This function MUST NOT be called by application code directly.** It
-should be implemented by child classes, and called only by the internal Readable
-class methods only.
-All Transform stream implementations must provide a `_transform()`
-method to accept input and produce output. The `transform._transform()`
-implementation handles the bytes being written, computes an output, then passes
-that output off to the readable portion using the `readable.push()` method.
-The `transform.push()` method may be called zero or more times to generate
-output from a single input chunk, depending on how much is to be output
-as a result of the chunk.
-It is possible that no output is generated from any given chunk of input data.
-The `callback` function must be called only when the current chunk is completely
-consumed. The first argument passed to the `callback` must be an `Error` object
-if an error occurred while processing the input or `null` otherwise. If a second
-argument is passed to the `callback`, it will be forwarded on to the
-`readable.push()` method. In other words the following are equivalent:
-transform.prototype._transform = function (data, encoding, callback) {
- this.push(data);
- callback();
-transform.prototype._transform = function (data, encoding, callback) {
- callback(null, data);
-The `transform._transform()` method is prefixed with an underscore because it
-is internal to the class that defines it, and should never be called directly by
-user programs.
-#### Class: stream.PassThrough
-The `stream.PassThrough` class is a trivial implementation of a [Transform][]
-stream that simply passes the input bytes across to the output. Its purpose is
-primarily for examples and testing, but there are some use cases where
-`stream.PassThrough` is useful as a building block for novel sorts of streams.
-## Additional Notes
-### Compatibility with Older Node.js Versions
-In versions of Node.js prior to v0.10, the Readable stream interface was
-simpler, but also less powerful and less useful.
-* Rather than waiting for calls the [`stream.read()`][stream-read] method,
- [`'data'`][] events would begin emitting immediately. Applications that
- would need to perform some amount of work to decide how to handle data
- were required to store read data into buffers so the data would not be lost.
-* The [`stream.pause()`][stream-pause] method was advisory, rather than
- guaranteed. This meant that it was still necessary to be prepared to receive
- [`'data'`][] events *even when the stream was in a paused state*.
-In Node.js v0.10, the [Readable][] class was added. For backwards compatibility
-with older Node.js programs, Readable streams switch into "flowing mode" when a
-[`'data'`][] event handler is added, or when the
-[`stream.resume()`][stream-resume] method is called. The effect is that, even
-when not using the new [`stream.read()`][stream-read] method and
-[`'readable'`][] event, it is no longer necessary to worry about losing
-[`'data'`][] chunks.
-While most applications will continue to function normally, this introduces an
-edge case in the following conditions:
-* No [`'data'`][] event listener is added.
-* The [`stream.resume()`][stream-resume] method is never called.
-* The stream is not piped to any writable destination.
-For example, consider the following code:
-net.createServer((socket) => {
- // we add an 'end' method, but never consume the data
- socket.on('end', () => {
- // It will never get here.
- socket.end('The message was received but was not processed.\n');
- });
-In versions of Node.js prior to v0.10, the incoming message data would be
-simply discarded. However, in Node.js v0.10 and beyond, the socket remains
-paused forever.
-The workaround in this situation is to call the
-[`stream.resume()`][stream-resume] method to begin the flow of data:
-// Workaround
-net.createServer((socket) => {
- socket.on('end', () => {
- socket.end('The message was received but was not processed.\n');
- });
- // start the flow of data, discarding it.
- socket.resume();
-In addition to new Readable streams switching into flowing mode,
-pre-v0.10 style streams can be wrapped in a Readable class using the
-[`readable.wrap()`][`stream.wrap()`] method.
-### `readable.read(0)`
-There are some cases where it is necessary to trigger a refresh of the
-underlying readable stream mechanisms, without actually consuming any
-data. In such cases, it is possible to call `readable.read(0)`, which will
-always return `null`.
-If the internal read buffer is below the `highWaterMark`, and the
-stream is not currently reading, then calling `stream.read(0)` will trigger
-a low-level [`stream._read()`][stream-_read] call.
-While most applications will almost never need to do this, there are
-situations within Node.js where this is done, particularly in the
-Readable stream class internals.
-### `readable.push('')`
-Use of `readable.push('')` is not recommended.
-Pushing a zero-byte string or `Buffer` to a stream that is not in object mode
-has an interesting side effect. Because it *is* a call to
-[`readable.push()`][stream-push], the call will end the reading process.
-However, because the argument is an empty string, no data is added to the
-readable buffer so there is nothing for a user to consume.
-[`'data'`]: #stream_event_data
-[`'drain'`]: #stream_event_drain
-[`'end'`]: #stream_event_end
-[`'finish'`]: #stream_event_finish
-[`'readable'`]: #stream_event_readable
-[`EventEmitter`]: https://nodejs.org/docs/v7.0.0/api/events.html#events_class_eventemitter
-[`process.stderr`]: https://nodejs.org/docs/v7.0.0/api/process.html#process_process_stderr
-[`process.stdin`]: https://nodejs.org/docs/v7.0.0/api/process.html#process_process_stdin
-[`process.stdout`]: https://nodejs.org/docs/v7.0.0/api/process.html#process_process_stdout
-[`stream.cork()`]: #stream_writable_cork
-[`stream.pipe()`]: #stream_readable_pipe_destination_options
-[`stream.uncork()`]: #stream_writable_uncork
-[`stream.unpipe()`]: #stream_readable_unpipe_destination
-[`stream.wrap()`]: #stream_readable_wrap_stream
-[API for Stream Consumers]: #stream_api_for_stream_consumers
-[API for Stream Implementers]: #stream_api_for_stream_implementers
-[child process stdin]: https://nodejs.org/docs/v7.0.0/api/child_process.html#child_process_child_stdin
-[child process stdout and stderr]: https://nodejs.org/docs/v7.0.0/api/child_process.html#child_process_child_stdout
-[Compatibility]: #stream_compatibility_with_older_node_js_versions
-[crypto]: crypto.html
-[Duplex]: #stream_class_stream_duplex
-[fs read streams]: https://nodejs.org/docs/v7.0.0/api/fs.html#fs_class_fs_readstream
-[fs write streams]: https://nodejs.org/docs/v7.0.0/api/fs.html#fs_class_fs_writestream
-[`fs.createReadStream()`]: https://nodejs.org/docs/v7.0.0/api/fs.html#fs_fs_createreadstream_path_options
-[`fs.createWriteStream()`]: https://nodejs.org/docs/v7.0.0/api/fs.html#fs_fs_createwritestream_path_options
-[`net.Socket`]: https://nodejs.org/docs/v7.0.0/api/net.html#net_class_net_socket
-[`zlib.createDeflate()`]: https://nodejs.org/docs/v7.0.0/api/zlib.html#zlib_zlib_createdeflate_options
-[HTTP requests, on the client]: https://nodejs.org/docs/v7.0.0/api/http.html#http_class_http_clientrequest
-[HTTP responses, on the server]: https://nodejs.org/docs/v7.0.0/api/http.html#http_class_http_serverresponse
-[http-incoming-message]: https://nodejs.org/docs/v7.0.0/api/http.html#http_class_http_incomingmessage
-[Readable]: #stream_class_stream_readable
-[stream-_flush]: #stream_transform_flush_callback
-[stream-_read]: #stream_readable_read_size_1
-[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
-[stream-_write]: #stream_writable_write_chunk_encoding_callback_1
-[stream-_writev]: #stream_writable_writev_chunks_callback
-[stream-end]: #stream_writable_end_chunk_encoding_callback
-[stream-pause]: #stream_readable_pause
-[stream-push]: #stream_readable_push_chunk_encoding
-[stream-read]: #stream_readable_read_size
-[stream-resume]: #stream_readable_resume
-[stream-write]: #stream_writable_write_chunk_encoding_callback
-[TCP sockets]: https://nodejs.org/docs/v7.0.0/api/net.html#net_class_net_socket
-[Transform]: #stream_class_stream_transform
-[Writable]: #stream_class_stream_writable
-[zlib]: zlib.html
-[`Symbol.hasInstance`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/hasInstance
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 75a73797ec..3a7d42d62b 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -10,6 +10,10 @@ var processNextTick = require('process-nextick-args');
var isArray = require('isarray');
+var Duplex;
Readable.ReadableState = ReadableState;
@@ -70,7 +74,6 @@ function prependListener(emitter, event, fn) {
-var Duplex;
function ReadableState(options, stream) {
Duplex = Duplex || require('./_stream_duplex');
@@ -140,7 +143,6 @@ function ReadableState(options, stream) {
-var Duplex;
function Readable(options) {
Duplex = Duplex || require('./_stream_duplex');
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index ffcc61772e..290b5ee9fd 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -14,6 +14,10 @@ var processNextTick = require('process-nextick-args');
var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
+var Duplex;
Writable.WritableState = WritableState;
@@ -54,7 +58,6 @@ function WriteReq(chunk, encoding, cb) {
this.next = null;
-var Duplex;
function WritableState(options, stream) {
Duplex = Duplex || require('./_stream_duplex');
@@ -76,6 +79,7 @@ function WritableState(options, stream) {
// cast to ints.
this.highWaterMark = ~ ~this.highWaterMark;
+ // drain event flag.
this.needDrain = false;
// at the start of calling end()
this.ending = false;
@@ -188,7 +192,6 @@ if (typeof Symbol === 'function' && Symbol.hasInstance) {
-var Duplex;
function Writable(options) {
Duplex = Duplex || require('./_stream_duplex');
diff --git a/test/common.js b/test/common.js
index 8c58d065eb..171cb6ea52 100644
--- a/test/common.js
+++ b/test/common.js
@@ -60,10 +60,10 @@ exports.isOSX = process.platform === 'darwin';
exports.enoughTestMem = os.totalmem() > 0x40000000; /* 1 Gb */
var cpus = os.cpus();
-exports.enoughTestCpu = cpus.length > 1 || cpus[0].speed > 999;
+//exports.enoughTestCpu = cpus.length > 1 || cpus[0].speed > 999;
exports.rootDir = exports.isWindows ? 'c:\\' : '/';
-exports.buildType = process.config.target_defaults.default_configuration;
+//exports.buildType = process.config.target_defaults.default_configuration;
function rimrafSync(p) {
try {
@@ -537,6 +537,14 @@ exports.expectWarning = function (name, expected) {
}, expected.length));
+/**/if (!process.browser) {
+ Object.defineProperty(exports, 'hasIntl', {
+ get: function () {
+ return process.binding('config').hasIntl;
+ }
+ });
+} /**/
function forEach(xs, f) {
for (var i = 0, l = xs.length; i < l; i++) {
f(xs[i], i);
diff --git a/test/parallel/test-stream-writable-finished-state.js b/test/parallel/test-stream-writable-finished-state.js
new file mode 100644
index 0000000000..6decdbbba3
--- /dev/null
+++ b/test/parallel/test-stream-writable-finished-state.js
@@ -0,0 +1,24 @@
+var bufferShim = require('buffer-shims');
+var common = require('../common');
+var assert = require('assert/');
+var stream = require('../../');
+var writable = new stream.Writable();
+writable._write = function (chunk, encoding, cb) {
+ // The state finished should start in false.
+ assert.strictEqual(writable._writableState.finished, false);
+ cb();
+writable.on('finish', common.mustCall(function () {
+ assert.strictEqual(writable._writableState.finished, true);
+writable.end('testing finished state', common.mustCall(function () {
+ assert.strictEqual(writable._writableState.finished, true);
\ No newline at end of file
diff --git a/test/parallel/test-stream-writable-needdrain-state.js b/test/parallel/test-stream-writable-needdrain-state.js
new file mode 100644
index 0000000000..5e5ffa6380
--- /dev/null
+++ b/test/parallel/test-stream-writable-needdrain-state.js
@@ -0,0 +1,25 @@
+var bufferShim = require('buffer-shims');
+var common = require('../common');
+var stream = require('../../');
+var assert = require('assert/');
+var transform = new stream.Transform({
+ transform: _transform,
+ highWaterMark: 1
+function _transform(chunk, encoding, cb) {
+ assert.strictEqual(transform._writableState.needDrain, true);
+ cb();
+assert.strictEqual(transform._writableState.needDrain, false);
+transform.write('asdasd', common.mustCall(function () {
+ assert.strictEqual(transform._writableState.needDrain, false);
+assert.strictEqual(transform._writableState.needDrain, true);
\ No newline at end of file
diff --git a/test/parallel/test-stream-writableState-ending.js b/test/parallel/test-stream-writableState-ending.js
new file mode 100644
index 0000000000..18729fd147
--- /dev/null
+++ b/test/parallel/test-stream-writableState-ending.js
@@ -0,0 +1,36 @@
+var bufferShim = require('buffer-shims');
+var assert = require('assert/');
+var stream = require('../../');
+var writable = new stream.Writable();
+function testStates(ending, finished, ended) {
+ assert.strictEqual(writable._writableState.ending, ending);
+ assert.strictEqual(writable._writableState.finished, finished);
+ assert.strictEqual(writable._writableState.ended, ended);
+writable._write = function (chunk, encoding, cb) {
+ // ending, finished, ended start in false.
+ testStates(false, false, false);
+ cb();
+writable.on('finish', function () {
+ // ending, finished, ended = true.
+ testStates(true, true, true);
+writable.end('testing function end()', function () {
+ // ending, finished, ended = true.
+ testStates(true, true, true);
+// ending, ended = true.
+// finished = false.
+testStates(true, false, true);
\ No newline at end of file
diff --git a/test/parallel/test-stream2-readable-empty-buffer-no-eof.js b/test/parallel/test-stream2-readable-empty-buffer-no-eof.js
index e674ea37b1..a940eef012 100644
--- a/test/parallel/test-stream2-readable-empty-buffer-no-eof.js
+++ b/test/parallel/test-stream2-readable-empty-buffer-no-eof.js
@@ -1,7 +1,7 @@
var bufferShim = require('buffer-shims');
-var common = require('../common');
var assert = require('assert/');
var Readable = require('../../').Readable;
@@ -18,36 +18,35 @@ function test1() {
// note that this is very unusual. it only works for crypto streams
// because the other side of the stream will call read(0) to cycle
- // data through openssl. that's why we set the timeouts to call
+ // data through openssl. that's why setImmediate() is used to call
// r.read(0) again later, otherwise there is no more work being done
// and the process just exits.
var buf = bufferShim.alloc(5, 'x');
var reads = 5;
- var timeout = common.platformTimeout(50);
r._read = function (n) {
switch (reads--) {
- case 0:
- return r.push(null); // EOF
- case 1:
- return r.push(buf);
- case 2:
- setTimeout(r.read.bind(r, 0), timeout);
- return r.push(bufferShim.alloc(0)); // Not-EOF!
- case 3:
- setTimeout(r.read.bind(r, 0), timeout);
- return process.nextTick(function () {
- return r.push(bufferShim.alloc(0));
+ case 5:
+ return setImmediate(function () {
+ return r.push(buf);
case 4:
- setTimeout(r.read.bind(r, 0), timeout);
- return setTimeout(function () {
+ setImmediate(function () {
return r.push(bufferShim.alloc(0));
- case 5:
- return setTimeout(function () {
- return r.push(buf);
+ return setImmediate(r.read.bind(r, 0));
+ case 3:
+ setTimeout(r.read.bind(r, 0), 50);
+ return process.nextTick(function () {
+ return r.push(bufferShim.alloc(0));
+ case 2:
+ setImmediate(r.read.bind(r, 0));
+ return r.push(bufferShim.alloc(0)); // Not-EOF!
+ case 1:
+ return r.push(buf);
+ case 0:
+ return r.push(null); // EOF
throw new Error('unreachable');