Skip to content

Commit

Permalink
engine: resources, graph: Change the done channel into a ctx
Browse files Browse the repository at this point in the history
This is part one of porting Watch to context.
  • Loading branch information
purpleidea committed Aug 7, 2023
1 parent 0dcfe02 commit 80d1253
Show file tree
Hide file tree
Showing 34 changed files with 72 additions and 66 deletions.
18 changes: 9 additions & 9 deletions engine/graph/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
defer close(obj.state[vertex].eventsChan) // we close this on behalf of res

// This is a close reverse-multiplexer. If any of the channels
// close, then it will cause the doneChan to close. That way,
// close, then it will cause the doneCtx to cancel. That way,
// multiple different folks can send a close signal, without
// every worrying about duplicate channel close panics.
obj.state[vertex].wg.Add(1)
Expand All @@ -289,7 +289,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
}

// the main "done" signal gets activated here!
close(obj.state[vertex].doneChan)
obj.state[vertex].doneCtxCancel() // cancels doneCtx
}()

var err error
Expand All @@ -308,7 +308,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {
case <-timer.C: // the wait is over
return errDelayExpired // special

case <-obj.state[vertex].init.Done:
case <-obj.state[vertex].init.DoneCtx.Done():
return nil
}
}
Expand Down Expand Up @@ -359,7 +359,7 @@ func (obj *Engine) Worker(vertex pgraph.Vertex) error {

// If this exits cleanly, we must unblock the reverse-multiplexer.
// I think this additional close is unnecessary, but it's not harmful.
defer close(obj.state[vertex].eventsDone) // causes doneChan to close
defer close(obj.state[vertex].eventsDone) // causes doneCtx to cancel
limiter := rate.NewLimiter(res.MetaParams().Limit, res.MetaParams().Burst)
var reserv *rate.Reservation
var reterr error
Expand All @@ -376,7 +376,7 @@ Loop:
// we then save so we can return it to the caller of us.
if err != nil {
failed = true
close(obj.state[vertex].watchDone) // causes doneChan to close
close(obj.state[vertex].watchDone) // causes doneCtx to cancel
reterr = errwrap.Append(reterr, err) // permanent failure
continue
}
Expand Down Expand Up @@ -411,7 +411,7 @@ Loop:
// pause if one was requested...
select {
case <-obj.state[vertex].pauseSignal: // channel closes
// NOTE: If we allowed a doneChan below to let us out
// NOTE: If we allowed a doneCtx below to let us out
// of the resumeSignal wait, then we could loop around
// and run this again, causing a panic. Instead of this
// being made safe with a sync.Once, we instead run a
Expand Down Expand Up @@ -457,7 +457,7 @@ Loop:
}
if e != nil {
failed = true
close(obj.state[vertex].limitDone) // causes doneChan to close
close(obj.state[vertex].limitDone) // causes doneCtx to cancel
reterr = errwrap.Append(reterr, e) // permanent failure
break LimitWait
}
Expand Down Expand Up @@ -497,7 +497,7 @@ Loop:
}
if e != nil {
failed = true
close(obj.state[vertex].limitDone) // causes doneChan to close
close(obj.state[vertex].limitDone) // causes doneCtx to cancel
reterr = errwrap.Append(reterr, e) // permanent failure
break RetryWait
}
Expand Down Expand Up @@ -545,7 +545,7 @@ Loop:
// this dies. If Process fails permanently, we ask it
// to exit right here... (It happens when we loop...)
failed = true
close(obj.state[vertex].processDone) // causes doneChan to close
close(obj.state[vertex].processDone) // causes doneCtx to cancel
reterr = errwrap.Append(reterr, err) // permanent failure
continue

