Skip to content

Commit

Permalink
stream: don't destroy final readable stream in pipeline
Browse files Browse the repository at this point in the history
If the last stream in a pipeline is still usable/readable
don't destroy it to allow further composition.

Fixes: #32105
Backport-PR-URL: #32111
PR-URL: #32110
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
  • Loading branch information
ronag authored and MylesBorins committed Mar 9, 2020
1 parent ddb8824 commit 4640ea2
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 3 deletions.
9 changes: 6 additions & 3 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@ function destroyStream(stream, err) {
if (typeof stream.close === 'function') return stream.close();
}

function destroyer(stream, reading, writing, callback) {
function destroyer(stream, reading, writing, final, callback) {
callback = once(callback);
let destroyed = false;

if (eos === undefined) eos = require('internal/streams/end-of-stream');
eos(stream, { readable: reading, writable: writing }, (err) => {
if (destroyed) return;
destroyed = true;
destroyStream(stream, err);
const readable = stream.readable || isRequest(stream);
if (err || !final || !readable) {
destroyStream(stream, err);
}
callback(err);
});

Expand Down Expand Up @@ -176,7 +179,7 @@ function pipeline(...streams) {
}

function wrap(stream, reading, writing, final) {
destroys.push(destroyer(stream, reading, writing, (err) => {
destroys.push(destroyer(stream, reading, writing, final, (err) => {
finish(err, final);
}));
}
Expand Down
46 changes: 46 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,52 @@ const { promisify } = require('util');
const dst = new PassThrough({ autoDestroy: false });
pipeline(src, dst, common.mustCall(() => {
assert.strictEqual(src.destroyed, true);
assert.strictEqual(dst.destroyed, false);
}));
src.end();
}

{
const server = http.createServer((req, res) => {
});

server.listen(0, () => {
const req = http.request({
port: server.address().port
});

const body = new PassThrough();
pipeline(
body,
req,
common.mustCall((err) => {
assert(!err);
assert(!req.res);
assert(!req.aborted);
req.abort();
server.close();
})
);
body.end();
});
}

{
const src = new PassThrough();
const dst = new PassThrough();
pipeline(src, dst, common.mustCall((err) => {
assert(!err);
assert.strictEqual(dst.destroyed, false);
}));
src.end();
}

{
const src = new PassThrough();
const dst = new PassThrough();
dst.readable = false;
pipeline(src, dst, common.mustCall((err) => {
assert(!err);
assert.strictEqual(dst.destroyed, true);
}));
src.end();
Expand Down

0 comments on commit 4640ea2

Please sign in to comment.