Skip to content

Commit

Permalink
Currently sotw and delta watches use different interfaces to track th…
Browse files Browse the repository at this point in the history
…e stream state

An assumption used to be that sotw watches are stateless and can be processed from the request itself. This was already altered in envoyproxy#508, but it only covers the usecases partially (e.g. it does not cover subscribing/unsubscribing to wildcard).
The current object passed to track the subscription provides multiple issues:
 - it is not an interface, restricting evolution or testing for users building a separate cache
 - it tracks some objects per types, other globally, based on sotw or delta, making it confusing and hard to use in a consistent way

This PR replaces the legacy streamState by the notion of Subscription per type. StreamState is not relevant to the cache, as it only requires to know the subscription status (e.g. the last nonce is not needed), while a stream can multiplex multiple types.
Further work will come to:
 - fill the subscription with the same attributes in sotw and delta, allowing to take proper decisions on what to reply
 - Fix multiple issues within the cache in sotw
 - Fix handling of explicit vs. legacy wildcard in sotw and delta

Signed-off-by: Valerian Roche <[email protected]>
  • Loading branch information
valerian-roche committed Feb 16, 2024
1 parent 0a8d393 commit fb60942
Show file tree
Hide file tree
Showing 18 changed files with 460 additions and 385 deletions.
33 changes: 30 additions & 3 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
Expand All @@ -37,6 +36,34 @@ type Request = discovery.DiscoveryRequest
// DeltaRequest is an alias for the delta discovery request type.
type DeltaRequest = discovery.DeltaDiscoveryRequest

// Subscription stores the server view of the client state for a given resource type.
// This allows proper implementation of stateful aspects of the protocol (e.g. returning only some updated resources).
// Though the methods may return mutable parts of the state for performance reasons,
// the cache is expected to consider this state as immutable and thread safe between a watch creation and its cancellation.
type Subscription interface {
// ReturnedResources returns a list of resources that clients have ACK'd and their associated version.
// The versions are:
// - delta protocol: version of the specific resource set in the response
// - sotw protocol: version of the global response when the resource was last ACKed
ReturnedResources() map[string]string

// SubscribedResources returns the list of resources currently subscribed to by the client for the type.
// For delta it keeps track of subscription updates across requests
// For sotw it is a normalized view of the last request resources
SubscribedResources() map[string]struct{}

// IsWildcard returns whether the client has a wildcard watch.
// This considers subtleties related to the current migration of wildcard definitions within the protocol.
// More details on the behavior of wildcard are present at https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#how-the-client-specifies-what-resources-to-return
IsWildcard() bool

// WatchesResources returns whether at least one of the resources provided is currently being watched by the subscription.
// It is currently only applicable to delta-xds.
// If the request is wildcard, it will always return true,
// otherwise it will compare the provided resources to the list of resources currently subscribed
WatchesResources(resourceNames map[string]struct{}) bool
}

// ConfigWatcher requests watches for configuration resources by a node, last
// applied version identifier, and resource names hint. The watch should send
// the responses when they are ready. The watch can be canceled by the
Expand All @@ -54,7 +81,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(*Request, stream.StreamState, chan Response) (cancel func())
CreateWatch(*Request, Subscription, chan Response) (cancel func(), err error)

// CreateDeltaWatch returns a new open incremental xDS watch.
// This is the entrypoint to propagate configuration changes the
Expand All @@ -66,7 +93,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func())
CreateDeltaWatch(*DeltaRequest, Subscription, chan DeltaResponse) (cancel func(), err error)
}

// ConfigFetcher fetches configuration resources from cache
Expand Down
17 changes: 8 additions & 9 deletions pkg/cache/v3/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// groups together resource-related arguments for the createDeltaResponse function
Expand All @@ -28,16 +27,16 @@ type resourceContainer struct {
systemVersion string
}

