Skip to content

Commit

Permalink
Merge pull request #5 from valerian-roche/vr/resource-returned
Browse files Browse the repository at this point in the history
[sotw][issue-540] Return full state when applicable for watches in linear cache
  • Loading branch information
valerian-roche authored Jan 16, 2024
2 parents 4709834 + 24a2de3 commit 9068fc5
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 14 deletions.
26 changes: 25 additions & 1 deletion pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
versionMap: nil,
version: 0,
versionVector: make(map[string]uint64),
log: log.NewDefaultLogger(),
}
for _, opt := range opts {
opt(out)
Expand All @@ -117,11 +118,24 @@ func (cache *LinearCache) respond(watch ResponseWatch, staleResources []string)
var resources []types.ResourceWithTTL
// TODO: optimize the resources slice creations across different clients
if len(staleResources) == 0 {
// Wildcard case, we return all resources in the cache
resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
for _, resource := range cache.resources {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
}
} else if ResourceRequiresFullStateInSotw(cache.typeURL) {
// Non-wildcard request for a type requiring full state response
// We need to return all requested resources, if existing, for this type
requestedResources := watch.Request.GetResourceNames()
resources = make([]types.ResourceWithTTL, 0, len(requestedResources))
for _, resource := range requestedResources {
resource := cache.resources[resource]
if resource != nil {
resources = append(resources, types.ResourceWithTTL{Resource: resource})
}
}
} else {
// Non-wildcard request for other types. Only return stale resources
resources = make([]types.ResourceWithTTL, 0, len(staleResources))
for _, name := range staleResources {
resource := cache.resources[name]
Expand Down Expand Up @@ -327,8 +341,12 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
case err != nil:
stale = true
staleResources = request.GetResourceNames()
cache.log.Debugf("Watch is stale as version failed to parse %s", err.Error())
case len(request.GetResourceNames()) == 0:
stale = lastVersion != cache.version
stale = (lastVersion != cache.version)
if stale {
cache.log.Debugf("Watch is stale as cache version %d differs for wildcard watch %d", cache.version, lastVersion)
}
default:
for _, name := range request.GetResourceNames() {
// When a resource is removed, its version defaults 0 and it is not considered stale.
Expand All @@ -337,20 +355,26 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
staleResources = append(staleResources, name)
}
}
if stale {
cache.log.Debugf("Watch is stale with stale resources %v", staleResources)
}
}
if stale {
cache.respond(watch, staleResources)
return nil
}
// Create open watches since versions are up to date.
if len(request.GetResourceNames()) == 0 {
cache.log.Infof("[linear cache] open watch for %s all resources, system version %q", cache.typeURL, cache.getVersion())
cache.watchAll[watch] = struct{}{}
return func() {
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.watchAll, watch)
}
}

cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q", cache.typeURL, request.ResourceNames, cache.getVersion())
for _, name := range request.GetResourceNames() {
set, exists := cache.watches[name]
if !exists {
Expand Down
117 changes: 104 additions & 13 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/wrapperspb"

cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/log"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

Expand All @@ -38,18 +42,18 @@ func testResource(s string) types.Resource {
return wrapperspb.String(s)
}

func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) {
func verifyResponseContent(t *testing.T, ch <-chan Response, expectedType string, expectedVersion string) (Response, *discovery.DiscoveryResponse) {
t.Helper()
var r Response
select {
case r = <-ch:
case <-time.After(1 * time.Second):
t.Error("failed to receive response after 1 second")
return
return nil, nil
}

if r.GetRequest().GetTypeUrl() != testType {
t.Errorf("unexpected empty request type URL: %q", r.GetRequest().GetTypeUrl())
if r.GetRequest().GetTypeUrl() != expectedType {
t.Errorf("unexpected request type URL: %q", r.GetRequest().GetTypeUrl())
}
if r.GetContext() == nil {
t.Errorf("unexpected empty response context")
Expand All @@ -61,18 +65,41 @@ func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) {
if out.GetVersionInfo() == "" {
t.Error("unexpected response empty version")
}
if n := len(out.GetResources()); n != num {
t.Errorf("unexpected number of responses: got %d, want %d", n, num)
}
if version != "" && out.GetVersionInfo() != version {
t.Errorf("unexpected version: got %q, want %q", out.GetVersionInfo(), version)
if expectedVersion != "" && out.GetVersionInfo() != expectedVersion {
t.Errorf("unexpected version: got %q, want %q", out.GetVersionInfo(), expectedVersion)
}
if out.GetTypeUrl() != testType {
if out.GetTypeUrl() != expectedType {
t.Errorf("unexpected type URL: %q", out.GetTypeUrl())
}
if len(r.GetRequest().GetResourceNames()) != 0 && len(r.GetRequest().GetResourceNames()) < len(out.Resources) {
t.Errorf("received more resources (%d) than requested (%d)", len(r.GetRequest().GetResourceNames()), len(out.Resources))
}
return r, out
}

func verifyResponse(t *testing.T, ch <-chan Response, expectedVersion string, expectedResourcesNb int) {
t.Helper()
_, r := verifyResponseContent(t, ch, testType, expectedVersion)
if r == nil {
return
}
if n := len(r.GetResources()); n != expectedResourcesNb {
t.Errorf("unexpected number of responses: got %d, want %d", n, expectedResourcesNb)
}
}

func verifyResponseResources(t *testing.T, ch <-chan Response, expectedType string, expectedVersion string, expectedResources ...string) {
t.Helper()
r, _ := verifyResponseContent(t, ch, expectedType, expectedVersion)
if r == nil {
return
}
out := r.(*RawResponse)
resourceNames := []string{}
for _, res := range out.Resources {
resourceNames = append(resourceNames, GetResourceName(res.Resource))
}
assert.ElementsMatch(t, resourceNames, expectedResources)
}

type resourceInfo struct {
Expand Down Expand Up @@ -172,6 +199,7 @@ func checkVersionMapSet(t *testing.T, c *LinearCache) {
}

func mustBlock(t *testing.T, w <-chan Response) {
t.Helper()
select {
case <-w:
t.Error("watch must block")
Expand All @@ -180,6 +208,7 @@ func mustBlock(t *testing.T, w <-chan Response) {
}

func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) {
t.Helper()
select {
case <-w:
t.Error("watch must block")
Expand All @@ -188,6 +217,7 @@ func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) {
}

func hashResource(t *testing.T, resource types.Resource) string {
t.Helper()
marshaledResource, err := MarshalResource(resource)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -815,7 +845,7 @@ func TestLinearSotwWatches(t *testing.T) {
}}
err = cache.UpdateResources(map[string]types.Resource{"a": a}, nil)
require.NoError(t, err)
verifyResponse(t, w, cache.getVersion(), 1)
verifyResponseResources(t, w, testType, cache.getVersion(), "a")
checkVersionMapNotSet(t, cache)

assert.Empty(t, cache.watches["a"])
Expand All @@ -839,7 +869,7 @@ func TestLinearSotwWatches(t *testing.T) {
assert.Empty(t, cache.watches["c"])

require.NoError(t, err)
verifyResponse(t, w, cache.getVersion(), 1)
verifyResponseResources(t, w, testType, cache.getVersion(), "b")
checkVersionMapNotSet(t, cache)

w = make(chan Response, 1)
Expand All @@ -853,11 +883,72 @@ func TestLinearSotwWatches(t *testing.T) {
}}
err = cache.UpdateResources(map[string]types.Resource{"c": c}, nil)
require.NoError(t, err)
verifyResponse(t, w, cache.getVersion(), 1)
verifyResponseResources(t, w, testType, cache.getVersion(), "c")
checkVersionMapNotSet(t, cache)

assert.Empty(t, cache.watches["a"])
assert.Empty(t, cache.watches["b"])
assert.Empty(t, cache.watches["c"])
})

t.Run("watches return full state for types requesting it", func(t *testing.T) {
a := &cluster.Cluster{Name: "a"}
b := &cluster.Cluster{Name: "b"}
c := &cluster.Cluster{Name: "c"}
// ClusterType requires all resources to always be returned
cache := NewLinearCache(resource.ClusterType, WithInitialResources(map[string]types.Resource{
"a": a,
"b": b,
"c": c,
}), WithLogger(log.NewTestLogger(t)))
assert.Equal(t, 3, cache.NumResources())

// Non-wildcard request
nonWildcardState := stream.NewStreamState(false, nil)
w1 := make(chan Response, 1)
_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardState, w1)
mustBlock(t, w1)
checkVersionMapNotSet(t, cache)

// wildcard request
wildcardState := stream.NewStreamState(true, nil)
w2 := make(chan Response, 1)
_ = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardState, w2)
mustBlock(t, w2)
checkVersionMapNotSet(t, cache)

// request not requesting b
otherState := stream.NewStreamState(false, nil)
w3 := make(chan Response, 1)
_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "c", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, otherState, w3)
mustBlock(t, w3)
checkVersionMapNotSet(t, cache)

b.AltStatName = "othername"
err := cache.UpdateResources(map[string]types.Resource{"b": b}, nil)
require.NoError(t, err)

// Other watch has not triggered
mustBlock(t, w3)

verifyResponseResources(t, w1, resource.ClusterType, cache.getVersion(), "a", "b") // a is also returned as cluster requires full state
verifyResponseResources(t, w2, resource.ClusterType, cache.getVersion(), "a", "b", "c") // a and c are also returned wildcard

// Recreate the watches
w1 = make(chan Response, 1)
_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardState, w1)
mustBlock(t, w1)
w2 = make(chan Response, 1)
_ = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardState, w2)
mustBlock(t, w2)

// Update d, new resource in the cache
d := &cluster.Cluster{Name: "d"}
err = cache.UpdateResource("d", d)
require.NoError(t, err)

verifyResponseResources(t, w1, resource.ClusterType, cache.getVersion(), "a", "b", "d")
verifyResponseResources(t, w2, resource.ClusterType, cache.getVersion(), "a", "b", "c", "d")
verifyResponseResources(t, w3, resource.ClusterType, cache.getVersion(), "a", "c", "d")
})
}
18 changes: 18 additions & 0 deletions pkg/cache/v3/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,24 @@ func GetResourceName(res types.Resource) string {
}
}

