Skip to content

Commit

Permalink
stream: pipeline wait for close before calling the callback
Browse files Browse the repository at this point in the history
The pipeline should wait for close event to finish before calling
the callback.

The `finishCount` should not below 0 when calling finish function.

Fixes: #51540

Co-authored-by: wh0 <[email protected]>
PR-URL: #53462
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
  • Loading branch information
2 people authored and aduh95 committed Jul 16, 2024
1 parent 508abfe commit 82d88a8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
12 changes: 8 additions & 4 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ function pipelineImpl(streams, callback, opts) {
finishImpl(err, --finishCount === 0);
}

function finishOnlyHandleError(err) {
finishImpl(err, false);
}

function finishImpl(err, final) {
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
error = err;
Expand Down Expand Up @@ -273,7 +277,7 @@ function pipelineImpl(streams, callback, opts) {
err.name !== 'AbortError' &&
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
) {
finish(err);
finishOnlyHandleError(err);
}
}
stream.on('error', onError);
Expand Down Expand Up @@ -366,7 +370,7 @@ function pipelineImpl(streams, callback, opts) {
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount += 2;
const cleanup = pipe(ret, stream, finish, { end });
const cleanup = pipe(ret, stream, finish, finishOnlyHandleError, { end });
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
Expand Down Expand Up @@ -409,12 +413,12 @@ function pipelineImpl(streams, callback, opts) {
return ret;
}

function pipe(src, dst, finish, { end }) {
function pipe(src, dst, finish, finishOnlyHandleError, { end }) {
let ended = false;
dst.on('close', () => {
if (!ended) {
// Finish if the destination closes before the source has completed.
finish(new ERR_STREAM_PREMATURE_CLOSE());
finishOnlyHandleError(new ERR_STREAM_PREMATURE_CLOSE());
}
});

Expand Down
42 changes: 42 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1664,5 +1664,47 @@ const tsp = require('timers/promises');
pipeline(r, w, common.mustCall((err) => {
assert.strictEqual(err, undefined);
}));
}

{
// See https://github.com/nodejs/node/issues/51540 for the following 2 tests
const src = new Readable();
const dst = new Writable({
destroy(error, cb) {
// Takes a while to destroy
setImmediate(cb);
},
});

pipeline(src, dst, (err) => {
assert.strictEqual(src.closed, true);
assert.strictEqual(dst.closed, true);
assert.strictEqual(err.message, 'problem');
});
src.destroy(new Error('problem'));
}

{
const src = new Readable();
const dst = new Writable({
destroy(error, cb) {
// Takes a while to destroy
setImmediate(cb);
},
});
const passThroughs = [];
for (let i = 0; i < 10; i++) {
passThroughs.push(new PassThrough());
}

pipeline(src, ...passThroughs, dst, (err) => {
assert.strictEqual(src.closed, true);
assert.strictEqual(dst.closed, true);
assert.strictEqual(err.message, 'problem');

for (let i = 0; i < passThroughs.length; i++) {
assert.strictEqual(passThroughs[i].closed, true);
}
});
src.destroy(new Error('problem'));
}

0 comments on commit 82d88a8

Please sign in to comment.