Skip to content

Commit

Permalink
Revert "Optimize EDS stream allocation (#22419)" (#25157)
Browse files Browse the repository at this point in the history
This reverts commit 97a7f00.

There was a crash reported by go-control-plane attributed to this change:
envoyproxy/go-control-plane#625

Signed-off-by: Greg Greenway <[email protected]>
  • Loading branch information
ggreenway authored Jan 26, 2023
1 parent a874dc1 commit 8affda4
Show file tree
Hide file tree
Showing 26 changed files with 68 additions and 1,478 deletions.
9 changes: 0 additions & 9 deletions envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,15 +369,6 @@ class ClusterManager {
*/
virtual Config::SubscriptionFactory& subscriptionFactory() PURE;

/**
* Obtain multiplexed subscription factory for the cluster manager.
* This factory shares mux per management server per xds resource type which reduces number of
* concurrent active grpc streams.
*
* @return Config::SubscriptionFactory& multiplexed subscription factory.
*/
virtual Config::SubscriptionFactory& multiplexedSubscriptionFactory() PURE;

/**
* Returns a struct with all the Stats::StatName objects needed by
* Clusters. This helps factor out some relatively heavy name
Expand Down
114 changes: 54 additions & 60 deletions source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/config/core/v3/config_source.pb.h"
#include "envoy/config/xds_resources_delegate.h"

#include "source/common/config/custom_config_validators_impl.h"
#include "source/common/config/filesystem_subscription_impl.h"
#include "source/common/config/grpc_mux_impl.h"
#include "source/common/config/grpc_subscription_impl.h"
Expand Down Expand Up @@ -71,18 +72,66 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
Utility::apiConfigSourceRequestTimeout(api_config_source), restMethod(type_url), type_url,
callbacks, resource_decoder, stats, Utility::configSourceInitialFetchTimeout(config),
validation_visitor_);
case envoy::config::core::v3::ApiConfigSource::GRPC:
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
case envoy::config::core::v3::ApiConfigSource::GRPC: {
GrpcMuxSharedPtr mux;
CustomConfigValidatorsPtr custom_config_validators =
std::make_unique<CustomConfigValidatorsImpl>(validation_visitor_, server_,
api_config_source.config_validators());
GrpcMuxSharedPtr mux =
getOrCreateMux(api_config_source, type_url, scope, custom_config_validators);
const std::string control_plane_id =
Utility::getGrpcControlPlane(api_config_source).value_or("");

if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
mux = std::make_shared<Config::XdsMux::GrpcMuxSotw>(
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_, xds_resources_delegate_, control_plane_id);
} else {
mux = std::make_shared<Config::GrpcMuxImpl>(
local_info_,
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source),
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_, xds_resources_delegate_, control_plane_id);
}
return std::make_unique<GrpcSubscriptionImpl>(
std::move(mux), callbacks, resource_decoder, stats, type_url, dispatcher_,
Utility::configSourceInitialFetchTimeout(config),
/*is_aggregated*/ false, options);
}
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
GrpcMuxSharedPtr mux;
CustomConfigValidatorsPtr custom_config_validators =
std::make_unique<CustomConfigValidatorsImpl>(validation_visitor_, server_,
api_config_source.config_validators());
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
mux = std::make_shared<Config::XdsMux::GrpcMuxDelta>(
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_);
} else {
mux = std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
api_config_source, scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
std::move(custom_config_validators), xds_config_tracker_);
}
return std::make_unique<GrpcSubscriptionImpl>(
std::move(mux), callbacks, resource_decoder, stats, type_url, dispatcher_,
Utility::configSourceInitialFetchTimeout(config), /*is_aggregated*/ false, options);
}
}
throw EnvoyException("Invalid API config source API type");
}
Expand All @@ -103,6 +152,7 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder) {
SubscriptionStats stats = Utility::generateStats(scope);

switch (collection_locator.scheme()) {
case xds::core::v3::ResourceLocator::FILE: {
const std::string path = Http::Utility::localPathFromFilePath(collection_locator.id());
Expand Down Expand Up @@ -178,61 +228,5 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
}
}

GrpcMuxSharedPtr SubscriptionFactoryImpl::getOrCreateMux(
const envoy::config::core::v3::ApiConfigSource& api_config_source, absl::string_view type_url,
Stats::Scope& scope, CustomConfigValidatorsPtr& custom_config_validators) {
GrpcMuxSharedPtr mux;
switch (api_config_source.api_type()) {
case envoy::config::core::v3::ApiConfigSource::GRPC: {
const std::string control_plane_id =
Utility::getGrpcControlPlane(api_config_source).value_or("");
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
mux = std::make_shared<Config::XdsMux::GrpcMuxSotw>(
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_, xds_resources_delegate_, control_plane_id);
} else {
mux = std::make_shared<Config::GrpcMuxImpl>(
local_info_,
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source),
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_, xds_resources_delegate_, control_plane_id);
}
break;
}
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
mux = std::make_shared<Config::XdsMux::GrpcMuxDelta>(
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_);
} else {
mux = std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
api_config_source, scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
std::move(custom_config_validators), xds_config_tracker_);
}
break;
}
default:
throw EnvoyException("Unsupported api type in api config source, cannot create GRPC mux.");
}
return mux;
}
} // namespace Config
} // namespace Envoy
7 changes: 0 additions & 7 deletions source/common/config/subscription_factory_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "envoy/upstream/cluster_manager.h"

