Skip to content

Commit

Permalink
quiche: handle stream blockage during decodeHeaders and decodeTrailers (
Browse files Browse the repository at this point in the history
#16128)

Commit Message: use posted callback to block/unblock quic stream in EnvoyQuicStream::readDisable() with weak_ptr to stream to handle stream life time issue.
Additional Description: currently if readDisabled() is called in decodeHeaders|Trailers(), the stream will be blocked right away which breaks the assumption QUICHE has that a stream shouldn't be blocked during OnInitialHeadersComplete() and OnTrailingHeadersComplete().
This change makes the stream state change completely outside of QUICHE call stack. (The unblocking is already outside of QUICHE call stack in existing implementation.) Also simplify the blockage state change logic.

Risk Level: low
Testing: added more unit tests and enabled Http2UpstreamIntegrationTest::ManyLargeSimultaneousRequestWithBufferLimits which was flaky.

Part of #2557 #14829

Signed-off-by: Dan Zhang <[email protected]>
Co-authored-by: Dan Zhang <[email protected]>
  • Loading branch information
danzh2010 and danzh1989 authored Apr 29, 2021
1 parent 26a60c2 commit 9e263d8
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 96 deletions.
21 changes: 8 additions & 13 deletions source/common/quic/envoy_quic_client_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,16 @@ void EnvoyQuicClientStream::resetStream(Http::StreamResetReason reason) {
Reset(envoyResetReasonToQuicRstError(reason));
}

void EnvoyQuicClientStream::switchStreamBlockState(bool should_block) {
if (should_block) {
void EnvoyQuicClientStream::switchStreamBlockState() {
// From when the callback got scheduled till now, readDisable() might have blocked and unblocked
// the stream multiple times, but those actions haven't taken any effect yet, and only the last
// state of read_disable_counter_ determines whether to unblock or block the quic stream. Unlike
// Envoy readDisable() the quic stream gets blocked/unblocked based on the most recent call. So a
// stream will be blocked upon SetBlockedUntilFlush() no matter how many times SetUnblocked() was
// called before, and vice versa.
if (read_disable_counter_ > 0) {
sequencer()->SetBlockedUntilFlush();
} else {
ASSERT(read_disable_counter_ == 0, "readDisable called in between.");
sequencer()->SetUnblocked();
}
}
Expand Down Expand Up @@ -176,12 +181,9 @@ void EnvoyQuicClientStream::OnInitialHeadersComplete(bool fin, size_t frame_len,

void EnvoyQuicClientStream::OnBodyAvailable() {
ASSERT(FinishedReadingHeaders());
ASSERT(read_disable_counter_ == 0);
ASSERT(!in_decode_data_callstack_);
if (read_side_closed()) {
return;
}
in_decode_data_callstack_ = true;

Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
// TODO(danzh): check Envoy per stream buffer limit.
Expand Down Expand Up @@ -210,12 +212,6 @@ void EnvoyQuicClientStream::OnBodyAvailable() {
}

if (!sequencer()->IsClosed() || read_side_closed()) {
in_decode_data_callstack_ = false;
if (read_disable_counter_ > 0) {
// If readDisable() was ever called during decodeData() and it meant to disable
// reading from downstream, the call must have been deferred. Call it now.
switchStreamBlockState(true);
}
return;
}

Expand All @@ -224,7 +220,6 @@ void EnvoyQuicClientStream::OnBodyAvailable() {
maybeDecodeTrailers();

OnFinRead();
in_decode_data_callstack_ = false;
}

void EnvoyQuicClientStream::OnTrailingHeadersComplete(bool fin, size_t frame_len,
Expand Down
2 changes: 1 addition & 1 deletion source/common/quic/envoy_quic_client_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class EnvoyQuicClientStream : public quic::QuicSpdyClientStream,

protected:
// EnvoyQuicStream
void switchStreamBlockState(bool should_block) override;
void switchStreamBlockState() override;
uint32_t streamId() override;
Network::Connection* connection() override;

Expand Down
23 changes: 8 additions & 15 deletions source/common/quic/envoy_quic_server_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,16 @@ void EnvoyQuicServerStream::resetStream(Http::StreamResetReason reason) {
}
}

void EnvoyQuicServerStream::switchStreamBlockState(bool should_block) {
ASSERT(FinishedReadingHeaders(),
"Upperstream buffer limit is reached before request body is delivered.");
if (should_block) {
void EnvoyQuicServerStream::switchStreamBlockState() {
// From when the callback got scheduled till now, readDisable() might have blocked and unblocked
// the stream multiple times, but those actions haven't taken any effect yet, and only the last
// state of read_disable_counter_ determines whether to unblock or block the quic stream.
// Unlike Envoy readDisable() the quic stream gets blocked/unblocked based on the most recent
// call. So a stream will be blocked upon SetBlockedUntilFlush() no matter how many times
// SetUnblocked() was called before, and vice versa.
if (read_disable_counter_ > 0) {
sequencer()->SetBlockedUntilFlush();
} else {
ASSERT(read_disable_counter_ == 0, "readDisable called in between.");
sequencer()->SetUnblocked();
}
}
Expand Down Expand Up @@ -174,12 +177,9 @@ void EnvoyQuicServerStream::OnInitialHeadersComplete(bool fin, size_t frame_len,

void EnvoyQuicServerStream::OnBodyAvailable() {
ASSERT(FinishedReadingHeaders());
ASSERT(read_disable_counter_ == 0);
ASSERT(!in_decode_data_callstack_);
if (read_side_closed()) {
return;
}
in_decode_data_callstack_ = true;

Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
// TODO(danzh): check Envoy per stream buffer limit.
Expand Down Expand Up @@ -209,12 +209,6 @@ void EnvoyQuicServerStream::OnBodyAvailable() {
}

if (!sequencer()->IsClosed() || read_side_closed()) {
in_decode_data_callstack_ = false;
if (read_disable_counter_ > 0) {
// If readDisable() was ever called during decodeData() and it meant to disable
// reading from downstream, the call must have been deferred. Call it now.
switchStreamBlockState(true);
}
return;
}

Expand All @@ -223,7 +217,6 @@ void EnvoyQuicServerStream::OnBodyAvailable() {
maybeDecodeTrailers();

OnFinRead();
in_decode_data_callstack_ = false;
}

void EnvoyQuicServerStream::OnTrailingHeadersComplete(bool fin, size_t frame_len,
Expand Down
2 changes: 1 addition & 1 deletion source/common/quic/envoy_quic_server_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase,

protected:
// EnvoyQuicStream
void switchStreamBlockState(bool should_block) override;
void switchStreamBlockState() override;
uint32_t streamId() override;
Network::Connection* connection() override;

Expand Down
60 changes: 22 additions & 38 deletions source/common/quic/envoy_quic_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder,
: stats_(stats), http3_options_(http3_options),
send_buffer_simulation_(buffer_limit / 2, buffer_limit, std::move(below_low_watermark),
std::move(above_high_watermark), ENVOY_LOGGER()),
filter_manager_connection_(filter_manager_connection) {}
filter_manager_connection_(filter_manager_connection),
async_stream_blockage_change_(
filter_manager_connection.dispatcher().createSchedulableCallback(
[this]() { switchStreamBlockState(); })) {}

~EnvoyQuicStream() override = default;

Expand All @@ -70,31 +73,15 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder,
}
}

if (status_changed && !in_decode_data_callstack_) {
// Avoid calling this while decoding data because transient disabling and
// enabling reading may trigger another decoding data inside the
// callstack which messes up stream state.
if (disable) {
// Block QUIC stream right away. And if there are queued switching
// state callback, update the desired state as well.
switchStreamBlockState(true);
if (unblock_posted_) {
should_block_ = true;
}
} else {
should_block_ = false;
if (!unblock_posted_) {
// If this is the first time unblocking stream is desired, post a
// callback to do it in next loop. This is because unblocking QUIC
// stream can lead to immediate upstream encoding.
unblock_posted_ = true;
connection()->dispatcher().post([this] {
unblock_posted_ = false;
switchStreamBlockState(should_block_);
});
}
}
if (!status_changed) {
return;
}

// If the status transiently changed from unblocked to blocked and then unblocked, the quic
// stream will be spuriously unblocked and call OnDataAvailable(). This call shouldn't take any
// effect because any available data should have been processed already upon arrival or they
// were blocked by some condition other than flow control, i.e. Qpack decoding.
async_stream_blockage_change_->scheduleCallbackNextIteration();
}

void addCallbacks(Http::StreamCallbacks& callbacks) override {
Expand Down Expand Up @@ -139,7 +126,7 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder,
absl::string_view responseDetails() override { return details_; }

protected:
virtual void switchStreamBlockState(bool should_block) PURE;
virtual void switchStreamBlockState() PURE;

// Needed for ENVOY_STREAM_LOG.
virtual uint32_t streamId() PURE;
Expand All @@ -149,10 +136,12 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder,
// notified more than once about end of stream. So once this is true, no need
// to set it in the callback to Envoy stream any more.
bool end_stream_decoded_{false};
// The latest state a QUIC stream blockage state change callback should look at. As
// more readDisable() calls may happen between the callback is posted and it's
// executed, the stream might be unblocked and blocked several times. If this
// counter is 0, the callback should unblock the stream. Otherwise it should
// block the stream.
uint32_t read_disable_counter_{0u};
// If true, switchStreamBlockState() should be deferred till this variable
// becomes false.
bool in_decode_data_callstack_{false};

Http::Http3::CodecStats& stats_;
const envoy::config::core::v3::Http3ProtocolOptions& http3_options_;
Expand All @@ -169,16 +158,11 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder,
// directly and buffers them in filters if needed. Itself doesn't buffer request data.
EnvoyQuicSimulatedWatermarkBuffer send_buffer_simulation_;

// True if there is posted unblocking QUIC stream callback. There should be
// only one such callback no matter how many times readDisable() is called.
bool unblock_posted_{false};
// The latest state an unblocking QUIC stream callback should look at. As
// more readDisable() calls may happen between the callback is posted and it's
// executed, the stream might be unblocked and blocked several times. Only the
// latest desired state should be considered by the callback.
bool should_block_{false};

QuicFilterManagerConnectionImpl& filter_manager_connection_;
// Used to block or unblock stream in the next event loop. QUICHE doesn't like stream blockage
// state change in its own call stack. And Envoy upstream doesn't like quic stream to be unblocked
// in its callstack either because the stream will push data right away.
Event::SchedulableCallbackPtr async_stream_blockage_change_;
};

} // namespace Quic
Expand Down
2 changes: 1 addition & 1 deletion source/docs/quiche_integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The HCM will call encoder's encodeHeaders() to write response headers, and then

All arrived out-of-order data is buffered in QUICHE stream. This buffer is capped by max stream flow control window in QUICHE which is 16MB. Once bytes are put in sequence and ready to be used, OnBodyDataAvailable() is called. The stream implementation overrides this call and calls StreamDecoder::decodeData() in it. Request and response body are buffered in each L7 filter if desired, and the stream itself doesn't buffer any of them unless set as read blocked.

When upstream or any L7 filter reaches its buffer limit, it will call Http::Stream::readDisable() with false to set QUIC stream to be read blocked. In this state, even if more request/response body is available to be delivered, OnBodyDataAvailable() will not be called. As a result, downstream flow control will not shift as no data will be consumed. As both filters and upstream buffers can call readDisable(), each stream has a counter of how many times the HCM blocks the stream. When the counter is cleared, the stream will set its state to unblocked and thus deliver any new and existing available data buffered in the QUICHE stream.
When upstream or any L7 filter reaches its buffer limit, it will call Http::Stream::readDisable() with false to set QUIC stream to be read blocked. This takes effect in the next event loop. Once the stream is read blocked, even if more request/response body is available to be delivered, OnBodyDataAvailable() will not be called. As a result, downstream flow control will not shift as no data will be consumed. As both filters and upstream buffers can call readDisable(), each stream has a counter of how many times the HCM blocks the stream. When the counter is cleared, the stream will set its state to unblocked also in the next event loop and thus deliver any new and existing available data buffered in the QUICHE stream object. readDisable() can be called to block and unblock stream multiple times within one event loop, the stream blockage state is determined by the final state of the counter.

#### Send buffer

Expand Down
29 changes: 29 additions & 0 deletions test/common/quic/envoy_quic_client_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class EnvoyQuicClientStreamTest : public testing::TestWithParam<bool> {

void SetUp() override {
quic_session_.Initialize();
quic_connection_->setEnvoyConnection(quic_session_);
setQuicConfigWithDefaultValues(quic_session_.config());
quic_session_.OnConfigNegotiated();
quic_connection_->setUpConnectionSocket();
Expand Down Expand Up @@ -576,5 +577,33 @@ TEST_P(EnvoyQuicClientStreamTest, CloseConnectionDuringDecodingTrailer) {
}
}

// Tests that posted stream block callback won't cause use-after-free crash.
TEST_P(EnvoyQuicClientStreamTest, ReadDisabledBeforeClose) {
const auto result = quic_stream_->encodeHeaders(request_headers_, /*end_stream=*/true);
EXPECT_TRUE(result.ok());

EXPECT_CALL(stream_decoder_, decodeHeaders_(_, /*end_stream=*/!quic::VersionUsesHttp3(
quic_version_.transport_version)))
.WillOnce(Invoke([this](const Http::ResponseHeaderMapPtr& headers, bool) {
EXPECT_EQ("200", headers->getStatusValue());
quic_stream_->readDisable(true);
}));
if (quic_version_.UsesHttp3()) {
EXPECT_CALL(stream_decoder_, decodeData(BufferStringEqual(""), /*end_stream=*/true));
std::string payload = spdyHeaderToHttp3StreamPayload(spdy_response_headers_);
quic::QuicStreamFrame frame(stream_id_, true, 0, payload);
quic_stream_->OnStreamFrame(frame);
} else {
quic_stream_->OnStreamHeaderList(/*fin=*/true, response_headers_.uncompressed_header_bytes(),
response_headers_);
}
// Reset to close the stream.
EXPECT_CALL(stream_callbacks_, onResetStream(Http::StreamResetReason::LocalReset, _));
quic_stream_->resetStream(Http::StreamResetReason::LocalReset);
EXPECT_EQ(1u, quic_session_.closed_streams()->size());
quic_session_.closed_streams()->clear();
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}

} // namespace Quic
} // namespace Envoy
Loading

0 comments on commit 9e263d8

Please sign in to comment.