Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement tracking of known resource names by caller stream. Fixes #399 (Attempt #2) #508

Merged
merged 1 commit into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type ConfigWatcher interface {
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(*Request, chan Response) (cancel func())
CreateWatch(*Request, stream.StreamState, chan Response) (cancel func())

// CreateDeltaWatch returns a new open incremental xDS watch.
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
return resources
}

func (cache *LinearCache) CreateWatch(request *Request, value chan Response) func() {
func (cache *LinearCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() {
if request.TypeUrl != cache.typeURL {
value <- nil
return nil
Expand Down
69 changes: 39 additions & 30 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,23 +158,25 @@ func hashResource(t *testing.T, resource types.Resource) string {
}

func TestLinearInitialResources(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, streamState, w)
verifyResponse(t, w, "0", 1)
c.CreateWatch(&Request{TypeUrl: testType}, w)
c.CreateWatch(&Request{TypeUrl: testType}, streamState, w)
verifyResponse(t, w, "0", 2)
}

func TestLinearCornerCases(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType)
err := c.UpdateResource("a", nil)
if err == nil {
t.Error("expected error on nil resource")
}
// create an incorrect type URL request
w := make(chan Response, 1)
c.CreateWatch(&Request{TypeUrl: "test"}, w)
c.CreateWatch(&Request{TypeUrl: "test"}, streamState, w)
select {
case r := <-w:
if r != nil {
Expand All @@ -186,15 +188,16 @@ func TestLinearCornerCases(t *testing.T) {
}

func TestLinearBasic(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType)

// Create watches before a resource is ready
w1 := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
mustBlock(t, w1)

w := make(chan Response, 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 2)
checkWatchCount(t, c, "b", 1)
Expand All @@ -205,31 +208,32 @@ func TestLinearBasic(t *testing.T) {
verifyResponse(t, w, "1", 1)

// Request again, should get same response
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
checkWatchCount(t, c, "a", 0)
verifyResponse(t, w, "1", 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
checkWatchCount(t, c, "a", 0)
verifyResponse(t, w, "1", 1)

// Add another element and update the first, response should be different
require.NoError(t, c.UpdateResource("b", testResource("b")))
require.NoError(t, c.UpdateResource("a", testResource("aa")))
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
verifyResponse(t, w, "3", 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
verifyResponse(t, w, "3", 2)
}

func TestLinearSetResources(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType)

// Create new resources
w1 := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
mustBlock(t, w1)
w2 := make(chan Response, 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w2)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w2)
mustBlock(t, w2)
c.SetResources(map[string]types.Resource{
"a": testResource("a"),
Expand All @@ -239,9 +243,9 @@ func TestLinearSetResources(t *testing.T) {
verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources

// Add another element and update the first, response should be different
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w1)
mustBlock(t, w1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w2)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w2)
mustBlock(t, w2)
c.SetResources(map[string]types.Resource{
"a": testResource("aa"),
Expand All @@ -252,9 +256,9 @@ func TestLinearSetResources(t *testing.T) {
verifyResponse(t, w2, "2", 3)

// Delete resource
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, w1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, streamState, w1)
mustBlock(t, w1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, w2)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, streamState, w2)
mustBlock(t, w2)
c.SetResources(map[string]types.Resource{
"b": testResource("b"),
Expand Down Expand Up @@ -282,46 +286,49 @@ func TestLinearGetResources(t *testing.T) {
}

func TestLinearVersionPrefix(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType, WithVersionPrefix("instance1-"))

w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
verifyResponse(t, w, "instance1-0", 0)

require.NoError(t, c.UpdateResource("a", testResource("a")))
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
verifyResponse(t, w, "instance1-1", 1)

c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, streamState, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
}

func TestLinearDeletion(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
require.NoError(t, c.DeleteResource("a"))
verifyResponse(t, w, "1", 0)
checkWatchCount(t, c, "a", 0)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w)
verifyResponse(t, w, "1", 1)
checkWatchCount(t, c, "b", 0)
require.NoError(t, c.DeleteResource("b"))
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w)
verifyResponse(t, w, "2", 0)
checkWatchCount(t, c, "b", 0)
}

func TestLinearWatchTwo(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w)
mustBlock(t, w)
w1 := make(chan Response, 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w1)
mustBlock(t, w1)
require.NoError(t, c.UpdateResource("a", testResource("aa")))
// should only get the modified resource
Expand All @@ -330,19 +337,20 @@ func TestLinearWatchTwo(t *testing.T) {
}

func TestLinearCancel(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType)
require.NoError(t, c.UpdateResource("a", testResource("a")))

// cancel watch-all
w := make(chan Response, 1)
cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w)
cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
cancel()
checkWatchCount(t, c, "a", 0)

// cancel watch for "a"
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w)
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
cancel()
Expand All @@ -352,10 +360,10 @@ func TestLinearCancel(t *testing.T) {
w2 := make(chan Response, 1)
w3 := make(chan Response, 1)
w4 := make(chan Response, 1)
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w)
cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, w2)
cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w3)
cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w4)
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w)
cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, streamState, w2)
cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w3)
cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, streamState, w4)
mustBlock(t, w)
mustBlock(t, w2)
mustBlock(t, w3)
Expand All @@ -377,6 +385,7 @@ func TestLinearCancel(t *testing.T) {
// TODO(mattklein123): This test requires GOMAXPROCS or -parallel >= 100. This should be
// rewritten to not require that. This is not the case in the GH actions environment.
func TestLinearConcurrentSetWatch(t *testing.T) {
streamState := stream.NewStreamState(false, map[string]string{})
c := NewLinearCache(testType)
n := 50
for i := 0; i < 2*n; i++ {
Expand All @@ -396,7 +405,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) {
ResourceNames: []string{id, id2},
VersionInfo: "0",
TypeUrl: testType,
}, value)
}, streamState, value)
// wait until all updates apply
verifyResponse(t, value, "", 1)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ type MuxCache struct {

var _ Cache = &MuxCache{}

func (mux *MuxCache) CreateWatch(request *Request, value chan Response) func() {
func (mux *MuxCache) CreateWatch(request *Request, state stream.StreamState, value chan Response) func() {
key := mux.Classify(request)
cache, exists := mux.Caches[key]
if !exists {
value <- nil
return nil
}
return cache.CreateWatch(request, value)
return cache.CreateWatch(request, state, value)
}

func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
Expand Down
33 changes: 31 additions & 2 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL)
}

