Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lazy promise transitions #331

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/dst.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
strategy:
fail-fast: false
matrix:
scenario: [default, fault]
scenario: [default, fault, lazy]
store: [sqlite, postgres]
run: [1, 2]

Expand Down Expand Up @@ -119,7 +119,7 @@ jobs:
strategy:
fail-fast: false
matrix:
scenario: [default, fault]
scenario: [default, fault, lazy]
store: [sqlite, postgres, sqlite/postgres]
include:
- store: sqlite
Expand Down
17 changes: 13 additions & 4 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ func RunDSTCmd() *cobra.Command {
case "fault":
p = r.Float64()
dstScenario = &dst.Scenario{Kind: dst.FaultInjection, FaultInjection: &dst.FaultInjectionScenario{P: p}}
case "lazy":
p = 0
dstScenario = &dst.Scenario{Kind: dst.LazyTimeout, LazyTimeout: &dst.LazyTimeoutScenario{}}
default:
return fmt.Errorf("invalid scenario: %s, permitted scenarios: {default, fault}", scenario)
return fmt.Errorf("invalid scenario: %s, permitted scenarios: {default, fault, lazy}", scenario)
}

// instatiate api/aio
Expand Down Expand Up @@ -149,13 +152,11 @@ func RunDSTCmd() *cobra.Command {
system.AddOnTick(1000, coroutines.EnqueueTasks)
system.AddOnTick(1000, coroutines.TimeoutLocks)
system.AddOnTick(1000, coroutines.SchedulePromises)
system.AddOnTick(1000, coroutines.TimeoutPromises)
system.AddOnTick(1000, coroutines.NotifySubscriptions)

reqs := []t_api.Kind{
// PROMISE
t_api.ReadPromise,
t_api.SearchPromises,
t_api.CreatePromise,
t_api.CompletePromise,

Expand All @@ -180,6 +181,14 @@ func RunDSTCmd() *cobra.Command {
t_api.CompleteTask,
}

// remove search promises and timeout promises if lazy timeout scenario
// this forces the "lazy" path to be taken for promises to transition
// to timedout state
if dstScenario.Kind != dst.LazyTimeout {
reqs = append(reqs, t_api.SearchPromises)
system.AddOnTick(1000, coroutines.TimeoutPromises)
}

dst := dst.New(&dst.Config{
Scenario: dstScenario,
Ticks: ticks,
Expand Down Expand Up @@ -220,7 +229,7 @@ func RunDSTCmd() *cobra.Command {

cmd.Flags().Int64Var(&seed, "seed", 0, "dst seed")
cmd.Flags().Int64Var(&ticks, "ticks", 1000, "number of ticks")
cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: {default, fault}")
cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: {default, fault, lazy}")

// dst related values
cmd.Flags().Var(&reqsPerTick, "reqs-per-tick", "number of requests per tick")
Expand Down
8 changes: 7 additions & 1 deletion internal/app/coroutines/completePromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,16 @@
completedState := promise.GetTimedoutState(p)
util.Assert(completedState == promise.Timedout || completedState == promise.Resolved, "completedState must be Timedout or Resolved")

// If not strict, status is OK
status := t_api.StatusPromiseAlreadyTimedOut
if !req.CompletePromise.Strict {
status = t_api.StatusOK

Check warning on line 80 in internal/app/coroutines/completePromise.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/completePromise.go#L80

Added line #L80 was not covered by tests
}

res(&t_api.Response{
Kind: req.Kind,
CompletePromise: &t_api.CompletePromiseResponse{
Status: t_api.StatusPromiseAlreadyTimedOut,
Status: status,
Promise: &promise.Promise{
Id: p.Id,
State: completedState,
Expand Down
18 changes: 12 additions & 6 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@
} else {
c.Scheduler.Add(CreatePromise(metadata, req, res))
}
// todo: to here...
} else {
p, err := result.Records[0].Promise()
if err != nil {
Expand All @@ -140,12 +139,8 @@
return
}

// initial status
status := t_api.StatusPromiseAlreadyExists
strict := req.CreatePromise.Strict && p.State != promise.Pending

if !strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) {
status = t_api.StatusOK
}

if p.State == promise.Pending && c.Time() >= p.Timeout {
c.Scheduler.Add(TimeoutPromise(metadata, p, CreatePromise(metadata, req, res), func(err error) {
Expand All @@ -158,6 +153,11 @@
completedState := promise.GetTimedoutState(p)
util.Assert(completedState == promise.Timedout || completedState == promise.Resolved, "completedState must be Timedout or Resolved")

// switch status to ok if not strict and idempotency keys match
if !req.CreatePromise.Strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) {
status = t_api.StatusOK

Check warning on line 158 in internal/app/coroutines/createPromise.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createPromise.go#L158

Added line #L158 was not covered by tests
}

res(&t_api.Response{
Kind: t_api.CreatePromise,
CreatePromise: &t_api.CreatePromiseResponse{
Expand All @@ -181,6 +181,12 @@
}, nil)
}))
} else {
// switch status to ok if not strict and idempotency keys match
strict := req.CreatePromise.Strict && p.State != promise.Pending
if !strict && p.IdempotencyKeyForCreate.Match(req.CreatePromise.IdempotencyKey) {
status = t_api.StatusOK
}

res(&t_api.Response{
Kind: t_api.CreatePromise,
CreatePromise: &t_api.CreatePromiseResponse{
Expand Down
4 changes: 4 additions & 0 deletions test/dst/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ type Scenario struct {
Kind Kind
Default *DefaultScenario
FaultInjection *FaultInjectionScenario
LazyTimeout *LazyTimeoutScenario
}

type Kind string

const (
Default Kind = "default"
FaultInjection Kind = "fault"
LazyTimeout Kind = "lazy"
)

type DefaultScenario struct{}
Expand All @@ -51,6 +53,8 @@ type FaultInjectionScenario struct {
P float64
}

type LazyTimeoutScenario struct{}

func New(config *Config) *DST {
return &DST{
config: config,
Expand Down
17 changes: 15 additions & 2 deletions test/dst/dst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ func TestDSTFaultInjection(t *testing.T) {
})
}

func TestDSTLazyTimeout(t *testing.T) {
test(t, &Scenario{
Kind: LazyTimeout,
LazyTimeout: &LazyTimeoutScenario{},
})
}

func test(t *testing.T, scenario *Scenario) {
r := rand.New(rand.NewSource(0))

Expand Down Expand Up @@ -93,14 +100,12 @@ func test(t *testing.T, scenario *Scenario) {
system.AddOnTick(1000, coroutines.EnqueueTasks)
system.AddOnTick(1000, coroutines.TimeoutLocks)
system.AddOnTick(1000, coroutines.SchedulePromises)
system.AddOnTick(1000, coroutines.TimeoutPromises)
system.AddOnTick(1000, coroutines.NotifySubscriptions)

// specify reqs to enable
reqs := []t_api.Kind{
// PROMISE
t_api.ReadPromise,
t_api.SearchPromises,
t_api.CreatePromise,
t_api.CompletePromise,

Expand All @@ -125,6 +130,14 @@ func test(t *testing.T, scenario *Scenario) {
t_api.CompleteTask,
}

// remove search promises and timeout promises if lazy timeout scenario
// this forces the "lazy" path to be taken for promises to transition
// to timedout state
if scenario.Kind != LazyTimeout {
reqs = append(reqs, t_api.SearchPromises)
system.AddOnTick(1000, coroutines.TimeoutPromises)
}

// start api/aio
if err := api.Start(); err != nil {
t.Fatal(err)
Expand Down
22 changes: 11 additions & 11 deletions test/dst/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@
switch res.CreatePromise.Status {
case t_api.StatusOK:
if pm.promise != nil {
if !pm.idempotencyKeyForCreateMatch(res.CreatePromise.Promise) {
if !pm.promise.IdempotencyKeyForCreate.Match(res.CreatePromise.Promise.IdempotencyKeyForCreate) {
return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForCreate, res.CreatePromise.Promise.IdempotencyKeyForCreate)
} else if req.CreatePromise.Strict && pm.promise.State != promise.Pending {
return fmt.Errorf("unexpected state %s when strict true", pm.promise.State)
Expand Down Expand Up @@ -288,6 +288,9 @@
pm.promise = res.CreatePromise.Promise
return nil
case t_api.StatusPromiseAlreadyExists:
if !req.CreatePromise.Strict && req.CreatePromise.IdempotencyKey.Match(res.CreatePromise.Promise.IdempotencyKeyForCreate) {
return fmt.Errorf("unexpected response code (%d) when strict=false and ikeys match", res.CreatePromise.Status)

Check warning on line 292 in test/dst/model.go

View check run for this annotation

Codecov / codecov/patch

test/dst/model.go#L292

Added line #L292 was not covered by tests
}
return nil
case t_api.StatusPromiseNotFound:
return fmt.Errorf("invalid response '%d' for create promise request", res.CreatePromise.Status)
Expand All @@ -302,7 +305,7 @@
switch res.CompletePromise.Status {
case t_api.StatusOK:
if pm.completed() {
if !pm.idempotencyKeyForCompleteMatch(res.CompletePromise.Promise) &&
if !pm.promise.IdempotencyKeyForComplete.Match(res.CompletePromise.Promise.IdempotencyKeyForComplete) &&
(req.CompletePromise.Strict || pm.promise.State != promise.Timedout) {
return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForComplete, res.CompletePromise.Promise.IdempotencyKeyForComplete)
} else if req.CompletePromise.Strict && pm.promise.State != req.CompletePromise.State {
Expand Down Expand Up @@ -340,7 +343,12 @@
// update model state
pm.promise = res.CompletePromise.Promise
return nil
case t_api.StatusPromiseAlreadyResolved, t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled, t_api.StatusPromiseAlreadyTimedOut:
case t_api.StatusPromiseAlreadyResolved, t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled:
return nil
case t_api.StatusPromiseAlreadyTimedOut:
if !req.CompletePromise.Strict {
return fmt.Errorf("unexpected response code (%d) when state=timedout and strict=true", res.CompletePromise.Status)

Check warning on line 350 in test/dst/model.go

View check run for this annotation

Codecov / codecov/patch

test/dst/model.go#L350

Added line #L350 was not covered by tests
}
return nil
case t_api.StatusPromiseNotFound:
if pm.promise != nil {
Expand Down Expand Up @@ -694,14 +702,6 @@

// UTILS

func (m *PromiseModel) idempotencyKeyForCreateMatch(promise *promise.Promise) bool {
return m.promise.IdempotencyKeyForCreate != nil && promise.IdempotencyKeyForCreate != nil && *m.promise.IdempotencyKeyForCreate == *promise.IdempotencyKeyForCreate
}

func (m *PromiseModel) idempotencyKeyForCompleteMatch(promise *promise.Promise) bool {
return m.promise.IdempotencyKeyForComplete != nil && promise.IdempotencyKeyForComplete != nil && *m.promise.IdempotencyKeyForComplete == *promise.IdempotencyKeyForComplete
}

func (m *PromiseModel) completed() bool {
return m.promise != nil && m.promise.State != promise.Pending
}
Expand Down