Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream: Introducing close_connections_on_host_set_change property #7675

Merged
merged 19 commits into from
Aug 23, 2019
Merged
4 changes: 4 additions & 0 deletions api/envoy/api/v2/cds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,10 @@ message Cluster {
// If panic mode is triggered, new hosts are still eligible for traffic; they simply do not
// contribute to the calculation when deciding whether panic mode is enabled or not.
bool ignore_new_hosts_until_first_hc = 5;

// If set to `true`, the cluster manager will drain all existing
// connections to upstream hosts whenever hosts are added or removed from the cluster.
bool close_connections_on_host_set_change = 6;
}

// Common configuration for all load balancer implementations.
Expand Down
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Version history
certificate validation context.
* upstream: added network filter chains to upstream connections, see :ref:`filters<envoy_api_field_Cluster.filters>`.
* upstream: use p2c to select hosts for least-requests load balancers if all host weights are the same, even in cases where weights are not equal to 1.
* upstream: added :ref:`an option <envoy_api_field_Cluster.CommonLbConfig.close_connections_on_host_set_change>` that allows draining HTTP, TCP connection pools on cluster membership change.
* zookeeper: parse responses and emit latency stats.

1.11.1 (August 13, 2019)
Expand Down
25 changes: 17 additions & 8 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,21 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
// Now setup for cross-thread updates.
cluster.prioritySet().addMemberUpdateCb(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed this on my last pass: this callback only triggers when hosts are added or removed. For example, a change to host weights would not trigger this callback, instead only triggering callbacks added with addPriorityUpdateCb for the appropriate priority. Not sure if this is intended or not, just pointing it out.

[&cluster, this](const HostVector&, const HostVector& hosts_removed) -> void {
// TODO(snowp): Should this be subject to merge windows?

// Whenever hosts are removed from the cluster, we make each TLS cluster drain it's
// connection pools for the removed hosts.
if (!hosts_removed.empty()) {
postThreadLocalHostRemoval(cluster, hosts_removed);
if (cluster.info()->lbConfig().close_connections_on_host_set_change()) {
for (const auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
// This will drain all tcp and http connection pools.
postThreadLocalDrainConnections(cluster, host_set->hosts());
}
} else {
// TODO(snowp): Should this be subject to merge windows?

// Whenever hosts are removed from the cluster, we make each TLS cluster drain it's
// connection pools for the removed hosts. If `close_connections_on_host_set_change` is
// enabled, this case will be covered by first `if` statement, where all
// connection pools are drained.
if (!hosts_removed.empty()) {
postThreadLocalDrainConnections(cluster, hosts_removed);
}
}
});

Expand Down Expand Up @@ -711,8 +720,8 @@ Tcp::ConnectionPool::Instance* ClusterManagerImpl::tcpConnPoolForCluster(
return entry->second->tcpConnPool(priority, context, transport_socket_options);
}

void ClusterManagerImpl::postThreadLocalHostRemoval(const Cluster& cluster,
const HostVector& hosts_removed) {
void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster,
const HostVector& hosts_removed) {
tls_->runOnAllThreads([this, name = cluster.info()->name(), hosts_removed]() {
ThreadLocalClusterManagerImpl::removeHosts(name, hosts_removed, *tls_);
});
Expand Down
3 changes: 2 additions & 1 deletion source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
std::size_t warmingClusterCount() const override { return warming_clusters_.size(); }

protected:
virtual void postThreadLocalHostRemoval(const Cluster& cluster, const HostVector& hosts_removed);
virtual void postThreadLocalDrainConnections(const Cluster& cluster,
const HostVector& hosts_removed);
virtual void postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority,
const HostVector& hosts_added,
const HostVector& hosts_removed);
Expand Down
191 changes: 190 additions & 1 deletion test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class MockedUpdatedClusterManagerImpl : public TestClusterManagerImpl {
local_cluster_update_.post(priority, hosts_added, hosts_removed);
}

void postThreadLocalHostRemoval(const Cluster&, const HostVector& hosts_removed) override {
void postThreadLocalDrainConnections(const Cluster&, const HostVector& hosts_removed) override {
local_hosts_removed_.post(hosts_removed);
}

Expand Down Expand Up @@ -3321,6 +3321,195 @@ TEST_F(TcpKeepaliveTest, TcpKeepaliveWithAllOptions) {
expectSetsockoptSoKeepalive(7, 4, 1);
}

TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) {
const std::string yaml = R"EOF(
static_resources:
clusters:
- name: cluster_1
connect_timeout: 0.250s
lb_policy: ROUND_ROBIN
type: STATIC
common_lb_config:
close_connections_on_host_set_change: true
)EOF";

ReadyWatcher initialized;
EXPECT_CALL(initialized, ready());

create(parseBootstrapFromV2Yaml(yaml));

// Set up for an initialize callback.
cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); });

