Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v14.x] http2: use and support non-empty DATA frame with END_STREAM flag #34838

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 82 additions & 22 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,7 @@ class Http2Session extends EventEmitter {
streams: new Map(),
pendingStreams: new Set(),
pendingAck: 0,
shutdownWritableCalled: false,
writeQueueSize: 0,
originSet: undefined
};
Expand Down Expand Up @@ -1702,6 +1703,26 @@ function afterShutdown(status) {
stream[kMaybeDestroy]();
}

function shutdownWritable(callback) {
const handle = this[kHandle];
if (!handle) return callback();
const state = this[kState];
if (state.shutdownWritableCalled) {
// Backport v14.x: Session required for debugging stream object
// debugStreamObj(this, 'shutdownWritable() already called');
return callback();
}
state.shutdownWritableCalled = true;

const req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.callback = callback;
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1) // synchronous finish
return afterShutdown.call(req, 0);
}

function finishSendTrailers(stream, headersList) {
// The stream might be destroyed and in that case
// there is nothing to do.
Expand Down Expand Up @@ -1962,10 +1983,48 @@ class Http2Stream extends Duplex {

let req;

let waitingForWriteCallback = true;
let waitingForEndCheck = true;
let writeCallbackErr;
let endCheckCallbackErr;
const done = () => {
if (waitingForEndCheck || waitingForWriteCallback) return;
const err = writeCallbackErr || endCheckCallbackErr;
// writeGeneric does not destroy on error and
// we cannot enable autoDestroy,
// so make sure to destroy on error.
if (err) {
this.destroy(err);
}
cb(err);
};
const writeCallback = (err) => {
waitingForWriteCallback = false;
writeCallbackErr = err;
done();
};
const endCheckCallback = (err) => {
waitingForEndCheck = false;
endCheckCallbackErr = err;
done();
};
// Shutdown write stream right after last chunk is sent
// so final DATA frame can include END_STREAM flag
process.nextTick(() => {
if (writeCallbackErr ||
!this._writableState.ending ||
this._writableState.buffered.length ||
(this[kState].flags & STREAM_FLAGS_HAS_TRAILERS))
return endCheckCallback();
// Backport v14.x: Session required for debugging stream object
// debugStreamObj(this, 'shutting down writable on last write');
shutdownWritable.call(this, endCheckCallback);
});

if (writev)
req = writevGeneric(this, data, cb);
req = writevGeneric(this, data, writeCallback);
else
req = writeGeneric(this, data, encoding, cb);
req = writeGeneric(this, data, encoding, writeCallback);

trackWriteState(this, req.bytes);
}
Expand All @@ -1979,21 +2038,13 @@ class Http2Stream extends Duplex {
}

_final(cb) {
const handle = this[kHandle];
if (this.pending) {
this.once('ready', () => this._final(cb));
} else if (handle !== undefined) {
debugStreamObj(this, '_final shutting down');
const req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.callback = cb;
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1) // synchronous finish
return afterShutdown.call(req, 0);
} else {
cb();
return;
}
// Backport v14.x: Session required for debugging stream object
// debugStreamObj(this, 'shutting down writable on _final');
shutdownWritable.call(this, cb);
}