// CreateWatch returns a watch for an xDS request.
func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) func() {
func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() {
nodeID := cache.hash.ID(request.Node)

cache.mu.Lock()
Expand All @@ -309,6 +309,32 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f
snapshot, exists := cache.snapshots[nodeID]
version := snapshot.GetVersion(request.TypeUrl)

if exists {
knownResourceNames := streamState.GetKnownResourceNames(request.TypeUrl)
diff := []string{}
for _, r := range request.ResourceNames {
if _, ok := knownResourceNames[r]; !ok {
diff = append(diff, r)
}
}

cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID,
request.TypeUrl, request.ResourceNames, knownResourceNames, diff)

if len(diff) > 0 {
resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
for _, name := range diff {
if _, exists := resources[name]; exists {
if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil {
cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl,
request.ResourceNames, nodeID, err)
}
return nil
}
}
}
}

// if the requested version is up-to-date or missing a response, leave an open watch
if !exists || request.VersionInfo == version {
watchID := cache.nextWatchID()
Expand All @@ -322,7 +348,10 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f

// otherwise, the watch may be responded immediately
resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
_ = cache.respond(context.Background(), request, value, resources, version, false)
if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil {
cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl,
request.ResourceNames, nodeID, err)
}

return nil
}
Expand Down
Loading