Skip to content

Commit

Permalink
fix(kuma-cp): use contexts instead of channels in watchdog
Browse files Browse the repository at this point in the history
This makes the code simpler and easier to track

xref kumahq#11094

Signed-off-by: Charly Molter <[email protected]>
  • Loading branch information
lahabana committed Aug 13, 2024
1 parent dd39e30 commit 33b7db9
Show file tree
Hide file tree
Showing 15 changed files with 153 additions and 134 deletions.
10 changes: 4 additions & 6 deletions pkg/hds/tracker/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,18 @@ func (t *tracker) OnHealthCheckRequest(streamID xds.StreamID, req *envoy_service
streams.activeStreams[streamID] = true

if streams.watchdogCancel == nil { // watchdog was not started yet
stopCh := make(chan struct{})
streams.watchdogCancel = func() {
close(stopCh)
}
ctx, cancel := context.WithCancel(context.Background())
streams.watchdogCancel = cancel
// kick off watchdog for that Dataplane
go t.newWatchdog(req.Node).Start(stopCh)
go t.newWatchdog(req.Node).Start(ctx)
t.log.V(1).Info("started Watchdog for a Dataplane", "streamid", streamID, "proxyId", proxyId, "dataplaneKey", dataplaneKey)
}
t.dpStreams[dataplaneKey] = streams
t.streamsAssociation[streamID] = dataplaneKey
return nil
}

