Skip to content

Commit

Permalink
[server][cache] Ensure subscription is properly updated for sotw and …
Browse files Browse the repository at this point in the history
…delta (#9)

Currently, sotw and delta watches use different interfaces to track the stream state.
It used to be assumed that sotw watches are stateless and can be processed from the request alone. This was already altered in envoyproxy#508, but that change only partially covers the use cases (e.g. it does not cover subscribing to or unsubscribing from wildcard).
The object currently passed to track the subscription has multiple issues:
 - it is not an interface, restricting evolution or testing for users building a separate cache
 - it tracks some objects per type and others globally, depending on sotw or delta, making it confusing and hard to use in a consistent way

This commit replaces the legacy StreamState with the notion of a Subscription per type. StreamState is not relevant to the cache, which only needs to know the subscription status (e.g. the last nonce is not needed), whereas a stream can multiplex multiple types.
Further work will follow to:
 - fill the subscription with the same attributes in sotw and delta, making it possible to take proper decisions on what to reply
 - fix multiple issues within the cache in sotw
 - fix handling of explicit vs. legacy wildcard in sotw and delta

Now that the `Subscription` has been standardized to be uniform for `sotw` and `delta`, and always applies to the given request type, this PR ensures all the fields are properly populated in all cases.
A follow-up PR will rely on this information to address current issues in the cache implementations related to resource tracking.

Signed-off-by: Valerian Roche <[email protected]>
  • Loading branch information
valerian-roche committed Feb 16, 2024
1 parent 91017dc commit 8135ddb
Show file tree
Hide file tree
Showing 25 changed files with 1,019 additions and 623 deletions.
33 changes: 30 additions & 3 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
Expand All @@ -37,6 +36,34 @@ type Request = discovery.DiscoveryRequest
// DeltaRequest is an alias for the delta discovery request type.
type DeltaRequest = discovery.DeltaDiscoveryRequest

// Subscription stores the server view of the client state for a given resource type.
// It allows proper implementation of the stateful aspects of the protocol
// (e.g. returning only the resources that were updated).
// The same interface is accepted by both CreateWatch (sotw) and CreateDeltaWatch (delta).
//
// Though the methods may return mutable parts of the state for performance reasons,
// the cache is expected to consider this state as immutable and thread safe
// between a watch creation and its cancellation.
type Subscription interface {
// ReturnedResources returns the resources that were sent to the client,
// as a map from resource name to the associated version.
// The versions are:
// - delta protocol: version of the specific resource set in the response.
// - sotw protocol: version of the global response when the resource was last sent.
ReturnedResources() map[string]string

// SubscribedResources returns the set of resource names currently subscribed to by the client for the type.
// For delta, it keeps track of subscription updates across requests.
// For sotw, it is a normalized view of the resources of the last request.
SubscribedResources() map[string]struct{}

// IsWildcard returns whether the client has a wildcard watch.
// This takes into account subtleties related to the ongoing migration of wildcard definitions within the protocol.
// More details on the behavior of wildcard are available at
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return
IsWildcard() bool

// WatchesResources returns whether at least one of the provided resources is currently being watched by the subscription.
// It is currently only applicable to delta-xds.
// If the subscription is wildcard, it always returns true;
// otherwise it compares the provided resources to the set of resources currently subscribed.
WatchesResources(resourceNames map[string]struct{}) bool
}

// ConfigWatcher requests watches for configuration resources by a node, last
// applied version identifier, and resource names hint. The watch should send
// the responses when they are ready. The watch can be canceled by the
Expand All @@ -54,7 +81,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(*Request, stream.StreamState, chan Response) (cancel func())
CreateWatch(*Request, Subscription, chan Response) (cancel func(), err error)

// CreateDeltaWatch returns a new open incremental xDS watch.
// This is the entrypoint to propagate configuration changes the
Expand All @@ -66,7 +93,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func())
CreateDeltaWatch(*DeltaRequest, Subscription, chan DeltaResponse) (cancel func(), err error)
}

// ConfigFetcher fetches configuration resources from cache
Expand Down
17 changes: 8 additions & 9 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// groups together resource-related arguments for the createDeltaResponse function
Expand All @@ -28,16 +27,16 @@ type resourceContainer struct {
systemVersion string
}

func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse {
func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered []types.Resource
var toRemove []string

// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetResourceVersions()) == 0 {
case sub.IsWildcard():
if len(sub.ReturnedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
Expand All @@ -46,25 +45,25 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
// we can just set it here to be used for comparison later
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetResourceVersions()[name]
prevVersion, found := sub.ReturnedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
// The resource version can be set to "" here to trigger a removal even if never returned before
for name := range state.GetResourceVersions() {
for name := range sub.ReturnedResources() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
}
default:
nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames()))
nextVersionMap = make(map[string]string, len(sub.SubscribedResources()))
// sub.ReturnedResources() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResourceNames() {
prevVersion, found := state.GetResourceVersions()[name]
for name := range sub.SubscribedResources() {
prevVersion, found := sub.ReturnedResources()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
Expand Down
88 changes: 51 additions & 37 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
)

