From 6ea0cdd425c2b979ebb3bbd3f51b7ab840bdb511 Mon Sep 17 00:00:00 2001 From: David Farr Date: Fri, 17 May 2024 15:19:33 -0700 Subject: [PATCH] Fix lazy promise transitions A promise cannot be guaranteed to be transitioned from pending to timedout when requests come in after the timeout time. To compensate for this, we lazily transition promises on request if we see that the promise should be timedout, but is not. This rarely taken code path had two bugs. 1. On create promise request, a promise that is lazily timedout should return a 200 if strict is false and the idempotency keys match. 2. On complete promise request, a promise that is lazily timedout should return a 200 if strict is false. This PR also introduces a lazy scenario to DST to test the lazy code path. When this scenario is executed, we disable the time out promises coroutine and search promises request. --- .github/workflows/dst.yaml | 4 ++-- cmd/dst/run.go | 17 +++++++++++++---- internal/app/coroutines/completePromise.go | 8 +++++++- internal/app/coroutines/createPromise.go | 18 ++++++++++++------ test/dst/dst.go | 4 ++++ test/dst/dst_test.go | 17 +++++++++++++++-- test/dst/model.go | 22 +++++++++++----------- 7 files changed, 64 insertions(+), 26 deletions(-) diff --git a/.github/workflows/dst.yaml b/.github/workflows/dst.yaml index c110c696..da91417f 100644 --- a/.github/workflows/dst.yaml +++ b/.github/workflows/dst.yaml @@ -53,7 +53,7 @@ jobs: strategy: fail-fast: false matrix: - scenario: [default, fault] + scenario: [default, fault, lazy] store: [sqlite, postgres] run: [1, 2] @@ -119,7 +119,7 @@ jobs: strategy: fail-fast: false matrix: - scenario: [default, fault] + scenario: [default, fault, lazy] store: [sqlite, postgres, sqlite/postgres] include: - store: sqlite diff --git a/cmd/dst/run.go b/cmd/dst/run.go index 9febbf7d..77a2cf88 100644 --- a/cmd/dst/run.go +++ b/cmd/dst/run.go @@ -96,8 +96,11 @@ func RunDSTCmd() *cobra.Command { case "fault": p = r.Float64() dstScenario = 
&dst.Scenario{Kind: dst.FaultInjection, FaultInjection: &dst.FaultInjectionScenario{P: p}} + case "lazy": + p = 0 + dstScenario = &dst.Scenario{Kind: dst.LazyTimeout, LazyTimeout: &dst.LazyTimeoutScenario{}} default: - return fmt.Errorf("invalid scenario: %s, permitted scenarios: {default, fault}", scenario) + return fmt.Errorf("invalid scenario: %s, permitted scenarios: {default, fault, lazy}", scenario) } // instatiate api/aio @@ -149,13 +152,11 @@ func RunDSTCmd() *cobra.Command { system.AddOnTick(1000, coroutines.EnqueueTasks) system.AddOnTick(1000, coroutines.TimeoutLocks) system.AddOnTick(1000, coroutines.SchedulePromises) - system.AddOnTick(1000, coroutines.TimeoutPromises) system.AddOnTick(1000, coroutines.NotifySubscriptions) reqs := []t_api.Kind{ // PROMISE t_api.ReadPromise, - t_api.SearchPromises, t_api.CreatePromise, t_api.CompletePromise, @@ -180,6 +181,14 @@ func RunDSTCmd() *cobra.Command { t_api.CompleteTask, } + // remove search promises and timeout promises if lazy timeout scenario + // this forces the "lazy" path to be taken for promises to transition + // to timedout state + if dstScenario.Kind != dst.LazyTimeout { + reqs = append(reqs, t_api.SearchPromises) + system.AddOnTick(1000, coroutines.TimeoutPromises) + } + dst := dst.New(&dst.Config{ Scenario: dstScenario, Ticks: ticks, @@ -220,7 +229,7 @@ func RunDSTCmd() *cobra.Command { cmd.Flags().Int64Var(&seed, "seed", 0, "dst seed") cmd.Flags().Int64Var(&ticks, "ticks", 1000, "number of ticks") - cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: {default, fault}") + cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: {default, fault, lazy}") // dst related values cmd.Flags().Var(&reqsPerTick, "reqs-per-tick", "number of requests per tick") diff --git a/internal/app/coroutines/completePromise.go b/internal/app/coroutines/completePromise.go index f8405fd6..a27b496b 100644 --- a/internal/app/coroutines/completePromise.go +++ 
b/internal/app/coroutines/completePromise.go @@ -74,10 +74,16 @@ func CompletePromise(metadata *metadata.Metadata, req *t_api.Request, res func(* completedState := promise.GetTimedoutState(p) util.Assert(completedState == promise.Timedout || completedState == promise.Resolved, "completedState must be Timedout or Resolved") + // If not strict, status is OK + status := t_api.StatusPromiseAlreadyTimedOut + if !req.CompletePromise.Strict { + status = t_api.StatusOK + } + res(&t_api.Response{ Kind: req.Kind, CompletePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusPromiseAlreadyTimedOut, + Status: status, Promise: &promise.Promise{ Id: p.Id, State: completedState, diff --git a/internal/app/coroutines/createPromise.go b/internal/app/coroutines/createPromise.go index bbb544fc..d80378be 100644 --- a/internal/app/coroutines/createPromise.go +++ b/internal/app/coroutines/createPromise.go @@ -131,7 +131,6 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ } else { c.Scheduler.Add(CreatePromise(metadata, req, res)) } - // todo: to here... 
} else { p, err := result.Records[0].Promise() if err != nil { @@ -140,12 +139,8 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ return } + // initial status status := t_api.StatusPromiseAlreadyExists - strict := req.CreatePromise.Strict && p.State != promise.Pending - - if !strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) { - status = t_api.StatusOK - } if p.State == promise.Pending && c.Time() >= p.Timeout { c.Scheduler.Add(TimeoutPromise(metadata, p, CreatePromise(metadata, req, res), func(err error) { @@ -158,6 +153,11 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ completedState := promise.GetTimedoutState(p) util.Assert(completedState == promise.Timedout || completedState == promise.Resolved, "completedState must be Timedout or Resolved") + // switch status to ok if not strict and idempotency keys match + if !req.CreatePromise.Strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) { + status = t_api.StatusOK + } + res(&t_api.Response{ Kind: t_api.CreatePromise, CreatePromise: &t_api.CreatePromiseResponse{ @@ -181,6 +181,12 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ }, nil) })) } else { + // switch status to ok if not strict and idempotency keys match + strict := req.CreatePromise.Strict && p.State != promise.Pending + if !strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) { + status = t_api.StatusOK + } + res(&t_api.Response{ Kind: t_api.CreatePromise, CreatePromise: &t_api.CreatePromiseResponse{ diff --git a/test/dst/dst.go b/test/dst/dst.go index 82322e73..43076e04 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -36,6 +36,7 @@ type Scenario struct { Kind Kind Default *DefaultScenario FaultInjection *FaultInjectionScenario + LazyTimeout *LazyTimeoutScenario } type Kind string @@ -43,6 +44,7 @@ type Kind string const ( Default Kind = "default" FaultInjection Kind = 
"fault" + LazyTimeout Kind = "lazy" ) type DefaultScenario struct{} @@ -51,6 +53,8 @@ type FaultInjectionScenario struct { P float64 } +type LazyTimeoutScenario struct{} + func New(config *Config) *DST { return &DST{ config: config, diff --git a/test/dst/dst_test.go b/test/dst/dst_test.go index 0f89f66c..73337412 100644 --- a/test/dst/dst_test.go +++ b/test/dst/dst_test.go @@ -30,6 +30,13 @@ func TestDSTFaultInjection(t *testing.T) { }) } +func TestDSTLazyTimeout(t *testing.T) { + test(t, &Scenario{ + Kind: LazyTimeout, + LazyTimeout: &LazyTimeoutScenario{}, + }) +} + func test(t *testing.T, scenario *Scenario) { r := rand.New(rand.NewSource(0)) @@ -93,14 +100,12 @@ func test(t *testing.T, scenario *Scenario) { system.AddOnTick(1000, coroutines.EnqueueTasks) system.AddOnTick(1000, coroutines.TimeoutLocks) system.AddOnTick(1000, coroutines.SchedulePromises) - system.AddOnTick(1000, coroutines.TimeoutPromises) system.AddOnTick(1000, coroutines.NotifySubscriptions) // specify reqs to enable reqs := []t_api.Kind{ // PROMISE t_api.ReadPromise, - t_api.SearchPromises, t_api.CreatePromise, t_api.CompletePromise, @@ -125,6 +130,14 @@ func test(t *testing.T, scenario *Scenario) { t_api.CompleteTask, } + // remove search promises and timeout promises if lazy timeout scenario + // this forces the "lazy" path to be taken for promises to transition + // to timedout state + if scenario.Kind != LazyTimeout { + reqs = append(reqs, t_api.SearchPromises) + system.AddOnTick(1000, coroutines.TimeoutPromises) + } + // start api/aio if err := api.Start(); err != nil { t.Fatal(err) diff --git a/test/dst/model.go b/test/dst/model.go index 51536b5b..8846aac1 100644 --- a/test/dst/model.go +++ b/test/dst/model.go @@ -245,7 +245,7 @@ func (m *Model) ValidateCreatePromise(reqTime int64, resTime int64, req *t_api.R switch res.CreatePromise.Status { case t_api.StatusOK: if pm.promise != nil { - if !pm.idempotencyKeyForCreateMatch(res.CreatePromise.Promise) { + if 
!pm.promise.IdempotencyKeyForCreate.Match(res.CreatePromise.Promise.IdempotencyKeyForCreate) { return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForCreate, res.CreatePromise.Promise.IdempotencyKeyForCreate) } else if req.CreatePromise.Strict && pm.promise.State != promise.Pending { return fmt.Errorf("unexpected state %s when strict true", pm.promise.State) @@ -288,6 +288,9 @@ func (m *Model) ValidateCreatePromise(reqTime int64, resTime int64, req *t_api.R pm.promise = res.CreatePromise.Promise return nil case t_api.StatusPromiseAlreadyExists: + if !req.CreatePromise.Strict && req.CreatePromise.IdempotencyKey.Match(res.CreatePromise.Promise.IdempotencyKeyForCreate) { + return fmt.Errorf("unexpected response code (%d) when strict=false and ikeys match", res.CreatePromise.Status) + } return nil case t_api.StatusPromiseNotFound: return fmt.Errorf("invalid response '%d' for create promise request", res.CreatePromise.Status) @@ -302,7 +305,7 @@ func (m *Model) ValidateCompletePromise(reqTime int64, resTime int64, req *t_api switch res.CompletePromise.Status { case t_api.StatusOK: if pm.completed() { - if !pm.idempotencyKeyForCompleteMatch(res.CompletePromise.Promise) && + if !pm.promise.IdempotencyKeyForComplete.Match(res.CompletePromise.Promise.IdempotencyKeyForComplete) && (req.CompletePromise.Strict || pm.promise.State != promise.Timedout) { return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForComplete, res.CompletePromise.Promise.IdempotencyKeyForComplete) } else if req.CompletePromise.Strict && pm.promise.State != req.CompletePromise.State { @@ -340,7 +343,12 @@ func (m *Model) ValidateCompletePromise(reqTime int64, resTime int64, req *t_api // update model state pm.promise = res.CompletePromise.Promise return nil - case t_api.StatusPromiseAlreadyResolved, t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled, t_api.StatusPromiseAlreadyTimedOut: + case t_api.StatusPromiseAlreadyResolved, 
t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled: + return nil + case t_api.StatusPromiseAlreadyTimedOut: + if !req.CompletePromise.Strict { + return fmt.Errorf("unexpected response code (%d) when state=timedout and strict=true", res.CompletePromise.Status) + } return nil case t_api.StatusPromiseNotFound: if pm.promise != nil { @@ -694,14 +702,6 @@ func (m *Model) ValidateCompleteTask(reqTime int64, resTime int64, req *t_api.Re // UTILS -func (m *PromiseModel) idempotencyKeyForCreateMatch(promise *promise.Promise) bool { - return m.promise.IdempotencyKeyForCreate != nil && promise.IdempotencyKeyForCreate != nil && *m.promise.IdempotencyKeyForCreate == *promise.IdempotencyKeyForCreate -} - -func (m *PromiseModel) idempotencyKeyForCompleteMatch(promise *promise.Promise) bool { - return m.promise.IdempotencyKeyForComplete != nil && promise.IdempotencyKeyForComplete != nil && *m.promise.IdempotencyKeyForComplete == *promise.IdempotencyKeyForComplete -} - func (m *PromiseModel) completed() bool { return m.promise != nil && m.promise.State != promise.Pending }