// ResourceRequiresFullStateInSotw indicates whether when building the reply in Sotw,
// the response must include all existing resources or can return only the modified ones
func ResourceRequiresFullStateInSotw(typeURL resource.Type) bool {
// From https://www.envoyproxy.io/docs/envoy/v1.28.0/api-docs/xds_protocol#grouping-resources-into-responses,
// when using sotw the control-plane MUST return all requested resources (or simply all if wildcard)
// for some types. This is relied on by xds-grpc which is explicitly requesting clusters and listeners
// but expects to receive all existing resources for those types. Missing clusters or listeners are
// considered deleted.
switch typeURL {
case resource.ClusterType:
return true
case resource.ListenerType:
return true
default:
return false
}
}

// GetResourceName returns the resource names for a list of valid xDS response types.
func GetResourceNames(resources []types.Resource) []string {
out := make([]string, len(resources))
Expand Down
33 changes: 33 additions & 0 deletions pkg/log/test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package log

import "testing"

type testLogger struct {
t testing.TB
}

var _ Logger = testLogger{}

func NewTestLogger(t testing.TB) Logger {
return testLogger{t}
}

// Debugf logs a message at level debug on the test logger.
func (l testLogger) Debugf(msg string, args ...interface{}) {
l.t.Logf("[debug] "+msg, args...)
}

// Infof logs a message at level info on the test logger.
func (l testLogger) Infof(msg string, args ...interface{}) {
l.t.Logf("[info] "+msg, args...)
}

// Warnf logs a message at level warn on the test logger.
func (l testLogger) Warnf(msg string, args ...interface{}) {
l.t.Logf("[warn] "+msg, args...)
}

// Errorf logs a message at level error on the test logger.
func (l testLogger) Errorf(msg string, args ...interface{}) {
l.t.Logf("[error] "+msg, args...)
}

0 comments on commit 9068fc5

Please sign in to comment.