Skip to content

Commit

Permalink
http2: simplifies CodecImpl by implementing http2::adapter::DataFrame…
Browse files Browse the repository at this point in the history
…Source directly (envoyproxy#24881)

* Simplifies CodecImpl by implementing http2::adapter::DataFrameSource directly.

Signed-off-by: Biren Roy <[email protected]>
  • Loading branch information
birenroy authored Jan 18, 2023
1 parent 742a3b2 commit 38ced99
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 84 deletions.
115 changes: 36 additions & 79 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,18 +248,8 @@ void ConnectionImpl::ServerStreamImpl::encode1xxHeaders(const ResponseHeaderMap&
}

void ConnectionImpl::StreamImpl::encodeHeadersBase(const HeaderMap& headers, bool end_stream) {
nghttp2_data_provider provider;
if (!end_stream) {
provider.source.ptr = this;
provider.read_callback = [](nghttp2_session*, int32_t, uint8_t*, size_t length,
uint32_t* data_flags, nghttp2_data_source* source,
void*) -> ssize_t {
return static_cast<StreamImpl*>(source->ptr)->onDataSourceRead(length, data_flags);
};
}

local_end_stream_ = end_stream;
submitHeaders(headers, end_stream ? nullptr : &provider);
submitHeaders(headers, end_stream);
if (parent_.sendPendingFramesAndHandleError()) {
// Intended to check through coverage that this error case is tested
return;
Expand Down Expand Up @@ -638,83 +628,59 @@ void ConnectionImpl::StreamImpl::submitTrailers(const HeaderMap& trailers) {
parent_.adapter_->SubmitTrailer(stream_id_, final_headers);
}

ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t* data_flags) {
if (pending_send_data_->length() == 0 && !local_end_stream_) {
ASSERT(!data_deferred_);
data_deferred_ = true;
return NGHTTP2_ERR_DEFERRED;
std::pair<int64_t, bool>
ConnectionImpl::StreamDataFrameSource::SelectPayloadLength(size_t max_length) {
if (stream_.pending_send_data_->length() == 0 && !stream_.local_end_stream_) {
ASSERT(!stream_.data_deferred_);
stream_.data_deferred_ = true;
return {kBlocked, false};
} else {
*data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
if (local_end_stream_ && pending_send_data_->length() <= length) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
if (pending_trailers_to_encode_) {
// We need to tell the library to not set end stream so that we can emit the trailers.
*data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
submitTrailers(*pending_trailers_to_encode_);
pending_trailers_to_encode_.reset();
const size_t length = std::min<size_t>(max_length, stream_.pending_send_data_->length());
bool end_data = false;
if (stream_.local_end_stream_ && length == stream_.pending_send_data_->length()) {
end_data = true;
if (stream_.pending_trailers_to_encode_) {
send_fin_ = false;
stream_.submitTrailers(*stream_.pending_trailers_to_encode_);
stream_.pending_trailers_to_encode_.reset();
}
}

return std::min(length, pending_send_data_->length());
return {static_cast<int64_t>(length), end_data};
}
}

void ConnectionImpl::StreamImpl::onDataSourceSend(const uint8_t* framehd, size_t length) {
// In this callback we are writing out a raw DATA frame without copying. nghttp2 assumes that we
// "just know" that the frame header is 9 bytes.
// https://nghttp2.org/documentation/types.html#c.nghttp2_send_data_callback

parent_.protocol_constraints_.incrementOutboundDataFrameCount();
bool ConnectionImpl::StreamDataFrameSource::Send(absl::string_view frame_header,
size_t payload_length) {
stream_.parent_.protocol_constraints_.incrementOutboundDataFrameCount();

Buffer::OwnedImpl output;
parent_.addOutboundFrameFragment(output, framehd, H2_FRAME_HEADER_SIZE);
if (!parent_.protocol_constraints_.checkOutboundFrameLimits().ok()) {
stream_.parent_.addOutboundFrameFragment(
output, reinterpret_cast<const uint8_t*>(frame_header.data()), frame_header.size());
if (!stream_.parent_.protocol_constraints_.checkOutboundFrameLimits().ok()) {
ENVOY_CONN_LOG(debug, "error sending data frame: Too many frames in the outbound queue",
parent_.connection_);
setDetails(Http2ResponseCodeDetails::get().outbound_frame_flood);
stream_.parent_.connection_);
stream_.setDetails(Http2ResponseCodeDetails::get().outbound_frame_flood);
}

parent_.stats_.pending_send_bytes_.sub(length);
output.move(*pending_send_data_, length);
parent_.connection_.write(output, false);
stream_.parent_.stats_.pending_send_bytes_.sub(payload_length);
output.move(*stream_.pending_send_data_, payload_length);
stream_.parent_.connection_.write(output, false);
return true;
}

void ConnectionImpl::ClientStreamImpl::submitHeaders(const HeaderMap& headers,
nghttp2_data_provider* provider) {
void ConnectionImpl::ClientStreamImpl::submitHeaders(const HeaderMap& headers, bool end_stream) {
ASSERT(stream_id_ == -1);
// TODO(birenroy): Once using the new wrapper, migrate callers from nghttp2_data_provider to
// DataFrameSource.
std::unique_ptr<http2::adapter::DataFrameSource> data_frame_source;
if (provider) {
data_frame_source = http2::adapter::MakeZeroCopyDataFrameSource(
*provider, &parent_.connection_,
[](nghttp2_session*, nghttp2_frame* frame, const uint8_t* framehd, size_t length,
nghttp2_data_source* source, void*) -> int {
ASSERT(frame->data.padlen == 0);
static_cast<StreamImpl*>(source->ptr)->onDataSourceSend(framehd, length);
return 0;
});
}
stream_id_ =
parent_.adapter_->SubmitRequest(buildHeaders(headers), std::move(data_frame_source), base());
stream_id_ = parent_.adapter_->SubmitRequest(
buildHeaders(headers), end_stream ? nullptr : std::make_unique<StreamDataFrameSource>(*this),
base());
ASSERT(stream_id_ > 0);
}

void ConnectionImpl::ServerStreamImpl::submitHeaders(const HeaderMap& headers,
nghttp2_data_provider* provider) {
void ConnectionImpl::ServerStreamImpl::submitHeaders(const HeaderMap& headers, bool end_stream) {
ASSERT(stream_id_ != -1);
std::unique_ptr<http2::adapter::DataFrameSource> data_frame_source;
if (provider) {
data_frame_source = http2::adapter::MakeZeroCopyDataFrameSource(
*provider, &parent_.connection_,
[](nghttp2_session*, nghttp2_frame* frame, const uint8_t* framehd, size_t length,
nghttp2_data_source* source, void*) -> int {
ASSERT(frame->data.padlen == 0);
static_cast<StreamImpl*>(source->ptr)->onDataSourceSend(framehd, length);
return 0;
});
}
parent_.adapter_->SubmitResponse(stream_id_, buildHeaders(headers), std::move(data_frame_source));
parent_.adapter_->SubmitResponse(stream_id_, buildHeaders(headers),
end_stream ? nullptr
: std::make_unique<StreamDataFrameSource>(*this));
}

void ConnectionImpl::StreamImpl::onPendingFlushTimer() {
Expand Down Expand Up @@ -1646,15 +1612,6 @@ ConnectionImpl::Http2Callbacks::Http2Callbacks() {
return static_cast<ConnectionImpl*>(user_data)->onSend(data, length);
});

nghttp2_session_callbacks_set_send_data_callback(
callbacks_,
[](nghttp2_session*, nghttp2_frame* frame, const uint8_t* framehd, size_t length,
nghttp2_data_source* source, void*) -> int {
ASSERT(frame->data.padlen == 0);
static_cast<StreamImpl*>(source->ptr)->onDataSourceSend(framehd, length);
return 0;
});

nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks_, [](nghttp2_session*, const nghttp2_frame* frame, void* user_data) -> int {
auto status = static_cast<ConnectionImpl*>(user_data)->onBeginHeaders(frame);
Expand Down
26 changes: 21 additions & 5 deletions source/common/http/http2/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,12 @@ class ConnectionImpl : public virtual Connection,
void onPendingFlushTimer() override;

StreamImpl* base() { return this; }
ssize_t onDataSourceRead(uint64_t length, uint32_t* data_flags);
void onDataSourceSend(const uint8_t* framehd, size_t length);
void resetStreamWorker(StreamResetReason reason);
static void buildHeaders(std::vector<nghttp2_nv>& final_headers, const HeaderMap& headers);
static std::vector<http2::adapter::Header> buildHeaders(const HeaderMap& headers);
void saveHeader(HeaderString&& name, HeaderString&& value);
void encodeHeadersBase(const HeaderMap& headers, bool end_stream);
virtual void submitHeaders(const HeaderMap& headers, nghttp2_data_provider* provider) PURE;
virtual void submitHeaders(const HeaderMap& headers, bool end_stream) PURE;
void encodeTrailersBase(const HeaderMap& headers);
void submitTrailers(const HeaderMap& trailers);
// Returns true if the stream should defer the local reset stream until after the next call to
Expand Down Expand Up @@ -400,6 +398,24 @@ class ConnectionImpl : public virtual Connection,
void grantPeerAdditionalStreamWindow();
};

// Encapsulates the logic for sending DATA frames on a given stream.
class StreamDataFrameSource : public http2::adapter::DataFrameSource {
public:
explicit StreamDataFrameSource(StreamImpl& stream) : stream_(stream) {}

// Returns a pair of the next payload length, and whether that payload is the end of the data
// for this stream.
std::pair<int64_t, bool> SelectPayloadLength(size_t max_length) override;
// Queues the frame header and a DATA frame payload of the specified length for writing.
bool Send(absl::string_view frame_header, size_t payload_length) override;
// Whether the codec should send the END_STREAM flag on the final DATA frame.
bool send_fin() const override { return send_fin_; }

private:
StreamImpl& stream_;
bool send_fin_ = true;
};

using StreamImplPtr = std::unique_ptr<StreamImpl>;

/**
Expand All @@ -416,7 +432,7 @@ class ConnectionImpl : public virtual Connection,
// to flush would be covered by a request/stream/etc. timeout.
void setFlushTimeout(std::chrono::milliseconds /*timeout*/) override {}
// StreamImpl
void submitHeaders(const HeaderMap& headers, nghttp2_data_provider* provider) override;
void submitHeaders(const HeaderMap& headers, bool end_stream) override;
// Do not use deferred reset on upstream connections.
bool useDeferredReset() const override { return false; }
StreamDecoder& decoder() override { return response_decoder_; }
Expand Down Expand Up @@ -468,7 +484,7 @@ class ConnectionImpl : public virtual Connection,

// StreamImpl
void destroy() override;
void submitHeaders(const HeaderMap& headers, nghttp2_data_provider* provider) override;
void submitHeaders(const HeaderMap& headers, bool end_stream) override;
// Enable deferred reset on downstream connections so outbound HTTP internal error replies are
// written out before force resetting the stream, assuming there is enough H2 connection flow
// control window is available.
Expand Down

0 comments on commit 38ced99

Please sign in to comment.