#include "source/common/common/logger.h"
#include "source/common/config/custom_config_validators_impl.h"

namespace Envoy {
namespace Config {
Expand All @@ -39,12 +38,6 @@ class SubscriptionFactoryImpl : public SubscriptionFactory, Logger::Loggable<Log
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder) override;

protected:
virtual GrpcMuxSharedPtr
getOrCreateMux(const envoy::config::core::v3::ApiConfigSource& api_config_source,
absl::string_view type_url, Stats::Scope& scope,
CustomConfigValidatorsPtr& custom_config_validators);

private:
const LocalInfo::LocalInfo& local_info_;
Event::Dispatcher& dispatcher_;
Expand Down
1 change: 0 additions & 1 deletion source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ RUNTIME_GUARD(envoy_reloadable_features_http_response_half_close);
RUNTIME_GUARD(envoy_reloadable_features_http_skip_adding_content_length_to_upgrade);
RUNTIME_GUARD(envoy_reloadable_features_http_strip_fragment_from_path_unsafe_if_disabled);
RUNTIME_GUARD(envoy_reloadable_features_lua_respond_with_send_local_reply);
RUNTIME_GUARD(envoy_reloadable_features_multiplex_eds);
RUNTIME_GUARD(envoy_reloadable_features_no_extension_lookup_by_name);
RUNTIME_GUARD(envoy_reloadable_features_no_full_scan_certs_on_sni_mismatch);
RUNTIME_GUARD(envoy_reloadable_features_oauth_header_passthrough_fix);
Expand Down
20 changes: 0 additions & 20 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,6 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "multiplexed_subscription_factory_lib",
srcs = ["multiplexed_subscription_factory.cc"],
hdrs = ["multiplexed_subscription_factory.h"],
deps = [
"//envoy/config:subscription_interface",
"//envoy/event:dispatcher_interface",
"//envoy/local_info:local_info_interface",
"//source/common/common:hash_lib",
"//source/common/config:custom_config_validators_lib",
"//source/common/config:grpc_mux_lib",
"//source/common/config:grpc_subscription_lib",
"//source/common/config:subscription_factory_lib",
"//source/common/config:utility_lib",
"//source/server:transport_socket_config_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "cluster_discovery_manager_lib",
srcs = ["cluster_discovery_manager.cc"],
Expand All @@ -103,7 +84,6 @@ envoy_cc_library(
":cluster_discovery_manager_lib",
":load_balancer_lib",
":load_stats_reporter_lib",
":multiplexed_subscription_factory_lib",
":od_cds_api_lib",
":ring_hash_lb_lib",
":subset_lb_lib",
Expand Down
5 changes: 0 additions & 5 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,6 @@ ClusterManagerImpl::ClusterManagerImpl(
server, makeOptRefFromPtr(xds_resources_delegate_.get()),
makeOptRefFromPtr(xds_config_tracker_.get()));

multiplexed_subscription_factory_ = std::make_unique<MultiplexedSubscriptionFactory>(
local_info, main_thread_dispatcher, *this, validation_context.dynamicValidationVisitor(), api,
server, makeOptRefFromPtr(xds_resources_delegate_.get()),
makeOptRefFromPtr(xds_config_tracker_.get()));

const auto& dyn_resources = bootstrap.dynamic_resources();

// Cluster loading happens in two phases: first all the primary clusters are loaded, and then all
Expand Down
6 changes: 0 additions & 6 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include "source/common/upstream/cluster_discovery_manager.h"
#include "source/common/upstream/host_utility.h"
#include "source/common/upstream/load_stats_reporter.h"
#include "source/common/upstream/multiplexed_subscription_factory.h"
#include "source/common/upstream/od_cds_api_impl.h"
#include "source/common/upstream/priority_conn_pool_map.h"
#include "source/common/upstream/upstream_impl.h"
Expand Down Expand Up @@ -329,10 +328,6 @@ class ClusterManagerImpl : public ClusterManager,

Config::SubscriptionFactory& subscriptionFactory() override { return *subscription_factory_; }

Config::SubscriptionFactory& multiplexedSubscriptionFactory() override {
return *multiplexed_subscription_factory_;
}

void
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override;

Expand Down Expand Up @@ -817,7 +812,6 @@ class ClusterManagerImpl : public ClusterManager,
ClusterTimeoutBudgetStatNames cluster_timeout_budget_stat_names_;

std::unique_ptr<Config::SubscriptionFactoryImpl> subscription_factory_;
std::unique_ptr<MultiplexedSubscriptionFactory> multiplexed_subscription_factory_;
ClusterSet primary_clusters_;

std::unique_ptr<Config::XdsResourcesDelegate> xds_resources_delegate_;
Expand Down
39 changes: 0 additions & 39 deletions source/common/upstream/multiplexed_subscription_factory.cc

This file was deleted.

51 changes: 0 additions & 51 deletions source/common/upstream/multiplexed_subscription_factory.h

This file was deleted.

1 change: 0 additions & 1 deletion source/extensions/clusters/eds/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ envoy_cc_extension(
"//source/common/network:utility_lib",
"//source/common/protobuf:utility_lib",
"//source/common/upstream:cluster_factory_lib",
"//source/common/upstream:multiplexed_subscription_factory_lib",
"//source/common/upstream:upstream_includes",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
Expand Down
Loading

0 comments on commit 8affda4

Please sign in to comment.