Skip to content

Commit

Permalink
cache: rework snapshot api (#484)
Browse files Browse the repository at this point in the history
* rework new snapshot api to remove the need of API updates whenever a new xDS server is implemented

Signed-off-by: Alec Holmes <[email protected]>
  • Loading branch information
alecholmez authored Aug 31, 2021
1 parent 67485fd commit 4ca66c8
Show file tree
Hide file tree
Showing 13 changed files with 257 additions and 182 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed h1:OZmjad4L3H8ncOIR8rnb5MREYqG8ixi5+WbeUsquF0c=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158 h1:CevA8fI91PAnP8vpnXuB8ZYAZ5wqY86nAbxfgK8tWO4=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
Expand Down
16 changes: 7 additions & 9 deletions internal/example/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,12 @@ func makeConfigSource() *core.ConfigSource {
}

func GenerateSnapshot() cache.Snapshot {
return cache.NewSnapshot(
"1",
[]types.Resource{}, // endpoints
[]types.Resource{makeCluster(ClusterName)},
[]types.Resource{makeRoute(RouteName, ClusterName)},
[]types.Resource{makeHTTPListener(ListenerName, RouteName)},
[]types.Resource{}, // runtimes
[]types.Resource{}, // secrets
[]types.Resource{}, // extension configs
snap, _ := cache.NewSnapshot("1",
map[resource.Type][]types.Resource{
resource.ClusterType: {makeCluster(ClusterName)},
resource.RouteType: {makeRoute(RouteName, ClusterName)},
resource.ListenerType: {makeHTTPListener(ListenerName, RouteName)},
},
)
return snap
}
5 changes: 4 additions & 1 deletion pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
id := fmt.Sprintf("%d", i%2)
responses := make(chan cache.DeltaResponse, 1)
if i < 25 {
snap := cache.Snapshot{}
snap, err := cache.NewSnapshot("", map[rsrc.Type][]types.Resource{})
if err != nil {
t.Fatal(err)
}
snap.Resources[types.Endpoint] = cache.NewResources(version, []types.Resource{resource.MakeEndpoint(clusterName, uint32(i))})
if err := c.SetSnapshot(context.Background(), key, snap); err != nil {
t.Fatalf("snapshot failed: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v3/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

// GetResponseType returns the enumeration for a valid xDS type URL
func GetResponseType(typeURL string) types.ResponseType {
func GetResponseType(typeURL resource.Type) types.ResponseType {
switch typeURL {
case resource.EndpointType:
return types.Endpoint
Expand Down
47 changes: 47 additions & 0 deletions pkg/cache/v3/resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cache

import "github.com/envoyproxy/go-control-plane/pkg/cache/types"

// Resources is a versioned group of resources.
type Resources struct {
// Version information.
Version string

// Items in the group indexed by name.
Items map[string]types.ResourceWithTTL
}

// IndexResourcesByName creates a map from the resource name to the resource.
func IndexResourcesByName(items []types.ResourceWithTTL) map[string]types.ResourceWithTTL {
indexed := make(map[string]types.ResourceWithTTL)
for _, item := range items {
indexed[GetResourceName(item.Resource)] = item
}
return indexed
}

// IndexRawResourcesByName creates a map from the resource name to the resource.
func IndexRawResourcesByName(items []types.Resource) map[string]types.Resource {
indexed := make(map[string]types.Resource)
for _, item := range items {
indexed[GetResourceName(item)] = item
}
return indexed
}

// NewResources creates a new resource group.
func NewResources(version string, items []types.Resource) Resources {
itemsWithTTL := []types.ResourceWithTTL{}
for _, item := range items {
itemsWithTTL = append(itemsWithTTL, types.ResourceWithTTL{Resource: item})
}
return NewResourcesWithTTL(version, itemsWithTTL)
}

// NewResourcesWithTTL creates a new resource group.
func NewResourcesWithTTL(version string, items []types.ResourceWithTTL) Resources {
return Resources{
Version: version,
Items: IndexResourcesByName(items),
}
}
84 changes: 84 additions & 0 deletions pkg/cache/v3/resources_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cache_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

func TestIndexResourcesByName(t *testing.T) {
tests := []struct {
name string
resources []types.ResourceWithTTL
want map[string]types.ResourceWithTTL
}{
{
name: "empty",
resources: nil,
want: map[string]types.ResourceWithTTL{},
},
{
name: "more than one",
resources: []types.ResourceWithTTL{
{Resource: testEndpoint, TTL: &ttl},
{Resource: testRoute, TTL: &ttl},
},
want: map[string]types.ResourceWithTTL{
"cluster0": {Resource: testEndpoint, TTL: &ttl},
"route0": {Resource: testRoute, TTL: &ttl},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := cache.IndexResourcesByName(tt.resources)
assert.Equal(t, tt.want, got)
})
}
}

func TestIndexRawResourceByName(t *testing.T) {
tests := []struct {
name string
resources []types.Resource
want map[string]types.Resource
}{
{
name: "empty",
resources: nil,
want: map[string]types.Resource{},
},
{
name: "more than one",
resources: []types.Resource{
testEndpoint,
testRoute,
},
want: map[string]types.Resource{
"cluster0": testEndpoint,
"route0": testRoute,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := cache.IndexRawResourcesByName(tt.resources)
assert.Equal(t, tt.want, got)
})
}
}

func TestNewResources(t *testing.T) {
resources := cache.NewResources("x", []types.Resource{
testEndpoint,
testRoute,
})

assert.NotNil(t, resources.Items)
assert.Equal(t, "x", resources.Version)
}
8 changes: 4 additions & 4 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
for id, watch := range info.watches {
// Respond with the current version regardless of whether the version has changed.
version := snapshot.GetVersion(watch.Request.TypeUrl)
resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl)
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)

// TODO(snowp): Construct this once per type instead of once per watch.
resourcesWithTTL := map[string]types.ResourceWithTTL{}
Expand Down Expand Up @@ -201,7 +201,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
if cache.log != nil {
cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version)
}
resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl)
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
if err != nil {
return err
Expand Down Expand Up @@ -320,7 +320,7 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f
}

// otherwise, the watch may be responded immediately
resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
_ = cache.respond(context.Background(), request, value, resources, version, false)

return nil
Expand Down Expand Up @@ -517,7 +517,7 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon
return nil, &types.SkipFetchError{}
}

resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
out := createResponse(ctx, request, resources, version, false)
return out, nil
}
Expand Down
58 changes: 30 additions & 28 deletions pkg/cache/v3/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,26 @@ var (
version = "x"
version2 = "y"

snapshot = cache.NewSnapshot(version,
[]types.Resource{testEndpoint},
[]types.Resource{testCluster},
[]types.Resource{testRoute},
[]types.Resource{testListener},
[]types.Resource{testRuntime},
[]types.Resource{testSecret[0]},
[]types.Resource{testExtensionConfig})

ttl = 2 * time.Second

snapshotWithTTL = cache.NewSnapshotWithTtls(version,
[]types.ResourceWithTTL{{Resource: testEndpoint, TTL: &ttl}},
[]types.ResourceWithTTL{{Resource: testCluster}},
[]types.ResourceWithTTL{{Resource: testRoute}},
[]types.ResourceWithTTL{{Resource: testListener}},
[]types.ResourceWithTTL{{Resource: testRuntime}},
[]types.ResourceWithTTL{{Resource: testSecret[0]}})
snapshot, _ = cache.NewSnapshot(version, map[rsrc.Type][]types.Resource{
rsrc.EndpointType: {testEndpoint},
rsrc.ClusterType: {testCluster},
rsrc.RouteType: {testRoute},
rsrc.ListenerType: {testListener},
rsrc.RuntimeType: {testRuntime},
rsrc.SecretType: {testSecret[0]},
rsrc.ExtensionConfigType: {testExtensionConfig},
})

ttl = 2 * time.Second
snapshotWithTTL, _ = cache.NewSnapshotWithTTLs(version, map[rsrc.Type][]types.ResourceWithTTL{
rsrc.EndpointType: {{Resource: testEndpoint, TTL: &ttl}},
rsrc.ClusterType: {{Resource: testCluster}},
rsrc.RouteType: {{Resource: testRoute}},
rsrc.ListenerType: {{Resource: testListener}},
rsrc.RuntimeType: {{Resource: testRuntime}},
rsrc.SecretType: {{Resource: testSecret[0]}},
rsrc.ExtensionConfigType: {{Resource: testExtensionConfig}},
})

names = map[string][]string{
rsrc.EndpointType: {clusterName},
Expand Down Expand Up @@ -94,7 +96,7 @@ func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format
func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) }
func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }

func TestSnapshotCacheWithTtl(t *testing.T) {
func TestSnapshotCacheWithTTL(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := cache.NewSnapshotCacheWithHeartbeating(ctx, true, group{}, logger{t: t}, time.Second)
Expand Down Expand Up @@ -128,8 +130,8 @@ func TestSnapshotCacheWithTtl(t *testing.T) {
if gotVersion, _ := out.GetVersion(); gotVersion != version {
t.Errorf("got version %q, want %q", gotVersion, version)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTtl(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTtl(typ))
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResourcesAndTTL(typ))
}
case <-time.After(2 * time.Second):
t.Errorf("failed to receive snapshot response")
Expand All @@ -156,11 +158,11 @@ func TestSnapshotCacheWithTtl(t *testing.T) {
if gotVersion, _ := out.GetVersion(); gotVersion != version {
t.Errorf("got version %q, want %q", gotVersion, version)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTtl(typ)) {
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResources(typ))
}

if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTtl(typ)) {
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTTL.GetResourcesAndTTL(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTTL.GetResources(typ))
}

Expand Down Expand Up @@ -222,8 +224,8 @@ func TestSnapshotCache(t *testing.T) {
if gotVersion, _ := out.GetVersion(); gotVersion != version {
t.Errorf("got version %q, want %q", gotVersion, version)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ))
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTTL(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ))
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
Expand Down Expand Up @@ -280,8 +282,8 @@ func TestSnapshotCacheWatch(t *testing.T) {
if gotVersion, _ := out.GetVersion(); gotVersion != version {
t.Errorf("got version %q, want %q", gotVersion, version)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ))
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTTL(typ)) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTTL(typ))
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
Expand Down Expand Up @@ -315,7 +317,7 @@ func TestSnapshotCacheWatch(t *testing.T) {
t.Errorf("got version %q, want %q", gotVersion, version2)
}
if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot2.Resources[types.Endpoint].Items) {
t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot2.Resources[types.Endpoint].Items)
t.Errorf("got resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot2.Resources[types.Endpoint].Items)
}
case <-time.After(time.Second):
t.Fatal("failed to receive snapshot response")
Expand Down
Loading

0 comments on commit 4ca66c8

Please sign in to comment.