From aad8002b88b23abcd90099e5926fb6812e7c8ae0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 14 Oct 2023 23:50:30 +0200 Subject: [PATCH] stream: use private symbol for bitmap state PR-URL: https://github.com/nodejs/node/pull/49993 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina Reviewed-By: Yagiz Nizipli Reviewed-By: Raz Luvaton --- lib/internal/streams/readable.js | 45 +++--- lib/internal/streams/writable.js | 257 ++++++++++++++++--------------- 2 files changed, 152 insertions(+), 150 deletions(-) diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 49df23cba9f4c2..f551053bf7b79c 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -73,6 +73,7 @@ const { const { validateObject } = require('internal/validators'); const kPaused = Symbol('kPaused'); +const kState = Symbol('kState'); const { StringDecoder } = require('string_decoder'); const from = require('internal/streams/from'); @@ -107,10 +108,10 @@ const kDataEmitted = 1 << 18; function makeBitMapDescriptor(bit) { return { enumerable: false, - get() { return (this.state & bit) !== 0; }, + get() { return (this[kState] & bit) !== 0; }, set(value) { - if (value) this.state |= bit; - else this.state &= ~bit; + if (value) this[kState] |= bit; + else this[kState] &= ~bit; }, }; } @@ -163,13 +164,13 @@ function ReadableState(options, stream, isDuplex) { // Bit map field to store ReadableState more effciently with 1 bit per field // instead of a V8 slot per field. - this.state = kEmitClose | kAutoDestroy | kConstructed | kSync; + this[kState] = kEmitClose | kAutoDestroy | kConstructed | kSync; // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away. - if (options && options.objectMode) this.state |= kObjectMode; + if (options && options.objectMode) this[kState] |= kObjectMode; if (isDuplex && options && options.readableObjectMode) - this.state |= kObjectMode; + this[kState] |= kObjectMode; // The point at which it stops calling _read() to fill the buffer // Note: 0 is a valid value, means "don't call _read preemptively ever" @@ -188,10 +189,10 @@ function ReadableState(options, stream, isDuplex) { this[kPaused] = null; // Should close be emitted on destroy. Defaults to true. - if (options && options.emitClose === false) this.state &= ~kEmitClose; + if (options && options.emitClose === false) this[kState] &= ~kEmitClose; // Should .destroy() be called after 'end' (and potentially 'finish'). - if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy; + if (options && options.autoDestroy === false) this[kState] &= ~kAutoDestroy; // Indicates whether the stream has errored. When true no further @@ -296,7 +297,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { const state = stream._readableState; let err; - if ((state.state & kObjectMode) === 0) { + if ((state[kState] & kObjectMode) === 0) { if (typeof chunk === 'string') { encoding = encoding || state.defaultEncoding; if (state.encoding !== encoding) { @@ -323,11 +324,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { if (err) { errorOrDestroy(stream, err); } else if (chunk === null) { - state.state &= ~kReading; + state[kState] &= ~kReading; onEofChunk(stream, state); - } else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) { + } else if (((state[kState] & kObjectMode) !== 0) || (chunk && chunk.length > 0)) { if (addToFront) { - if ((state.state & kEndEmitted) !== 0) + if ((state[kState] & kEndEmitted) !== 0) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); else if (state.destroyed || state.errored) return false; @@ -338,7 +339,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } else if (state.destroyed || state.errored) { return false; } else { - state.state &= ~kReading; + state[kState] &= ~kReading; if (state.decoder && !encoding) { chunk = state.decoder.write(chunk); if (state.objectMode || chunk.length !== 0) @@ -350,7 +351,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) { } } } else if (!addToFront) { - state.state &= ~kReading; + state[kState] &= ~kReading; maybeReadMore(stream, state); } @@ -366,7 +367,7 @@ function addChunk(stream, state, chunk, addToFront) { stream.listenerCount('data') > 0) { // Use the guard to avoid creating `Set()` repeatedly // when we have multiple pipes. - if ((state.state & kMultiAwaitDrain) !== 0) { + if ((state[kState] & kMultiAwaitDrain) !== 0) { state.awaitDrainWriters.clear(); } else { state.awaitDrainWriters = null; @@ -382,7 +383,7 @@ function addChunk(stream, state, chunk, addToFront) { else state.buffer.push(chunk); - if ((state.state & kNeedReadable) !== 0) + if ((state[kState] & kNeedReadable) !== 0) emitReadable(stream); } maybeReadMore(stream, state); @@ -437,7 +438,7 @@ function computeNewHighWaterMark(n) { function howMuchToRead(n, state) { if (n <= 0 || (state.length === 0 && state.ended)) return 0; - if ((state.state & kObjectMode) !== 0) + if ((state[kState] & kObjectMode) !== 0) return 1; if (NumberIsNaN(n)) { // Only flow one buffer at a time. @@ -468,7 +469,7 @@ Readable.prototype.read = function(n) { state.highWaterMark = computeNewHighWaterMark(n); if (n !== 0) - state.state &= ~kEmittedReadable; + state[kState] &= ~kEmittedReadable; // If we're doing read(0) to trigger a readable event, but we // already have a bunch of data in the buffer, then just trigger @@ -519,7 +520,7 @@ Readable.prototype.read = function(n) { // 3. Actually pull the requested chunks out of the buffer and return. // if we need a readable event, then we need to do some reading. - let doRead = (state.state & kNeedReadable) !== 0; + let doRead = (state[kState] & kNeedReadable) !== 0; debug('need readable', doRead); // If we currently have less than the highWaterMark, then also read some. @@ -537,10 +538,10 @@ Readable.prototype.read = function(n) { debug('reading, ended or constructing', doRead); } else if (doRead) { debug('do read'); - state.state |= kReading | kSync; + state[kState] |= kReading | kSync; // If the length is currently zero, then we *need* a readable event. if (state.length === 0) - state.state |= kNeedReadable; + state[kState] |= kNeedReadable; // Call internal read method try { @@ -548,7 +549,7 @@ Readable.prototype.read = function(n) { } catch (err) { errorOrDestroy(this, err); } - state.state &= ~kSync; + state[kState] &= ~kSync; // If _read pushed data synchronously, then `reading` will be false, // and we need to re-evaluate how much data we can return to the user. diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index cd191fb70aa803..b8f8b72ce8d113 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -78,6 +78,7 @@ const kDefaultEncodingValue = Symbol('kDefaultEncodingValue'); const kWriteCbValue = Symbol('kWriteCbValue'); const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue'); const kBufferedValue = Symbol('kBufferedValue'); +const kState = Symbol('kState'); const kObjectMode = 1 << 0; const kEnded = 1 << 1; @@ -115,10 +116,10 @@ const kBuffered = 1 << 30; function makeBitMapDescriptor(bit) { return { enumerable: false, - get() { return (this.state & bit) !== 0; }, + get() { return (this[kState] & bit) !== 0; }, set(value) { - if (value) this.state |= bit; - else this.state &= ~bit; + if (value) this[kState] |= bit; + else this[kState] &= ~bit; }, }; } @@ -200,13 +201,13 @@ ObjectDefineProperties(WritableState.prototype, { errored: { __proto__: null, enumerable: false, - get() { return (this.state & kErrored) !== 0 ? this[kErroredValue] : null; }, + get() { return (this[kState] & kErrored) !== 0 ? this[kErroredValue] : null; }, set(value) { if (value) { this[kErroredValue] = value; - this.state |= kErrored; + this[kState] |= kErrored; } else { - this.state &= ~kErrored; + this[kState] &= ~kErrored; } }, }, @@ -214,15 +215,15 @@ ObjectDefineProperties(WritableState.prototype, { writable: { __proto__: null, enumerable: false, - get() { return (this.state & kHasWritable) !== 0 ? (this.state & kWritable) !== 0 : undefined; }, + get() { return (this[kState] & kHasWritable) !== 0 ? (this[kState] & kWritable) !== 0 : undefined; }, set(value) { if (value == null) { - this.state &= ~(kHasWritable | kWritable); + this[kState] &= ~(kHasWritable | kWritable); } else if (value) { - this.state |= (kHasWritable | kWritable); + this[kState] |= (kHasWritable | kWritable); } else { - this.state |= kHasWritable; - this.state &= ~kWritable; + this[kState] |= kHasWritable; + this[kState] &= ~kWritable; } }, }, @@ -230,12 +231,12 @@ ObjectDefineProperties(WritableState.prototype, { defaultEncoding: { __proto__: null, enumerable: false, - get() { return (this.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : this[kDefaultEncodingValue]; }, + get() { return (this[kState] & kDefaultUTF8Encoding) !== 0 ? 'utf8' : this[kDefaultEncodingValue]; }, set(value) { if (value === 'utf8' || value === 'utf-8') { - this.state |= kDefaultUTF8Encoding; + this[kState] |= kDefaultUTF8Encoding; } else { - this.state &= ~kDefaultUTF8Encoding; + this[kState] &= ~kDefaultUTF8Encoding; this[kDefaultEncodingValue] = value; } }, @@ -245,13 +246,13 @@ ObjectDefineProperties(WritableState.prototype, { writecb: { __proto__: null, enumerable: false, - get() { return (this.state & kWriteCb) !== 0 ? this[kWriteCbValue] : nop; }, + get() { return (this[kState] & kWriteCb) !== 0 ? this[kWriteCbValue] : nop; }, set(value) { if (value) { this[kWriteCbValue] = value; - this.state |= kWriteCb; + this[kState] |= kWriteCb; } else { - this.state &= ~kWriteCb; + this[kState] &= ~kWriteCb; } }, }, @@ -261,13 +262,13 @@ ObjectDefineProperties(WritableState.prototype, { afterWriteTickInfo: { __proto__: null, enumerable: false, - get() { return (this.state & kAfterWriteTickInfo) !== 0 ? this[kAfterWriteTickInfoValue] : null; }, + get() { return (this[kState] & kAfterWriteTickInfo) !== 0 ? this[kAfterWriteTickInfoValue] : null; }, set(value) { if (value) { this[kAfterWriteTickInfoValue] = value; - this.state |= kAfterWriteTickInfo; + this[kState] |= kAfterWriteTickInfo; } else { - this.state &= ~kAfterWriteTickInfo; + this[kState] &= ~kAfterWriteTickInfo; } }, }, @@ -275,13 +276,13 @@ ObjectDefineProperties(WritableState.prototype, { buffered: { __proto__: null, enumerable: false, - get() { return (this.state & kBuffered) !== 0 ? this[kBufferedValue] : []; }, + get() { return (this[kState] & kBuffered) !== 0 ? this[kBufferedValue] : []; }, set(value) { this[kBufferedValue] = value; if (value) { - this.state |= kBuffered; + this[kState] |= kBuffered; } else { - this.state &= ~kBuffered; + this[kState] &= ~kBuffered; } }, }, @@ -299,10 +300,10 @@ function WritableState(options, stream, isDuplex) { // Bit map field to store WritableState more effciently with 1 bit per field // instead of a V8 slot per field. - this.state = kSync | kConstructed | kEmitClose | kAutoDestroy; + this[kState] = kSync | kConstructed | kEmitClose | kAutoDestroy; - if (options && options.objectMode) this.state |= kObjectMode; - if (isDuplex && options && options.writableObjectMode) this.state |= kObjectMode; + if (options && options.objectMode) this[kState] |= kObjectMode; + if (isDuplex && options && options.writableObjectMode) this[kState] |= kObjectMode; // The point at which write() starts returning false // Note: 0 is a valid value, means that we always return false if @@ -311,22 +312,22 @@ function WritableState(options, stream, isDuplex) { getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) : getDefaultHighWaterMark(false); - if (!options || options.decodeStrings !== false) this.state |= kDecodeStrings; + if (!options || options.decodeStrings !== false) this[kState] |= kDecodeStrings; // Should close be emitted on destroy. Defaults to true. - if (options && options.emitClose === false) this.state &= ~kEmitClose; + if (options && options.emitClose === false) this[kState] &= ~kEmitClose; // Should .destroy() be called after 'end' (and potentially 'finish'). - if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy; + if (options && options.autoDestroy === false) this[kState] &= ~kAutoDestroy; // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. const defaultEncoding = options?.defaultEncoding; if (defaultEncoding == null || defaultEncoding === 'utf8' || defaultEncoding === 'utf-8') { - this.state |= kDefaultUTF8Encoding; + this[kState] |= kDefaultUTF8Encoding; } else if (Buffer.isEncoding(defaultEncoding)) { - this.state &= ~kDefaultUTF8Encoding; + this[kState] &= ~kDefaultUTF8Encoding; this[kDefaultEncodingValue] = defaultEncoding; } else { throw new ERR_UNKNOWN_ENCODING(defaultEncoding); @@ -356,18 +357,18 @@ function WritableState(options, stream, isDuplex) { function resetBuffer(state) { state[kBufferedValue] = null; state.bufferedIndex = 0; - state.state |= kAllBuffers | kAllNoop; - state.state &= ~kBuffered; + state[kState] |= kAllBuffers | kAllNoop; + state[kState] &= ~kBuffered; } WritableState.prototype.getBuffer = function getBuffer() { - return (this.state & kBuffered) === 0 ? [] : ArrayPrototypeSlice(this.buffered, this.bufferedIndex); + return (this[kState] & kBuffered) === 0 ? [] : ArrayPrototypeSlice(this.buffered, this.bufferedIndex); }; ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { __proto__: null, get() { - return (this.state & kBuffered) === 0 ? 0 : this[kBufferedValue].length - this.bufferedIndex; + return (this[kState] & kBuffered) === 0 ? 0 : this[kBufferedValue].length - this.bufferedIndex; }, }); @@ -414,11 +415,11 @@ function Writable(options) { destroyImpl.construct(this, () => { const state = this._writableState; - if ((state.state & kWriting) === 0) { + if ((state[kState] & kWriting) === 0) { clearBuffer(this, state); } - if ((state.state & kEnding) !== 0) { + if ((state[kState] & kEnding) !== 0) { finishMaybe(this, state); } }); @@ -444,10 +445,10 @@ function _write(stream, chunk, encoding, cb) { if (typeof encoding === 'function') { cb = encoding; - encoding = (state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding; + encoding = (state[kState] & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding; } else { if (!encoding) - encoding = (state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding; + encoding = (state[kState] & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding; else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) throw new ERR_UNKNOWN_ENCODING(encoding); if (typeof cb !== 'function') @@ -456,9 +457,9 @@ function _write(stream, chunk, encoding, cb) { if (chunk === null) { throw new ERR_STREAM_NULL_VALUES(); - } else if ((state.state & kObjectMode) === 0) { + } else if ((state[kState] & kObjectMode) === 0) { if (typeof chunk === 'string') { - if ((state.state & kDecodeStrings) !== 0) { + if ((state[kState] & kDecodeStrings) !== 0) { chunk = Buffer.from(chunk, encoding); encoding = 'buffer'; } @@ -474,9 +475,9 @@ function _write(stream, chunk, encoding, cb) { } let err; - if ((state.state & kEnding) !== 0) { + if ((state[kState] & kEnding) !== 0) { err = new ERR_STREAM_WRITE_AFTER_END(); - } else if ((state.state & kDestroyed) !== 0) { + } else if ((state[kState] & kDestroyed) !== 0) { err = new ERR_STREAM_DESTROYED('write'); } @@ -496,7 +497,7 @@ Writable.prototype.write = function(chunk, encoding, cb) { Writable.prototype.cork = function() { const state = this._writableState; - state.state |= kCorked; + state[kState] |= kCorked; state.corked++; }; @@ -507,10 +508,10 @@ Writable.prototype.uncork = function() { state.corked--; if (!state.corked) { - state.state &= ~kCorked; + state[kState] &= ~kCorked; } - if ((state.state & kWriting) === 0) + if ((state[kState] & kWriting) === 0) clearBuffer(this, state); } }; @@ -529,42 +530,42 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. function writeOrBuffer(stream, state, chunk, encoding, callback) { - const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length; + const len = (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length; state.length += len; - if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) { - if ((state.state & kBuffered) === 0) { - state.state |= kBuffered; + if ((state[kState] & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) { + if ((state[kState] & kBuffered) === 0) { + state[kState] |= kBuffered; state[kBufferedValue] = []; } state[kBufferedValue].push({ chunk, encoding, callback }); - if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') { - state.state &= ~kAllBuffers; + if ((state[kState] & kAllBuffers) !== 0 && encoding !== 'buffer') { + state[kState] &= ~kAllBuffers; } - if ((state.state & kAllNoop) !== 0 && callback !== nop) { - state.state &= ~kAllNoop; + if ((state[kState] & kAllNoop) !== 0 && callback !== nop) { + state[kState] &= ~kAllNoop; } } else { state.writelen = len; if (callback !== nop) { state.writecb = callback; } - state.state |= kWriting | kSync | kExpectWriteCb; + state[kState] |= kWriting | kSync | kExpectWriteCb; stream._write(chunk, encoding, state.onwrite); - state.state &= ~kSync; + state[kState] &= ~kSync; } const ret = state.length < state.highWaterMark; if (!ret) { - state.state |= kNeedDrain; + state[kState] |= kNeedDrain; } // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. - return ret && (state.state & (kDestroyed | kErrored)) === 0; + return ret && (state[kState] & (kDestroyed | kErrored)) === 0; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { @@ -572,14 +573,14 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { if (cb !== nop) { state.writecb = cb; } - state.state |= kWriting | kSync | kExpectWriteCb; - if ((state.state & kDestroyed) !== 0) + state[kState] |= kWriting | kSync | kExpectWriteCb; + if ((state[kState] & kDestroyed) !== 0) state.onwrite(new ERR_STREAM_DESTROYED('write')); else if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); - state.state &= ~kSync; + state[kState] &= ~kSync; } function onwriteError(stream, state, er, cb) { @@ -598,15 +599,15 @@ function onwriteError(stream, state, er, cb) { function onwrite(stream, er) { const state = stream._writableState; - if ((state.state & kExpectWriteCb) === 0) { + if ((state[kState] & kExpectWriteCb) === 0) { errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); return; } - const sync = (state.state & kSync) !== 0; - const cb = (state.state & kWriteCb) !== 0 ? state[kWriteCbValue] : nop; + const sync = (state[kState] & kSync) !== 0; + const cb = (state[kState] & kWriteCb) !== 0 ? state[kWriteCbValue] : nop; - state.state &= ~(kWriting | kExpectWriteCb | kWriteCb); + state[kState] &= ~(kWriting | kExpectWriteCb | kWriteCb); state.length -= state.writelen; state.writelen = 0; @@ -614,9 +615,9 @@ function onwrite(stream, er) { // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 er.stack; // eslint-disable-line no-unused-expressions - if ((state.state & kErrored) === 0) { + if ((state[kState] & kErrored) === 0) { state[kErroredValue] = er; - state.state |= kErrored; + state[kState] |= kErrored; } // In case of duplex streams we need to notify the readable side of the @@ -631,38 +632,38 @@ function onwrite(stream, er) { onwriteError(stream, state, er, cb); } } else { - if ((state.state & kBuffered) !== 0) { + if ((state[kState] & kBuffered) !== 0) { clearBuffer(stream, state); } if (sync) { - const needDrain = (state.state & kNeedDrain) !== 0 && state.length === 0; - const needTick = needDrain || (state.state & kDestroyed !== 0) || cb !== nop; + const needDrain = (state[kState] & kNeedDrain) !== 0 && state.length === 0; + const needTick = needDrain || (state[kState] & kDestroyed !== 0) || cb !== nop; // It is a common case that the callback passed to .write() is always // the same. In that case, we do not schedule a new nextTick(), but // rather just increase a counter, to improve performance and avoid // memory allocations. if (cb === nop) { - if ((state.state & kAfterWritePending) === 0 && needTick) { + if ((state[kState] & kAfterWritePending) === 0 && needTick) { process.nextTick(afterWrite, stream, state, 1, cb); - state.state |= kAfterWritePending; + state[kState] |= kAfterWritePending; } else { state.pendingcb--; - if ((state.state & kEnding) !== 0) { + if ((state[kState] & kEnding) !== 0) { finishMaybe(stream, state, true); } } - } else if ((state.state & kAfterWriteTickInfo) !== 0 && + } else if ((state[kState] & kAfterWriteTickInfo) !== 0 && state[kAfterWriteTickInfoValue].cb === cb) { state[kAfterWriteTickInfoValue].count++; } else if (needTick) { state[kAfterWriteTickInfoValue] = { count: 1, cb, stream, state }; process.nextTick(afterWriteTick, state[kAfterWriteTickInfoValue]); - state.state |= (kAfterWritePending | kAfterWriteTickInfo); + state[kState] |= (kAfterWritePending | kAfterWriteTickInfo); } else { state.pendingcb--; - if ((state.state & kEnding) !== 0) { + if ((state[kState] & kEnding) !== 0) { finishMaybe(stream, state, true); } } @@ -673,17 +674,17 @@ function onwrite(stream, er) { } function afterWriteTick({ stream, state, count, cb }) { - state.state &= ~kAfterWriteTickInfo; + state[kState] &= ~kAfterWriteTickInfo; state[kAfterWriteTickInfoValue] = null; return afterWrite(stream, state, count, cb); } function afterWrite(stream, state, count, cb) { - state.state &= ~kAfterWritePending; + state[kState] &= ~kAfterWritePending; - const needDrain = (state.state & (kEnding | kNeedDrain | kDestroyed)) === kNeedDrain && state.length === 0; + const needDrain = (state[kState] & (kEnding | kNeedDrain | kDestroyed)) === kNeedDrain && state.length === 0; if (needDrain) { - state.state &= ~kNeedDrain; + state[kState] &= ~kNeedDrain; stream.emit('drain'); } @@ -692,25 +693,25 @@ function afterWrite(stream, state, count, cb) { cb(null); } - if ((state.state & kDestroyed) !== 0) { + if ((state[kState] & kDestroyed) !== 0) { errorBuffer(state); } - if ((state.state & kEnding) !== 0) { + if ((state[kState] & kEnding) !== 0) { finishMaybe(stream, state, true); } } // If there's something in the buffer waiting, then invoke callbacks. function errorBuffer(state) { - if ((state.state & kWriting) !== 0) { + if ((state[kState] & kWriting) !== 0) { return; } - if ((state.state & kBuffered) !== 0) { + if ((state[kState] & kBuffered) !== 0) { for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { const { chunk, callback } = state[kBufferedValue][n]; - const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length; + const len = (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length; state.length -= len; callback(state.errored ?? new ERR_STREAM_DESTROYED('write')); } @@ -724,11 +725,11 @@ function errorBuffer(state) { // If there's something in the buffer waiting, then process it. function clearBuffer(stream, state) { - if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kBuffered)) !== kBuffered) { + if ((state[kState] & (kDestroyed | kBufferProcessing | kCorked | kBuffered)) !== kBuffered) { return; } - const objectMode = (state.state & kObjectMode) !== 0; + const objectMode = (state[kState] & kObjectMode) !== 0; const { [kBufferedValue]: buffered, bufferedIndex } = state; const bufferedLength = buffered.length - bufferedIndex; @@ -738,20 +739,20 @@ function clearBuffer(stream, state) { let i = bufferedIndex; - state.state |= kBufferProcessing; + state[kState] |= kBufferProcessing; if (bufferedLength > 1 && stream._writev) { state.pendingcb -= bufferedLength - 1; - const callback = (state.state & kAllNoop) !== 0 ? nop : (err) => { + const callback = (state[kState] & kAllNoop) !== 0 ? nop : (err) => { for (let n = i; n < buffered.length; ++n) { buffered[n].callback(err); } }; // Make a copy of `buffered` if it's going to be used by `callback` above, // since `doWrite` will mutate the array. - const chunks = (state.state & kAllNoop) !== 0 && i === 0 ? + const chunks = (state[kState] & kAllNoop) !== 0 && i === 0 ? buffered : ArrayPrototypeSlice(buffered, i); - chunks.allBuffers = (state.state & kAllBuffers) !== 0; + chunks.allBuffers = (state[kState] & kAllBuffers) !== 0; doWrite(stream, state, true, state.length, chunks, '', callback); @@ -762,7 +763,7 @@ function clearBuffer(stream, state) { buffered[i++] = null; const len = objectMode ? 1 : chunk.length; doWrite(stream, state, false, len, chunk, encoding, callback); - } while (i < buffered.length && (state.state & kWriting) === 0); + } while (i < buffered.length && (state[kState] & kWriting) === 0); if (i === buffered.length) { resetBuffer(state); @@ -773,7 +774,7 @@ function clearBuffer(stream, state) { state.bufferedIndex = i; } } - state.state &= ~kBufferProcessing; + state[kState] &= ~kBufferProcessing; } Writable.prototype._write = function(chunk, encoding, cb) { @@ -808,38 +809,38 @@ Writable.prototype.end = function(chunk, encoding, cb) { } // .end() fully uncorks. - if ((state.state & kCorked) !== 0) { + if ((state[kState] & kCorked) !== 0) { state.corked = 1; this.uncork(); } if (err) { // Do nothing... - } else if ((state.state & (kEnding | kErrored)) === 0) { + } else if ((state[kState] & (kEnding | kErrored)) === 0) { // This is forgiving in terms of unnecessary calls to end() and can hide // logic errors. However, usually such errors are harmless and causing a // hard error can be disproportionately destructive. It is not always // trivial for the user to determine whether end() needs to be called // or not. - state.state |= kEnding; + state[kState] |= kEnding; finishMaybe(this, state, true); - state.state |= kEnded; - } else if ((state.state & kFinished) !== 0) { + state[kState] |= kEnded; + } else if ((state[kState] & kFinished) !== 0) { err = new ERR_STREAM_ALREADY_FINISHED('end'); - } else if ((state.state & kDestroyed) !== 0) { + } else if ((state[kState] & kDestroyed) !== 0) { err = new ERR_STREAM_DESTROYED('end'); } if (typeof cb === 'function') { if (err) { process.nextTick(cb, err); - } else if ((state.state & kErrored) !== 0) { + } else if ((state[kState] & kErrored) !== 0) { process.nextTick(cb, state[kErroredValue]); - } else if ((state.state & kFinished) !== 0) { + } else if ((state[kState] & kFinished) !== 0) { process.nextTick(cb, null); } else { - state.state |= kOnFinished; + state[kState] |= kOnFinished; state[kOnFinishedValue] ??= []; state[kOnFinishedValue].push(cb); } @@ -851,7 +852,7 @@ Writable.prototype.end = function(chunk, encoding, cb) { function needFinish(state) { return ( // State is ended && constructed but not destroyed, finished, writing, errorEmitted or closedEmitted - (state.state & ( + (state[kState] & ( kEnding | kDestroyed | kConstructed | @@ -877,9 +878,9 @@ function callFinal(stream, state) { state.pendingcb--; if (err) { callFinishedCallbacks(state, err); - errorOrDestroy(stream, err, (state.state & kSync) !== 0); + errorOrDestroy(stream, err, (state[kState] & kSync) !== 0); } else if (needFinish(state)) { - state.state |= kPrefinished; + state[kState] |= kPrefinished; stream.emit('prefinish'); // Backwards compat. Don't check state.sync here. // Some streams assume 'finish' will be emitted @@ -889,7 +890,7 @@ function callFinal(stream, state) { } } - state.state |= kSync; + state[kState] |= kSync; state.pendingcb++; try { @@ -898,16 +899,16 @@ function callFinal(stream, state) { onFinish(err); } - state.state &= ~kSync; + state[kState] &= ~kSync; } function prefinish(stream, state) { - if ((state.state & (kPrefinished | kFinalCalled)) === 0) { - if (typeof stream._final === 'function' && (state.state & kDestroyed) === 0) { - state.state |= kFinalCalled; + if ((state[kState] & (kPrefinished | kFinalCalled)) === 0) { + if (typeof stream._final === 'function' && (state[kState] & kDestroyed) === 0) { + state[kState] |= kFinalCalled; callFinal(stream, state); } else { - state.state |= kPrefinished; + state[kState] |= kPrefinished; stream.emit('prefinish'); } } @@ -936,13 +937,13 @@ function finishMaybe(stream, state, sync) { function finish(stream, state) { state.pendingcb--; - state.state |= kFinished; + state[kState] |= kFinished; callFinishedCallbacks(state, null); stream.emit('finish'); - if ((state.state & kAutoDestroy) !== 0) { + if ((state[kState] & kAutoDestroy) !== 0) { // In case of duplex streams we need a way to detect // if the readable side is ready for autoDestroy as well. const rState = stream._readableState; @@ -959,13 +960,13 @@ function finish(stream, state) { } function callFinishedCallbacks(state, err) { - if ((state.state & kOnFinished) === 0) { + if ((state[kState] & kOnFinished) === 0) { return; } const onfinishCallbacks = state[kOnFinishedValue]; state[kOnFinishedValue] = null; - state.state &= ~kOnFinished; + state[kState] &= ~kOnFinished; for (let i = 0; i < onfinishCallbacks.length; i++) { onfinishCallbacks[i](err); } @@ -975,21 +976,21 @@ ObjectDefineProperties(Writable.prototype, { closed: { __proto__: null, get() { - return this._writableState ? (this._writableState.state & kClosed) !== 0 : false; + return this._writableState ? (this._writableState[kState] & kClosed) !== 0 : false; }, }, destroyed: { __proto__: null, get() { - return this._writableState ? (this._writableState.state & kDestroyed) !== 0 : false; + return this._writableState ? (this._writableState[kState] & kDestroyed) !== 0 : false; }, set(value) { // Backward compatibility, the user is explicitly managing destroyed. if (!this._writableState) return; - if (value) this._writableState.state |= kDestroyed; - else this._writableState.state &= ~kDestroyed; + if (value) this._writableState[kState] |= kDestroyed; + else this._writableState[kState] &= ~kDestroyed; }, }, @@ -1002,7 +1003,7 @@ ObjectDefineProperties(Writable.prototype, { // Compat. The user might manually disable writable side through // deprecated setter. return !!w && w.writable !== false && !w.errored && - (w.state & (kEnding | kEnded | kDestroyed)) === 0; + (w[kState] & (kEnding | kEnded | kDestroyed)) === 0; }, set(val) { // Backwards compatible. @@ -1016,7 +1017,7 @@ ObjectDefineProperties(Writable.prototype, { __proto__: null, get() { const state = this._writableState; - return state ? (state.state & kFinished) !== 0 : false; + return state ? (state[kState] & kFinished) !== 0 : false; }, }, @@ -1024,7 +1025,7 @@ ObjectDefineProperties(Writable.prototype, { __proto__: null, get() { const state = this._writableState; - return state ? (state.state & kObjectMode) !== 0 : false; + return state ? (state[kState] & kObjectMode) !== 0 : false; }, }, @@ -1040,7 +1041,7 @@ ObjectDefineProperties(Writable.prototype, { __proto__: null, get() { const state = this._writableState; - return state ? (state.state & kEnding) !== 0 : false; + return state ? (state[kState] & kEnding) !== 0 : false; }, }, @@ -1048,7 +1049,7 @@ ObjectDefineProperties(Writable.prototype, { __proto__: null, get() { const state = this._writableState; - return state ? (state.state & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain : false; + return state ? (state[kState] & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain : false; }, }, @@ -1090,9 +1091,9 @@ ObjectDefineProperties(Writable.prototype, { get: function() { const state = this._writableState; return ( - (state.state & (kHasWritable | kWritable)) !== kHasWritable && - (state.state & (kDestroyed | kErrored)) !== 0 && - (state.state & kFinished) === 0 + (state[kState] & (kHasWritable | kWritable)) !== kHasWritable && + (state[kState] & (kDestroyed | kErrored)) !== 0 && + (state[kState] & kFinished) === 0 ); }, }, @@ -1103,7 +1104,7 @@ Writable.prototype.destroy = function(err, cb) { const state = this._writableState; // Invoke pending callbacks. - if ((state.state & (kBuffered | kOnFinished | kDestroyed)) !== kDestroyed) { + if ((state[kState] & (kBuffered | kOnFinished | kDestroyed)) !== kDestroyed) { process.nextTick(errorBuffer, state); }