From b01bf5788f543ee50b656f9e98b149475a4090d2 Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Tue, 16 Jan 2024 17:35:49 -0500 Subject: [PATCH] [server][cache] Ensure subscription is properly updated for sotw and delta (#9) Currently sotw and delta watches use different interfaces to track the stream state. An assumption used to be that sotw watches are stateless and can be processed from the request itself. This was already altered in https://github.com/envoyproxy/go-control-plane/pull/508, but it only covers the use cases partially (e.g. it does not cover subscribing/unsubscribing to wildcard). The current object passed to track the subscription has multiple issues: - it is not an interface, restricting evolution or testing for users building a separate cache - it tracks some objects per type, others globally, based on sotw or delta, making it confusing and hard to use in a consistent way This commit replaces the legacy streamState with the notion of Subscription per type. StreamState is not relevant to the cache, as the cache only needs to know the subscription status (e.g. the last nonce is not needed), while a stream can multiplex multiple types. Further work will come to: - fill the subscription with the same attributes in sotw and delta, making it possible to take proper decisions on what to reply - Fix multiple issues within the cache in sotw - Fix handling of explicit vs. 
legacy wildcard in sotw and delta Now that the `Subscription` has been standardized to be uniform for `sotw` and `delta`, as well as always for the given request type, this PR ensures all the fields are properly populated in all cases A follow-up PR will rely on those informations to address current issues in the cache implementations related to resource tracking Signed-off-by: Valerian Roche --- pkg/cache/v3/cache.go | 33 +- pkg/cache/v3/delta.go | 17 +- pkg/cache/v3/delta_test.go | 88 ++--- pkg/cache/v3/linear.go | 35 +- pkg/cache/v3/linear_test.go | 371 ++++++++++++++-------- pkg/cache/v3/mux.go | 17 +- pkg/cache/v3/resource.go | 7 +- pkg/cache/v3/simple.go | 42 +-- pkg/cache/v3/simple_test.go | 139 +++++--- pkg/cache/v3/snapshot.go | 1 - pkg/cache/v3/status.go | 5 +- pkg/integration/ttl_integration_test.go | 4 +- pkg/log/test.go | 4 + pkg/server/config/config.go | 5 + pkg/server/delta/v3/server.go | 72 ++--- pkg/server/delta/v3/watches.go | 2 +- pkg/server/sotw/v3/ads.go | 104 +++--- pkg/server/sotw/v3/server.go | 52 +-- pkg/server/sotw/v3/watches.go | 6 +- pkg/server/sotw/v3/xds.go | 114 ++++--- pkg/server/stream/v3/stream.go | 116 ------- pkg/server/stream/v3/subscription.go | 176 ++++++++++ pkg/server/stream/v3/subscription_test.go | 159 ++++++++++ pkg/server/v3/delta_test.go | 23 +- pkg/server/v3/server_test.go | 50 +-- 25 files changed, 1019 insertions(+), 623 deletions(-) create mode 100644 pkg/server/stream/v3/subscription.go create mode 100644 pkg/server/stream/v3/subscription_test.go diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 5ad8e24140..16d464d444 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -26,7 +26,6 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) @@ -37,6 +36,34 @@ type Request = 
discovery.DiscoveryRequest // DeltaRequest is an alias for the delta discovery request type. type DeltaRequest = discovery.DeltaDiscoveryRequest +// Subscription stores the server view of the client state for a given resource type. +// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources). +// Though the methods may return mutable parts of the state for performance reasons, +// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation. +type Subscription interface { + // ReturnedResources returns a list of resources that were sent to the client and their associated versions. + // The versions are: + // - delta protocol: version of the specific resource set in the response. + // - sotw protocol: version of the global response when the resource was last sent. + ReturnedResources() map[string]string + + // SubscribedResources returns the list of resources currently subscribed to by the client for the type. + // For delta it keeps track of subscription updates across requests + // For sotw it is a normalized view of the last request resources + SubscribedResources() map[string]struct{} + + // IsWildcard returns whether the client has a wildcard watch. + // This considers subtleties related to the current migration of wildcard definitions within the protocol. + // More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return + IsWildcard() bool + + // WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. + // It is currently only applicable to delta-xds. 
+ // If the request is wildcard, it will always return true, + // otherwise it will compare the provided resources to the list of resources currently subscribed + WatchesResources(resourceNames map[string]struct{}) bool +} + // ConfigWatcher requests watches for configuration resources by a node, last // applied version identifier, and resource names hint. The watch should send // the responses when they are ready. The watch can be canceled by the @@ -54,7 +81,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateWatch(*Request, stream.StreamState, chan Response) (cancel func()) + CreateWatch(*Request, Subscription, chan Response) (cancel func(), err error) // CreateDeltaWatch returns a new open incremental xDS watch. // This is the entrypoint to propagate configuration changes the @@ -66,7 +93,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. 
- CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func()) + CreateDeltaWatch(*DeltaRequest, Subscription, chan DeltaResponse) (cancel func(), err error) } // ConfigFetcher fetches configuration resources from cache diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index deaeeb7ed1..93d572337e 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -18,7 +18,6 @@ import ( "context" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // groups together resource-related arguments for the createDeltaResponse function @@ -28,7 +27,7 @@ type resourceContainer struct { systemVersion string } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer) *RawDeltaResponse { // variables to build our response with var nextVersionMap map[string]string var filtered []types.Resource @@ -36,8 +35,8 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // If we are handling a wildcard request, we want to respond with all resources switch { - case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { + case sub.IsWildcard(): + if len(sub.ReturnedResources()) == 0 { filtered = make([]types.Resource, 0, len(resources.resourceMap)) } nextVersionMap = make(map[string]string, len(resources.resourceMap)) @@ -46,7 +45,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // we can just set it here to be used for comparison later version := resources.versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] + prevVersion, found := sub.ReturnedResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } @@ -54,17 +53,17 @@ func 
createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // Compute resources for removal // The resource version can be set to "" here to trigger a removal even if never returned before - for name := range state.GetResourceVersions() { + for name := range sub.ReturnedResources() { if _, ok := resources.resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + nextVersionMap = make(map[string]string, len(sub.SubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] + for name := range sub.SubscribedResources() { + prevVersion, found := sub.ReturnedResources()[name] if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index fc4ee91327..9877330c88 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -21,7 +21,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) -func assertResourceMapEqual(t *testing.T, want map[string]types.Resource, got map[string]types.Resource) { +func assertResourceMapEqual(t *testing.T, want, got map[string]types.Resource) { t.Helper() if !cmp.Equal(want, got, protocmp.Transform()) { @@ -32,32 +32,34 @@ func assertResourceMapEqual(t *testing.T, want map[string]types.Resource, got ma func TestSnapshotCacheDeltaWatch(t *testing.T) { c := cache.NewSnapshotCache(false, group{}, logger{t: t}) watches := make(map[string]chan cache.DeltaResponse) + subscriptions := make(map[string]stream.Subscription) // Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended for 
_, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + subscriptions[typ] = stream.NewDeltaSubscription(nil, nil, nil) + _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, - TypeUrl: typ, - ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(true, nil), watches[typ]) + TypeUrl: typ, + }, subscriptions[typ], watches[typ]) + require.NoError(t, err) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { t.Fatal(err) } - versionMap := make(map[string]map[string]string) for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { select { case out := <-watches[typ]: snapshot := fixture.snapshot() assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) - vMap := out.GetNextVersionMap() - versionMap[typ] = vMap + sub := subscriptions[typ] + sub.SetReturnedResources(out.GetNextVersionMap()) + subscriptions[typ] = sub case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") } @@ -68,17 +70,23 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { // all resources as well as individual resource removals for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - state := stream.NewStreamState(false, versionMap[typ]) - for resource := range versionMap[typ] { - state.GetSubscribedResourceNames()[resource] = struct{}{} - } - c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + req := &discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, - TypeUrl: typ, - ResourceNamesSubscribe: names[typ], - }, state, watches[typ]) + TypeUrl: typ, + // We must set a nonce to avoid the specific behavior + // of returning immediately on wildcard and first request + ResponseNonce: "nonce", + } + sub := subscriptions[typ] + if len(names[typ]) > 0 { + sub.UpdateResourceSubscriptions(names[typ], 
[]string{"*"}) + req.ResourceNamesSubscribe = names[typ] + req.ResourceNamesUnsubscribe = []string{"*"} + } + _, err := c.CreateDeltaWatch(req, sub, watches[typ]) + require.NoError(t, err) } if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { @@ -101,8 +109,9 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { snapshot2 := fixture.snapshot() snapshot2.Resources[types.Endpoint] = cache.NewResources(fixture.version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)}) assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot2.GetResources(rsrc.EndpointType)) - vMap := out.GetNextVersionMap() - versionMap[testTypes[0]] = vMap + sub := subscriptions[testTypes[0]] + sub.SetReturnedResources(out.GetNextVersionMap()) + subscriptions[testTypes[0]] = sub case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") } @@ -111,20 +120,21 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { func TestDeltaRemoveResources(t *testing.T) { c := cache.NewSnapshotCache(false, group{}, logger{t: t}) watches := make(map[string]chan cache.DeltaResponse) - streams := make(map[string]*stream.StreamState) + subscriptions := make(map[string]*stream.Subscription) for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - state := stream.NewStreamState(true, make(map[string]string)) - streams[typ] = &state + sub := stream.NewDeltaSubscription(nil, nil, nil) + subscriptions[typ] = &sub // We don't specify any resource name subscriptions here because we want to make sure we test wildcard // functionality. This means we should receive all resources back without requesting a subscription by name. 
- c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, TypeUrl: typ, - }, *streams[typ], watches[typ]) + }, *subscriptions[typ], watches[typ]) + require.NoError(t, err) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { @@ -138,7 +148,7 @@ func TestDeltaRemoveResources(t *testing.T) { snapshot := fixture.snapshot() assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) nextVersionMap := out.GetNextVersionMap() - streams[typ].SetResourceVersions(nextVersionMap) + subscriptions[typ].SetReturnedResources(nextVersionMap) case <-time.After(time.Second): t.Fatal("failed to receive a snapshot response") } @@ -149,12 +159,14 @@ func TestDeltaRemoveResources(t *testing.T) { // test the removal of certain resources from a partial snapshot for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: "node", }, - TypeUrl: typ, - }, *streams[typ], watches[typ]) + TypeUrl: typ, + ResponseNonce: "nonce", + }, *subscriptions[typ], watches[typ]) + require.NoError(t, err) } if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { @@ -177,7 +189,7 @@ func TestDeltaRemoveResources(t *testing.T) { nextVersionMap := out.GetNextVersionMap() // make sure the version maps are different since we no longer are tracking any endpoint resources - if reflect.DeepEqual(streams[testTypes[0]].GetResourceVersions(), nextVersionMap) { + if reflect.DeepEqual(subscriptions[testTypes[0]].ReturnedResources(), nextVersionMap) { t.Fatalf("versionMap for the endpoint resource type did not change, received: %v, instead of an empty map", nextVersionMap) } case <-time.After(time.Second): @@ -204,14 +216,15 @@ func 
TestConcurrentSetDeltaWatch(t *testing.T) { t.Fatalf("snapshot failed: %s", err) } } else { - cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: id, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: []string{clusterName}, - }, stream.NewStreamState(false, make(map[string]string)), responses) + }, stream.NewDeltaSubscription([]string{clusterName}, nil, nil), responses) + require.NoError(t, err) defer cancel() } }) @@ -226,21 +239,21 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.DeltaResponse) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}}) - c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + sub := stream.NewDeltaSubscription(names[rsrc.EndpointType], nil, nil) + _, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: names[rsrc.EndpointType], - }, state, watchCh) + }, sub, watchCh) + require.NoError(t, err) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - err := c.SetSnapshot(ctx, key, fixture.snapshot()) + err = c.SetSnapshot(ctx, key, fixture.snapshot()) require.EqualError(t, err, context.Canceled.Error()) // Now reset the snapshot with a consuming channel. 
This verifies that if setting the snapshot fails, @@ -270,13 +283,14 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) for _, typ := range testTypes { responses := make(chan cache.DeltaResponse, 1) - cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(false, make(map[string]string)), responses) + }, stream.NewDeltaSubscription(names[typ], nil, nil), responses) + require.NoError(t, err) // Cancel the watch cancel() diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index cd8f6bf2bd..6d9d3793ce 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -17,6 +17,7 @@ package cache import ( "context" "errors" + "fmt" "strconv" "strings" "sync" @@ -24,7 +25,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) type watches = map[ResponseWatch]struct{} @@ -178,11 +178,11 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } for id, watch := range cache.deltaWatches { - if !watch.StreamState.WatchesResources(modified) { + if !watch.subscription.WatchesResources(modified) { continue } - res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState) + res := cache.respondDelta(watch.Request, watch.Response, watch.subscription) if res != nil { delete(cache.deltaWatches, id) } @@ -190,8 +190,8 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } } -func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse { - resp := createDeltaResponse(context.Background(), request, state, resourceContainer{ +func (cache *LinearCache) respondDelta(request *DeltaRequest, 
value chan DeltaResponse, sub Subscription) *RawDeltaResponse { + resp := createDeltaResponse(context.Background(), request, sub, resourceContainer{ resourceMap: cache.resources, versionMap: cache.versionMap, systemVersion: cache.getVersion(), @@ -201,7 +201,7 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { if cache.log != nil { cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard()) } value <- resp return resp @@ -312,10 +312,9 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { return resources } -func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, value chan Response) func() { +func (cache *LinearCache) CreateWatch(request *Request, _ Subscription, value chan Response) (func(), error) { if request.GetTypeUrl() != cache.typeURL { - value <- nil - return nil + return nil, fmt.Errorf("request type %s does not match cache type %s", request.GetTypeUrl(), cache.typeURL) } // If the version is not up to date, check whether any requested resource has // been updated between the last version and the current version. This avoids the problem @@ -361,7 +360,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va } if stale { cache.respond(watch, staleResources) - return nil + return nil, nil } // Create open watches since versions are up to date. 
if len(request.GetResourceNames()) == 0 { @@ -371,7 +370,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va cache.mu.Lock() defer cache.mu.Unlock() delete(cache.watchAll, watch) - } + }, nil } cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q", cache.typeURL, request.ResourceNames, cache.getVersion()) @@ -387,7 +386,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va cache.mu.Lock() defer cache.mu.Unlock() cache.removeWatch(watch) - } + }, nil } // Must be called under lock @@ -403,7 +402,7 @@ func (cache *LinearCache) removeWatch(watch ResponseWatch) { } } -func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { cache.mu.Lock() defer cache.mu.Unlock() @@ -420,7 +419,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S cache.log.Errorf("failed to update version map: %v", err) } } - response := cache.respondDelta(request, value, state) + response := cache.respondDelta(request, value, sub) // if respondDelta returns nil this means that there is no change in any resource version // create a new watch accordingly @@ -428,15 +427,15 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S watchID := cache.nextDeltaWatchID() if cache.log != nil { cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, - cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion()) + cache.typeURL, sub.SubscribedResources(), cache.getVersion()) } - cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state} + cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscription: sub} - return 
cache.cancelDeltaWatch(watchID) + return cache.cancelDeltaWatch(watchID), nil } - return nil + return nil, nil } func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 32d2bedd02..f1c8a25613 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -42,7 +42,7 @@ func testResource(s string) types.Resource { return wrapperspb.String(s) } -func verifyResponseContent(t *testing.T, ch <-chan Response, expectedType string, expectedVersion string) (Response, *discovery.DiscoveryResponse) { +func verifyResponseContent(t *testing.T, ch <-chan Response, expectedType, expectedVersion string) (Response, *discovery.DiscoveryResponse) { t.Helper() var r Response select { @@ -88,7 +88,7 @@ func verifyResponse(t *testing.T, ch <-chan Response, expectedVersion string, ex } } -func verifyResponseResources(t *testing.T, ch <-chan Response, expectedType string, expectedVersion string, expectedResources ...string) { +func verifyResponseResources(t *testing.T, ch <-chan Response, expectedType, expectedVersion string, expectedResources ...string) { t.Helper() r, _ := verifyResponseContent(t, ch, expectedType, expectedVersion) if r == nil { @@ -229,96 +229,123 @@ func hashResource(t *testing.T, resource types.Resource) string { return v } -func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) { - state := stream.NewStreamState(true, nil) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) +func createWildcardDeltaWatch(t *testing.T, c *LinearCache, w chan DeltaResponse) { + t.Helper() + sub := stream.NewDeltaSubscription(nil, nil, nil) + _, err := c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) + require.NoError(t, err) resp := <-w - state.SetResourceVersions(resp.GetNextVersionMap()) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) // Ensure the watch is set properly with cache values + 
sub.SetReturnedResources(resp.GetNextVersionMap()) + _, err = c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) // Ensure the watch is set properly with cache values + require.NoError(t, err) +} + +func subFromRequest(req *Request) stream.Subscription { + return stream.NewSotwSubscription(req.GetResourceNames()) +} + +func subFromDeltaRequest(req *DeltaRequest) stream.Subscription { + return stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions()) } func TestLinearInitialResources(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w) + req := &Request{ResourceNames: []string{"a"}, TypeUrl: testType} + sub := subFromRequest(req) + _, err := c.CreateWatch(req, sub, w) + require.NoError(t, err) verifyResponse(t, w, "0", 1) - c.CreateWatch(&Request{TypeUrl: testType}, streamState, w) + + req = &Request{TypeUrl: testType} + sub = subFromRequest(req) + _, err = c.CreateWatch(req, sub, w) + require.NoError(t, err) verifyResponse(t, w, "0", 2) checkVersionMapNotSet(t, c) } func TestLinearCornerCases(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) err := c.UpdateResource("a", nil) if err == nil { t.Error("expected error on nil resource") } // create an incorrect type URL request + req := &Request{TypeUrl: "test"} w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: "test"}, streamState, w) - select { - case r := <-w: - if r != nil { - t.Error("response should be nil") - } - default: - t.Error("should receive nil response") - } + sub := subFromRequest(req) + _, err = c.CreateWatch(req, sub, w) + require.Error(t, err, "cache should have rejected the 
watch") } func TestLinearBasic(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) // Create watches before a resource is ready w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + req1 := &Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"} + sub1 := subFromRequest(req1) + _, err := c.CreateWatch(req1, sub1, w1) + require.NoError(t, err) mustBlock(t, w1) checkVersionMapNotSet(t, c) - w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) - mustBlock(t, w) + w2 := make(chan Response, 1) + req2 := &Request{TypeUrl: testType, VersionInfo: "0"} + sub2 := subFromRequest(req2) + _, err = c.CreateWatch(req2, sub2, w2) + require.NoError(t, err) + mustBlock(t, w2) checkWatchCount(t, c, "a", 2) checkWatchCount(t, c, "b", 1) require.NoError(t, c.UpdateResource("a", testResource("a"))) checkWatchCount(t, c, "a", 0) checkWatchCount(t, c, "b", 0) verifyResponse(t, w1, "1", 1) - verifyResponse(t, w, "1", 1) + verifyResponse(t, w2, "1", 1) // Request again, should get same response - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + _, err = c.CreateWatch(req1, sub1, w1) + require.NoError(t, err) checkWatchCount(t, c, "a", 0) - verifyResponse(t, w, "1", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + verifyResponse(t, w1, "1", 1) + _, err = c.CreateWatch(req2, sub2, w2) + require.NoError(t, err) checkWatchCount(t, c, "a", 0) - verifyResponse(t, w, "1", 1) + verifyResponse(t, w2, "1", 1) // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) require.NoError(t, c.UpdateResource("a", testResource("aa"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: 
"0"}, streamState, w) - verifyResponse(t, w, "3", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) - verifyResponse(t, w, "3", 2) + _, err = c.CreateWatch(req1, sub1, w1) + require.NoError(t, err) + verifyResponse(t, w1, "3", 1) + _, err = c.CreateWatch(req2, sub2, w2) + require.NoError(t, err) + verifyResponse(t, w2, "3", 2) // Ensure the version map was not created as we only ever used stow watches checkVersionMapNotSet(t, c) } func TestLinearSetResources(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) // Create new resources w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + req1 := &Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"} + sub1 := subFromRequest(req1) + _, err := c.CreateWatch(req1, sub1, w1) + require.NoError(t, err) mustBlock(t, w1) + w2 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w2) + req2 := &Request{TypeUrl: testType, VersionInfo: "0"} + sub2 := subFromRequest(req2) + _, err = c.CreateWatch(req2, sub2, w2) + require.NoError(t, err) mustBlock(t, w2) + c.SetResources(map[string]types.Resource{ "a": testResource("a"), "b": testResource("b"), @@ -327,10 +354,15 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources // Add another element and update the first, response should be different - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w1) + req1.VersionInfo = "1" + _, err = c.CreateWatch(req1, sub1, w1) + require.NoError(t, err) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w2) + req2.VersionInfo = "1" + _, err = c.CreateWatch(req2, sub2, w2) + require.NoError(t, err) mustBlock(t, w2) + 
c.SetResources(map[string]types.Resource{ "a": testResource("aa"), "b": testResource("b"), @@ -340,10 +372,15 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "2", 3) // Delete resource - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, streamState, w1) + req1.VersionInfo = "2" + _, err = c.CreateWatch(req1, sub1, w1) + require.NoError(t, err) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, streamState, w2) + req2.VersionInfo = "2" + _, err = c.CreateWatch(req2, sub2, w2) + require.NoError(t, err) mustBlock(t, w2) + c.SetResources(map[string]types.Resource{ "b": testResource("b"), "c": testResource("c"), @@ -370,72 +407,103 @@ func TestLinearGetResources(t *testing.T) { } func TestLinearVersionPrefix(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithVersionPrefix("instance1-")) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + req := &Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"} + sub := subFromRequest(req) + _, err := c.CreateWatch(req, sub, w) + require.NoError(t, err) verifyResponse(t, w, "instance1-0", 0) require.NoError(t, c.UpdateResource("a", testResource("a"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + + req.VersionInfo = "instance1-0" + _, err = c.CreateWatch(req, sub, w) + require.NoError(t, err) verifyResponse(t, w, "instance1-1", 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w) + req.VersionInfo = "instance1-1" + _, err = c.CreateWatch(req, sub, w) + require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) } func TestLinearDeletion(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := 
NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + req := &Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"} + sub := subFromRequest(req) + _, err := c.CreateWatch(req, sub, w) + require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) + require.NoError(t, c.DeleteResource("a")) verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + + req = &Request{TypeUrl: testType, VersionInfo: "0"} + sub = subFromRequest(req) + _, err = c.CreateWatch(req, sub, w) + require.NoError(t, err) verifyResponse(t, w, "1", 1) checkWatchCount(t, c, "b", 0) require.NoError(t, c.DeleteResource("b")) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + + req.VersionInfo = "1" + _, err = c.CreateWatch(req, sub, w) + require.NoError(t, err) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } func TestLinearWatchTwo(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) - w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) - mustBlock(t, w) + w1 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + req1 := &Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"} + sub1 := subFromRequest(req1) + _, err := c.CreateWatch(req1, sub1, w1) + require.NoError(t, err) mustBlock(t, w1) + + w2 := make(chan Response, 1) + req2 := &Request{TypeUrl: testType, VersionInfo: "0"} + sub2 := subFromRequest(req2) + _, err = 
c.CreateWatch(req2, sub2, w2) + require.NoError(t, err) + mustBlock(t, w2) + require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource - verifyResponse(t, w, "1", 1) - verifyResponse(t, w1, "1", 2) + verifyResponse(t, w1, "1", 1) + verifyResponse(t, w2, "1", 2) } func TestLinearCancel(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) require.NoError(t, c.UpdateResource("a", testResource("a"))) // cancel watch-all - w := make(chan Response, 1) - cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) - mustBlock(t, w) + w1 := make(chan Response, 1) + req1 := &Request{TypeUrl: testType, VersionInfo: "1"} + sub1 := subFromRequest(req1) + cancel, err := c.CreateWatch(req1, sub1, w1) + require.NoError(t, err) + mustBlock(t, w1) checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) // cancel watch for "a" - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) - mustBlock(t, w) + sub1.SetResourceSubscription([]string{"a"}) + req1.ResourceNames = []string{"a"} + req1.VersionInfo = "1" + cancel, err = c.CreateWatch(req1, sub1, w1) + require.NoError(t, err) + mustBlock(t, w1) checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) @@ -444,11 +512,19 @@ func TestLinearCancel(t *testing.T) { w2 := make(chan Response, 1) w3 := make(chan Response, 1) w4 := make(chan Response, 1) - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) - cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w2) - cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w3) - cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w4) - mustBlock(t, w) + cancel, err = c.CreateWatch(req1, sub1, 
w1) + require.NoError(t, err) + + req2 := &Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"} + cancel2, err := c.CreateWatch(req2, subFromRequest(req2), w2) + require.NoError(t, err) + req3 := &Request{TypeUrl: testType, VersionInfo: "1"} + cancel3, err := c.CreateWatch(req3, subFromRequest(req3), w3) + require.NoError(t, err) + req4 := &Request{TypeUrl: testType, VersionInfo: "1"} + cancel4, err := c.CreateWatch(req4, subFromRequest(req4), w4) + require.NoError(t, err) + mustBlock(t, w1) mustBlock(t, w2) mustBlock(t, w3) mustBlock(t, w4) @@ -469,7 +545,6 @@ func TestLinearCancel(t *testing.T) { // TODO(mattklein123): This test requires GOMAXPROCS or -parallel >= 100. This should be // rewritten to not require that. This is not the case in the GH actions environment. func TestLinearConcurrentSetWatch(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) n := 50 for i := 0; i < 2*n; i++ { @@ -484,12 +559,14 @@ func TestLinearConcurrentSetWatch(t *testing.T) { id2 := fmt.Sprintf("%d", i-1) t.Logf("request resources %q and %q", id, id2) value := make(chan Response, 1) - c.CreateWatch(&Request{ + req := &Request{ // Only expect one to become stale ResourceNames: []string{id, id2}, VersionInfo: "0", TypeUrl: testType, - }, streamState, value) + } + _, err := c.CreateWatch(req, subFromRequest(req), value) + require.NoError(t, err) // wait until all updates apply verifyResponse(t, value, "", 1) } @@ -500,19 +577,22 @@ func TestLinearConcurrentSetWatch(t *testing.T) { func TestLinearDeltaWildcard(t *testing.T) { c := NewLinearCache(testType) - state1 := stream.NewStreamState(true, map[string]string{}) + req1 := &DeltaRequest{TypeUrl: testType} w1 := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state1, w1) + _, err := c.CreateDeltaWatch(req1, subFromDeltaRequest(req1), w1) + require.NoError(t, err) mustBlockDelta(t, w1) - state2 := 
stream.NewStreamState(true, map[string]string{}) + + req2 := &DeltaRequest{TypeUrl: testType} w2 := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state2, w2) + _, err = c.CreateDeltaWatch(req2, subFromDeltaRequest(req2), w2) + require.NoError(t, err) mustBlockDelta(t, w1) checkDeltaWatchCount(t, c, 2) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hash := hashResource(t, a) - err := c.UpdateResource("a", a) + err = c.UpdateResource("a", a) require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w1, []resourceInfo{{"a", hash}}, nil) @@ -530,17 +610,18 @@ func TestLinearDeltaExistingResources(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a + // watching b and c - not interested in a + req := &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"b", "c"}} w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{}) - state = stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}} w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) } @@ -556,17 +637,17 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewStreamState(false, 
map[string]string{"b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + req := &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"b": hashB}} w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned - state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{{Priority: 10}}} // new version of b @@ -590,18 +671,18 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { // There is currently no delta watch checkVersionMapNotSet(t, c) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + req := &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}} w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) checkVersionMapSet(t, c) - state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - 
state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -626,17 +707,17 @@ func TestLinearDeltaResourceDelete(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + req := &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}} w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) - state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + req = &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -651,14 +732,15 @@ func TestLinearDeltaResourceDelete(t *testing.T) { func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c := NewLinearCache(testType) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) checkVersionMapNotSet(t, c) assert.Equal(t, 0, 
c.NumResources()) // Initial update - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + req := &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}} + sub := subFromDeltaRequest(req) + _, err := c.CreateDeltaWatch(req, subFromDeltaRequest(req), w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) // The version map should now be created, even if empty @@ -667,16 +749,19 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { hashA := hashResource(t, a) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} hashB := hashResource(t, b) - err := c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil) + err = c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil) require.NoError(t, err) resp := <-w validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + + sub.SetReturnedResources(resp.GetNextVersionMap()) // Multiple updates - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + req.ResourceNamesSubscribe = nil // No change in subscription + _, err = c.CreateDeltaWatch(req, sub, w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -693,10 +778,11 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Update/add/delete - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(req, sub, w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = 
&endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -712,10 +798,11 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"}) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Re-add previously deleted watched resource - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + _, err = c.CreateDeltaWatch(req, sub, w) + require.NoError(t, err) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource @@ -728,10 +815,10 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Wildcard create/update - createWildcardDeltaWatch(c, w) + createWildcardDeltaWatch(t, c, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -747,7 +834,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.Equal(t, 3, c.NumResources()) // Wildcard update/delete - createWildcardDeltaWatch(c, w) + createWildcardDeltaWatch(t, c, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -776,9 +863,11 @@ func TestLinearMixedWatches(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, c.NumResources()) - sotwState := stream.NewStreamState(false, nil) w := make(chan Response, 
1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + sotwReq := &Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()} + sotwSub := subFromRequest(sotwReq) + _, err = c.CreateWatch(sotwReq, subFromRequest(sotwReq), w) + require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, c) @@ -792,16 +881,18 @@ func TestLinearMixedWatches(t *testing.T) { verifyResponse(t, w, c.getVersion(), 1) checkVersionMapNotSet(t, c) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + sotwReq.VersionInfo = c.getVersion() + _, err = c.CreateWatch(sotwReq, sotwSub, w) + require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, c) - deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - deltaState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + deltaReq := &DeltaRequest{TypeUrl: testType, ResourceNamesSubscribe: []string{"a", "b"}, InitialResourceVersions: map[string]string{"a": hashA, "b": hashB}} wd := make(chan DeltaResponse, 1) // Initial update - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, deltaState, wd) + _, err = c.CreateDeltaWatch(deltaReq, subFromDeltaRequest(deltaReq), wd) + require.NoError(t, err) mustBlockDelta(t, wd) checkDeltaWatchCount(t, c, 1) checkVersionMapSet(t, c) @@ -829,9 +920,11 @@ func TestLinearSotwWatches(t *testing.T) { // An update is done for the three objects in a row // If the watches are no properly purged, all three updates will send responses in the channel, but only the first one is tracked // The buffer will therefore saturate and the third request will deadlock the entire cache as occurring under the mutex - sotwState := stream.NewStreamState(false, nil) w := make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "c"}, TypeUrl: testType, VersionInfo: 
cache.getVersion()}, sotwState, w) + sotwReq := &Request{ResourceNames: []string{"a", "b", "c"}, TypeUrl: testType, VersionInfo: cache.getVersion()} + sotwSub := subFromRequest(sotwReq) + _, err = cache.CreateWatch(sotwReq, sotwSub, w) + require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, cache) @@ -854,7 +947,10 @@ func TestLinearSotwWatches(t *testing.T) { // c no longer watched w = make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w) + sotwReq.ResourceNames = []string{"a", "b"} + sotwReq.VersionInfo = cache.getVersion() + sotwSub.SetResourceSubscription([]string{"a", "b"}) + _, err = cache.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, cache) @@ -873,7 +969,10 @@ func TestLinearSotwWatches(t *testing.T) { checkVersionMapNotSet(t, cache) w = make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"c"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w) + sotwReq.ResourceNames = []string{"c"} + sotwReq.VersionInfo = cache.getVersion() + sotwSub.SetResourceSubscription([]string{"c"}) + _, err = cache.CreateWatch(sotwReq, sotwSub, w) require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, cache) @@ -904,28 +1003,34 @@ func TestLinearSotwWatches(t *testing.T) { assert.Equal(t, 3, cache.NumResources()) // Non-wildcard request - nonWildcardState := stream.NewStreamState(false, nil) + nonWildcardReq := &Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()} + nonWildcardSub := subFromRequest(nonWildcardReq) w1 := make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardState, w1) + _, err := cache.CreateWatch(nonWildcardReq, nonWildcardSub, w1) + require.NoError(t, err) mustBlock(t, w1) 
checkVersionMapNotSet(t, cache) // wildcard request - wildcardState := stream.NewStreamState(true, nil) + wildcardReq := &Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()} + wildcardSub := subFromRequest(wildcardReq) w2 := make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardState, w2) + _, err = cache.CreateWatch(wildcardReq, wildcardSub, w2) + require.NoError(t, err) mustBlock(t, w2) checkVersionMapNotSet(t, cache) // request not requesting b - otherState := stream.NewStreamState(false, nil) + otherReq := &Request{ResourceNames: []string{"a", "c", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()} + otherSub := subFromRequest(otherReq) w3 := make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "c", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, otherState, w3) + _, err = cache.CreateWatch(otherReq, otherSub, w3) + require.NoError(t, err) mustBlock(t, w3) checkVersionMapNotSet(t, cache) b.AltStatName = "othername" - err := cache.UpdateResources(map[string]types.Resource{"b": b}, nil) + err = cache.UpdateResources(map[string]types.Resource{"b": b}, nil) require.NoError(t, err) // Other watch has not triggered @@ -936,10 +1041,14 @@ func TestLinearSotwWatches(t *testing.T) { // Recreate the watches w1 = make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardState, w1) + nonWildcardReq.VersionInfo = cache.getVersion() + _, err = cache.CreateWatch(nonWildcardReq, nonWildcardSub, w1) + require.NoError(t, err) mustBlock(t, w1) w2 = make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardState, w2) + wildcardReq.VersionInfo = cache.getVersion() + _, err = 
cache.CreateWatch(wildcardReq, wildcardSub, w2) + require.NoError(t, err) mustBlock(t, w2) // Update d, new resource in the cache diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index 9fdfb090d6..2a7a84a49d 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -17,8 +17,7 @@ package cache import ( "context" "errors" - - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" + "fmt" ) // MuxCache multiplexes across several caches using a classification function. @@ -37,24 +36,22 @@ type MuxCache struct { var _ Cache = &MuxCache{} -func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, value chan Response) func() { +func (mux *MuxCache) CreateWatch(request *Request, sub Subscription, value chan Response) (func(), error) { key := mux.Classify(request) cache, exists := mux.Caches[key] if !exists { - value <- nil - return nil + return nil, fmt.Errorf("no cache defined for key %s", key) } - return cache.CreateWatch(request, state, value) + return cache.CreateWatch(request, sub, value) } -func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { key := mux.ClassifyDelta(request) cache, exists := mux.Caches[key] if !exists { - value <- nil - return nil + return nil, fmt.Errorf("no cache defined for key %s", key) } - return cache.CreateDeltaWatch(request, state, value) + return cache.CreateDeltaWatch(request, sub, value) } func (mux *MuxCache) Fetch(context.Context, *Request) (Response, error) { diff --git a/pkg/cache/v3/resource.go b/pkg/cache/v3/resource.go index e1156c4af6..1645d8c49a 100644 --- a/pkg/cache/v3/resource.go +++ b/pkg/cache/v3/resource.go @@ -102,8 +102,9 @@ func GetResourceName(res types.Resource) string { func ResourceRequiresFullStateInSotw(typeURL resource.Type) bool { // From 
https://www.envoyproxy.io/docs/envoy/v1.28.0/api-docs/xds_protocol#grouping-resources-into-responses, // when using sotw the control-plane MUST return all requested resources (or simply all if wildcard) - // for some types. This is relied on by xds-grpc which is explicitly requesting clusters but expect - // to receive all existing resources + // for some types. This is relied on by xds-grpc which is explicitly requesting clusters and listeners + // but expects to receive all existing resources for those types. Missing clusters or listeners are + // considered deleted. switch typeURL { case resource.ClusterType: return true @@ -183,7 +184,7 @@ func getResourceReferences(resources map[string]types.ResourceWithTTL, out map[r } } -func mapMerge(dst map[string]bool, src map[string]bool) { +func mapMerge(dst, src map[string]bool) { for k, v := range src { dst[k] = v } diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index ebf63f5b6f..773c81b9e7 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -23,7 +23,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // ResourceSnapshot is an abstract snapshot of a collection of resources that @@ -225,6 +224,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh cache.mu.Lock() defer cache.mu.Unlock() + cache.log.Debugf("setting snapshot for node %s", node) // update the existing entry cache.snapshots[node] = snapshot @@ -310,7 +310,7 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu snapshot, watch.Request, watch.Response, - watch.StreamState, + watch.subscription, ) if err != nil { return err @@ -328,7 +328,7 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu snapshot, watch.Request, watch.Response, - watch.StreamState, + watch.subscription, ) if err != nil { return err @@ 
-385,7 +385,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) // CreateWatch returns a watch for an xDS request. A nil function may be // returned if an error occurs. -func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { +func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, value chan Response) (func(), error) { nodeID := cache.hash.ID(request.GetNode()) cache.mu.Lock() @@ -409,7 +409,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str } if exists { - knownResourceNames := streamState.GetKnownResourceNames(request.GetTypeUrl()) + knownResourceNames := sub.ReturnedResources() diff := []string{} for _, r := range request.GetResourceNames() { if _, ok := knownResourceNames[r]; !ok { @@ -427,9 +427,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), request.GetResourceNames(), nodeID, err) - return nil + return nil, fmt.Errorf("failed to send the response: %w", err) } - return func() {} + return func() {}, nil } } } @@ -442,7 +442,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str info.mu.Lock() info.watches[watchID] = ResponseWatch{Request: request, Response: value} info.mu.Unlock() - return cache.cancelWatch(nodeID, watchID) + return cache.cancelWatch(nodeID, watchID), nil } // otherwise, the watch may be responded immediately @@ -450,10 +450,10 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), request.GetResourceNames(), 
nodeID, err) - return nil + return nil, fmt.Errorf("failed to send the response: %w", err) } - return func() {} + return func() {}, nil } func (cache *snapshotCache) nextWatchID() int64 { @@ -525,7 +525,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string] } // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. -func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { nodeID := cache.hash.ID(request.GetNode()) t := request.GetTypeUrl() @@ -554,7 +554,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream if err != nil { cache.log.Errorf("failed to compute version for snapshot resources inline: %s", err) } - response, err := cache.respondDelta(context.Background(), snapshot, request, value, state) + response, err := cache.respondDelta(context.Background(), snapshot, request, value, sub) if err != nil { cache.log.Errorf("failed to respond with delta response: %s", err) } @@ -566,21 +566,21 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream watchID := cache.nextDeltaWatchID() if exists { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetSubscribedResourceNames(), nodeID, snapshot.GetVersion(t)) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, sub.SubscribedResources(), nodeID, snapshot.GetVersion(t)) } else { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetSubscribedResourceNames(), nodeID) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, sub.SubscribedResources(), nodeID) } - info.setDeltaResponseWatch(watchID, 
DeltaResponseWatch{Request: request, Response: value, StreamState: state}) - return cache.cancelDeltaWatch(nodeID, watchID) + info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, subscription: sub}) + return cache.cancelDeltaWatch(nodeID, watchID), nil } - return nil + return nil, nil } // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. -func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) { - resp := createDeltaResponse(ctx, request, state, resourceContainer{ +func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, sub Subscription) (*RawDeltaResponse, error) { + resp := createDeltaResponse(ctx, request, sub, resourceContainer{ resourceMap: snapshot.GetResources(request.GetTypeUrl()), versionMap: snapshot.GetVersionMap(request.GetTypeUrl()), systemVersion: snapshot.GetVersion(request.GetTypeUrl()), @@ -589,10 +589,10 @@ func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceS // Only send a response if there were changes // We want to respond immediately for the first wildcard request in a stream, even if the response is empty // otherwise, envoy won't complete initialization - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (sub.IsWildcard() && request.ResponseNonce == "") { if cache.log != nil { cache.log.Debugf("node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), request.GetTypeUrl(), 
GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard()) } select { case value <- resp: diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index eba4cf96d9..8bfb698127 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -91,10 +91,25 @@ type logger struct { t *testing.T } -func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) } +func (log logger) Debugf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} + +func (log logger) Infof(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} + +func (log logger) Warnf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} + +func (log logger) Errorf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} func TestSnapshotCacheWithTTL(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -119,13 +134,15 @@ func TestSnapshotCacheWithTTL(t *testing.T) { wg := sync.WaitGroup{} // All the resources should respond immediately when version is not up to date. 
- streamState := stream.NewStreamState(false, map[string]string{}) + subs := map[string]stream.Subscription{} for _, typ := range testTypes { + sub := stream.NewSotwSubscription(names[typ]) wg.Add(1) t.Run(typ, func(t *testing.T) { defer wg.Done() value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + _, err = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, value) + require.NoError(t, err) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != fixture.version { @@ -134,8 +151,14 @@ func TestSnapshotCacheWithTTL(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTTL(typ)) } - // Update streamState - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + + returnedResources := make(map[string]string) + // Update sub to track what was returned + for _, resource := range out.GetRequest().GetResourceNames() { + returnedResources[resource] = fixture.version + } + sub.SetReturnedResources(returnedResources) + subs[typ] = sub case <-time.After(2 * time.Second): t.Errorf("failed to receive snapshot response") } @@ -153,9 +176,11 @@ func TestSnapshotCacheWithTTL(t *testing.T) { end := time.After(5 * time.Second) for { + sub := subs[typ] value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, value) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, + sub, value) + require.NoError(t, err) select { case out := <-value: @@ -172,7 +197,13 @@ func TestSnapshotCacheWithTTL(t *testing.T) { 
updatesByType[typ]++ - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + returnedResources := make(map[string]string) + // Update sub to track what was returned + for _, resource := range out.GetRequest().GetResourceNames() { + returnedResources[resource] = fixture.version + } + sub.SetReturnedResources(returnedResources) + subs[typ] = sub case <-end: cancel() return @@ -214,9 +245,10 @@ func TestSnapshotCache(t *testing.T) { // try to get endpoints with incorrect list of names // should not receive response value := make(chan cache.Response, 1) - streamState := stream.NewStreamState(false, map[string]string{}) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, - streamState, value) + sub := stream.NewSotwSubscription([]string{"none"}) + _, err = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, + sub, value) + require.NoError(t, err) select { case out := <-value: t.Errorf("watch for endpoints and mismatched names => got %v, want none", out) @@ -226,9 +258,10 @@ func TestSnapshotCache(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { value := make(chan cache.Response, 1) - streamState := stream.NewStreamState(false, map[string]string{}) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, - streamState, value) + sub := stream.NewSotwSubscription(names[typ]) + _, err = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, + sub, value) + require.NoError(t, err) select { case out := <-value: snapshot := fixture.snapshot() @@ -279,10 +312,13 @@ func TestSnapshotCacheFetch(t *testing.T) { func TestSnapshotCacheWatch(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) watches := make(map[string]chan cache.Response) - streamState := stream.NewStreamState(false, map[string]string{}) + subs := 
map[string]stream.Subscription{} for _, typ := range testTypes { + sub := stream.NewSotwSubscription(names[typ]) + subs[typ] = sub watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, watches[typ]) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, watches[typ]) + require.NoError(t, err) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { t.Fatal(err) @@ -298,7 +334,14 @@ func TestSnapshotCacheWatch(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ)) } - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + returnedResources := make(map[string]string) + // Update sub to track what was returned + for _, resource := range out.GetRequest().GetResourceNames() { + returnedResources[resource] = fixture.version + } + sub := subs[typ] + sub.SetReturnedResources(returnedResources) + subs[typ] = sub case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") } @@ -308,8 +351,9 @@ func TestSnapshotCacheWatch(t *testing.T) { // open new watches with the latest version for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, watches[typ]) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, + subs[typ], watches[typ]) + require.NoError(t, err) } if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) { t.Errorf("watches should be created for the latest version: %d", count) @@ -354,11 +398,12 @@ func TestConcurrentSetWatch(t 
*testing.T) { t.Fatalf("failed to set snapshot %q: %s", id, err) } } else { - streamState := stream.NewStreamState(false, map[string]string{}) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{ + sub := stream.NewSotwSubscription(nil) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{ Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, - }, streamState, value) + }, sub, value) + require.NoError(t, err) defer cancel() } }) @@ -367,10 +412,11 @@ func TestConcurrentSetWatch(t *testing.T) { func TestSnapshotCacheWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) - streamState := stream.NewStreamState(false, map[string]string{}) for _, typ := range testTypes { + sub := stream.NewSotwSubscription(names[typ]) value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, value) + require.NoError(t, err) cancel() } // should be status info for the node @@ -394,15 +440,16 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.Response) - streamState := stream.NewStreamState(false, map[string]string{}) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, - streamState, watchCh) + sub := stream.NewSotwSubscription(names[rsrc.EndpointType]) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, + sub, watchCh) + require.NoError(t, err) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. 
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() - err := c.SetSnapshot(ctx, key, fixture.snapshot()) + err = c.SetSnapshot(ctx, key, fixture.snapshot()) require.EqualError(t, err, context.Canceled.Error()) // Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails, @@ -450,8 +497,9 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request resource with name=ClusterName go func() { - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, - stream.NewStreamState(false, map[string]string{}), watch) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, + stream.NewSotwSubscription([]string{clusterName}), watch) + require.NoError(t, err) }() select { @@ -469,12 +517,13 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request additional resource with name=clusterName2 for same version go func() { - state := stream.NewStreamState(false, map[string]string{}) - state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}}) - c.CreateWatch(&discovery.DiscoveryRequest{ + sub := stream.NewSotwSubscription([]string{clusterName, clusterName2}) + sub.SetReturnedResources(map[string]string{clusterName: fixture.version}) + _, err := c.CreateWatch(&discovery.DiscoveryRequest{ TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, ResourceNames: []string{clusterName, clusterName2}, - }, state, watch) + }, sub, watch) + require.NoError(t, err) }() select { @@ -490,12 +539,14 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { } // Repeat request for with same version and make sure a watch is created - state := stream.NewStreamState(false, map[string]string{}) - state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}, clusterName2: 
{}}) - if cancel := c.CreateWatch(&discovery.DiscoveryRequest{ + sub := stream.NewSotwSubscription([]string{clusterName, clusterName2}) + sub.SetReturnedResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{ TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, ResourceNames: []string{clusterName, clusterName2}, - }, state, watch); cancel == nil { + }, sub, watch) + require.NoError(t, err) + if cancel == nil { t.Fatal("Should create a watch") } else { cancel() @@ -624,9 +675,11 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { ResourceNames: []string{"rtds"}, TypeUrl: rsrc.RuntimeType, } - ss := stream.NewStreamState(false, map[string]string{"cluster": "abcdef"}) + ss := stream.NewSotwSubscription([]string{"rtds"}) + ss.SetReturnedResources(map[string]string{"cluster": "abcdef"}) responder := make(chan cache.Response) - c.CreateWatch(req, ss, responder) + _, err := c.CreateWatch(req, ss, responder) + require.NoError(t, err) go func() { // Wait for at least one heartbeat to occur, then set snapshot. diff --git a/pkg/cache/v3/snapshot.go b/pkg/cache/v3/snapshot.go index ca7c9b5af8..8770d6916a 100644 --- a/pkg/cache/v3/snapshot.go +++ b/pkg/cache/v3/snapshot.go @@ -94,7 +94,6 @@ func (s *Snapshot) Consistent() error { } for idx, items := range s.Resources { - // We only want to check resource types that are expected to be referenced by another resource type. // Basically, if the consistency relationship is modeled as a DAG, we only want // to check nodes that are expected to have edges pointing to it. 
diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index dca93e02ff..18465f6dc5 100644 --- a/pkg/cache/v3/status.go +++ b/pkg/cache/v3/status.go @@ -20,7 +20,6 @@ import ( "time" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // NodeHash computes string identifiers for Envoy nodes. @@ -101,8 +100,8 @@ type DeltaResponseWatch struct { // Response is the channel to push the delta responses to Response chan DeltaResponse - // VersionMap for the stream - StreamState stream.StreamState + // Subscription stores the current client subscription state. + subscription Subscription } // newStatusInfo initializes a status info data structure. diff --git a/pkg/integration/ttl_integration_test.go b/pkg/integration/ttl_integration_test.go index b2bdb318c5..6278e62896 100644 --- a/pkg/integration/ttl_integration_test.go +++ b/pkg/integration/ttl_integration_test.go @@ -19,7 +19,9 @@ import ( endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" ) @@ -37,7 +39,7 @@ func TestTTLResponse(t *testing.T) { defer cancel() snapshotCache := cache.NewSnapshotCacheWithHeartbeating(ctx, false, cache.IDHash{}, logger{t: t}, time.Second) - server := server.NewServer(ctx, snapshotCache, nil) + server := server.NewServer(ctx, snapshotCache, nil, sotw.WithLogger(log.NewTestLogger(t))) grpcServer := grpc.NewServer() endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server) diff --git a/pkg/log/test.go b/pkg/log/test.go index 214dc9eb69..cbae60272c 100644 --- a/pkg/log/test.go +++ b/pkg/log/test.go @@ -14,20 +14,24 @@ func 
NewTestLogger(t testing.TB) Logger { // Debugf logs a message at level debug on the test logger. func (l testLogger) Debugf(msg string, args ...interface{}) { + l.t.Helper() l.t.Logf("[debug] "+msg, args...) } // Infof logs a message at level info on the test logger. func (l testLogger) Infof(msg string, args ...interface{}) { + l.t.Helper() l.t.Logf("[info] "+msg, args...) } // Warnf logs a message at level warn on the test logger. func (l testLogger) Warnf(msg string, args ...interface{}) { + l.t.Helper() l.t.Logf("[warn] "+msg, args...) } // Errorf logs a message at level error on the test logger. func (l testLogger) Errorf(msg string, args ...interface{}) { + l.t.Helper() l.t.Logf("[error] "+msg, args...) } diff --git a/pkg/server/config/config.go b/pkg/server/config/config.go index b746acfab9..8dd31a2923 100644 --- a/pkg/server/config/config.go +++ b/pkg/server/config/config.go @@ -1,15 +1,20 @@ package config +import "github.com/envoyproxy/go-control-plane/pkg/log" + // Opts for individual xDS implementations that can be // utilized through the functional opts pattern. 
type Opts struct { // If true respond to ADS requests with a guaranteed resource ordering Ordered bool + + Logger log.Logger } func NewOpts() Opts { return Opts{ Ordered: false, + Logger: log.NewDefaultLogger(), } } diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index b570b19b27..35255d8f7e 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -12,6 +12,7 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" @@ -49,6 +50,13 @@ type server struct { opts config.Opts } +// WithLogger configures the server logger. Defaults to no logging +func WithLogger(logger log.Logger) config.XDSOption { + return func(o *config.Opts) { + o.Logger = logger + } +} + // NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks. func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { s := &server{ @@ -118,7 +126,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De watch := watches.deltaWatches[typ] watch.nonce = nonce - watch.state.SetResourceVersions(resp.GetNextVersionMap()) + watch.subscription.SetReturnedResources(resp.GetNextVersionMap()) watches.deltaWatches[typ] = watch return nil } @@ -204,21 +212,28 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // cancel existing watch to (re-)request a newer version watch, ok := watches.deltaWatches[typeURL] if !ok { - // Initialize the state of the stream. 
- // Since there was no previous state, we know we're handling the first request of this type + // Initialize the state of the type subscription. + // Since there was no previous subscription, we know we're handling the first request of this type // so we set the initial resource versions if we have any. - // We also set the stream as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe). - // If the state starts with this legacy mode, adding new resources will not unsubscribe from wildcard. + // We also set the subscription as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe). + // If the subscription starts with this legacy mode, adding new resources will not unsubscribe from wildcard. // It can still be done by explicitly unsubscribing from "*" - watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions()) + watch.subscription = stream.NewDeltaSubscription(req.GetResourceNamesSubscribe(), req.GetResourceNamesUnsubscribe(), req.GetInitialResourceVersions()) } else { watch.Cancel() - } - s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) - s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) + // Update subscription with the new requests + watch.subscription.UpdateResourceSubscriptions( + req.GetResourceNamesSubscribe(), + req.GetResourceNamesUnsubscribe(), + ) + } - watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses) + var err error + watch.cancel, err = s.cache.CreateDeltaWatch(req, watch.subscription, watches.deltaMuxedResponses) + if err != nil { + return err + } watches.deltaWatches[typeURL] = watch } } @@ -249,40 +264,3 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro return s.processDelta(str, reqCh, typeURL) } - -// When we subscribe, we just want to make the cache know we are subscribing to a resource. 
-// Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on. -func (s *server) subscribe(resources []string, streamState *stream.StreamState) { - sv := streamState.GetSubscribedResourceNames() - for _, resource := range resources { - if resource == "*" { - streamState.SetWildcard(true) - continue - } - sv[resource] = struct{}{} - } -} - -// Unsubscriptions remove resources from the stream's subscribed resource list. -// If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources. -func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) { - sv := streamState.GetSubscribedResourceNames() - for _, resource := range resources { - if resource == "*" { - streamState.SetWildcard(false) - continue - } - if _, ok := sv[resource]; ok && streamState.IsWildcard() { - // The XDS protocol states that: - // * if a watch is currently wildcard - // * a resource is explicitly unsubscribed by name - // Then the control-plane must return in the response whether the resource is removed (if no longer present for this node) - // or still existing. In the latter case the entire resource must be returned, same as if it had been created or updated - // To achieve that, we mark the resource as having been returned with an empty version. 
While creating the response, the cache will either: - // * detect the version change, and return the resource (as an update) - // * detect the resource deletion, and set it as removed in the response - streamState.GetResourceVersions()[resource] = "" - } - delete(sv, resource) - } -} diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index 63c4c2d38d..98e4991b49 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -41,7 +41,7 @@ type watch struct { cancel func() nonce string - state stream.StreamState + subscription stream.Subscription } // Cancel calls terminate and cancel diff --git a/pkg/server/sotw/v3/ads.go b/pkg/server/sotw/v3/ads.go index bbb6dd4b20..23992a6d8f 100644 --- a/pkg/server/sotw/v3/ads.go +++ b/pkg/server/sotw/v3/ads.go @@ -7,29 +7,13 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // process handles a bi-di stream request -func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error { - // We make a responder channel here so we can multiplex responses from the dynamic channels. - sw.watches.addWatch(resource.AnyType, &watch{ - // Create a buffered channel the size of the known resource types. 
- response: make(chan cache.Response, types.UnknownType), - cancel: func() { - close(sw.watches.responders[resource.AnyType].response) - }, - }) - - process := func(resp cache.Response) error { - nonce, err := sw.send(resp) - if err != nil { - return err - } - - sw.watches.responders[resp.GetRequest().GetTypeUrl()].nonce = nonce - return nil - } +func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest) error { + // Create a buffered multiplexed channel the size of the known resource types. + respChan := make(chan cache.Response, types.UnknownType) // Instead of creating a separate channel for each incoming request and abandoning the old one // This algorithm uses (and reuses) a single channel for all request types and guarantees @@ -42,9 +26,9 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe for { select { // We watch the multiplexed ADS channel for incoming responses. - case res := <-sw.watches.responders[resource.AnyType].response: + case res := <-respChan: if res.GetRequest().GetTypeUrl() != typeURL { - if err := process(res); err != nil { + if err := sw.send(res); err != nil { return err } } @@ -62,9 +46,9 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe select { case <-s.ctx.Done(): return nil - // We only watch the multiplexed channel since all values will come through from process. - case res := <-sw.watches.responders[resource.AnyType].response: - if err := process(res); err != nil { + // We only watch the multiplexed channel since we don't use per watch channels. + case res := <-respChan: + if err := sw.send(res); err != nil { return status.Errorf(codes.Unavailable, err.Error()) } case req, ok := <-reqCh: @@ -85,14 +69,10 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe req.Node = sw.node } - // Nonces can be reused across streams; we verify nonce only if nonce is not initialized. 
- nonce := req.GetResponseNonce() - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.GetTypeUrl() == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") - } + typeURL := req.GetTypeUrl() + if typeURL == "" { + return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } if s.callbacks != nil { @@ -101,40 +81,44 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe } } - if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources) + var subscription stream.Subscription + w, ok := sw.watches.responders[typeURL] + if ok { + if w.nonce != "" && req.GetResponseNonce() != w.nonce { + // The request does not match the stream nonce, ignore it as per + // https://www.envoyproxy.io/docs/envoy/v1.28.0/api-docs/xds_protocol#resource-updates + // Ignore this request and wait for the next one + // This behavior is being discussed in https://github.com/envoyproxy/envoy/issues/10363 + // as it might create a race in edge cases, but it matches the current protocol definition + s.opts.Logger.Infof("[sotw ads] Skipping request as nonce is stale for %s", typeURL) + break } - } - typeURL := req.GetTypeUrl() - // Use the multiplexed channel for new watches. - responder := sw.watches.responders[resource.AnyType].response - if w, ok := sw.watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() - - // Only process if we have an existing watch otherwise go ahead and create. 
- if err := processAllExcept(typeURL); err != nil { - return err - } + // We found an existing watch + // Close it to ensure the Cache will not reply to it while we modify the subscription state + w.close() - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) + // Only process if we have an existing watch otherwise go ahead and create. + if err := processAllExcept(typeURL); err != nil { + return err } + + subscription = w.sub + subscription.SetResourceSubscription(req.GetResourceNames()) } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) + s.opts.Logger.Debugf("[sotw ads] New subscription for type %s and stream %d", typeURL, sw.ID) + subscription = stream.NewSotwSubscription(req.GetResourceNames()) + } + + cancel, err := s.cache.CreateWatch(req, subscription, respChan) + if err != nil { + return err } + sw.watches.addWatch(typeURL, &watch{ + cancel: cancel, + response: respChan, + sub: subscription, + }) } } } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index f5be0c57a9..4d056ea1f2 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -18,12 +18,14 @@ package sotw import ( "context" "errors" + "fmt" "strconv" "sync/atomic" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -65,6 +67,13 @@ func WithOrderedADS() config.XDSOption { } } +// WithLogger configures the server logger. 
Defaults to no logging +func WithLogger(logger log.Logger) config.XDSOption { + return func(o *config.Opts) { + o.Logger = logger + } +} + type server struct { cache cache.ConfigWatcher callbacks Callbacks @@ -88,43 +97,49 @@ type streamWrapper struct { callbacks Callbacks // callbacks for performing actions through stream lifecycle node *core.Node // registered xDS client - - // The below fields are used for tracking resource - // cache state and should be maintained per stream. - streamState stream.StreamState - lastDiscoveryResponses map[string]lastDiscoveryResponse } // Send packages the necessary resources before sending on the gRPC stream, // and sets the current state of the world. -func (s *streamWrapper) send(resp cache.Response) (string, error) { +func (s *streamWrapper) send(resp cache.Response) error { if resp == nil { - return "", errors.New("missing response") + return errors.New("missing response") } out, err := resp.GetDiscoveryResponse() if err != nil { - return "", err + return err } // increment nonce and convert it to base10 out.Nonce = strconv.FormatInt(atomic.AddInt64(&s.nonce, 1), 10) - lastResponse := lastDiscoveryResponse{ - nonce: out.GetNonce(), - resources: make(map[string]struct{}), + typeURL := resp.GetRequest().GetTypeUrl() + w, ok := s.watches.responders[typeURL] + if !ok { + return fmt.Errorf("no current watch for %s", typeURL) } + + // Track in the type subscription the nonce and objects returned to the client. + version, err := resp.GetVersion() + if err != nil { + return err + } + // ToDo(valerian-roche): properly return the resources actually sent to the client + // Currently we set all resources requested, which is non-descriptive when using wildcard.
+ resources := make(map[string]string, len(resp.GetRequest().GetResourceNames())) for _, r := range resp.GetRequest().GetResourceNames() { - lastResponse.resources[r] = struct{}{} + resources[r] = version } - s.lastDiscoveryResponses[resp.GetRequest().GetTypeUrl()] = lastResponse + w.sub.SetReturnedResources(resources) + w.nonce = out.Nonce // Register with the callbacks provided that we are sending the response. if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), s.ID, resp.GetRequest(), out) } - return out.GetNonce(), s.stream.Send(out) + return s.stream.Send(out) } // Shutdown closes all open watches, and notifies API consumers the stream has closed. @@ -135,15 +150,6 @@ func (s *streamWrapper) shutdown() { } } -// Discovery response that is sent over GRPC stream. -// We need to record what resource names are already sent to a client -// So if the client requests a new name we can respond back -// regardless current snapshot version (even if it is not changed yet) -type lastDiscoveryResponse struct { - nonce string - resources map[string]struct{} -} - // StreamHandler converts a blocking read call to channels and initiates stream processing func (s *server) StreamHandler(stream stream.Stream, typeURL string) error { // a channel for receiving incoming requests diff --git a/pkg/server/sotw/v3/watches.go b/pkg/server/sotw/v3/watches.go index d781f663e6..43b3bedabe 100644 --- a/pkg/server/sotw/v3/watches.go +++ b/pkg/server/sotw/v3/watches.go @@ -7,6 +7,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // watches for all xDS resource types @@ -63,8 +64,11 @@ func (w *watches) recompute(ctx context.Context, req <-chan *discovery.Discovery // watch contains the necessary modifiable data for receiving resource responses type watch 
struct { cancel func() - nonce string response chan cache.Response + + sub stream.Subscription + // Nonce of the latest response sent for this type + nonce string } // close cancels an open watch diff --git a/pkg/server/sotw/v3/xds.go b/pkg/server/sotw/v3/xds.go index 3b24dec409..c6de51ca5f 100644 --- a/pkg/server/sotw/v3/xds.go +++ b/pkg/server/sotw/v3/xds.go @@ -26,9 +26,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque node: &core.Node{}, // node may only be set on the first discovery request // a collection of stack allocated watches per request type. - watches: newWatches(), - streamState: stream.NewStreamState(false, map[string]string{}), - lastDiscoveryResponses: make(map[string]lastDiscoveryResponse), + watches: newWatches(), } // cleanup once our stream has ended. @@ -40,6 +38,22 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } } + // type URL is required for ADS but is implicit for xDS + if defaultTypeURL == resource.AnyType && s.opts.Ordered { + // When using ADS we need to order responses. + // This is guaranteed in the xDS protocol specification + // as ADS is required to be eventually consistent. + // More details can be found here if interested: + // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations + + // Trigger a different code path specifically for ADS. + // We want resource ordering so things don't get sent before they should. + // This is a blocking call and will exit the process function + // on successful completion. 
+ s.opts.Logger.Debugf("[sotw] Switching to ordered ADS implementation for stream %d", sw.ID) + return s.processADS(&sw, reqCh) + } + // do an initial recompute so we can load the first 2 channels: // <-reqCh // s.ctx.Done() @@ -66,6 +80,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque req := value.Interface().(*discovery.DiscoveryRequest) if req == nil { + s.opts.Logger.Debugf("[sotw] Rejecting empty request for stream %d", sw.ID) return status.Errorf(codes.Unavailable, "empty request") } @@ -76,37 +91,12 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque req.Node = sw.node } - // nonces can be reused across streams; we verify nonce only if nonce is not initialized - nonce := req.GetResponseNonce() - // type URL is required for ADS but is implicit for xDS - if defaultTypeURL == resource.AnyType { - if req.GetTypeUrl() == "" { - return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") - } - - // When using ADS we need to order responses. - // This is guaranteed in the xDS protocol specification - // as ADS is required to be eventually consistent. - // More details can be found here if interested: - // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations - if s.opts.Ordered { - // send our first request on the stream again so it doesn't get - // lost in processing on the new control loop - // There's a risk (albeit very limited) that we'd end up handling requests in the wrong order here. - // If envoy is using ADS for endpoints, and clusters are added in short sequence, - // the following request might include a new cluster and be discarded as the previous one will be handled after. - go func() { - reqCh <- req - }() - - // Trigger a different code path specifically for ADS. - // We want resource ordering so things don't get sent before they should. 
- // This is a blocking call and will exit the process function - // on successful completion. - return s.processADS(&sw, reqCh, defaultTypeURL) - } - } else if req.GetTypeUrl() == "" { + switch { + case defaultTypeURL == resource.AnyType && req.GetTypeUrl() == "": + s.opts.Logger.Debugf("[sotw] Rejecting request as missing URL for stream %d", sw.ID) + return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") + case req.GetTypeUrl() == "": req.TypeUrl = defaultTypeURL } @@ -116,34 +106,42 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } } - if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { - if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // Let's record Resource names that a client has received. - sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources) + typeURL := req.GetTypeUrl() + var subscription stream.Subscription + w, ok := sw.watches.responders[typeURL] + if ok { + if w.nonce != "" && req.GetResponseNonce() != w.nonce { + // The request does not match the stream nonce, ignore it as per + // https://www.envoyproxy.io/docs/envoy/v1.28.0/api-docs/xds_protocol#resource-updates + // Ignore this request and wait for the next one + // This behavior is being discussed in https://github.com/envoyproxy/envoy/issues/10363 + // as it might create a race in edge cases, but it matches the current protocol definition + s.opts.Logger.Infof("[sotw] Skipping request as nonce is stale for type %s and stream %d", typeURL, sw.ID) + break } + + // We found an existing watch + // Close it to ensure the Cache will not reply to it while we modify the subscription state + w.close() + + subscription = w.sub + subscription.SetResourceSubscription(req.GetResourceNames()) + } else { + s.opts.Logger.Debugf("[sotw] New subscription for type %s and stream %d", typeURL, sw.ID) + subscription = stream.NewSotwSubscription(req.GetResourceNames()) } - typeURL := req.GetTypeUrl() 
responder := make(chan cache.Response, 1) - if w, ok := sw.watches.responders[typeURL]; ok { - // We've found a pre-existing watch, lets check and update if needed. - // If these requirements aren't satisfied, leave an open watch. - if w.nonce == "" || w.nonce == nonce { - w.close() - - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) - } - } else { - // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. - sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), - response: responder, - }) + cancel, err := s.cache.CreateWatch(req, subscription, responder) + if err != nil { + s.opts.Logger.Warnf("[sotw] Watch rejected for type %s and stream %d", typeURL, sw.ID) + return err } + sw.watches.addWatch(typeURL, &watch{ + cancel: cancel, + response: responder, + sub: subscription, + }) // Recompute the dynamic select cases for this stream. sw.watches.recompute(s.ctx, reqCh) @@ -155,12 +153,10 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } res := value.Interface().(cache.Response) - nonce, err := sw.send(res) + err := sw.send(res) if err != nil { return err } - - sw.watches.responders[res.GetRequest().GetTypeUrl()].nonce = nonce } } } diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 1664a941e0..b999d40c78 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -21,119 +21,3 @@ type DeltaStream interface { Send(*discovery.DeltaDiscoveryResponse) error Recv() (*discovery.DeltaDiscoveryRequest, error) } - -// StreamState will keep track of resource cache state per type on a stream. 
-type StreamState struct { // nolint:golint,revive - // Indicates whether the delta stream currently has a wildcard watch - wildcard bool - - // Provides the list of resources explicitly requested by the client - // This list might be non-empty even when set as wildcard - subscribedResourceNames map[string]struct{} - - // ResourceVersions contains a hash of the resource as the value and the resource name as the key. - // This field stores the last state sent to the client. - resourceVersions map[string]string - - // knownResourceNames contains resource names that a client has received previously (SOTW). - knownResourceNames map[string]map[string]struct{} - - // First indicates whether the StreamState has been modified since its creation - first bool - - // Ordered indicates whether we want an ordered ADS stream or not - ordered bool -} - -// NewStreamState initializes a stream state. -func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { - state := StreamState{ - wildcard: wildcard, - subscribedResourceNames: map[string]struct{}{}, - resourceVersions: initialResourceVersions, - first: true, - knownResourceNames: map[string]map[string]struct{}{}, - ordered: false, // Ordered comes from the first request since that's when we discover if they want ADS - } - - if initialResourceVersions == nil { - state.resourceVersions = make(map[string]string) - } - - return state -} - -// GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to -// If the request is set to wildcard it may be empty -// Currently populated only when using delta-xds -func (s *StreamState) GetSubscribedResourceNames() map[string]struct{} { - return s.subscribedResourceNames -} - -// SetSubscribedResourceNames is setting the list of resources currently explicitly subscribed to -// It is decorrelated from the wildcard state of the stream -// Currently used only when using delta-xds -func (s *StreamState) 
SetSubscribedResourceNames(subscribedResourceNames map[string]struct{}) { - s.subscribedResourceNames = subscribedResourceNames -} - -// WatchesResources returns whether at least one of the resource provided is currently watch by the stream -// It is currently only applicable to delta-xds -// If the request is wildcard, it will always return true -// Otherwise it will compare the provided resources to the list of resources currently subscribed -func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool { - if s.IsWildcard() { - return true - } - for resourceName := range resourceNames { - if _, ok := s.subscribedResourceNames[resourceName]; ok { - return true - } - } - return false -} - -func (s *StreamState) SetWildcard(wildcard bool) { - s.wildcard = wildcard -} - -// GetResourceVersions returns a map of current resources grouped by type URL. -func (s *StreamState) GetResourceVersions() map[string]string { - return s.resourceVersions -} - -// SetResourceVersions sets a list of resource versions by type URL and removes the flag -// of "first" since we can safely assume another request has come through the stream. -func (s *StreamState) SetResourceVersions(resourceVersions map[string]string) { - s.first = false - s.resourceVersions = resourceVersions -} - -// IsFirst returns whether or not the state of the stream is based upon the initial request. -func (s *StreamState) IsFirst() bool { - return s.first -} - -// IsWildcard returns whether or not an xDS client requested in wildcard mode on the initial request. -func (s *StreamState) IsWildcard() bool { - return s.wildcard -} - -// GetKnownResourceNames returns the current known list of resources on a SOTW stream. -func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { - return s.knownResourceNames[url] -} - -// SetKnownResourceNames sets a list of resource names in a stream utilizing the SOTW protocol. 
-func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { - s.knownResourceNames[url] = names -} - -// SetKnownResourceNamesAsList is a helper function to set resource names as a slice input. -func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { - m := map[string]struct{}{} - for _, name := range names { - m[name] = struct{}{} - } - s.knownResourceNames[url] = m -} diff --git a/pkg/server/stream/v3/subscription.go b/pkg/server/stream/v3/subscription.go new file mode 100644 index 0000000000..106d97844e --- /dev/null +++ b/pkg/server/stream/v3/subscription.go @@ -0,0 +1,176 @@ +package stream + +const ( + explicitWildcard = "*" +) + +// Subscription stores the server view of a given type subscription in a stream. +type Subscription struct { + // wildcard indicates if the subscription currently has a wildcard watch. + wildcard bool + + // allowLegacyWildcard indicates that the stream never provided any resource + // and is de facto wildcard. + // As soon as a resource or an explicit subscription to wildcard is provided, + // this flag will be set to false + allowLegacyWildcard bool + + // subscribedResourceNames provides the resources explicitly requested by the client + // This list might be non-empty even when set as wildcard. + subscribedResourceNames map[string]struct{} + + // returnedResources contains the resources acknowledged by the client and the acknowledged versions. + returnedResources map[string]string +} + +// newSubscription initializes a subscription state. 
+func newSubscription(wildcard bool, initialResourceVersions map[string]string) Subscription { + state := Subscription{ + wildcard: wildcard, + allowLegacyWildcard: wildcard, + subscribedResourceNames: map[string]struct{}{}, + returnedResources: initialResourceVersions, + } + + if initialResourceVersions == nil { + state.returnedResources = make(map[string]string) + } + + return state +} + +func NewSotwSubscription(subscribed []string) Subscription { + sub := newSubscription(len(subscribed) == 0, nil) + sub.SetResourceSubscription(subscribed) + return sub +} + +// SetResourceSubscription updates the subscribed resources (including the wildcard state) +// based on the full state of subscribed resources provided in the request +// Used in sotw subscriptions +// Behavior is based on +// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return +func (s *Subscription) SetResourceSubscription(subscribed []string) { + if s.allowLegacyWildcard { + if len(subscribed) == 0 { + // We were wildcard based on legacy behavior and still don't request any resource + // The watch remains wildcard + return + } + + // A resource was provided (might be an explicit wildcard) + // Documentation states that we should no longer allow to fallback to the previous case + // and no longer setting wildcard would no longer subscribe to anything + s.allowLegacyWildcard = false + } + + subscribedResources := make(map[string]struct{}, len(subscribed)) + explicitWildcardSet := false + for _, resource := range subscribed { + if resource == explicitWildcard { + explicitWildcardSet = true + } else { + subscribedResources[resource] = struct{}{} + } + } + + // Explicit subscription to wildcard as we are not in legacy wildcard behavior + s.wildcard = explicitWildcardSet + s.subscribedResourceNames = subscribedResources +} + +func NewDeltaSubscription(subscribed, unsubscribed []string, initialResourceVersions map[string]string) Subscription { + sub 
:= newSubscription(len(subscribed) == 0, initialResourceVersions) + sub.UpdateResourceSubscriptions(subscribed, unsubscribed) + return sub +} + +// UpdateResourceSubscriptions updates the subscribed resources (including the wildcard state) +// based on newly subscribed or unsubscribed resources +// Used in delta subscriptions +func (s *Subscription) UpdateResourceSubscriptions(subscribed, unsubscribed []string) { + // Handles legacy wildcard behavior first to exit if we are still in this behavior + if s.allowLegacyWildcard { + // The protocol (as of v1.29.0) only references subscribed as triggering + // exiting legacy wildcard behavior, so we currently do not check unsubscribed + if len(subscribed) == 0 { + // We were wildcard based on legacy behavior and still don't request any resource + // The watch remains wildcard + return + } + + // A resource was provided (might be an explicit wildcard) + // Documentation states that we should no longer allow to fallback to the previous case + // and no longer setting wildcard would no longer subscribe to anything + // The watch does remain wildcard if not explicitly unsubscribed (from the example in + // https://www.envoyproxy.io/docs/envoy/v1.29.0/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return) + s.allowLegacyWildcard = false + } + + // Handle subscriptions first + for _, resource := range subscribed { + if resource == explicitWildcard { + s.wildcard = true + continue + } + s.subscribedResourceNames[resource] = struct{}{} + } + + // Then unsubscriptions + for _, resource := range unsubscribed { + if resource == explicitWildcard { + s.wildcard = false + continue + } + if _, ok := s.subscribedResourceNames[resource]; ok && s.wildcard { + // The XDS protocol states that: + // * if a watch is currently wildcard + // * a resource is explicitly unsubscribed by name + // Then the control-plane must return in the response whether the resource is removed (if no longer present for this node) + // or still
existing. In the latter case the entire resource must be returned, same as if it had been created or updated + // To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either: + // * detect the version change, and return the resource (as an update) + // * detect the resource deletion, and set it as removed in the response + s.returnedResources[resource] = "" + } + delete(s.subscribedResourceNames, resource) + } +} + +// SubscribedResources returns the list of resources currently explicitly subscribed to +// If the request is set to wildcard it may be empty +func (s Subscription) SubscribedResources() map[string]struct{} { + return s.subscribedResourceNames +} + +// IsWildcard returns whether or not the subscription currently has a wildcard watch +func (s Subscription) IsWildcard() bool { + return s.wildcard +} + +// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. 
+// If the request is wildcard, it will always return true, +// otherwise it will compare the provided resources to the list of resources currently subscribed +func (s Subscription) WatchesResources(resourceNames map[string]struct{}) bool { + if s.wildcard { + return true + } + for resourceName := range resourceNames { + if _, ok := s.subscribedResourceNames[resourceName]; ok { + return true + } + } + return false +} + +// ReturnedResources returns the list of resources returned to the client +// and their version +func (s Subscription) ReturnedResources() map[string]string { + return s.returnedResources +} + +// SetReturnedResources sets a list of resource versions currently known by the client +// The cache can use this state to compute resources added/updated/deleted +func (s *Subscription) SetReturnedResources(resourceVersions map[string]string) { + s.returnedResources = resourceVersions +} diff --git a/pkg/server/stream/v3/subscription_test.go b/pkg/server/stream/v3/subscription_test.go new file mode 100644 index 0000000000..23e29ca7c6 --- /dev/null +++ b/pkg/server/stream/v3/subscription_test.go @@ -0,0 +1,159 @@ +package stream + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSotwSubscriptions(t *testing.T) { + t.Run("legacy mode properly handled", func(t *testing.T) { + sub := NewSotwSubscription([]string{}) + assert.True(t, sub.IsWildcard()) + + // Requests always set empty in legacy mode + sub.SetResourceSubscription([]string{}) + assert.True(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + + // Requests always set empty in legacy mode + sub.SetResourceSubscription(nil) + assert.True(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + + // Set any resource, no longer wildcard + sub.SetResourceSubscription([]string{"resource"}) + assert.False(t, sub.IsWildcard()) + assert.Equal(t, map[string]struct{}{"resource": {}}, sub.SubscribedResources()) + + // No longer watch any resource, should not 
come back to wildcard as no longer in legacy mode + // We end up with a watch to nothing + sub.SetResourceSubscription(nil) + assert.False(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + }) + + t.Run("new wildcard mode from start", func(t *testing.T) { + // A resource is provided so the subscription was created in wildcard + sub := NewSotwSubscription([]string{"*"}) + assert.True(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + + // Keep wildcard, no change + sub.SetResourceSubscription([]string{"*"}) + assert.True(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + + // Add resource to wildcard + sub.SetResourceSubscription([]string{"*", "resource"}) + assert.True(t, sub.IsWildcard()) + assert.Equal(t, map[string]struct{}{"resource": {}}, sub.SubscribedResources()) + + // Add/Remove resource to wildcard + sub.SetResourceSubscription([]string{"*", "otherresource"}) + assert.True(t, sub.IsWildcard()) + assert.Equal(t, map[string]struct{}{"otherresource": {}}, sub.SubscribedResources()) + + // Remove wildcard + sub.SetResourceSubscription([]string{"otherresource"}) + assert.False(t, sub.IsWildcard()) + assert.Equal(t, map[string]struct{}{"otherresource": {}}, sub.SubscribedResources()) + + // Remove last resource + sub.SetResourceSubscription([]string{}) + assert.False(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + + // Re-subscribe to wildcard + sub.SetResourceSubscription([]string{"*"}) + assert.True(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + }) +} + +func TestDeltaSubscriptions(t *testing.T) { + t.Run("legacy mode properly handled", func(t *testing.T) { + sub := NewDeltaSubscription([]string{}, []string{}, map[string]string{"resource": "version"}) + assert.True(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + assert.Equal(t, map[string]string{"resource": "version"}, sub.ReturnedResources()) + + // New request with no additional subscription + 
sub.UpdateResourceSubscriptions(nil, nil) + assert.True(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + assert.Equal(t, map[string]string{"resource": "version"}, sub.ReturnedResources()) + + // New request adding a resource + sub.UpdateResourceSubscriptions([]string{"resource"}, nil) + assert.True(t, sub.IsWildcard()) // Wildcard not unsubscribed + assert.Equal(t, map[string]struct{}{"resource": {}}, sub.SubscribedResources()) + assert.Equal(t, map[string]string{"resource": "version"}, sub.ReturnedResources()) + + // Unsubscribe from "resource", still wildcard + sub.UpdateResourceSubscriptions(nil, []string{"resource"}) + assert.True(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + // Version is set to "" to trigger an update or have the resource in the "removed" field + // when explicitly unsubscribing from wildcard, to align with + // https://www.envoyproxy.io/docs/envoy/v1.29.0/api-docs/xds_protocol#xds-protocol-unsubscribe + assert.Equal(t, map[string]string{"resource": ""}, sub.ReturnedResources()) + }) + + t.Run("new wildcard mode", func(t *testing.T) { + // A resource is provided so the subscription was created in wildcard + sub := NewDeltaSubscription([]string{"*"}, []string{}, map[string]string{"resource": "version"}) + assert.True(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + + // New request with no additional subscription + sub.UpdateResourceSubscriptions(nil, nil) + assert.True(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + assert.Equal(t, map[string]string{"resource": "version"}, sub.ReturnedResources()) + + // Add resource to wildcard + sub.UpdateResourceSubscriptions([]string{"resource"}, []string{}) + assert.True(t, sub.IsWildcard()) + assert.Equal(t, map[string]struct{}{"resource": {}}, sub.SubscribedResources()) + assert.Equal(t, map[string]string{"resource": "version"}, sub.ReturnedResources()) + + // Unsubscribe from resource while wildcard + 
sub.UpdateResourceSubscriptions([]string{"otherresource"}, []string{"resource"}) + assert.True(t, sub.IsWildcard()) + assert.Equal(t, map[string]struct{}{"otherresource": {}}, sub.SubscribedResources()) + // Version is set to "" to trigger an update or have the resource in the "removed" field + // when explicitly unsubscribing from wildcard, to align with + // https://www.envoyproxy.io/docs/envoy/v1.29.0/api-docs/xds_protocol#xds-protocol-unsubscribe + assert.Equal(t, map[string]string{"resource": ""}, sub.ReturnedResources()) + + sub.SetReturnedResources(nil) + + // Remove subscription to wildcard + sub.UpdateResourceSubscriptions([]string{"resource"}, []string{"*"}) + assert.False(t, sub.IsWildcard()) + assert.Equal(t, map[string]struct{}{"resource": {}, "otherresource": {}}, sub.SubscribedResources()) + assert.Empty(t, sub.ReturnedResources()) + + // Remove all subscriptions + // Does not come back to wildcard + sub.UpdateResourceSubscriptions([]string{}, []string{"resource", "otherresource"}) + assert.False(t, sub.IsWildcard()) + assert.Empty(t, sub.SubscribedResources()) + + // Attempt to remove wildcard when not subscribed + sub.UpdateResourceSubscriptions([]string{"resource"}, []string{"*"}) + assert.False(t, sub.IsWildcard()) + assert.Equal(t, map[string]struct{}{"resource": {}}, sub.SubscribedResources()) + + // Resubscribe to wildcard + sub.UpdateResourceSubscriptions([]string{"*"}, nil) + assert.True(t, sub.IsWildcard()) + assert.Equal(t, map[string]struct{}{"resource": {}}, sub.SubscribedResources()) + + // Attempt to remove not-subscribed resource. 
Should just be ignored + sub.UpdateResourceSubscriptions([]string{}, []string{"otherresource"}) + assert.True(t, sub.IsWildcard()) + assert.Equal(t, map[string]struct{}{"resource": {}}, sub.SubscribedResources()) + }) +} diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 72c2b74075..aca1aaf155 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -16,12 +16,11 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) -func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state stream.StreamState, out chan cache.DeltaResponse) func() { +func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, sub cache.Subscription, out chan cache.DeltaResponse) (func(), error) { config.deltaCounts[req.GetTypeUrl()] = config.deltaCounts[req.GetTypeUrl()] + 1 // This is duplicated from pkg/cache/v3/delta.go as private there @@ -37,8 +36,8 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // If we are handling a wildcard request, we want to respond with all resources switch { - case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { + case sub.IsWildcard(): + if len(sub.ReturnedResources()) == 0 { filtered = make([]types.Resource, 0, len(resourceMap)) } nextVersionMap = make(map[string]string, len(resourceMap)) @@ -47,24 +46,24 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // we can just set it here to be used for comparison later version := versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] + prevVersion, found := 
sub.ReturnedResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } } // Compute resources for removal - for name := range state.GetResourceVersions() { + for name := range sub.ReturnedResources() { if _, ok := resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + nextVersionMap = make(map[string]string, len(sub.SubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] + for name := range sub.SubscribedResources() { + prevVersion, found := sub.ReturnedResources()[name] if r, ok := resourceMap[name]; ok { nextVersion := versionMap[name] if prevVersion != nextVersion { @@ -89,10 +88,10 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR config.deltaWatches++ return func() { config.deltaWatches-- - } + }, nil } - return nil + return nil, nil } type mockDeltaStream struct { @@ -472,7 +471,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { }, } - validateResponse := func(t *testing.T, replies <-chan *discovery.DeltaDiscoveryResponse, expectedResources []string, expectedRemovedResources []string) { + validateResponse := func(t *testing.T, replies <-chan *discovery.DeltaDiscoveryResponse, expectedResources, expectedRemovedResources []string) { t.Helper() select { case response := <-replies: diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index d3c4e0f81b..762cf1614e 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -32,9 +32,9 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" 
"github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) @@ -50,24 +50,26 @@ type mockConfigWatcher struct { mu *sync.RWMutex } -func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ stream.StreamState, out chan cache.Response) func() { - config.counts[req.GetTypeUrl()] = config.counts[req.GetTypeUrl()] + 1 - if len(config.responses[req.GetTypeUrl()]) > 0 { - out <- config.responses[req.GetTypeUrl()][0] - config.responses[req.GetTypeUrl()] = config.responses[req.GetTypeUrl()][1:] +func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ cache.Subscription, out chan cache.Response) (func(), error) { + typ := req.GetTypeUrl() + config.counts[typ]++ + if len(config.responses[typ]) > 0 { + out <- config.responses[typ][0] + config.responses[typ] = config.responses[typ][1:] } else { config.watches++ return func() { config.watches-- - } + }, nil } - return nil + return nil, nil } func (config *mockConfigWatcher) Fetch(_ context.Context, req *discovery.DiscoveryRequest) (cache.Response, error) { - if len(config.responses[req.GetTypeUrl()]) > 0 { - out := config.responses[req.GetTypeUrl()][0] - config.responses[req.GetTypeUrl()] = config.responses[req.GetTypeUrl()][1:] + typ := req.GetTypeUrl() + if len(config.responses[typ]) > 0 { + out := config.responses[typ][0] + config.responses[typ] = config.responses[typ][1:] return out, nil } return nil, errors.New("missing") @@ -259,7 +261,7 @@ func TestServerShutdown(t *testing.T) { config.responses = makeResponses() shutdown := make(chan bool) ctx, cancel := context.WithCancel(context.Background()) - s := server.NewServer(ctx, config, 
server.CallbackFuncs{}) + s := server.NewServer(ctx, config, server.CallbackFuncs{}, sotw.WithLogger(log.NewTestLogger(t))) // make a request resp := makeMockStream(t) @@ -313,7 +315,7 @@ func TestResponseHandlers(t *testing.T) { config := makeMockConfigWatcher() config.responses = makeResponses() - s := server.NewServer(ctx, config, server.CallbackFuncs{}) + s := server.NewServer(ctx, config, server.CallbackFuncs{}, sotw.WithLogger(log.NewTestLogger(t))) // make a request resp := makeMockStream(t) @@ -389,7 +391,7 @@ func TestFetch(t *testing.T) { }, } - s := server.NewServer(context.Background(), config, cb) + s := server.NewServer(context.Background(), config, cb, sotw.WithLogger(log.NewTestLogger(t))) out, err := s.FetchEndpoints(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.NotNil(t, out) require.NoError(t, err) @@ -484,7 +486,7 @@ func TestSendError(t *testing.T) { t.Run(typ, func(t *testing.T) { config := makeMockConfigWatcher() config.responses = makeResponses() - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, sotw.WithLogger(log.NewTestLogger(t))) // make a request resp := makeMockStream(t) @@ -508,7 +510,7 @@ func TestStaleNonce(t *testing.T) { t.Run(typ, func(t *testing.T) { config := makeMockConfigWatcher() config.responses = makeResponses() - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, sotw.WithLogger(log.NewTestLogger(t))) resp := makeMockStream(t) resp.recv <- &discovery.DiscoveryRequest{ @@ -520,11 +522,11 @@ func TestStaleNonce(t *testing.T) { err := s.StreamAggregatedResources(resp) require.NoError(t, err) // should be two watches called - assert.True(t, reflect.DeepEqual(map[string]int{typ: 2}, config.counts)) + assert.True(t, reflect.DeepEqual(map[string]int{typ: 2}, config.counts), config.counts) close(stop) 
}() select { - case <-resp.sent: + case res := <-resp.sent: // stale request resp.recv <- &discovery.DiscoveryRequest{ Node: node, @@ -533,10 +535,10 @@ func TestStaleNonce(t *testing.T) { } // fresh request resp.recv <- &discovery.DiscoveryRequest{ - VersionInfo: "1", + VersionInfo: res.VersionInfo, Node: node, TypeUrl: typ, - ResponseNonce: "1", + ResponseNonce: res.Nonce, } close(resp.recv) case <-time.After(1 * time.Second): @@ -583,7 +585,7 @@ func TestAggregatedHandlers(t *testing.T) { // We create the server with the optional ordered ADS flag so we guarantee resource // ordering over the stream. - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, sotw.WithOrderedADS()) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, sotw.WithOrderedADS(), sotw.WithLogger(log.NewTestLogger(t))) go func() { err := s.StreamAggregatedResources(resp) require.NoError(t, err) @@ -618,7 +620,7 @@ func TestAggregatedHandlers(t *testing.T) { func TestAggregateRequestType(t *testing.T) { config := makeMockConfigWatcher() - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, sotw.WithLogger(log.NewTestLogger(t))) resp := makeMockStream(t) resp.recv <- &discovery.DiscoveryRequest{Node: node} err := s.StreamAggregatedResources(resp) @@ -635,7 +637,7 @@ func TestCancellations(t *testing.T) { } } close(resp.recv) - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, sotw.WithLogger(log.NewTestLogger(t))) err := s.StreamAggregatedResources(resp) require.NoError(t, err) assert.Equal(t, 0, config.watches) @@ -653,7 +655,7 @@ func TestOpaqueRequestsChannelMuxing(t *testing.T) { } } close(resp.recv) - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + s := server.NewServer(context.Background(), config, 
server.CallbackFuncs{}, sotw.WithLogger(log.NewTestLogger(t))) err := s.StreamAggregatedResources(resp) require.NoError(t, err) assert.Equal(t, 0, config.watches)