Skip to content

Commit

Permalink
stream: use finished for async iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jul 5, 2021
1 parent 70cf0dc commit d893958
Showing 1 changed file with 17 additions and 44 deletions.
61 changes: 17 additions & 44 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const { Buffer } = require('buffer');
const {
addAbortSignalNoValidate,
} = require('internal/streams/add-abort-signal');
const eos = require('internal/streams/end-of-stream');

let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
debug = fn;
Expand All @@ -59,7 +60,6 @@ const {
const {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_PREMATURE_CLOSE,
ERR_STREAM_PUSH_AFTER_EOF,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
} = require('internal/errors').codes;
Expand Down Expand Up @@ -1090,12 +1090,6 @@ function streamToAsyncIterator(stream, options) {
async function* createAsyncIterator(stream, options) {
let callback = nop;

const opts = {
destroyOnReturn: true,
destroyOnError: true,
...options,
};

function next(resolve) {
if (this === stream) {
callback();
Expand All @@ -1105,54 +1099,33 @@ async function* createAsyncIterator(stream, options) {
}
}

const state = stream._readableState;
stream.on('readable', next);

let error;
eos(stream, { writable: false }, (err) => {
error = err || null;
callback();
callback = nop;
});

let error = state.errored;
let errorEmitted = state.errorEmitted;
let endEmitted = state.endEmitted;
let closeEmitted = state.closeEmitted;

stream
.on('readable', next)
.on('error', function(err) {
error = err;
errorEmitted = true;
next.call(this);
})
.on('end', function() {
endEmitted = true;
next.call(this);
})
.on('close', function() {
closeEmitted = true;
next.call(this);
});

let errorThrown = false;
try {
while (true) {
const chunk = stream.destroyed ? null : stream.read();
const chunk = stream.read();
if (chunk !== null) {
yield chunk;
} else if (errorEmitted) {
throw error;
} else if (endEmitted) {
} else if (error !== undefined) {
break;
} else if (closeEmitted) {
throw new ERR_STREAM_PREMATURE_CLOSE();
} else {
await new Promise(next);
}
}
} catch (err) {
if (opts.destroyOnError) {
destroyImpl.destroyer(stream, err);
}
errorThrown = true;
throw err;
} finally {
if (!errorThrown && opts.destroyOnReturn) {
if (state.autoDestroy || !endEmitted) {
if (error) {
if (options?.destroyOnError !== false) {
destroyImpl.destroyer(stream, error);
}
} else if (options?.destroyOnReturn !== false) {
if (stream._readableState.autoDestroy) {
destroyImpl.destroyer(stream, null);
}
}
Expand Down

0 comments on commit d893958

Please sign in to comment.