Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: restore flow if there are 'data' handlers after once('readable') #22209

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,12 @@ instance, when the `readable.resume()` method is called without a listener
attached to the `'data'` event, or when a `'data'` event handler is removed
from the stream.

Adding a [`'readable'`][] event handler automatically make the stream to
stop flowing, and the data to be consumed via
[`readable.read()`][stream-read]. If the [`'readable'`] event handler is
removed, then the stream will start flowing again if there is a
[`'data'`][] event handler.

#### Three States

The "two modes" of operation for a `Readable` stream are a simplified
Expand Down Expand Up @@ -666,12 +672,15 @@ within the streams internal buffer.
The `Readable` stream API evolved across multiple Node.js versions and provides
multiple methods of consuming stream data. In general, developers should choose
*one* of the methods of consuming data and *should never* use multiple methods
to consume data from a single stream.
to consume data from a single stream. Specifically, using a combination
of `on('data')`, `on('readable')`, `pipe()` or async iterators could
lead to unintuitive behavior.

Use of the `readable.pipe()` method is recommended for most users as it has been
implemented to provide the easiest way of consuming stream data. Developers that
require more fine-grained control over the transfer and generation of data can
use the [`EventEmitter`][] and `readable.pause()`/`readable.resume()` APIs.
use the [`EventEmitter`][] and `readable.on('readable')`/`readable.read()`
or the `readable.pause()`/`readable.resume()` APIs.

#### Class: stream.Readable
<!-- YAML
Expand Down Expand Up @@ -825,7 +834,11 @@ result in increased throughput.

If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
takes precedence in controlling the flow, i.e. `'data'` will be emitted
only when [`stream.read()`][stream-read] is called.
only when [`stream.read()`][stream-read] is called. The
`readableFlowing` property would become `false`.
If there are `'data'` listeners when `'readable'` is removed, the stream
will start flowing, i.e. `'data'` events will be emitted without calling
`.resume()`.

##### readable.destroy([error])
<!-- YAML
Expand Down Expand Up @@ -887,6 +900,9 @@ readable.on('data', (chunk) => {
});
```

The `readable.pause()` method has no effect if there is a `'readable'`
event listener.

##### readable.pipe(destination[, options])
<!-- YAML
added: v0.9.4
Expand Down
9 changes: 8 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ Readable.prototype.on = function(ev, fn) {
} else if (ev === 'readable') {
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.flowing = false;
state.emittedReadable = false;
debug('on readable', state.length, state.reading);
if (state.length) {
Expand Down Expand Up @@ -858,6 +859,11 @@ Readable.prototype.removeAllListeners = function(ev) {

function updateReadableListening(self) {
self._readableState.readableListening = self.listenerCount('readable') > 0;

// crude way to check if we should resume
if (self.listenerCount('data') > 0) {
self.resume();
}
}

function nReadingNextTick(self) {
Expand All @@ -872,7 +878,8 @@ Readable.prototype.resume = function() {
if (!state.flowing) {
debug('resume');
// we flow only if there is no one listening
// for readable
// for readable, but we still have to call
// resume()
state.flowing = !state.readableListening;
resume(this, state);
}
Expand Down
61 changes: 61 additions & 0 deletions test/parallel/test-stream-once-readable-pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable, Writable } = require('stream');

// This test ensures that if have 'readable' listener
// on Readable instance it will not disrupt the pipe.

{
let receivedData = '';
const w = new Writable({
write: (chunk, env, callback) => {
receivedData += chunk;
callback();
},
});

const data = ['foo', 'bar', 'baz'];
const r = new Readable({
read: () => {},
});

r.once('readable', common.mustCall());

r.pipe(w);
r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
r.push(null);

w.on('finish', common.mustCall(() => {
assert.strictEqual(receivedData, data.join(''));
}));
}

{
let receivedData = '';
const w = new Writable({
write: (chunk, env, callback) => {
receivedData += chunk;
callback();
},
});

const data = ['foo', 'bar', 'baz'];
const r = new Readable({
read: () => {},
});

r.pipe(w);
r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
r.push(null);
r.once('readable', common.mustCall());

w.on('finish', common.mustCall(() => {
assert.strictEqual(receivedData, data.join(''));
}));
}
7 changes: 4 additions & 3 deletions test/parallel/test-stream-readable-reading-readingMore.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ const Readable = require('stream').Readable;
assert.strictEqual(state.reading, false);
}

const expectedReadingMore = [true, false];
readable.on('readable', common.mustCall(() => {
// 'readable' always gets called before 'end'
// since 'end' hasn't been emitted, more data could be incoming
assert.strictEqual(state.readingMore, true);
// there is only one readingMore scheduled from on('data'),
// after which everything is governed by the .read() call
assert.strictEqual(state.readingMore, expectedReadingMore.shift());

// if the stream has ended, we shouldn't be reading
assert.strictEqual(state.ended, !state.reading);
Expand Down