Skip to content

Commit

Permalink
stream: cleanup async handling
Browse files Browse the repository at this point in the history
Cleanup async stream method handling.
  • Loading branch information
ronag committed Jul 9, 2021
1 parent a5ba28d commit 495aa7a
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 106 deletions.
118 changes: 30 additions & 88 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,16 @@ function destroy(err, cb) {

function _destroy(self, err, cb) {
let called = false;
const result = self._destroy(err || null, (err) => {
const r = self._readableState;
const w = self._writableState;

function onDestroy(err) {
if (called) {
return;
}
called = true;

const r = self._readableState;
const w = self._writableState;

checkError(err, w, r);

if (w) {
Expand All @@ -94,59 +98,19 @@ function _destroy(self, err, cb) {
} else {
process.nextTick(emitCloseNT, self);
}
});
if (result !== undefined && result !== null) {
}
const result = self._destroy(err || null, onDestroy);
if (result != null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
if (called)
return;

const r = self._readableState;
const w = self._writableState;

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (typeof cb === 'function') {
process.nextTick(cb);
}

process.nextTick(emitCloseNT, self);
process.nextTick(onDestroy, null);
},
function(err) {
const r = self._readableState;
const w = self._writableState;
err.stack; // eslint-disable-line no-unused-expressions

called = true;

if (w && !w.errored) {
w.errored = err;
}
if (r && !r.errored) {
r.errored = err;
}

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (typeof cb === 'function') {
process.nextTick(cb, err);
}

process.nextTick(emitErrorCloseNT, self, err);
process.nextTick(onDestroy, err);
});
}
} catch (err) {
Expand Down Expand Up @@ -285,69 +249,47 @@ function construct(stream, cb) {
}

function constructNT(stream) {
const r = stream._readableState;
const w = stream._writableState;
// With duplex streams we use the writable side for state.
const s = w || r;

let called = false;
const result = stream._construct((err) => {

function onConstruct(err) {
if (called) {
errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK());
return;
}
called = true;

const r = stream._readableState;
const w = stream._writableState;
const s = w || r;

if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}

if (called) {
err = new ERR_MULTIPLE_CALLBACK();
} else {
called = true;
}

if (s.destroyed) {
stream.emit(kDestroy, err);
} else if (err) {
errorOrDestroy(stream, err, true);
} else {
process.nextTick(emitConstructNT, stream);
}
});
if (result !== undefined && result !== null) {
}

const result = stream._construct(onConstruct);
if (result != null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
// If the callback was invoked, do nothing further.
if (called)
return;
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
if (s.destroyed) {
process.nextTick(() => stream.emit(kDestroy));
} else {
process.nextTick(emitConstructNT, stream);
}
process.nextTick(onConstruct, null);
},
function(err) {
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
called = true;
if (s.destroyed) {
process.nextTick(() => stream.emit(kDestroy, err));
} else {
process.nextTick(errorOrDestroy, stream, err);
}
process.nextTick(onConstruct, err);
});
}
} catch (err) {
Expand Down
19 changes: 18 additions & 1 deletion lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,25 @@ Readable.prototype.read = function(n) {
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;

// Call internal read method
this._read(state.highWaterMark);
const result = this._read(state.highWaterMark);
if (result != null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
nop,
function(err) {
errorOrDestroy(this, err);
});
}
} catch (err) {
errorOrDestroy(this, err);
}
}

state.sync = false;
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
Expand Down
36 changes: 19 additions & 17 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -658,9 +658,15 @@ function needFinish(state) {
}

function callFinal(stream, state) {
state.sync = true;
state.pendingcb++;
const result = stream._final((err) => {
let called = false;

function onFinish(err) {
if (called) {
errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK());
return;
}
called = true;

state.pendingcb--;
if (err) {
const onfinishCallbacks = state[kOnFinished].splice(0);
Expand All @@ -677,31 +683,27 @@ function callFinal(stream, state) {
state.pendingcb++;
process.nextTick(finish, stream, state);
}
});
if (result !== undefined && result !== null) {
}

state.sync = true;
state.pendingcb++;
const result = stream._final(onFinish);

if (result != null) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
if (state.prefinished)
return;
state.prefinish = true;
process.nextTick(() => stream.emit('prefinish'));
state.pendingcb++;
process.nextTick(finish, stream, state);
process.nextTick(onFinish, null);
},
function(err) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
process.nextTick(onfinishCallbacks[i], err);
}
process.nextTick(errorOrDestroy, stream, err, state.sync);
process.nextTick(onFinish, err);
});
}
} catch (err) {
process.nextTick(errorOrDestroy, stream, err, state.sync);
onFinish(stream, state, err);
}
}
state.sync = false;
Expand Down

0 comments on commit 495aa7a

Please sign in to comment.