Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add auto-destroy mode #22795

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,11 @@ changes:
pr-url: https://github.com/nodejs/node/pull/18438
description: >
Add `emitClose` option to specify if `'close'` is emitted on destroy
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/22795
description: >
Add `autoDestroy` option to automatically `destroy()` the stream
when it emits `'finish'` or errors
-->

* `options` {Object}
Expand Down Expand Up @@ -1521,6 +1526,8 @@ changes:
[`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `false`.

```js
const { Writable } = require('stream');
Expand Down Expand Up @@ -1756,6 +1763,14 @@ Custom `Readable` streams *must* call the `new stream.Readable([options])`
constructor and implement the `readable._read()` method.

#### new stream.Readable([options])
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/22795
description: >
Add `autoDestroy` option to automatically `destroy()` the stream
when it emits `'end'` or errors
-->

* `options` {Object}
* `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store
Expand All @@ -1770,6 +1785,8 @@ constructor and implement the `readable._read()` method.
method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][readable-_destroy] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `false`.

```js
const { Readable } = require('stream');
Expand Down
23 changes: 18 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ let createReadableStreamAsyncIterator;

util.inherits(Readable, Stream);

const { errorOrDestroy } = destroyImpl;
const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];

function prependListener(emitter, event, fn) {
Expand Down Expand Up @@ -117,6 +118,9 @@ function ReadableState(options, stream, isDuplex) {
// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;

// Should .destroy() be called after 'end' (and potentially 'finish')
this.autoDestroy = !!options.autoDestroy;

// has it been destroyed
this.destroyed = false;

Expand Down Expand Up @@ -235,7 +239,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (!skipChunkCheck)
er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
errorOrDestroy(stream, er);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (typeof chunk !== 'string' &&
!state.objectMode &&
Expand All @@ -245,11 +249,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {

if (addToFront) {
if (state.endEmitted)
stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
return false;
} else {
Expand Down Expand Up @@ -581,7 +585,7 @@ function maybeReadMore_(stream, state) {
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n) {
this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
};

Readable.prototype.pipe = function(dest, pipeOpts) {
Expand Down Expand Up @@ -687,7 +691,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
unpipe();
dest.removeListener('error', onerror);
if (EE.listenerCount(dest, 'error') === 0)
dest.emit('error', er);
errorOrDestroy(dest, er);
}

// Make sure our error handler is attached before userland ones.
Expand Down Expand Up @@ -1092,5 +1096,14 @@ function endReadableNT(state, stream) {
state.endEmitted = true;
stream.readable = false;
stream.emit('end');

if (state.autoDestroy) {
// In case of duplex streams we need a way to detect
// if the writable side is ready for autoDestroy as well
const wState = stream._writableState;
if (!wState || (wState.autoDestroy && wState.finished)) {
stream.destroy();
}
}
}
}
26 changes: 20 additions & 6 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const {
ERR_UNKNOWN_ENCODING
} = require('internal/errors').codes;

const { errorOrDestroy } = destroyImpl;

util.inherits(Writable, Stream);

function nop() {}
Expand Down Expand Up @@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex) {
// Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false;

// Should .destroy() be called after 'finish' (and potentially 'end')
this.autoDestroy = !!options.autoDestroy;

// count buffered requests
this.bufferedRequestCount = 0;

Expand Down Expand Up @@ -235,14 +240,14 @@ function Writable(options) {

// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function() {
this.emit('error', new ERR_STREAM_CANNOT_PIPE());
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
};


function writeAfterEnd(stream, cb) {
var er = new ERR_STREAM_WRITE_AFTER_END();
// TODO: defer error events consistently everywhere, not just the cb
stream.emit('error', er);
errorOrDestroy(stream, er);
process.nextTick(cb, er);
}

Expand All @@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb) {
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
}
if (er) {
stream.emit('error', er);
errorOrDestroy(stream, er);
process.nextTick(cb, er);
return false;
}
Expand Down Expand Up @@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb) {
// after error
process.nextTick(finishMaybe, stream, state);
stream._writableState.errorEmitted = true;
stream.emit('error', er);
errorOrDestroy(stream, er);
} else {
// the caller expect this to happen before if
// it is async
cb(er);
stream._writableState.errorEmitted = true;
stream.emit('error', er);
errorOrDestroy(stream, er);
// this can emit finish, but finish must
// always follow error
finishMaybe(stream, state);
Expand Down Expand Up @@ -612,7 +617,7 @@ function callFinal(stream, state) {
stream._final((err) => {
state.pendingcb--;
if (err) {
stream.emit('error', err);
errorOrDestroy(stream, err);
}
state.prefinished = true;
stream.emit('prefinish');
Expand All @@ -639,6 +644,15 @@ function finishMaybe(stream, state) {
if (state.pendingcb === 0) {
state.finished = true;
stream.emit('finish');

if (state.autoDestroy) {
// In case of duplex streams we need a way to detect
// if the readable side is ready for autoDestroy as well
const rState = stream._readableState;
if (!rState || (rState.autoDestroy && rState.endEmitted)) {
stream.destroy();
}
}
}
}
return need;
Expand Down
20 changes: 19 additions & 1 deletion lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,25 @@ function emitErrorNT(self, err) {
self.emit('error', err);
}

function errorOrDestroy(stream, err) {
// We have tests that rely on errors being emitted
// in the same tick, so changing this is semver major.
// For now when you opt-in to autoDestroy we allow
// the error to be emitted nextTick. In a future
// semver major update we should change the default to this.

const rState = stream._readableState;
const wState = stream._writableState;

if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
stream.destroy(err);
else
stream.emit('error', err);
}


module.exports = {
destroy,
undestroy
undestroy,
errorOrDestroy
};
84 changes: 84 additions & 0 deletions test/parallel/test-stream-auto-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

{
const r = new stream.Readable({
autoDestroy: true,
read() {
this.push('hello');
this.push('world');
this.push(null);
},
destroy: common.mustCall((err, cb) => cb())
});

let ended = false;

r.resume();

r.on('end', common.mustCall(() => {
ended = true;
}));

r.on('close', common.mustCall(() => {
assert(ended);
}));
}

{
const w = new stream.Writable({
autoDestroy: true,
write(data, enc, cb) {
cb(null);
},
destroy: common.mustCall((err, cb) => cb())
});

let finished = false;

w.write('hello');
w.write('world');
w.end();

w.on('finish', common.mustCall(() => {
finished = true;
}));

w.on('close', common.mustCall(() => {
assert(finished);
}));
}

{
const t = new stream.Transform({
autoDestroy: true,
transform(data, enc, cb) {
cb(null, data);
},
destroy: common.mustCall((err, cb) => cb())
});

let ended = false;
let finished = false;

t.write('hello');
t.write('world');
t.end();

t.resume();

t.on('end', common.mustCall(() => {
ended = true;
}));

t.on('finish', common.mustCall(() => {
finished = true;
}));

t.on('close', common.mustCall(() => {
assert(ended);
assert(finished);
}));
}