Skip to content

Commit

Permalink
[buffer] Add on-drain hook to buffer API and use it to avoid fragment…
Browse files Browse the repository at this point in the history
…ation due to tracking of H2 data and control frames in the output buffer (#144)

Signed-off-by: antonio <[email protected]>
  • Loading branch information
antoniovicente authored and tonya11en committed Jun 30, 2020
1 parent 0e49a49 commit 5eba69a
Show file tree
Hide file tree
Showing 8 changed files with 390 additions and 57 deletions.
9 changes: 9 additions & 0 deletions include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ class Instance {
public:
virtual ~Instance() = default;

/**
* Register function to call when the last byte in the last slice of this
* buffer has fully drained. Note that slices may be transferred to
* downstream buffers, drain trackers are transferred along with the bytes
* they track so the function is called only after the last byte is drained
* from all buffers.
*/
virtual void addDrainTracker(std::function<void()> drain_tracker) PURE;

/**
* Copy data into the buffer (deprecated, use absl::string_view variant
* instead).
Expand Down
14 changes: 11 additions & 3 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ void OwnedImpl::addImpl(const void* data, uint64_t size) {
}
}

void OwnedImpl::addDrainTracker(std::function<void()> drain_tracker) {
ASSERT(!slices_.empty());
slices_.back()->addDrainTracker(std::move(drain_tracker));
}

void OwnedImpl::add(const void* data, uint64_t size) { addImpl(data, size); }

void OwnedImpl::addBufferFragment(BufferFragment& fragment) {
Expand Down Expand Up @@ -231,9 +236,11 @@ void* OwnedImpl::linearize(uint32_t size) {
auto dest = static_cast<uint8_t*>(reservation.mem_);
do {
uint64_t data_size = slices_.front()->dataSize();
memcpy(dest, slices_.front()->data(), data_size);
bytes_copied += data_size;
dest += data_size;
if (data_size > 0) {
memcpy(dest, slices_.front()->data(), data_size);
bytes_copied += data_size;
dest += data_size;
}
slices_.pop_front();
} while (bytes_copied < linearized_size);
ASSERT(dest == static_cast<const uint8_t*>(reservation.mem_) + linearized_size);
Expand All @@ -256,6 +263,7 @@ void OwnedImpl::coalesceOrAddSlice(SlicePtr&& other_slice) {
// Copy content of the `other_slice`. The `move` methods which call this method effectively
// drain the source buffer.
addImpl(other_slice->data(), slice_size);
other_slice->transferDrainTrackersTo(*slices_.back());
} else {
// Take ownership of the slice.
slices_.emplace_back(std::move(other_slice));
Expand Down
21 changes: 20 additions & 1 deletion source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ class Slice {
public:
using Reservation = RawSlice;

virtual ~Slice() = default;
virtual ~Slice() {
for (const auto& drain_tracker : drain_trackers_) {
drain_tracker();
}
}

/**
* @return a pointer to the start of the usable content.
Expand Down Expand Up @@ -137,6 +141,9 @@ class Slice {
*/
uint64_t append(const void* data, uint64_t size) {
uint64_t copy_size = std::min(size, reservableSize());
if (copy_size == 0) {
return 0;
}
uint8_t* dest = base_ + reservable_;
reservable_ += copy_size;
// NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
Expand Down Expand Up @@ -193,6 +200,15 @@ class Slice {
return SliceRepresentation{dataSize(), reservableSize(), capacity_};
}

void transferDrainTrackersTo(Slice& destination) {
destination.drain_trackers_.splice(destination.drain_trackers_.end(), drain_trackers_);
ASSERT(drain_trackers_.empty());
}

void addDrainTracker(std::function<void()> drain_tracker) {
drain_trackers_.emplace_back(std::move(drain_tracker));
}

protected:
Slice(uint64_t data, uint64_t reservable, uint64_t capacity)
: data_(data), reservable_(reservable), capacity_(capacity) {}
Expand All @@ -208,6 +224,8 @@ class Slice {

/** Total number of bytes in the slice */
uint64_t capacity_;

std::list<std::function<void()>> drain_trackers_;
};

using SlicePtr = std::unique_ptr<Slice>;
Expand Down Expand Up @@ -510,6 +528,7 @@ class OwnedImpl : public LibEventInstance {
OwnedImpl(const void* data, uint64_t size);

// Buffer::Instance
void addDrainTracker(std::function<void()> drain_tracker) override;
void add(const void* data, uint64_t size) override;
void addBufferFragment(BufferFragment& fragment) override;
void add(absl::string_view data) override;
Expand Down
26 changes: 8 additions & 18 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,9 @@ ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stat
stream_error_on_invalid_http_messaging_(
http2_options.stream_error_on_invalid_http_messaging()),
flood_detected_(false), max_outbound_frames_(http2_options.max_outbound_frames().value()),
frame_buffer_releasor_([this](const Buffer::OwnedBufferFragmentImpl* fragment) {
releaseOutboundFrame(fragment);
}),
frame_buffer_releasor_([this]() { releaseOutboundFrame(); }),
max_outbound_control_frames_(http2_options.max_outbound_control_frames().value()),
control_frame_buffer_releasor_([this](const Buffer::OwnedBufferFragmentImpl* fragment) {
releaseOutboundControlFrame(fragment);
}),
control_frame_buffer_releasor_([this]() { releaseOutboundControlFrame(); }),
max_consecutive_inbound_frames_with_empty_payload_(
http2_options.max_consecutive_inbound_frames_with_empty_payload().value()),
max_inbound_priority_frames_per_stream_(
Expand Down Expand Up @@ -819,27 +815,21 @@ bool ConnectionImpl::addOutboundFrameFragment(Buffer::OwnedImpl& output, const u
return false;
}

auto fragment = Buffer::OwnedBufferFragmentImpl::create(
absl::string_view(reinterpret_cast<const char*>(data), length),
is_outbound_flood_monitored_control_frame ? control_frame_buffer_releasor_
: frame_buffer_releasor_);

// The Buffer::OwnedBufferFragmentImpl object will be deleted in the *frame_buffer_releasor_
// callback.
output.addBufferFragment(*fragment.release());
output.add(data, length);
output.addDrainTracker(is_outbound_flood_monitored_control_frame ? control_frame_buffer_releasor_
: frame_buffer_releasor_);
return true;
}

void ConnectionImpl::releaseOutboundFrame(const Buffer::OwnedBufferFragmentImpl* fragment) {
void ConnectionImpl::releaseOutboundFrame() {
ASSERT(outbound_frames_ >= 1);
--outbound_frames_;
delete fragment;
}

void ConnectionImpl::releaseOutboundControlFrame(const Buffer::OwnedBufferFragmentImpl* fragment) {
void ConnectionImpl::releaseOutboundControlFrame() {
ASSERT(outbound_control_frames_ >= 1);
--outbound_control_frames_;
releaseOutboundFrame(fragment);
releaseOutboundFrame();
}

ssize_t ConnectionImpl::onSend(const uint8_t* data, size_t length) {
Expand Down
8 changes: 4 additions & 4 deletions source/common/http/http2/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
// Maximum number of outbound frames. Initialized from corresponding http2_protocol_options.
// Default value is 10000.
const uint32_t max_outbound_frames_;
const Buffer::OwnedBufferFragmentImpl::Releasor frame_buffer_releasor_;
const std::function<void()> frame_buffer_releasor_;
// This counter keeps track of the number of outbound frames of types PING, SETTINGS and
// RST_STREAM (these that were buffered in the underlying connection but not yet written into the
// socket). If this counter exceeds the `max_outbound_control_frames_' value the connection is
Expand All @@ -433,7 +433,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
// Maximum number of outbound frames of types PING, SETTINGS and RST_STREAM. Initialized from
// corresponding http2_protocol_options. Default value is 1000.
const uint32_t max_outbound_control_frames_;
const Buffer::OwnedBufferFragmentImpl::Releasor control_frame_buffer_releasor_;
const std::function<void()> control_frame_buffer_releasor_;
// This counter keeps track of the number of consecutive inbound frames of types HEADERS,
// CONTINUATION and DATA with an empty payload and no end stream flag. If this counter exceeds
// the `max_consecutive_inbound_frames_with_empty_payload_` value the connection is terminated.
Expand Down Expand Up @@ -497,8 +497,8 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
void incrementOutboundFrameCount(bool is_outbound_flood_monitored_control_frame);
virtual bool trackInboundFrames(const nghttp2_frame_hd* hd, uint32_t padding_length) PURE;
virtual bool checkInboundFrameLimits() PURE;
void releaseOutboundFrame(const Buffer::OwnedBufferFragmentImpl* fragment);
void releaseOutboundControlFrame(const Buffer::OwnedBufferFragmentImpl* fragment);
void releaseOutboundFrame();
void releaseOutboundControlFrame();

bool dispatching_ : 1;
bool raised_goaway_ : 1;
Expand Down
6 changes: 6 additions & 0 deletions test/common/buffer/buffer_fuzz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ void releaseFragmentAllocation(const void* p, size_t, const Buffer::BufferFragme
// walk off the edge; the caller should be guaranteeing this.
class StringBuffer : public Buffer::Instance {
public:
void addDrainTracker(std::function<void()> drain_tracker) override {
// Not implemented well.
ASSERT(false);
drain_tracker();
}

void add(const void* data, uint64_t size) override {
FUZZ_ASSERT(start_ + size_ + size <= data_.size());
::memcpy(mutableEnd(), data, size);
Expand Down
Loading

0 comments on commit 5eba69a

Please sign in to comment.