From 38ced99f52e5f780eaf486f8cd2525b415399505 Mon Sep 17 00:00:00 2001 From: birenroy Date: Wed, 18 Jan 2023 16:20:11 -0500 Subject: [PATCH] http2: simplifies CodecImpl by implementing http2::adapter::DataFrameSource directly (#24881) * Simplifies CodecImpl by implementing http2::adapter::DataFrameSource directly. Signed-off-by: Biren Roy --- source/common/http/http2/codec_impl.cc | 115 ++++++++----------------- source/common/http/http2/codec_impl.h | 26 ++++-- 2 files changed, 57 insertions(+), 84 deletions(-) diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 74eac32509f1..f911283ea77c 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -248,18 +248,8 @@ void ConnectionImpl::ServerStreamImpl::encode1xxHeaders(const ResponseHeaderMap& } void ConnectionImpl::StreamImpl::encodeHeadersBase(const HeaderMap& headers, bool end_stream) { - nghttp2_data_provider provider; - if (!end_stream) { - provider.source.ptr = this; - provider.read_callback = [](nghttp2_session*, int32_t, uint8_t*, size_t length, - uint32_t* data_flags, nghttp2_data_source* source, - void*) -> ssize_t { - return static_cast(source->ptr)->onDataSourceRead(length, data_flags); - }; - } - local_end_stream_ = end_stream; - submitHeaders(headers, end_stream ? nullptr : &provider); + submitHeaders(headers, end_stream); if (parent_.sendPendingFramesAndHandleError()) { // Intended to check through coverage that this error case is tested return; @@ -638,83 +628,59 @@ void ConnectionImpl::StreamImpl::submitTrailers(const HeaderMap& trailers) { parent_.adapter_->SubmitTrailer(stream_id_, final_headers); } -ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t* data_flags) { - if (pending_send_data_->length() == 0 && !local_end_stream_) { - ASSERT(!data_deferred_); - data_deferred_ = true; - return NGHTTP2_ERR_DEFERRED; +std::pair +ConnectionImpl::StreamDataFrameSource::SelectPayloadLength(size_t max_length) { + if (stream_.pending_send_data_->length() == 0 && !stream_.local_end_stream_) { + ASSERT(!stream_.data_deferred_); + stream_.data_deferred_ = true; + return {kBlocked, false}; } else { - *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; - if (local_end_stream_ && pending_send_data_->length() <= length) { - *data_flags |= NGHTTP2_DATA_FLAG_EOF; - if (pending_trailers_to_encode_) { - // We need to tell the library to not set end stream so that we can emit the trailers. - *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM; - submitTrailers(*pending_trailers_to_encode_); - pending_trailers_to_encode_.reset(); + const size_t length = std::min(max_length, stream_.pending_send_data_->length()); + bool end_data = false; + if (stream_.local_end_stream_ && length == stream_.pending_send_data_->length()) { + end_data = true; + if (stream_.pending_trailers_to_encode_) { + send_fin_ = false; + stream_.submitTrailers(*stream_.pending_trailers_to_encode_); + stream_.pending_trailers_to_encode_.reset(); } } - - return std::min(length, pending_send_data_->length()); + return {static_cast(length), end_data}; } } -void ConnectionImpl::StreamImpl::onDataSourceSend(const uint8_t* framehd, size_t length) { - // In this callback we are writing out a raw DATA frame without copying. nghttp2 assumes that we - // "just know" that the frame header is 9 bytes. - // https://nghttp2.org/documentation/types.html#c.nghttp2_send_data_callback - - parent_.protocol_constraints_.incrementOutboundDataFrameCount(); +bool ConnectionImpl::StreamDataFrameSource::Send(absl::string_view frame_header, + size_t payload_length) { + stream_.parent_.protocol_constraints_.incrementOutboundDataFrameCount(); Buffer::OwnedImpl output; - parent_.addOutboundFrameFragment(output, framehd, H2_FRAME_HEADER_SIZE); - if (!parent_.protocol_constraints_.checkOutboundFrameLimits().ok()) { + stream_.parent_.addOutboundFrameFragment( + output, reinterpret_cast(frame_header.data()), frame_header.size()); + if (!stream_.parent_.protocol_constraints_.checkOutboundFrameLimits().ok()) { ENVOY_CONN_LOG(debug, "error sending data frame: Too many frames in the outbound queue", - parent_.connection_); - setDetails(Http2ResponseCodeDetails::get().outbound_frame_flood); + stream_.parent_.connection_); + stream_.setDetails(Http2ResponseCodeDetails::get().outbound_frame_flood); } - parent_.stats_.pending_send_bytes_.sub(length); - output.move(*pending_send_data_, length); - parent_.connection_.write(output, false); + stream_.parent_.stats_.pending_send_bytes_.sub(payload_length); + output.move(*stream_.pending_send_data_, payload_length); + stream_.parent_.connection_.write(output, false); + return true; } -void ConnectionImpl::ClientStreamImpl::submitHeaders(const HeaderMap& headers, - nghttp2_data_provider* provider) { +void ConnectionImpl::ClientStreamImpl::submitHeaders(const HeaderMap& headers, bool end_stream) { ASSERT(stream_id_ == -1); - // TODO(birenroy): Once using the new wrapper, migrate callers from nghttp2_data_provider to - // DataFrameSource. - std::unique_ptr data_frame_source; - if (provider) { - data_frame_source = http2::adapter::MakeZeroCopyDataFrameSource( - *provider, &parent_.connection_, - [](nghttp2_session*, nghttp2_frame* frame, const uint8_t* framehd, size_t length, - nghttp2_data_source* source, void*) -> int { - ASSERT(frame->data.padlen == 0); - static_cast(source->ptr)->onDataSourceSend(framehd, length); - return 0; - }); - } - stream_id_ = - parent_.adapter_->SubmitRequest(buildHeaders(headers), std::move(data_frame_source), base()); + stream_id_ = parent_.adapter_->SubmitRequest( + buildHeaders(headers), end_stream ? nullptr : std::make_unique(*this), + base()); ASSERT(stream_id_ > 0); } -void ConnectionImpl::ServerStreamImpl::submitHeaders(const HeaderMap& headers, - nghttp2_data_provider* provider) { +void ConnectionImpl::ServerStreamImpl::submitHeaders(const HeaderMap& headers, bool end_stream) { ASSERT(stream_id_ != -1); - std::unique_ptr data_frame_source; - if (provider) { - data_frame_source = http2::adapter::MakeZeroCopyDataFrameSource( - *provider, &parent_.connection_, - [](nghttp2_session*, nghttp2_frame* frame, const uint8_t* framehd, size_t length, - nghttp2_data_source* source, void*) -> int { - ASSERT(frame->data.padlen == 0); - static_cast(source->ptr)->onDataSourceSend(framehd, length); - return 0; - }); - } - parent_.adapter_->SubmitResponse(stream_id_, buildHeaders(headers), std::move(data_frame_source)); + parent_.adapter_->SubmitResponse(stream_id_, buildHeaders(headers), + end_stream ? nullptr + : std::make_unique(*this)); } void ConnectionImpl::StreamImpl::onPendingFlushTimer() { @@ -1646,15 +1612,6 @@ ConnectionImpl::Http2Callbacks::Http2Callbacks() { return static_cast(user_data)->onSend(data, length); }); - nghttp2_session_callbacks_set_send_data_callback( - callbacks_, - [](nghttp2_session*, nghttp2_frame* frame, const uint8_t* framehd, size_t length, - nghttp2_data_source* source, void*) -> int { - ASSERT(frame->data.padlen == 0); - static_cast(source->ptr)->onDataSourceSend(framehd, length); - return 0; - }); - nghttp2_session_callbacks_set_on_begin_headers_callback( callbacks_, [](nghttp2_session*, const nghttp2_frame* frame, void* user_data) -> int { auto status = static_cast(user_data)->onBeginHeaders(frame); diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index eb1e34ab1329..3b0de0914964 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -212,14 +212,12 @@ class ConnectionImpl : public virtual Connection, void onPendingFlushTimer() override; StreamImpl* base() { return this; } - ssize_t onDataSourceRead(uint64_t length, uint32_t* data_flags); - void onDataSourceSend(const uint8_t* framehd, size_t length); void resetStreamWorker(StreamResetReason reason); static void buildHeaders(std::vector& final_headers, const HeaderMap& headers); static std::vector buildHeaders(const HeaderMap& headers); void saveHeader(HeaderString&& name, HeaderString&& value); void encodeHeadersBase(const HeaderMap& headers, bool end_stream); - virtual void submitHeaders(const HeaderMap& headers, nghttp2_data_provider* provider) PURE; + virtual void submitHeaders(const HeaderMap& headers, bool end_stream) PURE; void encodeTrailersBase(const HeaderMap& headers); void submitTrailers(const HeaderMap& trailers); // Returns true if the stream should defer the local reset stream until after the next call to @@ -400,6 +398,24 @@ class ConnectionImpl : public virtual Connection, void grantPeerAdditionalStreamWindow(); }; + // Encapsulates the logic for sending DATA frames on a given stream. + class StreamDataFrameSource : public http2::adapter::DataFrameSource { + public: + explicit StreamDataFrameSource(StreamImpl& stream) : stream_(stream) {} + + // Returns a pair of the next payload length, and whether that payload is the end of the data + // for this stream. + std::pair SelectPayloadLength(size_t max_length) override; + // Queues the frame header and a DATA frame payload of the specified length for writing. + bool Send(absl::string_view frame_header, size_t payload_length) override; + // Whether the codec should send the END_STREAM flag on the final DATA frame. + bool send_fin() const override { return send_fin_; } + + private: + StreamImpl& stream_; + bool send_fin_ = true; + }; + using StreamImplPtr = std::unique_ptr; /** @@ -416,7 +432,7 @@ class ConnectionImpl : public virtual Connection, // to flush would be covered by a request/stream/etc. timeout. void setFlushTimeout(std::chrono::milliseconds /*timeout*/) override {} // StreamImpl - void submitHeaders(const HeaderMap& headers, nghttp2_data_provider* provider) override; + void submitHeaders(const HeaderMap& headers, bool end_stream) override; // Do not use deferred reset on upstream connections. bool useDeferredReset() const override { return false; } StreamDecoder& decoder() override { return response_decoder_; } @@ -468,7 +484,7 @@ class ConnectionImpl : public virtual Connection, // StreamImpl void destroy() override; - void submitHeaders(const HeaderMap& headers, nghttp2_data_provider* provider) override; + void submitHeaders(const HeaderMap& headers, bool end_stream) override; // Enable deferred reset on downstream connections so outbound HTTP internal error replies are // written out before force resetting the stream, assuming there is enough H2 connection flow // control window is available.