Skip to content

Commit

Permalink
Upstream TCP connection buffer and read buffer limits (#150). (#571)
Browse files Browse the repository at this point in the history
As with fd58242, but on the upstream cluster side.
  • Loading branch information
htuch authored and mattklein123 committed Mar 16, 2017
1 parent fca5cd5 commit c44ee0d
Show file tree
Hide file tree
Showing 18 changed files with 159 additions and 23 deletions.
6 changes: 5 additions & 1 deletion docs/configuration/cluster_manager/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Cluster
"name": "...",
"type": "...",
"connect_timeout_ms": "...",
"per_connection_buffer_limit_bytes": "...",
"lb_type": "...",
"hosts": [],
"service_name": "...",
Expand Down Expand Up @@ -37,6 +38,10 @@ connect_timeout_ms
*(required, integer)* The timeout for new network connections to hosts in the cluster specified
in milliseconds.

per_connection_buffer_limit_bytes
*(optional, integer)* Soft limit on size of the cluster's connections read and write buffers.
If unspecified, an implementation defined default is applied (1MiB).

lb_type
*(required, string)* The :ref:`load balancer type <arch_overview_load_balancing_types>` to use
when picking a host in the cluster. Possible options are *round_robin*, *least_request*,
Expand Down Expand Up @@ -172,7 +177,6 @@ outlier_detection
Each of the above configuration values can be overridden via
:ref:`runtime values <config_cluster_manager_cluster_runtime_outlier_detection>`.


.. toctree::
:hidden:

Expand Down
5 changes: 3 additions & 2 deletions docs/configuration/listeners/listeners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ Each individual listener configuration has the following format:
"ssl_context": "{...}",
"bind_to_port": "...",
"use_proxy_proto": "...",
"use_original_dst": "..."
"use_original_dst": "...",
"per_connection_buffer_limit_bytes": "..."
}
port
Expand Down Expand Up @@ -54,7 +55,7 @@ use_original_dst

per_connection_buffer_limit_bytes
*(optional, integer)* Soft limit on size of the listener's new connection read and write buffers.
If unspecified, an implementation defined default is applied (1MB).
If unspecified, an implementation defined default is applied (1MiB).

.. toctree::
:hidden:
Expand Down
5 changes: 5 additions & 0 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
* processing pipeline.
*/
virtual void setReadBufferLimit(uint32_t limit) PURE;

/**
* Get the value set with setReadBufferLimit.
*/
virtual uint32_t readBufferLimit() const PURE;
};