func assertResourceMapEqual(t *testing.T, want map[string]types.Resource, got map[string]types.Resource) {
func assertResourceMapEqual(t *testing.T, want, got map[string]types.Resource) {
t.Helper()

if !cmp.Equal(want, got, protocmp.Transform()) {
Expand All @@ -32,32 +32,34 @@ func assertResourceMapEqual(t *testing.T, want map[string]types.Resource, got ma
func TestSnapshotCacheDeltaWatch(t *testing.T) {
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
watches := make(map[string]chan cache.DeltaResponse)
subscriptions := make(map[string]stream.Subscription)

// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
subscriptions[typ] = stream.NewDeltaSubscription(nil, nil, nil)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(true, nil), watches[typ])
TypeUrl: typ,
}, subscriptions[typ], watches[typ])
require.NoError(t, err)
}

if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
t.Fatal(err)
}

versionMap := make(map[string]map[string]string)
for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
select {
case out := <-watches[typ]:
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
vMap := out.GetNextVersionMap()
versionMap[typ] = vMap
sub := subscriptions[typ]
sub.SetReturnedResources(out.GetNextVersionMap())
subscriptions[typ] = sub
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
}
Expand All @@ -68,17 +70,23 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// all resources as well as individual resource removals
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, versionMap[typ])
for resource := range versionMap[typ] {
state.GetSubscribedResourceNames()[resource] = struct{}{}
}
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
req := &discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, watches[typ])
TypeUrl: typ,
// We must set a nonce to avoid the specific behavior
// of returning immediately on wildcard and first request
ResponseNonce: "nonce",
}
sub := subscriptions[typ]
if len(names[typ]) > 0 {
sub.UpdateResourceSubscriptions(names[typ], []string{"*"})
req.ResourceNamesSubscribe = names[typ]
req.ResourceNamesUnsubscribe = []string{"*"}
}
_, err := c.CreateDeltaWatch(req, sub, watches[typ])
require.NoError(t, err)
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand All @@ -101,8 +109,9 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
snapshot2 := fixture.snapshot()
snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)})
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType))
vMap := out.GetNextVersionMap()
versionMap[testTypes[0]] = vMap
sub := subscriptions[testTypes[0]]
sub.SetReturnedResources(out.GetNextVersionMap())
subscriptions[testTypes[0]] = sub
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
}
Expand All @@ -111,20 +120,21 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
func TestDeltaRemoveResources(t *testing.T) {
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
watches := make(map[string]chan cache.DeltaResponse)
streams := make(map[string]*stream.StreamState)
subscriptions := make(map[string]*stream.Subscription)

for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(true, make(map[string]string))
streams[typ] = &state
sub := stream.NewDeltaSubscription(nil, nil, nil)
subscriptions[typ] = &sub
// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
// functionality. This means we should receive all resources back without requesting a subscription by name.
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, *streams[typ], watches[typ])
}, *subscriptions[typ], watches[typ])
require.NoError(t, err)
}

if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
Expand All @@ -138,7 +148,7 @@ func TestDeltaRemoveResources(t *testing.T) {
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
nextVersionMap := out.GetNextVersionMap()
streams[typ].SetResourceVersions(nextVersionMap)
subscriptions[typ].SetReturnedResources(nextVersionMap)
case <-time.After(time.Second):
t.Fatal("failed to receive a snapshot response")
}
Expand All @@ -149,12 +159,14 @@ func TestDeltaRemoveResources(t *testing.T) {
// test the removal of certain resources from a partial snapshot
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, *streams[typ], watches[typ])
TypeUrl: typ,
ResponseNonce: "nonce",
}, *subscriptions[typ], watches[typ])
require.NoError(t, err)
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand All @@ -177,7 +189,7 @@ func TestDeltaRemoveResources(t *testing.T) {
nextVersionMap := out.GetNextVersionMap()

// make sure the version maps are different since we no longer are tracking any endpoint resources
if reflect.DeepEqual(streams[testTypes[0]].GetResourceVersions(), nextVersionMap) {
if reflect.DeepEqual(subscriptions[testTypes[0]].ReturnedResources(), nextVersionMap) {
t.Fatalf("versionMap for the endpoint resource type did not change, received: %v, instead of an empty map", nextVersionMap)
}
case <-time.After(time.Second):
Expand All @@ -204,14 +216,15 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
t.Fatalf("snapshot failed: %s", err)
}
} else {
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: id,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{clusterName},
}, stream.NewStreamState(false, make(map[string]string)), responses)
}, stream.NewDeltaSubscription([]string{clusterName}, nil, nil), responses)

require.NoError(t, err)
defer cancel()
}
})
Expand All @@ -226,21 +239,21 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {

// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
sub := stream.NewDeltaSubscription(names[rsrc.EndpointType], nil, nil)
_, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: names[rsrc.EndpointType],
}, state, watchCh)
}, sub, watchCh)
require.NoError(t, err)

// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()

err := c.SetSnapshot(ctx, key, fixture.snapshot())
err = c.SetSnapshot(ctx, key, fixture.snapshot())
require.EqualError(t, err, context.Canceled.Error())

// Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails,
Expand Down Expand Up @@ -270,13 +283,14 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
for _, typ := range testTypes {
responses := make(chan cache.DeltaResponse, 1)
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(false, make(map[string]string)), responses)
}, stream.NewDeltaSubscription(names[typ], nil, nil), responses)
require.NoError(t, err)

// Cancel the watch
cancel()
Expand Down
Loading

0 comments on commit 8135ddb

Please sign in to comment.