Skip to content

Commit

Permalink
Add scenario type to dst (#278)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfarr authored Apr 8, 2024
1 parent 1c84c31 commit 4caa707
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 68 deletions.
48 changes: 29 additions & 19 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dst

import (
"fmt"
"log/slog"
"math/rand" // nosemgrep
netHttp "net/http"
Expand All @@ -26,18 +27,17 @@ import (

func RunDSTCmd() *cobra.Command {
var (
seed int64
ticks int64
reqsPerTick = util.RangeIntFlag{Min: 1, Max: 1000}
ids = util.RangeIntFlag{Min: 1, Max: 1000}
idempotencyKeys = util.RangeIntFlag{Min: 1, Max: 1000}
headers = util.RangeIntFlag{Min: 1, Max: 1000}
data = util.RangeIntFlag{Min: 1, Max: 1000}
tags = util.RangeIntFlag{Min: 1, Max: 1000}
urls = util.RangeIntFlag{Min: 1, Max: 1000}
retries = util.RangeIntFlag{Min: 1, Max: 1000}
failureProbability float64
faultInjectionMode int
seed int64
ticks int64
scenario string
reqsPerTick = util.RangeIntFlag{Min: 1, Max: 1000}
ids = util.RangeIntFlag{Min: 1, Max: 1000}
idempotencyKeys = util.RangeIntFlag{Min: 1, Max: 1000}
headers = util.RangeIntFlag{Min: 1, Max: 1000}
data = util.RangeIntFlag{Min: 1, Max: 1000}
tags = util.RangeIntFlag{Min: 1, Max: 1000}
urls = util.RangeIntFlag{Min: 1, Max: 1000}
retries = util.RangeIntFlag{Min: 1, Max: 1000}
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -84,15 +84,24 @@ func RunDSTCmd() *cobra.Command {

go metricsServer.ListenAndServe() // nolint: errcheck

scenario := dst.Default
aioScenario := aio.Default
if faultInjectionMode > 0 {
scenario, aioScenario = dst.FaultInjection, aio.FaultInjection
// scenario
var p float64
var dstScenario *dst.Scenario

switch scenario {
case "default":
p = 0
dstScenario = &dst.Scenario{Kind: dst.Default, Default: &dst.DefaultScenario{}}
case "fault":
p = r.Float64()
dstScenario = &dst.Scenario{Kind: dst.FaultInjection, FaultInjection: &dst.FaultInjectionScenario{P: p}}
default:
return fmt.Errorf("invalid scenario: %s, permitted scenarios: {default, fault}", scenario)
}

// instatiate api/aio
api := api.New(config.API.Size, metrics)
aio := aio.NewDST(r, metrics, failureProbability, aioScenario)
aio := aio.NewDST(r, p, metrics)

// instatiate aio subsystems
network := network.NewDST(config.AIO.Subsystems.NetworkDST.Config, rand.New(rand.NewSource(r.Int63())))
Expand Down Expand Up @@ -166,6 +175,7 @@ func RunDSTCmd() *cobra.Command {
}

dst := dst.New(&dst.Config{
Scenario: dstScenario,
Ticks: ticks,
TimeElapsedPerTick: 50_000, // milliseconds
Reqs: func() int {
Expand All @@ -178,7 +188,7 @@ func RunDSTCmd() *cobra.Command {
Tags: tags.Resolve(r),
Urls: urls.Resolve(r),
Retries: retries.Resolve(r),
}, scenario)
})

slog.Info("DST", "seed", seed, "ticks", ticks, "reqs", reqsPerTick.String(), "dst", dst, "system", system)
if errs := dst.Run(r, api, aio, system, reqs); len(errs) > 0 {
Expand All @@ -204,6 +214,7 @@ func RunDSTCmd() *cobra.Command {

cmd.Flags().Int64Var(&seed, "seed", 0, "dst seed")
cmd.Flags().Int64Var(&ticks, "ticks", 1000, "number of ticks")
cmd.Flags().StringVar(&scenario, "scenario", "default", "can be one of: {default, fault}")

// dst related values
cmd.Flags().Var(&reqsPerTick, "reqs-per-tick", "number of requests per tick")
Expand All @@ -214,7 +225,6 @@ func RunDSTCmd() *cobra.Command {
cmd.Flags().Var(&tags, "tags", "number promise tags")
cmd.Flags().Var(&urls, "urls", "number subscription urls")
cmd.Flags().Var(&retries, "retries", "number subscription retries")
cmd.Flags().Float64Var(&failureProbability, "failure-probability", 0.5, "probability of aio failure")

// api
cmd.Flags().Var(&util.RangeIntFlag{Min: 1, Max: 1000000}, "api-size", "size of the submission queue buffered channel")
Expand Down
30 changes: 11 additions & 19 deletions internal/aio/aio_dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,21 @@ import (
"github.com/resonatehq/resonate/internal/util"
)

type Scenario int
type aioDST struct {
r *rand.Rand
sqes []*bus.SQE[t_aio.Submission, t_aio.Completion]
cqes []*bus.CQE[t_aio.Submission, t_aio.Completion]
subsystems map[t_aio.Kind]Subsystem
metrics *metrics.Metrics
failureProbability float64
faultInjectionMode Scenario
r *rand.Rand
p float64
sqes []*bus.SQE[t_aio.Submission, t_aio.Completion]
cqes []*bus.CQE[t_aio.Submission, t_aio.Completion]
subsystems map[t_aio.Kind]Subsystem
metrics *metrics.Metrics
}

const (
Default Scenario = iota
FaultInjection
)

func NewDST(r *rand.Rand, metrics *metrics.Metrics, failureProbability float64, faultInjectionMode Scenario) *aioDST {
func NewDST(r *rand.Rand, p float64, metrics *metrics.Metrics) *aioDST {
return &aioDST{
r: r,
subsystems: map[t_aio.Kind]Subsystem{},
metrics: metrics,
failureProbability: failureProbability,
faultInjectionMode: faultInjectionMode,
r: r,
p: p,
subsystems: map[t_aio.Kind]Subsystem{},
metrics: metrics,
}
}

Expand Down
32 changes: 21 additions & 11 deletions test/dst/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ import (
"github.com/resonatehq/resonate/internal/kernel/t_api"
)

type DST struct {
config *Config
}

type Config struct {
Scenario *Scenario
Ticks int64
TimeElapsedPerTick int64
Reqs func() int
Expand All @@ -25,25 +30,30 @@ type Config struct {
Tags int
Urls int
Retries int
FaultInjectionMode bool
}

type DST struct {
config *Config
scenario Scenario
type Scenario struct {
Kind Kind
Default *DefaultScenario
FaultInjection *FaultInjectionScenario
}

type Scenario int
type Kind string

const (
Default Scenario = iota
FaultInjection
Default Kind = "default"
FaultInjection = "fault"
)

func New(config *Config, scenario Scenario) *DST {
type DefaultScenario struct{}

type FaultInjectionScenario struct {
P float64
}

func New(config *Config) *DST {
return &DST{
config: config,
scenario: Default,
config: config,
}
}

Expand All @@ -52,7 +62,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System,
generator := NewGenerator(r, d.config)

// model
model := NewModel(d.scenario)
model := NewModel(d.config.Scenario)

// add req/res
for _, req := range reqs {
Expand Down
33 changes: 14 additions & 19 deletions test/dst/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
// Model

type Model struct {
promises Promises
schedules Schedules
locks Locks
tasks Tasks
cursors []*t_api.Request
responses map[t_api.Kind]ResponseValidator
faultInjectionMode bool
scenario *Scenario
promises Promises
schedules Schedules
locks Locks
tasks Tasks
cursors []*t_api.Request
responses map[t_api.Kind]ResponseValidator
}

type PromiseModel struct {
Expand Down Expand Up @@ -113,19 +113,14 @@ func (t Tasks) Get(id string) *TaskModel {
return t[id]
}

func NewModel(dstScenario Scenario) *Model {
faultInjectionMode := false
if dstScenario == 1 {
faultInjectionMode = true
}

func NewModel(scenario *Scenario) *Model {
return &Model{
promises: map[string]*PromiseModel{},
schedules: map[string]*ScheduleModel{},
locks: map[string]*LockModel{},
tasks: map[string]*TaskModel{},
responses: map[t_api.Kind]ResponseValidator{},
faultInjectionMode: faultInjectionMode,
scenario: scenario,
promises: map[string]*PromiseModel{},
schedules: map[string]*ScheduleModel{},
locks: map[string]*LockModel{},
tasks: map[string]*TaskModel{},
responses: map[t_api.Kind]ResponseValidator{},
}
}

Expand Down

0 comments on commit 4caa707

Please sign in to comment.