Skip to content

Commit

Permalink
fix: cron.Job should break loop when shutdown trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
morlay committed Aug 22, 2024
1 parent df82028 commit e2d0421
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 18 deletions.
6 changes: 1 addition & 5 deletions pkg/cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,7 @@ func (a *app) newFrom(cc Command, parent Command) *cobra.Command {
return err
}

if err := singletons.RunOrServe(ctx); err != nil {
fmt.Println(err)
}

return nil
return singletons.RunOrServe(ctx)
}

return cmd
Expand Down
18 changes: 11 additions & 7 deletions pkg/configuration/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ func RunOrServe(ctx context.Context, configurators ...any) error {
}

if len(configuratorServers) > 0 {
stopCh := make(chan os.Signal, 1)

stopCh := make(chan os.Signal)
g, c := errgroup.WithContext(cc)

g.Go(func() error {
Expand Down Expand Up @@ -113,11 +112,11 @@ func serve(ctx context.Context, stopCh chan os.Signal, configuratorServers ...Se
}

g.Go(func() error {
log.With(
l := log.With(
slog.String("type", fmt.Sprintf("%T", server)),
slog.String("lifecycle", "Serve"),
).Debug("serving")

)
l.Debug("serving")
err := server.Serve(c)
go func() {
stopCh <- syscall.SIGTERM
Expand All @@ -131,20 +130,25 @@ func serve(ctx context.Context, stopCh chan os.Signal, configuratorServers ...Se

func Shutdown(c context.Context, configuratorServers ...CanShutdown) error {
timeout := 10 * time.Second

g := &errgroup.Group{}

for _, canShutdown := range configuratorServers {
g.Go(func() error {
ctx, cancel := context.WithTimeout(c, timeout)
defer cancel()

log.With(
l := log.With(
slog.String("type", fmt.Sprintf("%T", canShutdown)),
slog.String("lifecycle", "Shutdown"),
slog.String("timeout", timeout.String()),
).Debug("shutting down")
)

l.Debug("shutting down")
defer log.Debug("done")

done := make(chan error)
defer close(done)

go func() {
done <- canShutdown.Shutdown(ctx)
Expand Down
25 changes: 19 additions & 6 deletions pkg/cron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/pkg/errors"
"log/slog"
"sync"
"time"

"github.com/go-courier/logr"
Expand All @@ -24,6 +25,8 @@ type Job struct {

schedule cron.Schedule
timer *time.Timer
done chan struct{}
once sync.Once

name string
action func(ctx context.Context)
Expand Down Expand Up @@ -62,19 +65,25 @@ var _ configuration.Server = (*Job)(nil)
func (j *Job) Serve(ctx context.Context) error {
ci := configuration.ContextInjectorFromContext(ctx)

logr.FromContext(ctx).WithValues(
l := logr.FromContext(ctx).WithValues(
slog.String("name", j.name),
slog.String("cron", j.Cron),
).Info("waiting")
)

l.Info("waiting")

j.timer = time.NewTimer(5 * time.Second)

j.done = make(chan struct{})

for {
now := time.Now()

j.timer.Reset(j.schedule.Next(now).Sub(now))

select {
case <-j.done:
return nil
case <-ctx.Done():
return ctx.Err()
case now = <-j.timer.C:
Expand All @@ -87,9 +96,13 @@ func (j *Job) Serve(ctx context.Context) error {
}
}

func (j *Job) Shutdown(context.Context) error {
if j.timer != nil {
j.timer.Stop()
}
func (j *Job) Shutdown(ctx context.Context) error {
j.once.Do(func() {
close(j.done)

if j.timer != nil {
j.timer.Stop()
}
})
return nil
}

0 comments on commit e2d0421

Please sign in to comment.