std::unique_ptr<MockClusterUpdateCallbacks> callbacks(new NiceMock<MockClusterUpdateCallbacks>());
ClusterUpdateCallbacksHandlePtr cb =
cluster_manager_->addThreadLocalClusterUpdateCallbacks(*callbacks);

EXPECT_FALSE(cluster_manager_->get("cluster_1")->info()->addedViaApi());

// Verify that we get no hosts when the HostSet is empty.
EXPECT_EQ(nullptr, cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));
EXPECT_EQ(nullptr, cluster_manager_->tcpConnPoolForCluster("cluster_1", ResourcePriority::Default,
nullptr, nullptr));
EXPECT_EQ(nullptr,
cluster_manager_->tcpConnForCluster("cluster_1", nullptr, nullptr).connection_);

Cluster& cluster = cluster_manager_->activeClusters().begin()->second;

// Set up the HostSet.
HostSharedPtr host1 = makeTestHost(cluster.info(), "tcp://127.0.0.1:80");
HostSharedPtr host2 = makeTestHost(cluster.info(), "tcp://127.0.0.1:81");

HostVector hosts{host1, host2};
auto hosts_ptr = std::make_shared<HostVector>(hosts);

// Sending non-mergeable updates.
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, hosts, {},
100);

EXPECT_EQ(1, factory_.stats_.counter("cluster_manager.cluster_updated").value());
EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.cluster_updated_via_merge").value());
EXPECT_EQ(0, factory_.stats_.counter("cluster_manager.update_merge_cancelled").value());

EXPECT_CALL(factory_, allocateConnPool_(_, _))
.Times(3)
.WillRepeatedly(ReturnNew<Http::ConnectionPool::MockInstance>());

EXPECT_CALL(factory_, allocateTcpConnPool_(_))
.Times(3)
.WillRepeatedly(ReturnNew<Tcp::ConnectionPool::MockInstance>());

// This should provide us a CP for each of the above hosts.
Http::ConnectionPool::MockInstance* cp1 =
dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));
// Create persistent connection for host2.
Http::ConnectionPool::MockInstance* cp2 =
dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http2, nullptr));

Tcp::ConnectionPool::MockInstance* tcp1 =
dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, nullptr, nullptr));

Tcp::ConnectionPool::MockInstance* tcp2 =
dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, nullptr, nullptr));

EXPECT_NE(cp1, cp2);
EXPECT_NE(tcp1, tcp2);

EXPECT_CALL(*cp2, addDrainedCallback(_))
.WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

EXPECT_CALL(*cp1, addDrainedCallback(_))
.WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

EXPECT_CALL(*tcp1, addDrainedCallback(_))
.WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

EXPECT_CALL(*tcp2, addDrainedCallback(_))
.WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

HostVector hosts_removed;
hosts_removed.push_back(host2);

// This update should drain all connection pools (host1, host2).
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, {},
hosts_removed, 100);

// Recreate connection pool for host1.
cp1 = dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));