_read(nread) {
Expand Down Expand Up @@ -2098,11 +2149,20 @@ class Http2Stream extends Duplex {
debugStream(this[kID] || 'pending', session[kType], 'destroying stream');

const state = this[kState];
const sessionCode = session[kState].goawayCode ||
session[kState].destroyCode;
const code = err != null ?
sessionCode || NGHTTP2_INTERNAL_ERROR :
state.rstCode || sessionCode;
const sessionState = session[kState];
const sessionCode = sessionState.goawayCode || sessionState.destroyCode;

// If a stream has already closed successfully, there is no error
// to report from this stream, even if the session has errored.
// This can happen if the stream was already in process of destroying
// after a successful close, but the session had a error between
// this stream's close and destroy operations.
// Previously, this always overrode a successful close operation code
// NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
const code = (err != null ?
(sessionCode || NGHTTP2_INTERNAL_ERROR) :
(this.closed ? this.rstCode : sessionCode)
);
const hasHandle = handle !== undefined;

if (!this.closed)
Expand All @@ -2111,13 +2171,13 @@ class Http2Stream extends Duplex {

if (hasHandle) {
handle.destroy();
session[kState].streams.delete(id);
sessionState.streams.delete(id);
} else {
session[kState].pendingStreams.delete(this);
sessionState.pendingStreams.delete(this);
}

// Adjust the write queue size for accounting
session[kState].writeQueueSize -= state.writeQueueSize;
sessionState.writeQueueSize -= state.writeQueueSize;
state.writeQueueSize = 0;

// RST code 8 not emitted as an error as its used by clients to signify
Expand Down
13 changes: 7 additions & 6 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ ssize_t Http2Session::OnMaxFrameSizePadding(size_t frameLen,
// quite expensive. This is a potential performance optimization target later.
ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_NOT_NULL(stream_buf_.base);
CHECK_LT(stream_buf_offset_, stream_buf_.len);
CHECK_LE(stream_buf_offset_, stream_buf_.len);
size_t read_len = stream_buf_.len - stream_buf_offset_;

// multiple side effects.
Expand All @@ -753,11 +753,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_GT(ret, 0);
CHECK_LE(static_cast<size_t>(ret), read_len);

if (static_cast<size_t>(ret) < read_len) {
// Mark the remainder of the data as available for later consumption.
stream_buf_offset_ += ret;
return ret;
}
// Mark the remainder of the data as available for later consumption.
// Even if all bytes were received, a paused stream may delay the
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
stream_buf_offset_ += ret;
return ret;
}

// We are done processing the current input chunk.
Expand Down Expand Up @@ -1093,6 +1093,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
if (session->is_write_in_progress()) {
CHECK(session->is_reading_stopped());
session->set_receive_paused();
Debug(session, "receive paused");
return NGHTTP2_ERR_PAUSE;
}

Expand Down
56 changes: 39 additions & 17 deletions test/parallel/test-http2-misbehaving-multiplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Flags: --expose-internals

const common = require('../common');
const assert = require('assert');

if (!common.hasCrypto)
common.skip('missing crypto');
Expand All @@ -13,16 +14,36 @@ const h2test = require('../common/http2');
let client;

const server = h2.createServer();
let gotFirstStreamId1;
server.on('stream', common.mustCall((stream) => {
stream.respond();
stream.end('ok');

// The error will be emitted asynchronously
stream.on('error', common.expectsError({
constructor: NghttpError,
code: 'ERR_HTTP2_ERROR',
message: 'Stream was already closed or invalid'
}));
// Http2Server should be fast enough to respond to and close
// the first streams with ID 1 and ID 3 without errors.

// Test for errors in 'close' event to ensure no errors on some streams.
stream.on('error', () => {});
stream.on('close', (err) => {
if (stream.id === 1) {
if (gotFirstStreamId1) {
// We expect our outgoing frames to fail on Stream ID 1 the second time
// because a stream with ID 1 was already closed before.
common.expectsError({
constructor: NghttpError,
code: 'ERR_HTTP2_ERROR',
message: 'Stream was already closed or invalid'
});
return;
}
gotFirstStreamId1 = true;
}
assert.strictEqual(err, undefined);
});

// Stream ID 5 should never reach the server
assert.notStrictEqual(stream.id, 5);

}, 2));

server.on('session', common.mustCall((session) => {
Expand All @@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => {

const settings = new h2test.SettingsFrame();
const settingsAck = new h2test.SettingsFrame(true);
const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
// HeadersFrame(id, payload, padding, END_STREAM)
const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);

server.listen(0, () => {
client = net.connect(server.address().port, () => {
client.write(h2test.kClientMagic, () => {
client.write(settings.data, () => {
client.write(settingsAck.data);
// This will make it ok.
client.write(head1.data, () => {
// This will make it ok.
client.write(head2.data, () => {
// Stream ID 1 frame will make it OK.
client.write(id1.data, () => {
// Stream ID 3 frame will make it OK.
client.write(id3.data, () => {
// A second Stream ID 1 frame should fail.
// This will cause an error to occur because the client is
// attempting to reuse an already closed stream. This must
// cause the server session to be torn down.
client.write(head3.data, () => {
// This won't ever make it to the server
client.write(head4.data);
client.write(id1.data, () => {
// This Stream ID 5 frame will never make it to the server
client.write(id5.data);
});
});
});
Expand Down
61 changes: 61 additions & 0 deletions test/parallel/test-http2-pack-end-stream-flag.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');

const { PerformanceObserver } = require('perf_hooks');

const server = http2.createServer();

server.on('stream', (stream, headers) => {
stream.respond({
'content-type': 'text/html',
':status': 200
});
switch (headers[':path']) {
case '/singleEnd':
stream.end('OK');
break;
case '/sequentialEnd':
stream.write('OK');
stream.end();
break;
case '/delayedEnd':
stream.write('OK', () => stream.end());
break;
}
});

function testRequest(path, targetFrameCount, callback) {
const obs = new PerformanceObserver((list, observer) => {
const entry = list.getEntries()[0];
if (entry.name !== 'Http2Session') return;
if (entry.type !== 'client') return;
assert.strictEqual(entry.framesReceived, targetFrameCount);
observer.disconnect();
callback();
});
obs.observe({ entryTypes: ['http2'] });
const client = http2.connect(`http://localhost:${server.address().port}`, () => {
const req = client.request({ ':path': path });
req.resume();
req.end();
req.on('end', () => client.close());
});
}

// SETTINGS => SETTINGS => HEADERS => DATA
const MIN_FRAME_COUNT = 4;

server.listen(0, () => {
testRequest('/singleEnd', MIN_FRAME_COUNT, () => {
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => {
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => {
server.close();
});
});
});
});
2 changes: 1 addition & 1 deletion test/parallel/test-http2-padding-aligned.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const makeDuplexPair = require('../common/duplexpair');
// The lengths of the expected writes... note that this is highly
// sensitive to how the internals are implemented.
const serverLengths = [24, 9, 9, 32];
const clientLengths = [9, 9, 48, 9, 1, 21, 1, 16];
const clientLengths = [9, 9, 48, 9, 1, 21, 1];

// Adjust for the 24-byte preamble and two 9-byte settings frames, and
// the result must be equally divisible by 8
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-perf_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => {
break;
case 'client':
assert.strictEqual(entry.streamCount, 1);
assert.strictEqual(entry.framesReceived, 8);
assert.strictEqual(entry.framesReceived, 7);
break;
default:
assert.fail('invalid Http2Session type');
Expand Down