typedef std::unique_ptr<Connection> ConnectionPtr;
Expand Down
5 changes: 5 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ class ClusterInfo {
*/
virtual std::chrono::milliseconds connectTimeout() const PURE;

/**
* @return soft limit on size of the cluster's connections read and write buffers.
*/
virtual uint32_t perConnectionBufferLimitBytes() const PURE;

/**
* @return uint64_t features supported by the cluster. @see Features.
*/
Expand Down
5 changes: 5 additions & 0 deletions source/common/json/config_schemas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,11 @@ const std::string Json::Schema::CLUSTER_SCHEMA(R"EOF(
"minimum" : 0,
"exclusiveMinimum" : true
},
"per_connection_buffer_limit_bytes" : {
"type" : "integer",
"minimum" : 0,
"exclusiveMinimum" : true
},
"lb_type" : {
"type" : "string",
"enum" : ["round_robin", "least_request", "random", "ring_hash"]
Expand Down
1 change: 1 addition & 0 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class ConnectionImpl : public virtual Connection,
State state() override;
void write(Buffer::Instance& data) override;
void setReadBufferLimit(uint32_t limit) override { read_buffer_limit_ = limit; }
uint32_t readBufferLimit() const override { return read_buffer_limit_; }

// Network::BufferSource
Buffer::Instance& getReadBuffer() override { return read_buffer_; }
Expand Down
13 changes: 7 additions & 6 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ Host::CreateConnectionData HostImpl::createConnection(Event::Dispatcher& dispatc
Network::ClientConnectionPtr HostImpl::createConnection(Event::Dispatcher& dispatcher,
const ClusterInfo& cluster,
Network::Address::InstancePtr address) {
if (cluster.sslContext()) {
return Network::ClientConnectionPtr{
dispatcher.createSslClientConnection(*cluster.sslContext(), address)};
} else {
return Network::ClientConnectionPtr{dispatcher.createClientConnection(address)};
}
Network::ClientConnectionPtr connection =
cluster.sslContext() ? dispatcher.createSslClientConnection(*cluster.sslContext(), address)
: dispatcher.createClientConnection(address);
connection->setReadBufferLimit(cluster.perConnectionBufferLimitBytes());
return connection;
}

void HostImpl::weight(uint32_t new_weight) { weight_ = std::max(1U, std::min(100U, new_weight)); }
Expand All @@ -60,6 +59,8 @@ ClusterInfoImpl::ClusterInfoImpl(const Json::Object& config, Runtime::Loader& ru
: runtime_(runtime), name_(config.getString("name")),
max_requests_per_connection_(config.getInteger("max_requests_per_connection", 0)),
connect_timeout_(std::chrono::milliseconds(config.getInteger("connect_timeout_ms"))),
per_connection_buffer_limit_bytes_(
config.getInteger("per_connection_buffer_limit_bytes", 1024 * 1024)),
stats_scope_(stats.createScope(fmt::format("cluster.{}.", name_))),
stats_(generateStats(*stats_scope_)), features_(parseFeatures(config)),
http_codec_options_(Http::Utility::parseCodecOptions(config)),
Expand Down
4 changes: 4 additions & 0 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ class ClusterInfoImpl : public ClusterInfo {

// Upstream::ClusterInfo
std::chrono::milliseconds connectTimeout() const override { return connect_timeout_; }
uint32_t perConnectionBufferLimitBytes() const override {
return per_connection_buffer_limit_bytes_;
}
uint64_t features() const override { return features_; }
uint64_t httpCodecOptions() const override { return http_codec_options_; }
LoadBalancerType lbType() const override { return lb_type_; }
Expand Down Expand Up @@ -189,6 +192,7 @@ class ClusterInfoImpl : public ClusterInfo {
const std::string name_;
const uint64_t max_requests_per_connection_;
const std::chrono::milliseconds connect_timeout_;
const uint32_t per_connection_buffer_limit_bytes_;
Stats::ScopePtr stats_scope_;
mutable ClusterStats stats_;
Ssl::ClientContextPtr ssl_ctx_;
Expand Down
17 changes: 17 additions & 0 deletions test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,23 @@ TEST_F(Http1ConnPoolImplTest, VerifyTimingStats) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that buffer limits are set.
*/
TEST_F(Http1ConnPoolImplTest, VerifyBufferLimits) {
NiceMock<Http::MockStreamDecoder> outer_decoder;
ConnPoolCallbacks callbacks;
conn_pool_.expectClientCreate();
EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192));
EXPECT_CALL(*conn_pool_.test_clients_.back().connection_, setReadBufferLimit(8192));
Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(outer_decoder, callbacks);
EXPECT_NE(nullptr, handle);

EXPECT_CALL(conn_pool_, onClientDestroy());
conn_pool_.test_clients_[0].connection_->raiseEvents(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();
}

/**
* Tests a request that generates a new connection, completes, and then a second request that uses
* the same connection.
Expand Down
20 changes: 20 additions & 0 deletions test/common/http/http2/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@ TEST_F(Http2ConnPoolImplTest, VerifyConnectionTimingStats) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that buffer limits are set.
*/
TEST_F(Http2ConnPoolImplTest, VerifyBufferLimits) {
expectClientCreate();
EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192));
EXPECT_CALL(*test_clients_.back().connection_, setReadBufferLimit(8192));

ActiveTestRequest r1(*this, 0);
EXPECT_CALL(r1.inner_encoder_, encodeHeaders(_, true));
r1.callbacks_.outer_encoder_->encodeHeaders(HeaderMapImpl{}, true);
expectClientConnect(0);
EXPECT_CALL(r1.decoder_, decodeHeaders_(_, true));
r1.inner_decoder_->decodeHeaders(HeaderMapPtr{new HeaderMapImpl{}}, true);

test_clients_[0].connection_->raiseEvents(Network::ConnectionEvent::RemoteClose);
EXPECT_CALL(*this, onClientDestroy());
dispatcher_.clearDeferredDeleteList();
}

TEST_F(Http2ConnPoolImplTest, RequestAndResponse) {
InSequence s;

Expand Down
1 change: 1 addition & 0 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class ReadBufferLimitTest : public testing::Test {
server_connection = std::move(conn);
server_connection->addReadFilter(read_filter);
EXPECT_EQ("", server_connection->nextProtocol());
EXPECT_EQ(read_buffer_limit, server_connection->readBufferLimit());
}));

uint32_t filter_seen = 0;
Expand Down
1 change: 1 addition & 0 deletions test/common/ssl/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ class SslReadBufferLimitTest : public testing::Test {
server_connection = std::move(conn);
server_connection->addReadFilter(read_filter);
EXPECT_EQ("", server_connection->nextProtocol());
EXPECT_EQ(read_buffer_limit, server_connection->readBufferLimit());
}));

