Skip to content

Commit

Permalink
fixup! stream: refactor to use more primordials
Browse files Browse the repository at this point in the history
  • Loading branch information
aduh95 committed Jan 28, 2021
1 parent 4381dd3 commit b93fda8
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 23 deletions.
12 changes: 5 additions & 7 deletions lib/internal/streams/buffer_list.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const {
SymbolIterator,
Uint8Array,
TypedArrayPrototypeSet,
TypedArrayPrototypeSlice,
} = primordials;

const { Buffer } = require('buffer');
Expand Down Expand Up @@ -81,11 +80,10 @@ module.exports = class BufferList {
consume(n, hasStrings) {
const data = this.head.data;
if (n < data.length) {
const slice = typeof data === 'string' ?
StringPrototypeSlice :
TypedArrayPrototypeSlice;
this.head.data = slice(data, n);
return slice(data, 0, n);
// `slice` is the same for buffers and strings.
const slice = data.slice(0, n);
this.head.data = data.slice(n);
return slice;
}
if (n === data.length) {
// First chunk is a perfect match.
Expand Down Expand Up @@ -160,7 +158,7 @@ module.exports = class BufferList {
new Uint8Array(buf.buffer, buf.byteOffset, n),
retLen - n);
this.head = p;
p.data = TypedArrayPrototypeSlice(buf, n);
p.data = buf.slice(n);
}
break;
}
Expand Down
5 changes: 2 additions & 3 deletions lib/internal/streams/duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
'use strict';

const {
FunctionPrototypeCall,
ObjectDefineProperties,
ObjectGetOwnPropertyDescriptor,
ObjectKeys,
Expand All @@ -54,8 +53,8 @@ function Duplex(options) {
if (!(this instanceof Duplex))
return new Duplex(options);

FunctionPrototypeCall(Readable, this, options);
FunctionPrototypeCall(Writable, this, options);
Readable.call(this, options);
Writable.call(this, options);
this.allowHalfOpen = true;

if (options) {
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/lazy_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
'use strict';

const {
FunctionPrototypeCall,
ObjectDefineProperties,
ObjectDefineProperty,
ObjectSetPrototypeOf,
ReflectApply,
} = primordials;

const stream = require('stream');
Expand All @@ -26,7 +26,7 @@ ObjectSetPrototypeOf(LazyTransform, stream.Transform);

function makeGetter(name) {
return function() {
ReflectApply(stream.Transform, this, [this._options]);
FunctionPrototypeCall(stream.Transform, this, this._options);
this._writableState.decodeStrings = false;

if (!this._options || !this._options.defaultEncoding) {
Expand Down
7 changes: 4 additions & 3 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ function Readable(options) {
addAbortSignalNoValidate(options.signal, this);
}

ReflectApply(Stream, this, [options]);
FunctionPrototypeCall(Stream, this, options);

destroyImpl.construct(this, () => {
if (this._readableState.needReadable) {
Expand Down Expand Up @@ -876,7 +876,7 @@ Readable.prototype.unpipe = function(dest) {
// Set up data events if they are asked for
// Ensure readable listeners eventually get something.
Readable.prototype.on = function(ev, fn) {
const res = ReflectApply(Stream.prototype.on, this, [ev, fn]);
const res = FunctionPrototypeCall(Stream.prototype.on, this, ev, fn);
const state = this._readableState;

if (ev === 'data') {
Expand Down Expand Up @@ -906,7 +906,8 @@ Readable.prototype.on = function(ev, fn) {
Readable.prototype.addListener = Readable.prototype.on;

Readable.prototype.removeListener = function(ev, fn) {
const res = ReflectApply(Stream.prototype.removeListener, this, [ev, fn]);
const res = FunctionPrototypeCall(Stream.prototype.removeListener, this,
ev, fn);

if (ev === 'readable') {
// We need to check if there is someone still listening to
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function Transform(options) {
if (!(this instanceof Transform))
return new Transform(options);

FunctionPrototypeCall(Duplex, this, options);
Duplex.call(this, options);

// We have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
Expand Down
21 changes: 14 additions & 7 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,12 @@ const {
ArrayPrototypePush,
ArrayPrototypeSlice,
ArrayPrototypeSplice,
FunctionPrototype,
Error,
FunctionPrototypeBind,
FunctionPrototypeCall,
FunctionPrototypeSymbolHasInstance,
ObjectDefineProperty,
ObjectDefineProperties,
ObjectSetPrototypeOf,
ReflectApply,
StringPrototypeToLowerCase,
Symbol,
SymbolHasInstance,
Expand Down Expand Up @@ -77,7 +74,7 @@ const { errorOrDestroy } = destroyImpl;
ObjectSetPrototypeOf(Writable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Writable, Stream);

const nop = FunctionPrototype;
function nop() {}

const kOnFinished = Symbol('kOnFinished');

Expand Down Expand Up @@ -154,7 +151,7 @@ function WritableState(options, stream, isDuplex) {
this.bufferProcessing = false;

// The callback that's passed to _write(chunk, cb).
this.onwrite = FunctionPrototypeBind(onwrite, undefined, stream);
this.onwrite = onwrite.bind(undefined, stream);

// The callback that the user supplies to write(chunk, encoding, cb).
this.writecb = null;
Expand Down Expand Up @@ -273,6 +270,16 @@ function Writable(options) {
});
}

ObjectDefineProperty(Writable, SymbolHasInstance, {
value: function(object) {
if (FunctionPrototypeSymbolHasInstance(this, object)) return true;
if (this !== Writable) return false;

return object && object._writableState instanceof WritableState;
},
});


// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function() {
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
Expand Down Expand Up @@ -372,7 +379,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
state.needDrain = true;

if (state.writing || state.corked || state.errored || !state.constructed) {
ArrayPrototypePush(state.buffered, { chunk, encoding, callback });
state.buffered.push({ chunk, encoding, callback });
if (state.allBuffers && encoding !== 'buffer') {
state.allBuffers = false;
}
Expand Down Expand Up @@ -855,7 +862,7 @@ Writable.prototype.destroy = function(err, cb) {
process.nextTick(errorBuffer, state);
}

ReflectApply(destroy, this, [err, cb]);
FunctionPrototypeCall(destroy, this, err, cb);
return this;
};

Expand Down

0 comments on commit b93fda8

Please sign in to comment.