From 0fd4a821dc3292eebe03415f7f4015b3ece04c7c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 7 Jul 2021 11:33:55 +0200 Subject: [PATCH] stream: unify stream utils Unify stream helps into utils. --- lib/internal/streams/destroy.js | 22 ++--- lib/internal/streams/end-of-stream.js | 51 +++------- lib/internal/streams/pipeline.js | 8 +- lib/internal/streams/utils.js | 129 ++++++++++++++++++++++++-- test/parallel/test-stream-finished.js | 2 +- test/parallel/test-stream-pipeline.js | 2 +- 6 files changed, 148 insertions(+), 66 deletions(-) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index df2d9f7f71987a..4dbcf7ba488837 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -10,6 +10,11 @@ const { const { Symbol, } = primordials; +const { + kDestroyed, + isDestroyed, + isFinished, +} = require('internal/streams/utils'); const kDestroy = Symbol('kDestroy'); const kConstruct = Symbol('kConstruct'); @@ -364,8 +369,6 @@ function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } -const kDestroyed = Symbol('kDestroyed'); - function emitCloseLegacy(stream) { stream.emit('close'); } @@ -375,25 +378,13 @@ function emitErrorCloseLegacy(stream, err) { process.nextTick(emitCloseLegacy, stream); } -function isDestroyed(stream) { - return stream.destroyed || stream[kDestroyed]; -} - -function isReadable(stream) { - return stream.readable && !stream.readableEnded && !isDestroyed(stream); -} - -function isWritable(stream) { - return stream.writable && !stream.writableEnded && !isDestroyed(stream); -} - // Normalize destroy for legacy. function destroyer(stream, err) { if (isDestroyed(stream)) { return; } - if (!err && (isReadable(stream) || isWritable(stream))) { + if (!err && !isFinished(stream)) { err = new AbortError(); } @@ -422,7 +413,6 @@ function destroyer(stream, err) { module.exports = { kDestroy, - isDestroyed, construct, destroyer, destroy, diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index efc2441c51ee39..382deee573ad7e 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -17,6 +17,14 @@ const { validateObject, } = require('internal/validators'); +const { + isClosed, + isReadableStream, + isReadableEnded, + isWritableStream, + isWritableFinished, +} = require('internal/streams/utils'); + function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } @@ -31,34 +39,8 @@ function isServerResponse(stream) { ); } -function isReadable(stream) { - return typeof stream.readable === 'boolean' || - typeof stream.readableEnded === 'boolean' || - !!stream._readableState; -} - -function isWritable(stream) { - return typeof stream.writable === 'boolean' || - typeof stream.writableEnded === 'boolean' || - !!stream._writableState; -} - -function isWritableFinished(stream) { - if (stream.writableFinished) return true; - const wState = stream._writableState; - if (!wState || wState.errored) return false; - return wState.finished || (wState.ended && wState.length === 0); -} - const nop = () => {}; -function isReadableEnded(stream) { - if (stream.readableEnded) return true; - const rState = stream._readableState; - if (!rState || rState.errored) return false; - return rState.endEmitted || (rState.ended && rState.length === 0); -} - function eos(stream, options, callback) { if (arguments.length === 2) { callback = options; @@ -74,9 +56,9 @@ function eos(stream, options, callback) { callback = once(callback); const readable = options.readable || - (options.readable !== false && isReadable(stream)); + (options.readable !== false && isReadableStream(stream, true)); const writable = options.writable || - (options.writable !== false && isWritable(stream)); + (options.writable !== false && isWritableStream(stream, true)); const wState = stream._writableState; const rState = stream._readableState; @@ -94,11 +76,11 @@ function eos(stream, options, callback) { state.autoDestroy && state.emitClose && state.closed === false && - isReadable(stream) === readable && - isWritable(stream) === writable + isReadableStream(stream) === readable && + isWritableStream(stream) === writable ); - let writableFinished = stream.writableFinished || wState?.finished; + let writableFinished = isWritableFinished(stream); const onfinish = () => { writableFinished = true; // Stream should not be destroyed here. If it is that @@ -110,7 +92,7 @@ function eos(stream, options, callback) { if (!readable || readableEnded) callback.call(stream); }; - let readableEnded = stream.readableEnded || rState?.endEmitted; + let readableEnded = isReadableEnded(stream); const onend = () => { readableEnded = true; // Stream should not be destroyed here. If it is that @@ -126,7 +108,7 @@ function eos(stream, options, callback) { callback.call(stream, err); }; - let closed = wState?.closed || rState?.closed; + let closed = isClosed(stream); const onclose = () => { closed = true; @@ -195,9 +177,6 @@ function eos(stream, options, callback) { readableEnded ) { process.nextTick(onclose); - } else if (!wState && !rState && stream._closed === true) { - // _closed is for OutgoingMessage which is not a proper Writable. - process.nextTick(onclose); } else if ((rState && stream.req && stream.aborted)) { process.nextTick(onclose); } diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 5759dbd4a580a3..ad08054408fa0e 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -27,7 +27,7 @@ const { validateCallback } = require('internal/validators'); const { isIterable, - isReadable, + isReadableStream, isStream, } = require('internal/streams/utils'); @@ -87,7 +87,7 @@ function popCallback(streams) { function makeAsyncIterable(val) { if (isIterable(val)) { return val; - } else if (isReadable(val)) { + } else if (isReadableStream(val)) { // Legacy streams are not Iterable. return fromReadable(val); } @@ -216,7 +216,7 @@ function pipeline(...streams) { throw new ERR_INVALID_RETURN_VALUE( 'Iterable, AsyncIterable or Stream', 'source', ret); } - } else if (isIterable(stream) || isReadable(stream)) { + } else if (isIterable(stream) || isReadableStream(stream)) { ret = stream; } else { throw new ERR_INVALID_ARG_TYPE( @@ -272,7 +272,7 @@ function pipeline(...streams) { destroys.push(destroyer(ret, false, true, finish)); } } else if (isStream(stream)) { - if (isReadable(ret)) { + if (isReadableStream(ret)) { ret.pipe(stream); // Compat. Before node v10.12.0 stdio used to throw an error so diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 62eea022685e18..03db2550fd3ccf 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -3,20 +3,44 @@ const { SymbolAsyncIterator, SymbolIterator, + Symbol } = primordials; -function isReadable(obj) { - return !!(obj && typeof obj.pipe === 'function' && - typeof obj.on === 'function'); +const kDestroyed = Symbol('kDestroyed'); + +function isReadableStream(obj, strict) { + return !!( + obj && + typeof obj.pipe === 'function' && + typeof obj.on === 'function' && + (!obj._writableState || obj._readableState?.readable !== false) && // Duplex + (!obj._writableState || obj._readableState) && // Writable has .pipe. + ( + !strict || + typeof obj.readable === 'boolean' || + typeof obj.readableEnded === 'boolean' || + obj._readableState + ) + ); } -function isWritable(obj) { - return !!(obj && typeof obj.write === 'function' && - typeof obj.on === 'function'); +function isWritableStream(obj, strict) { + return !!( + obj && + typeof obj.write === 'function' && + typeof obj.on === 'function' && + (!obj._readableState || obj._writableState?.writable !== false) && // Duplex + ( + !strict || + typeof obj.writable === 'boolean' || + typeof obj.writableEnded === 'boolean' || + obj._writableState + ) + ); } function isStream(obj) { - return isReadable(obj) || isWritable(obj); + return isReadableStream(obj) || isWritableStream(obj); } function isIterable(obj, isAsync) { @@ -27,8 +51,97 @@ function isIterable(obj, isAsync) { typeof obj[SymbolIterator] === 'function'; } +function isDestroyed(stream) { + if (!isStream(stream)) return null; + const wState = stream._writableState; + const rState = stream._readableState; + const state = wState || rState; + return !!(stream.destroyed || stream[kDestroyed] || state?.destroyed); +} + +function isWritableFinished(stream) { + if (!isStream(stream)) return null; + if (stream.writableFinished === true) return true; + const wState = stream._writableState; + if (!wState || wState.errored) return false; + return wState.finished || (wState.ended && wState.length === 0); +} + +function isReadableEnded(stream) { + if (!isStream(stream)) return null; + if (stream.readableEnded === true) return true; + const rState = stream._readableState; + if (!rState || rState.errored) return false; + return rState.endEmitted || (rState.ended && rState.length === 0); +} + +function isFinished(stream, opts) { + if (!isStream(stream)) { + return null; + } + + if (isDestroyed(stream)) { + return true; + } + + if ( + opts?.readable !== false && + isReadableStream(stream) && + !isReadableEnded(stream) + ) { + return false; + } + + if ( + opts?.writable !== false && + isWritableStream(stream) && + !isWritableFinished(stream) + ) { + return false; + } + + return true; +} + +function isClosed(stream) { + if (!isStream(stream)) { + return null; + } + + const wState = stream._writableState; + const rState = stream._readableState; + + if ( + typeof wState?.closed === 'boolean' || + typeof rState?.closed === 'boolean' + ) { + return wState?.closed || rState?.closed; + } + + // OutgoingMessage + if ( + !wState && + !rState && + typeof stream._closed === 'boolean' && + typeof stream._defaultKeepAlive === 'boolean' && + typeof stream._removedConnection === 'boolean' && + typeof stream._removedContLen === 'boolean' + ) { + return stream._closed; + } + + return null; +} + module.exports = { + kDestroyed, + isClosed, + isDestroyed, + isFinished, isIterable, - isReadable, + isReadableStream, + isReadableEnded, isStream, + isWritableStream, + isWritableFinished, }; diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 8e371911698336..d2207664eda40d 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -415,7 +415,7 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); d._writableState = {}; d._writableState.finished = true; finished(d, { readable: false, writable: true }, common.mustCall((err) => { - assert.strictEqual(err, undefined); + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); })); d._writableState.errored = true; d.emit('close'); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index e2e5fe2e0d561a..1f4474e6b5fce6 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1035,7 +1035,7 @@ const net = require('net'); const dst = new PassThrough(); dst.readable = false; pipeline(src, dst, common.mustSucceed(() => { - assert.strictEqual(dst.destroyed, false); + assert.strictEqual(dst.destroyed, true); })); src.end(); }