Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: need to cleanup event listeners if last stream is readable #41954

Merged
merged 2 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 38 additions & 11 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@ function destroyer(stream, reading, writing) {
finished = true;
});

eos(stream, { readable: reading, writable: writing }, (err) => {
const cleanup = eos(stream, { readable: reading, writable: writing }, (err) => {
finished = !err;
});

return (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
return {
callback: (err) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
callback: (err) => {
destroy: (err) => {

if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
},
cleanup
};
}

Expand Down Expand Up @@ -159,6 +162,10 @@ function pipelineImpl(streams, callback, opts) {
const signal = ac.signal;
const outerSignal = opts?.signal;

// Need to cleanup event listeners if last stream is readable
// https://github.com/nodejs/node/issues/35452
const lastStreamCleanup = streams[streams.length - 1].readable && [];
ronag marked this conversation as resolved.
Show resolved Hide resolved

validateAbortSignal(outerSignal, 'options.signal');

function abort() {
Expand Down Expand Up @@ -194,6 +201,11 @@ function pipelineImpl(streams, callback, opts) {
ac.abort();

if (final) {
if (!error) {
while (lastStreamCleanup && lastStreamCleanup.length) {
lastStreamCleanup.shift()();
}
Copy link
Member

@ronag ronag Feb 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while (lastStreamCleanup && lastStreamCleanup.length) {
lastStreamCleanup.shift()();
}
lastStreamCleanup.forEach(fn => fn());

}
process.nextTick(callback, error, value);
}
}
Expand All @@ -204,22 +216,34 @@ function pipelineImpl(streams, callback, opts) {
const reading = i < streams.length - 1;
const writing = i > 0;
const end = reading || opts?.end !== false;
const isLastStream = i === streams.length - 1;

if (isNodeStream(stream)) {
if (end) {
destroys.push(destroyer(stream, reading, writing));
const { callback, cleanup } = destroyer(stream, reading, writing);
destroys.push(callback);

if (lastStreamCleanup && isLastStream) {
lastStreamCleanup.push(cleanup);
}
Copy link
Member

@ronag ronag Feb 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const { callback, cleanup } = destroyer(stream, reading, writing);
destroys.push(callback);
if (lastStreamCleanup && isLastStream) {
lastStreamCleanup.push(cleanup);
}
const { destroy, cleanup } = destroyer(stream, reading, writing);
destroys.push(destroy);
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}

}

// Catch stream errors that occur after pipe/pump has completed.
stream.on('error', (err) => {
function onError(err) {
if (
err &&
err.name !== 'AbortError' &&
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
) {
finish(err);
}
});
}
stream.on('error', onError);
if (lastStreamCleanup && isLastStream) {
lastStreamCleanup.push(() => {
stream.removeListener('error', onError);
});
}
Copy link
Member

@ronag ronag Feb 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (lastStreamCleanup && isLastStream) {
lastStreamCleanup.push(() => {
stream.removeListener('error', onError);
});
}
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(() => {
stream.removeListener('error', onError);
});
}

}

if (i === 0) {
Expand Down Expand Up @@ -285,12 +309,15 @@ function pipelineImpl(streams, callback, opts) {

ret = pt;

destroys.push(destroyer(ret, false, true));
destroys.push(destroyer(ret, false, true).callback);
Copy link
Member

@ronag ronag Feb 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
destroys.push(destroyer(ret, false, true).callback);
const { destroy, cleanup } = (destroyer(ret, false, true)
destroys.push(destroy);
if (isLastStream) {
lastStreamCleanup.push(cleanup);
}

}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount += 2;
pipe(ret, stream, finish, { end });
const cleanup = pipe(ret, stream, finish, { end });
if (lastStreamCleanup && isLastStream) {
lastStreamCleanup.push(cleanup);
}
Copy link
Member

@ronag ronag Feb 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (lastStreamCleanup && isLastStream) {
lastStreamCleanup.push(cleanup);
}
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}

} else if (isIterable(ret)) {
finishCount++;
pump(ret, stream, finish, { end });
Expand Down Expand Up @@ -345,7 +372,7 @@ function pipe(src, dst, finish, { end }) {
finish(err);
}
});
eos(dst, { readable: false, writable: true }, finish);
return eos(dst, { readable: false, writable: true }, finish);
}

module.exports = { pipelineImpl, pipeline };
54 changes: 54 additions & 0 deletions test/parallel/test-stream-pipeline-listeners.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

const common = require('../common');
const { pipeline, Duplex, PassThrough, Writable } = require('stream');
const assert = require('assert');

const a = new PassThrough();
a.end('foobar');

const b = new Duplex({
write(chunk, encoding, callback) {
callback();
}
});

process.on('uncaughtException', common.mustCall((err) => {
assert.strictEqual(err.message, 'no way');
}, 1));

pipeline(a, b, common.mustCall((error) => {
if (error) {
assert.ifError(error);
}

// Ensure that listeners is removed if last stream is readble
// And other stream's listeners unchanged
assert(a.listenerCount('error') > 0);
assert.strictEqual(b.listenerCount('error'), 0);
setTimeout(() => {
assert.strictEqual(b.listenerCount('error'), 0);
b.destroy(new Error('no way'));
}, 100);
}));

// If last stream is not readable, will not throw and remove listeners
const c = new PassThrough();
c.end('foobar');
const d = new Writable({
write(chunk, encoding, callback) {
callback();
}
});
pipeline(c, d, common.mustCall((error) => {
if (error) {
assert.ifError(error);
}

assert(c.listenerCount('error') > 0);
assert(d.listenerCount('error') > 0);
setTimeout(() => {
assert(d.listenerCount('error') > 0);
d.destroy(new Error('no way'));
}, 100);
}));