Skip to content

Commit

Permalink
test(dst): add support for simulating cron schedules (#189)
Browse files Browse the repository at this point in the history
* test(dst): add support for simulating cron schedules

* fix(utils): update util tests

* fix(typo)

* fix(asserts): for schedules w/ david

* fix(time): travel 50_000ms

* fix(time): cleanup

* fix(time): cleanup x2

* test(model): add read promises assert

* fix(feedback): from david

---------

Co-authored-by: Gabriel Guerra <[email protected]>
  • Loading branch information
guergabo and Gabriel Guerra committed Jan 8, 2024
1 parent f5755a4 commit 5b5ef90
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 48 deletions.
3 changes: 2 additions & 1 deletion cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ func RunDSTCmd() *cobra.Command {
}

dst := dst.New(&dst.Config{
Ticks: ticks,
Ticks: ticks,
TimeElapsedPerTick: 50_000, // milliseconds
Reqs: func() int {
return reqsPerTick.Resolve(r)
},
Expand Down
14 changes: 13 additions & 1 deletion internal/app/coroutines/schedulePromises.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/resonatehq/resonate/pkg/schedule"
)

var schedulesInflight = inflight{}

func SchedulePromises(t int64, config *system.Config) *Coroutine {
metadata := metadata.New(fmt.Sprintf("tick:%d:schedule", t))
metadata.Tags.Set("name", "schedule-promises")
Expand Down Expand Up @@ -56,7 +58,9 @@ func SchedulePromises(t int64, config *system.Config) *Coroutine {
continue
}

c.Scheduler.Add(schedulePromise(metadata.TransactionId, schedule))
if !schedulesInflight.get(sid(schedule)) {
c.Scheduler.Add(schedulePromise(metadata.TransactionId, schedule))
}
}
})
}
Expand All @@ -68,6 +72,10 @@ func schedulePromise(tid string, schedule *schedule.Schedule) *scheduler.Corouti
// handle creating promise (schedule run) and updating schedule record.

return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
// handle inflight cache
schedulesInflight.add(sid(schedule))
c.OnDone(func() { schedulesInflight.remove(sid(schedule)) })