func (t *tracker) newWatchdog(node *envoy_core.Node) watchdog.Watchdog {
func (t *tracker) newWatchdog(node *envoy_core.Node) util_xds_v3.Watchdog {
return &watchdog.SimpleWatchdog{
NewTicker: func() *time.Ticker {
return time.NewTicker(t.config.RefreshInterval.Duration)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kds/v2/reconcile/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type Reconciler interface {
// Reconcile reconciles state of node given changed resource types.
// Returns error and bool which is true if any resource was changed.
Reconcile(context.Context, *envoy_core.Node, map[model.ResourceType]struct{}, logr.Logger) (error, bool)
Clear(context.Context, *envoy_core.Node) error
// Clear removes the local state of the node.
Clear(*envoy_core.Node) error
}

// Generates a snapshot of xDS resources for a given node.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kds/v2/reconcile/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type reconciler struct {
lock sync.Mutex
}

func (r *reconciler) Clear(ctx context.Context, node *envoy_core.Node) error {
func (r *reconciler) Clear(node *envoy_core.Node) error {
id := r.hasher.ID(node)
r.lock.Lock()
defer r.lock.Unlock()
Expand Down
7 changes: 3 additions & 4 deletions pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ func newSyncTracker(
for _, typ := range providedTypes {
changedTypes[typ] = struct{}{}
}
return util_xds_v3.NewWatchdogCallbacks(func(ctx context.Context, node *envoy_core.Node, streamID int64) (util_watchdog.Watchdog, error) {
return util_xds_v3.NewWatchdogCallbacks(func(ctx context.Context, node *envoy_core.Node, streamID int64) (util_xds_v3.Watchdog, error) {
log := log.WithValues("streamID", streamID, "nodeID", node.Id)
log = kuma_log.AddFieldsFromCtx(log, ctx, extensions)
if experimentalWatchdogCfg.Enabled {
return &EventBasedWatchdog{
Ctx: ctx,
Node: node,
EventBus: eventBus,
Reconciler: reconciler,
Expand Down Expand Up @@ -128,7 +127,7 @@ func newSyncTracker(
NewTicker: func() *time.Ticker {
return time.NewTicker(refresh)
},
OnTick: func(context.Context) error {
OnTick: func(ctx context.Context) error {
start := core.Now()
log.V(1).Info("on tick")
err, changed := reconciler.Reconcile(ctx, node, changedTypes, log)
Expand All @@ -147,7 +146,7 @@ func newSyncTracker(
log.Error(err, "OnTick() failed")
},
OnStop: func() {
if err := reconciler.Clear(ctx, node); err != nil {
if err := reconciler.Clear(node); err != nil {
log.Error(err, "OnStop() failed")
}
},
Expand Down
14 changes: 5 additions & 9 deletions pkg/kds/v2/server/event_based_watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ import (
"github.com/kumahq/kuma/pkg/kds/v2/reconcile"
"github.com/kumahq/kuma/pkg/multitenant"
util_maps "github.com/kumahq/kuma/pkg/util/maps"
util_watchdog "github.com/kumahq/kuma/pkg/util/watchdog"
)

type EventBasedWatchdog struct {
Ctx context.Context
Node *envoy_core.Node
EventBus events.EventBus
Reconciler reconcile.Reconciler
Expand All @@ -31,10 +29,8 @@ type EventBasedWatchdog struct {
NewFullResyncTicker func() *time.Ticker
}

var _ util_watchdog.Watchdog = &EventBasedWatchdog{}

func (e *EventBasedWatchdog) Start(stop <-chan struct{}) {
tenantID, _ := multitenant.TenantFromCtx(e.Ctx)
func (e *EventBasedWatchdog) Start(ctx context.Context) {
tenantID, _ := multitenant.TenantFromCtx(ctx)
listener := e.EventBus.Subscribe(func(event events.Event) bool {
switch ev := event.(type) {
case events.ResourceChangedEvent:
Expand All @@ -58,8 +54,8 @@ func (e *EventBasedWatchdog) Start(stop <-chan struct{}) {

for {
select {
case <-stop:
if err := e.Reconciler.Clear(e.Ctx, e.Node); err != nil {
case <-ctx.Done():
if err := e.Reconciler.Clear(e.Node); err != nil {
e.Log.Error(err, "reconcile clear failed")
}
listener.Close()
Expand All @@ -71,7 +67,7 @@ func (e *EventBasedWatchdog) Start(stop <-chan struct{}) {
reason := strings.Join(util_maps.SortedKeys(reasons), "_and_")
e.Log.V(1).Info("reconcile", "changedTypes", changedTypes, "reason", reason)
start := core.Now()
err, changed := e.Reconciler.Reconcile(e.Ctx, e.Node, changedTypes, e.Log)
err, changed := e.Reconciler.Reconcile(ctx, e.Node, changedTypes, e.Log)
if err != nil && !errors.Is(err, context.Canceled) {
e.Log.Error(err, "reconcile failed", "changedTypes", changedTypes, "reason", reason)
e.Metrics.KdsGenerationErrors.Inc()
Expand Down
12 changes: 6 additions & 6 deletions pkg/kds/v2/server/event_based_watchdog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s staticReconciler) Reconcile(ctx context.Context, node *envoy_core.Node,
return nil, true
}

func (s staticReconciler) Clear(ctx context.Context, node *envoy_core.Node) error {
func (s staticReconciler) Clear(node *envoy_core.Node) error {
return nil
}

Expand All @@ -39,7 +39,7 @@ var _ = Describe("Event Based Watchdog", func() {
var eventBus events.EventBus
var metrics core_metrics.Metrics
var reconciler *staticReconciler
var stopCh chan struct{}
var cancel context.CancelFunc
var flushCh chan time.Time
var fullResyncCh chan time.Time
var watchdog *EventBasedWatchdog
Expand All @@ -56,14 +56,14 @@ var _ = Describe("Event Based Watchdog", func() {
eventBus, err = events.NewEventBus(10, metrics)
Expect(err).ToNot(HaveOccurred())

stopCh = make(chan struct{})
flushCh = make(chan time.Time)
fullResyncCh = make(chan time.Time)
reconciler = &staticReconciler{
changedResTypes: make(chan map[core_model.ResourceType]struct{}, 1),
}
ctx := context.Background()
ctx, cancel = context.WithCancel(ctx)
watchdog = &EventBasedWatchdog{
Ctx: context.Background(),
Node: &envoy_core.Node{
Id: "1",
},
Expand All @@ -84,12 +84,12 @@ var _ = Describe("Event Based Watchdog", func() {
},
}
go func() {
watchdog.Start(stopCh)
watchdog.Start(ctx)
}()
})

AfterAll(func() {
close(stopCh)
cancel()
})

It("should reconcile on the first flush", func() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/mads/v1/service/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ func NewReconcilerRestCallbacks(reconciler mads_reconcile.Reconciler) util_xds.R
}

func NewSyncTracker(reconciler mads_reconcile.Reconciler, refresh time.Duration, log logr.Logger) envoy_xds.Callbacks {
return util_xds_v3.NewWatchdogCallbacks(func(ctx context.Context, node *envoy_core.Node, streamID int64) (util_watchdog.Watchdog, error) {
return util_xds_v3.NewWatchdogCallbacks(func(_ context.Context, node *envoy_core.Node, streamID int64) (util_xds_v3.Watchdog, error) {
log := log.WithValues("streamID", streamID, "node", node)
return &util_watchdog.SimpleWatchdog{
NewTicker: func() *time.Ticker {
return time.NewTicker(refresh)
},
OnTick: func(context.Context) error {
OnTick: func(ctx context.Context) error {
log.V(1).Info("on tick")
return reconciler.Reconcile(ctx)
},
Expand Down
26 changes: 12 additions & 14 deletions pkg/mads/v1/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(resp).To(HaveHTTPStatus(http.StatusOK))

// when
respBody, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -125,7 +125,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusNotModified))
Expect(resp).To(HaveHTTPStatus(http.StatusNotModified))

// when
respBody, err = io.ReadAll(resp.Body)
Expand Down Expand Up @@ -244,7 +244,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(resp).To(HaveHTTPStatus(http.StatusOK))

// when
respBody, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -287,7 +287,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusNotModified))
Expect(resp).To(HaveHTTPStatus(http.StatusNotModified))

// when
respBody, err = io.ReadAll(resp.Body)
Expand Down Expand Up @@ -319,12 +319,10 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(resp).To(HaveHTTPStatus(http.StatusOK))

// when
respBody, err := io.ReadAll(resp.Body)

// then
Expect(err).ToNot(HaveOccurred())

// when
Expand Down Expand Up @@ -356,7 +354,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(resp).To(HaveHTTPStatus(http.StatusOK))

// when
respBody, err = io.ReadAll(resp.Body)
Expand Down Expand Up @@ -396,7 +394,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(resp).To(HaveHTTPStatus(http.StatusOK))

// when
respBody, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -444,7 +442,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp2.StatusCode).To(Equal(http.StatusOK))
Expect(resp2).To(HaveHTTPStatus(http.StatusOK))

// when
respBody, err = io.ReadAll(resp2.Body)
Expand Down Expand Up @@ -530,7 +528,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(resp).To(HaveHTTPStatus(http.StatusOK))

// when
respBody, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -562,7 +560,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusNotModified))
Expect(resp).To(HaveHTTPStatus(http.StatusNotModified))
})

It("should allow synchronous requests", func() {
Expand Down Expand Up @@ -603,7 +601,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(resp).To(HaveHTTPStatus(http.StatusOK))

// when
respBody, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -645,7 +643,7 @@ var _ = Describe("MADS http service", func() {

// then
Expect(err).ToNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusBadRequest))
Expect(resp).To(HaveHTTPStatus(http.StatusBadRequest))

// when
respBody, err := io.ReadAll(resp.Body)
Expand Down
44 changes: 10 additions & 34 deletions pkg/util/watchdog/watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,66 +7,42 @@ import (
"github.com/pkg/errors"
)

type Watchdog interface {
Start(stop <-chan struct{})
}

// SimpleWatchdog runs a periodic task: Start performs one tick right away and
// then one more each time the ticker fires, until it is told to stop.
type SimpleWatchdog struct {
// NewTicker builds the ticker that paces the loop. Start calls it once and
// stops the returned ticker when it returns.
NewTicker func() *time.Ticker
// OnTick is the unit of work for a single tick; it receives a context and
// reports failure through its error result.
OnTick func(context.Context) error
// OnError, when non-nil, receives errors produced by a tick. Errors matching
// context.Canceled are not forwarded.
OnError func(error)
// OnStop, when non-nil, is called when the watchdog is stopped, just before
// Start returns.
OnStop func()
}

func (w *SimpleWatchdog) Start(stop <-chan struct{}) {
func (w *SimpleWatchdog) Start(ctx context.Context) {
ticker := w.NewTicker()
defer ticker.Stop()

for {
ctx, cancel := context.WithCancel(context.Background())
// cancel is called at the end of the loop
go func() {
select {
case <-stop:
cancel()
case <-ctx.Done():
}
}()

select {
case <-stop:
default:
if err := w.onTick(ctx); err != nil && !errors.Is(err, context.Canceled) {
if err := w.onTick(ctx); err != nil {
if !errors.Is(err, context.Canceled) && w.OnError != nil {
w.OnError(err)
}
}
cancel()

select {
case <-ticker.C:
continue
case <-stop:
case <-ctx.Done():
if w.OnStop != nil {
w.OnStop()
}
// cancel will be called by the above goroutine
return
}
}
}

func (w *SimpleWatchdog) onTick(ctx context.Context) error {
func (w *SimpleWatchdog) onTick(ctx context.Context) (err error) {
defer func() {
if cause := recover(); cause != nil {
if w.OnError != nil {
var err error
switch typ := cause.(type) {
case error:
err = errors.WithStack(typ)
default:
err = errors.Errorf("%v", cause)
}
w.OnError(err)
switch typ := cause.(type) {
case error:
err = errors.WithStack(typ)
default:
err = errors.Errorf("%v", cause)
}
}
}()
Expand Down
Loading

0 comments on commit 33b7db9

Please sign in to comment.