server api without linearization (envoyproxy#443)
Signed-off-by: Pavel Forkert <[email protected]>
fxposter authored and alecholmez committed Jun 10, 2021
1 parent f5f3295 commit 3f52731
Showing 8 changed files with 90 additions and 131 deletions.
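This commit changes the v3 cache API so that CreateWatch no longer allocates and returns the response channel: the caller now supplies the channel and only a cancel function is returned, which lets a consumer reuse one channel across several watches (as the updated tests below do) instead of merging many cache-created channels. Below is a minimal caller-side sketch under the new signature; the import paths match go-control-plane v3, but the function name, cache variable, and cluster name are illustrative and not part of this commit.

```go
package example

import (
	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
)

// watchEndpoints is a hypothetical consumer of the new CreateWatch signature.
func watchEndpoints(c cache.Cache) cache.Response {
	// The caller now owns the response channel; capacity 1 preserves the
	// one-shot, non-blocking send the cache previously guaranteed internally.
	respCh := make(chan cache.Response, 1)
	cancel := c.CreateWatch(&discovery.DiscoveryRequest{
		TypeUrl:       rsrc.EndpointType,
		ResourceNames: []string{"example-cluster"}, // illustrative resource name
	}, respCh)
	if cancel != nil {
		// A cancel function comes back only when the watch stays open.
		defer cancel()
	}
	return <-respCh // may be nil if the cache rejected the request (see linear/mux below)
}
```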
2 changes: 1 addition & 1 deletion pkg/cache/v3/cache.go
@@ -54,7 +54,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
- CreateWatch(*Request) (value chan Response, cancel func())
+ CreateWatch(*Request, chan Response) (cancel func())

// CreateDeltaWatch returns a new open incremental xDS watch.
//
13 changes: 6 additions & 7 deletions pkg/cache/v3/linear.go
@@ -165,11 +165,10 @@ func (cache *LinearCache) DeleteResource(name string) error {
return nil
}

- func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func()) {
- value := make(chan Response, 1)
+ func (cache *LinearCache) CreateWatch(request *Request, value chan Response) func() {
if request.TypeUrl != cache.typeURL {
- close(value)
- return value, nil
+ value <- nil
+ return nil
}
// If the version is not up to date, check whether any requested resource has
// been updated between the last version and the current version. This avoids the problem
@@ -205,12 +204,12 @@ func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func())
}
if stale {
cache.respond(value, staleResources)
- return value, nil
+ return nil
}
// Create open watches since versions are up to date.
if len(request.ResourceNames) == 0 {
cache.watchAll[value] = struct{}{}
- return value, func() {
+ return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.watchAll, value)
@@ -224,7 +223,7 @@ func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func())
}
set[value] = struct{}{}
}
- return value, func() {
+ return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
for _, name := range request.ResourceNames {
72 changes: 43 additions & 29 deletions pkg/cache/v3/linear_test.go
@@ -72,9 +72,10 @@ func mustBlock(t *testing.T, w <-chan Response) {

func TestLinearInitialResources(t *testing.T) {
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
- w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType})
+ w := make(chan Response, 1)
+ c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, w)
verifyResponse(t, w, "0", 1)
- w, _ = c.CreateWatch(&Request{TypeUrl: testType})
+ c.CreateWatch(&Request{TypeUrl: testType}, w)
verifyResponse(t, w, "0", 2)
}

@@ -85,24 +86,28 @@ func TestLinearCornerCases(t *testing.T) {
t.Error("expected error on nil resource")
}
// create an incorrect type URL request
- w, _ := c.CreateWatch(&Request{TypeUrl: "test"})
+ w := make(chan Response, 1)
+ c.CreateWatch(&Request{TypeUrl: "test"}, w)
select {
- case _, more := <-w:
- if more {
- t.Error("should be closed by the producer")
+ case r := <-w:
+ if r != nil {
+ t.Error("response should be nil")
}
default:
- t.Error("channel should be closed")
+ t.Error("should receive nil response")
}
}

func TestLinearBasic(t *testing.T) {
c := NewLinearCache(testType)

// Create watches before a resource is ready
- w1, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
+ w1 := make(chan Response, 1)
+ c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1)
mustBlock(t, w1)
- w, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})

+ w := make(chan Response, 1)
+ c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 2)
checkWatchCount(t, c, "b", 1)
@@ -113,59 +118,63 @@ func TestLinearBasic(t *testing.T) {
verifyResponse(t, w, "1", 1)

// Request again, should get same response
- w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
+ c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
checkWatchCount(t, c, "a", 0)
verifyResponse(t, w, "1", 1)
- w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
+ c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
checkWatchCount(t, c, "a", 0)
verifyResponse(t, w, "1", 1)

// Add another element and update the first, response should be different
c.UpdateResource("b", testResource("b"))
c.UpdateResource("a", testResource("aa"))
- w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
+ c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
verifyResponse(t, w, "3", 1)
- w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
+ c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
verifyResponse(t, w, "3", 2)
}

func TestLinearVersionPrefix(t *testing.T) {
c := NewLinearCache(testType, WithVersionPrefix("instance1-"))

- w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
+ w := make(chan Response, 1)
+ c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
verifyResponse(t, w, "instance1-0", 0)

c.UpdateResource("a", testResource("a"))
- w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
+ c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
verifyResponse(t, w, "instance1-1", 1)

- w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"})
+ c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
}

func TestLinearDeletion(t *testing.T) {
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
- w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
+ w := make(chan Response, 1)
+ c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
c.DeleteResource("a")
verifyResponse(t, w, "1", 0)
checkWatchCount(t, c, "a", 0)
- w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
+ c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
verifyResponse(t, w, "1", 1)
checkWatchCount(t, c, "b", 0)
c.DeleteResource("b")
- w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
+ c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w)
verifyResponse(t, w, "2", 0)
checkWatchCount(t, c, "b", 0)
}

func TestLinearWatchTwo(t *testing.T) {
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
- w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"})
+ w := make(chan Response, 1)
+ c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, w)
mustBlock(t, w)
- w1, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
+ w1 := make(chan Response, 1)
+ c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w1)
mustBlock(t, w1)
c.UpdateResource("a", testResource("aa"))
// should only get the modified resource
@@ -178,24 +187,28 @@ func TestLinearCancel(t *testing.T) {
c.UpdateResource("a", testResource("a"))

// cancel watch-all
- w, cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
+ w := make(chan Response, 1)
+ cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
cancel()
checkWatchCount(t, c, "a", 0)

// cancel watch for "a"
- w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
+ cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
cancel()
checkWatchCount(t, c, "a", 0)

// open four watches for "a" and "b" and two for all, cancel one of each, make sure the second one is unaffected
- w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
- w2, cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"})
- w3, cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
- w4, cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
+ w2 := make(chan Response, 1)
+ w3 := make(chan Response, 1)
+ w4 := make(chan Response, 1)
+ cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w)
+ cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, w2)
+ cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w3)
+ cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w4)
mustBlock(t, w)
mustBlock(t, w2)
mustBlock(t, w3)
@@ -230,12 +243,13 @@ func TestLinearConcurrentSetWatch(t *testing.T) {
} else {
id2 := fmt.Sprintf("%d", i-1)
t.Logf("request resources %q and %q", id, id2)
- value, _ := c.CreateWatch(&Request{
+ value := make(chan Response, 1)
+ c.CreateWatch(&Request{
// Only expect one to become stale
ResourceNames: []string{id, id2},
VersionInfo: "0",
TypeUrl: testType,
- })
+ }, value)
// wait until all updates apply
verifyResponse(t, value, "", 1)
}
9 changes: 4 additions & 5 deletions pkg/cache/v3/mux.go
@@ -36,15 +36,14 @@ type MuxCache struct {

var _ Cache = &MuxCache{}

- func (mux *MuxCache) CreateWatch(request *Request) (chan Response, func()) {
+ func (mux *MuxCache) CreateWatch(request *Request, value chan Response) func() {
key := mux.Classify(*request)
cache, exists := mux.Caches[key]
if !exists {
- value := make(chan Response, 0)
- close(value)
- return value, nil
+ value <- nil
+ return nil
}
- return cache.CreateWatch(request)
+ return cache.CreateWatch(request, value)
}

func (cache *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) {
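With the channel owned by the caller, LinearCache and MuxCache can no longer signal a rejected watch by closing the channel; both now send a nil Response instead (LinearCache on a type URL mismatch, MuxCache on an unknown classifier key). A consumer therefore has to treat a nil value as "no watch was created" rather than waiting for a close. A hedged sketch, reusing the cache import from the example above plus the standard errors package; the helper name is illustrative.

```go
// awaitFirstResponse is a hypothetical helper that applies the new
// nil-response convention: nil means the cache rejected the request,
// and the channel is no longer closed in that case.
func awaitFirstResponse(respCh <-chan cache.Response) (cache.Response, error) {
	resp := <-respCh // blocks until the watch is answered or rejected
	if resp == nil {
		return nil, errors.New("watch rejected: wrong type URL or unknown cache key")
	}
	return resp, nil
}
```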
9 changes: 3 additions & 6 deletions pkg/cache/v3/simple.go
@@ -282,7 +282,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTtl)
}

// CreateWatch returns a watch for an xDS request.
- func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func()) {
+ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) func() {
nodeID := cache.hash.ID(request.Node)

cache.mu.Lock()
@@ -299,9 +299,6 @@ func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func()
info.lastWatchRequestTime = time.Now()
info.mu.Unlock()

- // allocate capacity 1 to allow one-time non-blocking use
- value := make(chan Response, 1)

snapshot, exists := cache.snapshots[nodeID]
version := snapshot.GetVersion(request.TypeUrl)

@@ -315,14 +312,14 @@ func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func()
info.mu.Lock()
info.watches[watchID] = ResponseWatch{Request: request, Response: value}
info.mu.Unlock()
- return value, cache.cancelWatch(nodeID, watchID)
+ return cache.cancelWatch(nodeID, watchID)
}

// otherwise, the watch may be responded immediately
resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
cache.respond(request, value, resources, version, false)

- return value, nil
+ return nil
}

func (cache *snapshotCache) nextWatchID() int64 {
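For snapshotCache, the capacity-1 allocation (and its explanatory comment) moves out of CreateWatch, so providing a buffered channel is now the caller's job if the cache's one-shot respond is to stay non-blocking; as before, a nil cancel is returned whenever the watch was answered immediately. A small wrapper, sketched under those assumptions (the function name is illustrative, not part of this commit):

```go
// openWatch is a hypothetical wrapper: it allocates the buffered channel the
// snapshot cache used to create itself and normalizes the possibly-nil cancel.
func openWatch(c cache.Cache, req *cache.Request) (<-chan cache.Response, func()) {
	respCh := make(chan cache.Response, 1) // buffered so the cache's respond never blocks
	cancel := c.CreateWatch(req, respCh)
	if cancel == nil {
		cancel = func() {} // immediate response: nothing to clean up
	}
	return respCh, cancel
}
```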
26 changes: 17 additions & 9 deletions pkg/cache/v3/simple_test.go
@@ -120,7 +120,8 @@ func TestSnapshotCacheWithTtl(t *testing.T) {
wg.Add(1)
t.Run(typ, func(t *testing.T) {
defer wg.Done()
- value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
+ value := make(chan cache.Response, 1)
+ c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value)
select {
case out := <-value:
if gotVersion, _ := out.GetVersion(); gotVersion != version {
@@ -146,7 +147,8 @@ func TestSnapshotCacheWithTtl(t *testing.T) {

end := time.After(5 * time.Second)
for {
- value, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version})
+ value := make(chan cache.Response, 1)
+ cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, value)

select {
case out := <-value:
@@ -202,7 +204,8 @@ func TestSnapshotCache(t *testing.T) {

// try to get endpoints with incorrect list of names
// should not receive response
- value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}})
+ value := make(chan cache.Response, 1)
+ c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, value)
select {
case out := <-value:
t.Errorf("watch for endpoints and mismatched names => got %v, want none", out)
@@ -211,7 +214,8 @@ func TestSnapshotCache(t *testing.T) {

for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
- value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
+ value := make(chan cache.Response, 1)
+ c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value)
select {
case out := <-value:
if gotVersion, _ := out.GetVersion(); gotVersion != version {
@@ -262,7 +266,8 @@ func TestSnapshotCacheWatch(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
watches := make(map[string]chan cache.Response)
for _, typ := range testTypes {
- watches[typ], _ = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
+ watches[typ] = make(chan cache.Response, 1)
+ c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, watches[typ])
}
if err := c.SetSnapshot(key, snapshot); err != nil {
t.Fatal(err)
@@ -285,7 +290,8 @@ func TestSnapshotCacheWatch(t *testing.T) {

// open new watches with the latest version
for _, typ := range testTypes {
- watches[typ], _ = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version})
+ watches[typ] = make(chan cache.Response, 1)
+ c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, watches[typ])
}
if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) {
t.Errorf("watches should be created for the latest version: %d", count)
@@ -323,6 +329,7 @@ func TestConcurrentSetWatch(t *testing.T) {
t.Parallel()
id := fmt.Sprintf("%d", i%2)
var cancel func()
+ value := make(chan cache.Response, 1)
if i < 25 {
snap := cache.Snapshot{}
snap.Resources[types.Endpoint] = cache.NewResources(fmt.Sprintf("v%d", i), []types.Resource{resource.MakeEndpoint(clusterName, uint32(i))})
@@ -331,10 +338,10 @@ func TestConcurrentSetWatch(t *testing.T) {
if cancel != nil {
cancel()
}
- _, cancel = c.CreateWatch(&discovery.DiscoveryRequest{
+ cancel = c.CreateWatch(&discovery.DiscoveryRequest{
Node: &core.Node{Id: id},
TypeUrl: rsrc.EndpointType,
- })
+ }, value)
}
})
}(i)
@@ -344,7 +351,8 @@ func TestConcurrentSetWatch(t *testing.T) {
func TestSnapshotCacheWatchCancel(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
for _, typ := range testTypes {
- _, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
+ value := make(chan cache.Response, 1)
+ cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value)
cancel()
}
// should be status info for the node
(The remaining 2 changed files are not shown in this view.)
