Skip to content

Commit

Permalink
origin modules not active,thanks robfig#364
Browse files Browse the repository at this point in the history
  • Loading branch information
penglj committed May 9, 2022
1 parent a1d5809 commit 81bea23
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 0 deletions.
57 changes: 57 additions & 0 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ type Entry struct {
// It is kept around so that user code that needs to get at the job later,
// e.g. via Entries() can do so.
Job Job

// Paused is a flag to indicate that whether the job is currently paused.
Paused bool
PausedMu sync.Mutex
}

// Valid returns true if this is not the zero entry.
Expand All @@ -99,6 +103,12 @@ func (e Entry) ScheduleFirst(now time.Time) time.Time {
}
}

func (e *Entry) isPaused() bool {
e.PausedMu.Lock()
defer e.PausedMu.Unlock()
return e.Paused
}

// byTime is a wrapper for sorting the entry array by time
// (with zero time at the end).
type byTime []*Entry
Expand Down Expand Up @@ -192,6 +202,7 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job, entryOpts ...EntryOption) En
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
Job: cmd,
Paused: false,
}
for _, fn := range entryOpts {
fn(entry)
Expand Down Expand Up @@ -354,6 +365,14 @@ func (c *Cron) run() {
break
}

if e.isPaused() {
// Updating Next and Prev so that the schedule continues to be maintained.
// This will help us proceed once the job is continued.
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
continue
}

c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
Expand Down Expand Up @@ -445,6 +464,44 @@ func (c *Cron) Stop() context.Context {
return ctx
}

// Pause the cron job corresponding to the given id.
// This would result in a no-op if the job is currently paused.
func (c *Cron) Pause(id EntryID) error {
var validId = false
for _, entry := range c.entries {
if entry.ID == id {
entry.PausedMu.Lock()
entry.Paused = true
validId = true
entry.PausedMu.Unlock()
break
}
}
if !validId {
return errors.New("invalid entry id")
}
return nil
}

// Continue the cron job corresponding to the given id.
// This would result in a no-op if the job is not currently in a paused state.
func (c *Cron) Continue(id EntryID) error {
var validId = false
for _, entry := range c.entries {
if entry.ID == id {
entry.PausedMu.Lock()
entry.Paused = false
validId = true
entry.PausedMu.Unlock()
break
}
}
if !validId {
return errors.New("invalid entry id")
}
return nil
}

// entrySnapshot returns a copy of the current cron entry list.
func (c *Cron) entrySnapshot() []Entry {
var entries = make([]Entry, len(c.entries))
Expand Down
87 changes: 87 additions & 0 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,93 @@ func TestCron_UpdateSchedule(t *testing.T) {

}

func TestCron_PauseAndContinue(t *testing.T) {
checkNoError := func(err error) {
if err != nil {
t.Error(err)
}
}

t.Run("pause invalid entry", func(t *testing.T) {
spec := "?/5 * * * * *" // every 5 seconds.
cron := New(WithSeconds())
ticks := 0
id, err := cron.AddFunc(spec, func() { ticks++ })
checkNoError(err)
if err := cron.Pause(id + 1); err == nil {
t.Error("was able to pause invalid entry")
}
})

t.Run("pause after single execution and continue after 2 cycles", func(t *testing.T) {
spec := "?/5 * * * * *" // every 5 seconds.
cron := New(WithSeconds())
var ticks int64 = 0
id, err := cron.AddFunc(spec, func() { atomic.AddInt64(&ticks, 1) })
checkNoError(err)
cron.Start()

for atomic.LoadInt64(&ticks) == 0 {
} // waiting for single execution of job.
checkNoError(cron.Pause(id))

<-time.After(12 * time.Second) // waiting 2 execution cycles + some buffer time (2 sec).
if atomic.LoadInt64(&ticks) != 1 { // the job should have just run once.
t.Error("failed to correctly pause job")
}
checkNoError(cron.Continue(id))
// next execution would be in approx 3 seconds as we had a 2 second buffer.
<-time.After(4 * time.Second) // waiting for one more execution of job (1sec buffer).
if atomic.LoadInt64(&ticks) != 2 { // the job should have run twice.
t.Error("failed to correctly continue job")
}
cron.Stop()
})

t.Run("pause and continue with multiple jobs", func(t *testing.T) {
spec := "?/5 * * * * *" // every 5 seconds.
cron := New(WithSeconds())
var tick1, tick2 int64
var id1, id2 EntryID
var err error
id1, err = cron.AddFunc(spec, func() {
atomic.AddInt64(&tick1, 1)
})
checkNoError(err)

id2, err = cron.AddFunc(spec, func() {
atomic.AddInt64(&tick2, 1)
})
checkNoError(err)

cron.Start()

for (atomic.LoadInt64(&tick1) == 0) && (atomic.LoadInt64(&tick2) == 0) {
} // waiting for both jobs to execute once.
checkNoError(cron.Pause(id1))

<-time.After(12 * time.Second) // waiting 2 execution cycles + some buffer time (2 sec).
if atomic.LoadInt64(&tick1) != 1 { // tick1 should not have changed as corresponding job was paused.
t.Error("failed to pause job with id = ", id1)
}
if atomic.LoadInt64(&tick2) != 3 {
// should not be here.
t.Error("job with id = ", id2, " did not execute required number of times")
}
checkNoError(cron.Pause(id2))
checkNoError(cron.Continue(id1))

<-time.After(4 * time.Second) // waiting for one more execution of job1 (1sec buffer).
if atomic.LoadInt64(&tick2) != 3 {
t.Error("failed to pause job with id = ", id2)
}
if atomic.LoadInt64(&tick1) != 2 {
t.Error("continued job with id = ", id1, " executed more times than required")
}
cron.Stop()
})
}

func wait(wg *sync.WaitGroup) chan bool {
ch := make(chan bool)
go func() {
Expand Down

0 comments on commit 81bea23

Please sign in to comment.