Skip to content

Commit

Permalink
Revert "config: minor cleanup: remove DeltaSubscriptionState::resourc…
Browse files Browse the repository at this point in the history
…e_names_ (#8918)"

This reverts commit 80aedc1.

Revert "config: rename NewGrpcMuxImpl -> GrpcMuxImpl (#8919)"
This reverts commit 6d50553.

Revert "config: reinstate #8478 (unification of delta and SotW xDS), reverted by #8939 (#8974)"
This reverts commit a37522c.

Signed-off-by: Matt Klein <[email protected]>
  • Loading branch information
mattklein123 committed Nov 15, 2019
1 parent a29a083 commit c3f1125
Show file tree
Hide file tree
Showing 48 changed files with 1,845 additions and 1,661 deletions.
11 changes: 7 additions & 4 deletions api/envoy/api/v2/core/config_source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ message ApiConfigSource {
// the v2 protos is used.
REST = 1;

// "State of the world" gRPC v2 API, using Discovery{Request,Response} protos.
// gRPC v2 API.
GRPC = 2;

// "Delta" gRPC v2 API, using DeltaDiscovery{Request,Response} protos.
// Rather than sending Envoy the entire state with every update, the xDS server
// only sends what has changed since the last update.
// Using the delta xDS gRPC service, i.e. DeltaDiscovery{Request,Response}
// rather than Discovery{Request,Response}. Rather than sending Envoy the entire state
// with every update, the xDS server only sends what has changed since the last update.
//
// DELTA_GRPC is not yet entirely implemented! Initially, only CDS is available.
// Do not use for other xDSes. TODO(fredlas) update/remove this warning when appropriate.
DELTA_GRPC = 3;
}

Expand Down
11 changes: 7 additions & 4 deletions api/envoy/api/v3alpha/core/config_source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ message ApiConfigSource {
// the v2 protos is used.
REST = 1;

// "State of the world" gRPC v2 API, using Discovery{Request,Response} protos.
// gRPC v2 API.
GRPC = 2;

// "Delta" gRPC v2 API, using DeltaDiscovery{Request,Response} protos.
// Rather than sending Envoy the entire state with every update, the xDS server
// only sends what has changed since the last update.
// Using the delta xDS gRPC service, i.e. DeltaDiscovery{Request,Response}
// rather than Discovery{Request,Response}. Rather than sending Envoy the entire state
// with every update, the xDS server only sends what has changed since the last update.
//
// DELTA_GRPC is not yet entirely implemented! Initially, only CDS is available.
// Do not use for other xDSes. TODO(fredlas) update/remove this warning when appropriate.
DELTA_GRPC = 3;
}

Expand Down
89 changes: 64 additions & 25 deletions include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,48 @@ struct ControlPlaneStats {
ALL_CONTROL_PLANE_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};

// TODO(fredlas) redundant to SubscriptionCallbacks; remove this one.
class GrpcMuxCallbacks {
public:
virtual ~GrpcMuxCallbacks() = default;

/**
* Called when a configuration update is received.
* @param resources vector of fetched resources corresponding to the configuration update.
* @param version_info update version.
* @throw EnvoyException with reason if the configuration is rejected. Otherwise the configuration
* is accepted. Accepted configurations have their version_info reflected in subsequent
* requests.
*/
virtual void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) PURE;

/**
* Called when either the subscription is unable to fetch a config update or when onConfigUpdate
* invokes an exception.
* @param reason supplies the update failure reason.
* @param e supplies any exception data on why the fetch failed. May be nullptr.
*/
virtual void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException* e) PURE;

/**
* Obtain the "name" of a v2 API resource in a google.protobuf.Any, e.g. the route config name for
* a RouteConfiguration, based on the underlying resource type.
*/
virtual std::string resourceName(const ProtobufWkt::Any& resource) PURE;
};

/**
* Handle on an muxed gRPC subscription. The subscription is canceled on destruction.
*/
class GrpcMuxWatch {
public:
virtual ~GrpcMuxWatch() = default;
};

using GrpcMuxWatchPtr = std::unique_ptr<GrpcMuxWatch>;

