Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: flow for 'data' listeners upon removal of last 'readable' listener #21696

Closed
wants to merge 3 commits into from

Conversation

lundibundi
Copy link
Member

@lundibundi lundibundi commented Jul 7, 2018

When there is at least one 'data' listener try to flow when
last 'readable' listener gets removed and the stream is not piped.

Currently if we have both 'readable' and 'data' listeners set only
'readable' listener will get called and stream will be stalled without
'data' and 'end' events if 'readable' listener neither read()'s nor
resume()'s the stream.

Fixes: #21398

Checklist
  • make -j4 test (UNIX) passes
  • tests and/or benchmarks are included
  • documentation is changed or added
  • commit message follows commit guidelines

@nodejs-github-bot nodejs-github-bot added the stream Issues and PRs related to the stream subsystem. label Jul 7, 2018
@mscdex
Copy link
Contributor

mscdex commented Jul 7, 2018

/cc @nodejs/streams

@lundibundi
Copy link
Member Author

ping @mcollina @mafintosh.

@lundibundi
Copy link
Member Author

@mcollina CI has a seemingly not related failure, can we rerun or am I misunderstanding something? (I also tried --repeat 192 on my linux for those 3 tests and no failures).

CITGM has a lot of build failures, I'm not sure how to interpret the results.

Also, I've rebased on master just in case.

@mcollina
Copy link
Member

mcollina commented Aug 6, 2018

mcollina
mcollina previously approved these changes Aug 6, 2018
Copy link
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mcollina mcollina added the author ready PRs that have at least one approval, no pending requests for changes, and a CI started. label Aug 6, 2018
@mcollina mcollina changed the title stream: flow for 'data' listeners upon removeal of last 'readable' listener stream: flow for 'data' listeners upon removal of last 'readable' listener Aug 6, 2018
@mcollina
Copy link
Member

mcollina commented Aug 6, 2018