Expand Down
2 changes: 1 addition & 1 deletion engine/graph/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (obj *Engine) Commit() error {
free := []func() error{} // functions to run after graphsync to reset...
vertexRemoveFn := func(vertex pgraph.Vertex) error {
// wait for exit before starting new graph!
close(obj.state[vertex].removeDone) // causes doneChan to close
close(obj.state[vertex].removeDone) // causes doneCtx to cancel
obj.state[vertex].Resume() // unblock from resume
obj.waits[vertex].Wait() // sync

Expand Down
16 changes: 10 additions & 6 deletions engine/graph/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package graph

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -60,9 +61,12 @@ type State struct {
isStateOK bool // is state OK or do we need to run CheckApply ?
workerErr error // did the Worker error?

// doneChan closes when Watch should shut down. When any of the
// doneCtx is cancelled when Watch should shut down. When any of the
// following channels close, it causes this to close.
doneChan chan struct{}
doneCtx context.Context

// doneCtxCancel is the cancel function for doneCtx.
doneCtxCancel func()

// processDone is closed when the Process/CheckApply function fails
// permanently, and wants to cause Watch to exit.
Expand Down Expand Up @@ -131,7 +135,7 @@ func (obj *State) Init() error {
return fmt.Errorf("the Logf function is missing")
}

obj.doneChan = make(chan struct{})
obj.doneCtx, obj.doneCtxCancel = context.WithCancel(context.Background())

obj.processDone = make(chan struct{})
obj.watchDone = make(chan struct{})
Expand Down Expand Up @@ -161,7 +165,7 @@ func (obj *State) Init() error {
// Watch:
Running: obj.event,
Event: obj.event,
Done: obj.doneChan,
DoneCtx: obj.doneCtx,

// CheckApply:
Refresh: func() bool {
Expand Down Expand Up @@ -338,7 +342,7 @@ func (obj *State) Pause() error {
select {
case <-obj.pausedAck.Wait(): // we got it!
// we're paused
case <-obj.doneChan:
case <-obj.doneCtx.Done():
return engine.ErrClosed
}
obj.paused = true
Expand Down Expand Up @@ -401,7 +405,7 @@ func (obj *State) poll(interval uint32) error {
case <-ticker.C: // received the timer event
obj.init.Logf("polling...")

case <-obj.init.Done: // signal for shutdown request
case <-obj.init.DoneCtx.Done(): // signal for shutdown request
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions engine/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package engine

import (
"context"
"encoding/gob"
"fmt"

Expand Down Expand Up @@ -101,9 +102,10 @@ type Init struct {
// Event sends an event notifying the engine of a possible state change.
Event func()

// Done returns a channel that will close to signal to us that it's time
// for us to shutdown.
Done chan struct{}
// DoneCtx returns a context that will cancel to signal to us that it's
// time for us to shutdown.
// TODO: this is temporary until Watch supports context directly.
DoneCtx context.Context

// Called from within CheckApply:

Expand Down
2 changes: 1 addition & 1 deletion engine/resources/augeas.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (obj *AugeasRes) Watch() error {
}
send = true

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions engine/resources/aws_ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func (obj *AwsEc2Res) longpollWatch() error {
send = true
}

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down Expand Up @@ -596,7 +596,7 @@ func (obj *AwsEc2Res) snsWatch() error {
obj.init.Logf("State: %v", msg.event)
send = true

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
5 changes: 2 additions & 3 deletions engine/resources/config_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ func (obj *ConfigEtcdRes) Watch() error {
obj.wg.Add(1)
defer obj.wg.Done()
// FIXME: add timeout to context
// The obj.init.Done channel is closed by the engine to signal shutdown.
ctx, cancel := util.ContextWithCloser(context.Background(), obj.init.Done)
ctx, cancel := context.WithCancel(obj.init.DoneCtx)
defer cancel()
ch, err := obj.init.World.IdealClusterSizeWatch(util.CtxWithWg(ctx, obj.wg))
if err != nil {
Expand All @@ -121,7 +120,7 @@ Loop:
}
// pass through and send an event

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
}

obj.init.Event() // notify engine of an event (this can block)
Expand Down
6 changes: 3 additions & 3 deletions engine/resources/consul_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ func (obj *ConsulKVRes) Watch() error {
// Unexpected situation, bug in consul API...
select {
case ch <- fmt.Errorf("unexpected behaviour in Consul API"):
case <-obj.init.Done: // signal for shutdown request
case <-obj.init.DoneCtx.Done(): // signal for shutdown request
}

case <-obj.init.Done: // signal for shutdown request
case <-obj.init.DoneCtx.Done(): // signal for shutdown request
}
return
}
Expand All @@ -186,7 +186,7 @@ func (obj *ConsulKVRes) Watch() error {
}
obj.init.Event()

case <-obj.init.Done: // signal for shutdown request
case <-obj.init.DoneCtx.Done(): // signal for shutdown request
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion engine/resources/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (obj *CronRes) Watch() error {
}
send = true

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}
// do all our event sending all together to avoid duplicate msgs
Expand Down
6 changes: 3 additions & 3 deletions engine/resources/dhcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (obj *DHCPServerRes) Watch() error {
case <-closeSignal: // something shut us down early
return closeError

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down Expand Up @@ -518,7 +518,7 @@ func (obj *DHCPServerRes) CheckApply(apply bool) (bool, error) {
//select {
//case <-ch:
////case <-obj.interruptChan: // TODO: if we ever support InterruptableRes
//case <-obj.init.Done: // closed by the engine to signal shutdown
//case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
//}

// Cheap runtime validation!
Expand Down Expand Up @@ -1056,7 +1056,7 @@ func (obj *DHCPHostRes) Watch() error {
obj.init.Running() // when started, notify engine that we're running

select {
case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
}

//obj.init.Event() // notify engine of an event (this can block)
Expand Down
2 changes: 1 addition & 1 deletion engine/resources/docker_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (obj *DockerContainerRes) Watch() error {
}
return err

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engine/resources/docker_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (obj *DockerImageRes) Watch() error {
}
return err

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engine/resources/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (obj *ExecRes) Watch() error {
send = true
}

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engine/resources/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func (obj *FileRes) Watch() error {
}
send = true

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engine/resources/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (obj *GroupRes) Watch() error {
}
send = true

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engine/resources/hostname.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (obj *HostnameRes) Watch() error {
case <-signals:
send = true

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions engine/resources/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (obj *HTTPServerRes) Watch() error {
case <-closeSignal: // something shut us down early
return closeError

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down Expand Up @@ -725,7 +725,7 @@ func (obj *HTTPFileRes) Watch() error {
obj.init.Running() // when started, notify engine that we're running

select {
case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
}

//obj.init.Event() // notify engine of an event (this can block)
Expand Down
7 changes: 3 additions & 4 deletions engine/resources/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/purpleidea/mgmt/engine"
"github.com/purpleidea/mgmt/engine/traits"
"github.com/purpleidea/mgmt/util"
"github.com/purpleidea/mgmt/util/errwrap"
)

Expand Down Expand Up @@ -132,8 +131,8 @@ func (obj *KVRes) Close() error {
// Watch is the primary listener for this resource and it outputs events.
func (obj *KVRes) Watch() error {
// FIXME: add timeout to context
// The obj.init.Done channel is closed by the engine to signal shutdown.
ctx, cancel := util.ContextWithCloser(context.Background(), obj.init.Done)
// The obj.init.DoneCtx context is closed by the engine to signal shutdown.
ctx, cancel := context.WithCancel(obj.init.DoneCtx)
defer cancel()

ch, err := obj.init.World.StrMapWatch(ctx, obj.getKey()) // get possible events!
Expand All @@ -159,7 +158,7 @@ func (obj *KVRes) Watch() error {
}
send = true

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engine/resources/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (obj *MountRes) Watch() error {

send = true

case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion engine/resources/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (obj *MsgRes) Watch() error {
//var send = false // send event?
for {
select {
case <-obj.init.Done: // closed by the engine to signal shutdown
case <-obj.init.DoneCtx.Done(): // closed by the engine to signal shutdown
return nil
}

Expand Down
Loading

0 comments on commit 80d1253

Please sign in to comment.