diff --git a/.github/workflows/dst.yaml b/.github/workflows/dst.yaml index c110c696..da91417f 100644 --- a/.github/workflows/dst.yaml +++ b/.github/workflows/dst.yaml @@ -53,7 +53,7 @@ jobs: strategy: fail-fast: false matrix: - scenario: [default, fault] + scenario: [default, fault, lazy] store: [sqlite, postgres] run: [1, 2] @@ -119,7 +119,7 @@ jobs: strategy: fail-fast: false matrix: - scenario: [default, fault] + scenario: [default, fault, lazy] store: [sqlite, postgres, sqlite/postgres] include: - store: sqlite diff --git a/cmd/dst/run.go b/cmd/dst/run.go index 9febbf7d..77a2cf88 100644 --- a/cmd/dst/run.go +++ b/cmd/dst/run.go @@ -96,8 +96,11 @@ func RunDSTCmd() *cobra.Command { case "fault": p = r.Float64() dstScenario = &dst.Scenario{Kind: dst.FaultInjection, FaultInjection: &dst.FaultInjectionScenario{P: p}} + case "lazy": + p = 0 + dstScenario = &dst.Scenario{Kind: dst.LazyTimeout, LazyTimeout: &dst.LazyTimeoutScenario{}} default: - return fmt.Errorf("invalid scenario: %s, permitted scenarios: {default, fault}", scenario) + return fmt.Errorf("invalid scenario: %s, permitted scenarios: {default, fault, lazy}", scenario) } // instatiate api/aio @@ -149,13 +152,11 @@ func RunDSTCmd() *cobra.Command { system.AddOnTick(1000, coroutines.EnqueueTasks) system.AddOnTick(1000, coroutines.TimeoutLocks) system.AddOnTick(1000, coroutines.SchedulePromises) - system.AddOnTick(1000, coroutines.TimeoutPromises) system.AddOnTick(1000, coroutines.NotifySubscriptions) reqs := []t_api.Kind{ // PROMISE t_api.ReadPromise, - t_api.SearchPromises, t_api.CreatePromise, t_api.CompletePromise, @@ -180,6 +181,14 @@ func RunDSTCmd() *cobra.Command { t_api.CompleteTask, } + // remove search promises and timeout promises if lazy timeout scenario + // this forces the "lazy" path to be taken for promises to transition + // to timedout state + if dstScenario.Kind != dst.LazyTimeout { + reqs = append(reqs, t_api.SearchPromises) + system.AddOnTick(1000, coroutines.TimeoutPromises) + } + dst := dst.New(&dst.Config{ Scenario: dstScenario, Ticks: ticks, @@ -220,7 +229,7 @@ func RunDSTCmd() *cobra.Command { cmd.Flags().Int64Var(&seed, "seed", 0, "dst seed") cmd.Flags().Int64Var(&ticks, "ticks", 1000, "number of ticks") - cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: {default, fault}") + cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: {default, fault, lazy}") // dst related values cmd.Flags().Var(&reqsPerTick, "reqs-per-tick", "number of requests per tick") diff --git a/internal/app/coroutines/completePromise.go b/internal/app/coroutines/completePromise.go index f8405fd6..a27b496b 100644 --- a/internal/app/coroutines/completePromise.go +++ b/internal/app/coroutines/completePromise.go @@ -74,10 +74,16 @@ func CompletePromise(metadata *metadata.Metadata, req *t_api.Request, res func(* completedState := promise.GetTimedoutState(p) util.Assert(completedState == promise.Timedout || completedState == promise.Resolved, "completedState must be Timedout or Resolved") + // If not strict, status is OK + status := t_api.StatusPromiseAlreadyTimedOut + if !req.CompletePromise.Strict { + status = t_api.StatusOK + } + res(&t_api.Response{ Kind: req.Kind, CompletePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusPromiseAlreadyTimedOut, + Status: status, Promise: &promise.Promise{ Id: p.Id, State: completedState, diff --git a/internal/app/coroutines/createPromise.go b/internal/app/coroutines/createPromise.go index bbb544fc..d80378be 100644 --- a/internal/app/coroutines/createPromise.go +++ b/internal/app/coroutines/createPromise.go @@ -131,7 +131,6 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ } else { c.Scheduler.Add(CreatePromise(metadata, req, res)) } - // todo: to here... } else { p, err := result.Records[0].Promise() if err != nil { @@ -140,12 +139,8 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ return } + // initial status status := t_api.StatusPromiseAlreadyExists - strict := req.CreatePromise.Strict && p.State != promise.Pending - - if !strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) { - status = t_api.StatusOK - } if p.State == promise.Pending && c.Time() >= p.Timeout { c.Scheduler.Add(TimeoutPromise(metadata, p, CreatePromise(metadata, req, res), func(err error) { @@ -158,6 +153,11 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ completedState := promise.GetTimedoutState(p) util.Assert(completedState == promise.Timedout || completedState == promise.Resolved, "completedState must be Timedout or Resolved") + // switch status to ok if not strict and idempotency keys match + if !req.CreatePromise.Strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) { + status = t_api.StatusOK + } + res(&t_api.Response{ Kind: t_api.CreatePromise, CreatePromise: &t_api.CreatePromiseResponse{ @@ -181,6 +181,12 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ }, nil) })) } else { + // switch status to ok if not strict and idempotency keys match + strict := req.CreatePromise.Strict && p.State != promise.Pending + if !strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) { + status = t_api.StatusOK + } + res(&t_api.Response{ Kind: t_api.CreatePromise, CreatePromise: &t_api.CreatePromiseResponse{ diff --git a/test/dst/dst.go b/test/dst/dst.go index 82322e73..43076e04 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -36,6 +36,7 @@ type Scenario struct { Kind Kind Default *DefaultScenario FaultInjection *FaultInjectionScenario + LazyTimeout *LazyTimeoutScenario } type Kind string @@ -43,6 +44,7 @@ type Kind string const ( Default Kind = "default" FaultInjection Kind = "fault" + LazyTimeout Kind = "lazy" ) type DefaultScenario struct{} @@ -51,6 +53,8 @@ type FaultInjectionScenario struct { P float64 } +type LazyTimeoutScenario struct{} + func New(config *Config) *DST { return &DST{ config: config, diff --git a/test/dst/dst_test.go b/test/dst/dst_test.go index 0f89f66c..73337412 100644 --- a/test/dst/dst_test.go +++ b/test/dst/dst_test.go @@ -30,6 +30,13 @@ func TestDSTFaultInjection(t *testing.T) { }) } +func TestDSTLazyTimeout(t *testing.T) { + test(t, &Scenario{ + Kind: LazyTimeout, + LazyTimeout: &LazyTimeoutScenario{}, + }) +} + func test(t *testing.T, scenario *Scenario) { r := rand.New(rand.NewSource(0)) @@ -93,14 +100,12 @@ func test(t *testing.T, scenario *Scenario) { system.AddOnTick(1000, coroutines.EnqueueTasks) system.AddOnTick(1000, coroutines.TimeoutLocks) system.AddOnTick(1000, coroutines.SchedulePromises) - system.AddOnTick(1000, coroutines.TimeoutPromises) system.AddOnTick(1000, coroutines.NotifySubscriptions) // specify reqs to enable reqs := []t_api.Kind{ // PROMISE t_api.ReadPromise, - t_api.SearchPromises, t_api.CreatePromise, t_api.CompletePromise, @@ -125,6 +130,14 @@ func test(t *testing.T, scenario *Scenario) { t_api.CompleteTask, } + // remove search promises and timeout promises if lazy timeout scenario + // this forces the "lazy" path to be taken for promises to transition + // to timedout state + if scenario.Kind != LazyTimeout { + reqs = append(reqs, t_api.SearchPromises) + system.AddOnTick(1000, coroutines.TimeoutPromises) + } + // start api/aio if err := api.Start(); err != nil { t.Fatal(err) diff --git a/test/dst/model.go b/test/dst/model.go index 51536b5b..8846aac1 100644 --- a/test/dst/model.go +++ b/test/dst/model.go @@ -245,7 +245,7 @@ func (m *Model) ValidateCreatePromise(reqTime int64, resTime int64, req *t_api.R switch res.CreatePromise.Status { case t_api.StatusOK: if pm.promise != nil { - if !pm.idempotencyKeyForCreateMatch(res.CreatePromise.Promise) { + if !pm.promise.IdempotencyKeyForCreate.Match(res.CreatePromise.Promise.IdempotencyKeyForCreate) { return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForCreate, res.CreatePromise.Promise.IdempotencyKeyForCreate) } else if req.CreatePromise.Strict && pm.promise.State != promise.Pending { return fmt.Errorf("unexpected state %s when strict true", pm.promise.State) @@ -288,6 +288,9 @@ func (m *Model) ValidateCreatePromise(reqTime int64, resTime int64, req *t_api.R pm.promise = res.CreatePromise.Promise return nil case t_api.StatusPromiseAlreadyExists: + if !req.CreatePromise.Strict && req.CreatePromise.IdempotencyKey.Match(res.CreatePromise.Promise.IdempotencyKeyForCreate) { + return fmt.Errorf("unexpected response code (%d) when strict=false and ikeys match", res.CreatePromise.Status) + } return nil case t_api.StatusPromiseNotFound: return fmt.Errorf("invalid response '%d' for create promise request", res.CreatePromise.Status) @@ -302,7 +305,7 @@ func (m *Model) ValidateCompletePromise(reqTime int64, resTime int64, req *t_api switch res.CompletePromise.Status { case t_api.StatusOK: if pm.completed() { - if !pm.idempotencyKeyForCompleteMatch(res.CompletePromise.Promise) && + if !pm.promise.IdempotencyKeyForComplete.Match(res.CompletePromise.Promise.IdempotencyKeyForComplete) && (req.CompletePromise.Strict || pm.promise.State != promise.Timedout) { return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForComplete, res.CompletePromise.Promise.IdempotencyKeyForComplete) } else if req.CompletePromise.Strict && pm.promise.State != req.CompletePromise.State { @@ -340,7 +343,12 @@ func (m *Model) ValidateCompletePromise(reqTime int64, resTime int64, req *t_api // update model state pm.promise = res.CompletePromise.Promise return nil - case t_api.StatusPromiseAlreadyResolved, t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled, t_api.StatusPromiseAlreadyTimedOut: + case t_api.StatusPromiseAlreadyResolved, t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled: + return nil + case t_api.StatusPromiseAlreadyTimedOut: + if !req.CompletePromise.Strict { + return fmt.Errorf("unexpected response code (%d) when state=timedout and strict=true", res.CompletePromise.Status) + } return nil case t_api.StatusPromiseNotFound: if pm.promise != nil { @@ -694,14 +702,6 @@ func (m *Model) ValidateCompleteTask(reqTime int64, resTime int64, req *t_api.Re // UTILS -func (m *PromiseModel) idempotencyKeyForCreateMatch(promise *promise.Promise) bool { - return m.promise.IdempotencyKeyForCreate != nil && promise.IdempotencyKeyForCreate != nil && *m.promise.IdempotencyKeyForCreate == *promise.IdempotencyKeyForCreate -} - -func (m *PromiseModel) idempotencyKeyForCompleteMatch(promise *promise.Promise) bool { - return m.promise.IdempotencyKeyForComplete != nil && promise.IdempotencyKeyForComplete != nil && *m.promise.IdempotencyKeyForComplete == *promise.IdempotencyKeyForComplete -} - func (m *PromiseModel) completed() bool { return m.promise != nil && m.promise.State != promise.Pending }