From 14586b08303bbe7717cdf0e7b7b333fd45a8fe3b Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sat, 20 Oct 2018 15:04:57 +0200 Subject: [PATCH] stream: async iteration should work with destroyed stream Fixes https://github.com/nodejs/node/issues/23730. --- lib/internal/streams/async_iterator.js | 69 ++++++++++++------- .../test-stream-readable-async-iterators.js | 40 ++++++++++- 2 files changed, 82 insertions(+), 27 deletions(-) diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 91c473ee9d29c5..25b393d21f1fcb 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -1,5 +1,7 @@ 'use strict'; +const finished = require('internal/streams/end-of-stream'); + const kLastResolve = Symbol('lastResolve'); const kLastReject = Symbol('lastReject'); const kError = Symbol('error'); @@ -34,30 +36,6 @@ function onReadable(iter) { process.nextTick(readAndResolve, iter); } -function onEnd(iter) { - const resolve = iter[kLastResolve]; - if (resolve !== null) { - iter[kLastPromise] = null; - iter[kLastResolve] = null; - iter[kLastReject] = null; - resolve(createIterResult(null, true)); - } - iter[kEnded] = true; -} - -function onError(iter, err) { - const reject = iter[kLastReject]; - // reject if we are waiting for data in the Promise - // returned by next() and store the error - if (reject !== null) { - iter[kLastPromise] = null; - iter[kLastResolve] = null; - iter[kLastReject] = null; - reject(err); - } - iter[kError] = err; -} - function wrapForNext(lastPromise, iter) { return function(resolve, reject) { lastPromise.then(function() { @@ -86,6 +64,22 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ return Promise.resolve(createIterResult(null, true)); } + if (this[kStream].destroyed) { + // We need to defer via nextTick because if .destroy(err) is + // called, the error will be emitted via nextTick, and + // we cannot guarantee that there is no error lingering around + // waiting to be emitted. + return new Promise((resolve, reject) => { + process.nextTick(() => { + if (this[kError]) { + reject(this[kError]); + } else { + resolve(createIterResult(null, true)); + } + }); + }); + } + // if we have multiple next() calls // we will wait for the previous Promise to finish // this logic is optimized to support for await loops, @@ -155,9 +149,32 @@ const createReadableStreamAsyncIterator = (stream) => { }, }); + finished(stream, (err) => { + if (err) { + const reject = iterator[kLastReject]; + // reject if we are waiting for data in the Promise + // returned by next() and store the error + if (reject !== null) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + reject(err); + } + iterator[kError] = err; + return; + } + + const resolve = iterator[kLastResolve]; + if (resolve !== null) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + resolve(createIterResult(null, true)); + } + iterator[kEnded] = true; + }); + stream.on('readable', onReadable.bind(null, iterator)); - stream.on('end', onEnd.bind(null, iterator)); - stream.on('error', onError.bind(null, iterator)); return iterator; }; diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index fb3c55846c4450..ec558955c6ed18 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -1,7 +1,7 @@ 'use strict'; const common = require('../common'); -const { Readable } = require('stream'); +const { Readable, PassThrough, pipeline } = require('stream'); const assert = require('assert'); async function tests() { @@ -324,6 +324,44 @@ async function tests() { assert.strictEqual(data, expected); })(); + + await (async function() { + console.log('.next() on destroyed stream'); + const readable = new Readable({ + read() { + // no-op + } + }); + + readable.destroy(); + + try { + await readable[Symbol.asyncIterator]().next(); + } catch (e) { + assert.strictEqual(e.code, 'ERR_STREAM_PREMATURE_CLOSE'); + } + })(); + + await (async function() { + console.log('.next() on pipelined stream'); + const readable = new Readable({ + read() { + // no-op + } + }); + + const passthrough = new PassThrough(); + const err = new Error('kaboom'); + pipeline(readable, passthrough, common.mustCall((e) => { + assert.strictEqual(e, err); + })); + readable.destroy(err); + try { + await readable[Symbol.asyncIterator]().next(); + } catch (e) { + assert.strictEqual(e, err); + } + })(); } // to avoid missing some tests if a promise does not resolve