Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: support readable/writableHighWaterMark for Duplex streams #14636

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1752,6 +1752,10 @@ constructor and implement *both* the `readable._read()` and
* `writableObjectMode` {boolean} Defaults to `false`. Sets `objectMode`
for writable side of the stream. Has no effect if `objectMode`
is `true`.
* `readableHighWaterMark` {number} Sets `highWaterMark` for the readable side
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestions:
Add a link to [#buffering]
Also in the first * (options) add links to [#constructor-new-streamwritableoptions] and [#new-streamreadableoptions]

of the stream. Has no effect if `highWaterMark` is provided.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd phrase it as The general `highWaterMark` option overrides this one. IMHO a positive-active sentence is a little bit clearer than a negative-passive one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that positive is better. I just copied this format from readable/writableObjectMode.

* `writableHighWaterMark` {number} Sets `highWaterMark` for the writable side
of the stream. Has no effect if `highWaterMark` is provided.

For example:

Expand Down
18 changes: 16 additions & 2 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,32 @@ function prependListener(emitter, event, fn) {
function ReadableState(options, stream) {
options = options || {};

// Duplex streams are both readable and writable, but share
// the same options object.
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream.
// These options can be provided separately as readableXXX and writableXXX.
var isDuplex = stream instanceof Stream.Duplex;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina isn't instanceof discouraged?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but this was already there, this PR just moves it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion it is best to allow it also for Readable/Writable without checking for Duplex, because this is an inherited behavior that behaves different on the base class vs the child class, and this is more confusing than helping. Not sure if this will be agreed by any developer reading the docs though. I wonder what you guys think.


// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;

if (stream instanceof Stream.Duplex)
if (isDuplex)
this.objectMode = this.objectMode || !!options.readableObjectMode;

// the point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
var hwm = options.highWaterMark;
var readableHwm = options.readableHighWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

if (hwm || hwm === 0)
this.highWaterMark = hwm;
else if (isDuplex && (readableHwm || readableHwm === 0))
this.highWaterMark = readableHwm;
else
this.highWaterMark = defaultHwm;

// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);
Expand Down
18 changes: 16 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,33 @@ function nop() {}
function WritableState(options, stream) {
options = options || {};

// Duplex streams are both readable and writable, but share
// the same options object.
// However, some cases require setting options to different
// values for the readable and the writable sides of the duplex stream.
// These options can be provided separately as readableXXX and writableXXX.
var isDuplex = stream instanceof Stream.Duplex;

// object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!options.objectMode;

if (stream instanceof Stream.Duplex)
if (isDuplex)
this.objectMode = this.objectMode || !!options.writableObjectMode;

// the point at which write() starts returning false
// Note: 0 is a valid value, means that we always return false if
// the entire buffer is not flushed immediately on write()
var hwm = options.highWaterMark;
var writableHwm = options.writableHighWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

if (hwm || hwm === 0)
this.highWaterMark = hwm;
else if (isDuplex && (writableHwm || writableHwm === 0))
this.highWaterMark = writableHwm;
else
this.highWaterMark = defaultHwm;

// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);
Expand Down
71 changes: 71 additions & 0 deletions test/parallel/test-stream-transform-split-highwatermark.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
'use strict';
require('../common');
const assert = require('assert');

const { Transform, Readable, Writable } = require('stream');

const DEFAULT = 16 * 1024;

function testTransform(expectedReadableHwm, expectedWritableHwm, options) {
const t = new Transform(options);
assert.strictEqual(t._readableState.highWaterMark, expectedReadableHwm);
assert.strictEqual(t._writableState.highWaterMark, expectedWritableHwm);
}

// test overriding defaultHwm
testTransform(666, DEFAULT, { readableHighWaterMark: 666 });
testTransform(DEFAULT, 777, { writableHighWaterMark: 777 });
testTransform(666, 777, {
readableHighWaterMark: 666,
writableHighWaterMark: 777,
});

// test 0 overriding defaultHwm
testTransform(0, DEFAULT, { readableHighWaterMark: 0 });
testTransform(DEFAULT, 0, { writableHighWaterMark: 0 });

// test highWaterMark overriding
testTransform(555, 555, {
highWaterMark: 555,
readableHighWaterMark: 666,
});
testTransform(555, 555, {
highWaterMark: 555,
writableHighWaterMark: 777,
});
testTransform(555, 555, {
highWaterMark: 555,
readableHighWaterMark: 666,
writableHighWaterMark: 777,
});

// test highWaterMark = 0 overriding
testTransform(0, 0, {
highWaterMark: 0,
readableHighWaterMark: 666,
});
testTransform(0, 0, {
highWaterMark: 0,
writableHighWaterMark: 777,
});
testTransform(0, 0, {
highWaterMark: 0,
readableHighWaterMark: 666,
writableHighWaterMark: 777,
});

// test undefined, null, NaN
[undefined, null, NaN].forEach((v) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens with ['', 'foo', ()=>{}, Symbol('frog'), {}, []]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@refack The behavior will be unspecified in these cases, since highWaterMark is expected to be a number, but only checked to be a truthy value or 0.

This was also the case before my PR, though I would prefer it to be a specified behavior instead.

Perhaps the behavior should be to ignore non number values or else throw TypeError as a breaking change...

testTransform(DEFAULT, DEFAULT, { readableHighWaterMark: v });
testTransform(DEFAULT, DEFAULT, { writableHighWaterMark: v });
testTransform(666, DEFAULT, { highWaterMark: v, readableHighWaterMark: 666 });
testTransform(DEFAULT, 777, { highWaterMark: v, writableHighWaterMark: 777 });
});

// test non Duplex streams ignore the options
{
const r = new Readable({ readableHighWaterMark: 666 });
assert.strictEqual(r._readableState.highWaterMark, DEFAULT);
const w = new Writable({ writableHighWaterMark: 777 });
assert.strictEqual(w._writableState.highWaterMark, DEFAULT);
}