Skip to content

Commit

Permalink
Optimize EDS stream allocation (envoyproxy#22419)
Browse files Browse the repository at this point in the history
Signed-off-by: Kateryna Nezdolii <[email protected]>
  • Loading branch information
Kateryna Nezdolii authored Jan 18, 2023
1 parent 38ced99 commit 97a7f00
Show file tree
Hide file tree
Showing 27 changed files with 1,481 additions and 68 deletions.
4 changes: 4 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ minor_behavior_changes:

bug_fixes:
# *Changes expected to improve the state of the world and are unlikely to have negative effects*
- area: eds
change: |
added ``envoy.reloadable_features.multiplex_eds`` to disable eds multiplexing. Eds multiplexing is enabled by default, so that all subscriptions for the same resource type and management server reuse a single channel/mux.
When eds multiplexing is disabled each subscription uses a dedicated channel/mux.
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
9 changes: 9 additions & 0 deletions envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,15 @@ class ClusterManager {
*/
virtual Config::SubscriptionFactory& subscriptionFactory() PURE;

/**
* Obtain multiplexed subscription factory for the cluster manager.
* This factory shares mux per management server per xds resource type which reduces number of
* concurrent active grpc streams.
*
* @return Config::SubscriptionFactory& multiplexed subscription factory.
*/
virtual Config::SubscriptionFactory& multiplexedSubscriptionFactory() PURE;

/**
* Returns a struct with all the Stats::StatName objects needed by
* Clusters. This helps factor out some relatively heavy name
Expand Down
114 changes: 60 additions & 54 deletions source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "envoy/config/core/v3/config_source.pb.h"
#include "envoy/config/xds_resources_delegate.h"

#include "source/common/config/custom_config_validators_impl.h"
#include "source/common/config/filesystem_subscription_impl.h"
#include "source/common/config/grpc_mux_impl.h"
#include "source/common/config/grpc_subscription_impl.h"
Expand Down Expand Up @@ -72,66 +71,18 @@ SubscriptionPtr SubscriptionFactoryImpl::subscriptionFromConfigSource(
Utility::apiConfigSourceRequestTimeout(api_config_source), restMethod(type_url), type_url,
callbacks, resource_decoder, stats, Utility::configSourceInitialFetchTimeout(config),
validation_visitor_);
case envoy::config::core::v3::ApiConfigSource::GRPC: {
GrpcMuxSharedPtr mux;
case envoy::config::core::v3::ApiConfigSource::GRPC:
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
CustomConfigValidatorsPtr custom_config_validators =
std::make_unique<CustomConfigValidatorsImpl>(validation_visitor_, server_,
api_config_source.config_validators());
const std::string control_plane_id =
Utility::getGrpcControlPlane(api_config_source).value_or("");

if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
mux = std::make_shared<Config::XdsMux::GrpcMuxSotw>(
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_, xds_resources_delegate_, control_plane_id);
} else {
mux = std::make_shared<Config::GrpcMuxImpl>(
local_info_,
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source),
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_, xds_resources_delegate_, control_plane_id);
}
GrpcMuxSharedPtr mux =
getOrCreateMux(api_config_source, type_url, scope, custom_config_validators);
return std::make_unique<GrpcSubscriptionImpl>(
std::move(mux), callbacks, resource_decoder, stats, type_url, dispatcher_,
Utility::configSourceInitialFetchTimeout(config),
/*is_aggregated*/ false, options);
}
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
GrpcMuxSharedPtr mux;
CustomConfigValidatorsPtr custom_config_validators =
std::make_unique<CustomConfigValidatorsImpl>(validation_visitor_, server_,
api_config_source.config_validators());
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
mux = std::make_shared<Config::XdsMux::GrpcMuxDelta>(
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_);
} else {
mux = std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
api_config_source, scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
std::move(custom_config_validators), xds_config_tracker_);
}
return std::make_unique<GrpcSubscriptionImpl>(
std::move(mux), callbacks, resource_decoder, stats, type_url, dispatcher_,
Utility::configSourceInitialFetchTimeout(config), /*is_aggregated*/ false, options);
}
}
throw EnvoyException("Invalid API config source API type");
}
Expand All @@ -152,7 +103,6 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder) {
SubscriptionStats stats = Utility::generateStats(scope);

switch (collection_locator.scheme()) {
case xds::core::v3::ResourceLocator::FILE: {
const std::string path = Http::Utility::localPathFromFilePath(collection_locator.id());
Expand Down Expand Up @@ -228,5 +178,61 @@ SubscriptionPtr SubscriptionFactoryImpl::collectionSubscriptionFromUrl(
}
}

GrpcMuxSharedPtr SubscriptionFactoryImpl::getOrCreateMux(
const envoy::config::core::v3::ApiConfigSource& api_config_source, absl::string_view type_url,
Stats::Scope& scope, CustomConfigValidatorsPtr& custom_config_validators) {
GrpcMuxSharedPtr mux;
switch (api_config_source.api_type()) {
case envoy::config::core::v3::ApiConfigSource::GRPC: {
const std::string control_plane_id =
Utility::getGrpcControlPlane(api_config_source).value_or("");
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
mux = std::make_shared<Config::XdsMux::GrpcMuxSotw>(
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_, xds_resources_delegate_, control_plane_id);
} else {
mux = std::make_shared<Config::GrpcMuxImpl>(
local_info_,
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, sotwGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source),
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_, xds_resources_delegate_, control_plane_id);
}
break;
}
case envoy::config::core::v3::ApiConfigSource::DELTA_GRPC: {
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
mux = std::make_shared<Config::XdsMux::GrpcMuxDelta>(
Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(), api_config_source,
scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
api_config_source.set_node_on_first_message_only(), std::move(custom_config_validators),
xds_config_tracker_);
} else {
mux = std::make_shared<Config::NewGrpcMuxImpl>(
Config::Utility::factoryForGrpcApiConfigSource(cm_.grpcAsyncClientManager(),
api_config_source, scope, true)
->createUncachedRawAsyncClient(),
dispatcher_, deltaGrpcMethod(type_url), api_.randomGenerator(), scope,
Utility::parseRateLimitSettings(api_config_source), local_info_,
std::move(custom_config_validators), xds_config_tracker_);
}
break;
}
default:
throw EnvoyException("Unsupported api type in api config source, cannot create GRPC mux.");
}
return mux;
}
} // namespace Config
} // namespace Envoy
7 changes: 7 additions & 0 deletions source/common/config/subscription_factory_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "envoy/upstream/cluster_manager.h"

