diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index a423cdea21..c88548388a 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -26,9 +26,7 @@ func newWatches() watches { // Cancel all watches func (w *watches) Cancel() { for _, watch := range w.deltaWatches { - if watch.cancel != nil { - watch.cancel() - } + watch.Cancel() } } @@ -46,5 +44,9 @@ func (w *watch) Cancel() { if w.cancel != nil { w.cancel() } - close(w.responses) + if w.responses != nil { + // w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here + // This is needed to release resources taken by goroutines watching this channel + close(w.responses) + } } diff --git a/pkg/server/delta/v3/watches_test.go b/pkg/server/delta/v3/watches_test.go new file mode 100644 index 0000000000..8bb5078c44 --- /dev/null +++ b/pkg/server/delta/v3/watches_test.go @@ -0,0 +1,42 @@ +package delta + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/envoyproxy/go-control-plane/pkg/cache/v3" +) + +func TestDeltaWatches(t *testing.T) { + t.Run("watches response channels are properly closed when the watches are cancelled", func(t *testing.T) { + watches := newWatches() + + cancelCount := 0 + var channels []chan cache.DeltaResponse + // create a few watches, and ensure that the cancel function are called and the channels are closed + for i := 0; i < 5; i++ { + newWatch := watch{} + if i%2 == 0 { + newWatch.cancel = func() { cancelCount++ } + newWatch.responses = make(chan cache.DeltaResponse) + channels = append(channels, newWatch.responses) + } + + watches.deltaWatches[strconv.Itoa(i)] = newWatch + } + + watches.Cancel() + + assert.Equal(t, 3, cancelCount) + for _, channel := range channels { + select { + case _, ok := <-channel: + assert.False(t, ok, "a channel was not closed") + default: + assert.Fail(t, "a channel was not closed") + } + } + }) +}