Skip to content

Commit

Permalink
reconciler: add a configuration watcher pkg
Browse files Browse the repository at this point in the history
Move everything related to watching / updating the configuration to a
separate pkg. That makes the code easier to follow, and allows us to
properly unit test watching the configuration.

Signed-off-by: Miguel Duarte Barroso <[email protected]>
  • Loading branch information
maiqueb committed Dec 22, 2023
1 parent da358de commit 108a2ce
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 85 deletions.
103 changes: 18 additions & 85 deletions cmd/controlloop/controlloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"time"

"github.com/fsnotify/fsnotify"
Expand All @@ -22,11 +21,9 @@ import (

wbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/client/clientset/versioned"
wbinformers "github.com/k8snetworkplumbingwg/whereabouts/pkg/client/informers/externalversions"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/config"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/controlloop"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/logging"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/reconciler"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/types"
)

const (
Expand All @@ -38,11 +35,9 @@ const (
const (
_ int = iota
couldNotCreateController
couldNotGetFlatIPAM
cronExpressionError
cronSchedulerCreationError
fileWatcherError
fileWatcherAddWatcherError
couldNotCreateConfigWatcherError
)

const (
Expand Down Expand Up @@ -75,21 +70,6 @@ func main() {
if err != nil {
os.Exit(cronSchedulerCreationError)
}
schedule := determineCronExpression()

job, err := s.NewJob(
gocron.CronJob(schedule, false),
gocron.NewTask(func() {
reconciler.ReconcileIPs(errorChan)
}),
)
if err != nil {
_ = logging.Errorf("error with cron expression schedule: %v", err)
os.Exit(cronExpressionError)
}

logging.Verbosef("started cron with job ID: %q", job.ID().String())
s.Start()

watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand All @@ -98,11 +78,24 @@ func main() {
}
defer watcher.Close()

go syncConfiguration(watcher, s, job, errorChan)
if err := watcher.Add(reconcilerCronConfiguration); err != nil {
_ = logging.Errorf("error adding watcher to config %q: %v", reconcilerCronConfiguration, err)
os.Exit(fileWatcherAddWatcherError)
reconcilerConfigWatcher, err := reconciler.NewConfigWatcher(
reconcilerCronConfiguration,
s,
watcher,
func() {
reconciler.ReconcileIPs(errorChan)
},
)
if err != nil {
os.Exit(couldNotCreateConfigWatcherError)
}
s.Start()

const reconcilerConfigMntFile = "/cron-schedule/..data"
p := func(e fsnotify.Event) bool {
return e.Name == reconcilerConfigMntFile && e.Op&fsnotify.Create == fsnotify.Create
}
reconcilerConfigWatcher.SyncConfiguration(p)

for {
select {
Expand Down Expand Up @@ -191,63 +184,3 @@ func newEventBroadcaster(k8sClientset kubernetes.Interface) record.EventBroadcas
func newEventRecorder(broadcaster record.EventBroadcaster) record.EventRecorder {
return broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})
}

func determineCronExpression() string {
// We read the expression from a file if present, otherwise we use ReconcilerCronExpression
fileContents, err := os.ReadFile(reconcilerCronConfiguration)
if err != nil {
flatipam, _, err := config.GetFlatIPAM(true, &types.IPAMConfig{}, "")
if err != nil {
_ = logging.Errorf("could not get flatipam config: %v", err)
os.Exit(couldNotGetFlatIPAM)
}
_ = logging.Errorf("could not read file: %v, using expression from flatfile: %v", err, flatipam.IPAM.ReconcilerCronExpression)
return flatipam.IPAM.ReconcilerCronExpression
}
logging.Verbosef("using expression: %v", strings.TrimSpace(string(fileContents))) // do i need to trim spaces? idk i think the file would JUST be the expression?
return strings.TrimSpace(string(fileContents))
}

func syncConfiguration(
watcher *fsnotify.Watcher,
scheduler gocron.Scheduler,
job gocron.Job,
errorChannel chan error,
) {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}

updatedSchedule := determineCronExpression()
logging.Verbosef(
"configuration updated to file %q. New cron expression: %s",
event.Name,
updatedSchedule,
)
updatedJob, err := scheduler.Update(
job.ID(),
gocron.CronJob(updatedSchedule, false),
gocron.NewTask(func() {
reconciler.ReconcileIPs(errorChannel)
}),
)
if err != nil {
_ = logging.Errorf("error updating job %q configuration: %v", job.ID().String(), err)
}

logging.Verbosef(
"successfully updated CRON configuration id %q - new cron expression: %s",
updatedJob.ID().String(),
updatedSchedule,
)
case err, ok := <-watcher.Errors:
_ = logging.Errorf("error when listening to config changes: %v", err)
if !ok {
return
}
}
}
}
122 changes: 122 additions & 0 deletions pkg/reconciler/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package reconciler

import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/fsnotify/fsnotify"
"github.com/go-co-op/gocron/v2"

"github.com/k8snetworkplumbingwg/whereabouts/pkg/config"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/logging"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/types"
)

type ConfigWatcher struct {
configDir string
configPath string
currentSchedule string
job gocron.Job
scheduler gocron.Scheduler
handlerFunc func()
watcher *fsnotify.Watcher
}

func NewConfigWatcher(configPath string, scheduler gocron.Scheduler, configWatcher *fsnotify.Watcher, handlerFunc func()) (*ConfigWatcher, error) {
schedule, err := determineCronExpression(configPath)
if err != nil {
return nil, err
}

job, err := scheduler.NewJob(
gocron.CronJob(schedule, false),
gocron.NewTask(handlerFunc),
)
if err != nil {
return nil, fmt.Errorf("error creating job: %v", err)
}

return &ConfigWatcher{
configDir: filepath.Dir(configPath),
configPath: configPath,
currentSchedule: schedule,
job: job,
scheduler: scheduler,
watcher: configWatcher,
handlerFunc: handlerFunc,
}, nil
}

func determineCronExpression(configPath string) (string, error) {
// We read the expression from a file if present, otherwise we use ReconcilerCronExpression
fileContents, err := os.ReadFile(configPath)
if err != nil {
flatipam, _, err := config.GetFlatIPAM(true, &types.IPAMConfig{}, "")
if err != nil {
return "", logging.Errorf("could not get flatipam config: %v", err)
}

_ = logging.Errorf("could not read file: %v, using expression from flatfile: %v", err, flatipam.IPAM.ReconcilerCronExpression)
return flatipam.IPAM.ReconcilerCronExpression, nil
}
logging.Verbosef("using expression: %v", strings.TrimSpace(string(fileContents))) // do i need to trim spaces? idk i think the file would JUST be the expression?
return strings.TrimSpace(string(fileContents)), nil
}

func (c *ConfigWatcher) SyncConfiguration(relevantEventPredicate func(event fsnotify.Event) bool) {
go c.syncConfig(relevantEventPredicate)
if err := c.watcher.Add(c.configDir); err != nil {
_ = logging.Errorf("error adding watcher to config %q: %v", c.configPath, err)
}
}

func (c *ConfigWatcher) syncConfig(relevantEventPredicate func(event fsnotify.Event) bool) {
for {
select {
case event, ok := <-c.watcher.Events:
if !ok {
return
}

if !relevantEventPredicate(event) {
logging.Verbosef("event not relevant: %v", event)
continue
}
updatedSchedule, err := determineCronExpression(c.configPath)
if err != nil {
_ = logging.Errorf("error determining cron expression from %q: %v", c.configPath, err)
}
logging.Verbosef(
"configuration updated to file %q. New cron expression: %s",
event.Name,
updatedSchedule,
)

if updatedSchedule == c.currentSchedule {
logging.Debugf("no changes in schedule, nothing to do.")
continue
}
updatedJob, err := c.scheduler.Update(
c.job.ID(),
gocron.CronJob(updatedSchedule, false),
gocron.NewTask(c.handlerFunc),
)
if err != nil {
_ = logging.Errorf("error updating job %q configuration: %v", c.job.ID().String(), err)
}
c.currentSchedule = updatedSchedule
logging.Verbosef(
"successfully updated CRON configuration id %q - new cron expression: %s",
updatedJob.ID().String(),
updatedSchedule,
)
case err, ok := <-c.watcher.Errors:
_ = logging.Errorf("error when listening to config changes: %v", err)
if !ok {
return
}
}
}
}
70 changes: 70 additions & 0 deletions pkg/reconciler/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package reconciler

import (
"os"
"path/filepath"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/fsnotify/fsnotify"
"github.com/go-co-op/gocron/v2"
)

var _ = Describe("Reconciler configuration watcher", func() {
var (
config *ConfigWatcher
configDir string
dummyConfig *os.File
mailbox chan struct{}
watcher *fsnotify.Watcher
)

BeforeEach(func() {
var err error

mailbox = make(chan struct{})

configDir, err = os.MkdirTemp("", "config")
Expect(err).NotTo(HaveOccurred())
const initialCron = "0/1 2 3 * *"
dummyConfig, err = os.Create(filepath.Join(configDir, filepath.Base("..data")))
Expect(err).NotTo(HaveOccurred())

Expect(dummyConfig.Write([]byte(initialCron))).To(Equal(len(initialCron)))
scheduler, err := gocron.NewScheduler()
Expect(err).NotTo(HaveOccurred())
watcher, err = fsnotify.NewWatcher()
Expect(err).NotTo(HaveOccurred())
config, err = NewConfigWatcher(
dummyConfig.Name(),
scheduler,
watcher,
func() { mailbox <- struct{}{} },
)
scheduler.Start()
Expect(err).NotTo(HaveOccurred())
config.SyncConfiguration(func(event fsnotify.Event) bool {
return event.Name == dummyConfig.Name() && event.Op&fsnotify.Write == fsnotify.Write
})
})

AfterEach(func() {
watcher.Close()
dummyConfig.Close()
})

When("the cron job expression is updated in the file-system", func() {
const updatedCron = "0/1 * * * *"

BeforeEach(func() {
Expect(dummyConfig.WriteAt([]byte(updatedCron), 0)).To(Equal(len(updatedCron)))
})

It("the current schedule is updated, and the handler function executed", func() {
Eventually(func() string { return config.currentSchedule }).Should(Equal(updatedCron))
Eventually(mailbox).WithTimeout(time.Minute).Should(Receive())
})
})
})

0 comments on commit 108a2ce

Please sign in to comment.