Skip to content

Commit

Permalink
gRPC: Implement max_receive_message_length in envoy gRPC (#32711)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: tyxia <[email protected]>
  • Loading branch information
tyxia authored Apr 4, 2024
1 parent f61b28f commit c3f7225
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 6 deletions.
6 changes: 6 additions & 0 deletions api/envoy/config/core/v3/grpc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ message GrpcService {
// Currently only supported for xDS gRPC streams.
// If not set, xDS gRPC streams default base interval:500ms, maximum interval:30s will be applied.
RetryPolicy retry_policy = 3;

// Maximum gRPC message size that is allowed to be received.
// If a message over this limit is received, the gRPC stream is terminated with the RESOURCE_EXHAUSTED error.
// This limit is applied to individual messages in the streaming response and not the total size of streaming response.
// Defaults to 0, which means unlimited.
google.protobuf.UInt32Value max_receive_message_length = 4;
}

// [#next-free-field: 9]
Expand Down
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -463,3 +463,8 @@ deprecated:
deprecated :ref:`split_spans_for_request <envoy_v3_api_field_config.trace.v3.ZipkinConfig.split_spans_for_request>`
in favor of :ref:`spawn_upstream_span
<envoy_v3_api_field_extensions.filters.network.http_connection_manager.v3.HttpConnectionManager.Tracing.spawn_upstream_span>`.
- area: grpc
change: |
Added maximum gRPC message size that is allowed to be received in Envoy gRPC. If a message over this limit is received,
the gRPC stream is terminated with the RESOURCE_EXHAUSTED error. This limit is applied to individual messages in the
streaming response and not the total size of streaming response. Defaults to 0, which means unlimited.
6 changes: 5 additions & 1 deletion source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ namespace Grpc {
AsyncClientImpl::AsyncClientImpl(Upstream::ClusterManager& cm,
const envoy::config::core::v3::GrpcService& config,
TimeSource& time_source)
: cm_(cm), remote_cluster_name_(config.envoy_grpc().cluster_name()),
: max_recv_message_length_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.envoy_grpc(), max_receive_message_length, 0)),
cm_(cm), remote_cluster_name_(config.envoy_grpc().cluster_name()),
host_name_(config.envoy_grpc().authority()), time_source_(time_source),
metadata_parser_(Router::HeaderParser::configure(
config.initial_metadata(),
Expand Down Expand Up @@ -81,6 +83,8 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view serv
if (!options.retry_policy.has_value() && parent_.retryPolicy().has_value()) {
options_.setRetryPolicy(*parent_.retryPolicy());
}
// Configure the maximum frame length
decoder_.setMaxFrameLength(parent_.max_recv_message_length_);
}

void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class AsyncClientImpl final : public RawAsyncClient {
}

private:
const uint32_t max_recv_message_length_;
Upstream::ClusterManager& cm_;
const std::string remote_cluster_name_;
// The host header value in the http transport.
Expand Down
22 changes: 22 additions & 0 deletions test/common/grpc/grpc_client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,28 @@ TEST_P(GrpcClientIntegrationTest, BadReplyGrpcFraming) {
dispatcher_helper_.runDispatcher();
}

// Validate that a reply that exceeds gRPC maximum frame size is handled as an RESOURCE_EXHAUSTED
// gRPC error.
TEST_P(GrpcClientIntegrationTest, BadReplyOverGrpcFrameLimit) {
// Only testing behavior of Envoy client, since `max_receive_message_length` configuration is
// added to Envoy-gRPC only.
SKIP_IF_GRPC_CLIENT(ClientType::GoogleGrpc);

helloworld::HelloReply reply;
reply.set_message("HelloWorld");

initialize(/*envoy_grpc_max_recv_msg_length=*/2);

auto stream = createStream(empty_metadata_);
stream->sendRequest();
stream->sendServerInitialMetadata(empty_metadata_);
stream->expectTrailingMetadata(empty_metadata_);
stream->expectGrpcStatus(Status::WellKnownGrpcStatus::ResourceExhausted);
auto serialized_response = Grpc::Common::serializeToGrpcFrame(reply);
stream->fake_stream_->encodeData(*serialized_response, true);
dispatcher_helper_.runDispatcher();
}

// Validate that custom channel args can be set on the Google gRPC client.
//
TEST_P(GrpcClientIntegrationTest, CustomChannelArgs) {
Expand Down
15 changes: 10 additions & 5 deletions test/common/grpc/grpc_client_integration_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,14 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {
dispatcher_(api_->allocateDispatcher("test_thread")),
http_context_(stats_store_.symbolTable()), router_context_(stats_store_.symbolTable()) {}

virtual void initialize() {
virtual void initialize(uint32_t envoy_grpc_max_recv_msg_length = 0) {
if (fake_upstream_ == nullptr) {
fake_upstream_config_.upstream_protocol_ = Http::CodecType::HTTP2;
fake_upstream_ = std::make_unique<FakeUpstream>(0, ipVersion(), fake_upstream_config_);
}
switch (clientType()) {
case ClientType::EnvoyGrpc:
grpc_client_ = createAsyncClientImpl();
grpc_client_ = createAsyncClientImpl(envoy_grpc_max_recv_msg_length);
break;
case ClientType::GoogleGrpc: {
grpc_client_ = createGoogleAsyncClientImpl();
Expand Down Expand Up @@ -321,7 +321,7 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {

// Create a Grpc::AsyncClientImpl instance backed by enough fake/mock
// infrastructure to initiate a loopback TCP connection to fake_upstream_.
RawAsyncClientPtr createAsyncClientImpl() {
RawAsyncClientPtr createAsyncClientImpl(uint32_t envoy_grpc_max_recv_msg_length = 0) {
client_connection_ = std::make_unique<Network::ClientConnectionImpl>(
*dispatcher_, fake_upstream_->localAddress(), nullptr,
std::move(async_client_transport_socket_), nullptr, nullptr);
Expand All @@ -347,6 +347,11 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {
.WillRepeatedly(ReturnRef(*http_async_client_));
envoy::config::core::v3::GrpcService config;
config.mutable_envoy_grpc()->set_cluster_name("fake_cluster");
if (envoy_grpc_max_recv_msg_length != 0) {
config.mutable_envoy_grpc()->mutable_max_receive_message_length()->set_value(
envoy_grpc_max_recv_msg_length);
}

fillServiceWideInitialMetadata(config);
return std::make_unique<AsyncClientImpl>(cm_, config, dispatcher_->timeSource());
}
Expand Down Expand Up @@ -548,7 +553,7 @@ class GrpcSslClientIntegrationTest : public GrpcClientIntegrationTest {
return config;
}

void initialize() override {
void initialize(uint32_t envoy_grpc_max_recv_msg_length = 0) override {
envoy::extensions::transport_sockets::tls::v3::UpstreamTlsContext tls_context;
auto* common_tls_context = tls_context.mutable_common_tls_context();
auto* validation_context = common_tls_context->mutable_validation_context();
Expand All @@ -575,7 +580,7 @@ class GrpcSslClientIntegrationTest : public GrpcClientIntegrationTest {
fake_upstream_ =
std::make_unique<FakeUpstream>(createUpstreamSslContext(), 0, ipVersion(), config);

GrpcClientIntegrationTest::initialize();
GrpcClientIntegrationTest::initialize(envoy_grpc_max_recv_msg_length);
}

Network::DownstreamTransportSocketFactoryPtr createUpstreamSslContext() {
Expand Down

0 comments on commit c3f7225

Please sign in to comment.