Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Linear cache: Add new UpdateResources method to update multiple resources at once without doing a stow update and limit unneeded allocations when processing delta watches #546

Merged
merged 3 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,37 @@ type resourceContainer struct {

func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
nextVersionMap := make(map[string]string)
filtered := make([]types.Resource, 0, len(resources.resourceMap))
toRemove := make([]string, 0)
var nextVersionMap map[string]string
var filtered []types.Resource
var toRemove []string

// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetResourceVersions()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
for name, r := range resources.resourceMap {
// Since we've already precomputed the version hashes of the new snapshot,
// we can just set it here to be used for comparison later
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetResourceVersions()[name]
if !found || (prevVersion != nextVersionMap[name]) {
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
for name := range state.GetResourceVersions() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
}
default:
// Reply only with the requested resources
nextVersionMap = make(map[string]string, len(state.GetResourceVersions()))
for name, prevVersion := range state.GetResourceVersions() {
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
Expand All @@ -59,19 +71,15 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
} else {
// We track non-existent resources for non-wildcard streams until the client explicitly unsubscribes from them.
nextVersionMap[name] = ""
// The version check is to make sure we are only sending an update once right after removal.
// If the client keeps the subscription, we skip the add for every subsequent response.
if prevVersion != "" {
toRemove = append(toRemove, name)
}
}
}
}

// Compute resources for removal regardless of the request type
for name, prevVersion := range state.GetResourceVersions() {
// The prevVersion != "" check is in place to make sure we are only sending an update to the client once right after it is removed.
// If the client decides to keep the subscription we skip the add for every subsequent response.
if _, ok := resources.resourceMap[name]; !ok && prevVersion != "" {
toRemove = append(toRemove, name)
}
}

return &RawDeltaResponse{
DeltaRequest: req,
Resources: filtered,
Expand Down
26 changes: 26 additions & 0 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,32 @@ func (cache *LinearCache) DeleteResource(name string) error {
return nil
}

// UpdateResources updates/deletes a list of resources in the cache.
// Calling UpdateResources instead of iterating on UpdateResource and DeleteResource
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
// is significantly more efficient when using delta or wildcard watches.
func (cache *LinearCache) UpdateResources(toUpdate map[string]types.Resource, toDelete []string) error {
cache.mu.Lock()
defer cache.mu.Unlock()

cache.version++

modified := make(map[string]struct{}, len(toUpdate)+len(toDelete))
for name, resource := range toUpdate {
cache.versionVector[name] = cache.version
cache.resources[name] = resource
modified[name] = struct{}{}
}
for _, name := range toDelete {
delete(cache.versionVector, name)
delete(cache.resources, name)
modified[name] = struct{}{}
}

cache.notifyAll(modified)

return nil
}

// SetResources replaces current resources with a new set of resources.
// This function is useful for wildcard xDS subscriptions.
// This way watches that are subscribed to all resources are triggered only once regardless of how many resources are changed.
Expand Down
128 changes: 123 additions & 5 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -66,13 +67,13 @@ type resourceInfo struct {
version string
}

func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo, deleted []string) {
func validateDeltaResponse(t *testing.T, resp DeltaResponse, resources []resourceInfo, deleted []string) {
t.Helper()
r := <-ch
if r.GetDeltaRequest().TypeUrl != testType {
t.Errorf("unexpected empty request type URL: %q", r.GetDeltaRequest().TypeUrl)

if resp.GetDeltaRequest().TypeUrl != testType {
t.Errorf("unexpected empty request type URL: %q", resp.GetDeltaRequest().TypeUrl)
}
out, err := r.GetDeltaDiscoveryResponse()
out, err := resp.GetDeltaDiscoveryResponse()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -115,6 +116,18 @@ func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []reso
}
}

func verifyDeltaResponse(t *testing.T, ch <-chan DeltaResponse, resources []resourceInfo, deleted []string) {
t.Helper()
var r DeltaResponse
select {
case r = <-ch:
case <-time.After(5 * time.Second):
t.Error("timeout waiting for delta response")
return
}
validateDeltaResponse(t, r, resources, deleted)
}

func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) {
t.Helper()
if i := c.NumWatches(name); i != count {
Expand Down Expand Up @@ -157,6 +170,14 @@ func hashResource(t *testing.T, resource types.Resource) string {
return v
}

func createWildcardDeltaWatch(c *LinearCache, w chan DeltaResponse) {
state := stream.NewStreamState(true, nil)
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
resp := <-w
state.SetResourceVersions(resp.GetNextVersionMap())
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) // Ensure the watch is set properly with cache values
}

func TestLinearInitialResources(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
Expand Down Expand Up @@ -551,3 +572,100 @@ func TestLinearDeltaResourceDelete(t *testing.T) {
c.SetResources(map[string]types.Resource{"a": a})
verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, []string{"b"})
}

func TestLinearDeltaMultiResourceUpdates(t *testing.T) {
c := NewLinearCache(testType)

state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""})
w := make(chan DeltaResponse, 1)

// Initial update
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
a := &endpoint.ClusterLoadAssignment{ClusterName: "a"}
hashA := hashResource(t, a)
b := &endpoint.ClusterLoadAssignment{ClusterName: "b"}
hashB := hashResource(t, b)
err := c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil)
assert.NoError(t, err)
resp := <-w
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
state.SetResourceVersions(resp.GetNextVersionMap())

// Multiple updates
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
{Priority: 10},
}}
b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
{Priority: 15},
}}
hashA = hashResource(t, a)
hashB = hashResource(t, b)
err = c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil)
assert.NoError(t, err)
resp = <-w
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil)
state.SetResourceVersions(resp.GetNextVersionMap())

// Update/add/delete
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
{Priority: 15},
}}
d := &endpoint.ClusterLoadAssignment{ClusterName: "d", Endpoints: []*endpoint.LocalityLbEndpoints{}} // resource created, but not watched
hashA = hashResource(t, a)
err = c.UpdateResources(map[string]types.Resource{"a": a, "d": d}, []string{"b"})
assert.NoError(t, err)
assert.Contains(t, c.resources, "d", "resource with name d not found in cache")
assert.NotContains(t, c.resources, "b", "resource with name b was found in cache")
resp = <-w
validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"})
state.SetResourceVersions(resp.GetNextVersionMap())

// Re-add previously deleted watched resource
c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource
hashB = hashResource(t, b)
err = c.UpdateResources(map[string]types.Resource{"b": b}, []string{"d"})
assert.NoError(t, err)
assert.Contains(t, c.resources, "b", "resource with name b not found in cache")
assert.NotContains(t, c.resources, "d", "resource with name d was found in cache")
resp = <-w
validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned
state.SetResourceVersions(resp.GetNextVersionMap())

// Wildcard create/update
createWildcardDeltaWatch(c, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
{Priority: 15},
}}
d = &endpoint.ClusterLoadAssignment{ClusterName: "d", Endpoints: []*endpoint.LocalityLbEndpoints{}} // resource create
hashB = hashResource(t, b)
hashD := hashResource(t, d)
err = c.UpdateResources(map[string]types.Resource{"b": b, "d": d}, nil)
assert.NoError(t, err)
verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"d", hashD}}, nil)

// Wildcard update/delete
createWildcardDeltaWatch(c, w)
mustBlockDelta(t, w)
checkDeltaWatchCount(t, c, 1)
a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update
{Priority: 25},
}}
hashA = hashResource(t, a)
err = c.UpdateResources(map[string]types.Resource{"a": a}, []string{"d"})
assert.NoError(t, err)
assert.NotContains(t, c.resources, "d", "resource with name d was found in cache")
verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, []string{"d"})
}