Skip to content

Commit

Permalink
stream: error once
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 16, 2019
1 parent 3bd02e9 commit dcec025
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 90 deletions.
3 changes: 3 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ The stream is not closed when the `'error'` event is emitted unless the
[`autoDestroy`][writable-new] option was set to `true` when creating the
stream.

After `'error'`, no further events other than `'close'` *should* be emitted
(including `'error'` events).

##### Event: 'finish'
<!-- YAML
added: v0.9.4
Expand Down
3 changes: 3 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) {
this.resumeScheduled = false;
this.paused = true;

// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;

// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;

Expand Down
111 changes: 60 additions & 51 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,37 @@
'use strict';

function needError(stream, err) {
if (!err) {
return false;
}

const r = stream._readableState;
const w = stream._writableState;

if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
return false;
}

if (w) {
w.errorEmitted = true;
}
if (r) {
r.errorEmitted = true;
}

return true;
}

// Undocumented cb() API, needed for core, not for public API
function destroy(err, cb) {
const readableDestroyed = this._readableState &&
this._readableState.destroyed;
const writableDestroyed = this._writableState &&
this._writableState.destroyed;
const r = this._readableState;
const w = this._writableState;

if (readableDestroyed || writableDestroyed) {
if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
} else if (err) {
if (!this._writableState) {
process.nextTick(emitErrorNT, this, err);
} else if (!this._writableState.errorEmitted) {
this._writableState.errorEmitted = true;
process.nextTick(emitErrorNT, this, err);
}
} else if (needError(this, err)) {
process.nextTick(emitErrorNT, this, err);
}

return this;
Expand All @@ -25,28 +40,19 @@ function destroy(err, cb) {
// We set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks

if (this._readableState) {
this._readableState.destroyed = true;
if (w) {
w.destroyed = true;
}

// If this is a duplex stream mark the writable part as destroyed as well
if (this._writableState) {
this._writableState.destroyed = true;
if (r) {
r.destroyed = true;
}

this._destroy(err || null, (err) => {
if (!cb && err) {
if (!this._writableState) {
process.nextTick(emitErrorAndCloseNT, this, err);
} else if (!this._writableState.errorEmitted) {
this._writableState.errorEmitted = true;
process.nextTick(emitErrorAndCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
}
} else if (cb) {
if (cb) {
process.nextTick(emitCloseNT, this);
cb(err);
} else if (needError(this, err)) {
process.nextTick(emitErrorAndCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
}
Expand All @@ -61,29 +67,36 @@ function emitErrorAndCloseNT(self, err) {
}

function emitCloseNT(self) {
if (self._writableState && !self._writableState.emitClose)
const r = self._readableState;
const w = self._writableState;

if (w && !w.emitClose)
return;
if (self._readableState && !self._readableState.emitClose)
if (r && !r.emitClose)
return;
self.emit('close');
}

function undestroy() {
if (this._readableState) {
this._readableState.destroyed = false;
this._readableState.reading = false;
this._readableState.ended = false;
this._readableState.endEmitted = false;
const r = this._readableState;
const w = this._writableState;

if (r) {
r.destroyed = false;
r.reading = false;
r.ended = false;
r.endEmitted = false;
r.errorEmitted = false;
}

if (this._writableState) {
this._writableState.destroyed = false;
this._writableState.ended = false;
this._writableState.ending = false;
this._writableState.finalCalled = false;
this._writableState.prefinished = false;
this._writableState.finished = false;
this._writableState.errorEmitted = false;
if (w) {
w.destroyed = false;
w.ended = false;
w.ending = false;
w.finalCalled = false;
w.prefinished = false;
w.finished = false;
w.errorEmitted = false;
}
}

Expand All @@ -98,17 +111,13 @@ function errorOrDestroy(stream, err) {
// the error to be emitted nextTick. In a future
// semver major update we should change the default to this.

const rState = stream._readableState;
const wState = stream._writableState;
const r = stream._readableState;
const w = stream._writableState;

if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) {
if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
} else {
if (wState) {
wState.errorEmitted = true;
}
else if (needError(stream, err))
stream.emit('error', err);
}
}


Expand Down
6 changes: 4 additions & 2 deletions test/parallel/test-net-connect-buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ tcp.listen(0, common.mustCall(function() {
[],
{}
].forEach((value) => {
common.expectsError(() => socket.write(value), {
// We need to check the callback since 'error' will only
// be emitted once per instance.
socket.write(value, common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError,
message: 'The "chunk" argument must be one of type string or Buffer. ' +
`Received type ${typeof value}`
});
}));
});

// Write a string that contains a multi-byte character sequence to test that
Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-stream-error-once.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict';
const common = require('../common');
const { Writable, Readable } = require('stream');

{
const writable = new Writable();
writable.on('error', common.mustCall());
writable.end();
writable.write('h');
writable.write('h');
}

{
const readable = new Readable();
readable.on('error', common.mustCall());
readable.push(null);
readable.push('h');
readable.push('h');
}
33 changes: 24 additions & 9 deletions test/parallel/test-stream-readable-invalid-chunk.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,32 @@
const common = require('../common');
const stream = require('stream');

const readable = new stream.Readable({
read: () => {}
});

function checkError(fn) {
common.expectsError(fn, {
function testPushArg(val) {
const readable = new stream.Readable({
read: () => {}
});
readable.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
}));
readable.push(val);
}

testPushArg([]);
testPushArg({});
testPushArg(0);

function testUnshiftArg(val) {
const readable = new stream.Readable({
read: () => {}
});
readable.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
}));
readable.unshift(val);
}

checkError(() => readable.push([]));
checkError(() => readable.push({}));
checkError(() => readable.push(0));
testUnshiftArg([]);
testUnshiftArg({});
testUnshiftArg(0);
17 changes: 0 additions & 17 deletions test/parallel/test-stream-readable-unshift.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,6 @@ const { Readable } = require('stream');

}

{
// Check that error is thrown for invalid chunks

const readable = new Readable({ read() {} });
function checkError(fn) {
common.expectsError(fn, {
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}

checkError(() => readable.unshift([]));
checkError(() => readable.unshift({}));
checkError(() => readable.unshift(0));

}

{
// Check that ObjectMode works
const readable = new Readable({ objectMode: true, read() {} });
Expand Down
8 changes: 1 addition & 7 deletions test/parallel/test-stream-unshift-read-race.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,7 @@ w._write = function(chunk, encoding, cb) {
};

r.on('end', common.mustCall(function() {
common.expectsError(function() {
r.unshift(Buffer.allocUnsafe(1));
}, {
code: 'ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
type: Error,
message: 'stream.unshift() after end event'
});
r.unshift(Buffer.allocUnsafe(1));
w.end();
}));

Expand Down
5 changes: 1 addition & 4 deletions test/parallel/test-stream2-writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,7 @@ const helloWorldBuffer = Buffer.from('hello world');
w._final = common.mustCall(function(cb) {
cb(new Error('test'));
});
w._write = function(chunk, e, cb) {
process.nextTick(cb);
};
w.once('error', common.mustCall((err) => {
w.on('error', common.mustCall((err) => {
assert.strictEqual(w._writableState.errorEmitted, true);
assert.strictEqual(err.message, 'test');
w.on('error', common.mustNotCall());
Expand Down

0 comments on commit dcec025

Please sign in to comment.