diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 1b40192d9458ba..49df23cba9f4c2 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -83,6 +83,75 @@ const nop = () => {}; const { errorOrDestroy } = destroyImpl; +const kObjectMode = 1 << 0; +const kEnded = 1 << 1; +const kEndEmitted = 1 << 2; +const kReading = 1 << 3; +const kConstructed = 1 << 4; +const kSync = 1 << 5; +const kNeedReadable = 1 << 6; +const kEmittedReadable = 1 << 7; +const kReadableListening = 1 << 8; +const kResumeScheduled = 1 << 9; +const kErrorEmitted = 1 << 10; +const kEmitClose = 1 << 11; +const kAutoDestroy = 1 << 12; +const kDestroyed = 1 << 13; +const kClosed = 1 << 14; +const kCloseEmitted = 1 << 15; +const kMultiAwaitDrain = 1 << 16; +const kReadingMore = 1 << 17; +const kDataEmitted = 1 << 18; + +// TODO(benjamingr) it is likely slower to do it this way than with free functions +function makeBitMapDescriptor(bit) { + return { + enumerable: false, + get() { return (this.state & bit) !== 0; }, + set(value) { + if (value) this.state |= bit; + else this.state &= ~bit; + }, + }; +} +ObjectDefineProperties(ReadableState.prototype, { + objectMode: makeBitMapDescriptor(kObjectMode), + ended: makeBitMapDescriptor(kEnded), + endEmitted: makeBitMapDescriptor(kEndEmitted), + reading: makeBitMapDescriptor(kReading), + // Stream is still being constructed and cannot be + // destroyed until construction finished or failed. + // Async construction is opt in, therefore we start as + // constructed. + constructed: makeBitMapDescriptor(kConstructed), + // A flag to be able to tell if the event 'readable'/'data' is emitted + // immediately, or on a later tick. We set this to true at first, because + // any actions that shouldn't happen until "later" should generally also + // not happen before the first read call. + sync: makeBitMapDescriptor(kSync), + // Whenever we return null, then we set a flag to say + // that we're awaiting a 'readable' event emission. + needReadable: makeBitMapDescriptor(kNeedReadable), + emittedReadable: makeBitMapDescriptor(kEmittedReadable), + readableListening: makeBitMapDescriptor(kReadableListening), + resumeScheduled: makeBitMapDescriptor(kResumeScheduled), + // True if the error was already emitted and should not be thrown again. + errorEmitted: makeBitMapDescriptor(kErrorEmitted), + emitClose: makeBitMapDescriptor(kEmitClose), + autoDestroy: makeBitMapDescriptor(kAutoDestroy), + // Has it been destroyed. + destroyed: makeBitMapDescriptor(kDestroyed), + // Indicates whether the stream has finished destroying. + closed: makeBitMapDescriptor(kClosed), + // True if close has been emitted or would have been emitted + // depending on emitClose. + closeEmitted: makeBitMapDescriptor(kCloseEmitted), + multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain), + // If true, a maybeReadMore has been scheduled. + readingMore: makeBitMapDescriptor(kReadingMore), + dataEmitted: makeBitMapDescriptor(kDataEmitted), +}); + function ReadableState(options, stream, isDuplex) { // Duplex streams are both readable and writable, but share // the same options object. @@ -92,13 +161,15 @@ function ReadableState(options, stream, isDuplex) { if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Stream.Duplex; + // Bit map field to store ReadableState more effciently with 1 bit per field + // instead of a V8 slot per field. + this.state = kEmitClose | kAutoDestroy | kConstructed | kSync; // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away. - this.objectMode = !!(options && options.objectMode); + if (options && options.objectMode) this.state |= kObjectMode; - if (isDuplex) - this.objectMode = this.objectMode || - !!(options && options.readableObjectMode); + if (isDuplex && options && options.readableObjectMode) + this.state |= kObjectMode; // The point at which it stops calling _read() to fill the buffer // Note: 0 is a valid value, means "don't call _read preemptively ever" @@ -113,41 +184,15 @@ function ReadableState(options, stream, isDuplex) { this.length = 0; this.pipes = []; this.flowing = null; - this.ended = false; - this.endEmitted = false; - this.reading = false; - - // Stream is still being constructed and cannot be - // destroyed until construction finished or failed. - // Async construction is opt in, therefore we start as - // constructed. - this.constructed = true; - // A flag to be able to tell if the event 'readable'/'data' is emitted - // immediately, or on a later tick. We set this to true at first, because - // any actions that shouldn't happen until "later" should generally also - // not happen before the first read call. - this.sync = true; - - // Whenever we return null, then we set a flag to say - // that we're awaiting a 'readable' event emission. - this.needReadable = false; - this.emittedReadable = false; - this.readableListening = false; - this.resumeScheduled = false; this[kPaused] = null; - // True if the error was already emitted and should not be thrown again. - this.errorEmitted = false; - // Should close be emitted on destroy. Defaults to true. - this.emitClose = !options || options.emitClose !== false; + if (options && options.emitClose === false) this.state &= ~kEmitClose; // Should .destroy() be called after 'end' (and potentially 'finish'). - this.autoDestroy = !options || options.autoDestroy !== false; + if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy; - // Has it been destroyed. - this.destroyed = false; // Indicates whether the stream has errored. When true no further // _read calls, 'data' or 'readable' events should occur. This is needed @@ -155,12 +200,6 @@ function ReadableState(options, stream, isDuplex) { // stream has failed. this.errored = null; - // Indicates whether the stream has finished destroying. - this.closed = false; - - // True if close has been emitted or would have been emitted - // depending on emitClose. - this.closeEmitted = false; // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. @@ -177,12 +216,6 @@ function ReadableState(options, stream, isDuplex) { // Ref the piped dest which we need a drain event on it // type: null | Writable | Set. this.awaitDrainWriters = null; - this.multiAwaitDrain = false; - - // If true, a maybeReadMore has been scheduled. - this.readingMore = false; - - this.dataEmitted = false; this.decoder = null; this.encoding = null; @@ -263,7 +296,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { const state = stream._readableState; let err; - if (!state.objectMode) { + if ((state.state & kObjectMode) === 0) { if (typeof chunk === 'string') { encoding = encoding || state.defaultEncoding; if (state.encoding !== encoding) { @@ -290,11 +323,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { if (err) { errorOrDestroy(stream, err); } else if (chunk === null) { - state.reading = false; + state.state &= ~kReading; onEofChunk(stream, state); - } else if (state.objectMode || (chunk && chunk.length > 0)) { + } else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) { if (addToFront) { - if (state.endEmitted) + if ((state.state & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); else if (state.destroyed || state.errored) return false; @@ -305,7 +338,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } else if (state.destroyed || state.errored) { return false; } else { - state.reading = false; + state.state &= ~kReading; if (state.decoder && !encoding) { chunk = state.decoder.write(chunk); if (state.objectMode || chunk.length !== 0) @@ -317,7 +350,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } } } else if (!addToFront) { - state.reading = false; + state.state &= ~kReading; maybeReadMore(stream, state); } @@ -333,7 +366,7 @@ function addChunk(stream, state, chunk, addToFront) { stream.listenerCount('data') > 0) { // Use the guard to avoid creating `Set()` repeatedly // when we have multiple pipes. - if (state.multiAwaitDrain) { + if ((state.state & kMultiAwaitDrain) !== 0) { state.awaitDrainWriters.clear(); } else { state.awaitDrainWriters = null; @@ -349,7 +382,7 @@ function addChunk(stream, state, chunk, addToFront) { else state.buffer.push(chunk); - if (state.needReadable) + if ((state.state & kNeedReadable) !== 0) emitReadable(stream); } maybeReadMore(stream, state); @@ -404,7 +437,7 @@ function computeNewHighWaterMark(n) { function howMuchToRead(n, state) { if (n <= 0 || (state.length === 0 && state.ended)) return 0; - if (state.objectMode) + if ((state.state & kObjectMode) !== 0) return 1; if (NumberIsNaN(n)) { // Only flow one buffer at a time. @@ -435,7 +468,7 @@ Readable.prototype.read = function(n) { state.highWaterMark = computeNewHighWaterMark(n); if (n !== 0) - state.emittedReadable = false; + state.state &= ~kEmittedReadable; // If we're doing read(0) to trigger a readable event, but we // already have a bunch of data in the buffer, then just trigger @@ -486,7 +519,7 @@ Readable.prototype.read = function(n) { // 3. Actually pull the requested chunks out of the buffer and return. // if we need a readable event, then we need to do some reading. - let doRead = state.needReadable; + let doRead = (state.state & kNeedReadable) !== 0; debug('need readable', doRead); // If we currently have less than the highWaterMark, then also read some. @@ -504,11 +537,10 @@ Readable.prototype.read = function(n) { debug('reading, ended or constructing', doRead); } else if (doRead) { debug('do read'); - state.reading = true; - state.sync = true; + state.state |= kReading | kSync; // If the length is currently zero, then we *need* a readable event. if (state.length === 0) - state.needReadable = true; + state.state |= kNeedReadable; // Call internal read method try { @@ -516,8 +548,8 @@ Readable.prototype.read = function(n) { } catch (err) { errorOrDestroy(this, err); } + state.state &= ~kSync; - state.sync = false; // If _read pushed data synchronously, then `reading` will be false, // and we need to re-evaluate how much data we can return to the user. if (!state.reading)