Skip to content

Commit

Permalink
stream: unify stream utils
Browse files Browse the repository at this point in the history
Unify stream helps into utils.
  • Loading branch information
ronag committed Jul 7, 2021
1 parent 4de6f20 commit 0fd4a82
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 66 deletions.
22 changes: 6 additions & 16 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ const {
const {
Symbol,
} = primordials;
const {
kDestroyed,
isDestroyed,
isFinished,
} = require('internal/streams/utils');

const kDestroy = Symbol('kDestroy');
const kConstruct = Symbol('kConstruct');
Expand Down Expand Up @@ -364,8 +369,6 @@ function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}

const kDestroyed = Symbol('kDestroyed');

function emitCloseLegacy(stream) {
stream.emit('close');
}
Expand All @@ -375,25 +378,13 @@ function emitErrorCloseLegacy(stream, err) {
process.nextTick(emitCloseLegacy, stream);
}

function isDestroyed(stream) {
return stream.destroyed || stream[kDestroyed];
}

function isReadable(stream) {
return stream.readable && !stream.readableEnded && !isDestroyed(stream);
}

function isWritable(stream) {
return stream.writable && !stream.writableEnded && !isDestroyed(stream);
}

// Normalize destroy for legacy.
function destroyer(stream, err) {
if (isDestroyed(stream)) {
return;
}

if (!err && (isReadable(stream) || isWritable(stream))) {
if (!err && !isFinished(stream)) {
err = new AbortError();
}

Expand Down Expand Up @@ -422,7 +413,6 @@ function destroyer(stream, err) {

module.exports = {
kDestroy,
isDestroyed,
construct,
destroyer,
destroy,
Expand Down
51 changes: 15 additions & 36 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ const {
validateObject,
} = require('internal/validators');

const {
isClosed,
isReadableStream,
isReadableEnded,
isWritableStream,
isWritableFinished,
} = require('internal/streams/utils');

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}
Expand All @@ -31,34 +39,8 @@ function isServerResponse(stream) {
);
}

function isReadable(stream) {
return typeof stream.readable === 'boolean' ||
typeof stream.readableEnded === 'boolean' ||
!!stream._readableState;
}

function isWritable(stream) {
return typeof stream.writable === 'boolean' ||
typeof stream.writableEnded === 'boolean' ||
!!stream._writableState;
}

function isWritableFinished(stream) {
if (stream.writableFinished) return true;
const wState = stream._writableState;
if (!wState || wState.errored) return false;
return wState.finished || (wState.ended && wState.length === 0);
}

const nop = () => {};

function isReadableEnded(stream) {
if (stream.readableEnded) return true;
const rState = stream._readableState;
if (!rState || rState.errored) return false;
return rState.endEmitted || (rState.ended && rState.length === 0);
}

function eos(stream, options, callback) {
if (arguments.length === 2) {
callback = options;
Expand All @@ -74,9 +56,9 @@ function eos(stream, options, callback) {
callback = once(callback);

const readable = options.readable ||
(options.readable !== false && isReadable(stream));
(options.readable !== false && isReadableStream(stream, true));
const writable = options.writable ||
(options.writable !== false && isWritable(stream));
(options.writable !== false && isWritableStream(stream, true));

const wState = stream._writableState;
const rState = stream._readableState;
Expand All @@ -94,11 +76,11 @@ function eos(stream, options, callback) {
state.autoDestroy &&
state.emitClose &&
state.closed === false &&
isReadable(stream) === readable &&
isWritable(stream) === writable
isReadableStream(stream) === readable &&
isWritableStream(stream) === writable
);

let writableFinished = stream.writableFinished || wState?.finished;
let writableFinished = isWritableFinished(stream);
const onfinish = () => {
writableFinished = true;
// Stream should not be destroyed here. If it is that
Expand All @@ -110,7 +92,7 @@ function eos(stream, options, callback) {
if (!readable || readableEnded) callback.call(stream);
};

let readableEnded = stream.readableEnded || rState?.endEmitted;
let readableEnded = isReadableEnded(stream);
const onend = () => {
readableEnded = true;
// Stream should not be destroyed here. If it is that
Expand All @@ -126,7 +108,7 @@ function eos(stream, options, callback) {
callback.call(stream, err);
};

let closed = wState?.closed || rState?.closed;
let closed = isClosed(stream);

const onclose = () => {
closed = true;
Expand Down Expand Up @@ -195,9 +177,6 @@ function eos(stream, options, callback) {
readableEnded
) {
process.nextTick(onclose);
} else if (!wState && !rState && stream._closed === true) {
// _closed is for OutgoingMessage which is not a proper Writable.
process.nextTick(onclose);
} else if ((rState && stream.req && stream.aborted)) {
process.nextTick(onclose);
}
Expand Down
8 changes: 4 additions & 4 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const { validateCallback } = require('internal/validators');

const {
isIterable,
isReadable,
isReadableStream,
isStream,
} = require('internal/streams/utils');

Expand Down Expand Up @@ -87,7 +87,7 @@ function popCallback(streams) {
function makeAsyncIterable(val) {
if (isIterable(val)) {
return val;
} else if (isReadable(val)) {
} else if (isReadableStream(val)) {
// Legacy streams are not Iterable.
return fromReadable(val);
}
Expand Down Expand Up @@ -216,7 +216,7 @@ function pipeline(...streams) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadable(stream)) {
} else if (isIterable(stream) || isReadableStream(stream)) {
ret = stream;
} else {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -272,7 +272,7 @@ function pipeline(...streams) {
destroys.push(destroyer(ret, false, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
if (isReadableStream(ret)) {
ret.pipe(stream);

// Compat. Before node v10.12.0 stdio used to throw an error so
Expand Down
129 changes: 121 additions & 8 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,44 @@
const {
SymbolAsyncIterator,
SymbolIterator,
Symbol
} = primordials;

function isReadable(obj) {
return !!(obj && typeof obj.pipe === 'function' &&
typeof obj.on === 'function');
const kDestroyed = Symbol('kDestroyed');

function isReadableStream(obj, strict) {
return !!(
obj &&
typeof obj.pipe === 'function' &&
typeof obj.on === 'function' &&
(!obj._writableState || obj._readableState?.readable !== false) && // Duplex
(!obj._writableState || obj._readableState) && // Writable has .pipe.
(
!strict ||
typeof obj.readable === 'boolean' ||
typeof obj.readableEnded === 'boolean' ||
obj._readableState
)
);
}

function isWritable(obj) {
return !!(obj && typeof obj.write === 'function' &&
typeof obj.on === 'function');
function isWritableStream(obj, strict) {
return !!(
obj &&
typeof obj.write === 'function' &&
typeof obj.on === 'function' &&
(!obj._readableState || obj._writableState?.writable !== false) && // Duplex
(
!strict ||
typeof obj.writable === 'boolean' ||
typeof obj.writableEnded === 'boolean' ||
obj._writableState
)
);
}

function isStream(obj) {
return isReadable(obj) || isWritable(obj);
return isReadableStream(obj) || isWritableStream(obj);
}

function isIterable(obj, isAsync) {
Expand All @@ -27,8 +51,97 @@ function isIterable(obj, isAsync) {
typeof obj[SymbolIterator] === 'function';
}

function isDestroyed(stream) {
if (!isStream(stream)) return null;
const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;
return !!(stream.destroyed || stream[kDestroyed] || state?.destroyed);
}

function isWritableFinished(stream) {
if (!isStream(stream)) return null;
if (stream.writableFinished === true) return true;
const wState = stream._writableState;
if (!wState || wState.errored) return false;
return wState.finished || (wState.ended && wState.length === 0);
}

function isReadableEnded(stream) {
if (!isStream(stream)) return null;
if (stream.readableEnded === true) return true;
const rState = stream._readableState;
if (!rState || rState.errored) return false;
return rState.endEmitted || (rState.ended && rState.length === 0);
}

function isFinished(stream, opts) {
if (!isStream(stream)) {
return null;
}

if (isDestroyed(stream)) {
return true;
}

if (
opts?.readable !== false &&
isReadableStream(stream) &&
!isReadableEnded(stream)
) {
return false;
}

if (
opts?.writable !== false &&
isWritableStream(stream) &&
!isWritableFinished(stream)
) {
return false;
}

return true;
}

function isClosed(stream) {
if (!isStream(stream)) {
return null;
}

const wState = stream._writableState;
const rState = stream._readableState;

if (
typeof wState?.closed === 'boolean' ||
typeof rState?.closed === 'boolean'
) {
return wState?.closed || rState?.closed;
}

// OutgoingMessage
if (
!wState &&
!rState &&
typeof stream._closed === 'boolean' &&
typeof stream._defaultKeepAlive === 'boolean' &&
typeof stream._removedConnection === 'boolean' &&
typeof stream._removedContLen === 'boolean'
) {
return stream._closed;
}

return null;
}

module.exports = {
kDestroyed,
isClosed,
isDestroyed,
isFinished,
isIterable,
isReadable,
isReadableStream,
isReadableEnded,
isStream,
isWritableStream,
isWritableFinished,
};
2 changes: 1 addition & 1 deletion test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
d._writableState = {};
d._writableState.finished = true;
finished(d, { readable: false, writable: true }, common.mustCall((err) => {
assert.strictEqual(err, undefined);
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
d._writableState.errored = true;
d.emit('close');
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ const net = require('net');
const dst = new PassThrough();
dst.readable = false;
pipeline(src, dst, common.mustSucceed(() => {
assert.strictEqual(dst.destroyed, false);
assert.strictEqual(dst.destroyed, true);
}));
src.end();
}
Expand Down

0 comments on commit 0fd4a82

Please sign in to comment.