From fb60942bd4844542a01607ce8a6d6ca2c19c816e Mon Sep 17 00:00:00 2001 From: Valerian Roche Date: Thu, 18 Jan 2024 23:15:42 -0500 Subject: [PATCH] Currently sotw and delta watches use different interfaces to track the stream state An assumption used to be that sotw watches are stateless and can be processed from the request itself. This was already altered in https://github.com/envoyproxy/go-control-plane/pull/508, but it only covers the usecases partially (e.g. it does not cover subscribing/unsubscribing to wildcard). The current object passed to track the subscription provides multiple issues: - it is not an interface, restricting evolution or testing for users building a separate cache - it tracks some objects per types, other globally, based on sotw or delta, making it confusing and hard to use in a consistent way This PR replaces the legacy streamState by the notion of Subscription per type. StreamState is not relevant to the cache, as it only requires to know the subscription status (e.g. the last nonce is not needed), while a stream can multiplex multiple types. Further work will come to: - fill the subscription with the same attributes in sotw and delta, allowing to take proper decisions on what to reply - Fix multiple issues within the cache in sotw - Fix handling of explicit vs. legacy wildcard in sotw and delta Signed-off-by: Valerian Roche --- pkg/cache/v3/cache.go | 33 +++- pkg/cache/v3/delta.go | 17 +-- pkg/cache/v3/delta_test.go | 41 ++--- pkg/cache/v3/linear.go | 35 +++-- pkg/cache/v3/linear_test.go | 221 ++++++++++++++------------- pkg/cache/v3/mux.go | 17 +-- pkg/cache/v3/simple.go | 41 +++-- pkg/cache/v3/simple_test.go | 110 ++++++++----- pkg/cache/v3/status.go | 5 +- pkg/server/delta/v3/server.go | 32 ++-- pkg/server/delta/v3/watches.go | 2 +- pkg/server/sotw/v3/ads.go | 24 ++- pkg/server/sotw/v3/server.go | 13 +- pkg/server/sotw/v3/xds.go | 25 ++- pkg/server/stream/v3/stream.go | 116 -------------- pkg/server/stream/v3/subscription.go | 85 +++++++++++ pkg/server/v3/delta_test.go | 21 ++- pkg/server/v3/server_test.go | 7 +- 18 files changed, 460 insertions(+), 385 deletions(-) create mode 100644 pkg/server/stream/v3/subscription.go diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 5ad8e24140..081b0a216f 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -26,7 +26,6 @@ import ( "google.golang.org/protobuf/types/known/durationpb" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) @@ -37,6 +36,34 @@ type Request = discovery.DiscoveryRequest // DeltaRequest is an alias for the delta discovery request type. type DeltaRequest = discovery.DeltaDiscoveryRequest +// Subscription stores the server view of the client state for a given resource type. +// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources). +// Though the methods may return mutable parts of the state for performance reasons, +// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation. +type Subscription interface { + // ReturnedResources returns a list of resources that clients have ACK'd and their associated version. + // The versions are: + // - delta protocol: version of the specific resource set in the response + // - sotw protocol: version of the global response when the resource was last ACKed + ReturnedResources() map[string]string + + // SubscribedResources returns the list of resources currently subscribed to by the client for the type. + // For delta it keeps track of subscription updates across requests + // For sotw it is a normalized view of the last request resources + SubscribedResources() map[string]struct{} + + // IsWildcard returns whether the client has a wildcard watch. + // This considers subtleties related to the current migration of wildcard definitions within the protocol. + // More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return + IsWildcard() bool + + // WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. + // It is currently only applicable to delta-xds. + // If the request is wildcard, it will always return true, + // otherwise it will compare the provided resources to the list of resources currently subscribed + WatchesResources(resourceNames map[string]struct{}) bool +} + // ConfigWatcher requests watches for configuration resources by a node, last // applied version identifier, and resource names hint. The watch should send // the responses when they are ready. The watch can be canceled by the @@ -54,7 +81,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateWatch(*Request, stream.StreamState, chan Response) (cancel func()) + CreateWatch(*Request, Subscription, chan Response) (cancel func(), err error) // CreateDeltaWatch returns a new open incremental xDS watch. // This is the entrypoint to propagate configuration changes the @@ -66,7 +93,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func()) + CreateDeltaWatch(*DeltaRequest, Subscription, chan DeltaResponse) (cancel func(), err error) } // ConfigFetcher fetches configuration resources from cache diff --git a/pkg/cache/v3/delta.go b/pkg/cache/v3/delta.go index deaeeb7ed1..93d572337e 100644 --- a/pkg/cache/v3/delta.go +++ b/pkg/cache/v3/delta.go @@ -18,7 +18,6 @@ import ( "context" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // groups together resource-related arguments for the createDeltaResponse function @@ -28,7 +27,7 @@ type resourceContainer struct { systemVersion string } -func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse { +func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer) *RawDeltaResponse { // variables to build our response with var nextVersionMap map[string]string var filtered []types.Resource @@ -36,8 +35,8 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // If we are handling a wildcard request, we want to respond with all resources switch { - case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { + case sub.IsWildcard(): + if len(sub.ReturnedResources()) == 0 { filtered = make([]types.Resource, 0, len(resources.resourceMap)) } nextVersionMap = make(map[string]string, len(resources.resourceMap)) @@ -46,7 +45,7 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // we can just set it here to be used for comparison later version := resources.versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] + prevVersion, found := sub.ReturnedResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } @@ -54,17 +53,17 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St // Compute resources for removal // The resource version can be set to "" here to trigger a removal even if never returned before - for name := range state.GetResourceVersions() { + for name := range sub.ReturnedResources() { if _, ok := resources.resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + nextVersionMap = make(map[string]string, len(sub.SubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] + for name := range sub.SubscribedResources() { + prevVersion, found := sub.ReturnedResources()[name] if r, ok := resources.resourceMap[name]; ok { nextVersion := resources.versionMap[name] if prevVersion != nextVersion { diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index fc4ee91327..220256946e 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -42,7 +42,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(true, nil), watches[typ]) + }, stream.NewSubscription(true, nil), watches[typ]) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { @@ -68,9 +68,9 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { // all resources as well as individual resource removals for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - state := stream.NewStreamState(false, versionMap[typ]) + sub := stream.NewSubscription(false, versionMap[typ]) for resource := range versionMap[typ] { - state.GetSubscribedResourceNames()[resource] = struct{}{} + sub.SubscribedResources()[resource] = struct{}{} } c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ @@ -78,7 +78,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, state, watches[typ]) + }, sub, watches[typ]) } if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { @@ -111,12 +111,12 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) { func TestDeltaRemoveResources(t *testing.T) { c := cache.NewSnapshotCache(false, group{}, logger{t: t}) watches := make(map[string]chan cache.DeltaResponse) - streams := make(map[string]*stream.StreamState) + subscriptions := make(map[string]*stream.Subscription) for _, typ := range testTypes { watches[typ] = make(chan cache.DeltaResponse, 1) - state := stream.NewStreamState(true, make(map[string]string)) - streams[typ] = &state + sub := stream.NewSubscription(true, make(map[string]string)) + subscriptions[typ] = &sub // We don't specify any resource name subscriptions here because we want to make sure we test wildcard // functionality. This means we should receive all resources back without requesting a subscription by name. c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ @@ -124,7 +124,7 @@ func TestDeltaRemoveResources(t *testing.T) { Id: "node", }, TypeUrl: typ, - }, *streams[typ], watches[typ]) + }, *subscriptions[typ], watches[typ]) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { @@ -138,7 +138,7 @@ func TestDeltaRemoveResources(t *testing.T) { snapshot := fixture.snapshot() assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ)) nextVersionMap := out.GetNextVersionMap() - streams[typ].SetResourceVersions(nextVersionMap) + subscriptions[typ].SetReturnedResources(nextVersionMap) case <-time.After(time.Second): t.Fatal("failed to receive a snapshot response") } @@ -153,8 +153,9 @@ func TestDeltaRemoveResources(t *testing.T) { Node: &core.Node{ Id: "node", }, - TypeUrl: typ, - }, *streams[typ], watches[typ]) + TypeUrl: typ, + ResponseNonce: "nonce", + }, *subscriptions[typ], watches[typ]) } if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) { @@ -177,7 +178,7 @@ func TestDeltaRemoveResources(t *testing.T) { nextVersionMap := out.GetNextVersionMap() // make sure the version maps are different since we no longer are tracking any endpoint resources - if reflect.DeepEqual(streams[testTypes[0]].GetResourceVersions(), nextVersionMap) { + if reflect.DeepEqual(subscriptions[testTypes[0]].ReturnedResources(), nextVersionMap) { t.Fatalf("versionMap for the endpoint resource type did not change, received: %v, instead of an empty map", nextVersionMap) } case <-time.After(time.Second): @@ -204,14 +205,15 @@ func TestConcurrentSetDeltaWatch(t *testing.T) { t.Fatalf("snapshot failed: %s", err) } } else { - cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: id, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: []string{clusterName}, - }, stream.NewStreamState(false, make(map[string]string)), responses) + }, stream.NewSubscription(false, make(map[string]string)), responses) + require.NoError(t, err) defer cancel() } }) @@ -226,15 +228,15 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.DeltaResponse) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}}) + sub := stream.NewSubscription(false, nil) + sub.SubscribedResources()[names[rsrc.EndpointType][0]] = struct{}{} c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: rsrc.EndpointType, ResourceNamesSubscribe: names[rsrc.EndpointType], - }, state, watchCh) + }, sub, watchCh) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) @@ -270,13 +272,14 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) for _, typ := range testTypes { responses := make(chan cache.DeltaResponse, 1) - cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ + cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{ Node: &core.Node{ Id: key, }, TypeUrl: typ, ResourceNamesSubscribe: names[typ], - }, stream.NewStreamState(false, make(map[string]string)), responses) + }, stream.NewSubscription(false, make(map[string]string)), responses) + require.NoError(t, err) // Cancel the watch cancel() diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index cd8f6bf2bd..6d9d3793ce 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -17,6 +17,7 @@ package cache import ( "context" "errors" + "fmt" "strconv" "strings" "sync" @@ -24,7 +25,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) type watches = map[ResponseWatch]struct{} @@ -178,11 +178,11 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } for id, watch := range cache.deltaWatches { - if !watch.StreamState.WatchesResources(modified) { + if !watch.subscription.WatchesResources(modified) { continue } - res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState) + res := cache.respondDelta(watch.Request, watch.Response, watch.subscription) if res != nil { delete(cache.deltaWatches, id) } @@ -190,8 +190,8 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } } -func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse { - resp := createDeltaResponse(context.Background(), request, state, resourceContainer{ +func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, sub Subscription) *RawDeltaResponse { + resp := createDeltaResponse(context.Background(), request, sub, resourceContainer{ resourceMap: cache.resources, versionMap: cache.versionMap, systemVersion: cache.getVersion(), @@ -201,7 +201,7 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { if cache.log != nil { cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard()) } value <- resp return resp @@ -312,10 +312,9 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { return resources } -func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, value chan Response) func() { +func (cache *LinearCache) CreateWatch(request *Request, _ Subscription, value chan Response) (func(), error) { if request.GetTypeUrl() != cache.typeURL { - value <- nil - return nil + return nil, fmt.Errorf("request type %s does not match cache type %s", request.GetTypeUrl(), cache.typeURL) } // If the version is not up to date, check whether any requested resource has // been updated between the last version and the current version. This avoids the problem @@ -361,7 +360,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va } if stale { cache.respond(watch, staleResources) - return nil + return nil, nil } // Create open watches since versions are up to date. if len(request.GetResourceNames()) == 0 { @@ -371,7 +370,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va cache.mu.Lock() defer cache.mu.Unlock() delete(cache.watchAll, watch) - } + }, nil } cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q", cache.typeURL, request.ResourceNames, cache.getVersion()) @@ -387,7 +386,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va cache.mu.Lock() defer cache.mu.Unlock() cache.removeWatch(watch) - } + }, nil } // Must be called under lock @@ -403,7 +402,7 @@ func (cache *LinearCache) removeWatch(watch ResponseWatch) { } } -func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { cache.mu.Lock() defer cache.mu.Unlock() @@ -420,7 +419,7 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S cache.log.Errorf("failed to update version map: %v", err) } } - response := cache.respondDelta(request, value, state) + response := cache.respondDelta(request, value, sub) // if respondDelta returns nil this means that there is no change in any resource version // create a new watch accordingly @@ -428,15 +427,15 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S watchID := cache.nextDeltaWatchID() if cache.log != nil { cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID, - cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion()) + cache.typeURL, sub.SubscribedResources(), cache.getVersion()) } - cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state} + cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscription: sub} - return cache.cancelDeltaWatch(watchID) + return cache.cancelDeltaWatch(watchID), nil } - return nil + return nil, nil } func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 32d2bedd02..cf722cf288 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -230,26 +230,26 @@ func hashResource(t *testing.T, resource types.Resource) string { } func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) { - state := stream.NewStreamState(true, nil) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + sub := stream.NewSubscription(true, nil) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) resp := <-w - state.SetResourceVersions(resp.GetNextVersionMap()) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) // Ensure the watch is set properly with cache values + sub.SetReturnedResources(resp.GetNextVersionMap()) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) // Ensure the watch is set properly with cache values } func TestLinearInitialResources(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, sub, w) verifyResponse(t, w, "0", 1) - c.CreateWatch(&Request{TypeUrl: testType}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType}, sub, w) verifyResponse(t, w, "0", 2) checkVersionMapNotSet(t, c) } func TestLinearCornerCases(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) err := c.UpdateResource("a", nil) if err == nil { @@ -257,29 +257,22 @@ func TestLinearCornerCases(t *testing.T) { } // create an incorrect type URL request w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: "test"}, streamState, w) - select { - case r := <-w: - if r != nil { - t.Error("response should be nil") - } - default: - t.Error("should receive nil response") - } + _, err = c.CreateWatch(&Request{TypeUrl: "test"}, sub, w) + require.Error(t, err, "cache should have rejected the watch") } func TestLinearBasic(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) // Create watches before a resource is ready w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w1) mustBlock(t, w1) checkVersionMapNotSet(t, c) w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) mustBlock(t, w) checkWatchCount(t, c, "a", 2) checkWatchCount(t, c, "b", 1) @@ -290,34 +283,34 @@ func TestLinearBasic(t *testing.T) { verifyResponse(t, w, "1", 1) // Request again, should get same response - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) require.NoError(t, c.UpdateResource("a", testResource("aa"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) verifyResponse(t, w, "3", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) verifyResponse(t, w, "3", 2) // Ensure the version map was not created as we only ever used stow watches checkVersionMapNotSet(t, c) } func TestLinearSetResources(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) // Create new resources w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w1) mustBlock(t, w1) w2 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("a"), @@ -327,9 +320,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources // Add another element and update the first, response should be different - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, sub, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("aa"), @@ -340,9 +333,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "2", 3) // Delete resource - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, streamState, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, sub, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, streamState, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, sub, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "b": testResource("b"), @@ -370,49 +363,49 @@ func TestLinearGetResources(t *testing.T) { } func TestLinearVersionPrefix(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithVersionPrefix("instance1-")) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) verifyResponse(t, w, "instance1-0", 0) require.NoError(t, c.UpdateResource("a", testResource("a"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) verifyResponse(t, w, "instance1-1", 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, sub, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) } func TestLinearDeletion(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) require.NoError(t, c.DeleteResource("a")) verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w) verifyResponse(t, w, "1", 1) checkWatchCount(t, c, "b", 0) require.NoError(t, c.DeleteResource("b")) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } func TestLinearWatchTwo(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, sub, w) mustBlock(t, w) w1 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, sub, w1) mustBlock(t, w1) require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource @@ -421,20 +414,22 @@ func TestLinearWatchTwo(t *testing.T) { } func TestLinearCancel(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) require.NoError(t, c.UpdateResource("a", testResource("a"))) // cancel watch-all w := make(chan Response, 1) - cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w) + require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) // cancel watch for "a" - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, sub, w) + require.NoError(t, err) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() @@ -444,10 +439,14 @@ func TestLinearCancel(t *testing.T) { w2 := make(chan Response, 1) w3 := make(chan Response, 1) w4 := make(chan Response, 1) - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) - cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w2) - cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w3) - cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w4) + cancel, err = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, sub, w) + require.NoError(t, err) + cancel2, err := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, sub, w2) + require.NoError(t, err) + cancel3, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w3) + require.NoError(t, err) + cancel4, err := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, sub, w4) + require.NoError(t, err) mustBlock(t, w) mustBlock(t, w2) mustBlock(t, w3) @@ -469,7 +468,7 @@ func TestLinearCancel(t *testing.T) { // TODO(mattklein123): This test requires GOMAXPROCS or -parallel >= 100. This should be // rewritten to not require that. This is not the case in the GH actions environment. func TestLinearConcurrentSetWatch(t *testing.T) { - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c := NewLinearCache(testType) n := 50 for i := 0; i < 2*n; i++ { @@ -489,7 +488,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) { ResourceNames: []string{id, id2}, VersionInfo: "0", TypeUrl: testType, - }, streamState, value) + }, sub, value) // wait until all updates apply verifyResponse(t, value, "", 1) } @@ -500,13 +499,13 @@ func TestLinearConcurrentSetWatch(t *testing.T) { func TestLinearDeltaWildcard(t *testing.T) { c := NewLinearCache(testType) - state1 := stream.NewStreamState(true, map[string]string{}) + sub1 := stream.NewSubscription(true, map[string]string{}) w1 := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state1, w1) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub1, w1) mustBlockDelta(t, w1) - state2 := stream.NewStreamState(true, map[string]string{}) + sub2 := stream.NewSubscription(true, map[string]string{}) w2 := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state2, w2) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub2, w2) mustBlockDelta(t, w1) checkDeltaWatchCount(t, c, 2) @@ -530,17 +529,17 @@ func TestLinearDeltaExistingResources(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a + sub := stream.NewSubscription(false, nil) + sub.SetSubscribedResources(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, []string{}) - state = stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, nil) + sub.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) } @@ -556,17 +555,17 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewStreamState(false, map[string]string{"b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, map[string]string{"b": hashB}) + sub.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) // b is up to date and shouldn't be returned - state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + sub.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{{Priority: 10}}} // new version of b @@ -590,18 +589,18 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { // There is currently no delta watch checkVersionMapNotSet(t, c) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, nil) + sub.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) checkVersionMapSet(t, c) - state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + sub.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -626,17 +625,17 @@ func TestLinearDeltaResourceDelete(t *testing.T) { err = c.UpdateResource("b", b) require.NoError(t, err) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, nil) + sub.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) - state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub = stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + sub.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w = make(chan DeltaResponse, 1) - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) @@ -651,14 +650,14 @@ func TestLinearDeltaResourceDelete(t *testing.T) { func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c := NewLinearCache(testType) - state := stream.NewStreamState(false, nil) - state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + sub := stream.NewSubscription(false, nil) + sub.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) w := make(chan DeltaResponse, 1) checkVersionMapNotSet(t, c) assert.Equal(t, 0, c.NumResources()) // Initial update - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) // The version map should now be created, even if empty @@ -673,10 +672,10 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Multiple updates - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -693,10 +692,10 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Update/add/delete - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update @@ -712,10 +711,10 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"}) checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Re-add previously deleted watched resource - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, sub, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource @@ -728,7 +727,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned checkVersionMapSet(t, c) assert.Equal(t, 2, c.NumResources()) - state.SetResourceVersions(resp.GetNextVersionMap()) + sub.SetReturnedResources(resp.GetNextVersionMap()) // Wildcard create/update createWildcardDeltaWatch(c, w) @@ -776,9 +775,9 @@ func TestLinearMixedWatches(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, c.NumResources()) - sotwState := stream.NewStreamState(false, nil) + sotwSub := stream.NewSubscription(false, nil) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwSub, w) mustBlock(t, w) checkVersionMapNotSet(t, c) @@ -792,16 +791,16 @@ func TestLinearMixedWatches(t *testing.T) { verifyResponse(t, w, c.getVersion(), 1) checkVersionMapNotSet(t, c) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwSub, w) mustBlock(t, w) checkVersionMapNotSet(t, c) - deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) - deltaState.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) + deltaSub := stream.NewSubscription(false, map[string]string{"a": hashA, "b": hashB}) + deltaSub.SetSubscribedResources(map[string]struct{}{"a": {}, "b": {}}) wd := make(chan DeltaResponse, 1) // Initial update - c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, deltaState, wd) + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, deltaSub, wd) mustBlockDelta(t, wd) checkDeltaWatchCount(t, c, 1) checkVersionMapSet(t, c) @@ -829,9 +828,10 @@ func TestLinearSotwWatches(t *testing.T) { // An update is done for the three objects in a row // If the watches are no properly purged, all three updates will send responses in the channel, but only the first one is tracked // The buffer will therefore saturate and the third request will deadlock the entire cache as occurring under the mutex - sotwState := stream.NewStreamState(false, nil) + sotwSub := stream.NewSubscription(false, nil) w := make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "c"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w) + _, err = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "c"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwSub, w) + require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, cache) @@ -854,7 +854,7 @@ func TestLinearSotwWatches(t *testing.T) { // c no longer watched w = make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w) + _, err = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwSub, w) require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, cache) @@ -873,7 +873,7 @@ func TestLinearSotwWatches(t *testing.T) { checkVersionMapNotSet(t, cache) w = make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"c"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w) + _, err = cache.CreateWatch(&Request{ResourceNames: []string{"c"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwSub, w) require.NoError(t, err) mustBlock(t, w) checkVersionMapNotSet(t, cache) @@ -904,28 +904,31 @@ func TestLinearSotwWatches(t *testing.T) { assert.Equal(t, 3, cache.NumResources()) // Non-wildcard request - nonWildcardState := stream.NewStreamState(false, nil) + nonWildcardSub := stream.NewSubscription(false, nil) w1 := make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardState, w1) + _, err := cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardSub, w1) + require.NoError(t, err) mustBlock(t, w1) checkVersionMapNotSet(t, cache) // wildcard request - wildcardState := stream.NewStreamState(true, nil) + wildcardSub := stream.NewSubscription(true, nil) w2 := make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardState, w2) + _, err = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardSub, w2) + require.NoError(t, err) mustBlock(t, w2) checkVersionMapNotSet(t, cache) // request not requesting b - otherState := stream.NewStreamState(false, nil) + otherSub := stream.NewSubscription(false, nil) w3 := make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "c", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, otherState, w3) + _, err = cache.CreateWatch(&Request{ResourceNames: []string{"a", "c", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, otherSub, w3) + require.NoError(t, err) mustBlock(t, w3) checkVersionMapNotSet(t, cache) b.AltStatName = "othername" - err := cache.UpdateResources(map[string]types.Resource{"b": b}, nil) + err = cache.UpdateResources(map[string]types.Resource{"b": b}, nil) require.NoError(t, err) // Other watch has not triggered @@ -936,10 +939,12 @@ func TestLinearSotwWatches(t *testing.T) { // Recreate the watches w1 = make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardState, w1) + _, err = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardSub, w1) + require.NoError(t, err) mustBlock(t, w1) w2 = make(chan Response, 1) - _ = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardState, w2) + _, err = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardSub, w2) + require.NoError(t, err) mustBlock(t, w2) // Update d, new resource in the cache diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index 9fdfb090d6..2a7a84a49d 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -17,8 +17,7 @@ package cache import ( "context" "errors" - - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" + "fmt" ) // MuxCache multiplexes across several caches using a classification function. @@ -37,24 +36,22 @@ type MuxCache struct { var _ Cache = &MuxCache{} -func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, value chan Response) func() { +func (mux *MuxCache) CreateWatch(request *Request, sub Subscription, value chan Response) (func(), error) { key := mux.Classify(request) cache, exists := mux.Caches[key] if !exists { - value <- nil - return nil + return nil, fmt.Errorf("no cache defined for key %s", key) } - return cache.CreateWatch(request, state, value) + return cache.CreateWatch(request, sub, value) } -func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { key := mux.ClassifyDelta(request) cache, exists := mux.Caches[key] if !exists { - value <- nil - return nil + return nil, fmt.Errorf("no cache defined for key %s", key) } - return cache.CreateDeltaWatch(request, state, value) + return cache.CreateDeltaWatch(request, sub, value) } func (mux *MuxCache) Fetch(context.Context, *Request) (Response, error) { diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index ebf63f5b6f..064cb40256 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -23,7 +23,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/log" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // ResourceSnapshot is an abstract snapshot of a collection of resources that @@ -310,7 +309,7 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu snapshot, watch.Request, watch.Response, - watch.StreamState, + watch.subscription, ) if err != nil { return err @@ -328,7 +327,7 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu snapshot, watch.Request, watch.Response, - watch.StreamState, + watch.subscription, ) if err != nil { return err @@ -385,7 +384,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) // CreateWatch returns a watch for an xDS request. A nil function may be // returned if an error occurs. -func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { +func (cache *snapshotCache) CreateWatch(request *Request, sub Subscription, value chan Response) (func(), error) { nodeID := cache.hash.ID(request.GetNode()) cache.mu.Lock() @@ -409,7 +408,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str } if exists { - knownResourceNames := streamState.GetKnownResourceNames(request.GetTypeUrl()) + knownResourceNames := sub.ReturnedResources() diff := []string{} for _, r := range request.GetResourceNames() { if _, ok := knownResourceNames[r]; !ok { @@ -427,9 +426,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), request.GetResourceNames(), nodeID, err) - return nil + return nil, fmt.Errorf("failed to send the response: %w", err) } - return func() {} + return func() {}, nil } } } @@ -442,7 +441,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str info.mu.Lock() info.watches[watchID] = ResponseWatch{Request: request, Response: value} info.mu.Unlock() - return cache.cancelWatch(nodeID, watchID) + return cache.cancelWatch(nodeID, watchID), nil } // otherwise, the watch may be responded immediately @@ -450,10 +449,10 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), request.GetResourceNames(), nodeID, err) - return nil + return nil, fmt.Errorf("failed to send the response: %w", err) } - return func() {} + return func() {}, nil } func (cache *snapshotCache) nextWatchID() int64 { @@ -525,7 +524,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string] } // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. -func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { +func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) { nodeID := cache.hash.ID(request.GetNode()) t := request.GetTypeUrl() @@ -554,7 +553,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream if err != nil { cache.log.Errorf("failed to compute version for snapshot resources inline: %s", err) } - response, err := cache.respondDelta(context.Background(), snapshot, request, value, state) + response, err := cache.respondDelta(context.Background(), snapshot, request, value, sub) if err != nil { cache.log.Errorf("failed to respond with delta response: %s", err) } @@ -566,21 +565,21 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream watchID := cache.nextDeltaWatchID() if exists { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, state.GetSubscribedResourceNames(), nodeID, snapshot.GetVersion(t)) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q, version %q", watchID, t, sub.SubscribedResources(), nodeID, snapshot.GetVersion(t)) } else { - cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, state.GetSubscribedResourceNames(), nodeID) + cache.log.Infof("open delta watch ID:%d for %s Resources:%v from nodeID: %q", watchID, t, sub.SubscribedResources(), nodeID) } - info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state}) - return cache.cancelDeltaWatch(nodeID, watchID) + info.setDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, subscription: sub}) + return cache.cancelDeltaWatch(nodeID, watchID), nil } - return nil + return nil, nil } // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. -func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) { - resp := createDeltaResponse(ctx, request, state, resourceContainer{ +func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, sub Subscription) (*RawDeltaResponse, error) { + resp := createDeltaResponse(ctx, request, sub, resourceContainer{ resourceMap: snapshot.GetResources(request.GetTypeUrl()), versionMap: snapshot.GetVersionMap(request.GetTypeUrl()), systemVersion: snapshot.GetVersion(request.GetTypeUrl()), @@ -589,10 +588,10 @@ func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceS // Only send a response if there were changes // We want to respond immediately for the first wildcard request in a stream, even if the response is empty // otherwise, envoy won't complete initialization - if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { + if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (sub.IsWildcard() && request.ResponseNonce == "") { if cache.log != nil { cache.log.Debugf("node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard()) } select { case value <- resp: diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index eba4cf96d9..cb506fe3b3 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -91,10 +91,22 @@ type logger struct { t *testing.T } -func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) } -func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) } +func (log logger) Debugf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} +func (log logger) Infof(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} +func (log logger) Warnf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} +func (log logger) Errorf(format string, args ...interface{}) { + log.t.Helper() + log.t.Logf(format, args...) +} func TestSnapshotCacheWithTTL(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -119,13 +131,14 @@ func TestSnapshotCacheWithTTL(t *testing.T) { wg := sync.WaitGroup{} // All the resources should respond immediately when version is not up to date. - streamState := stream.NewStreamState(false, map[string]string{}) + subs := map[string]stream.Subscription{} for _, typ := range testTypes { + sub := stream.NewSubscription(false, map[string]string{}) wg.Add(1) t.Run(typ, func(t *testing.T) { defer wg.Done() value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, value) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != fixture.version { @@ -134,8 +147,14 @@ func TestSnapshotCacheWithTTL(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTTL(typ)) } - // Update streamState - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + + returnedResources := make(map[string]string) + // Update sub to track what was returned + for _, resource := range out.GetRequest().GetResourceNames() { + returnedResources[resource] = fixture.version + } + sub.SetReturnedResources(returnedResources) + subs[typ] = sub case <-time.After(2 * time.Second): t.Errorf("failed to receive snapshot response") } @@ -153,9 +172,11 @@ func TestSnapshotCacheWithTTL(t *testing.T) { end := time.After(5 * time.Second) for { + sub := subs[typ] value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, value) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, + sub, value) + require.NoError(t, err) select { case out := <-value: @@ -172,7 +193,13 @@ func TestSnapshotCacheWithTTL(t *testing.T) { updatesByType[typ]++ - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + returnedResources := make(map[string]string) + // Update sub to track what was returned + for _, resource := range out.GetRequest().GetResourceNames() { + returnedResources[resource] = fixture.version + } + sub.SetReturnedResources(returnedResources) + subs[typ] = sub case <-end: cancel() return @@ -214,9 +241,9 @@ func TestSnapshotCache(t *testing.T) { // try to get endpoints with incorrect list of names // should not receive response value := make(chan cache.Response, 1) - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, - streamState, value) + sub, value) select { case out := <-value: t.Errorf("watch for endpoints and mismatched names => got %v, want none", out) @@ -226,9 +253,9 @@ func TestSnapshotCache(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { value := make(chan cache.Response, 1) - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, - streamState, value) + sub, value) select { case out := <-value: snapshot := fixture.snapshot() @@ -279,10 +306,12 @@ func TestSnapshotCacheFetch(t *testing.T) { func TestSnapshotCacheWatch(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) watches := make(map[string]chan cache.Response) - streamState := stream.NewStreamState(false, map[string]string{}) + subs := map[string]stream.Subscription{} for _, typ := range testTypes { + sub := stream.NewSubscription(false, map[string]string{}) + subs[typ] = sub watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, watches[typ]) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, watches[typ]) } if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil { t.Fatal(err) @@ -298,7 +327,14 @@ func TestSnapshotCacheWatch(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ)) } - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) + returnedResources := make(map[string]string) + // Update sub to track what was returned + for _, resource := range out.GetRequest().GetResourceNames() { + returnedResources[resource] = fixture.version + } + sub := subs[typ] + sub.SetReturnedResources(returnedResources) + subs[typ] = sub case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") } @@ -309,7 +345,7 @@ func TestSnapshotCacheWatch(t *testing.T) { for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: fixture.version}, - streamState, watches[typ]) + subs[typ], watches[typ]) } if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) { t.Errorf("watches should be created for the latest version: %d", count) @@ -354,11 +390,12 @@ func TestConcurrentSetWatch(t *testing.T) { t.Fatalf("failed to set snapshot %q: %s", id, err) } } else { - streamState := stream.NewStreamState(false, map[string]string{}) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{ + sub := stream.NewSubscription(false, map[string]string{}) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{ Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, - }, streamState, value) + }, sub, value) + require.NoError(t, err) defer cancel() } }) @@ -367,10 +404,11 @@ func TestConcurrentSetWatch(t *testing.T) { func TestSnapshotCacheWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) - streamState := stream.NewStreamState(false, map[string]string{}) for _, typ := range testTypes { + sub := stream.NewSubscription(false, map[string]string{}) value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, sub, value) + require.NoError(t, err) cancel() } // should be status info for the node @@ -394,9 +432,9 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.Response) - streamState := stream.NewStreamState(false, map[string]string{}) + sub := stream.NewSubscription(false, map[string]string{}) c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, - streamState, watchCh) + sub, watchCh) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) @@ -451,7 +489,7 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request resource with name=ClusterName go func() { c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, - stream.NewStreamState(false, map[string]string{}), watch) + stream.NewSubscription(false, map[string]string{}), watch) }() select { @@ -469,12 +507,12 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Request additional resource with name=clusterName2 for same version go func() { - state := stream.NewStreamState(false, map[string]string{}) - state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}}) + sub := stream.NewSubscription(false, map[string]string{}) + sub.SetReturnedResources(map[string]string{clusterName: fixture.version}) c.CreateWatch(&discovery.DiscoveryRequest{ TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, ResourceNames: []string{clusterName, clusterName2}, - }, state, watch) + }, sub, watch) }() select { @@ -490,12 +528,14 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { } // Repeat request for with same version and make sure a watch is created - state := stream.NewStreamState(false, map[string]string{}) - state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}, clusterName2: {}}) - if cancel := c.CreateWatch(&discovery.DiscoveryRequest{ + sub := stream.NewSubscription(false, map[string]string{}) + sub.SetReturnedResources(map[string]string{clusterName: fixture.version, clusterName2: fixture.version}) + cancel, err := c.CreateWatch(&discovery.DiscoveryRequest{ TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, ResourceNames: []string{clusterName, clusterName2}, - }, state, watch); cancel == nil { + }, sub, watch) + require.NoError(t, err) + if cancel == nil { t.Fatal("Should create a watch") } else { cancel() @@ -624,7 +664,7 @@ func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { ResourceNames: []string{"rtds"}, TypeUrl: rsrc.RuntimeType, } - ss := stream.NewStreamState(false, map[string]string{"cluster": "abcdef"}) + ss := stream.NewSubscription(false, map[string]string{"cluster": "abcdef"}) responder := make(chan cache.Response) c.CreateWatch(req, ss, responder) diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index dca93e02ff..18465f6dc5 100644 --- a/pkg/cache/v3/status.go +++ b/pkg/cache/v3/status.go @@ -20,7 +20,6 @@ import ( "time" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // NodeHash computes string identifiers for Envoy nodes. @@ -101,8 +100,8 @@ type DeltaResponseWatch struct { // Response is the channel to push the delta responses to Response chan DeltaResponse - // VersionMap for the stream - StreamState stream.StreamState + // Subscription stores the current client subscription state. + subscription Subscription } // newStatusInfo initializes a status info data structure. diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index b570b19b27..a7925a3673 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -118,7 +118,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De watch := watches.deltaWatches[typ] watch.nonce = nonce - watch.state.SetResourceVersions(resp.GetNextVersionMap()) + watch.subscription.SetReturnedResources(resp.GetNextVersionMap()) watches.deltaWatches[typ] = watch return nil } @@ -210,15 +210,19 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // We also set the stream as wildcard based on its legacy meaning (no resource name sent in resource_names_subscribe). // If the state starts with this legacy mode, adding new resources will not unsubscribe from wildcard. // It can still be done by explicitly unsubscribing from "*" - watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions()) + watch.subscription = stream.NewSubscription(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions()) } else { watch.Cancel() } - s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) - s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) + s.subscribe(req.GetResourceNamesSubscribe(), &watch.subscription) + s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.subscription) - watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses) + var err error + watch.cancel, err = s.cache.CreateDeltaWatch(req, watch.subscription, watches.deltaMuxedResponses) + if err != nil { + return err + } watches.deltaWatches[typeURL] = watch } } @@ -252,27 +256,27 @@ func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) erro // When we subscribe, we just want to make the cache know we are subscribing to a resource. // Even if the stream is wildcard, we keep the list of explicitly subscribed resources as the wildcard subscription can be discarded later on. -func (s *server) subscribe(resources []string, streamState *stream.StreamState) { - sv := streamState.GetSubscribedResourceNames() +func (s *server) subscribe(resources []string, subscription *stream.Subscription) { + sv := subscription.SubscribedResources() for _, resource := range resources { if resource == "*" { - streamState.SetWildcard(true) + subscription.SetWildcard(true) continue } sv[resource] = struct{}{} } } -// Unsubscriptions remove resources from the stream's subscribed resource list. +// unsubscribe remove resources from the stream's subscribed resource list. // If a client explicitly unsubscribes from a wildcard request, the stream is updated and now watches only subscribed resources. -func (s *server) unsubscribe(resources []string, streamState *stream.StreamState) { - sv := streamState.GetSubscribedResourceNames() +func (s *server) unsubscribe(resources []string, subscription *stream.Subscription) { + sv := subscription.SubscribedResources() for _, resource := range resources { if resource == "*" { - streamState.SetWildcard(false) + subscription.SetWildcard(false) continue } - if _, ok := sv[resource]; ok && streamState.IsWildcard() { + if _, ok := sv[resource]; ok && subscription.IsWildcard() { // The XDS protocol states that: // * if a watch is currently wildcard // * a resource is explicitly unsubscribed by name @@ -281,7 +285,7 @@ func (s *server) unsubscribe(resources []string, streamState *stream.StreamState // To achieve that, we mark the resource as having been returned with an empty version. While creating the response, the cache will either: // * detect the version change, and return the resource (as an update) // * detect the resource deletion, and set it as removed in the response - streamState.GetResourceVersions()[resource] = "" + subscription.ReturnedResources()[resource] = "" } delete(sv, resource) } diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index 63c4c2d38d..98e4991b49 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -41,7 +41,7 @@ type watch struct { cancel func() nonce string - state stream.StreamState + subscription stream.Subscription } // Cancel calls terminate and cancel diff --git a/pkg/server/sotw/v3/ads.go b/pkg/server/sotw/v3/ads.go index bbb6dd4b20..80c3b87dd1 100644 --- a/pkg/server/sotw/v3/ads.go +++ b/pkg/server/sotw/v3/ads.go @@ -8,6 +8,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) // process handles a bi-di stream request @@ -101,14 +102,19 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe } } + typeURL := req.GetTypeUrl() + subscription, ok := sw.subscriptions[typeURL] + if !ok { + subscription = stream.NewSubscription(len(req.GetResourceNames()) == 0, nil) + } + if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { if lastResponse.nonce == "" || lastResponse.nonce == nonce { // Let's record Resource names that a client has received. - sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources) + subscription.SetReturnedResources(lastResponse.resources) } } - typeURL := req.GetTypeUrl() // Use the multiplexed channel for new watches. responder := sw.watches.responders[resource.AnyType].response if w, ok := sw.watches.responders[typeURL]; ok { @@ -122,19 +128,29 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe return err } + cancel, err := s.cache.CreateWatch(req, subscription, responder) + if err != nil { + return err + } sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), + cancel: cancel, response: responder, }) } } else { // No pre-existing watch exists, let's create one. // We need to precompute the watches first then open a watch in the cache. + cancel, err := s.cache.CreateWatch(req, subscription, responder) + if err != nil { + return err + } sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), + cancel: cancel, response: responder, }) } + + sw.subscriptions[typeURL] = subscription } } } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index f5be0c57a9..e8343000f7 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -91,7 +91,7 @@ type streamWrapper struct { // The below fields are used for tracking resource // cache state and should be maintained per stream. - streamState stream.StreamState + subscriptions map[string]stream.Subscription lastDiscoveryResponses map[string]lastDiscoveryResponse } @@ -110,12 +110,17 @@ func (s *streamWrapper) send(resp cache.Response) (string, error) { // increment nonce and convert it to base10 out.Nonce = strconv.FormatInt(atomic.AddInt64(&s.nonce, 1), 10) + version, err := resp.GetVersion() + if err != nil { + return "", err + } + lastResponse := lastDiscoveryResponse{ nonce: out.GetNonce(), - resources: make(map[string]struct{}), + resources: make(map[string]string), } for _, r := range resp.GetRequest().GetResourceNames() { - lastResponse.resources[r] = struct{}{} + lastResponse.resources[r] = version } s.lastDiscoveryResponses[resp.GetRequest().GetTypeUrl()] = lastResponse @@ -141,7 +146,7 @@ func (s *streamWrapper) shutdown() { // regardless current snapshot version (even if it is not changed yet) type lastDiscoveryResponse struct { nonce string - resources map[string]struct{} + resources map[string]string } // StreamHandler converts a blocking read call to channels and initiates stream processing diff --git a/pkg/server/sotw/v3/xds.go b/pkg/server/sotw/v3/xds.go index 3b24dec409..7869e66fc8 100644 --- a/pkg/server/sotw/v3/xds.go +++ b/pkg/server/sotw/v3/xds.go @@ -27,7 +27,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque // a collection of stack allocated watches per request type. watches: newWatches(), - streamState: stream.NewStreamState(false, map[string]string{}), + subscriptions: make(map[string]stream.Subscription), lastDiscoveryResponses: make(map[string]lastDiscoveryResponse), } @@ -116,14 +116,19 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } } + typeURL := req.GetTypeUrl() + subscription, ok := sw.subscriptions[typeURL] + if !ok { + subscription = stream.NewSubscription(len(req.GetResourceNames()) == 0, nil) + } + if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { if lastResponse.nonce == "" || lastResponse.nonce == nonce { // Let's record Resource names that a client has received. - sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources) + subscription.SetReturnedResources(lastResponse.resources) } } - typeURL := req.GetTypeUrl() responder := make(chan cache.Response, 1) if w, ok := sw.watches.responders[typeURL]; ok { // We've found a pre-existing watch, lets check and update if needed. @@ -131,20 +136,30 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque if w.nonce == "" || w.nonce == nonce { w.close() + cancel, err := s.cache.CreateWatch(req, subscription, responder) + if err != nil { + return err + } sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), + cancel: cancel, response: responder, }) } } else { // No pre-existing watch exists, let's create one. // We need to precompute the watches first then open a watch in the cache. + cancel, err := s.cache.CreateWatch(req, subscription, responder) + if err != nil { + return err + } sw.watches.addWatch(typeURL, &watch{ - cancel: s.cache.CreateWatch(req, sw.streamState, responder), + cancel: cancel, response: responder, }) } + sw.subscriptions[typeURL] = subscription + // Recompute the dynamic select cases for this stream. sw.watches.recompute(s.ctx, reqCh) default: diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 1664a941e0..b999d40c78 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -21,119 +21,3 @@ type DeltaStream interface { Send(*discovery.DeltaDiscoveryResponse) error Recv() (*discovery.DeltaDiscoveryRequest, error) } - -// StreamState will keep track of resource cache state per type on a stream. -type StreamState struct { // nolint:golint,revive - // Indicates whether the delta stream currently has a wildcard watch - wildcard bool - - // Provides the list of resources explicitly requested by the client - // This list might be non-empty even when set as wildcard - subscribedResourceNames map[string]struct{} - - // ResourceVersions contains a hash of the resource as the value and the resource name as the key. - // This field stores the last state sent to the client. - resourceVersions map[string]string - - // knownResourceNames contains resource names that a client has received previously (SOTW). - knownResourceNames map[string]map[string]struct{} - - // First indicates whether the StreamState has been modified since its creation - first bool - - // Ordered indicates whether we want an ordered ADS stream or not - ordered bool -} - -// NewStreamState initializes a stream state. -func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { - state := StreamState{ - wildcard: wildcard, - subscribedResourceNames: map[string]struct{}{}, - resourceVersions: initialResourceVersions, - first: true, - knownResourceNames: map[string]map[string]struct{}{}, - ordered: false, // Ordered comes from the first request since that's when we discover if they want ADS - } - - if initialResourceVersions == nil { - state.resourceVersions = make(map[string]string) - } - - return state -} - -// GetSubscribedResourceNames returns the list of resources currently explicitly subscribed to -// If the request is set to wildcard it may be empty -// Currently populated only when using delta-xds -func (s *StreamState) GetSubscribedResourceNames() map[string]struct{} { - return s.subscribedResourceNames -} - -// SetSubscribedResourceNames is setting the list of resources currently explicitly subscribed to -// It is decorrelated from the wildcard state of the stream -// Currently used only when using delta-xds -func (s *StreamState) SetSubscribedResourceNames(subscribedResourceNames map[string]struct{}) { - s.subscribedResourceNames = subscribedResourceNames -} - -// WatchesResources returns whether at least one of the resource provided is currently watch by the stream -// It is currently only applicable to delta-xds -// If the request is wildcard, it will always return true -// Otherwise it will compare the provided resources to the list of resources currently subscribed -func (s *StreamState) WatchesResources(resourceNames map[string]struct{}) bool { - if s.IsWildcard() { - return true - } - for resourceName := range resourceNames { - if _, ok := s.subscribedResourceNames[resourceName]; ok { - return true - } - } - return false -} - -func (s *StreamState) SetWildcard(wildcard bool) { - s.wildcard = wildcard -} - -// GetResourceVersions returns a map of current resources grouped by type URL. -func (s *StreamState) GetResourceVersions() map[string]string { - return s.resourceVersions -} - -// SetResourceVersions sets a list of resource versions by type URL and removes the flag -// of "first" since we can safely assume another request has come through the stream. -func (s *StreamState) SetResourceVersions(resourceVersions map[string]string) { - s.first = false - s.resourceVersions = resourceVersions -} - -// IsFirst returns whether or not the state of the stream is based upon the initial request. -func (s *StreamState) IsFirst() bool { - return s.first -} - -// IsWildcard returns whether or not an xDS client requested in wildcard mode on the initial request. -func (s *StreamState) IsWildcard() bool { - return s.wildcard -} - -// GetKnownResourceNames returns the current known list of resources on a SOTW stream. -func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { - return s.knownResourceNames[url] -} - -// SetKnownResourceNames sets a list of resource names in a stream utilizing the SOTW protocol. -func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { - s.knownResourceNames[url] = names -} - -// SetKnownResourceNamesAsList is a helper function to set resource names as a slice input. -func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { - m := map[string]struct{}{} - for _, name := range names { - m[name] = struct{}{} - } - s.knownResourceNames[url] = m -} diff --git a/pkg/server/stream/v3/subscription.go b/pkg/server/stream/v3/subscription.go new file mode 100644 index 0000000000..a4b9f5959c --- /dev/null +++ b/pkg/server/stream/v3/subscription.go @@ -0,0 +1,85 @@ +package stream + +// Subscription stores the server view of a given type subscription in a stream. +type Subscription struct { + // wildcard indicates if the subscription currently has a wildcard watch. + wildcard bool + + // subscribedResourceNames provides the resources explicitly requested by the client + // This list might be non-empty even when set as wildcard. + subscribedResourceNames map[string]struct{} + + // returnedResources contains the resources acknowledged by the client and the acknowledged versions. + returnedResources map[string]string +} + +// NewSubscription initializes a subscription state. +func NewSubscription(wildcard bool, initialResourceVersions map[string]string) Subscription { + state := Subscription{ + wildcard: wildcard, + subscribedResourceNames: map[string]struct{}{}, + returnedResources: initialResourceVersions, + } + + if initialResourceVersions == nil { + state.returnedResources = make(map[string]string) + } + + return state +} + +// SubscribedResources returns the list of resources currently explicitly subscribed to +// If the request is set to wildcard it may be empty +// Can only be used for delta watches +// TODO(valerian-roche): set those resources properly for sotw subscriptions +func (s Subscription) SubscribedResources() map[string]struct{} { + return s.subscribedResourceNames +} + +func (s *Subscription) SetSubscribedResources(resources map[string]struct{}) { + if resources != nil { + s.subscribedResourceNames = resources + } else { + s.subscribedResourceNames = make(map[string]struct{}) + } +} + +// IsWildcard returns whether or not the subscription currently has a wildcard watch +// Can only be used for delta watches +// TODO(valerian-roche): set those resources properly for sotw subscriptions +func (s Subscription) IsWildcard() bool { + return s.wildcard +} + +// SetWildcard sets whether the subscription is using a wildcard watch, whether legacy or not +func (s *Subscription) SetWildcard(wildcard bool) { + s.wildcard = wildcard +} + +// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription. +// If the request is wildcard, it will always return true, +// otherwise it will compare the provided resources to the list of resources currently subscribed +// Can only be used for delta watches +func (s Subscription) WatchesResources(resourceNames map[string]struct{}) bool { + if s.wildcard { + return true + } + for resourceName := range resourceNames { + if _, ok := s.subscribedResourceNames[resourceName]; ok { + return true + } + } + return false +} + +// ReturnedResources returns the list of resources returned to the client +// and their version +func (s Subscription) ReturnedResources() map[string]string { + return s.returnedResources +} + +// SetReturnedResources sets a list of resource versions currently known by the client +// The cache can use this state to compute resources added/updated/deleted +func (s *Subscription) SetReturnedResources(resourceVersions map[string]string) { + s.returnedResources = resourceVersions +} diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 72c2b74075..324d3cb430 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -16,12 +16,11 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) -func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state stream.StreamState, out chan cache.DeltaResponse) func() { +func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, sub cache.Subscription, out chan cache.DeltaResponse) (func(), error) { config.deltaCounts[req.GetTypeUrl()] = config.deltaCounts[req.GetTypeUrl()] + 1 // This is duplicated from pkg/cache/v3/delta.go as private there @@ -37,8 +36,8 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // If we are handling a wildcard request, we want to respond with all resources switch { - case state.IsWildcard(): - if len(state.GetResourceVersions()) == 0 { + case sub.IsWildcard(): + if len(sub.ReturnedResources()) == 0 { filtered = make([]types.Resource, 0, len(resourceMap)) } nextVersionMap = make(map[string]string, len(resourceMap)) @@ -47,24 +46,24 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR // we can just set it here to be used for comparison later version := versionMap[name] nextVersionMap[name] = version - prevVersion, found := state.GetResourceVersions()[name] + prevVersion, found := sub.ReturnedResources()[name] if !found || (prevVersion != version) { filtered = append(filtered, r) } } // Compute resources for removal - for name := range state.GetResourceVersions() { + for name := range sub.ReturnedResources() { if _, ok := resourceMap[name]; !ok { toRemove = append(toRemove, name) } } default: - nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames())) + nextVersionMap = make(map[string]string, len(sub.SubscribedResources())) // state.GetResourceVersions() may include resources no longer subscribed // In the current code this gets silently cleaned when updating the version map - for name := range state.GetSubscribedResourceNames() { - prevVersion, found := state.GetResourceVersions()[name] + for name := range sub.SubscribedResources() { + prevVersion, found := sub.ReturnedResources()[name] if r, ok := resourceMap[name]; ok { nextVersion := versionMap[name] if prevVersion != nextVersion { @@ -89,10 +88,10 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR config.deltaWatches++ return func() { config.deltaWatches-- - } + }, nil } - return nil + return nil, nil } type mockDeltaStream struct { diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index d3c4e0f81b..fc978c7587 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -34,7 +34,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) @@ -50,7 +49,7 @@ type mockConfigWatcher struct { mu *sync.RWMutex } -func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ stream.StreamState, out chan cache.Response) func() { +func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ cache.Subscription, out chan cache.Response) (func(), error) { config.counts[req.GetTypeUrl()] = config.counts[req.GetTypeUrl()] + 1 if len(config.responses[req.GetTypeUrl()]) > 0 { out <- config.responses[req.GetTypeUrl()][0] @@ -59,9 +58,9 @@ func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ config.watches++ return func() { config.watches-- - } + }, nil } - return nil + return nil, nil } func (config *mockConfigWatcher) Fetch(_ context.Context, req *discovery.DiscoveryRequest) (cache.Response, error) {