Skip to content

Commit

Permalink
origin modules not active,thanks robfig#446
Browse files Browse the repository at this point in the history
  • Loading branch information
penglj committed May 9, 2022
1 parent e4edd23 commit a4b6a7b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 7 deletions.
40 changes: 33 additions & 7 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ type Entry struct {
// Valid returns true if this is not the zero entry.
func (e Entry) Valid() bool { return e.ID != 0 }

// ScheduleFirst is used for the initial scheduling. If a Prev value has been
// included with the Entry, it will be used in place of "now" to allow schedules
// to be preserved across process restarts.
func (e Entry) ScheduleFirst(now time.Time) time.Time {
if !e.Prev.IsZero() {
return e.Schedule.Next(e.Prev)
} else {
return e.Schedule.Next(now)
}
}

// byTime is a wrapper for sorting the entry array by time
// (with zero time at the end).
type byTime []*Entry
Expand Down Expand Up @@ -154,25 +165,25 @@ func (f FuncJob) Run() { f() }
// AddFunc adds a func to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd))
func (c *Cron) AddFunc(spec string, cmd func(), entryOpts ...EntryOption) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd), entryOpts...)
}

// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
func (c *Cron) AddJob(spec string, cmd Job, entryOpts ...EntryOption) (EntryID, error) {
schedule, err := c.parser.Parse(spec)

if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
return c.Schedule(schedule, cmd, entryOpts...), nil
}

// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
func (c *Cron) Schedule(schedule Schedule, cmd Job, entryOpts ...EntryOption) EntryID {
c.runningMu.Lock()
defer c.runningMu.Unlock()
c.nextID++
Expand All @@ -182,6 +193,9 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
WrappedJob: c.chain.Then(cmd),
Job: cmd,
}
for _, fn := range entryOpts {
fn(entry)
}
if !c.running {
c.entries = append(c.entries, entry)
} else {
Expand Down Expand Up @@ -223,6 +237,18 @@ func (c *Cron) UpdateSchedule(id EntryID, schedule Schedule) error {
return errors.New(fmt.Sprintf("invalid ID provided: %d", id))
}

// EntryOption is a hook which allows the Entry to be altered before being
// committed internally.
type EntryOption func(*Entry)

// EntryPrev allows setting the Prev time to allow interval-based schedules to
// preserve their timeline even in the face of process restarts.
func WithPrev(prev time.Time) EntryOption {
return func(e *Entry) {
e.Prev = prev
}
}

// Entries returns a snapshot of the cron entries.
func (c *Cron) Entries() []Entry {
c.runningMu.Lock()
Expand Down Expand Up @@ -306,7 +332,7 @@ func (c *Cron) run() {
// Figure out the next activation times for each entry.
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
entry.Next = entry.ScheduleFirst(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}

Expand Down Expand Up @@ -344,7 +370,7 @@ func (c *Cron) run() {
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
newEntry.Next = newEntry.ScheduleFirst(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

Expand Down
14 changes: 14 additions & 0 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,20 @@ func TestStopAndWait(t *testing.T) {
})
}

func TestJobWithCustomPrev(t *testing.T) {
cron := New()
var calls int64
// running every 3s, but starting 2s in the past
// expected timeline: 1s ... 4s ... stop (2 calls)
// if prev was ignored, the func would only be called once (at 3s)
cron.AddFunc("@every 3s", func() { atomic.AddInt64(&calls, 1) }, WithPrev(time.Now().Add(-2*time.Second)))
cron.Start()
time.Sleep(5 * time.Second)
if atomic.LoadInt64(&calls) != 2 {
t.Errorf("called %d times, expected 2\n", calls)
}
}

func TestMultiThreadedStartAndStop(t *testing.T) {
cron := New()
go cron.Run()
Expand Down

0 comments on commit a4b6a7b

Please sign in to comment.