diff --git a/envoy/upstream/cluster_manager.h b/envoy/upstream/cluster_manager.h index 0bb896daeacc..a28e19c9ac8c 100644 --- a/envoy/upstream/cluster_manager.h +++ b/envoy/upstream/cluster_manager.h @@ -369,15 +369,6 @@ class ClusterManager { */ virtual Config::SubscriptionFactory& subscriptionFactory() PURE; - /** - * Obtain multiplexed subscription factory for the cluster manager. - * This factory shares mux per management server per xds resource type which reduces number of - * concurrent active grpc streams. - * - * @return Config::SubscriptionFactory& multiplexed subscription factory. - */ - virtual Config::SubscriptionFactory& multiplexedSubscriptionFactory() PURE; - /** * Returns a struct with all the Stats::StatName objects needed by * Clusters. This helps factor out some relatively heavy name diff --git a/source/common/config/subscription_factory_impl.cc b/source/common/config/subscription_factory_impl.cc index 818d7c655b63..d08dce8d3efa 100644 --- a/source/common/config/subscription_factory_impl.cc +++ b/source/common/config/subscription_factory_impl.cc @@ -3,6 +3,7 @@ #include "envoy/config/core/v3/config_source.pb.h" #include "envoy/config/xds_resources_delegate.h" +#include "source/common/config/custom_config_validators_impl.h" #include "source/common/config/filesystem_subscription_impl.h" #include "source/common/config/grpc_mux_impl.h" #include "source/common/config/grpc_subscription_impl.h" @@ -71,18 +72,66 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource( Utility::apiConfigSourceRequestTimeout(api_config_source), restMethod(type_url), type_url, callbacks, resource_decoder, stats, Utility::configSourceInitialFetchTimeout(config), validation_visitor_); - case envoy::config::core::v3::ApiConfigSource::GRPC: - case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: { + case envoy::config::core::v3::ApiConfigSource::GRPC: { + GrpcMuxSharedPtr mux; CustomConfigValidatorsPtr custom_config_validators = std::make_unique(validation_visitor_, server_, api_config_source.config_validators()); - GrpcMuxSharedPtr mux = - getOrCreateMux(api_config_source, type_url, scope, custom_config_validators); + const std::string control_plane_id = + Utility::getGrpcControlPlane(api_config_source).value_or(""); + + if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) { + mux = std::make_shared( + Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source, + scope, true) + ->createUncachedRawAsyncClient(), + dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope, + Utility::parseRateLimitSettings(api_config_source), local_info_, + api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), + xds_config_tracker_, xds_resources_delegate_, control_plane_id); + } else { + mux = std::make_shared( + local_info_, + Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source, + scope, true) + ->createUncachedRawAsyncClient(), + dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope, + Utility::parseRateLimitSettings(api_config_source), + api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), + xds_config_tracker_, xds_resources_delegate_, control_plane_id); + } return std::make_unique( std::move(mux), callbacks, resource_decoder, stats, type_url, dispatcher_, Utility::configSourceInitialFetchTimeout(config), /*is_aggregated*/ false, options); } + case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: { + GrpcMuxSharedPtr mux; + CustomConfigValidatorsPtr custom_config_validators = + std::make_unique(validation_visitor_, server_, + api_config_source.config_validators()); + if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) { + mux = std::make_shared( + Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source, + scope, true) + ->createUncachedRawAsyncClient(), + dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope, + Utility::parseRateLimitSettings(api_config_source), local_info_, + api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), + xds_config_tracker_); + } else { + mux = std::make_shared( + Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), + api_config_source, scope, true) + ->createUncachedRawAsyncClient(), + dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope, + Utility::parseRateLimitSettings(api_config_source), local_info_, + std::move(custom_config_validators), xds_config_tracker_); + } + return std::make_unique( + std::move(mux), callbacks, resource_decoder, stats, type_url, dispatcher_, + Utility::configSourceInitialFetchTimeout(config), /*is_aggregated*/ false, options); + } } throw EnvoyException("Invalid API config source API type"); } @@ -103,6 +152,7 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl( Stats::Scope& scope, SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder) { SubscriptionStats stats = Utility::generateStats(scope); + switch (collection_locator.scheme()) { case xds::core::v3::ResourceLocator::FILE: { const std::string path = Http::Utility::localPathFromFilePath(collection_locator.id()); @@ -178,61 +228,5 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl( } } -GrpcMuxSharedPtr SubscriptionFactoryImpl::getOrCreateMux( - const envoy::config::core::v3::ApiConfigSource& api_config_source, absl::string_view type_url, - Stats::Scope& scope, CustomConfigValidatorsPtr& custom_config_validators) { - GrpcMuxSharedPtr mux; - switch (api_config_source.api_type()) { - case envoy::config::core::v3::ApiConfigSource::GRPC: { - const std::string control_plane_id = - Utility::getGrpcControlPlane(api_config_source).value_or(""); - if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) { - mux = std::make_shared( - Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source, - scope, true) - ->createUncachedRawAsyncClient(), - dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope, - Utility::parseRateLimitSettings(api_config_source), local_info_, - api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), - xds_config_tracker_, xds_resources_delegate_, control_plane_id); - } else { - mux = std::make_shared( - local_info_, - Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source, - scope, true) - ->createUncachedRawAsyncClient(), - dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope, - Utility::parseRateLimitSettings(api_config_source), - api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), - xds_config_tracker_, xds_resources_delegate_, control_plane_id); - } - break; - } - case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: { - if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) { - mux = std::make_shared( - Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source, - scope, true) - ->createUncachedRawAsyncClient(), - dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope, - Utility::parseRateLimitSettings(api_config_source), local_info_, - api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators), - xds_config_tracker_); - } else { - mux = std::make_shared( - Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), - api_config_source, scope, true) - ->createUncachedRawAsyncClient(), - dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope, - Utility::parseRateLimitSettings(api_config_source), local_info_, - std::move(custom_config_validators), xds_config_tracker_); - } - break; - } - default: - throw EnvoyException("Unsupported api type in api config source, cannot create GRPC mux."); - } - return mux; -} } // namespace Config } // namespace Envoy diff --git a/source/common/config/subscription_factory_impl.h b/source/common/config/subscription_factory_impl.h index eebbd5c6929b..2f37e08408e7 100644 --- a/source/common/config/subscription_factory_impl.h +++ b/source/common/config/subscription_factory_impl.h @@ -12,7 +12,6 @@ #include "envoy/upstream/cluster_manager.h" #include "source/common/common/logger.h" -#include "source/common/config/custom_config_validators_impl.h" namespace Envoy { namespace Config { @@ -39,12 +38,6 @@ class SubscriptionFactoryImpl : public SubscriptionFactory, Logger::Loggable( - local_info, main_thread_dispatcher, *this, validation_context.dynamicValidationVisitor(), api, - server, makeOptRefFromPtr(xds_resources_delegate_.get()), - makeOptRefFromPtr(xds_config_tracker_.get())); - const auto& dyn_resources = bootstrap.dynamic_resources(); // Cluster loading happens in two phases: first all the primary clusters are loaded, and then all diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 0164af14f658..584e08709b33 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -37,7 +37,6 @@ #include "source/common/upstream/cluster_discovery_manager.h" #include "source/common/upstream/host_utility.h" #include "source/common/upstream/load_stats_reporter.h" -#include "source/common/upstream/multiplexed_subscription_factory.h" #include "source/common/upstream/od_cds_api_impl.h" #include "source/common/upstream/priority_conn_pool_map.h" #include "source/common/upstream/upstream_impl.h" @@ -329,10 +328,6 @@ class ClusterManagerImpl : public ClusterManager, Config::SubscriptionFactory& subscriptionFactory() override { return *subscription_factory_; } - Config::SubscriptionFactory& multiplexedSubscriptionFactory() override { - return *multiplexed_subscription_factory_; - } - void initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override; @@ -817,7 +812,6 @@ class ClusterManagerImpl : public ClusterManager, ClusterTimeoutBudgetStatNames cluster_timeout_budget_stat_names_; std::unique_ptr subscription_factory_; - std::unique_ptr multiplexed_subscription_factory_; ClusterSet primary_clusters_; std::unique_ptr xds_resources_delegate_; diff --git a/source/common/upstream/multiplexed_subscription_factory.cc b/source/common/upstream/multiplexed_subscription_factory.cc deleted file mode 100644 index 7687c556ed3d..000000000000 --- a/source/common/upstream/multiplexed_subscription_factory.cc +++ /dev/null @@ -1,39 +0,0 @@ -#include "source/common/upstream/multiplexed_subscription_factory.h" - -#include "source/common/common/hash.h" -#include "source/common/config/utility.h" - -namespace Envoy { -namespace Upstream { - -MultiplexedSubscriptionFactory::MultiplexedSubscriptionFactory( - const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, - Upstream::ClusterManager& cm, ProtobufMessage::ValidationVisitor& validation_visitor, - Api::Api& api, const Server::Instance& server, - Config::XdsResourcesDelegateOptRef xds_resources_delegate, - Config::XdsConfigTrackerOptRef xds_config_tracker) - : Config::SubscriptionFactoryImpl(local_info, dispatcher, cm, validation_visitor, api, server, - xds_resources_delegate, xds_config_tracker){}; - -Config::GrpcMuxSharedPtr MultiplexedSubscriptionFactory::getOrCreateMux( - const envoy::config::core::v3::ApiConfigSource& config_source, absl::string_view type_url, - Stats::Scope& scope, Config::CustomConfigValidatorsPtr& custom_config_validators) { - if (config_source.api_type() == envoy::config::core::v3::ApiConfigSource::GRPC || - config_source.api_type() == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) { - const uint64_t xds_server_hash = MessageUtil::hash(config_source.grpc_services(0)); - const uint64_t xds_type_hash = HashUtil::xxHash64(type_url); - const uint64_t mux_key = xds_server_hash ^ xds_type_hash; - if (muxes_.find(mux_key) == muxes_.end()) { - muxes_.emplace( - std::make_pair(mux_key, Config::SubscriptionFactoryImpl::getOrCreateMux( - config_source, type_url, scope, custom_config_validators))); - } - return muxes_.at(mux_key); - } else { - return Config::SubscriptionFactoryImpl::getOrCreateMux(config_source, type_url, scope, - custom_config_validators); - } -} - -} // namespace Upstream -} // namespace Envoy diff --git a/source/common/upstream/multiplexed_subscription_factory.h b/source/common/upstream/multiplexed_subscription_factory.h deleted file mode 100644 index 444fced8b79c..000000000000 --- a/source/common/upstream/multiplexed_subscription_factory.h +++ /dev/null @@ -1,51 +0,0 @@ -#pragma once - -#include "envoy/config/core/v3/base.pb.h" -#include "envoy/config/subscription.h" -#include "envoy/stats/scope.h" - -#include "source/common/common/assert.h" -#include "source/common/config/custom_config_validators_impl.h" -#include "source/common/config/grpc_mux_impl.h" -#include "source/common/config/grpc_subscription_impl.h" -#include "source/common/config/subscription_factory_impl.h" -#include "source/common/config/utility.h" -#include "source/common/protobuf/protobuf.h" -#include "source/server/transport_socket_config_impl.h" - -/** - * MultiplexedSubscriptionFactory is used for instantiation of XDS subscriptions so as to minimize - * the number of open grpc connections used by these subscriptions. This is done by sharing a grpc - * multiplexer between subscriptions handled per subscription server and xds resource type. Please - * see https://github.com/envoyproxy/envoy/issues/2943 for additional information and related - * issues. - * - */ - -namespace Envoy { -namespace Upstream { - -class MultiplexedSubscriptionFactory : public Config::SubscriptionFactoryImpl { -public: - ~MultiplexedSubscriptionFactory() override = default; - - MultiplexedSubscriptionFactory(const LocalInfo::LocalInfo& local_info, - Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, - ProtobufMessage::ValidationVisitor& validation_visitor, - Api::Api& api, const Server::Instance& server, - Config::XdsResourcesDelegateOptRef xds_resources_delegate, - Config::XdsConfigTrackerOptRef xds_config_tracker); - -protected: - // Config::SubscriptionFactoryImpl - Config::GrpcMuxSharedPtr - getOrCreateMux(const envoy::config::core::v3::ApiConfigSource& api_config_source, - absl::string_view type_url, Stats::Scope& scope, - Config::CustomConfigValidatorsPtr& custom_config_validators) override; - -private: - absl::flat_hash_map muxes_; - friend class MultiplexedSubscriptionFactoryPeer; -}; -} // namespace Upstream -} // namespace Envoy diff --git a/source/extensions/clusters/eds/BUILD b/source/extensions/clusters/eds/BUILD index 75ecfcac7d2d..cbdbbd402395 100644 --- a/source/extensions/clusters/eds/BUILD +++ b/source/extensions/clusters/eds/BUILD @@ -34,7 +34,6 @@ envoy_cc_extension( "//source/common/network:utility_lib", "//source/common/protobuf:utility_lib", "//source/common/upstream:cluster_factory_lib", - "//source/common/upstream:multiplexed_subscription_factory_lib", "//source/common/upstream:upstream_includes", "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", diff --git a/source/extensions/clusters/eds/eds.cc b/source/extensions/clusters/eds/eds.cc index 16e05c1f3f23..cb44c83bc536 100644 --- a/source/extensions/clusters/eds/eds.cc +++ b/source/extensions/clusters/eds/eds.cc @@ -9,7 +9,6 @@ #include "source/common/common/utility.h" #include "source/common/config/api_version.h" #include "source/common/config/decoded_resource_impl.h" -#include "source/common/runtime/runtime_features.h" namespace Envoy { namespace Upstream { @@ -38,23 +37,10 @@ EdsClusterImpl::EdsClusterImpl( initialize_phase_ = InitializePhase::Secondary; } const auto resource_name = getResourceName(); - if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.multiplex_eds") && - (eds_config.api_config_source().api_type() == - envoy::config::core::v3::ApiConfigSource::GRPC || - eds_config.api_config_source().api_type() == - envoy::config::core::v3::ApiConfigSource::DELTA_GRPC)) { - ENVOY_LOG(trace, "Multiplexing EDS updates over single stream for cluster ", cluster_name_); - subscription_ = - factory_context.clusterManager() - .multiplexedSubscriptionFactory() - .subscriptionFromConfigSource(eds_config, Grpc::Common::typeUrl(resource_name), - info_->statsScope(), *this, resource_decoder_, {}); - } else { - subscription_ = - factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource( - eds_config, Grpc::Common::typeUrl(resource_name), info_->statsScope(), *this, - resource_decoder_, {}); - } + subscription_ = + factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource( + eds_config, Grpc::Common::typeUrl(resource_name), info_->statsScope(), *this, + resource_decoder_, {}); } void EdsClusterImpl::startPreInit() { subscription_->start({cluster_name_}); } diff --git a/source/extensions/clusters/eds/eds.h b/source/extensions/clusters/eds/eds.h index c9aca23ea9d6..57adb83c7aaa 100644 --- a/source/extensions/clusters/eds/eds.h +++ b/source/extensions/clusters/eds/eds.h @@ -17,7 +17,6 @@ #include "source/common/config/subscription_base.h" #include "source/common/upstream/cluster_factory_impl.h" -#include "source/common/upstream/multiplexed_subscription_factory.h" #include "source/common/upstream/upstream_impl.h" #include "source/extensions/clusters/eds/leds.h" diff --git a/test/common/config/subscription_factory_impl_test.cc b/test/common/config/subscription_factory_impl_test.cc index 5ed43a60b8ff..6cd037e0fef9 100644 --- a/test/common/config/subscription_factory_impl_test.cc +++ b/test/common/config/subscription_factory_impl_test.cc @@ -108,28 +108,6 @@ TEST_F(SubscriptionFactoryTest, NoConfigSpecifier) { "Missing config source specifier in envoy::config::core::v3::ConfigSource"); } -TEST_F(SubscriptionFactoryTest, UnsupportedConfigSourceAggregatedGrpc) { - envoy::config::core::v3::ConfigSource config; - Upstream::ClusterManager::ClusterSet primary_clusters; - config.mutable_api_config_source()->set_api_type( - envoy::config::core::v3::ApiConfigSource::AGGREGATED_GRPC); - config.mutable_api_config_source()->set_transport_api_version(envoy::config::core::v3::V3); - EXPECT_CALL(cm_, primaryClusters()).WillOnce(ReturnRef(primary_clusters)); - EXPECT_THROW_WITH_MESSAGE(subscriptionFromConfigSource(config), EnvoyException, - "Unsupported config source AGGREGATED_GRPC"); -} - -TEST_F(SubscriptionFactoryTest, UnsupportedConfigSourceAggregatedDeltaGrpc) { - envoy::config::core::v3::ConfigSource config; - Upstream::ClusterManager::ClusterSet primary_clusters; - config.mutable_api_config_source()->set_api_type( - envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC); - config.mutable_api_config_source()->set_transport_api_version(envoy::config::core::v3::V3); - EXPECT_CALL(cm_, primaryClusters()).WillOnce(ReturnRef(primary_clusters)); - EXPECT_THROW_WITH_MESSAGE(subscriptionFromConfigSource(config), EnvoyException, - "Unsupported config source AGGREGATED_DELTA_GRPC"); -} - TEST_F(SubscriptionFactoryTest, RestClusterEmpty) { envoy::config::core::v3::ConfigSource config; Upstream::ClusterManager::ClusterSet primary_clusters; diff --git a/test/common/grpc/grpc_client_integration.h b/test/common/grpc/grpc_client_integration.h index f22a30ee4145..84abeda51cf3 100644 --- a/test/common/grpc/grpc_client_integration.h +++ b/test/common/grpc/grpc_client_integration.h @@ -18,8 +18,6 @@ enum class SotwOrDelta { Sotw, Delta, UnifiedSotw, UnifiedDelta }; // Unified or Legacy grpc mux implementation enum class LegacyOrUnified { Legacy, Unified }; -enum class EdsUpdateMode { Multiplexed, StreamPerCluster }; - class BaseGrpcClientIntegrationParamTest { public: virtual ~BaseGrpcClientIntegrationParamTest() = default; @@ -123,27 +121,6 @@ class DeltaSotwIntegrationParamTest SotwOrDelta sotwOrDelta() const { return std::get<2>(GetParam()); } }; -class MultiplexedDeltaSotwIntegrationParamTest - : public BaseGrpcClientIntegrationParamTest, - public testing::TestWithParam< - std::tuple> { -public: - ~MultiplexedDeltaSotwIntegrationParamTest() override = default; - static std::string protocolTestParamsToString( - const ::testing::TestParamInfo< - std::tuple>& p) { - return fmt::format( - "{}_{}_{}_{}", std::get<0>(p.param) == Network::Address::IpVersion::v4 ? "IPv4" : "IPv6", - std::get<1>(p.param) == ClientType::GoogleGrpc ? "GoogleGrpc" : "EnvoyGrpc", - std::get<2>(p.param) == SotwOrDelta::Delta ? "Delta" : "StateOfTheWorld", - std::get<3>(p.param) == EdsUpdateMode::Multiplexed ? "Multiplexed" : "StreamPerCluster"); - } - Network::Address::IpVersion ipVersion() const override { return std::get<0>(GetParam()); } - ClientType clientType() const override { return std::get<1>(GetParam()); } - SotwOrDelta sotwOrDelta() const { return std::get<2>(GetParam()); } - EdsUpdateMode edsUpdateMode() const { return std::get<3>(GetParam()); } -}; - // Skip tests based on gRPC client type. #define SKIP_IF_GRPC_CLIENT(client_type) \ if (clientType() == (client_type)) { \ @@ -169,12 +146,6 @@ class MultiplexedDeltaSotwIntegrationParamTest testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest()), \ testing::Values(Grpc::LegacyOrUnified::Legacy, Grpc::LegacyOrUnified::Unified)) -#define EDS_MODE_DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS \ - testing::Combine( \ - testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ - testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest()), \ - testing::Values(Grpc::SotwOrDelta::Sotw, Grpc::SotwOrDelta::Delta), \ - testing::Values(Grpc::EdsUpdateMode::Multiplexed, Grpc::EdsUpdateMode::StreamPerCluster)) #define DELTA_SOTW_UNIFIED_GRPC_CLIENT_INTEGRATION_PARAMS \ testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest()), \ diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index ce7344cca6c4..6d5162d2d9ad 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -138,23 +138,6 @@ envoy_cc_test( deps = ["//source/common/upstream:scheduler_lib"], ) -envoy_cc_test( - name = "multiplexed_subscription_factory_test", - srcs = ["multiplexed_subscription_factory_test.cc"], - deps = [ - "//source/common/upstream:multiplexed_subscription_factory_lib", - "//test/mocks/api:api_mocks", - "//test/mocks/config:config_mocks", - "//test/mocks/config:custom_config_validators_mocks", - "//test/mocks/local_info:local_info_mocks", - "//test/mocks/protobuf:protobuf_mocks", - "//test/mocks/runtime:runtime_mocks", - "//test/mocks/server:server_mocks", - "//test/mocks/stats:stats_mocks", - "//test/mocks/upstream:upstream_mocks", - ], -) - envoy_cc_test_library( name = "health_check_fuzz_utils_lib", srcs = [ diff --git a/test/common/upstream/multiplexed_subscription_factory_test.cc b/test/common/upstream/multiplexed_subscription_factory_test.cc deleted file mode 100644 index 944f53ba303f..000000000000 --- a/test/common/upstream/multiplexed_subscription_factory_test.cc +++ /dev/null @@ -1,257 +0,0 @@ -#include - -#include "envoy/common/exception.h" -#include "envoy/stats/scope.h" - -#include "source/common/upstream/multiplexed_subscription_factory.h" - -#include "test/mocks/api/mocks.h" -#include "test/mocks/config/custom_config_validators.h" -#include "test/mocks/config/mocks.h" -#include "test/mocks/local_info/mocks.h" -#include "test/mocks/protobuf/mocks.h" -#include "test/mocks/runtime/mocks.h" -#include "test/mocks/server/mocks.h" -#include "test/mocks/stats/mocks.h" -#include "test/mocks/upstream/mocks.h" - -#include "gmock/gmock.h" -#include "gtest/gtest.h" - -using ::testing::_; -using ::testing::Invoke; -using ::testing::ReturnRef; - -namespace Envoy { -namespace Upstream { - -class MultiplexedSubscriptionFactoryPeer { -public: - static size_t optimizedMuxesSize(MultiplexedSubscriptionFactory& factory) { - return factory.muxes_.size(); - } -}; - -class MultiplexedSubscriptionFactoryForTesting : public MultiplexedSubscriptionFactory { -public: - MultiplexedSubscriptionFactoryForTesting( - const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, - Upstream::ClusterManager& cm, Api::Api& api, - ProtobufMessage::ValidationVisitor& validation_visitor, const Server::Instance& server, - Config::XdsResourcesDelegateOptRef xds_resources_delegate, - Config::XdsConfigTrackerOptRef xds_config_tracker) - : MultiplexedSubscriptionFactory(local_info, dispatcher, cm, validation_visitor, api, server, - xds_resources_delegate, xds_config_tracker){}; - - Config::GrpcMuxSharedPtr - testGetOrCreateMux(const envoy::config::core::v3::ApiConfigSource& api_config_source, - absl::string_view type_url, Stats::Scope& scope, - Config::CustomConfigValidatorsPtr& custom_config_validators) { - return MultiplexedSubscriptionFactory::getOrCreateMux(api_config_source, type_url, scope, - custom_config_validators); - } -}; - -class MultiplexedSubscriptionFactoryTest - : public ::testing::TestWithParam { -public: - MultiplexedSubscriptionFactoryTest() - : type_url_("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"), - resource_decoder_(std::make_shared>()){}; - - NiceMock cm_; - Event::MockDispatcher dispatcher_; - NiceMock random_; - NiceMock callbacks_; - Stats::MockIsolatedStatsStore stats_store_; - Stats::Scope& stats_scope_{*stats_store_.rootScope()}; - NiceMock api_; - NiceMock local_info_; - Grpc::MockAsyncClient* async_client_; - const std::string type_url_; - Config::RateLimitSettings rate_limit_settings_; - NiceMock server_; - NiceMock validation_visitor_; - Config::OpaqueResourceDecoderSharedPtr resource_decoder_; - NiceMock xds_resources_delegate_; - NiceMock xds_config_tracker_; -}; - -using MultiplexedSubscriptionFactoryForGrpcTest = MultiplexedSubscriptionFactoryTest; - -INSTANTIATE_TEST_SUITE_P(GrpcApiConfigSource, MultiplexedSubscriptionFactoryForGrpcTest, - ::testing::Values(envoy::config::core::v3::ApiConfigSource::GRPC, - envoy::config::core::v3::ApiConfigSource::DELTA_GRPC)); - -// Verify the same mux instance is returned for same config source. -TEST_P(MultiplexedSubscriptionFactoryForGrpcTest, ShouldReturnSameMuxForSameConfigSource) { - envoy::config::core::v3::ConfigSource config1; - auto config_source = config1.mutable_api_config_source(); - config_source->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("primary_xds_cluster"); - config_source->set_api_type(GetParam()); - auto factory = MultiplexedSubscriptionFactoryForTesting( - local_info_, dispatcher_, cm_, api_, validation_visitor_, server_, xds_resources_delegate_, - xds_config_tracker_); - - EXPECT_CALL(dispatcher_, createTimer_(_)); - Config::CustomConfigValidatorsPtr config_validators = - std::make_unique>(); - auto first_mux = factory.testGetOrCreateMux(config1.api_config_source(), type_url_, stats_scope_, - config_validators); - config_validators = std::make_unique>(); - auto second_mux = factory.testGetOrCreateMux(config1.api_config_source(), type_url_, stats_scope_, - config_validators); - EXPECT_EQ(first_mux.get(), second_mux.get()); - EXPECT_EQ(1, MultiplexedSubscriptionFactoryPeer::optimizedMuxesSize(factory)); -} - -// Verify the same mux instance is returned when the same management servers are used. -TEST_P(MultiplexedSubscriptionFactoryForGrpcTest, ShouldReturnSameMuxForSameGrpcService) { - envoy::config::core::v3::ConfigSource config1; - auto config_source = config1.mutable_api_config_source(); - config_source->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("primary_xds_cluster"); - config_source->set_api_type(GetParam()); - envoy::config::core::v3::ConfigSource config2; - config_source = config2.mutable_api_config_source(); - config_source->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("primary_xds_cluster"); - config_source->set_api_type(GetParam()); - config_source = config2.mutable_api_config_source(); - config_source->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name( - "fallback_xds_cluster"); - auto factory = MultiplexedSubscriptionFactoryForTesting( - local_info_, dispatcher_, cm_, api_, validation_visitor_, server_, xds_resources_delegate_, - xds_config_tracker_); - - EXPECT_CALL(dispatcher_, createTimer_(_)); - Config::CustomConfigValidatorsPtr config_validators = - std::make_unique>(); - auto first_mux = factory.testGetOrCreateMux(config1.api_config_source(), type_url_, stats_scope_, - config_validators); - config_validators = std::make_unique>(); - auto second_mux = factory.testGetOrCreateMux(config1.api_config_source(), type_url_, stats_scope_, - config_validators); - EXPECT_EQ(first_mux.get(), second_mux.get()); - EXPECT_EQ(1, MultiplexedSubscriptionFactoryPeer::optimizedMuxesSize(factory)); -} - -// Verify that a new mux instance is created if a different config_source is used. -TEST_P(MultiplexedSubscriptionFactoryForGrpcTest, ShouldReturnDiffMuxesForDiffXdsServers) { - auto factory = MultiplexedSubscriptionFactoryForTesting( - local_info_, dispatcher_, cm_, api_, validation_visitor_, server_, xds_resources_delegate_, - xds_config_tracker_); - - EXPECT_CALL(dispatcher_, createTimer_(_)).Times(2); - envoy::config::core::v3::ConfigSource first_config; - auto config_source = first_config.mutable_api_config_source(); - config_source->set_api_type(GetParam()); - config_source->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("first_cluster"); - Config::CustomConfigValidatorsPtr config_validators = - std::make_unique>(); - auto first_mux = factory.testGetOrCreateMux(first_config.api_config_source(), type_url_, - stats_scope_, config_validators); - envoy::config::core::v3::ConfigSource second_config; - config_source = second_config.mutable_api_config_source(); - config_source->set_api_type(GetParam()); - config_source->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("second_cluster"); - config_validators = std::make_unique>(); - auto second_mux = factory.testGetOrCreateMux(second_config.api_config_source(), type_url_, - stats_scope_, config_validators); - EXPECT_NE(first_mux.get(), second_mux.get()); - EXPECT_EQ(2, MultiplexedSubscriptionFactoryPeer::optimizedMuxesSize(factory)); -} - -// Verify that a new mux instance is created if a different xds method is used. -TEST_P(MultiplexedSubscriptionFactoryForGrpcTest, ShouldReturnDiffMuxesForDiffXdsTypes) { - envoy::config::core::v3::ConfigSource config; - auto config_source = config.mutable_api_config_source(); - config_source->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("xds_cluster"); - config_source->set_api_type(GetParam()); - auto factory = MultiplexedSubscriptionFactoryForTesting( - local_info_, dispatcher_, cm_, api_, validation_visitor_, server_, xds_resources_delegate_, - xds_config_tracker_); - EXPECT_CALL(dispatcher_, createTimer_(_)).Times(3); - Config::CustomConfigValidatorsPtr config_validators = - std::make_unique>(); - auto first_mux = factory.testGetOrCreateMux(config.api_config_source(), type_url_, stats_scope_, - config_validators); - config_validators = std::make_unique>(); - auto second_mux = factory.testGetOrCreateMux( - config.api_config_source(), "type.googleapis.com/envoy.config.cluster.v3.Cluster", - stats_scope_, config_validators); - EXPECT_NE(first_mux.get(), second_mux.get()); - config_validators = std::make_unique>(); - auto third_mux = factory.testGetOrCreateMux( - config.api_config_source(), "type.googleapis.com/envoy.config.listener.v3.Listener", - stats_scope_, config_validators); - EXPECT_NE(first_mux.get(), third_mux.get()); - EXPECT_NE(second_mux.get(), third_mux.get()); - EXPECT_EQ(3, MultiplexedSubscriptionFactoryPeer::optimizedMuxesSize(factory)); -} - -// Verify that muxes managed by MultiplexedSubscriptionFactory are used when ApiConfigSource::GRPC -// api type is used. -TEST_P(MultiplexedSubscriptionFactoryForGrpcTest, - ShouldUseGetOrCreateMuxWhenApiConfigSourceIsUsed) { - auto factory = MultiplexedSubscriptionFactoryForTesting( - local_info_, dispatcher_, cm_, api_, validation_visitor_, server_, xds_resources_delegate_, - xds_config_tracker_); - envoy::config::core::v3::ConfigSource config; - auto config_source = config.mutable_api_config_source(); - config_source->set_api_type(GetParam()); - config_source->set_transport_api_version(envoy::config::core::v3::V3); - config_source->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("first_cluster"); - Upstream::ClusterManager::ClusterSet primary_clusters; - primary_clusters.insert("first_cluster"); - EXPECT_CALL(cm_, primaryClusters()).WillOnce(ReturnRef(primary_clusters)); - NiceMock* async_client_factory{ - new NiceMock()}; - EXPECT_CALL(*async_client_factory, createUncachedRawAsyncClient()).WillOnce(Invoke([] { - return std::make_unique>(); - })); - NiceMock async_client_manager; - EXPECT_CALL(async_client_manager, factoryForGrpcService(_, _, _)) - .WillOnce(Invoke( - [async_client_factory](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { - return Grpc::AsyncClientFactoryPtr(async_client_factory); - })); - EXPECT_CALL(cm_, grpcAsyncClientManager()).WillOnce(ReturnRef(async_client_manager)); - EXPECT_CALL(dispatcher_, createTimer_(_)); - auto subscription = - factory.subscriptionFromConfigSource(config, Config::TypeUrl::get().ClusterLoadAssignment, - stats_scope_, callbacks_, resource_decoder_, {}); - Config::CustomConfigValidatorsPtr config_validators = - std::make_unique>(); - auto expected_mux = factory.testGetOrCreateMux(config.api_config_source(), type_url_, - stats_scope_, config_validators); - EXPECT_EQ(expected_mux.get(), - (dynamic_cast(*subscription).grpcMux()).get()); -} - -using MultiplexedSubscriptionFactoryForNonGrpcTest = MultiplexedSubscriptionFactoryTest; - -INSTANTIATE_TEST_SUITE_P(NonGrpcApiConfigSource, MultiplexedSubscriptionFactoryForNonGrpcTest, - ::testing::Values(envoy::config::core::v3::ApiConfigSource::REST)); - -TEST_P(MultiplexedSubscriptionFactoryForNonGrpcTest, - ShouldUseBaseGetOrCreateMuxWhenNonGrpcConfigSourceIsUsed) { - auto factory = MultiplexedSubscriptionFactoryForTesting( - local_info_, dispatcher_, cm_, api_, validation_visitor_, server_, xds_resources_delegate_, - xds_config_tracker_); - envoy::config::core::v3::ConfigSource config; - auto* api_config_source = config.mutable_api_config_source(); - api_config_source->set_api_type(envoy::config::core::v3::ApiConfigSource::REST); - api_config_source->set_transport_api_version(envoy::config::core::v3::V3); - api_config_source->mutable_refresh_delay()->set_seconds(1); - api_config_source->mutable_request_timeout()->set_seconds(5); - api_config_source->add_cluster_names("xds_cluster"); - Upstream::ClusterManager::ClusterSet primary_clusters; - primary_clusters.insert("xds_cluster"); - EXPECT_CALL(cm_, primaryClusters()).WillOnce(ReturnRef(primary_clusters)); - EXPECT_CALL(dispatcher_, createTimer_(_)); - factory.subscriptionFromConfigSource(config, Config::TypeUrl::get().ClusterLoadAssignment, - stats_scope_, callbacks_, resource_decoder_, {}); - EXPECT_EQ(0, MultiplexedSubscriptionFactoryPeer::optimizedMuxesSize(factory)); -} - -} // namespace Upstream -} // namespace Envoy diff --git a/test/extensions/clusters/eds/eds_speed_test.cc b/test/extensions/clusters/eds/eds_speed_test.cc index 0d3da41d1ca6..8df731effeff 100644 --- a/test/extensions/clusters/eds/eds_speed_test.cc +++ b/test/extensions/clusters/eds/eds_speed_test.cc @@ -15,7 +15,6 @@ #include "source/common/config/protobuf_link_hacks.h" #include "source/common/config/utility.h" #include "source/common/config/xds_mux/grpc_mux_impl.h" -#include "source/common/runtime/runtime_features.h" #include "source/common/singleton/manager_impl.h" #include "source/extensions/clusters/eds/eds.h" #include "source/server/transport_socket_config_impl.h" @@ -81,26 +80,13 @@ class EdsSpeedTest { refresh_delay: 1s )EOF", Envoy::Upstream::Cluster::InitializePhase::Secondary); - if (edsMultiplexingEnabled()) { - EXPECT_CALL(*server_context_.cluster_manager_.multiplexed_subscription_factory_.subscription_, - start(_)); - } else { - EXPECT_CALL(*server_context_.cluster_manager_.subscription_factory_.subscription_, start(_)); - } + EXPECT_CALL(*server_context_.cluster_manager_.subscription_factory_.subscription_, start(_)); cluster_->initialize([this] { initialized_ = true; }); EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(&async_stream_)); subscription_->start({"fare"}); } - bool edsMultiplexingEnabled() { - return Runtime::runtimeFeatureEnabled("envoy.reloadable_features.multiplex_eds") && - (eds_cluster_.eds_cluster_config().eds_config().api_config_source().api_type() == - envoy::config::core::v3::ApiConfigSource::GRPC || - eds_cluster_.eds_cluster_config().eds_config().api_config_source().api_type() == - envoy::config::core::v3::ApiConfigSource::DELTA_GRPC); - } - void resetCluster(const std::string& yaml_config, Cluster::InitializePhase initialize_phase) { local_info_.node_.mutable_locality()->set_zone("us-east-1a"); eds_cluster_ = parseClusterFromV3Yaml(yaml_config); @@ -113,11 +99,7 @@ class EdsSpeedTest { cluster_ = std::make_shared(server_context_, eds_cluster_, runtime_, factory_context, std::move(scope), false); EXPECT_EQ(initialize_phase, cluster_->initializePhase()); - eds_callbacks_ = - edsMultiplexingEnabled() - ? server_context_.cluster_manager_.multiplexed_subscription_factory_.callbacks_ - : server_context_.cluster_manager_.subscription_factory_.callbacks_; - ; + eds_callbacks_ = server_context_.cluster_manager_.subscription_factory_.callbacks_; subscription_ = std::make_unique( grpc_mux_, *eds_callbacks_, resource_decoder_, subscription_stats_, type_url_, server_context_.dispatcher_, std::chrono::milliseconds(), false, diff --git a/test/extensions/clusters/eds/eds_test.cc b/test/extensions/clusters/eds/eds_test.cc index f7fc0539a10c..7bc0b992b2a4 100644 --- a/test/extensions/clusters/eds/eds_test.cc +++ b/test/extensions/clusters/eds/eds_test.cc @@ -134,27 +134,11 @@ class EdsTest : public testing::Test { cluster_ = std::make_shared(server_context_, eds_cluster_, runtime_.loader(), factory_context, std::move(scope), false); EXPECT_EQ(initialize_phase, cluster_->initializePhase()); - eds_callbacks_ = - edsMultiplexingEnabled() - ? server_context_.cluster_manager_.multiplexed_subscription_factory_.callbacks_ - : server_context_.cluster_manager_.subscription_factory_.callbacks_; - } - - bool edsMultiplexingEnabled() { - return Runtime::runtimeFeatureEnabled("envoy.reloadable_features.multiplex_eds") && - (eds_cluster_.eds_cluster_config().eds_config().api_config_source().api_type() == - envoy::config::core::v3::ApiConfigSource::GRPC || - eds_cluster_.eds_cluster_config().eds_config().api_config_source().api_type() == - envoy::config::core::v3::ApiConfigSource::DELTA_GRPC); + eds_callbacks_ = server_context_.cluster_manager_.subscription_factory_.callbacks_; } void initialize() { - if (edsMultiplexingEnabled()) { - EXPECT_CALL(*server_context_.cluster_manager_.multiplexed_subscription_factory_.subscription_, - start(_)); - } else { - EXPECT_CALL(*server_context_.cluster_manager_.subscription_factory_.subscription_, start(_)); - } + EXPECT_CALL(*server_context_.cluster_manager_.subscription_factory_.subscription_, start(_)); cluster_->initialize([this] { initialized_ = true; }); } @@ -2546,99 +2530,6 @@ TEST_F(EdsTest, OnConfigUpdateLedsAndEndpoints) { "(resource: xdstp://foo/leds/collection) and a list of endpoints."); } -TEST_F(EdsTest, MultiplexEdsEnabledViaRuntime) { - runtime_.mergeValues({{"envoy.reloadable_features.multiplex_eds", "true"}}); - EXPECT_CALL(server_context_.cluster_manager_.multiplexed_subscription_factory_, - subscriptionFromConfigSource(_, _, _, _, _, _)); - EXPECT_CALL(server_context_.cluster_manager_.subscription_factory_, - subscriptionFromConfigSource(_, _, _, _, _, _)) - .Times(0); - resetCluster(R"EOF( - name: some_cluster - connect_timeout: 0.25s - type: EDS - eds_cluster_config: - eds_config: - resource_api_version: V3 - api_config_source: - api_type: GRPC - transport_api_version: V3 - grpc_services: - envoy_grpc: - cluster_name: eds_cluster - )EOF", - Cluster::InitializePhase::Secondary); -} - -TEST_F(EdsTest, MultiplexEdsDisabledViaRuntime) { - runtime_.mergeValues({{"envoy.reloadable_features.multiplex_eds", "false"}}); - EXPECT_CALL(server_context_.cluster_manager_.multiplexed_subscription_factory_, - subscriptionFromConfigSource(_, _, _, _, _, _)) - .Times(0); - EXPECT_CALL(server_context_.cluster_manager_.subscription_factory_, - subscriptionFromConfigSource(_, _, _, _, _, _)); - resetCluster(R"EOF( - name: some_cluster - connect_timeout: 0.25s - type: EDS - eds_cluster_config: - eds_config: - resource_api_version: V3 - api_config_source: - api_type: GRPC - transport_api_version: V3 - grpc_services: - envoy_grpc: - cluster_name: eds_cluster - )EOF", - Cluster::InitializePhase::Secondary); -} - -TEST_F(EdsTest, MultiplexEdsWithUnsupportedApiTypeFallsbackToNonMultiplexedEds) { - runtime_.mergeValues({{"envoy.reloadable_features.multiplex_eds", "true"}}); - EXPECT_CALL(server_context_.cluster_manager_.multiplexed_subscription_factory_, - subscriptionFromConfigSource(_, _, _, _, _, _)) - .Times(0); - EXPECT_CALL(server_context_.cluster_manager_.subscription_factory_, - subscriptionFromConfigSource(_, _, _, _, _, _)); - resetCluster(R"EOF( - name: some_cluster - connect_timeout: 0.25s - type: EDS - eds_cluster_config: - eds_config: - api_config_source: - api_type: REST - cluster_names: - - eds - refresh_delay: 1s - )EOF", - Cluster::InitializePhase::Secondary); -} - -TEST_F(EdsTest, MultiplexEdsEnabledByDefault) { - EXPECT_CALL(server_context_.cluster_manager_.multiplexed_subscription_factory_, - subscriptionFromConfigSource(_, _, _, _, _, _)); - EXPECT_CALL(server_context_.cluster_manager_.subscription_factory_, - subscriptionFromConfigSource(_, _, _, _, _, _)) - .Times(0); - resetCluster(R"EOF( - name: some_cluster - connect_timeout: 0.25s - type: EDS - eds_cluster_config: - eds_config: - resource_api_version: V3 - api_config_source: - api_type: GRPC - transport_api_version: V3 - grpc_services: - envoy_grpc: - cluster_name: eds_cluster - )EOF", - Cluster::InitializePhase::Secondary); -} - } // namespace } // namespace Upstream } // namespace Envoy diff --git a/test/integration/BUILD b/test/integration/BUILD index 4eb21daa6a31..8563cf0e826e 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -151,25 +151,6 @@ envoy_cc_test( ], ) -envoy_cc_test( - name = "eds_grpc_integration_test", - srcs = ["eds_grpc_integration_test.cc"], - deps = [ - ":http_integration_lib", - "//source/common/runtime:runtime_features_lib", - "//source/common/upstream:load_balancer_lib", - "//test/config:utility_lib", - "//test/integration/filters:eds_ready_filter_config_lib", - "//test/test_common:network_utility_lib", - "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", - "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", - "@envoy_api//envoy/config/core/v3:pkg_cc_proto", - "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", - "@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto", - "@envoy_api//envoy/type/v3:pkg_cc_proto", - ], -) - envoy_cc_test( name = "leds_integration_test", srcs = ["leds_integration_test.cc"], diff --git a/test/integration/base_integration_test.cc b/test/integration/base_integration_test.cc index fca24404e463..5337dd030797 100644 --- a/test/integration/base_integration_test.cc +++ b/test/integration/base_integration_test.cc @@ -588,25 +588,6 @@ AssertionResult BaseIntegrationTest::compareDiscoveryRequest( } } -AssertionResult BaseIntegrationTest::compareDiscoveryRequest( - const std::string& expected_type_url, const std::string& expected_version, - const std::vector& expected_resource_names, - const std::vector& expected_resource_names_added, - const std::vector& expected_resource_names_removed, FakeStreamPtr& stream, - bool expect_node, const Protobuf::int32 expected_error_code, - const std::string& expected_error_message) { - if (sotw_or_delta_ == Grpc::SotwOrDelta::Sotw || - sotw_or_delta_ == Grpc::SotwOrDelta::UnifiedSotw) { - return compareSotwDiscoveryRequest(expected_type_url, expected_version, expected_resource_names, - expect_node, expected_error_code, expected_error_message, - stream.get()); - } else { - return compareDeltaDiscoveryRequest(expected_type_url, expected_resource_names_added, - expected_resource_names_removed, stream, - expected_error_code, expected_error_message, expect_node); - } -} - AssertionResult compareSets(const std::set& set1, const std::set& set2, absl::string_view name) { if (set1 == set2) { @@ -820,98 +801,4 @@ void BaseIntegrationTest::checkForMissingTagExtractionRules() { test_server_->statStore().forEachGauge(nullptr, check_metric); test_server_->statStore().forEachHistogram(nullptr, check_metric); } - -AssertionResult BaseIntegrationTest::internalCompareDiscoveryRequest( - const DiscoveryRequestExpectedContents& expected_request, - const envoy::service::discovery::v3::DiscoveryRequest& actual_request, - const std::set& actual_sub) { - - if (actual_request.type_url() != expected_request.type_url_) { - return AssertionFailure() << fmt::format("type_url {} does not match expected {}.", - actual_request.type_url(), expected_request.type_url_); - } - auto sub_result = - compareSets(expected_request.subscriptions_, actual_sub, "expected_resource_subscriptions"); - if (!sub_result) { - return sub_result; - } - - if (actual_request.error_detail().code() != expected_request.error_code_) { - return AssertionFailure() << fmt::format( - "error code {} does not match expected {}. (Error message is {}).", - actual_request.error_detail().code(), expected_request.error_code_, - actual_request.error_detail().message()); - } - if (expected_request.error_code_ != Grpc::Status::WellKnownGrpcStatus::Ok && - actual_request.error_detail().message().find(expected_request.error_substring_) == - std::string::npos) { - return AssertionFailure() << "\"" << expected_request.error_substring_ - << "\" is not a substring of actual error message \"" - << actual_request.error_detail().message() << "\""; - } - return AssertionSuccess(); -} - -AssertionResult BaseIntegrationTest::internalCompareDeltaDiscoveryRequest( - const DiscoveryRequestExpectedContents& expected_request, - const envoy::service::discovery::v3::DeltaDiscoveryRequest& actual_request, - const std::set& actual_sub, const std::set& actual_unsub) { - - if (actual_request.type_url() != expected_request.type_url_) { - return AssertionFailure() << fmt::format("type_url {} does not match expected {}.", - actual_request.type_url(), expected_request.type_url_); - } - auto sub_result = - compareSets(expected_request.subscriptions_, actual_sub, "expected_resource_subscriptions"); - if (!sub_result) { - return sub_result; - } - auto unsub_result = compareSets(expected_request.unsubscriptions_, actual_unsub, - "expected_resource_unsubscriptions"); - if (!unsub_result) { - return unsub_result; - } - - if (actual_request.error_detail().code() != expected_request.error_code_) { - return AssertionFailure() << fmt::format( - "error code {} does not match expected {}. (Error message is {}).", - actual_request.error_detail().code(), expected_request.error_code_, - actual_request.error_detail().message()); - } - if (expected_request.error_code_ != Grpc::Status::WellKnownGrpcStatus::Ok && - actual_request.error_detail().message().find(expected_request.error_substring_) == - std::string::npos) { - return AssertionFailure() << "\"" << expected_request.error_substring_ - << "\" is not a substring of actual error message \"" - << actual_request.error_detail().message() << "\""; - } - return AssertionSuccess(); -} - -AssertionResult BaseIntegrationTest::assertExpectedDiscoveryRequest( - const envoy::service::discovery::v3::DiscoveryRequest& request, - const BaseIntegrationTest::DiscoveryRequestExpectedContents& expected_request) { - if (!request.has_node() || request.node().id().empty() || request.node().cluster().empty()) { - return AssertionFailure() << "Weird node field"; - } - - // Convert subscribed/unsubscribed names into sets to ignore ordering. - const std::set actual_sub{request.resource_names().begin(), - request.resource_names().end()}; - return internalCompareDiscoveryRequest(expected_request, request, actual_sub); -} - -AssertionResult BaseIntegrationTest::assertExpectedDeltaDiscoveryRequest( - const envoy::service::discovery::v3::DeltaDiscoveryRequest& request, - const BaseIntegrationTest::DiscoveryRequestExpectedContents& expected_request) { - if (!request.has_node() || request.node().id().empty() || request.node().cluster().empty()) { - return AssertionFailure() << "Weird node field"; - } - const std::set actual_sub{request.resource_names_subscribe().begin(), - request.resource_names_subscribe().end()}; - const std::set actual_unsub{request.resource_names_unsubscribe().begin(), - request.resource_names_unsubscribe().end()}; - return internalCompareDeltaDiscoveryRequest(expected_request, request, actual_sub, actual_unsub); -} - } // namespace Envoy diff --git a/test/integration/base_integration_test.h b/test/integration/base_integration_test.h index 3a7daa23ed7a..e099358f8dfa 100644 --- a/test/integration/base_integration_test.h +++ b/test/integration/base_integration_test.h @@ -187,16 +187,6 @@ class BaseIntegrationTest : protected Logger::Loggable { const std::vector& expected_resource_names_removed, bool expect_node = false, const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, const std::string& expected_error_message = ""); - - AssertionResult compareDiscoveryRequest( - const std::string& expected_type_url, const std::string& expected_version, - const std::vector& expected_resource_names, - const std::vector& expected_resource_names_added, - const std::vector& expected_resource_names_removed, FakeStreamPtr& stream, - bool expect_node = false, - const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, - const std::string& expected_error_message = ""); - template void sendDiscoveryResponse(const std::string& type_url, const std::vector& state_of_the_world, const std::vector& added_or_updated, @@ -209,19 +199,6 @@ class BaseIntegrationTest : protected Logger::Loggable { } } - template - void sendDiscoveryResponse(const std::string& type_url, const std::vector& state_of_the_world, - const std::vector& added_or_updated, - const std::vector& removed, const std::string& version, - FakeStreamPtr& stream) { - if (sotw_or_delta_ == Grpc::SotwOrDelta::Sotw || - sotw_or_delta_ == Grpc::SotwOrDelta::UnifiedSotw) { - sendSotwDiscoveryResponse(type_url, state_of_the_world, version, stream.get()); - } else { - sendDeltaDiscoveryResponse(type_url, added_or_updated, removed, version, stream); - } - } - AssertionResult compareDeltaDiscoveryRequest( const std::string& expected_type_url, const std::vector& expected_resource_subscriptions, @@ -246,43 +223,6 @@ class BaseIntegrationTest : protected Logger::Loggable { const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, const std::string& expected_error_message = "", FakeStream* stream = nullptr); - struct DiscoveryRequestExpectedContents { - DiscoveryRequestExpectedContents( - const std::string& type_url, const std::vector& subscriptions, - const std::vector& unsubscriptions, - const Protobuf::int32 error_code = Grpc::Status::WellKnownGrpcStatus::Ok, - const std::string& error_substring = "") - : type_url_(type_url), - // Convert subscribed/unsubscribed names into sets to ignore ordering. - subscriptions_(subscriptions.begin(), subscriptions.end()), - unsubscriptions_(unsubscriptions.begin(), unsubscriptions.end()), error_code_(error_code), - error_substring_(error_substring) {} - - const std::string& type_url_; - const std::set subscriptions_; - const std::set unsubscriptions_; - const Protobuf::int32 error_code_; - const std::string& error_substring_; - }; - - AssertionResult internalCompareDeltaDiscoveryRequest( - const DiscoveryRequestExpectedContents& expected_request, - const envoy::service::discovery::v3::DeltaDiscoveryRequest& actual_request, - const std::set& actual_sub, const std::set& actual_unsub); - - AssertionResult internalCompareDiscoveryRequest( - const DiscoveryRequestExpectedContents& expected_request, - const envoy::service::discovery::v3::DiscoveryRequest& actual_request, - const std::set& actual_sub); - - AssertionResult assertExpectedDeltaDiscoveryRequest( - const envoy::service::discovery::v3::DeltaDiscoveryRequest& request, - const DiscoveryRequestExpectedContents& expected_request); - - AssertionResult - assertExpectedDiscoveryRequest(const envoy::service::discovery::v3::DiscoveryRequest& request, - const DiscoveryRequestExpectedContents& expected_request); - template void sendSotwDiscoveryResponse(const std::string& type_url, const std::vector& messages, const std::string& version, FakeStream* stream = nullptr) { diff --git a/test/integration/eds_grpc_integration_test.cc b/test/integration/eds_grpc_integration_test.cc deleted file mode 100644 index cec8ebffca1a..000000000000 --- a/test/integration/eds_grpc_integration_test.cc +++ /dev/null @@ -1,601 +0,0 @@ -#include "envoy/config/bootstrap/v3/bootstrap.pb.h" -#include "envoy/config/cluster/v3/cluster.pb.h" -#include "envoy/config/core/v3/health_check.pb.h" -#include "envoy/config/endpoint/v3/endpoint.pb.h" -#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" -#include "envoy/type/v3/http.pb.h" - -#include "source/common/runtime/runtime_features.h" -#include "source/common/upstream/load_balancer_impl.h" - -#include "test/config/utility.h" -#include "test/integration/http_integration.h" -#include "test/test_common/network_utility.h" - -#include "gtest/gtest.h" - -namespace Envoy { -namespace { - -class EdsOverGrpcIntegrationTest : public Grpc::MultiplexedDeltaSotwIntegrationParamTest, - public HttpIntegrationTest { -protected: - struct FakeUpstreamInfo { - FakeHttpConnectionPtr connection_; - FakeUpstream* upstream_{}; - absl::flat_hash_map stream_by_resource_name_; - static constexpr char default_stream_name[] = "default"; - // Used for cases where only a single stream is needed. - FakeStreamPtr& defaultStream() { return stream_by_resource_name_[default_stream_name]; } - }; - - EdsOverGrpcIntegrationTest() - : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, ipVersion()), - codec_client_type_(envoy::type::v3::HTTP2) { - use_lds_ = true; - test_skipped_ = false; - } - - void TearDown() override { - if (!test_skipped_) { - resetConnections(); - cleanUpXdsConnection(); - } - } - - void resetConnections() { - // First disconnect upstream connections to avoid FIN messages causing unexpected - // disconnects on the fake servers. - for (auto& host_upstream_info : hosts_upstreams_info_) { - resetFakeUpstreamInfo(host_upstream_info); - } - } - - // A helper function to set the endpoints health status. - void setEndpointsHealthStatus( - const absl::flat_hash_set& endpoints_idxs, - envoy::config::core::v3::HealthStatus health_status, absl::string_view collection_prefix, - absl::flat_hash_map& - updated_endpoints) { - for (const auto endpoint_idx : endpoints_idxs) { - const std::string endpoint_name = absl::StrCat(collection_prefix, "endpoint", endpoint_idx); - envoy::config::endpoint::v3::LbEndpoint endpoint; - // Shift fake_upstreams_ by 1 (due to EDS fake upstream). - setUpstreamAddress(endpoint_idx + 1, endpoint); - endpoint.set_health_status(health_status); - updated_endpoints.emplace(endpoint_name, endpoint); - } - } - - void setEndpoints(uint32_t total_endpoints, uint32_t healthy_endpoints, - uint32_t degraded_endpoints, bool remaining_unhealthy = true, - absl::optional overprovisioning_factor = absl::nullopt, - bool await_update = true) { - envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; - cluster_load_assignment.set_cluster_name("cluster_0"); - if (overprovisioning_factor.has_value()) { - cluster_load_assignment.mutable_policy()->mutable_overprovisioning_factor()->set_value( - overprovisioning_factor.value()); - } - auto* locality_lb_endpoints = cluster_load_assignment.add_endpoints(); - locality_lb_endpoints->set_priority(1); - for (uint32_t i = 0; i < total_endpoints; ++i) { - auto* endpoint = locality_lb_endpoints->add_lb_endpoints(); - // Skip EDS upstream. - setUpstreamAddress(i + 1, *endpoint); - // First N endpoints are degraded, next M are healthy and the remaining endpoints are - // unhealthy or unknown depending on remaining_unhealthy. - if (i < degraded_endpoints) { - endpoint->set_health_status(envoy::config::core::v3::DEGRADED); - } else if (i >= healthy_endpoints + degraded_endpoints) { - endpoint->set_health_status(remaining_unhealthy ? envoy::config::core::v3::UNHEALTHY - : envoy::config::core::v3::UNKNOWN); - } - } - auto& stream = - (edsUpdateMode() == Grpc::EdsUpdateMode::Multiplexed) ? xds_stream_ : xds_streams_.front(); - sendDiscoveryResponse( - Config::TypeUrl::get().ClusterLoadAssignment, {cluster_load_assignment}, - {cluster_load_assignment}, {}, std::to_string(eds_version_++), stream); - if (await_update) { - // Receive EDS ack. - EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, - std::to_string(eds_version_ - 1), {}, {}, {}, stream, - true, Grpc::Status::WellKnownGrpcStatus::Ok, "")); - } - } - - void createUpstreams() override { - // Add EDS upstream. - addFakeUpstream(Http::CodecType::HTTP2); - HttpIntegrationTest::createUpstreams(); - hosts_upstreams_info_.reserve(fake_upstreams_count_); - // Skip the first fake upstream as it is reserved for EDS. - for (size_t i = 1; i < fake_upstreams_.size(); ++i) { - FakeUpstreamInfo host_info; - host_info.upstream_ = &(*fake_upstreams_[i]); - hosts_upstreams_info_.emplace_back(std::move(host_info)); - } - } - - void initializeTest(bool http_active_hc, uint32_t num_clusters_to_add, - bool ignore_new_hosts_until_first_hc) { - setUpstreamCount(4); - setUpstreamProtocol(Http::CodecType::HTTP2); - if (edsUpdateMode() == Grpc::EdsUpdateMode::StreamPerCluster) { - config_helper_.addRuntimeOverride("envoy.reloadable_features.multiplex_eds", "false"); - } else { - config_helper_.addRuntimeOverride("envoy.reloadable_features.multiplex_eds", "true"); - } - config_helper_.addConfigModifier( - [this, http_active_hc, num_clusters_to_add, - ignore_new_hosts_until_first_hc](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { - // Add a static EDS cluster. - auto* eds_cluster = bootstrap.mutable_static_resources()->add_clusters(); - eds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); - eds_cluster->set_name("eds_cluster"); - eds_cluster->mutable_load_assignment()->set_cluster_name("eds_cluster"); - ConfigHelper::setHttp2(*eds_cluster); - // Remove the static cluster (cluster_0) and set up CDS. - bootstrap.mutable_dynamic_resources()->mutable_cds_config()->set_resource_api_version( - envoy::config::core::v3::ApiVersion::V3); - bootstrap.mutable_dynamic_resources() - ->mutable_cds_config() - ->mutable_path_config_source() - ->set_path(cds_helper_.cds_path()); - bootstrap.mutable_static_resources()->mutable_clusters()->erase( - bootstrap.mutable_static_resources()->mutable_clusters()->begin()); - for (uint32_t i = 0; i < num_clusters_to_add; ++i) { - auto cluster_name = "cluster_" + std::to_string(i); - auto cluster_to_add = - buildCluster(cluster_name, http_active_hc, ignore_new_hosts_until_first_hc); - envoy::config::endpoint::v3::ClusterLoadAssignment cla_to_add; - cla_to_add.set_cluster_name(cluster_name); - clas_to_add_.push_back(cla_to_add); - clusters_to_add_.push_back(cluster_to_add); - resource_names_.push_back(cluster_name); - } - cds_helper_.setCds(clusters_to_add_); - }); - // Set validate_clusters to false to allow us to reference a CDS cluster. - config_helper_.addConfigModifier( - [](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& - hcm) { hcm.mutable_route_config()->mutable_validate_clusters()->set_value(false); }); - defer_listener_finalization_ = true; - HttpIntegrationTest::initialize(); - // Add the assignment and localities. - cluster_load_assignment_.set_cluster_name("cluster_0"); - acceptXdsConnection(); - - if (edsUpdateMode() == Grpc::EdsUpdateMode::Multiplexed) { - initXdsStream(xds_stream_); - for (uint32_t i = 0; i < num_clusters_to_add; ++i) { - EXPECT_TRUE(compareDiscoveryRequest( - Config::TypeUrl::get().ClusterLoadAssignment, "", - {resource_names_.begin(), resource_names_.begin() + i + 1}, {}, {}, true)); - } - sendDiscoveryResponse( - Config::TypeUrl::get().ClusterLoadAssignment, clas_to_add_, clas_to_add_, {}, - std::to_string(eds_version_)); - // Receive EDS ack. - EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, - std::to_string(eds_version_), {}, {}, {}, true)); - eds_version_++; - } else { - std::list expected_requests_contents; - for (uint32_t i = 0; i < num_clusters_to_add; ++i) { - xds_streams_.emplace_back(); - initXdsStream(xds_streams_.back()); - expected_requests_contents.push_back({Config::TypeUrl::get().ClusterLoadAssignment, - {"cluster_" + std::to_string(i)}, - {}, - Grpc::Status::WellKnownGrpcStatus::Ok, - ""}); - compareMultipleDiscoveryRequestsOnMultipleStreams(expected_requests_contents); - } - } - // Wait for our statically specified listener to become ready, and register its port in the - // test framework's downstream listener port map. - test_server_->waitUntilListenersReady(); - registerTestServerPorts({"http"}); - test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); - } - - AssertionResult compareMultipleDiscoveryRequestsOnMultipleStreams( - std::list& expected_requests_contents) { - uint32_t curr_stream_idx = 0; - bool comparison_result; - while (curr_stream_idx < xds_streams_.size()) { - for (auto it = expected_requests_contents.begin(); it != expected_requests_contents.end(); - ++it) { - const auto expected_request = *it; - if (sotw_or_delta_ == Grpc::SotwOrDelta::Sotw) { - envoy::service::discovery::v3::DiscoveryRequest request; - VERIFY_ASSERTION( - xds_streams_[curr_stream_idx]->waitForGrpcMessage(*dispatcher_, request)); - comparison_result = assertExpectedDiscoveryRequest(request, expected_request); - } else { - envoy::service::discovery::v3::DeltaDiscoveryRequest request; - VERIFY_ASSERTION( - xds_streams_[curr_stream_idx]->waitForGrpcMessage(*dispatcher_, request)); - comparison_result = assertExpectedDeltaDiscoveryRequest(request, expected_request); - } - if (comparison_result) { - sendDiscoveryResponse( - Config::TypeUrl::get().ClusterLoadAssignment, {clas_to_add_[curr_stream_idx]}, - {clas_to_add_[curr_stream_idx]}, {}, std::to_string(eds_version_), - xds_streams_[curr_stream_idx]); - // Receive EDS ack. - EXPECT_TRUE(compareDiscoveryRequest( - Config::TypeUrl::get().ClusterLoadAssignment, std::to_string(eds_version_), {}, {}, - {}, xds_streams_[curr_stream_idx], true, Grpc::Status::WellKnownGrpcStatus::Ok, - /*expected_error_message=*/"")); - ++curr_stream_idx; - expected_requests_contents.erase(it); - ++eds_version_; - // Request received on current stream, no more requests expected for this stream. - break; - } else { - // No matching request found. - if (it == expected_requests_contents.end()) { - return AssertionFailure(); - } - continue; - } - } - } - if (!expected_requests_contents.empty()) { - return AssertionFailure(); - } else { - return AssertionSuccess(); - } - } - - envoy::config::cluster::v3::Cluster buildCluster(const std::string& name, bool http_active_hc, - bool ignore_new_hosts_until_first_hc) { - envoy::config::cluster::v3::Cluster cluster; - if (ignore_new_hosts_until_first_hc) { - cluster.mutable_common_lb_config()->set_ignore_new_hosts_until_first_hc(true); - } - cluster.set_name(name); - cluster.set_type(envoy::config::cluster::v3::Cluster::EDS); - cluster.mutable_connect_timeout()->CopyFrom(Protobuf::util::TimeUtil::SecondsToDuration(5)); - auto* eds_cluster_config = cluster.mutable_eds_cluster_config(); - eds_cluster_config->mutable_eds_config()->set_resource_api_version( - envoy::config::core::v3::ApiVersion::V3); - auto* api_config_source = eds_cluster_config->mutable_eds_config()->mutable_api_config_source(); - api_config_source->set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); - api_config_source->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3); - auto* grpc_service = api_config_source->add_grpc_services(); - setGrpcService(*grpc_service, "eds_cluster", fake_upstreams_[0]->localAddress()); - if (http_active_hc) { - auto* health_check = cluster.add_health_checks(); - health_check->mutable_timeout()->set_seconds(30); - // TODO(mattklein123): Consider using simulated time here. - health_check->mutable_interval()->CopyFrom( - Protobuf::util::TimeUtil::MillisecondsToDuration(100)); - health_check->mutable_no_traffic_interval()->CopyFrom( - Protobuf::util::TimeUtil::MillisecondsToDuration(100)); - health_check->mutable_unhealthy_threshold()->set_value(1); - health_check->mutable_healthy_threshold()->set_value(1); - health_check->mutable_http_health_check()->set_path("/healthcheck"); - health_check->mutable_http_health_check()->set_codec_client_type(codec_client_type_); - } - return cluster; - } - - void acceptXdsConnection() { - AssertionResult result = - fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, xds_connection_); - RELEASE_ASSERT(result, result.message()); - } - - void initXdsStream(FakeStreamPtr& stream) { - AssertionResult result = xds_connection_->waitForNewStream(*dispatcher_, stream); - RELEASE_ASSERT(result, result.message()); - stream->startGrpcStream(); - } - - void resetFakeUpstreamInfo(FakeUpstreamInfo& upstream_info) { - if (upstream_info.connection_ == nullptr || upstream_info.upstream_ == nullptr) { - upstream_info.upstream_ = nullptr; - return; - } - AssertionResult result = upstream_info.connection_->close(); - RELEASE_ASSERT(result, result.message()); - result = upstream_info.connection_->waitForDisconnect(); - RELEASE_ASSERT(result, result.message()); - upstream_info.connection_.reset(); - upstream_info.upstream_ = nullptr; - } - - void waitForHealthCheck(uint32_t upstream_info_idx) { - auto& host_info = hosts_upstreams_info_[upstream_info_idx]; - if (host_info.connection_ == nullptr) { - ASSERT_TRUE(host_info.upstream_->waitForHttpConnection(*dispatcher_, host_info.connection_)); - } - ASSERT_TRUE(host_info.connection_->waitForNewStream(*dispatcher_, host_info.defaultStream())); - ASSERT_TRUE(host_info.defaultStream()->waitForEndStream(*dispatcher_)); - - EXPECT_EQ(host_info.defaultStream()->headers().getPathValue(), "/healthcheck"); - EXPECT_EQ(host_info.defaultStream()->headers().getMethodValue(), "GET"); - } - - void setEndpointsInPriorities(uint32_t first_priority, uint32_t second_priority, - bool await_update = true) { - envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment; - cluster_load_assignment.set_cluster_name("cluster_0"); - { - for (uint32_t i = 0; i < first_priority; ++i) { - auto* locality_lb_endpoints = cluster_load_assignment.add_endpoints(); - auto* endpoint = locality_lb_endpoints->add_lb_endpoints(); - setUpstreamAddress(i, *endpoint); - } - } - { - for (uint32_t i = first_priority; i < first_priority + second_priority; ++i) { - auto* locality_lb_endpoints = cluster_load_assignment.add_endpoints(); - locality_lb_endpoints->set_priority(1); - auto* endpoint = locality_lb_endpoints->add_lb_endpoints(); - setUpstreamAddress(i, *endpoint); - } - } - auto& stream = - (edsUpdateMode() == Grpc::EdsUpdateMode::Multiplexed) ? xds_stream_ : xds_streams_.front(); - sendDiscoveryResponse( - Config::TypeUrl::get().ClusterLoadAssignment, {cluster_load_assignment}, - {cluster_load_assignment}, {}, std::to_string(eds_version_++), stream); - - if (await_update) { - // Receive EDS ack. - EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, - std::to_string(eds_version_ - 1), {}, {}, {}, stream, - true, Grpc::Status::WellKnownGrpcStatus::Ok, "")); - } - } - - void initializeTest(bool http_active_hc) { initializeTest(http_active_hc, 1, false); } - - envoy::type::v3::CodecClientType codec_client_type_{}; - CdsHelper cds_helper_; - envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment_; - std::vector clusters_to_add_; - std::vector clas_to_add_; - std::vector resource_names_; - std::vector hosts_upstreams_info_; - uint32_t eds_version_{}; - bool test_skipped_{false}; - std::vector xds_streams_; -}; - -INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeSotwOrDeltaEdsMode, EdsOverGrpcIntegrationTest, - EDS_MODE_DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS); - -// Validates that endpoints can be added and then moved to other priorities without causing crashes. -// Primarily as a regression test for https://github.com/envoyproxy/envoy/issues/8764 -TEST_P(EdsOverGrpcIntegrationTest, Http2UpdatePriorities) { - initializeTest(true); - setEndpointsInPriorities(2, 2); - setEndpointsInPriorities(4, 0); - setEndpointsInPriorities(0, 4); -} - -// Verify that a host stabilized via active health checking which is first removed from EDS and -// then fails health checking is removed. -TEST_P(EdsOverGrpcIntegrationTest, RemoveAfterHcFail) { - initializeTest(true); - setEndpoints(1, 0, 0, false); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - // Wait for the first HC and verify the host is healthy. - waitForHealthCheck(0); - hosts_upstreams_info_[0].defaultStream()->encodeHeaders( - Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); - test_server_->waitForGaugeEq("cluster.cluster_0.membership_healthy", 1); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - // Clear out the host and verify the host is still healthy. - setEndpoints(0, 0, 0); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - // Fail HC and verify the host is gone. - waitForHealthCheck(0); - hosts_upstreams_info_[0].defaultStream()->encodeHeaders( - Http::TestResponseHeaderMapImpl{{":status", "503"}, {"connection", "close"}}, true); - test_server_->waitForGaugeEq("cluster.cluster_0.membership_healthy", 0); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_total")->value()); -} - -// Verifies that endpoints are ignored until health checked when configured to. -TEST_P(EdsOverGrpcIntegrationTest, EndpointWarmingSuccessfulHc) { - // Endpoints are initially excluded. - initializeTest(true, 1, true); - setEndpoints(1, 0, 0, false); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_excluded")->value()); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - // Wait for the first HC and verify the host is healthy and that it is no longer being - // excluded. - // The other endpoint should still be excluded. - waitForNextUpstreamRequest(1); - upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); - test_server_->waitForGaugeEq("cluster.cluster_0.membership_excluded", 0); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); -} - -// Verifies that endpoints are ignored until health checked when configured to when the first -// health check fails. -TEST_P(EdsOverGrpcIntegrationTest, EndpointWarmingFailedHc) { - // Endpoints are initially excluded. - initializeTest(true, 1, true); - setEndpoints(1, 0, 0, false); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_excluded")->value()); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - - // Wait for the first HC and verify the host is healthy and that it is no longer being - // excluded. - // The other endpoint should still be excluded. - waitForNextUpstreamRequest(1); - upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); - test_server_->waitForGaugeEq("cluster.cluster_0.membership_excluded", 0); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); -} - -// Validate that health status updates are consumed from EDS. -TEST_P(EdsOverGrpcIntegrationTest, HealthUpdate) { - initializeTest(false); - // Initial state, no cluster members. - EXPECT_EQ(0, test_server_->counter("cluster.cluster_0.membership_change")->value()); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - // 2/2 healthy endpoints. - setEndpoints(2, 2, 0); - EXPECT_EQ(1, test_server_->counter("cluster.cluster_0.membership_change")->value()); - EXPECT_EQ(2, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(2, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - // Drop to 0/2 healthy endpoints. - setEndpoints(2, 0, 0); - EXPECT_EQ(1, test_server_->counter("cluster.cluster_0.membership_change")->value()); - EXPECT_EQ(2, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - // Increase to 1/2 healthy endpoints. - setEndpoints(2, 1, 0); - EXPECT_EQ(1, test_server_->counter("cluster.cluster_0.membership_change")->value()); - EXPECT_EQ(2, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - // Add host and modify health to 2/3 healthy endpoints. - setEndpoints(3, 2, 0); - EXPECT_EQ(2, test_server_->counter("cluster.cluster_0.membership_change")->value()); - EXPECT_EQ(3, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(2, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - // Modify health to 2/3 healthy and 1/3 degraded. - setEndpoints(3, 2, 1); - EXPECT_EQ(2, test_server_->counter("cluster.cluster_0.membership_change")->value()); - EXPECT_EQ(3, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(2, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_degraded")->value()); -} - -// Validate that overprovisioning_factor update are picked up by Envoy. -TEST_P(EdsOverGrpcIntegrationTest, OverprovisioningFactorUpdate) { - initializeTest(false); - // Default overprovisioning factor. - setEndpoints(4, 4, 0); - auto get_and_compare = [this](const uint32_t expected_factor) { - const auto& cluster_map = test_server_->server().clusterManager().clusters(); - EXPECT_EQ(2, cluster_map.active_clusters_.size()); - EXPECT_EQ(1, cluster_map.active_clusters_.count("cluster_0")); - const auto& cluster_ref = cluster_map.active_clusters_.find("cluster_0")->second; - const auto& hostset_per_priority = cluster_ref.get().prioritySet().hostSetsPerPriority(); - EXPECT_EQ(2, hostset_per_priority.size()); - const Envoy::Upstream::HostSetPtr& host_set = hostset_per_priority[0]; - EXPECT_EQ(expected_factor, host_set->overprovisioningFactor()); - }; - get_and_compare(Envoy::Upstream::kDefaultOverProvisioningFactor); - // Use new overprovisioning factor 200. - setEndpoints(4, 4, 0, true, 200); - get_and_compare(200); -} - -// Verifies that EDS update only triggers member update callbacks once per update. -TEST_P(EdsOverGrpcIntegrationTest, BatchMemberUpdateCb) { - initializeTest(false); - uint32_t member_update_count{}; - auto& priority_set = test_server_->server() - .clusterManager() - .clusters() - .active_clusters_.find("cluster_0") - ->second.get() - .prioritySet(); - // Keep track of how many times we're seeing a member update callback. - auto member_update_cb = priority_set.addMemberUpdateCb([&](const auto& hosts_added, const auto&) { - // We should see both hosts present in the member update callback. - EXPECT_EQ(2, hosts_added.size()); - member_update_count++; - }); - setEndpoints(2, 2, 0); - EXPECT_EQ(1, member_update_count); -} - -TEST_P(EdsOverGrpcIntegrationTest, StatsReadyFilter) { - config_helper_.prependFilter("name: eds-ready-filter"); - initializeTest(false); - // Initial state: no healthy endpoints - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest( - lookupPort("http"), "GET", "/cluster1", "", downstream_protocol_, version_, "foo.com"); - ASSERT_TRUE(response->complete()); - EXPECT_EQ("500", response->headers().getStatusValue()); - EXPECT_EQ("EDS not ready", response->body()); - cleanupUpstreamAndDownstream(); - // 2/2 healthy endpoints. - setEndpoints(2, 2, 0); - EXPECT_EQ(2, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - response = IntegrationUtil::makeSingleRequest(lookupPort("http"), "GET", "/cluster1", "", - downstream_protocol_, version_, "foo.com"); - ASSERT_TRUE(response->complete()); - EXPECT_EQ("200", response->headers().getStatusValue()); - EXPECT_EQ("EDS is ready", response->body()); - cleanupUpstreamAndDownstream(); -} - -TEST_P(EdsOverGrpcIntegrationTest, ReuseMuxAndStreamForMultipleClusters) { - if (edsUpdateMode() == Grpc::EdsUpdateMode::Multiplexed) { - initializeTest(false, 2, false); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_1.membership_total")->value()); - switch (clientType()) { - case Grpc::ClientType::EnvoyGrpc: - // As EDS uses HTTP2, number of streams created by Envoy for EDS cluster equals to number - // of requests. - EXPECT_EQ(1UL, test_server_->counter("cluster.eds_cluster.upstream_rq_total")->value()); - break; - case Grpc::ClientType::GoogleGrpc: - // One EDS mux/stream is created and reused for 2 clusters when initializing first EDS - // cluster (cluster_0). As a consequence, only one Google async grpc client and one - // corresponding set of client stats should be created. - EXPECT_EQ(1UL, - test_server_->counter("cluster.cluster_0.grpc.eds_cluster.streams_total")->value()); - EXPECT_EQ(TestUtility::findCounter(test_server_->statStore(), - "cluster.cluster_1.grpc.eds_cluster.streams_total"), - nullptr); - break; - default: - PANIC("reached unexpected code"); - } - } else { - test_skipped_ = true; - } -} - -TEST_P(EdsOverGrpcIntegrationTest, StreamPerClusterMultipleClusters) { - if (edsUpdateMode() == Grpc::EdsUpdateMode::StreamPerCluster) { - initializeTest(false, 2, false); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_total")->value()); - EXPECT_EQ(0, test_server_->gauge("cluster.cluster_1.membership_total")->value()); - switch (clientType()) { - case Grpc::ClientType::EnvoyGrpc: - // As EDS uses HTTP2, number of streams created by Envoy for EDS cluster equals to number - // of requests. - EXPECT_EQ(2UL, test_server_->counter("cluster.eds_cluster.upstream_rq_total")->value()); - break; - case Grpc::ClientType::GoogleGrpc: - // One EDS mux/stream is created for each cluster. As a consequence, 2 sets of client stats - // should be instantiated. - EXPECT_EQ(1UL, - test_server_->counter("cluster.cluster_0.grpc.eds_cluster.streams_total")->value()); - EXPECT_EQ(1UL, - test_server_->counter("cluster.cluster_1.grpc.eds_cluster.streams_total")->value()); - break; - default: - PANIC("reached unexpected code"); - } - } else { - test_skipped_ = true; - } -} - -} // namespace -} // namespace Envoy diff --git a/test/integration/eds_integration_test.cc b/test/integration/eds_integration_test.cc index ce705d269a00..d3be9d2ccb20 100644 --- a/test/integration/eds_integration_test.cc +++ b/test/integration/eds_integration_test.cc @@ -315,8 +315,8 @@ TEST_P(EdsIntegrationTest, FinishWarmingIgnoreHealthCheck) { EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); EXPECT_EQ(0, test_server_->gauge("cluster_manager.warming_clusters")->value()); - // Trigger a CDS update. This should cause a new cluster to require warming, blocked on the - // host being health checked. + // Trigger a CDS update. This should cause a new cluster to require warming, blocked on the host + // being health checked. cluster_.mutable_circuit_breakers()->add_thresholds()->mutable_max_connections()->set_value(100); cds_helper_.setCds({cluster_}); test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); @@ -340,8 +340,8 @@ TEST_P(EdsIntegrationTest, EndpointWarmingSuccessfulHc) { EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_excluded")->value()); EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - // Wait for the first HC and verify the host is healthy and that it is no longer being - // excluded. The other endpoint should still be excluded. + // Wait for the first HC and verify the host is healthy and that it is no longer being excluded. + // The other endpoint should still be excluded. waitForNextUpstreamRequest(0); upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); test_server_->waitForGaugeEq("cluster.cluster_0.membership_excluded", 0); @@ -362,8 +362,8 @@ TEST_P(EdsIntegrationTest, EndpointWarmingFailedHc) { EXPECT_EQ(1, test_server_->gauge("cluster.cluster_0.membership_excluded")->value()); EXPECT_EQ(0, test_server_->gauge("cluster.cluster_0.membership_healthy")->value()); - // Wait for the first HC and verify the host is healthy and that it is no longer being - // excluded. The other endpoint should still be excluded. + // Wait for the first HC and verify the host is healthy and that it is no longer being excluded. + // The other endpoint should still be excluded. waitForNextUpstreamRequest(0); upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "503"}}, true); test_server_->waitForGaugeEq("cluster.cluster_0.membership_excluded", 0); diff --git a/test/mocks/upstream/cluster_manager.cc b/test/mocks/upstream/cluster_manager.cc index 1943e5e71d4b..44d845b6c3ff 100644 --- a/test/mocks/upstream/cluster_manager.cc +++ b/test/mocks/upstream/cluster_manager.cc @@ -26,8 +26,6 @@ MockClusterManager::MockClusterManager() ON_CALL(*this, grpcAsyncClientManager()).WillByDefault(ReturnRef(async_client_manager_)); ON_CALL(*this, localClusterName()).WillByDefault((ReturnRef(local_cluster_name_))); ON_CALL(*this, subscriptionFactory()).WillByDefault(ReturnRef(subscription_factory_)); - ON_CALL(*this, multiplexedSubscriptionFactory()) - .WillByDefault(ReturnRef(multiplexed_subscription_factory_)); ON_CALL(*this, allocateOdCdsApi(_, _, _)) .WillByDefault(Invoke([](const envoy::config::core::v3::ConfigSource&, OptRef, diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index e1daacc5f0b6..50d26fed8317 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -56,7 +56,6 @@ class MockClusterManager : public ClusterManager { MOCK_METHOD(ClusterUpdateCallbacksHandle*, addThreadLocalClusterUpdateCallbacks_, (ClusterUpdateCallbacks & callbacks)); MOCK_METHOD(Config::SubscriptionFactory&, subscriptionFactory, ()); - MOCK_METHOD(Config::SubscriptionFactory&, multiplexedSubscriptionFactory, ()); const ClusterTrafficStatNames& clusterStatNames() const override { return cluster_stat_names_; } const ClusterConfigUpdateStatNames& clusterConfigUpdateStatNames() const override { return cluster_config_update_stat_names_; @@ -95,7 +94,6 @@ class MockClusterManager : public ClusterManager { absl::optional local_cluster_name_; NiceMock cluster_manager_factory_; NiceMock subscription_factory_; - NiceMock multiplexed_subscription_factory_; absl::flat_hash_map> active_clusters_; absl::flat_hash_map> warming_clusters_; Stats::TestUtil::TestSymbolTable symbol_table_; diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 2ba06fbfb282..4c71a01c7fe9 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -924,7 +924,6 @@ mutator mutex mutexes mux -muxes muxed mysql namelen