uint32_t filter_seen = 0;
Expand Down
28 changes: 28 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,34 @@ TEST_F(ClusterManagerImplTest, UnknownCluster) {
factory_.tls_.shutdownThread();
}

/**
* Test that buffer limits are set on new TCP connections.
*/
TEST_F(ClusterManagerImplTest, VerifyBufferLimits) {
std::string json = R"EOF(
{
"clusters": [
{
"name": "cluster_1",
"connect_timeout_ms": 250,
"per_connection_buffer_limit_bytes": 8192,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11001"}]
}]
}
)EOF";

Json::ObjectPtr loader = Json::Factory::LoadFromString(json);
create(*loader);
Network::MockClientConnection* connection = new NiceMock<Network::MockClientConnection>();
EXPECT_CALL(*connection, setReadBufferLimit(8192));
EXPECT_CALL(factory_.tls_.dispatcher_, createClientConnection_(_)).WillOnce(Return(connection));
auto conn_data = cluster_manager_->tcpConnForCluster("cluster_1");
EXPECT_EQ(connection, conn_data.connection_.get());
factory_.tls_.shutdownThread();
}

TEST_F(ClusterManagerImplTest, ShutdownOrder) {
std::string json = R"EOF(
{
Expand Down
21 changes: 11 additions & 10 deletions test/common/upstream/health_checker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "test/test_common/utility.h"

using testing::_;
using testing::Invoke;
using testing::NiceMock;
using testing::Return;
using testing::ReturnRef;
Expand All @@ -23,12 +24,12 @@ class TestHttpHealthCheckerImpl : public HttpHealthCheckerImpl {
public:
using HttpHealthCheckerImpl::HttpHealthCheckerImpl;

Http::CodecClient* createCodecClient(Upstream::Host::CreateConnectionData&) override {
return createCodecClient_();
Http::CodecClient* createCodecClient(Upstream::Host::CreateConnectionData& conn_data) override {
return createCodecClient_(conn_data);
};

// HttpHealthCheckerImpl
MOCK_METHOD0(createCodecClient_, Http::CodecClient*());
MOCK_METHOD1(createCodecClient_, Http::CodecClient*(Upstream::Host::CreateConnectionData&));
};

class HttpHealthCheckerImplTest : public testing::Test {
Expand All @@ -41,7 +42,6 @@ class HttpHealthCheckerImplTest : public testing::Test {
Http::MockClientConnection* codec_{};
Stats::IsolatedStoreImpl stats_store_;
Network::MockClientConnection* client_connection_{};
Http::CodecClient* codec_client_{};
NiceMock<Http::MockStreamEncoder> request_encoder_;
Http::StreamDecoder* stream_response_callbacks_{};
};
Expand Down Expand Up @@ -104,14 +104,15 @@ class HttpHealthCheckerImplTest : public testing::Test {
void expectClientCreate(size_t index) {
TestSession& test_session = *test_sessions_[index];

test_session.codec_ = new NiceMock<Http::MockClientConnection>();
auto* codec = test_session.codec_ = new NiceMock<Http::MockClientConnection>();
test_session.client_connection_ = new NiceMock<Network::MockClientConnection>();
auto create_codec_client = [codec](Upstream::Host::CreateConnectionData& conn_data) {
return new CodecClientForTest(std::move(conn_data.connection_), codec, nullptr, nullptr);
};

Network::ClientConnectionPtr connection{test_session.client_connection_};
test_session.codec_client_ =
new CodecClientForTest(std::move(connection), test_session.codec_, nullptr, nullptr);
EXPECT_CALL(*health_checker_, createCodecClient_())
.WillOnce(Return(test_session.codec_client_));
EXPECT_CALL(dispatcher_, createClientConnection_(_))
.WillOnce(Return(test_session.client_connection_));
EXPECT_CALL(*health_checker_, createCodecClient_(_)).WillOnce(Invoke(create_codec_client));
}

void expectStreamCreate(size_t index) {
Expand Down
12 changes: 8 additions & 4 deletions test/common/upstream/logical_dns_cluster_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ TEST_F(LogicalDnsClusterTest, Basic) {
HostPtr logical_host = cluster_->hosts()[0];

EXPECT_CALL(dispatcher_, createClientConnection_(
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443"))));
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443"))))
.WillOnce(Return(new NiceMock<Network::MockClientConnection>()));
logical_host->createConnection(dispatcher_);
logical_host->outlierDetector().putHttpResponseCode(200);

Expand All @@ -133,7 +134,8 @@ TEST_F(LogicalDnsClusterTest, Basic) {

EXPECT_EQ(logical_host, cluster_->hosts()[0]);
EXPECT_CALL(dispatcher_, createClientConnection_(
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443"))));
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:443"))))
.WillOnce(Return(new NiceMock<Network::MockClientConnection>()));
Host::CreateConnectionData data = logical_host->createConnection(dispatcher_);
EXPECT_FALSE(data.host_description_->canary());
EXPECT_EQ(&cluster_->hosts()[0]->cluster(), &data.host_description_->cluster());
Expand All @@ -152,7 +154,8 @@ TEST_F(LogicalDnsClusterTest, Basic) {

EXPECT_EQ(logical_host, cluster_->hosts()[0]);
EXPECT_CALL(dispatcher_, createClientConnection_(
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443"))));
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443"))))
.WillOnce(Return(new NiceMock<Network::MockClientConnection>()));
logical_host->createConnection(dispatcher_);

expectResolve();
Expand All @@ -164,7 +167,8 @@ TEST_F(LogicalDnsClusterTest, Basic) {

EXPECT_EQ(logical_host, cluster_->hosts()[0]);
EXPECT_CALL(dispatcher_, createClientConnection_(
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443"))));
PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.3:443"))))
.WillOnce(Return(new NiceMock<Network::MockClientConnection>()));
logical_host->createConnection(dispatcher_);

// Make sure we cancel.
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/network/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class MockConnection : public Connection, public MockConnectionBase {
MOCK_METHOD0(state, State());
MOCK_METHOD1(write, void(Buffer::Instance& data));
MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit));
MOCK_CONST_METHOD0(readBufferLimit, uint32_t());
};

/**
Expand Down Expand Up @@ -89,6 +90,7 @@ class MockClientConnection : public ClientConnection, public MockConnectionBase
MOCK_METHOD0(state, State());
MOCK_METHOD1(write, void(Buffer::Instance& data));
MOCK_METHOD1(setReadBufferLimit, void(uint32_t limit));
MOCK_CONST_METHOD0(readBufferLimit, uint32_t());

// Network::ClientConnection
MOCK_METHOD0(connect, void());
Expand Down
1 change: 1 addition & 0 deletions test/mocks/upstream/cluster_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class MockClusterInfo : public ClusterInfo {

// Upstream::ClusterInfo
MOCK_CONST_METHOD0(connectTimeout, std::chrono::milliseconds());
MOCK_CONST_METHOD0(perConnectionBufferLimitBytes, uint32_t());
MOCK_CONST_METHOD0(features, uint64_t());
MOCK_CONST_METHOD0(httpCodecOptions, uint64_t());
MOCK_CONST_METHOD0(lbType, LoadBalancerType());
Expand Down
35 changes: 35 additions & 0 deletions test/server/configuration_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,41 @@ TEST(ConfigurationImplTest, VerifySubjectAltNameConfig) {
EXPECT_TRUE(config.listeners().back()->sslContext() != nullptr);
}

TEST(ConfigurationImplTest, SetUpstreamClusterPerConnectionBufferLimit) {
std::string json = R"EOF(
{
"listeners" : [],
"cluster_manager": {
"clusters": [
{
"name": "test_cluster",
"type": "static",
"connect_timeout_ms": 1,
"per_connection_buffer_limit_bytes": 8192,
"lb_type": "round_robin",
"hosts": []
}
]
}
}
)EOF";

Json::ObjectPtr loader = Json::Factory::LoadFromString(json);

NiceMock<Server::MockInstance> server;
MainImpl config(server);
config.initialize(*loader);

ASSERT_EQ(1U, config.clusterManager().clusters().count("test_cluster"));
EXPECT_EQ(8192U, config.clusterManager()
.clusters()
.find("test_cluster")
->second.get()
.info()
->perConnectionBufferLimitBytes());
server.thread_local_.shutdownThread();
}

TEST(ConfigurationImplTest, BadListenerConfig) {
std::string json = R"EOF(
{
Expand Down

0 comments on commit c44ee0d

Please sign in to comment.