Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: reduce parts in chunked response when corking #50167

Merged
merged 4 commits into from
Oct 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 87 additions & 41 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
});

const kCorked = Symbol('corked');
const kSocket = Symbol('kSocket');
const kChunkedBuffer = Symbol('kChunkedBuffer');
const kChunkedLength = Symbol('kChunkedLength');
const kUniqueHeaders = Symbol('kUniqueHeaders');
const kBytesWritten = Symbol('kBytesWritten');
const kErrored = Symbol('errored');
Expand Down Expand Up @@ -140,9 +143,11 @@ function OutgoingMessage(options) {
this.finished = false;
this._headerSent = false;
this[kCorked] = 0;
this[kChunkedBuffer] = [];
this[kChunkedLength] = 0;
this._closed = false;

this.socket = null;
this[kSocket] = null;
this._header = null;
this[kOutHeaders] = null;

Expand Down Expand Up @@ -177,7 +182,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableFinished', {
return (
this.finished &&
this.outputSize === 0 &&
(!this.socket || this.socket.writableLength === 0)
(!this[kSocket] || this[kSocket].writableLength === 0)
);
},
});
Expand All @@ -192,22 +197,21 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableObjectMode', {
ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', {
__proto__: null,
get() {
return this.outputSize + (this.socket ? this.socket.writableLength : 0);
return this.outputSize + this[kChunkedLength] + (this[kSocket] ? this[kSocket].writableLength : 0);
},
});

ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', {
__proto__: null,
get() {
return this.socket ? this.socket.writableHighWaterMark : this[kHighWaterMark];
return this[kSocket] ? this[kSocket].writableHighWaterMark : this[kHighWaterMark];
},
});

ObjectDefineProperty(OutgoingMessage.prototype, 'writableCorked', {
__proto__: null,
get() {
const corked = this.socket ? this.socket.writableCorked : 0;
return corked + this[kCorked];
return this[kCorked];
},
});