#include "source/common/common/logger.h"
#include "source/common/config/custom_config_validators_impl.h"

namespace Envoy {
namespace Config {
Expand All @@ -38,6 +39,12 @@ class SubscriptionFactoryImpl : public SubscriptionFactory, Logger::Loggable<Log
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoderSharedPtr resource_decoder) override;

protected:
virtual GrpcMuxSharedPtr
getOrCreateMux(const envoy::config::core::v3::ApiConfigSource& api_config_source,
absl::string_view type_url, Stats::Scope& scope,
CustomConfigValidatorsPtr& custom_config_validators);

private:
const LocalInfo::LocalInfo& local_info_;
Event::Dispatcher& dispatcher_;
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ RUNTIME_GUARD(envoy_reloadable_features_http_skip_adding_content_length_to_upgra
RUNTIME_GUARD(envoy_reloadable_features_http_strip_fragment_from_path_unsafe_if_disabled);
RUNTIME_GUARD(envoy_reloadable_features_local_ratelimit_match_all_descriptors);
RUNTIME_GUARD(envoy_reloadable_features_lua_respond_with_send_local_reply);
RUNTIME_GUARD(envoy_reloadable_features_multiplex_eds);
RUNTIME_GUARD(envoy_reloadable_features_no_extension_lookup_by_name);
RUNTIME_GUARD(envoy_reloadable_features_no_full_scan_certs_on_sni_mismatch);
RUNTIME_GUARD(envoy_reloadable_features_oauth_header_passthrough_fix);
Expand Down
20 changes: 20 additions & 0 deletions source/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,25 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "multiplexed_subscription_factory_lib",
srcs = ["multiplexed_subscription_factory.cc"],
hdrs = ["multiplexed_subscription_factory.h"],
deps = [
"//envoy/config:subscription_interface",
"//envoy/event:dispatcher_interface",
"//envoy/local_info:local_info_interface",
"//source/common/common:hash_lib",
"//source/common/config:custom_config_validators_lib",
"//source/common/config:grpc_mux_lib",
"//source/common/config:grpc_subscription_lib",
"//source/common/config:subscription_factory_lib",
"//source/common/config:utility_lib",
"//source/server:transport_socket_config_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "cluster_discovery_manager_lib",
srcs = ["cluster_discovery_manager.cc"],
Expand All @@ -84,6 +103,7 @@ envoy_cc_library(
":cluster_discovery_manager_lib",
":load_balancer_lib",
":load_stats_reporter_lib",
":multiplexed_subscription_factory_lib",
":od_cds_api_lib",
":ring_hash_lb_lib",
":subset_lb_lib",
Expand Down
5 changes: 5 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ ClusterManagerImpl::ClusterManagerImpl(
server, makeOptRefFromPtr(xds_resources_delegate_.get()),
makeOptRefFromPtr(xds_config_tracker_.get()));

multiplexed_subscription_factory_ = std::make_unique<MultiplexedSubscriptionFactory>(
local_info, main_thread_dispatcher, *this, validation_context.dynamicValidationVisitor(), api,
server, makeOptRefFromPtr(xds_resources_delegate_.get()),
makeOptRefFromPtr(xds_config_tracker_.get()));

const auto& dyn_resources = bootstrap.dynamic_resources();

// Cluster loading happens in two phases: first all the primary clusters are loaded, and then all
Expand Down
6 changes: 6 additions & 0 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "source/common/upstream/cluster_discovery_manager.h"
#include "source/common/upstream/host_utility.h"
#include "source/common/upstream/load_stats_reporter.h"
#include "source/common/upstream/multiplexed_subscription_factory.h"
#include "source/common/upstream/od_cds_api_impl.h"
#include "source/common/upstream/priority_conn_pool_map.h"
#include "source/common/upstream/upstream_impl.h"
Expand Down Expand Up @@ -328,6 +329,10 @@ class ClusterManagerImpl : public ClusterManager,

Config::SubscriptionFactory& subscriptionFactory() override { return *subscription_factory_; }

Config::SubscriptionFactory& multiplexedSubscriptionFactory() override {
return *multiplexed_subscription_factory_;
}

void
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override;

Expand Down Expand Up @@ -812,6 +817,7 @@ class ClusterManagerImpl : public ClusterManager,
ClusterTimeoutBudgetStatNames cluster_timeout_budget_stat_names_;

std::unique_ptr<Config::SubscriptionFactoryImpl> subscription_factory_;
std::unique_ptr<MultiplexedSubscriptionFactory> multiplexed_subscription_factory_;
ClusterSet primary_clusters_;

std::unique_ptr<Config::XdsResourcesDelegate> xds_resources_delegate_;
Expand Down
39 changes: 39 additions & 0 deletions source/common/upstream/multiplexed_subscription_factory.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "source/common/upstream/multiplexed_subscription_factory.h"

#include "source/common/common/hash.h"
#include "source/common/config/utility.h"

namespace Envoy {
namespace Upstream {

MultiplexedSubscriptionFactory::MultiplexedSubscriptionFactory(
const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher,
Upstream::ClusterManager& cm, ProtobufMessage::ValidationVisitor& validation_visitor,
Api::Api& api, const Server::Instance& server,
Config::XdsResourcesDelegateOptRef xds_resources_delegate,
Config::XdsConfigTrackerOptRef xds_config_tracker)
: Config::SubscriptionFactoryImpl(local_info, dispatcher, cm, validation_visitor, api, server,
xds_resources_delegate, xds_config_tracker){};

Config::GrpcMuxSharedPtr MultiplexedSubscriptionFactory::getOrCreateMux(
const envoy::config::core::v3::ApiConfigSource& config_source, absl::string_view type_url,
Stats::Scope& scope, Config::CustomConfigValidatorsPtr& custom_config_validators) {
if (config_source.api_type() == envoy::config::core::v3::ApiConfigSource::GRPC ||
config_source.api_type() == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
const uint64_t xds_server_hash = MessageUtil::hash(config_source.grpc_services(0));
const uint64_t xds_type_hash = HashUtil::xxHash64(type_url);
const uint64_t mux_key = xds_server_hash ^ xds_type_hash;
if (muxes_.find(mux_key) == muxes_.end()) {
muxes_.emplace(
std::make_pair(mux_key, Config::SubscriptionFactoryImpl::getOrCreateMux(
config_source, type_url, scope, custom_config_validators)));
}
return muxes_.at(mux_key);
} else {
return Config::SubscriptionFactoryImpl::getOrCreateMux(config_source, type_url, scope,
custom_config_validators);
}
}

} // namespace Upstream
} // namespace Envoy
51 changes: 51 additions & 0 deletions source/common/upstream/multiplexed_subscription_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#pragma once

#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/subscription.h"
#include "envoy/stats/scope.h"

#include "source/common/common/assert.h"
#include "source/common/config/custom_config_validators_impl.h"
#include "source/common/config/grpc_mux_impl.h"
#include "source/common/config/grpc_subscription_impl.h"
#include "source/common/config/subscription_factory_impl.h"
#include "source/common/config/utility.h"
#include "source/common/protobuf/protobuf.h"
#include "source/server/transport_socket_config_impl.h"

/**
* MultiplexedSubscriptionFactory is used for instantiation of XDS subscriptions so as to minimize
* the number of open grpc connections used by these subscriptions. This is done by sharing a grpc
* multiplexer between subscriptions handled per subscription server and xds resource type. Please
* see https://github.com/envoyproxy/envoy/issues/2943 for additional information and related
* issues.
*
*/

namespace Envoy {
namespace Upstream {

class MultiplexedSubscriptionFactory : public Config::SubscriptionFactoryImpl {
public:
~MultiplexedSubscriptionFactory() override = default;

MultiplexedSubscriptionFactory(const LocalInfo::LocalInfo& local_info,
Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm,
ProtobufMessage::ValidationVisitor& validation_visitor,
Api::Api& api, const Server::Instance& server,
Config::XdsResourcesDelegateOptRef xds_resources_delegate,
Config::XdsConfigTrackerOptRef xds_config_tracker);

protected:
// Config::SubscriptionFactoryImpl
Config::GrpcMuxSharedPtr
getOrCreateMux(const envoy::config::core::v3::ApiConfigSource& api_config_source,
absl::string_view type_url, Stats::Scope& scope,
Config::CustomConfigValidatorsPtr& custom_config_validators) override;

private:
absl::flat_hash_map<uint64_t, Config::GrpcMuxSharedPtr> muxes_;
friend class MultiplexedSubscriptionFactoryPeer;
};
} // namespace Upstream
} // namespace Envoy
1 change: 1 addition & 0 deletions source/extensions/clusters/eds/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ envoy_cc_extension(
"//source/common/network:utility_lib",
"//source/common/protobuf:utility_lib",
"//source/common/upstream:cluster_factory_lib",
"//source/common/upstream:multiplexed_subscription_factory_lib",
"//source/common/upstream:upstream_includes",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
Expand Down
Loading

0 comments on commit 97a7f00

Please sign in to comment.