Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-654: allow setting a stopTime for job. #760

Merged
merged 4 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var (
ErrWithMonitorNil = fmt.Errorf("gocron: WithMonitor: monitor must not be nil")
ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty")
ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past")
ErrWithStopDateTimePast = fmt.Errorf("gocron: WithStopDateTime: end must not be in the past")
ErrStartTimeLaterThanEndTime = fmt.Errorf("gocron: WithStartDateTime: start must not be later than end")
ErrStopTimeEarlierThanStartTime = fmt.Errorf("gocron: WithStopDateTime: end must not be earlier than start")
ErrWithStopTimeoutZeroOrNegative = fmt.Errorf("gocron: WithStopTimeout: timeout must be greater than 0")
)

Expand Down
4 changes: 4 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
default:
}

if j.stopTimeReached(e.clock.Now()) {
return
}

if e.elector != nil {
if err := e.elector.IsLeader(j.ctx); err != nil {
e.sendOutForRescheduling(&jIn)
Expand Down
38 changes: 38 additions & 0 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type internalJob struct {
limitRunsTo *limitRunsTo
startTime time.Time
startImmediately bool
stopTime time.Time
// event listeners
afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string)
Expand All @@ -60,6 +61,13 @@ func (j *internalJob) stop() {
j.cancel()
}

func (j *internalJob) stopTimeReached(now time.Time) bool {
if j.stopTime.IsZero() {
return false
}
return j.stopTime.Before(now)
}

// task stores the function and parameters
// that are actually run when the job is executed.
type task struct {
Expand Down Expand Up @@ -594,11 +602,41 @@ func WithStartDateTime(start time.Time) StartAtOption {
if start.IsZero() || start.Before(now) {
return ErrWithStartDateTimePast
}
if !j.stopTime.IsZero() && j.stopTime.Before(start) {
return ErrStartTimeLaterThanEndTime
}
j.startTime = start
return nil
}
}

// WithStopAt sets the option for stopping the job from running
// after the specified time.
func WithStopAt(option StopAtOption) JobOption {
return func(j *internalJob, now time.Time) error {
return option(j, now)
}
}

// StopAtOption defines options for stopping the job
type StopAtOption func(*internalJob, time.Time) error

// WithStopDateTime sets the final date & time after which the job should stop.
// This must be in the future and should be after the startTime (if specified).
// The job's final run may be at the stop time, but not after.
func WithStopDateTime(end time.Time) StopAtOption {
return func(j *internalJob, now time.Time) error {
if end.IsZero() || end.Before(now) {
return ErrWithStopDateTimePast
}
if end.Before(j.startTime) {
return ErrStopTimeEarlierThanStartTime
}
j.stopTime = end
return nil
}
}

// WithTags sets the tags for the job. Tags provide
// a way to identify jobs by a set of tags and remove
// multiple jobs by tag.
Expand Down
4 changes: 4 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
return
}

if j.stopTimeReached(s.now()) {
return
}

scheduleFrom := j.lastRun
if len(j.nextScheduled) > 0 {
// always grab the last element in the slice as that is the furthest
Expand Down
31 changes: 31 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ func TestScheduler_LongRunningJobs(t *testing.T) {
options []SchedulerOption
expectedRuns int
}{
{
"duration with stop time between executions",
durationCh,
DurationJob(
time.Millisecond * 500,
),
NewTask(
func() {
time.Sleep(1 * time.Second)
durationCh <- struct{}{}
}),
[]JobOption{WithStopAt(WithStopDateTime(time.Now().Add(time.Millisecond * 1100)))},
[]SchedulerOption{WithStopTimeout(time.Second * 2)},
2,
},
{
"duration",
durationCh,
Expand Down Expand Up @@ -755,6 +770,22 @@ func TestScheduler_NewJobErrors(t *testing.T) {
[]JobOption{WithStartAt(WithStartDateTime(time.Now().Add(-time.Second)))},
ErrWithStartDateTimePast,
},
{
"WithStartDateTime is later than the end",
DurationJob(
time.Second,
),
[]JobOption{WithStopAt(WithStopDateTime(time.Now().Add(time.Second))), WithStartAt(WithStartDateTime(time.Now().Add(time.Hour)))},
ErrStartTimeLaterThanEndTime,
},
{
"WithStopDateTime is earlier than the start",
DurationJob(
time.Second,
),
[]JobOption{WithStartAt(WithStartDateTime(time.Now().Add(time.Hour))), WithStopAt(WithStopDateTime(time.Now().Add(time.Second)))},
ErrStopTimeEarlierThanStartTime,
},
{
"oneTimeJob start at is zero",
OneTimeJob(OneTimeJobStartDateTime(time.Time{})),
Expand Down