Skip to content

Commit

Permalink
stream: do not swallow errors with async iterators and pipeline
Browse files Browse the repository at this point in the history
Before this patch, pipeline() could swallow errors by pre-emptively
producing an ERR_STREAM_PREMATURE_CLOSE that was not really helpful
to the user.

Co-Authored-By: Robert Nagy <[email protected]>

PR-URL: #32051
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
  • Loading branch information
mcollina authored and MylesBorins committed Mar 11, 2020
1 parent 55a8ca8 commit 0a00552
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 15 deletions.
39 changes: 24 additions & 15 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ async function pump(iterable, writable, finish) {
if (!EE) {
EE = require('events');
}
let error;
try {
for await (const chunk of iterable) {
if (!writable.write(chunk)) {
Expand All @@ -118,7 +119,9 @@ async function pump(iterable, writable, finish) {
}
writable.end();
} catch (err) {
finish(err);
error = err;
} finally {
finish(error);
}
}

Expand All @@ -135,36 +138,37 @@ function pipeline(...streams) {
let value;
const destroys = [];

function finish(err, final) {
if (!error && err) {
let finishCount = 0;

function finish(err) {
const final = --finishCount === 0;

if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
error = err;
}

if (error || final) {
for (const destroy of destroys) {
destroy(error);
}
if (!error && !final) {
return;
}

while (destroys.length) {
destroys.shift()(error);
}

if (final) {
callback(error, value);
}
}

function wrap(stream, reading, writing, final) {
destroys.push(destroyer(stream, reading, writing, final, (err) => {
finish(err, final);
}));
}

let ret;
for (let i = 0; i < streams.length; i++) {
const stream = streams[i];
const reading = i < streams.length - 1;
const writing = i > 0;

if (isStream(stream)) {
wrap(stream, reading, writing, !reading);
finishCount++;
destroys.push(destroyer(stream, reading, writing, !reading, finish));
}

if (i === 0) {
Expand Down Expand Up @@ -210,20 +214,25 @@ function pipeline(...streams) {
pt.destroy(err);
});
} else if (isIterable(ret, true)) {
finishCount++;
pump(ret, pt, finish);
} else {
throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable or Promise', 'destination', ret);
}

ret = pt;
wrap(ret, false, true, true);

finishCount++;
destroys.push(destroyer(ret, false, true, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
ret.pipe(stream);
} else {
ret = makeAsyncIterable(ret);

finishCount++;
pump(ret, stream, finish);
}
ret = stream;
Expand Down
27 changes: 27 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -968,3 +968,30 @@ const { promisify } = require('util');
}));
src.end();
}

// Regression test: when an intermediate async-generator transform throws,
// pipeline() must report the user's original error ('kaboom') to the final
// callback — not a pre-emptively produced ERR_STREAM_PREMATURE_CLOSE — and
// no data may flow past the failing stage.
{
  let res = '';
  // Source that keeps pushing a chunk asynchronously on every read tick.
  const rs = new Readable({
    read() {
      setImmediate(() => {
        rs.push('hello');
      });
    }
  });
  // Sink must never be written to, since the first transform throws
  // before producing any output.
  const ws = new Writable({
    write: common.mustNotCall()
  });
  pipeline(rs, async function*(stream) {
    /* eslint no-unused-vars: off */
    // Throws on the first chunk, before yielding anything downstream.
    for await (const chunk of stream) {
      throw new Error('kaboom');
    }
  }, async function *(source) {
    // Would accumulate output from the previous stage; must stay empty
    // because the previous stage throws before yielding.
    for await (const chunk of source) {
      res += chunk;
    }
  }, ws, common.mustCall((err) => {
    // The user's error surfaces unchanged, and nothing reached stage two.
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(res, '');
  }));
}

0 comments on commit 0a00552

Please sign in to comment.