func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.StreamState, resources resourceContainer) *RawDeltaResponse {
func createDeltaResponse(ctx context.Context, req *DeltaRequest, sub Subscription, resources resourceContainer) *RawDeltaResponse {
// variables to build our response with
var nextVersionMap map[string]string
var filtered []types.Resource
var toRemove []string

// If we are handling a wildcard request, we want to respond with all resources
switch {
case state.IsWildcard():
if len(state.GetResourceVersions()) == 0 {
case sub.IsWildcard():
if len(sub.ReturnedResources()) == 0 {
filtered = make([]types.Resource, 0, len(resources.resourceMap))
}
nextVersionMap = make(map[string]string, len(resources.resourceMap))
Expand All @@ -46,25 +45,25 @@ func createDeltaResponse(ctx context.Context, req *DeltaRequest, state stream.St
// we can just set it here to be used for comparison later
version := resources.versionMap[name]
nextVersionMap[name] = version
prevVersion, found := state.GetResourceVersions()[name]
prevVersion, found := sub.ReturnedResources()[name]
if !found || (prevVersion != version) {
filtered = append(filtered, r)
}
}

// Compute resources for removal
// The resource version can be set to "" here to trigger a removal even if never returned before
for name := range state.GetResourceVersions() {
for name := range sub.ReturnedResources() {
if _, ok := resources.resourceMap[name]; !ok {
toRemove = append(toRemove, name)
}
}
default:
nextVersionMap = make(map[string]string, len(state.GetSubscribedResourceNames()))
nextVersionMap = make(map[string]string, len(sub.SubscribedResources()))
// state.GetResourceVersions() may include resources no longer subscribed
// In the current code this gets silently cleaned when updating the version map
for name := range state.GetSubscribedResourceNames() {
prevVersion, found := state.GetResourceVersions()[name]
for name := range sub.SubscribedResources() {
prevVersion, found := sub.ReturnedResources()[name]
if r, ok := resources.resourceMap[name]; ok {
nextVersion := resources.versionMap[name]
if prevVersion != nextVersion {
Expand Down
41 changes: 22 additions & 19 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(true, nil), watches[typ])
}, stream.NewSubscription(true, nil), watches[typ])
}

if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
Expand All @@ -68,17 +68,17 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// all resources as well as individual resource removals
for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(false, versionMap[typ])
sub := stream.NewSubscription(false, versionMap[typ])
for resource := range versionMap[typ] {
state.GetSubscribedResourceNames()[resource] = struct{}{}
sub.SubscribedResources()[resource] = struct{}{}
}
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, state, watches[typ])
}, sub, watches[typ])
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand Down Expand Up @@ -111,20 +111,20 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
func TestDeltaRemoveResources(t *testing.T) {
c := cache.NewSnapshotCache(false, group{}, logger{t: t})
watches := make(map[string]chan cache.DeltaResponse)
streams := make(map[string]*stream.StreamState)
subscriptions := make(map[string]*stream.Subscription)

for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
state := stream.NewStreamState(true, make(map[string]string))
streams[typ] = &state
sub := stream.NewSubscription(true, make(map[string]string))
subscriptions[typ] = &sub
// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
// functionality. This means we should receive all resources back without requesting a subscription by name.
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, *streams[typ], watches[typ])
}, *subscriptions[typ], watches[typ])
}

if err := c.SetSnapshot(context.Background(), key, fixture.snapshot()); err != nil {
Expand All @@ -138,7 +138,7 @@ func TestDeltaRemoveResources(t *testing.T) {
snapshot := fixture.snapshot()
assertResourceMapEqual(t, cache.IndexRawResourcesByName(out.(*cache.RawDeltaResponse).Resources), snapshot.GetResources(typ))
nextVersionMap := out.GetNextVersionMap()
streams[typ].SetResourceVersions(nextVersionMap)
subscriptions[typ].SetReturnedResources(nextVersionMap)
case <-time.After(time.Second):
t.Fatal("failed to receive a snapshot response")
}
Expand All @@ -153,8 +153,9 @@ func TestDeltaRemoveResources(t *testing.T) {
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, *streams[typ], watches[typ])
TypeUrl: typ,
ResponseNonce: "nonce",
}, *subscriptions[typ], watches[typ])
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand All @@ -177,7 +178,7 @@ func TestDeltaRemoveResources(t *testing.T) {
nextVersionMap := out.GetNextVersionMap()

// make sure the version maps are different since we no longer are tracking any endpoint resources
if reflect.DeepEqual(streams[testTypes[0]].GetResourceVersions(), nextVersionMap) {
if reflect.DeepEqual(subscriptions[testTypes[0]].ReturnedResources(), nextVersionMap) {
t.Fatalf("versionMap for the endpoint resource type did not change, received: %v, instead of an empty map", nextVersionMap)
}
case <-time.After(time.Second):
Expand All @@ -204,14 +205,15 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
t.Fatalf("snapshot failed: %s", err)
}
} else {
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: id,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{clusterName},
}, stream.NewStreamState(false, make(map[string]string)), responses)
}, stream.NewSubscription(false, make(map[string]string)), responses)