tcp1 = dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, nullptr, nullptr));

HostSharedPtr host3 = makeTestHost(cluster.info(), "tcp://127.0.0.1:82");

HostVector hosts_added;
hosts_added.push_back(host3);

EXPECT_CALL(*cp1, addDrainedCallback(_))
.WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

EXPECT_CALL(*tcp1, addDrainedCallback(_))
.WillOnce(Invoke([](Tcp::ConnectionPool::Instance::DrainedCb cb) { cb(); }));

// Adding host3 should drain connection pool for host1.
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr,
hosts_added, {}, 100);
}

TEST_F(ClusterManagerImplTest, ConnPoolsNotDrainedOnHostSetChange) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way to have some common code for this test and the previous test since I'm guessing they are mostly the same? Also I wonder if the not draining case is already well covered but I'm fine either way.

Copy link
Member Author

@nezdolik nezdolik Aug 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattklein123 don't think is worth the effort to introduce common code/method to share between two tests, since that code would be very specific to those 2 tests. Think is nice to still have 2 tests, to test when property is set vs not set. Could try to reduce the code size of second test (basically we want to ensure that connections are not drained on host addition). WDYT?

Copy link
Member Author

@nezdolik nezdolik Aug 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies, i have to do force push to fix DCO (DCO broke after committing suggestions). Reduced test size in last commit, waiting for opinions.

const std::string yaml = R"EOF(
static_resources:
clusters:
- name: cluster_1
connect_timeout: 0.250s
lb_policy: ROUND_ROBIN
type: STATIC
)EOF";

ReadyWatcher initialized;
EXPECT_CALL(initialized, ready());
create(parseBootstrapFromV2Yaml(yaml));

// Set up for an initialize callback.
cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); });

std::unique_ptr<MockClusterUpdateCallbacks> callbacks(new NiceMock<MockClusterUpdateCallbacks>());
ClusterUpdateCallbacksHandlePtr cb =
cluster_manager_->addThreadLocalClusterUpdateCallbacks(*callbacks);

Cluster& cluster = cluster_manager_->activeClusters().begin()->second;

// Set up the HostSet.
HostSharedPtr host1 = makeTestHost(cluster.info(), "tcp://127.0.0.1:80");

HostVector hosts{host1};
auto hosts_ptr = std::make_shared<HostVector>(hosts);

// Sending non-mergeable updates.
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr, hosts, {},
100);

EXPECT_CALL(factory_, allocateConnPool_(_, _))
.Times(1)
.WillRepeatedly(ReturnNew<Http::ConnectionPool::MockInstance>());

EXPECT_CALL(factory_, allocateTcpConnPool_(_))
.Times(1)
.WillRepeatedly(ReturnNew<Tcp::ConnectionPool::MockInstance>());

// This should provide us a CP for each of the above hosts.
Http::ConnectionPool::MockInstance* cp1 =
dynamic_cast<Http::ConnectionPool::MockInstance*>(cluster_manager_->httpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, Http::Protocol::Http11, nullptr));

Tcp::ConnectionPool::MockInstance* tcp1 =
dynamic_cast<Tcp::ConnectionPool::MockInstance*>(cluster_manager_->tcpConnPoolForCluster(
"cluster_1", ResourcePriority::Default, nullptr, nullptr));

HostSharedPtr host2 = makeTestHost(cluster.info(), "tcp://127.0.0.1:82");
HostVector hosts_added;
hosts_added.push_back(host2);

// No connection pools should be drained.
EXPECT_CALL(*cp1, drainConnections()).Times(0);
EXPECT_CALL(*tcp1, drainConnections()).Times(0);

// No connection pools should be drained.
cluster.prioritySet().updateHosts(
0, HostSetImpl::partitionHosts(hosts_ptr, HostsPerLocalityImpl::empty()), nullptr,
hosts_added, {}, 100);
}

} // namespace
} // namespace Upstream
} // namespace Envoy