Skip to content

Commit

Permalink
async http: set buffer limit for response and do not buffer for mirror
Browse files Browse the repository at this point in the history
Signed-off-by: Boteng Yao <[email protected]>
Signed-off-by: Yan Avlasov <[email protected]>
Signed-off-by: Ryan Northey <[email protected]>
  • Loading branch information
botengyao authored and jwendell committed May 29, 2024
1 parent a0d1adc commit 1ae92af
Show file tree
Hide file tree
Showing 13 changed files with 233 additions and 18 deletions.
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ bug_fixes:
Only 101 is considered a successful response for websocket handshake for HTTP/1.1, and Envoy as a proxy will proxy the response
header from upstream to downstream and then close the request if other status is received. This behavior can be
reverted by ``envoy_reloadable_features_check_switch_protocol_websocket_handshake``.
- area: async http client
change: |
Added one option to disable the response body buffering for mirror request. Also introduced a 32MB cap for the response
buffer, which can be changed by the runtime flag ``http.async_response_buffer_limit`` based on the product needs.
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
14 changes: 13 additions & 1 deletion envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ class AsyncClient {
*/
enum class FailureReason {
// The stream has been reset.
Reset
Reset,
// The stream exceeds the response buffer limit.
ExceedResponseBufferLimit
};

/**
Expand Down Expand Up @@ -291,6 +293,11 @@ class AsyncClient {
return *this;
}

StreamOptions& setDiscardResponseBody(bool discard) {
discard_response_body = discard;
return *this;
}

// For gmock test
bool operator==(const StreamOptions& src) const {
return timeout == src.timeout && buffer_body_for_retry == src.buffer_body_for_retry &&
Expand Down Expand Up @@ -328,6 +335,7 @@ class AsyncClient {
OptRef<Router::FilterConfig> filter_config_;

bool is_shadow{false};
bool discard_response_body{false};
};

/**
Expand Down Expand Up @@ -391,6 +399,10 @@ class AsyncClient {
buffer_limit_ = limit;
return *this;
}
RequestOptions& setDiscardResponseBody(bool discard) {
discard_response_body = discard;
return *this;
}

// For gmock test
bool operator==(const RequestOptions& src) const {
Expand Down
37 changes: 31 additions & 6 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

namespace Envoy {
namespace Http {

const absl::string_view AsyncClientImpl::ResponseBufferLimit = "http.async_response_buffer_limit";

AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster,
Stats::Store& stats_store, Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info,
Expand All @@ -25,7 +28,7 @@ AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster,
config_(http_context.asyncClientStatPrefix(), local_info, *stats_store.rootScope(), cm,
runtime, random, std::move(shadow_writer), true, false, false, false, false, false,
{}, dispatcher.timeSource(), http_context, router_context),
dispatcher_(dispatcher) {}
dispatcher_(dispatcher), runtime_(runtime) {}

AsyncClientImpl::~AsyncClientImpl() {
while (!active_streams_.empty()) {
Expand Down Expand Up @@ -77,7 +80,8 @@ AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callba

AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
const AsyncClient::StreamOptions& options)
: parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
: parent_(parent), discard_response_body_(options.discard_response_body),
stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
router_(options.filter_config_ ? *options.filter_config_ : parent.config_,
parent.config_.async_stats_),
stream_info_(Protocol::Http11, parent.dispatcher().timeSource(), nullptr),
Expand Down Expand Up @@ -257,7 +261,9 @@ void AsyncStreamImpl::resetStream(Http::StreamResetReason, absl::string_view) {
AsyncRequestSharedImpl::AsyncRequestSharedImpl(AsyncClientImpl& parent,
AsyncClient::Callbacks& callbacks,
const AsyncClient::RequestOptions& options)
: AsyncStreamImpl(parent, *this, options), callbacks_(callbacks) {
: AsyncStreamImpl(parent, *this, options), callbacks_(callbacks),
response_buffer_limit_(parent.runtime_.snapshot().getInteger(
AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse)) {
if (nullptr != options.parent_span_) {
const std::string child_span_name =
options.child_span_name_.empty()
Expand Down Expand Up @@ -306,7 +312,22 @@ void AsyncRequestSharedImpl::onHeaders(ResponseHeaderMapPtr&& headers, bool) {
response_ = std::make_unique<ResponseMessageImpl>(std::move(headers));
}

void AsyncRequestSharedImpl::onData(Buffer::Instance& data, bool) { response_->body().move(data); }
void AsyncRequestSharedImpl::onData(Buffer::Instance& data, bool) {
if (discard_response_body_) {
data.drain(data.length());
return;
}

if (response_->body().length() + data.length() > response_buffer_limit_) {
ENVOY_LOG_EVERY_POW_2(warn, "the buffer size limit for async client response body "
"has been exceeded, draining data");
data.drain(data.length());
response_buffer_overlimit_ = true;
reset();
} else {
response_->body().move(data);
}
}

void AsyncRequestSharedImpl::onTrailers(ResponseTrailerMapPtr&& trailers) {
response_->trailers(std::move(trailers));
Expand All @@ -327,8 +348,12 @@ void AsyncRequestSharedImpl::onReset() {
Tracing::EgressConfig::get());

if (!cancelled_) {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset);
if (response_buffer_overlimit_) {
callbacks_.onFailure(*this, AsyncClient::FailureReason::ExceedResponseBufferLimit);
} else {
// In this case we don't have a valid response so we do need to raise a failure.
callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset);
}
}
}

Expand Down
9 changes: 8 additions & 1 deletion source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ namespace {
// Limit the size of buffer for data used for retries.
// This is currently fixed to 64KB.
constexpr uint64_t kBufferLimitForRetry = 1 << 16;
// Response buffer limit 32MB.
constexpr uint64_t kBufferLimitForResponse = 32 * 1024 * 1024;
} // namespace

class AsyncStreamImpl;
Expand All @@ -74,12 +76,14 @@ class AsyncClientImpl final : public AsyncClient {
Singleton::Manager& singleton_manager_;
Upstream::ClusterInfoConstSharedPtr cluster_;
Event::Dispatcher& dispatcher() override { return dispatcher_; }
static const absl::string_view ResponseBufferLimit;

private:
template <typename T> T* internalStartRequest(T* async_request);
Router::FilterConfig config_;
Event::Dispatcher& dispatcher_;
std::list<std::unique_ptr<AsyncStreamImpl>> active_streams_;
Runtime::Loader& runtime_;

friend class AsyncStreamImpl;
friend class AsyncRequestSharedImpl;
Expand All @@ -92,7 +96,7 @@ class AsyncClientImpl final : public AsyncClient {
class AsyncStreamImpl : public virtual AsyncClient::Stream,
public StreamDecoderFilterCallbacks,
public Event::DeferredDeletable,
Logger::Loggable<Logger::Id::http>,
public Logger::Loggable<Logger::Id::http>,
public LinkedObject<AsyncStreamImpl>,
public ScopeTrackedObject {
public:
Expand Down Expand Up @@ -151,6 +155,7 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
// Callback to listen for low/high/overflow watermark events.
absl::optional<std::reference_wrapper<DecoderFilterWatermarkCallbacks>> watermark_callbacks_;
const bool discard_response_body_;

private:
void cleanup();
Expand Down Expand Up @@ -303,6 +308,8 @@ class AsyncRequestSharedImpl : public virtual AsyncClient::Request,
Tracing::SpanPtr child_span_;
std::unique_ptr<ResponseMessageImpl> response_;
bool cancelled_{};
bool response_buffer_overlimit_{};
const uint64_t response_buffer_limit_;
};

class AsyncOngoingRequestImpl final : public AsyncClient::OngoingRequest,
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
.setBufferAccount(callbacks_->account())
// A buffer limit of 1 is set in the case that retry_shadow_buffer_limit_ == 0,
// because a buffer limit of zero on async clients is interpreted as no buffer limit.
.setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_);
.setBufferLimit(1 > retry_shadow_buffer_limit_ ? 1 : retry_shadow_buffer_limit_)
.setDiscardResponseBody(true);
options.setFilterConfig(config_);
if (end_stream) {
// This is a header-only request, and can be dispatched immediately to the shadow
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/common/wasm/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1849,8 +1849,9 @@ void Context::onHttpCallFailure(uint32_t token, Http::AsyncClient::FailureReason
return;
}
status_code_ = static_cast<uint32_t>(WasmResult::BrokenConnection);
// This is the only value currently.
ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
// TODO(botengyao): handle different failure reasons.
ASSERT(reason == Http::AsyncClient::FailureReason::Reset ||
reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit);
status_message_ = "reset";
// Deferred "after VM call" actions are going to be executed upon returning from
// ContextBase::*, which might include deleting Context object via proxy_done().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ void RestApiFetcher::onSuccess(const Http::AsyncClient::Request& request,

void RestApiFetcher::onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) {
// Currently Http::AsyncClient::FailureReason only has one value: "Reset".
ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
ASSERT(reason == Http::AsyncClient::FailureReason::Reset ||
reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit);
onFetchFailure(Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr);
requestComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ void RawHttpClientImpl::onSuccess(const Http::AsyncClient::Request&,

void RawHttpClientImpl::onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) {
ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
// TODO(botengyao): handle different failure reasons.
ASSERT(reason == Http::AsyncClient::FailureReason::Reset ||
reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit);
callbacks_->onComplete(std::make_unique<Response>(errorResponse()));
callbacks_ = nullptr;
}
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/filters/http/gcp_authn/gcp_authn_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ void GcpAuthnClient::onSuccess(const Http::AsyncClient::Request&,

void GcpAuthnClient::onFailure(const Http::AsyncClient::Request&,
Http::AsyncClient::FailureReason reason) {
// Http::AsyncClient::FailureReason only has one value: "Reset".
ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
// TODO(botengyao): handle different failure reasons.
ASSERT(reason == Http::AsyncClient::FailureReason::Reset ||
reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit);
ENVOY_LOG(error, "Request failed: stream has been reset");
active_request_ = nullptr;
onError();
Expand Down
3 changes: 3 additions & 0 deletions source/extensions/tracers/datadog/agent_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ void AgentHTTPClient::onFailure(const Http::AsyncClient::Request& request,
case Http::AsyncClient::FailureReason::Reset:
message += "The stream has been reset.";
break;
case Http::AsyncClient::FailureReason::ExceedResponseBufferLimit:
message += "The stream exceeds the response buffer limit.";
break;
default:
message += "Unknown error.";
}
Expand Down
1 change: 1 addition & 0 deletions test/common/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ envoy_cc_test(
"//test/mocks/runtime:runtime_mocks",
"//test/mocks/stats:stats_mocks",
"//test/mocks/upstream:cluster_manager_mocks",
"//test/test_common:test_runtime_lib",
"//test/test_common:test_time_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/config/route/v3:pkg_cc_proto",
Expand Down
123 changes: 123 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,129 @@ TEST_F(AsyncClientImplTest, Basic) {
.value());
}

TEST_F(AsyncClientImplTest, NoResponseBodyBuffering) {
message_->body().add("test body");
Buffer::Instance& data = message_->body();

EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
.WillOnce(Invoke(
[&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks,
const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
stream_info_, {});
response_decoder_ = &decoder;
return nullptr;
}));

TestRequestHeaderMapImpl copy(message_->headers());
copy.addCopy("x-envoy-internal", "true");
copy.addCopy("x-forwarded-for", "127.0.0.1");
copy.addCopy(":scheme", "http");

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&copy), false));
EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data), true));

auto* request = client_.send(std::move(message_), callbacks_,
AsyncClient::RequestOptions().setDiscardResponseBody(true));
EXPECT_NE(request, nullptr);

EXPECT_CALL(callbacks_, onBeforeFinalizeUpstreamSpan(_, _));
EXPECT_CALL(callbacks_, onSuccess_(_, _))
.WillOnce(Invoke([](const AsyncClient::Request&, ResponseMessage* response) -> void {
// Verify that there is zero response body.
EXPECT_EQ(response->body().length(), 0);
}));
ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
response_decoder_->decodeHeaders(std::move(response_headers), false);
response_decoder_->decodeData(data, true);

EXPECT_EQ(
1UL,
cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_200").value());
EXPECT_EQ(1UL, cm_.thread_local_cluster_.cluster_.info_->stats_store_
.counter("internal.upstream_rq_200")
.value());
}

TEST_F(AsyncClientImplTest, LargeResponseBody) {
message_->body().add("test body");
Buffer::Instance& data = message_->body();

EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
.WillOnce(Invoke(
[&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks,
const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
stream_info_, {});
response_decoder_ = &decoder;
return nullptr;
}));

TestRequestHeaderMapImpl copy(message_->headers());
copy.addCopy("x-envoy-internal", "true");
copy.addCopy("x-forwarded-for", "127.0.0.1");
copy.addCopy(":scheme", "http");

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&copy), false));
EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data), true));
ON_CALL(runtime_.snapshot_,
getInteger(AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse))
.WillByDefault(Return(100));

auto* request = client_.send(std::move(message_), callbacks_, AsyncClient::RequestOptions());
EXPECT_NE(request, nullptr);

EXPECT_CALL(callbacks_, onBeforeFinalizeUpstreamSpan(_, _));
EXPECT_CALL(callbacks_, onFailure(_, AsyncClient::FailureReason::ExceedResponseBufferLimit));

Buffer::InstancePtr large_body{new Buffer::OwnedImpl(std::string(100 + 1, 'a'))};
ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
response_decoder_->decodeHeaders(std::move(response_headers), false);
response_decoder_->decodeData(*large_body, true);
EXPECT_EQ(large_body->length(), 0);
}

TEST_F(AsyncClientImplTest, LargeResponseBodyMultipleRead) {
message_->body().add("test body");
Buffer::Instance& data = message_->body();

EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
.WillOnce(Invoke(
[&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks,
const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
stream_info_, {});
response_decoder_ = &decoder;
return nullptr;
}));

TestRequestHeaderMapImpl copy(message_->headers());
copy.addCopy("x-envoy-internal", "true");
copy.addCopy("x-forwarded-for", "127.0.0.1");
copy.addCopy(":scheme", "http");

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&copy), false));
EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data), true));
ON_CALL(runtime_.snapshot_,
getInteger(AsyncClientImpl::ResponseBufferLimit, kBufferLimitForResponse))
.WillByDefault(Return(100));

auto* request = client_.send(std::move(message_), callbacks_, AsyncClient::RequestOptions());
EXPECT_NE(request, nullptr);

EXPECT_CALL(callbacks_, onBeforeFinalizeUpstreamSpan(_, _));
EXPECT_CALL(callbacks_, onFailure(_, AsyncClient::FailureReason::ExceedResponseBufferLimit));

Buffer::InstancePtr large_body{new Buffer::OwnedImpl(std::string(50, 'a'))};
Buffer::InstancePtr large_body_second{new Buffer::OwnedImpl(std::string(50, 'a'))};
Buffer::InstancePtr large_body_third{new Buffer::OwnedImpl(std::string(2, 'a'))};
ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
response_decoder_->decodeHeaders(std::move(response_headers), false);
response_decoder_->decodeData(*large_body, false);
response_decoder_->decodeData(*large_body_second, false);
response_decoder_->decodeData(*large_body_third, true);
}

TEST_F(AsyncClientImplTest, BasicOngoingRequest) {
auto headers = std::make_unique<TestRequestHeaderMapImpl>();
HttpTestUtility::addDefaultHeaders(*headers);
Expand Down
Loading

0 comments on commit 1ae92af

Please sign in to comment.