From d89395852873cd6d294474ef90c4b219e09a2ade Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 6 Jul 2021 00:00:56 +0200 Subject: [PATCH] stream: use finished for async iteration --- lib/internal/streams/readable.js | 61 +++++++++----------------------- 1 file changed, 17 insertions(+), 44 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 6eb07cdd4a9b14..5e04b4bc439efc 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -45,6 +45,7 @@ const { Buffer } = require('buffer'); const { addAbortSignalNoValidate, } = require('internal/streams/add-abort-signal'); +const eos = require('internal/streams/end-of-stream'); let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { debug = fn; @@ -59,7 +60,6 @@ const { const { ERR_INVALID_ARG_TYPE, ERR_METHOD_NOT_IMPLEMENTED, - ERR_STREAM_PREMATURE_CLOSE, ERR_STREAM_PUSH_AFTER_EOF, ERR_STREAM_UNSHIFT_AFTER_END_EVENT, } = require('internal/errors').codes; @@ -1090,12 +1090,6 @@ function streamToAsyncIterator(stream, options) { async function* createAsyncIterator(stream, options) { let callback = nop; - const opts = { - destroyOnReturn: true, - destroyOnError: true, - ...options, - }; - function next(resolve) { if (this === stream) { callback(); @@ -1105,54 +1099,33 @@ async function* createAsyncIterator(stream, options) { } } - const state = stream._readableState; + stream.on('readable', next); + + let error; + eos(stream, { writable: false }, (err) => { + error = err || null; + callback(); + callback = nop; + }); - let error = state.errored; - let errorEmitted = state.errorEmitted; - let endEmitted = state.endEmitted; - let closeEmitted = state.closeEmitted; - - stream - .on('readable', next) - .on('error', function(err) { - error = err; - errorEmitted = true; - next.call(this); - }) - .on('end', function() { - endEmitted = true; - next.call(this); - }) - .on('close', function() { - closeEmitted = true; - next.call(this); - }); - - let errorThrown = false; try { while (true) { - const chunk = stream.destroyed ? null : stream.read(); + const chunk = stream.read(); if (chunk !== null) { yield chunk; - } else if (errorEmitted) { - throw error; - } else if (endEmitted) { + } else if (error !== undefined) { break; - } else if (closeEmitted) { - throw new ERR_STREAM_PREMATURE_CLOSE(); } else { await new Promise(next); } } - } catch (err) { - if (opts.destroyOnError) { - destroyImpl.destroyer(stream, err); - } - errorThrown = true; - throw err; } finally { - if (!errorThrown && opts.destroyOnReturn) { - if (state.autoDestroy || !endEmitted) { + if (error) { + if (options?.destroyOnError !== false) { + destroyImpl.destroyer(stream, error); + } + } else if (options?.destroyOnReturn !== false) { + if (stream._readableState.autoDestroy) { destroyImpl.destroyer(stream, null); } }