Skip to content

Commit

Permalink
config: rename NewGrpcMuxImpl -> GrpcMuxImpl (#8919)
Browse files Browse the repository at this point in the history
Signed-off-by: Bill Gallagher <[email protected]>
  • Loading branch information
Bill Gallagher authored and mattklein123 committed Nov 14, 2019
1 parent b7bef67 commit 6d50553
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 176 deletions.
4 changes: 2 additions & 2 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ envoy_cc_library(

envoy_cc_library(
name = "grpc_mux_lib",
srcs = ["new_grpc_mux_impl.cc"],
hdrs = ["new_grpc_mux_impl.h"],
srcs = ["grpc_mux_impl.cc"],
hdrs = ["grpc_mux_impl.h"],
deps = [
":delta_subscription_state_lib",
":grpc_stream_lib",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "common/config/new_grpc_mux_impl.h"
#include "common/config/grpc_mux_impl.h"

#include "common/common/assert.h"
#include "common/common/backoff_strategy.h"
Expand All @@ -10,17 +10,17 @@
namespace Envoy {
namespace Config {

NewGrpcMuxImpl::NewGrpcMuxImpl(std::unique_ptr<SubscriptionStateFactory> subscription_state_factory,
bool skip_subsequent_node, const LocalInfo::LocalInfo& local_info)
GrpcMuxImpl::GrpcMuxImpl(std::unique_ptr<SubscriptionStateFactory> subscription_state_factory,
bool skip_subsequent_node, const LocalInfo::LocalInfo& local_info)
: subscription_state_factory_(std::move(subscription_state_factory)),
skip_subsequent_node_(skip_subsequent_node), local_info_(local_info) {
Config::Utility::checkLocalInfo("ads", local_info);
}

Watch* NewGrpcMuxImpl::addOrUpdateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) {
Watch* GrpcMuxImpl::addOrUpdateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) {
if (watch == nullptr) {
return addWatch(type_url, resources, callbacks, init_fetch_timeout);
} else {
Expand All @@ -29,24 +29,24 @@ Watch* NewGrpcMuxImpl::addOrUpdateWatch(const std::string& type_url, Watch* watc
}
}

void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
void GrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
updateWatch(type_url, watch, {});
watchMapFor(type_url).removeWatch(watch);
}

void NewGrpcMuxImpl::pause(const std::string& type_url) { pausable_ack_queue_.pause(type_url); }
void GrpcMuxImpl::pause(const std::string& type_url) { pausable_ack_queue_.pause(type_url); }

void NewGrpcMuxImpl::resume(const std::string& type_url) {
void GrpcMuxImpl::resume(const std::string& type_url) {
pausable_ack_queue_.resume(type_url);
trySendDiscoveryRequests();
}

bool NewGrpcMuxImpl::paused(const std::string& type_url) const {
bool GrpcMuxImpl::paused(const std::string& type_url) const {
return pausable_ack_queue_.paused(type_url);
}

void NewGrpcMuxImpl::genericHandleResponse(const std::string& type_url,
const void* response_proto_ptr) {
void GrpcMuxImpl::genericHandleResponse(const std::string& type_url,
const void* response_proto_ptr) {
auto sub = subscriptions_.find(type_url);
if (sub == subscriptions_.end()) {
ENVOY_LOG(warn,
Expand All @@ -59,23 +59,23 @@ void NewGrpcMuxImpl::genericHandleResponse(const std::string& type_url,
trySendDiscoveryRequests();
}

void NewGrpcMuxImpl::start() { establishGrpcStream(); }
void GrpcMuxImpl::start() { establishGrpcStream(); }

void NewGrpcMuxImpl::handleEstablishedStream() {
void GrpcMuxImpl::handleEstablishedStream() {
for (auto& sub : subscriptions_) {
sub.second->markStreamFresh();
}
set_any_request_sent_yet_in_current_stream(false);
trySendDiscoveryRequests();
}

void NewGrpcMuxImpl::disableInitFetchTimeoutTimer() {
void GrpcMuxImpl::disableInitFetchTimeoutTimer() {
for (auto& sub : subscriptions_) {
sub.second->disableInitFetchTimeoutTimer();
}
}

void NewGrpcMuxImpl::handleStreamEstablishmentFailure() {
void GrpcMuxImpl::handleStreamEstablishmentFailure() {
// If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately
// call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a
// crash, the iteration needs to dance around a little: collect pointers to all
Expand All @@ -95,9 +95,9 @@ void NewGrpcMuxImpl::handleStreamEstablishmentFailure() {
} while (all_subscribed.size() != subscriptions_.size());
}

Watch* NewGrpcMuxImpl::addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) {
Watch* GrpcMuxImpl::addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) {
auto watch_map = watch_maps_.find(type_url);
if (watch_map == watch_maps_.end()) {
// We don't yet have a subscription for type_url! Make one!
Expand All @@ -116,8 +116,8 @@ Watch* NewGrpcMuxImpl::addWatch(const std::string& type_url, const std::set<std:
// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources) {
void GrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources) {
ASSERT(watch != nullptr);
SubscriptionState& sub = subscriptionStateFor(type_url);
WatchMap& watch_map = watchMapFor(type_url);
Expand All @@ -131,23 +131,23 @@ void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
}
}

SubscriptionState& NewGrpcMuxImpl::subscriptionStateFor(const std::string& type_url) {
SubscriptionState& GrpcMuxImpl::subscriptionStateFor(const std::string& type_url) {
auto sub = subscriptions_.find(type_url);
RELEASE_ASSERT(sub != subscriptions_.end(),
fmt::format("Tried to look up SubscriptionState for non-existent subscription {}.",
type_url));
return *sub->second;
}

WatchMap& NewGrpcMuxImpl::watchMapFor(const std::string& type_url) {
WatchMap& GrpcMuxImpl::watchMapFor(const std::string& type_url) {
auto watch_map = watch_maps_.find(type_url);
RELEASE_ASSERT(
watch_map != watch_maps_.end(),
fmt::format("Tried to look up WatchMap for non-existent subscription {}.", type_url));
return *watch_map->second;
}

void NewGrpcMuxImpl::trySendDiscoveryRequests() {
void GrpcMuxImpl::trySendDiscoveryRequests() {
while (true) {
// Do any of our subscriptions even want to send a request?
absl::optional<std::string> request_type_if_any = whoWantsToSendDiscoveryRequest();
Expand Down Expand Up @@ -179,7 +179,7 @@ void NewGrpcMuxImpl::trySendDiscoveryRequests() {

// Checks whether external conditions allow sending a discovery request. (Does not check
// whether we *want* to send a discovery request).
bool NewGrpcMuxImpl::canSendDiscoveryRequest(const std::string& type_url) {
bool GrpcMuxImpl::canSendDiscoveryRequest(const std::string& type_url) {
RELEASE_ASSERT(
!pausable_ack_queue_.paused(type_url),
fmt::format("canSendDiscoveryRequest() called on paused type_url {}. Pausedness is "
Expand All @@ -202,7 +202,7 @@ bool NewGrpcMuxImpl::canSendDiscoveryRequest(const std::string& type_url) {
// First, prioritizes ACKs over non-ACK subscription interest updates.
// Then, prioritizes non-ACK updates in the order the various types
// of subscriptions were activated.
absl::optional<std::string> NewGrpcMuxImpl::whoWantsToSendDiscoveryRequest() {
absl::optional<std::string> GrpcMuxImpl::whoWantsToSendDiscoveryRequest() {
// All ACKs are sent before plain updates. trySendDiscoveryRequests() relies on this. So, choose
// type_url from pausable_ack_queue_ if possible, before looking at pending updates.
if (!pausable_ack_queue_.empty()) {
Expand All @@ -225,8 +225,8 @@ GrpcMuxDelta::GrpcMuxDelta(Grpc::RawAsyncClientPtr&& async_client, Event::Dispat
Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings,
const LocalInfo::LocalInfo& local_info, bool skip_subsequent_node)
: NewGrpcMuxImpl(std::make_unique<DeltaSubscriptionStateFactory>(dispatcher),
skip_subsequent_node, local_info),
: GrpcMuxImpl(std::make_unique<DeltaSubscriptionStateFactory>(dispatcher), skip_subsequent_node,
local_info),
grpc_stream_(this, std::move(async_client), service_method, random, dispatcher, scope,
rate_limit_settings) {}

Expand Down Expand Up @@ -260,8 +260,8 @@ GrpcMuxSotw::GrpcMuxSotw(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatch
Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings,
const LocalInfo::LocalInfo& local_info, bool skip_subsequent_node)
: NewGrpcMuxImpl(std::make_unique<SotwSubscriptionStateFactory>(dispatcher),
skip_subsequent_node, local_info),
: GrpcMuxImpl(std::make_unique<SotwSubscriptionStateFactory>(dispatcher), skip_subsequent_node,
local_info),
grpc_stream_(this, std::move(async_client), service_method, random, dispatcher, scope,
rate_limit_settings) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ namespace Config {
// This class owns the GrpcStream used to talk to the server, maintains queuing
// logic to properly order the subscription(s)' various messages, and allows
// starting/stopping/pausing of the subscriptions.
class NewGrpcMuxImpl : public GrpcMux, Logger::Loggable<Logger::Id::config> {
class GrpcMuxImpl : public GrpcMux, Logger::Loggable<Logger::Id::config> {
public:
NewGrpcMuxImpl(std::unique_ptr<SubscriptionStateFactory> subscription_state_factory,
bool skip_subsequent_node, const LocalInfo::LocalInfo& local_info);
GrpcMuxImpl(std::unique_ptr<SubscriptionStateFactory> subscription_state_factory,
bool skip_subsequent_node, const LocalInfo::LocalInfo& local_info);

Watch* addOrUpdateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources, SubscriptionCallbacks& callbacks,
Expand Down Expand Up @@ -120,7 +120,7 @@ class NewGrpcMuxImpl : public GrpcMux, Logger::Loggable<Logger::Id::config> {
const LocalInfo::LocalInfo& local_info_;
};

class GrpcMuxDelta : public NewGrpcMuxImpl,
class GrpcMuxDelta : public GrpcMuxImpl,
public GrpcStreamCallbacks<envoy::api::v2::DeltaDiscoveryResponse> {
public:
GrpcMuxDelta(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher,
Expand All @@ -147,7 +147,7 @@ class GrpcMuxDelta : public NewGrpcMuxImpl,
grpc_stream_;
};

class GrpcMuxSotw : public NewGrpcMuxImpl,
class GrpcMuxSotw : public GrpcMuxImpl,
public GrpcStreamCallbacks<envoy::api::v2::DiscoveryResponse> {
public:
GrpcMuxSotw(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher,
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include "envoy/config/subscription.h"

#include "common/config/new_grpc_mux_impl.h"
#include "common/config/grpc_mux_impl.h"
#include "common/config/utility.h"

namespace Envoy {
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/subscription_factory_impl.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include "common/config/subscription_factory_impl.h"

#include "common/config/filesystem_subscription_impl.h"
#include "common/config/grpc_mux_impl.h"
#include "common/config/grpc_subscription_impl.h"
#include "common/config/http_subscription_impl.h"
#include "common/config/new_grpc_mux_impl.h"
#include "common/config/type_to_endpoint.h"
#include "common/config/utility.h"
#include "common/protobuf/protobuf.h"
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "envoy/upstream/cluster_manager.h"

#include "common/common/cleanup.h"
#include "common/config/new_grpc_mux_impl.h"
#include "common/config/grpc_mux_impl.h"
#include "common/config/subscription_factory_impl.h"
#include "common/http/async_client_impl.h"
#include "common/upstream/load_stats_reporter.h"
Expand Down
22 changes: 0 additions & 22 deletions test/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -94,28 +94,6 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "new_grpc_mux_impl_test",
srcs = ["new_grpc_mux_impl_test.cc"],
deps = [
"//source/common/config:grpc_mux_lib",
"//source/common/config:protobuf_link_hacks",
"//source/common/config:resources_lib",
"//source/common/protobuf",
"//source/common/stats:isolated_store_lib",
"//test/mocks:common_lib",
"//test/mocks/config:config_mocks",
"//test/mocks/event:event_mocks",
"//test/mocks/grpc:grpc_mocks",
"//test/mocks/local_info:local_info_mocks",
"//test/mocks/runtime:runtime_mocks",
"//test/test_common:logging_lib",
"//test/test_common:simulated_time_system_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/api/v2:pkg_cc_proto",
],
)

envoy_cc_test(
name = "grpc_stream_test",
srcs = ["grpc_stream_test.cc"],
Expand Down
2 changes: 1 addition & 1 deletion test/common/config/grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "envoy/api/v2/eds.pb.h"

#include "common/common/empty_string.h"
#include "common/config/new_grpc_mux_impl.h"
#include "common/config/grpc_mux_impl.h"
#include "common/config/protobuf_link_hacks.h"
#include "common/config/resources.h"
#include "common/config/utility.h"
Expand Down
Loading

0 comments on commit 6d50553

Please sign in to comment.