struct Watch;

/**
Expand All @@ -40,12 +82,27 @@ class GrpcMux {
*/
virtual void start() PURE;

/**
* Start a configuration subscription asynchronously for some API type and resources.
* @param type_url type URL corresponding to xDS API, e.g.
* type.googleapis.com/envoy.api.v2.Cluster.
* @param resources set of resource names to watch for. If this is empty, then all
* resources for type_url will result in callbacks.
* @param callbacks the callbacks to be notified of configuration updates. These must be valid
* until GrpcMuxWatch is destroyed.
* @return GrpcMuxWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes
* away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr.
*/
virtual GrpcMuxWatchPtr subscribe(const std::string& type_url,
const std::set<std::string>& resources,
GrpcMuxCallbacks& callbacks) PURE;

/**
* Pause discovery requests for a given API type. This is useful when we're processing an update
* for LDS or CDS and don't want a flood of updates for RDS or EDS respectively. Discovery
* requests may later be resumed with resume().
* @param type_url type URL corresponding to xDS API, e.g.
* type.googleapis.com/envoy.api.v2.Cluster
* type.googleapis.com/envoy.api.v2.Cluster.
*/
virtual void pause(const std::string& type_url) PURE;

Expand All @@ -56,30 +113,18 @@ class GrpcMux {
*/
virtual void resume(const std::string& type_url) PURE;

// TODO(fredlas) PR #8478 will remove this.
/**
* Registers a GrpcSubscription with the GrpcMux. 'watch' may be null (meaning this is an add),
* or it may be the Watch* previously returned by this function (which makes it an update).
* @param type_url type URL corresponding to xDS API e.g. type.googleapis.com/envoy.api.v2.Cluster
* @param watch the Watch* to be updated, or nullptr to add one.
* @param resources the set of resource names for 'watch' to start out interested in. If empty,
* 'watch' is treated as interested in *all* resources (of type type_url).
* @param callbacks the callbacks that receive updates for 'resources' when they arrive.
* @param init_fetch_timeout how long to wait for this new subscription's first update. Ignored
* unless the addOrUpdateWatch() call is the first for 'type_url'.
* @return Watch* the opaque watch token added or updated, to be used in future addOrUpdateWatch
* calls.
* Whether this GrpcMux is delta.
* @return bool whether this GrpcMux is delta.
*/
virtual bool isDelta() const PURE;

// For delta
virtual Watch* addOrUpdateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) PURE;

/**
* Cleanup of a Watch* added by addOrUpdateWatch(). Receiving a Watch* from addOrUpdateWatch()
* makes you responsible for eventually invoking this cleanup.
* @param type_url type URL corresponding to xDS API e.g. type.googleapis.com/envoy.api.v2.Cluster
* @param watch the watch to be cleaned up.
*/
virtual void removeWatch(const std::string& type_url, Watch* watch) PURE;

/**
Expand All @@ -89,12 +134,6 @@ class GrpcMux {
* @return bool whether the API is paused.
*/
virtual bool paused(const std::string& type_url) const PURE;

/**
* Passes through to all multiplexed SubscriptionStates. To be called when something
* definitive happens with the initial fetch: either an update is successfully received,
* or some sort of error happened.*/
virtual void disableInitFetchTimeoutTimer() PURE;
};

using GrpcMuxPtr = std::unique_ptr<GrpcMux>;
Expand Down
95 changes: 57 additions & 38 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ envoy_cc_library(
)

