Skip to content

Commit

Permalink
[Linear cache] Add new UpdateResources method to update multiple reso…
Browse files Browse the repository at this point in the history
…urces at once without doing a stow update and limit unneeded allocations when processing delta watches

Signed-off-by: Valerian Roche <[email protected]>
  • Loading branch information
valerian-roche committed Mar 15, 2022
1 parent e6111ee commit e705a9d
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 18 deletions.
34 changes: 21 additions & 13 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,39 @@ type resourceContainer struct {

func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
nextVersionMap := make(map[string]string)
filtered := make([]types.Resource, 0, len(resources.resourceMap))
toRemove := make([]string, 0)
var nextVersionMap map[string]string
var filtered []types.Resource
var toRemove []string

// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetResourceVersions()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
for name, r := range resources.resourceMap {
// Since we've already precomputed the version hashes of the new snapshot,
// we can just set it here to be used for comparison later
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetResourceVersions()[name]
if !found || (prevVersion != nextVersionMap[name]) {
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
for name, prevVersion := range state.GetResourceVersions() {
// The prevVersion != "" check is in place to make sure we are only sending an update to the client once right after it is removed.
// If the client decides to keep the subscription we skip the add for every subsequent response.
if _, ok := resources.resourceMap[name]; !ok && prevVersion != "" {
toRemove = append(toRemove, name)
}
}
default:
// Reply only with the requested resources
nextVersionMap = make(map[string]string, len(state.GetResourceVersions()))
for name, prevVersion := range state.GetResourceVersions() {
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
Expand All @@ -59,19 +73,13 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
} else {
// We track non-existent resources for non-wildcard streams until the client explicitly unsubscribes from them.
nextVersionMap[name] = ""
if prevVersion != "" {
toRemove = append(toRemove, name)
}
}
}
}

// Compute resources for removal regardless of the request type
for name, prevVersion := range state.GetResourceVersions() {
// The prevVersion != "" check is in place to make sure we are only sending an update to the client once right after it is removed.
// If the client decides to keep the subscription we skip the add for every subsequent response.
if _, ok := resources.resourceMap[name]; !ok && prevVersion != "" {
toRemove = append(toRemove, name)
}
}

return &RawDeltaResponse{
DeltaRequest: req,
Resources: filtered,
Expand Down
27 changes: 27 additions & 0 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,33 @@ func (cache *LinearCache) DeleteResource(name string) error {
return nil
}

// UpdateResources updates a list of resources in the collection.
// Resources provided with value == nil are considered as deleted.
// Calling UpdateResources instead of iterating on UpdateResource and DeleteResource
// is significantly more efficient when using delta-watches.
func (cache *LinearCache) UpdateResources(toUpdate map[string]types.Resource, toDelete []string) error {
cache.mu.Lock()
defer cache.mu.Unlock()

cache.version++

modified := make(map[string]struct{}, len(toUpdate)+len(toDelete))
for name, resource := range toUpdate {
cache.versionVector[name] = cache.version
cache.resources[name] = resource
modified[name] = struct{}{}
}
for _, name := range toDelete {
delete(cache.versionVector, name)
delete(cache.resources, name)
modified[name] = struct{}{}
}

cache.notifyAll(modified)

return nil
}

// SetResources replaces current resources with a new set of resources.
// This function is useful for wildcard xDS subscriptions.
// This way watches that are subscribed to all resources are triggered only once regardless of how many resources are changed.
Expand Down
123 changes: 118 additions & 5 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -66,13 +67,13 @@ type resourceInfo struct {
version string
}

func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo, deleted []string) {
func validateDeltaResponse(t *testing.T, resp DeltaResponse, resources []resourceInfo, deleted []string) {
t.Helper()
r := <-ch
if r.GetDeltaRequest().TypeUrl != testType {
t.Errorf("unexpected empty request type URL: %q", r.GetDeltaRequest().TypeUrl)

if resp.GetDeltaRequest().TypeUrl != testType {
t.Errorf("unexpected empty request type URL: %q", resp.GetDeltaRequest().TypeUrl)
}
out, err := r.GetDeltaDiscoveryResponse()
out, err := resp.GetDeltaDiscoveryResponse()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -115,6 +116,18 @@ func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []reso
}
}

func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo, deleted []string) {
t.Helper()
var r DeltaResponse
select {
case r = <-ch:
case <-time.After(5 * time.Second):
t.Error("timeout waiting for delta response")
return
}
validateDeltaResponse(t, r, resources, deleted)
}

func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) {
t.Helper()
if i := c.NumWatches(name); i != count {
Expand Down Expand Up @@ -157,6 +170,14 @@ func hashResource(t *testing.T, resource types.Resource) string {
return v
}

func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) {
state := stream.NewStreamState(true, nil)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
resp := <-w
state.SetResourceVersions(resp.GetNextVersionMap())
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) // Ensure the watch is set properly with cache values
}

func TestLinearInitialResources(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
Expand Down Expand Up @@ -551,3 +572,95 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
c.SetResources(map[string]types.Resource{"a": a})
verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, []string{"b"})
}

func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
c := NewLinearCache(testType)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
w := make(chan DeltaResponse, 1)

// Initial update
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
a := &endpoint.ClusterLoadAssignment{ClusterName: "a"}
hashA := hashResource(t, a)
b := &endpoint.ClusterLoadAssignment{ClusterName: "b"}
hashB := hashResource(t, b)
err := c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil)
assert.NoError(t, err)
resp := <-w
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
state.SetResourceVersions(resp.GetNextVersionMap())

// Multiple updates
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
{Priority: 10},
}}
b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
{Priority: 15},
}}
hashA = hashResource(t, a)
hashB = hashResource(t, b)
c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil)
resp = <-w
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
state.SetResourceVersions(resp.GetNextVersionMap())

// Update/add/delete
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
{Priority: 15},
}}
d := &endpoint.ClusterLoadAssignment{ClusterName: "d", Endpoints: []*endpoint.LocalityLbEndpoints{}} // resource created, but not watched
hashA = hashResource(t, a)
c.UpdateResources(map[string]types.Resource{"a": a, "d": d}, []string{"b"})
assert.Contains(t, c.resources, "d", "resource with name d not found in cache")
assert.NotContains(t, c.resources, "b", "resource with name b was found in cache")
resp = <-w
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"})
state.SetResourceVersions(resp.GetNextVersionMap())

// Re-add previously deleted watched resource
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource
hashB = hashResource(t, b)
c.UpdateResources(map[string]types.Resource{"b": b}, []string{"d"})
assert.Contains(t, c.resources, "b", "resource with name b not found in cache")
assert.NotContains(t, c.resources, "d", "resource with name d was found in cache")
resp = <-w
validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned
state.SetResourceVersions(resp.GetNextVersionMap())

// Wildcard create/update
createWildcardDeltaWatch(c, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
{Priority: 15},
}}
d = &endpoint.ClusterLoadAssignment{ClusterName: "d", Endpoints: []*endpoint.LocalityLbEndpoints{}} // resource create
hashB = hashResource(t, b)
hashD := hashResource(t, d)
c.UpdateResources(map[string]types.Resource{"b": b, "d": d}, nil)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"d", hashD}}, nil)

// Wildcard update/delete
createWildcardDeltaWatch(c, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
{Priority: 25},
}}
hashA = hashResource(t, a)
c.UpdateResources(map[string]types.Resource{"a": a}, []string{"d"})
assert.NotContains(t, c.resources, "d", "resource with name d was found in cache")
verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, []string{"d"})
}

0 comments on commit e705a9d

Please sign in to comment.