Expand Down Expand Up @@ -235,13 +239,27 @@ ObjectDefineProperty(OutgoingMessage.prototype, '_headers', {
ObjectDefineProperty(OutgoingMessage.prototype, 'connection', {
__proto__: null,
get: function() {
return this.socket;
return this[kSocket];
},
set: function(val) {
this.socket = val;
ronag marked this conversation as resolved.
Show resolved Hide resolved
},
});

ObjectDefineProperty(OutgoingMessage.prototype, 'socket', {
__proto__: null,
get: function() {
return this[kSocket];
},
set: function(val) {
for (let n = 0; n < this[kCorked]; n++) {
val?.cork();
this[kSocket]?.uncork();
}
this[kSocket] = val;
},
});

ObjectDefineProperty(OutgoingMessage.prototype, '_headerNames', {
__proto__: null,
get: internalUtil.deprecate(function() {
Expand Down Expand Up @@ -299,19 +317,45 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() {
};

OutgoingMessage.prototype.cork = function() {
if (this.socket) {
this.socket.cork();
} else {
this[kCorked]++;
this[kCorked]++;
if (this[kSocket]) {
this[kSocket].cork();
}
};

OutgoingMessage.prototype.uncork = function() {
if (this.socket) {
this.socket.uncork();
} else if (this[kCorked]) {
this[kCorked]--;
this[kCorked]--;
if (this[kSocket]) {
this[kSocket].uncork();
}

if (this[kCorked] || this[kChunkedBuffer].length === 0) {
return;
}

const len = this[kChunkedLength];
const buf = this[kChunkedBuffer];

assert(this.chunkedEncoding);

let callbacks;
this._send(NumberPrototypeToString(len, 16), 'latin1', null);
this._send(crlf_buf, null, null);
for (let n = 0; n < buf.length; n += 3) {
this._send(buf[n + 0], buf[n + 1], null);
if (buf[n + 2]) {
callbacks ??= [];
callbacks.push(buf[n + 2]);
ronag marked this conversation as resolved.
Show resolved Hide resolved
}
}
this._send(crlf_buf, null, callbacks.length ? (err) => {
for (const callback of callbacks) {
callback(err);
}
} : null);

this[kChunkedBuffer].length = 0;
this[kChunkedLength] = 0;
};

OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {
Expand All @@ -320,12 +364,12 @@ OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {
this.on('timeout', callback);
}

if (!this.socket) {
if (!this[kSocket]) {
this.once('socket', function socketSetTimeoutOnConnect(socket) {
socket.setTimeout(msecs);
});
} else {
this.socket.setTimeout(msecs);
this[kSocket].setTimeout(msecs);
}
return this;
};
Expand All @@ -342,8 +386,8 @@ OutgoingMessage.prototype.destroy = function destroy(error) {

this[kErrored] = error;

if (this.socket) {
this.socket.destroy(error);
if (this[kSocket]) {
this[kSocket].destroy(error);
} else {
this.once('socket', function socketDestroyOnConnect(socket) {
socket.destroy(error);
Expand Down Expand Up @@ -382,7 +426,7 @@ OutgoingMessage.prototype._send = function _send(data, encoding, callback, byteL

OutgoingMessage.prototype._writeRaw = _writeRaw;
function _writeRaw(data, encoding, callback, size) {
const conn = this.socket;
const conn = this[kSocket];
if (conn && conn.destroyed) {
// The socket was destroyed. If we're still trying to write to it,
// then we haven't gotten the 'close' event yet.
Expand Down Expand Up @@ -938,10 +982,16 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
let ret;
if (msg.chunkedEncoding && chunk.length !== 0) {
len ??= typeof chunk === 'string' ? Buffer.byteLength(chunk, encoding) : chunk.byteLength;
msg._send(NumberPrototypeToString(len, 16), 'latin1', null);
msg._send(crlf_buf, null, null);
msg._send(chunk, encoding, null, len);
ret = msg._send(crlf_buf, null, callback);
if (msg[kCorked] && msg._headerSent) {
msg[kChunkedBuffer].push(chunk, encoding, callback);
msg[kChunkedLength] += len;
ret = msg[kChunkedLength] < msg[kHighWaterMark];
} else {
msg._send(NumberPrototypeToString(len, 16), 'latin1', null);
msg._send(crlf_buf, null, null);
msg._send(chunk, encoding, null, len);
ret = msg._send(crlf_buf, null, callback);
}
} else {
ret = msg._send(chunk, encoding, callback, len);
}
Expand Down Expand Up @@ -1023,8 +1073,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
return this;
}

if (this.socket) {
this.socket.cork();
if (this[kSocket]) {
this[kSocket].cork();
}

write_(this, chunk, encoding, null, true);
Expand All @@ -1038,8 +1088,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
}
return this;
} else if (!this._header) {
if (this.socket) {
this.socket.cork();
if (this[kSocket]) {
this[kSocket].cork();
}

this._contentLength = 0;
Expand All @@ -1063,21 +1113,22 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
process.nextTick(finish);
}

if (this.socket) {
if (this[kSocket]) {
// Fully uncork connection on end().
this.socket._writableState.corked = 1;
this.socket.uncork();
this[kSocket]._writableState.corked = 1;
this[kSocket].uncork();
}
this[kCorked] = 0;
this[kCorked] = 1;
this.uncork();

this.finished = true;

// There is the first message on the outgoing queue, and we've sent
// everything to the socket.
debug('outgoing message end.');
if (this.outputData.length === 0 &&
this.socket &&
this.socket._httpMessage === this) {
this[kSocket] &&
this[kSocket]._httpMessage === this) {
this._finish();
}

Expand All @@ -1088,7 +1139,7 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
// This function is called once all user data are flushed to the socket.
// Note that it has a chance that the socket is not drained.
OutgoingMessage.prototype._finish = function _finish() {
assert(this.socket);
assert(this[kSocket]);
this.emit('prefinish');
};

Expand All @@ -1113,7 +1164,7 @@ OutgoingMessage.prototype._finish = function _finish() {
// This function, _flush(), is called by both the Server and Client
// to attempt to flush any pending messages out to the socket.
OutgoingMessage.prototype._flush = function _flush() {
const socket = this.socket;
const socket = this[kSocket];

if (socket && socket.writable) {
// There might be remaining data in this.output; write it out
Expand All @@ -1130,11 +1181,6 @@ OutgoingMessage.prototype._flush = function _flush() {
};

OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
while (this[kCorked]) {
this[kCorked]--;
socket.cork();
}

const outputLength = this.outputData.length;
if (outputLength <= 0)
return undefined;
Expand Down