envoy_cc_library(
name = "grpc_subscription_lib",
srcs = ["grpc_subscription_impl.cc"],
hdrs = ["grpc_subscription_impl.h"],
name = "delta_subscription_lib",
srcs = ["delta_subscription_impl.cc"],
hdrs = ["delta_subscription_impl.h"],
deps = [
":grpc_mux_lib",
":grpc_stream_lib",
":new_grpc_mux_lib",
":utility_lib",
"//include/envoy/config:subscription_interface",
"//include/envoy/grpc:async_client_interface",
Expand All @@ -66,35 +66,15 @@ envoy_cc_library(
srcs = ["delta_subscription_state.cc"],
hdrs = ["delta_subscription_state.h"],
deps = [
":subscription_state_lib",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/api/v2:pkg_cc_proto",
],
)

envoy_cc_library(
name = "sotw_subscription_state_lib",
srcs = ["sotw_subscription_state.cc"],
hdrs = ["sotw_subscription_state.h"],
deps = [
":subscription_state_lib",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/api/v2:pkg_cc_proto",
],
)

envoy_cc_library(
name = "subscription_state_lib",
srcs = ["subscription_state.cc"],
hdrs = ["subscription_state.h"],
deps = [
":update_ack_lib",
":pausable_ack_queue_lib",
"//include/envoy/config:subscription_interface",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/local_info:local_info_interface",
"//source/common/common:assert_lib",
"//source/common/common:backoff_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/common:token_bucket_impl_lib",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/api/v2:pkg_cc_proto",
],
)
Expand Down Expand Up @@ -137,11 +117,54 @@ envoy_cc_library(
name = "grpc_mux_lib",
srcs = ["grpc_mux_impl.cc"],
hdrs = ["grpc_mux_impl.h"],
deps = [
":grpc_stream_lib",
":utility_lib",
"//include/envoy/config:grpc_mux_interface",
"//include/envoy/config:subscription_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/common:minimal_logger_lib",
"//source/common/protobuf",
],
)

envoy_cc_library(
name = "grpc_mux_subscription_lib",
srcs = ["grpc_mux_subscription_impl.cc"],
hdrs = ["grpc_mux_subscription_impl.h"],
deps = [
"//include/envoy/config:grpc_mux_interface",
"//include/envoy/config:subscription_interface",
"//include/envoy/event:dispatcher_interface",
"//source/common/common:assert_lib",
"//source/common/common:minimal_logger_lib",
"//source/common/grpc:common_lib",
"//source/common/protobuf",
"@envoy_api//envoy/api/v2:pkg_cc_proto",
],
)

envoy_cc_library(
name = "grpc_subscription_lib",
hdrs = ["grpc_subscription_impl.h"],
deps = [
":grpc_mux_lib",
":grpc_mux_subscription_lib",
"//include/envoy/config:subscription_interface",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/grpc:async_client_interface",
"@envoy_api//envoy/api/v2/core:pkg_cc_proto",
],
)

envoy_cc_library(
name = "new_grpc_mux_lib",
srcs = ["new_grpc_mux_impl.cc"],
hdrs = ["new_grpc_mux_impl.h"],
deps = [
":delta_subscription_state_lib",
":grpc_stream_lib",
":pausable_ack_queue_lib",
":sotw_subscription_state_lib",
":watch_map_lib",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/grpc:async_client_interface",
Expand Down Expand Up @@ -189,8 +212,8 @@ envoy_cc_library(
srcs = ["pausable_ack_queue.cc"],
hdrs = ["pausable_ack_queue.h"],
deps = [
":update_ack_lib",
"//source/common/common:assert_lib",
"@envoy_api//envoy/api/v2:pkg_cc_proto",
],
)

Expand Down Expand Up @@ -234,7 +257,9 @@ envoy_cc_library(
srcs = ["subscription_factory_impl.cc"],
hdrs = ["subscription_factory_impl.h"],
deps = [
":delta_subscription_lib",
":filesystem_subscription_lib",
":grpc_mux_subscription_lib",
":grpc_subscription_lib",
":http_subscription_lib",
":type_to_endpoint_lib",
Expand All @@ -259,12 +284,6 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "update_ack_lib",
hdrs = ["update_ack.h"],
deps = ["@envoy_api//envoy/api/v2:pkg_cc_proto"],
)

envoy_cc_library(
name = "utility_lib",
srcs = ["utility.cc"],
Expand Down
48 changes: 18 additions & 30 deletions source/common/config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,51 +9,39 @@ you can mostly forget the filesystem/REST/gRPC distinction, and you can
especially forget about the gRPC flavors. All of that is specified in the
bootstrap config, which is read and put into action by ClusterManagerImpl.

## If you are working on Envoy's gRPC xDS client logic itself, read on.
Note that there can be multiple active gRPC subscriptions for a single resource
type. This concept is called "resource watches". If one EDS subscription
subscribes to X and Y, and another subscribes to Y and Z, the underlying
subscription logic will maintain a subscription to the union: X Y and Z. Updates
to X will be delivered to the first object, Y to both, Z to the second. This
logic is implemented by WatchMap.

### If you are working on Envoy's gRPC xDS client logic itself, read on.

When using gRPC, xDS has two pairs of options: aggregated/non-aggregated, and
delta/state-of-the-world updates. All four combinations of these are usable.

## Aggregated (ADS) vs not (xDS)

"Aggregated" means that EDS, CDS, etc resources are all carried by the same gRPC stream.
For Envoy's implementation of xDS client logic, there is effectively no difference
between aggregated xDS and non-aggregated: they both use the same request/response protos. The
non-aggregated case is handled by running the aggregated logic, and just happening to only have 1
xDS subscription type to "aggregate", i.e., GrpcMux only has one SubscriptionState
entry in its map.
xDS subscription type to "aggregate", i.e., NewGrpcMuxImpl only has one
DeltaSubscriptionState entry in its map.

However, to the config server, there is a huge difference: when using ADS (caused
by the user providing an ads_config in the bootstrap config), the gRPC client sets
its method string to {Delta,Stream}AggregatedResources, as opposed to {Delta,Stream}Clusters,
{Delta,Stream}Routes, etc. So, despite using the same request/response protos,
and having identical client code, they're actually different gRPC services.

## Delta vs state-of-the-world (SotW)

Delta vs state-of-the-world is a question of wire format and protocol behavior.
The protos in question are named [Delta]Discovery{Request,Response}. GrpcMux can work
with either pair. Almost all GrpcMux logic is in the shared GrpcMuxImpl base class;
SotwGrpcMux and DeltaGrpcMux exist to be adapters for the specific protobuf types, since
protobufs are not amenable to polymorphism.

All delta/SotW specific logic is handled by GrpcMux and SubscriptionState. GrpcSubscriptionImpl
simply holds a shared_ptr to a GrpcMux interface; it has no need to know about delta vs SotW.
Delta vs state-of-the-world is a question of wire format: the protos in question are named
[Delta]Discovery{Request,Response}. That is what the GrpcMux interface is useful for: its
NewGrpcMuxImpl (TODO may be renamed) implementation works with DeltaDiscovery{Request,Response} and has
delta-specific logic; its GrpxMuxImpl implementation (TODO will be merged into NewGrpcMuxImpl) works with Discovery{Request,Response}
and has SotW-specific logic. Both the delta and SotW Subscription implementations (TODO will be merged) hold a shared_ptr<GrpcMux>.
The shared_ptr allows for both non- and aggregated: if non-aggregated, you'll be the only holder of that shared_ptr.

![xDS_code_diagram](xDS_code_diagram.png)

The orange flow does not necessarily have to happen in response to the blue flow; there can be
spontaneous updates. ACKs are not shown in this diagram; they are also carried by the
[Delta]DiscoveryRequest protos.

What does GrpcMux even do in this diagram? Just own things and pass through function calls?
Answer: 1) it sequences the requests and ACKs that the various type_urls send, 2) it handles the
protobuf vs polymorphism impedance mismatch, allowing all delta-vs-SotW-agnostic code
to be reused.

Note that there can be multiple active gRPC subscriptions for a single resource
type. This concept is called "resource watches". If one EDS subscription
subscribes to X and Y, and another subscribes to Y and Z, the underlying
subscription logic will maintain a subscription to the union: X Y and Z. Updates
to X will be delivered to the first object, Y to both, Z to the second. This
logic is implemented by WatchMap.
Note that the orange flow does not necessarily have to happen in response to the blue flow; there can be spontaneous updates. ACKs are not shown in this diagram; they are also carred by the [Delta]DiscoveryRequest protos.
What does GrpcXdsContext even do in this diagram? Just own things and pass through function calls? Answer: it sequences the requests and ACKs that the various type_urls send.
Loading

0 comments on commit c3f1125

Please sign in to comment.