From 71051797b4b0d9c041a31389c1b00b0ab4a63b9c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 12 Oct 2023 22:34:34 +0200 Subject: [PATCH 1/4] http: reduce parts in chunked response when corking Refs: https://github.com/nodejs/performance/issues/57 PR-URL: https://github.com/nodejs/node/pull/50167 --- lib/_http_outgoing.js | 69 ++++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 178a3418dace0a..90a33785cc7343 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -82,6 +82,8 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => { }); const kCorked = Symbol('corked'); +const kChunkedBuffer = Symbol('kChunkedBuffer'); +const kChunkedLength = Symbol('kChunkedLength'); const kUniqueHeaders = Symbol('kUniqueHeaders'); const kBytesWritten = Symbol('kBytesWritten'); const kErrored = Symbol('errored'); @@ -140,6 +142,8 @@ function OutgoingMessage(options) { this.finished = false; this._headerSent = false; this[kCorked] = 0; + this[kChunkedBuffer] = []; + this[kChunkedLength] = 0; this._closed = false; this.socket = null; @@ -192,7 +196,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableObjectMode', { ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', { __proto__: null, get() { - return this.outputSize + (this.socket ? this.socket.writableLength : 0); + return this.outputSize + this[kChunkedLength] + (this.socket ? this.socket.writableLength : 0); }, }); @@ -206,8 +210,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', { ObjectDefineProperty(OutgoingMessage.prototype, 'writableCorked', { __proto__: null, get() { - const corked = this.socket ? this.socket.writableCorked : 0; - return corked + this[kCorked]; + return this[kCorked]; }, }); @@ -238,6 +241,10 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'connection', { return this.socket; }, set: function(val) { + for (let n = 0; n < this[kCorked]; n++) { + val?.cork(); + this.socket?.uncork(); + } this.socket = val; }, }); @@ -299,19 +306,45 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() { }; OutgoingMessage.prototype.cork = function() { + this[kCorked]++; if (this.socket) { this.socket.cork(); - } else { - this[kCorked]++; } }; OutgoingMessage.prototype.uncork = function() { + this[kCorked]--; if (this.socket) { this.socket.uncork(); - } else if (this[kCorked]) { - this[kCorked]--; } + + if (this[kCorked] || this[kChunkedBuffer].length === 0) { + return; + } + + const len = this[kChunkedLength]; + const buf = this[kChunkedBuffer]; + + assert(this.chunkedEncoding); + + let callbacks; + this._send(NumberPrototypeToString(len, 16), 'latin1', null); + this._send(crlf_buf, null, null); + for (let n = 0; n < buf.length; n += 3) { + this._send(buf[n + 0], buf[n + 1], null); + if (buf[n + 2]) { + callbacks ??= []; + callbacks.push(buf[n + 2]); + } + } + this._send(crlf_buf, null, callbacks.length ? (err) => { + for (const callback of callbacks) { + callback(err); + } + } : null); + + this[kChunkedBuffer].length = 0; + this[kChunkedLength] = 0; }; OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { @@ -938,10 +971,16 @@ function write_(msg, chunk, encoding, callback, fromEnd) { let ret; if (msg.chunkedEncoding && chunk.length !== 0) { len ??= typeof chunk === 'string' ? Buffer.byteLength(chunk, encoding) : chunk.byteLength; - msg._send(NumberPrototypeToString(len, 16), 'latin1', null); - msg._send(crlf_buf, null, null); - msg._send(chunk, encoding, null, len); - ret = msg._send(crlf_buf, null, callback); + if (msg[kCorked] && msg._headerSent) { + msg[kChunkedBuffer].push(chunk, encoding, callback); + msg[kChunkedLength] += len; + ret = msg[kChunkedLength] < msg[kHighWaterMark]; + } else { + msg._send(NumberPrototypeToString(len, 16), 'latin1', null); + msg._send(crlf_buf, null, null); + msg._send(chunk, encoding, null, len); + ret = msg._send(crlf_buf, null, callback); + } } else { ret = msg._send(chunk, encoding, callback, len); } @@ -1068,7 +1107,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { this.socket._writableState.corked = 1; this.socket.uncork(); } - this[kCorked] = 0; + this[kCorked] = 1; + this.uncork(); this.finished = true; @@ -1130,11 +1170,6 @@ OutgoingMessage.prototype._flush = function _flush() { }; OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) { - while (this[kCorked]) { - this[kCorked]--; - socket.cork(); - } - const outputLength = this.outputData.length; if (outputLength <= 0) return undefined; From af99b19721e986e11df3ea64b19cb952c0b36639 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Oct 2023 18:11:34 +0200 Subject: [PATCH 2/4] fixup --- lib/_http_outgoing.js | 65 +++++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 90a33785cc7343..64e0d4ab714d79 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -82,6 +82,7 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => { }); const kCorked = Symbol('corked'); +const kSocket = Symbol('kSocket'); const kChunkedBuffer = Symbol('kChunkedBuffer'); const kChunkedLength = Symbol('kChunkedLength'); const kUniqueHeaders = Symbol('kUniqueHeaders'); @@ -146,7 +147,7 @@ function OutgoingMessage(options) { this[kChunkedLength] = 0; this._closed = false; - this.socket = null; + this[kSocket] = null; this._header = null; this[kOutHeaders] = null; @@ -181,7 +182,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableFinished', { return ( this.finished && this.outputSize === 0 && - (!this.socket || this.socket.writableLength === 0) + (!this[kSocket] || this[kSocket].writableLength === 0) ); }, }); @@ -196,14 +197,14 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableObjectMode', { ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', { __proto__: null, get() { - return this.outputSize + this[kChunkedLength] + (this.socket ? this.socket.writableLength : 0); + return this.outputSize + this[kChunkedLength] + (this[kSocket] ? this[kSocket].writableLength : 0); }, }); ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', { __proto__: null, get() { - return this.socket ? this.socket.writableHighWaterMark : this[kHighWaterMark]; + return this[kSocket] ? this[kSocket].writableHighWaterMark : this[kHighWaterMark]; }, }); @@ -238,14 +239,24 @@ ObjectDefineProperty(OutgoingMessage.prototype, '_headers', { ObjectDefineProperty(OutgoingMessage.prototype, 'connection', { __proto__: null, get: function() { - return this.socket; + return this[kSocket]; + }, + set: function(val) { + this.socket = val; + }, +}); + +ObjectDefineProperty(OutgoingMessage.prototype, 'socket', { + __proto__: null, + get: function() { + return this[kSocket]; }, set: function(val) { for (let n = 0; n < this[kCorked]; n++) { val?.cork(); - this.socket?.uncork(); + this[kSocket]?.uncork(); } - this.socket = val; + this[kSocket] = val; }, }); @@ -307,15 +318,15 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() { OutgoingMessage.prototype.cork = function() { this[kCorked]++; - if (this.socket) { - this.socket.cork(); + if (this[kSocket]) { + this[kSocket].cork(); } }; OutgoingMessage.prototype.uncork = function() { this[kCorked]--; - if (this.socket) { - this.socket.uncork(); + if (this[kSocket]) { + this[kSocket].uncork(); } if (this[kCorked] || this[kChunkedBuffer].length === 0) { @@ -353,12 +364,12 @@ OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { this.on('timeout', callback); } - if (!this.socket) { + if (!this[kSocket]) { this.once('socket', function socketSetTimeoutOnConnect(socket) { socket.setTimeout(msecs); }); } else { - this.socket.setTimeout(msecs); + this[kSocket].setTimeout(msecs); } return this; }; @@ -375,8 +386,8 @@ OutgoingMessage.prototype.destroy = function destroy(error) { this[kErrored] = error; - if (this.socket) { - this.socket.destroy(error); + if (this[kSocket]) { + this[kSocket].destroy(error); } else { this.once('socket', function socketDestroyOnConnect(socket) { socket.destroy(error); @@ -415,7 +426,7 @@ OutgoingMessage.prototype._send = function _send(data, encoding, callback, byteL OutgoingMessage.prototype._writeRaw = _writeRaw; function _writeRaw(data, encoding, callback, size) { - const conn = this.socket; + const conn = this[kSocket]; if (conn && conn.destroyed) { // The socket was destroyed. If we're still trying to write to it, // then we haven't gotten the 'close' event yet. @@ -1062,8 +1073,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { return this; } - if (this.socket) { - this.socket.cork(); + if (this[kSocket]) { + this[kSocket].cork(); } write_(this, chunk, encoding, null, true); @@ -1077,8 +1088,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { } return this; } else if (!this._header) { - if (this.socket) { - this.socket.cork(); + if (this[kSocket]) { + this[kSocket].cork(); } this._contentLength = 0; @@ -1102,10 +1113,10 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { process.nextTick(finish); } - if (this.socket) { + if (this[kSocket]) { // Fully uncork connection on end(). - this.socket._writableState.corked = 1; - this.socket.uncork(); + this[kSocket]._writableState.corked = 1; + this[kSocket].uncork(); } this[kCorked] = 1; this.uncork(); @@ -1116,8 +1127,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { // everything to the socket. debug('outgoing message end.'); if (this.outputData.length === 0 && - this.socket && - this.socket._httpMessage === this) { + this[kSocket] && + this[kSocket]._httpMessage === this) { this._finish(); } @@ -1128,7 +1139,7 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { // This function is called once all user data are flushed to the socket. // Note that it has a chance that the socket is not drained. OutgoingMessage.prototype._finish = function _finish() { - assert(this.socket); + assert(this[kSocket]); this.emit('prefinish'); }; @@ -1153,7 +1164,7 @@ OutgoingMessage.prototype._finish = function _finish() { // This function, _flush(), is called by both the Server and Client // to attempt to flush any pending messages out to the socket. OutgoingMessage.prototype._flush = function _flush() { - const socket = this.socket; + const socket = this[kSocket]; if (socket && socket.writable) { // There might be remaining data in this.output; write it out From 8d4e4e959e60ad111238cd2ecc094d1417777dde Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Oct 2023 19:13:00 +0200 Subject: [PATCH 3/4] Update _http_outgoing.js MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Vinicius Lourenço <12551007+H4ad@users.noreply.github.com> --- lib/_http_outgoing.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 64e0d4ab714d79..a761f020dc1eac 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -242,7 +242,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'connection', { return this[kSocket]; }, set: function(val) { - this.socket = val; + this[kSocket] = val; }, }); From 306513bdb159dda7470376097d5e7f5a05fb4cf0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 13 Oct 2023 19:13:48 +0200 Subject: [PATCH 4/4] Update _http_outgoing.js --- lib/_http_outgoing.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index a761f020dc1eac..64e0d4ab714d79 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -242,7 +242,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'connection', { return this[kSocket]; }, set: function(val) { - this[kSocket] = val; + this.socket = val; }, });