Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

stream: switch _writableState.buffer to queue #8826

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 48 additions & 20 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ Writable.WritableState = WritableState;

var util = require('util');
var Stream = require('stream');
var debug = util.debuglog('stream');

util.inherits(Writable, Stream);

function WriteReq(chunk, encoding, cb) {
this.chunk = chunk;
this.encoding = encoding;
this.callback = cb;
this.next = null;
}

function WritableState(options, stream) {
Expand Down Expand Up @@ -109,7 +111,8 @@ function WritableState(options, stream) {
// the amount that is being written when _write is called.
this.writelen = 0;

this.buffer = [];
this.bufferedRequest = null;
this.lastBufferedRequest = null;

// number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted
Expand All @@ -123,6 +126,23 @@ function WritableState(options, stream) {
this.errorEmitted = false;
}

WritableState.prototype.getBuffer = function writableStateGetBuffer() {
var current = this.bufferedRequest;
var out = [];
while (current) {
out.push(current);
current = current.next;
}
return out;
};

Object.defineProperty(WritableState.prototype, 'buffer', {
get: util.deprecate(function() {
return this.getBuffer();
}, '_writableState.buffer is deprecated. Use ' +
'_writableState.getBuffer() instead.')
});

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is internal/private/undocumented state, I'm not sure we need to do this deprecation warning as we've never really explained to people how to interact with this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool -- I was mostly concerned with supporting projects like vstream that reach into ReadableState for metrics. I figured that if there's one I know about, there's probably more that I don't know about :)


function Writable(options) {
// Writable ctor is applied to Duplexes, though they're not
// instanceof Writable, they're instanceof Readable.
Expand Down Expand Up @@ -216,7 +236,7 @@ Writable.prototype.uncork = function() {
!state.corked &&
!state.finished &&
!state.bufferProcessing &&
state.buffer.length)
state.bufferedRequest)
clearBuffer(this, state);
}
};
Expand Down Expand Up @@ -255,8 +275,15 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
if (!ret)
state.needDrain = true;

if (state.writing || state.corked)
state.buffer.push(new WriteReq(chunk, encoding, cb));
if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
if (last) {
last.next = state.lastBufferedRequest;
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
}
else
doWrite(stream, state, false, len, chunk, encoding, cb);

Expand Down Expand Up @@ -313,7 +340,7 @@ function onwrite(stream, er) {
if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.buffer.length) {
state.bufferedRequest) {
clearBuffer(stream, state);
}

Expand Down Expand Up @@ -349,52 +376,53 @@ function onwriteDrain(stream, state) {
// if there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
var entry = state.bufferedRequest;

if (stream._writev && state.buffer.length > 1) {
if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
var buffer = [];
var cbs = [];
for (var c = 0; c < state.buffer.length; c++)
cbs.push(state.buffer[c].callback);
while (entry) {
cbs.push(entry.callback);
buffer.push(entry);
entry = entry.next;
}

// count the one we are adding, as well.
// TODO(isaacs) clean this up
state.pendingcb++;
doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
state.lastBufferedRequest = null;
doWrite(stream, state, true, state.length, buffer, '', function(err) {
for (var i = 0; i < cbs.length; i++) {
state.pendingcb--;
cbs[i](err);
}
});

// Clear buffer
state.buffer = [];
} else {
// Slow case, write chunks one-by-one
for (var c = 0; c < state.buffer.length; c++) {
var entry = state.buffer[c];
while (entry) {
var chunk = entry.chunk;
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;

doWrite(stream, state, false, len, chunk, encoding, cb);

entry = entry.next;
// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
c++;
break;
}
}

if (c < state.buffer.length)
state.buffer = state.buffer.slice(c);
else
state.buffer.length = 0;
if (entry === null)
state.lastBufferedRequest = null;
}

state.bufferedRequest = entry;
state.bufferProcessing = false;
}

Expand Down Expand Up @@ -435,7 +463,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
function needFinish(stream, state) {
return (state.ending &&
state.length === 0 &&
state.buffer.length === 0 &&
state.bufferedRequest === null &&
!state.finished &&
!state.writing);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ Socket.prototype.__defineGetter__('bytesWritten', function() {
data = this._pendingData,
encoding = this._pendingEncoding;

state.buffer.forEach(function(el) {
state.getBuffer().forEach(function(el) {
if (util.isBuffer(el.chunk))
bytes += el.chunk.length;
else
Expand Down
2 changes: 1 addition & 1 deletion test/simple/test-stream2-transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ test('writable side consumption', function(t) {
t.equal(tx._readableState.length, 10);
t.equal(transformed, 10);
t.equal(tx._transformState.writechunk.length, 5);
t.same(tx._writableState.buffer.map(function(c) {
t.same(tx._writableState.getBuffer().map(function(c) {
return c.chunk.length;
}), [6, 7, 8, 9, 10]);

Expand Down