require.NoError(t, err)
defer cancel()
}
})
Expand All @@ -226,15 +228,15 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) {

// Create a non-buffered channel that will block sends.
watchCh := make(chan cache.DeltaResponse)
state := stream.NewStreamState(false, nil)
state.SetSubscribedResourceNames(map[string]struct{}{names[rsrc.EndpointType][0]: {}})
sub := stream.NewSubscription(false, nil)
sub.SubscribedResources()[names[rsrc.EndpointType][0]] = struct{}{}
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: names[rsrc.EndpointType],
}, state, watchCh)
}, sub, watchCh)

// The first time we set the snapshot without consuming from the blocking channel, so this should time out.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
Expand Down Expand Up @@ -270,13 +272,14 @@ func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
for _, typ := range testTypes {
responses := make(chan cache.DeltaResponse, 1)
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
cancel, err := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(false, make(map[string]string)), responses)
}, stream.NewSubscription(false, make(map[string]string)), responses)
require.NoError(t, err)

// Cancel the watch
cancel()
Expand Down
35 changes: 17 additions & 18 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ package cache
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/log"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

type watches = map[ResponseWatch]struct{}
Expand Down Expand Up @@ -178,20 +178,20 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
}

for id, watch := range cache.deltaWatches {
if !watch.StreamState.WatchesResources(modified) {
if !watch.subscription.WatchesResources(modified) {
continue
}

res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState)
res := cache.respondDelta(watch.Request, watch.Response, watch.subscription)
if res != nil {
delete(cache.deltaWatches, id)
}
}
}
}

func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) *RawDeltaResponse {
resp := createDeltaResponse(context.Background(), request, state, resourceContainer{
func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaResponse, sub Subscription) *RawDeltaResponse {
resp := createDeltaResponse(context.Background(), request, sub, resourceContainer{
resourceMap: cache.resources,
versionMap: cache.versionMap,
systemVersion: cache.getVersion(),
Expand All @@ -201,7 +201,7 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe
if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 {
if cache.log != nil {
cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t",
request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard())
request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, sub.IsWildcard())
}
value <- resp
return resp
Expand Down Expand Up @@ -312,10 +312,9 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
return resources
}

func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, value chan Response) func() {
func (cache *LinearCache) CreateWatch(request *Request, _ Subscription, value chan Response) (func(), error) {
if request.GetTypeUrl() != cache.typeURL {
value <- nil
return nil
return nil, fmt.Errorf("request type %s does not match cache type %s", request.GetTypeUrl(), cache.typeURL)
}
// If the version is not up to date, check whether any requested resource has
// been updated between the last version and the current version. This avoids the problem
Expand Down Expand Up @@ -361,7 +360,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
}
if stale {
cache.respond(watch, staleResources)
return nil
return nil, nil
}
// Create open watches since versions are up to date.
if len(request.GetResourceNames()) == 0 {
Expand All @@ -371,7 +370,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
cache.mu.Lock()
defer cache.mu.Unlock()
delete(cache.watchAll, watch)
}
}, nil
}

cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q", cache.typeURL, request.ResourceNames, cache.getVersion())
Expand All @@ -387,7 +386,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
cache.mu.Lock()
defer cache.mu.Unlock()
cache.removeWatch(watch)
}
}, nil
}

// Must be called under lock
Expand All @@ -403,7 +402,7 @@ func (cache *LinearCache) removeWatch(watch ResponseWatch) {
}
}

func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, sub Subscription, value chan DeltaResponse) (func(), error) {
cache.mu.Lock()
defer cache.mu.Unlock()

Expand All @@ -420,23 +419,23 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S
cache.log.Errorf("failed to update version map: %v", err)
}
}
response := cache.respondDelta(request, value, state)
response := cache.respondDelta(request, value, sub)

// if respondDelta returns nil this means that there is no change in any resource version
// create a new watch accordingly
if response == nil {
watchID := cache.nextDeltaWatchID()
if cache.log != nil {
cache.log.Infof("[linear cache] open delta watch ID:%d for %s Resources:%v, system version %q", watchID,
cache.typeURL, state.GetSubscribedResourceNames(), cache.getVersion())
cache.typeURL, sub.SubscribedResources(), cache.getVersion())
}

cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, StreamState: state}
cache.deltaWatches[watchID] = DeltaResponseWatch{Request: request, Response: value, subscription: sub}

return cache.cancelDeltaWatch(watchID)
return cache.cancelDeltaWatch(watchID), nil
}

return nil
return nil, nil
}

func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error {
Expand Down
Loading

0 comments on commit fb60942

Please sign in to comment.