next, err := util.Next(schedule.NextRunTime, schedule.Cron)
if err != nil {
slog.Error("failed to calculate next run time", "err", err)
Expand Down Expand Up @@ -168,3 +176,7 @@ func generatePromiseId(id string, vars map[string]string) (string, error) {

return replaced.String(), nil
}

func sid(schedule *schedule.Schedule) string {
return fmt.Sprintf("%s:%d", schedule.Id, schedule.NextRunTime)
}
2 changes: 1 addition & 1 deletion internal/kernel/t_api/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type CreateScheduleResponse struct {
type SearchSchedulesResponse struct {
Status ResponseStatus `json:"status"`
Cursor *Cursor[SearchSchedulesRequest] `json:"cursor,omitempty"`
Schedules []*schedule.Schedule `json:"promises,omitempty"`
Schedules []*schedule.Schedule `json:"schedules,omitempty"`
}

type ReadScheduleResponse struct {
Expand Down
15 changes: 11 additions & 4 deletions internal/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,26 @@ func TestNext(t *testing.T) {
expectedErr error
}{
{
name: "valid",
curr: 16551234321,
name: "valid cron",
curr: 1704719383520,
cronExp: "* * * * *",
expectedNext: 16551240000,
expectedNext: 1704719400000,
expectedErr: nil,
},
{
name: "invalid cron",
curr: 16551234321,
curr: 1704719383520,
cronExp: "random",
expectedNext: 0,
expectedErr: fmt.Errorf("expected 5 to 6 fields, found 1: [random]"),
},
{
name: "valid cron (small)",
curr: 0,
cronExp: "2 * * * *",
expectedNext: 120_000,
expectedErr: nil,
},
}

for _, tc := range testCases {
Expand Down
3 changes: 2 additions & 1 deletion pkg/schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package schedule
import (
"fmt"

"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/idempotency"
"github.com/resonatehq/resonate/pkg/promise"
)
Expand Down Expand Up @@ -34,7 +35,7 @@ func (s *Schedule) String() string {
s.PromiseTimeout,
s.PromiseParam,
s.PromiseTags,
s.LastRunTime,
util.SafeDeref(s.LastRunTime),
s.NextRunTime,
s.IdempotencyKey,
s.CreatedOn,
Expand Down
21 changes: 11 additions & 10 deletions test/dst/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ import (
)

type Config struct {
Ticks int64
Reqs func() int
Ids int
IdempotencyKeys int
Headers int
Data int
Tags int
Urls int
Retries int
Ticks int64
TimeElapsedPerTick int64
Reqs func() int
Ids int
IdempotencyKeys int
Headers int
Data int
Tags int
Urls int
Retries int
}

type DST struct {
Expand Down Expand Up @@ -98,7 +99,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System,
var errs []error

// test loop
for t := int64(0); t < d.config.Ticks; t++ {
for t := int64(0); t < d.config.Ticks*d.config.TimeElapsedPerTick; t += d.config.TimeElapsedPerTick {
for _, req := range generator.Generate(r, t, d.config.Reqs(), model.cursors) {
req := req
reqTime := t
Expand Down
19 changes: 10 additions & 9 deletions test/dst/dst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,16 @@ func TestDST(t *testing.T) {
}

dst := New(&Config{
Ticks: 1000,
Reqs: func() int { return 100 },
Ids: 100,
IdempotencyKeys: 100,
Headers: 100,
Data: 100,
Tags: 100,
Urls: 100,
Retries: 100,
Ticks: 1000,
TimeElapsedPerTick: 50_000, // milliseconds
Reqs: func() int { return 100 },
Ids: 100,
IdempotencyKeys: 100,
Headers: 100,
Data: 100,
Tags: 100,
Urls: 100,
Retries: 100,
})

if errs := dst.Run(r, api, aio, system, reqs); len(errs) > 0 {
Expand Down
42 changes: 22 additions & 20 deletions test/dst/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ import (
)

type Generator struct {
ticks int64
idSet []string
idemotencyKeySet []*idempotency.Key
headersSet []map[string]string
dataSet [][]byte
tagsSet []map[string]string
urlSet []string
retrySet []int
requests []RequestGenerator
ticks int64
timeElapsedPerTick int64
idSet []string
idemotencyKeySet []*idempotency.Key
headersSet []map[string]string
dataSet [][]byte
tagsSet []map[string]string
urlSet []string
retrySet []int
requests []RequestGenerator
}

type RequestGenerator func(*rand.Rand, int64) *t_api.Request
Expand Down Expand Up @@ -74,14 +75,15 @@ func NewGenerator(r *rand.Rand, config *Config) *Generator {
}

return &Generator{
ticks: config.Ticks,
idSet: idSet,
idemotencyKeySet: idempotencyKeySet,
headersSet: headersSet,
dataSet: dataSet,
tagsSet: tagsSet,
urlSet: urlSet,
retrySet: retrySet,
ticks: config.Ticks,
timeElapsedPerTick: config.TimeElapsedPerTick,
idSet: idSet,
idemotencyKeySet: idempotencyKeySet,
headersSet: headersSet,
dataSet: dataSet,
tagsSet: tagsSet,
urlSet: urlSet,
retrySet: retrySet,
}
}

Expand Down Expand Up @@ -172,7 +174,7 @@ func (g *Generator) GenerateCreatePromise(r *rand.Rand, t int64) *t_api.Request
data := g.dataSet[r.Intn(len(g.dataSet))]
headers := g.headersSet[r.Intn(len(g.headersSet))]
tags := g.tagsSet[r.Intn(len(g.tagsSet))]
timeout := RangeInt63n(r, t, g.ticks)
timeout := RangeInt63n(r, t, g.ticks*g.timeElapsedPerTick)
strict := r.Intn(2) == 0

return &t_api.Request{
Expand Down Expand Up @@ -292,11 +294,11 @@ func (g *Generator) GenerateSearchSchedules(r *rand.Rand, t int64) *t_api.Reques

func (g *Generator) GenerateCreateSchedule(r *rand.Rand, t int64) *t_api.Request {
id := g.idSet[r.Intn(len(g.idSet))]
cron := fmt.Sprintf("%d %d * * *", r.Intn(60), r.Intn(24))
cron := fmt.Sprintf("%d * * * *", r.Intn(60))
tags := g.tagsSet[r.Intn(len(g.tagsSet))]
idempotencyKey := g.idemotencyKeySet[r.Intn(len(g.idemotencyKeySet))]

promiseTimeout := RangeInt63n(r, t, g.ticks)
promiseTimeout := RangeInt63n(r, t, g.ticks*g.timeElapsedPerTick)
promiseHeaders := g.headersSet[r.Intn(len(g.headersSet))]
promiseData := g.dataSet[r.Intn(len(g.dataSet))]
promiseTags := g.tagsSet[r.Intn(len(g.tagsSet))]
Expand Down
18 changes: 17 additions & 1 deletion test/dst/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,16 @@ func (m *Model) ValidateReadSchedule(req *t_api.Request, res *t_api.Response) er

switch res.ReadSchedule.Status {
case t_api.StatusOK:
sm.schedule = res.ReadSchedule.Schedule
s := res.ReadSchedule.Schedule // schedule response

if s.NextRunTime < sm.schedule.NextRunTime {
return fmt.Errorf("unexpected nextRunTime, schedule nextRunTime %d is greater than the request nextRunTime %d", s.NextRunTime, sm.schedule.NextRunTime)
}
if (s.LastRunTime != nil && sm.schedule.LastRunTime != nil) && *s.LastRunTime < *sm.schedule.LastRunTime {
return fmt.Errorf("unexpected lastRunTime, schedule lastRunTime %d is greater than the request lastRunTime %d", s.LastRunTime, sm.schedule.LastRunTime)
}

sm.schedule = s
return nil
case t_api.StatusScheduleNotFound:
if sm.schedule != nil {
Expand Down Expand Up @@ -420,6 +429,13 @@ func (m *Model) ValidateSearchSchedules(req *t_api.Request, res *t_api.Response)
}
}

if s.NextRunTime < sm.schedule.NextRunTime {
return fmt.Errorf("unexpected nextRunTime, schedule nextRunTime %d is greater than the request nextRunTime %d", s.NextRunTime, sm.schedule.NextRunTime)
}
if (s.LastRunTime != nil && sm.schedule.LastRunTime != nil) && *s.LastRunTime < *sm.schedule.LastRunTime {
return fmt.Errorf("unexpected lastRunTime, schedule lastRunTime %d is greater than the request lastRunTime %d", s.LastRunTime, sm.schedule.LastRunTime)
}

// update schedule state
sm.schedule = s
}
Expand Down

0 comments on commit 5b5ef90

Please sign in to comment.