From 1922477c16b9229b9607f16218e7297369015100 Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 8 Jun 2021 22:17:14 +0300 Subject: [PATCH] server api without linearization (#443) Signed-off-by: Pavel Forkert --- pkg/cache/v3/cache.go | 2 +- pkg/cache/v3/linear.go | 13 +++---- pkg/cache/v3/linear_test.go | 72 +++++++++++++++++++++--------------- pkg/cache/v3/mux.go | 9 ++--- pkg/cache/v3/simple.go | 9 ++--- pkg/cache/v3/simple_test.go | 26 ++++++++----- pkg/server/v3/server_test.go | 34 ++--------------- 7 files changed, 77 insertions(+), 88 deletions(-) diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 73312a816a..47adcbe06b 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -54,7 +54,7 @@ type ConfigWatcher interface { // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateWatch(*Request) (value chan Response, cancel func()) + CreateWatch(*Request, chan Response) (cancel func()) // CreateDeltaWatch returns a new open incremental xDS watch. // diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index cbc50842f4..da674ae9ce 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -165,11 +165,10 @@ func (cache *LinearCache) DeleteResource(name string) error { return nil } -func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func()) { - value := make(chan Response, 1) +func (cache *LinearCache) CreateWatch(request *Request, value chan Response) func() { if request.TypeUrl != cache.typeURL { - close(value) - return value, nil + value <- nil + return nil } // If the version is not up to date, check whether any requested resource has // been updated between the last version and the current version. This avoids the problem @@ -205,12 +204,12 @@ func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func()) } if stale { cache.respond(value, staleResources) - return value, nil + return nil } // Create open watches since versions are up to date. if len(request.ResourceNames) == 0 { cache.watchAll[value] = struct{}{} - return value, func() { + return func() { cache.mu.Lock() defer cache.mu.Unlock() delete(cache.watchAll, value) @@ -224,7 +223,7 @@ func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func()) } set[value] = struct{}{} } - return value, func() { + return func() { cache.mu.Lock() defer cache.mu.Unlock() for _, name := range request.ResourceNames { diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index cc3935007c..65b09b4c26 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -72,9 +72,10 @@ func mustBlock(t *testing.T, w <-chan Response) { func TestLinearInitialResources(t *testing.T) { c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) - w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}) + w := make(chan Response, 1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, w) verifyResponse(t, w, "0", 1) - w, _ = c.CreateWatch(&Request{TypeUrl: testType}) + c.CreateWatch(&Request{TypeUrl: testType}, w) verifyResponse(t, w, "0", 2) } @@ -85,14 +86,15 @@ func TestLinearCornerCases(t *testing.T) { t.Error("expected error on nil resource") } // create an incorrect type URL request - w, _ := c.CreateWatch(&Request{TypeUrl: "test"}) + w := make(chan Response, 1) + c.CreateWatch(&Request{TypeUrl: "test"}, w) select { - case _, more := <-w: - if more { - t.Error("should be closed by the producer") + case r := <-w: + if r != nil { + t.Error("response should be nil") } default: - t.Error("channel should be closed") + t.Error("should receive nil response") } } @@ -100,9 +102,12 @@ func TestLinearBasic(t *testing.T) { c := NewLinearCache(testType) // Create watches before a resource is ready - w1, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}) + w1 := make(chan Response, 1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1) mustBlock(t, w1) - w, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}) + + w := make(chan Response, 1) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) mustBlock(t, w) checkWatchCount(t, c, "a", 2) checkWatchCount(t, c, "b", 1) @@ -113,59 +118,63 @@ func TestLinearBasic(t *testing.T) { verifyResponse(t, w, "1", 1) // Request again, should get same response - w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) - w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) // Add another element and update the first, response should be different c.UpdateResource("b", testResource("b")) c.UpdateResource("a", testResource("aa")) - w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) verifyResponse(t, w, "3", 1) - w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) verifyResponse(t, w, "3", 2) } func TestLinearVersionPrefix(t *testing.T) { c := NewLinearCache(testType, WithVersionPrefix("instance1-")) - w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}) + w := make(chan Response, 1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) verifyResponse(t, w, "instance1-0", 0) c.UpdateResource("a", testResource("a")) - w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) verifyResponse(t, w, "instance1-1", 1) - w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) } func TestLinearDeletion(t *testing.T) { c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) - w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}) + w := make(chan Response, 1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) c.DeleteResource("a") verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) verifyResponse(t, w, "1", 1) checkWatchCount(t, c, "b", 0) c.DeleteResource("b") - w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } func TestLinearWatchTwo(t *testing.T) { c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) - w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}) + w := make(chan Response, 1) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, w) mustBlock(t, w) - w1, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}) + w1 := make(chan Response, 1) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w1) mustBlock(t, w1) c.UpdateResource("a", testResource("aa")) // should only get the modified resource @@ -178,24 +187,28 @@ func TestLinearCancel(t *testing.T) { c.UpdateResource("a", testResource("a")) // cancel watch-all - w, cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}) + w := make(chan Response, 1) + cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) // cancel watch for "a" - w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}) + cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) // open four watches for "a" and "b" and two for all, cancel one of each, make sure the second one is unaffected - w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}) - w2, cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}) - w3, cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}) - w4, cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}) + w2 := make(chan Response, 1) + w3 := make(chan Response, 1) + w4 := make(chan Response, 1) + cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w) + cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, w2) + cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w3) + cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w4) mustBlock(t, w) mustBlock(t, w2) mustBlock(t, w3) @@ -230,12 +243,13 @@ func TestLinearConcurrentSetWatch(t *testing.T) { } else { id2 := fmt.Sprintf("%d", i-1) t.Logf("request resources %q and %q", id, id2) - value, _ := c.CreateWatch(&Request{ + value := make(chan Response, 1) + c.CreateWatch(&Request{ // Only expect one to become stale ResourceNames: []string{id, id2}, VersionInfo: "0", TypeUrl: testType, - }) + }, value) // wait until all updates apply verifyResponse(t, value, "", 1) } diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index c8002a6f11..1942a1465e 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -36,15 +36,14 @@ type MuxCache struct { var _ Cache = &MuxCache{} -func (mux *MuxCache) CreateWatch(request *Request) (chan Response, func()) { +func (mux *MuxCache) CreateWatch(request *Request, value chan Response) func() { key := mux.Classify(*request) cache, exists := mux.Caches[key] if !exists { - value := make(chan Response, 0) - close(value) - return value, nil + value <- nil + return nil } - return cache.CreateWatch(request) + return cache.CreateWatch(request, value) } func (cache *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) { diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 557bf15e9b..111e5f4898 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -282,7 +282,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTtl) } // CreateWatch returns a watch for an xDS request. -func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func()) { +func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) func() { nodeID := cache.hash.ID(request.Node) cache.mu.Lock() @@ -299,9 +299,6 @@ func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func() info.lastWatchRequestTime = time.Now() info.mu.Unlock() - // allocate capacity 1 to allow one-time non-blocking use - value := make(chan Response, 1) - snapshot, exists := cache.snapshots[nodeID] version := snapshot.GetVersion(request.TypeUrl) @@ -315,14 +312,14 @@ func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func() info.mu.Lock() info.watches[watchID] = ResponseWatch{Request: request, Response: value} info.mu.Unlock() - return value, cache.cancelWatch(nodeID, watchID) + return cache.cancelWatch(nodeID, watchID) } // otherwise, the watch may be responded immediately resources := snapshot.GetResourcesAndTtl(request.TypeUrl) cache.respond(request, value, resources, version, false) - return value, nil + return nil } func (cache *snapshotCache) nextWatchID() int64 { diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index eef7274f0b..aca8a9bd61 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -119,7 +119,8 @@ func TestSnapshotCacheWithTtl(t *testing.T) { wg.Add(1) t.Run(typ, func(t *testing.T) { defer wg.Done() - value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + value := make(chan cache.Response, 1) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != version { @@ -145,7 +146,8 @@ func TestSnapshotCacheWithTtl(t *testing.T) { end := time.After(5 * time.Second) for { - value, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}) + value := make(chan cache.Response, 1) + cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, value) select { case out := <-value: @@ -201,7 +203,8 @@ func TestSnapshotCache(t *testing.T) { // try to get endpoints with incorrect list of names // should not receive response - value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}) + value := make(chan cache.Response, 1) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, value) select { case out := <-value: t.Errorf("watch for endpoints and mismatched names => got %v, want none", out) @@ -210,7 +213,8 @@ func TestSnapshotCache(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { - value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + value := make(chan cache.Response, 1) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != version { @@ -261,7 +265,8 @@ func TestSnapshotCacheWatch(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) watches := make(map[string]chan cache.Response) for _, typ := range testTypes { - watches[typ], _ = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + watches[typ] = make(chan cache.Response, 1) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, watches[typ]) } if err := c.SetSnapshot(key, snapshot); err != nil { t.Fatal(err) @@ -284,7 +289,8 @@ func TestSnapshotCacheWatch(t *testing.T) { // open new watches with the latest version for _, typ := range testTypes { - watches[typ], _ = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}) + watches[typ] = make(chan cache.Response, 1) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, watches[typ]) } if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) { t.Errorf("watches should be created for the latest version: %d", count) @@ -322,6 +328,7 @@ func TestConcurrentSetWatch(t *testing.T) { t.Parallel() id := fmt.Sprintf("%d", i%2) var cancel func() + value := make(chan cache.Response, 1) if i < 25 { snap := cache.Snapshot{} snap.Resources[types.Endpoint] = cache.NewResources(fmt.Sprintf("v%d", i), []types.Resource{resource.MakeEndpoint(clusterName, uint32(i))}) @@ -330,10 +337,10 @@ func TestConcurrentSetWatch(t *testing.T) { if cancel != nil { cancel() } - _, cancel = c.CreateWatch(&discovery.DiscoveryRequest{ + cancel = c.CreateWatch(&discovery.DiscoveryRequest{ Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, - }) + }, value) } }) }(i) @@ -343,7 +350,8 @@ func TestConcurrentSetWatch(t *testing.T) { func TestSnapshotCacheWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) for _, typ := range testTypes { - _, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + value := make(chan cache.Response, 1) + cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value) cancel() } // should be status info for the node diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 208c5e0593..a243b373d3 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -48,23 +48,18 @@ type mockConfigWatcher struct { mu *sync.RWMutex } -func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest) (chan cache.Response, func()) { +func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, out chan cache.Response) func() { config.counts[req.TypeUrl] = config.counts[req.TypeUrl] + 1 - out := make(chan cache.Response, 1) if len(config.responses[req.TypeUrl]) > 0 { out <- config.responses[req.TypeUrl][0] config.responses[req.TypeUrl] = config.responses[req.TypeUrl][1:] - } else if config.closeWatch { - close(out) } else { config.watches += 1 - return out, func() { - // it is ok to close the channel after cancellation and not wait for it to be garbage collected - close(out) + return func() { config.watches -= 1 } } - return out, nil + return nil } func (config *mockConfigWatcher) Fetch(ctx context.Context, req *discovery.DiscoveryRequest) (cache.Response, error) { @@ -415,29 +410,6 @@ func TestFetch(t *testing.T) { assert.True(t, responseCount == 6) } -func TestWatchClosed(t *testing.T) { - for _, typ := range testTypes { - t.Run(typ, func(t *testing.T) { - config := makeMockConfigWatcher() - config.closeWatch = true - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) - - // make a request - resp := makeMockStream(t) - resp.recv <- &discovery.DiscoveryRequest{ - Node: node, - TypeUrl: typ, - } - - // check that response fails since watch gets closed - err := s.StreamAggregatedResources(resp) - assert.Error(t, err) - - close(resp.recv) - }) - } -} - func TestSendError(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) {