Skip to content

Commit

Permalink
Fix lazy promise transitions (#331)
Browse files Browse the repository at this point in the history
A promise cannot be guarenteed to be transitioned from pending to
timedout when requests come in after the timeout time. To
compensate for this, we lazily transition promises on request if
we see that the promise should be timedout, but is not. This
rarely taken code path had two bugs.

1. On create promise request, a promise that is lazily timedout
   should return a 200 if strict is false and the idempotency keys
   match.

2. On complete promise request, a promise that is lazily timedout
   should return a 200 if strict is false.

This PR also introduces a lazy scenario to DST to test the lazy
code path. When this scenario is executed, we disable the time out
promises coroutine and search promises request.
  • Loading branch information
dfarr authored May 20, 2024
1 parent 9f31bbe commit 4875c84
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 26 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/dst.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
strategy:
fail-fast: false
matrix:
scenario: [default, fault]
scenario: [default, fault, lazy]
store: [sqlite, postgres]
run: [1, 2]

Expand Down Expand Up @@ -119,7 +119,7 @@ jobs:
strategy:
fail-fast: false
matrix:
scenario: [default, fault]
scenario: [default, fault, lazy]
store: [sqlite, postgres, sqlite/postgres]
include:
- store: sqlite
Expand Down
17 changes: 13 additions & 4 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ func RunDSTCmd() *cobra.Command {
case "fault":
p = r.Float64()
dstScenario = &dst.Scenario{Kind: dst.FaultInjection, FaultInjection: &dst.FaultInjectionScenario{P: p}}
case "lazy":
p = 0
dstScenario = &dst.Scenario{Kind: dst.LazyTimeout, LazyTimeout: &dst.LazyTimeoutScenario{}}
default:
return fmt.Errorf("invalid scenario: %s, permitted scenarios: {default, fault}", scenario)
return fmt.Errorf("invalid scenario: %s, permitted scenarios: {default, fault, lazy}", scenario)
}

// instatiate api/aio
Expand Down Expand Up @@ -149,13 +152,11 @@ func RunDSTCmd() *cobra.Command {
system.AddOnTick(1000, coroutines.EnqueueTasks)
system.AddOnTick(1000, coroutines.TimeoutLocks)
system.AddOnTick(1000, coroutines.SchedulePromises)
system.AddOnTick(1000, coroutines.TimeoutPromises)
system.AddOnTick(1000, coroutines.NotifySubscriptions)

reqs := []t_api.Kind{
// PROMISE
t_api.ReadPromise,
t_api.SearchPromises,
t_api.CreatePromise,
t_api.CompletePromise,

Expand All @@ -180,6 +181,14 @@ func RunDSTCmd() *cobra.Command {
t_api.CompleteTask,
}

// remove search promises and timeout promises if lazy timeout scenario
// this forces the "lazy" path to be taken for promises to transition
// to timedout state
if dstScenario.Kind != dst.LazyTimeout {
reqs = append(reqs, t_api.SearchPromises)
system.AddOnTick(1000, coroutines.TimeoutPromises)
}

dst := dst.New(&dst.Config{
Scenario: dstScenario,
Ticks: ticks,
Expand Down Expand Up @@ -220,7 +229,7 @@ func RunDSTCmd() *cobra.Command {

cmd.Flags().Int64Var(&seed, "seed", 0, "dst seed")
cmd.Flags().Int64Var(&ticks, "ticks", 1000, "number of ticks")
cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: {default, fault}")
cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: {default, fault, lazy}")

// dst related values
cmd.Flags().Var(&reqsPerTick, "reqs-per-tick", "number of requests per tick")
Expand Down
8 changes: 7 additions & 1 deletion internal/app/coroutines/completePromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,16 @@ func CompletePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*
completedState := promise.GetTimedoutState(p)
util.Assert(completedState == promise.Timedout || completedState == promise.Resolved, "completedState must be Timedout or Resolved")

// If not strict, status is OK
status := t_api.StatusPromiseAlreadyTimedOut
if !req.CompletePromise.Strict {
status = t_api.StatusOK
}

res(&t_api.Response{
Kind: req.Kind,
CompletePromise: &t_api.CompletePromiseResponse{
Status: t_api.StatusPromiseAlreadyTimedOut,
Status: status,
Promise: &promise.Promise{
Id: p.Id,
State: completedState,
Expand Down
18 changes: 12 additions & 6 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_
} else {
c.Scheduler.Add(CreatePromise(metadata, req, res))
}
// todo: to here...
} else {
p, err := result.Records[0].Promise()
if err != nil {
Expand All @@ -140,12 +139,8 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_
return
}

// initial status
status := t_api.StatusPromiseAlreadyExists
strict := req.CreatePromise.Strict && p.State != promise.Pending

if !strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) {
status = t_api.StatusOK
}

if p.State == promise.Pending && c.Time() >= p.Timeout {
c.Scheduler.Add(TimeoutPromise(metadata, p, CreatePromise(metadata, req, res), func(err error) {
Expand All @@ -158,6 +153,11 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_
completedState := promise.GetTimedoutState(p)
util.Assert(completedState == promise.Timedout || completedState == promise.Resolved, "completedState must be Timedout or Resolved")

// switch status to ok if not strict and idempotency keys match
if !req.CreatePromise.Strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) {
status = t_api.StatusOK
}

res(&t_api.Response{
Kind: t_api.CreatePromise,
CreatePromise: &t_api.CreatePromiseResponse{
Expand All @@ -181,6 +181,12 @@ func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_
}, nil)
}))
} else {
// switch status to ok if not strict and idempotency keys match
strict := req.CreatePromise.Strict && p.State != promise.Pending
if !strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) {
status = t_api.StatusOK
}

res(&t_api.Response{
Kind: t_api.CreatePromise,
CreatePromise: &t_api.CreatePromiseResponse{
Expand Down
4 changes: 4 additions & 0 deletions test/dst/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ type Scenario struct {
Kind Kind
Default *DefaultScenario
FaultInjection *FaultInjectionScenario
LazyTimeout *LazyTimeoutScenario
}

type Kind string

const (
Default Kind = "default"
FaultInjection Kind = "fault"
LazyTimeout Kind = "lazy"
)

type DefaultScenario struct{}
Expand All @@ -51,6 +53,8 @@ type FaultInjectionScenario struct {
P float64
}

type LazyTimeoutScenario struct{}

func New(config *Config) *DST {
return &DST{
config: config,
Expand Down
17 changes: 15 additions & 2 deletions test/dst/dst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ func TestDSTFaultInjection(t *testing.T) {
})
}

func TestDSTLazyTimeout(t *testing.T) {
test(t, &Scenario{
Kind: LazyTimeout,
LazyTimeout: &LazyTimeoutScenario{},
})
}

func test(t *testing.T, scenario *Scenario) {
r := rand.New(rand.NewSource(0))

Expand Down Expand Up @@ -93,14 +100,12 @@ func test(t *testing.T, scenario *Scenario) {
system.AddOnTick(1000, coroutines.EnqueueTasks)
system.AddOnTick(1000, coroutines.TimeoutLocks)
system.AddOnTick(1000, coroutines.SchedulePromises)
system.AddOnTick(1000, coroutines.TimeoutPromises)
system.AddOnTick(1000, coroutines.NotifySubscriptions)

// specify reqs to enable
reqs := []t_api.Kind{
// PROMISE
t_api.ReadPromise,
t_api.SearchPromises,
t_api.CreatePromise,
t_api.CompletePromise,

Expand All @@ -125,6 +130,14 @@ func test(t *testing.T, scenario *Scenario) {
t_api.CompleteTask,
}

// remove search promises and timeout promises if lazy timeout scenario
// this forces the "lazy" path to be taken for promises to transition
// to timedout state
if scenario.Kind != LazyTimeout {
reqs = append(reqs, t_api.SearchPromises)
system.AddOnTick(1000, coroutines.TimeoutPromises)
}

// start api/aio
if err := api.Start(); err != nil {
t.Fatal(err)
Expand Down
22 changes: 11 additions & 11 deletions test/dst/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (m *Model) ValidateCreatePromise(reqTime int64, resTime int64, req *t_api.R
switch res.CreatePromise.Status {
case t_api.StatusOK:
if pm.promise != nil {
if !pm.idempotencyKeyForCreateMatch(res.CreatePromise.Promise) {
if !pm.promise.IdempotencyKeyForCreate.Match(res.CreatePromise.Promise.IdempotencyKeyForCreate) {
return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForCreate, res.CreatePromise.Promise.IdempotencyKeyForCreate)
} else if req.CreatePromise.Strict && pm.promise.State != promise.Pending {
return fmt.Errorf("unexpected state %s when strict true", pm.promise.State)
Expand Down Expand Up @@ -288,6 +288,9 @@ func (m *Model) ValidateCreatePromise(reqTime int64, resTime int64, req *t_api.R
pm.promise = res.CreatePromise.Promise
return nil
case t_api.StatusPromiseAlreadyExists:
if !req.CreatePromise.Strict && req.CreatePromise.IdempotencyKey.Match(res.CreatePromise.Promise.IdempotencyKeyForCreate) {
return fmt.Errorf("unexpected response code (%d) when strict=false and ikeys match", res.CreatePromise.Status)
}
return nil
case t_api.StatusPromiseNotFound:
return fmt.Errorf("invalid response '%d' for create promise request", res.CreatePromise.Status)
Expand All @@ -302,7 +305,7 @@ func (m *Model) ValidateCompletePromise(reqTime int64, resTime int64, req *t_api
switch res.CompletePromise.Status {
case t_api.StatusOK:
if pm.completed() {
if !pm.idempotencyKeyForCompleteMatch(res.CompletePromise.Promise) &&
if !pm.promise.IdempotencyKeyForComplete.Match(res.CompletePromise.Promise.IdempotencyKeyForComplete) &&
(req.CompletePromise.Strict || pm.promise.State != promise.Timedout) {
return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForComplete, res.CompletePromise.Promise.IdempotencyKeyForComplete)
} else if req.CompletePromise.Strict && pm.promise.State != req.CompletePromise.State {
Expand Down Expand Up @@ -340,7 +343,12 @@ func (m *Model) ValidateCompletePromise(reqTime int64, resTime int64, req *t_api
// update model state
pm.promise = res.CompletePromise.Promise
return nil
case t_api.StatusPromiseAlreadyResolved, t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled, t_api.StatusPromiseAlreadyTimedOut:
case t_api.StatusPromiseAlreadyResolved, t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled:
return nil
case t_api.StatusPromiseAlreadyTimedOut:
if !req.CompletePromise.Strict {
return fmt.Errorf("unexpected response code (%d) when state=timedout and strict=true", res.CompletePromise.Status)
}
return nil
case t_api.StatusPromiseNotFound:
if pm.promise != nil {
Expand Down Expand Up @@ -694,14 +702,6 @@ func (m *Model) ValidateCompleteTask(reqTime int64, resTime int64, req *t_api.Re

// UTILS

func (m *PromiseModel) idempotencyKeyForCreateMatch(promise *promise.Promise) bool {
return m.promise.IdempotencyKeyForCreate != nil && promise.IdempotencyKeyForCreate != nil && *m.promise.IdempotencyKeyForCreate == *promise.IdempotencyKeyForCreate
}

func (m *PromiseModel) idempotencyKeyForCompleteMatch(promise *promise.Promise) bool {
return m.promise.IdempotencyKeyForComplete != nil && promise.IdempotencyKeyForComplete != nil && *m.promise.IdempotencyKeyForComplete == *promise.IdempotencyKeyForComplete
}

func (m *PromiseModel) completed() bool {
return m.promise != nil && m.promise.State != promise.Pending
}
Expand Down

0 comments on commit 4875c84

Please sign in to comment.