diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 54d5875373..13b6e9d523 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -51,7 +51,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateWatch(*Request, chan Response) (cancel func()) + CreateWatch(*Request, stream.StreamState, chan Response) (cancel func()) // CreateDeltaWatch returns a new open incremental xDS watch. // diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 9d92a28946..7994175254 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -260,7 +260,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { return resources } -func (cache *LinearCache) CreateWatch(request *Request, value chan Response) func() { +func (cache *LinearCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { if request.TypeUrl != cache.typeURL { value <- nil return nil diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 6a2f5d719b..6755ed921e 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -158,15 +158,17 @@ func hashResource(t *testing.T, resource types.Resource) string { } func TestLinearInitialResources(t *testing.T) { + streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w) verifyResponse(t, w, "0", 1) - c.CreateWatch(&Request{TypeUrl: testType}, w) + c.CreateWatch(&Request{TypeUrl: testType}, streamState, w) verifyResponse(t, w, "0", 2) } func TestLinearCornerCases(t *testing.T) { + streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) err := c.UpdateResource("a", nil) if err == nil { @@ -174,7 +176,7 @@ func TestLinearCornerCases(t *testing.T) { } // create an incorrect type URL request w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: "test"}, w) + c.CreateWatch(&Request{TypeUrl: "test"}, streamState, w) select { case r := <-w: if r != nil { @@ -186,15 +188,16 @@ func TestLinearCornerCases(t *testing.T) { } func TestLinearBasic(t *testing.T) { + streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) // Create watches before a resource is ready w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) mustBlock(t, w1) w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 2) checkWatchCount(t, c, "b", 1) @@ -205,31 +208,32 @@ func TestLinearBasic(t *testing.T) { verifyResponse(t, w, "1", 1) // Request again, should get same response - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) require.NoError(t, c.UpdateResource("a", testResource("aa"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) verifyResponse(t, w, "3", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) verifyResponse(t, w, "3", 2) } func TestLinearSetResources(t *testing.T) { + streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) // Create new resources w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) mustBlock(t, w1) w2 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("a"), @@ -239,9 +243,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources // Add another element and update the first, response should be different - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("aa"), @@ -252,9 +256,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "2", 3) // Delete resource - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, streamState, w1) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, streamState, w2) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "b": testResource("b"), @@ -282,46 +286,49 @@ func TestLinearGetResources(t *testing.T) { } func TestLinearVersionPrefix(t *testing.T) { + streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithVersionPrefix("instance1-")) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) verifyResponse(t, w, "instance1-0", 0) require.NoError(t, c.UpdateResource("a", testResource("a"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) verifyResponse(t, w, "instance1-1", 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) } func TestLinearDeletion(t *testing.T) { + streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) require.NoError(t, c.DeleteResource("a")) verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) verifyResponse(t, w, "1", 1) checkWatchCount(t, c, "b", 0) require.NoError(t, c.DeleteResource("b")) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } func TestLinearWatchTwo(t *testing.T) { + streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w) mustBlock(t, w) w1 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w1) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1) mustBlock(t, w1) require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource @@ -330,19 +337,20 @@ func TestLinearWatchTwo(t *testing.T) { } func TestLinearCancel(t *testing.T) { + streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) require.NoError(t, c.UpdateResource("a", testResource("a"))) // cancel watch-all w := make(chan Response, 1) - cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w) + cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) // cancel watch for "a" - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w) + cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() @@ -352,10 +360,10 @@ func TestLinearCancel(t *testing.T) { w2 := make(chan Response, 1) w3 := make(chan Response, 1) w4 := make(chan Response, 1) - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w) - cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, w2) - cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w3) - cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w4) + cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w) + cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w2) + cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w3) + cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w4) mustBlock(t, w) mustBlock(t, w2) mustBlock(t, w3) @@ -377,6 +385,7 @@ func TestLinearCancel(t *testing.T) { // TODO(mattklein123): This test requires GOMAXPROCS or -parallel >= 100. This should be // rewritten to not require that. This is not the case in the GH actions environment. func TestLinearConcurrentSetWatch(t *testing.T) { + streamState := stream.NewStreamState(false, map[string]string{}) c := NewLinearCache(testType) n := 50 for i := 0; i < 2*n; i++ { @@ -396,7 +405,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) { ResourceNames: []string{id, id2}, VersionInfo: "0", TypeUrl: testType, - }, value) + }, streamState, value) // wait until all updates apply verifyResponse(t, value, "", 1) } diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index d0ce33e1c6..db5a65d0a7 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -37,14 +37,14 @@ type MuxCache struct { var _ Cache = &MuxCache{} -func (mux *MuxCache) CreateWatch(request *Request, value chan Response) func() { +func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, value chan Response) func() { key := mux.Classify(request) cache, exists := mux.Caches[key] if !exists { value <- nil return nil } - return cache.CreateWatch(request, value) + return cache.CreateWatch(request, state, value) } func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 10da0e3234..be7d565a21 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -289,7 +289,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) } // CreateWatch returns a watch for an xDS request. -func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) func() { +func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { nodeID := cache.hash.ID(request.Node) cache.mu.Lock() @@ -309,6 +309,32 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f snapshot, exists := cache.snapshots[nodeID] version := snapshot.GetVersion(request.TypeUrl) + if exists { + knownResourceNames := streamState.GetKnownResourceNames(request.TypeUrl) + diff := []string{} + for _, r := range request.ResourceNames { + if _, ok := knownResourceNames[r]; !ok { + diff = append(diff, r) + } + } + + cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID, + request.TypeUrl, request.ResourceNames, knownResourceNames, diff) + + if len(diff) > 0 { + resources := snapshot.GetResourcesAndTTL(request.TypeUrl) + for _, name := range diff { + if _, exists := resources[name]; exists { + if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { + cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl, + request.ResourceNames, nodeID, err) + } + return nil + } + } + } + } + // if the requested version is up-to-date or missing a response, leave an open watch if !exists || request.VersionInfo == version { watchID := cache.nextWatchID() @@ -322,7 +348,10 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f // otherwise, the watch may be responded immediately resources := snapshot.GetResourcesAndTTL(request.TypeUrl) - _ = cache.respond(context.Background(), request, value, resources, version, false) + if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { + cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl, + request.ResourceNames, nodeID, err) + } return nil } diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index 487ec17241..42c1227f12 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -29,6 +29,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) @@ -119,12 +120,13 @@ func TestSnapshotCacheWithTTL(t *testing.T) { wg := sync.WaitGroup{} // All the resources should respond immediately when version is not up to date. + streamState := stream.NewStreamState(false, map[string]string{}) for _, typ := range testTypes { wg.Add(1) t.Run(typ, func(t *testing.T) { defer wg.Done() value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != version { @@ -133,6 +135,8 @@ func TestSnapshotCacheWithTTL(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTTL(typ)) } + // Update streamState + streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) case <-time.After(2 * time.Second): t.Errorf("failed to receive snapshot response") } @@ -151,7 +155,8 @@ func TestSnapshotCacheWithTTL(t *testing.T) { end := time.After(5 * time.Second) for { value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, value) + cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, + streamState, value) select { case out := <-value: @@ -167,6 +172,8 @@ func TestSnapshotCacheWithTTL(t *testing.T) { } updatesByType[typ]++ + + streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().ResourceNames) case <-end: cancel() return @@ -208,7 +215,9 @@ func TestSnapshotCache(t *testing.T) { // try to get endpoints with incorrect list of names // should not receive response value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, value) + streamState := stream.NewStreamState(false, map[string]string{}) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, + streamState, value) select { case out := <-value: t.Errorf("watch for endpoints and mismatched names => got %v, want none", out) @@ -218,7 +227,9 @@ func TestSnapshotCache(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value) + streamState := stream.NewStreamState(false, map[string]string{}) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, + streamState, value) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != version { @@ -268,9 +279,10 @@ func TestSnapshotCacheFetch(t *testing.T) { func TestSnapshotCacheWatch(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) watches := make(map[string]chan cache.Response) + streamState := stream.NewStreamState(false, map[string]string{}) for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, watches[typ]) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, watches[typ]) } if err := c.SetSnapshot(context.Background(), key, snapshot); err != nil { t.Fatal(err) @@ -285,6 +297,7 @@ func TestSnapshotCacheWatch(t *testing.T) { if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTTL(typ)) { t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ)) } + streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") } @@ -294,7 +307,8 @@ func TestSnapshotCacheWatch(t *testing.T) { // open new watches with the latest version for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, watches[typ]) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, + streamState, watches[typ]) } if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) { t.Errorf("watches should be created for the latest version: %d", count) @@ -338,10 +352,11 @@ func TestConcurrentSetWatch(t *testing.T) { t.Fatalf("failed to set snapshot %q: %s", id, err) } } else { + streamState := stream.NewStreamState(false, map[string]string{}) cancel := c.CreateWatch(&discovery.DiscoveryRequest{ Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, - }, value) + }, streamState, value) defer cancel() } @@ -351,9 +366,10 @@ func TestConcurrentSetWatch(t *testing.T) { func TestSnapshotCacheWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) + streamState := stream.NewStreamState(false, map[string]string{}) for _, typ := range testTypes { value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value) + cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, streamState, value) cancel() } // should be status info for the node @@ -377,7 +393,9 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.Response) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, watchCh) + streamState := stream.NewStreamState(false, map[string]string{}) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, + streamState, watchCh) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) @@ -409,6 +427,76 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { } } +func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { + clusterName2 := "clusterName2" + routeName2 := "routeName2" + listenerName2 := "listenerName2" + c := cache.NewSnapshotCache(false, group{}, logger{t: t}) + + snapshot2, _ := cache.NewSnapshot(version, map[rsrc.Type][]types.Resource{ + rsrc.EndpointType: {testEndpoint, resource.MakeEndpoint(clusterName2, 8080)}, + rsrc.ClusterType: {testCluster, resource.MakeCluster(resource.Ads, clusterName2)}, + rsrc.RouteType: {testRoute, resource.MakeRoute(routeName2, clusterName2)}, + rsrc.ListenerType: {testListener, resource.MakeRouteHTTPListener(resource.Ads, listenerName2, 80, routeName2)}, + rsrc.RuntimeType: {}, + rsrc.SecretType: {}, + rsrc.ExtensionConfigType: {}, + }) + if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil { + t.Fatal(err) + } + watch := make(chan cache.Response) + + // Request resource with name=ClusterName + go func() { + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, + stream.NewStreamState(false, map[string]string{}), watch) + }() + + select { + case out := <-watch: + if gotVersion, _ := out.GetVersion(); gotVersion != version { + t.Errorf("got version %q, want %q", gotVersion, version) + } + want := map[string]types.ResourceWithTTL{clusterName: snapshot2.Resources[types.Endpoint].Items[clusterName]} + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), want) { + t.Errorf("got resources %v, want %v", out.(*cache.RawResponse).Resources, want) + } + case <-time.After(time.Second): + t.Fatal("failed to receive snapshot response") + } + + // Request additional resource with name=clusterName2 for same version + go func() { + state := stream.NewStreamState(false, map[string]string{}) + state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}}) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: version, + ResourceNames: []string{clusterName, clusterName2}}, state, watch) + }() + + select { + case out := <-watch: + if gotVersion, _ := out.GetVersion(); gotVersion != version { + t.Errorf("got version %q, want %q", gotVersion, version) + } + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot2.Resources[types.Endpoint].Items) { + t.Errorf("got resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot2.Resources[types.Endpoint].Items) + } + case <-time.After(time.Second): + t.Fatal("failed to receive snapshot response") + } + + // Repeat request for with same version and make sure a watch is created + state := stream.NewStreamState(false, map[string]string{}) + state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}, clusterName2: {}}) + if cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: version, + ResourceNames: []string{clusterName, clusterName2}}, state, watch); cancel == nil { + t.Fatal("Should create a watch") + } else { + cancel() + } +} + func TestSnapshotClear(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) if err := c.SetSnapshot(context.Background(), key, snapshot); err != nil { diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index b2711f654f..ef40e8ceaf 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -29,6 +29,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + streamv3 "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) type Server interface { @@ -105,6 +106,15 @@ type watches struct { nonces map[string]string } +// Discovery response that is sent over GRPC stream +// We need to record what resource names are already sent to a client +// So if the client requests a new name we can respond back +// regardless current snapshot version (even if it is not changed yet) +type lastDiscoveryResponse struct { + nonce string + resources map[string]struct{} +} + // Initialize all watches func (values *watches) Init() { // muxed channel needs a buffer to release go-routines populating it @@ -158,6 +168,9 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest // ignores stale nonces. nonce is only modified within send() function. var streamNonce int64 + streamState := streamv3.NewStreamState(false, map[string]string{}) + lastDiscoveryResponses := map[string]lastDiscoveryResponse{} + // a collection of stack allocated watches per request type var values watches values.Init() @@ -182,6 +195,16 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest // increment nonce streamNonce = streamNonce + 1 out.Nonce = strconv.FormatInt(streamNonce, 10) + + lastResponse := lastDiscoveryResponse{ + nonce: out.Nonce, + resources: make(map[string]struct{}), + } + for _, r := range resp.GetRequest().ResourceNames { + lastResponse.resources[r] = struct{}{} + } + lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) } @@ -329,6 +352,13 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest } } + if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { + if lastResponse.nonce == "" || lastResponse.nonce == nonce { + // Let's record Resource names that a client has received. + streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + } + } + // cancel existing watches to (re-)request a newer version switch { case req.TypeUrl == resource.EndpointType: @@ -337,7 +367,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.endpointCancel() } values.endpoints = make(chan cache.Response, 1) - values.endpointCancel = s.cache.CreateWatch(req, values.endpoints) + values.endpointCancel = s.cache.CreateWatch(req, streamState, values.endpoints) } case req.TypeUrl == resource.ClusterType: if values.clusterNonce == "" || values.clusterNonce == nonce { @@ -345,7 +375,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.clusterCancel() } values.clusters = make(chan cache.Response, 1) - values.clusterCancel = s.cache.CreateWatch(req, values.clusters) + values.clusterCancel = s.cache.CreateWatch(req, streamState, values.clusters) } case req.TypeUrl == resource.RouteType: if values.routeNonce == "" || values.routeNonce == nonce { @@ -353,7 +383,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.routeCancel() } values.routes = make(chan cache.Response, 1) - values.routeCancel = s.cache.CreateWatch(req, values.routes) + values.routeCancel = s.cache.CreateWatch(req, streamState, values.routes) } case req.TypeUrl == resource.ScopedRouteType: if values.scopedRouteNonce == "" || values.scopedRouteNonce == nonce { @@ -361,7 +391,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.scopedRouteCancel() } values.scopedRoutes = make(chan cache.Response, 1) - values.scopedRouteCancel = s.cache.CreateWatch(req, values.scopedRoutes) + values.scopedRouteCancel = s.cache.CreateWatch(req, streamState, values.scopedRoutes) } case req.TypeUrl == resource.ListenerType: if values.listenerNonce == "" || values.listenerNonce == nonce { @@ -369,7 +399,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.listenerCancel() } values.listeners = make(chan cache.Response, 1) - values.listenerCancel = s.cache.CreateWatch(req, values.listeners) + values.listenerCancel = s.cache.CreateWatch(req, streamState, values.listeners) } case req.TypeUrl == resource.SecretType: if values.secretNonce == "" || values.secretNonce == nonce { @@ -377,7 +407,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.secretCancel() } values.secrets = make(chan cache.Response, 1) - values.secretCancel = s.cache.CreateWatch(req, values.secrets) + values.secretCancel = s.cache.CreateWatch(req, streamState, values.secrets) } case req.TypeUrl == resource.RuntimeType: if values.runtimeNonce == "" || values.runtimeNonce == nonce { @@ -385,7 +415,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.runtimeCancel() } values.runtimes = make(chan cache.Response, 1) - values.runtimeCancel = s.cache.CreateWatch(req, values.runtimes) + values.runtimeCancel = s.cache.CreateWatch(req, streamState, values.runtimes) } case req.TypeUrl == resource.ExtensionConfigType: if values.extensionConfigNonce == "" || values.extensionConfigNonce == nonce { @@ -393,7 +423,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.extensionConfigCancel() } values.extensionConfigs = make(chan cache.Response, 1) - values.extensionConfigCancel = s.cache.CreateWatch(req, values.extensionConfigs) + values.extensionConfigCancel = s.cache.CreateWatch(req, streamState, values.extensionConfigs) } default: typeURL := req.TypeUrl @@ -402,7 +432,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest if cancel, seen := values.cancellations[typeURL]; seen && cancel != nil { cancel() } - values.cancellations[typeURL] = s.cache.CreateWatch(req, values.responses) + values.cancellations[typeURL] = s.cache.CreateWatch(req, streamState, values.responses) } } } diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index d5b4fed2de..3a4247c457 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -30,6 +30,9 @@ type StreamState struct { // nolint:golint,revive // This field stores the last state sent to the client. resourceVersions map[string]string + // knownResourceNames contains resource names that a client has received previously + knownResourceNames map[string]map[string]struct{} + // indicates whether the object has beed modified since its creation first bool } @@ -51,12 +54,29 @@ func (s *StreamState) IsWildcard() bool { return s.wildcard } +func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { + s.knownResourceNames[url] = names +} + +func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) { + m := map[string]struct{}{} + for _, name := range names { + m[name] = struct{}{} + } + s.knownResourceNames[url] = m +} + +func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} { + return s.knownResourceNames[url] +} + // NewStreamState initializes a stream state. func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState { state := StreamState{ - wildcard: wildcard, - resourceVersions: initialResourceVersions, - first: true, + wildcard: wildcard, + resourceVersions: initialResourceVersions, + first: true, + knownResourceNames: map[string]map[string]struct{}{}, } if initialResourceVersions == nil { diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 9cc732f089..80c7e65d2a 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -30,6 +30,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" ) @@ -45,7 +46,7 @@ type mockConfigWatcher struct { mu *sync.RWMutex } -func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, out chan cache.Response) func() { +func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, state stream.StreamState, out chan cache.Response) func() { config.counts[req.TypeUrl] = config.counts[req.TypeUrl] + 1 if len(config.responses[req.TypeUrl]) > 0 { out <- config.responses[req.TypeUrl][0]