jasnell
jasnell previously requested changes Aug 6, 2018
only when [`stream.read()`][stream-read] is called.
only when [`stream.read()`][stream-read] is called. But when last `'readable'`
was removed stream will try to [`stream.resume()`][stream-resume] to fulfill
`'data'` listeners. Therefore you can use it like this (bar.txt has 'abc\n'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please avoid using you in the docs. This last sentence can just be dropped.

@lundibundi
Copy link
Member Author

@jasnell done. Though we will probably have to wait until CITGM machines are working properly again.

@mcollina
Copy link
Member

mcollina commented Aug 6, 2018

I think landing this without a CITGM run is ok.

CI: https://ci.nodejs.org/job/node-test-pull-request/16217/.

@mafintosh
Copy link
Member

So this basically restores the "magic" behaivor of the data event? Guess that makes sense, LGTM

if (!state.readableListening &&
self.listenerCount('data') > 0 &&
state.pipesCount === 0) {
self.resume();
Copy link
Member

@mafintosh mafintosh Aug 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the stream was explicitly paused before? Something like:

stream.pause()
stream.on('data', ...)
stream.once('readable', ...)

Seems bad that this would auto resume that, if that is the case

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lundibundi can you please add a unit test in that case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I've added a test and now spotted a problem. After @mcollina PR to make readable take precedence over data there is no way now to determine whether it was user who called pause or we via (this.resume() in on('data') handler) because we use cf5f986#diff-ba6a0df0f5212f5cba5ca5179e209a17R878 and now state.flowing is false if we add 'data' listener after 'readable' listener even though user didn't explicitly pause the stream.

I'm not sure what's the correct solution here. If I add this.isPaused check in here it will only work if a user called it like this

r.on('data', (chunk) => receivedData += chunk);
r.once('readable', common.mustCall());

and won't work for

r.once('readable', common.mustCall());
r.on('data', (chunk) => receivedData += chunk);

which is extremely bizarre.

I'm thinking of not setting flowing at all until we have 0 'readable' listeners at which point check for data listeners and then set it to true/false (resume/pause).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking of not setting flowing at all until we have 0 'readable' listeners at which point check for data listeners and then set it to true/false (resume/pause).

I think that would work.

- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/21696
description: >
When using both `'readable'` and [`'data'`][] stream will try to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: the stream

@@ -825,7 +831,33 @@ result in increased throughput.

If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
takes precedence in controlling the flow, i.e. `'data'` will be emitted
only when [`stream.read()`][stream-read] is called.
only when [`stream.read()`][stream-read] is called. But when last `'readable'`
was removed stream will try to [`stream.resume()`][stream-resume] to fulfill
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: the stream

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks.

console.log('readable');
});
rr.on('data', (chunk) => {
console.log(chunk.toString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might want to add rr.setEncoding('utf8'), or pass the encoding: 'utf8' option to createReadStream rather than using .toString(), to encourage best practices

(The current code could fail if bar.txt contains UTF-8 characters outside the ASCII range)

// (if pipesCount is not 0 then we have already 'flowed')
if (!state.readableListening &&
self.listenerCount('data') > 0 &&
state.pipesCount === 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test covering what happens if the last readable listener is removed but there is a pipe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

@addaleax
Copy link
Member

addaleax commented Aug 6, 2018

I’d prefer a CITGM run, btw.

@mcollina
Copy link
Member

mcollina commented Aug 7, 2018

@jasnell jasnell dismissed their stale review August 7, 2018 13:48

Issue fixed :-)

@mcollina mcollina removed the author ready PRs that have at least one approval, no pending requests for changes, and a CI started. label Aug 8, 2018
@mcollina
Copy link
Member

mcollina commented Aug 8, 2018

@lundibundi
Copy link
Member Author

PTAL #21696 (comment)

Also, currently if we have 'readable' listener on a Readable stream, if we pipe it to other stream nothing will happen, try this (uncomment/comment 'readable' listener):

const data = ['foo', 'bar', 'baz'];
const r = new Readable({
  read: () => {},
});

// r.on('readable', () => {});

r.pipe(process.stdout);
r.push(data[0]);
r.push(data[1]);
r.push(data[2]);
r.push(null);

@mcollina
Copy link
Member

mcollina commented Aug 9, 2018

#21696 (comment) is as documented.

@mcollina mcollina dismissed their stale review August 9, 2018 08:28

It needs a little bit more work.

@lundibundi
Copy link
Member Author

lundibundi commented Aug 9, 2018

@mcollina Yeah we have that in the documentation of pause, 'readable' and 'data'. But in .pipe documentation it still says that

The readable.pipe() method attaches a Writable stream to the readable, causing it to switch automatically into flowing mode and push all of its data to the attached Writable

Therefore we should either change the doc or the code. And imo the latter will be better as we will restore the ability to tell if user manually called .pause and will not confuse it with us calling this.resume when there are >0 'readable' listeners. Furthermore, it should simplify Streams for users at least a bit.

I have a fix (the one I mentioned here #21696 (comment)) but for some reason test-tls-server-verify fails with 'AssertionError [ERR_ASSERTION]: 5 2 agent3 rejected, but should NOT have been' (and also test-stream-readable-reading-readingMore but it tests that flowing is false explicitly) and I'm trying to understand why.

@mcollina
Copy link
Member

mcollina commented Aug 9, 2018

Yeah we have that in the documentation of pause, 'readable' and 'data'. But in .pipe documentation it still says that

The docs would need changing, as .pipe() relies on on('data').

test-stream-readable-reading-readingMore might need to be changed as a result of this as it's really picky about the internals.

I'm not sure about test-tls-server-verify.

@lundibundi
Copy link
Member Author

lundibundi commented Aug 9, 2018

@mcollina But then the problem with isPaused I mentioned in #21696 (comment) arises. So either we will always flow after last 'readable' was removed and ignore paused state or we will have that bizarre behavior I mentioned.

Also, .pipe() manually calls .resume so it doesn't use .on('data') property of flowing the stream.

Maybe it will be better for me to push (my fix that also restores pipe) so you could take a look?
Edit: I've pushed the changes PTAL. Also for some reason after rebase on master test-tls-server-verify no longer fails. The other one still fails, I'll fix it soon.

When there is at least one 'data' listener try to flow when
last 'readable' listener gets removed and the stream is not piped.

Currently if we have both 'readable' and 'data' listeners set only
'readable' listener will get called and stream will be stalled without
'data' and 'end' events if 'readable' listener neither read()'s nor
resume()'s the stream.

Fixes: nodejs#21398
Now state.flowing will be set only after all of the 'readable'
listeners are gone and if we have at least one 'data' listener.

* on('data') will not flow (flowing === null and not false) if there
  are 'readable' listeners
* pipe() will work regardless of 'readable' listeners
* isPause reports only user .pause call (before setting 'data' listener
  when there is already 'readable' listener also set flowing to false)
* resume always sets stream to flowing state
@mcollina
Copy link
Member

mcollina commented Aug 9, 2018

I've added an alternative implementation of the fix: see #22209.

mcollina added a commit to mcollina/node that referenced this pull request Aug 21, 2018
Trott pushed a commit to Trott/io.js that referenced this pull request Aug 22, 2018
@mcollina
Copy link
Member

Fixed in 98cf84f.

@mcollina mcollina closed this Aug 22, 2018
targos pushed a commit that referenced this pull request Aug 24, 2018
targos pushed a commit that referenced this pull request Sep 3, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
stream Issues and PRs related to the stream subsystem.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants