From a8f67526ecddf33be64c7ec038bed7de35ba82da Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Thu, 23 May 2024 12:05:55 -0400 Subject: [PATCH 1/2] feat(back-channel): wip --- cmd/dst/run.go | 8 +- .../app/subsystems/aio/queuing/queuing_dst.go | 16 +++- test/dst/dst.go | 7 +- test/dst/dst_test.go | 36 ++++---- test/dst/model.go | 83 +++++++++++++++---- 5 files changed, 114 insertions(+), 36 deletions(-) diff --git a/cmd/dst/run.go b/cmd/dst/run.go index 77a2cf88..a73e5e1a 100644 --- a/cmd/dst/run.go +++ b/cmd/dst/run.go @@ -29,7 +29,7 @@ import ( func RunDSTCmd() *cobra.Command { var ( seed int64 - ticks int64 + ticks int64 // make hire... scenario string reqsPerTick = util.RangeIntFlag{Min: 1, Max: 1000} ids = util.RangeIntFlag{Min: 1, Max: 1000} @@ -113,6 +113,9 @@ func RunDSTCmd() *cobra.Command { if err != nil { return err } + + reqsPerTickSize := reqsPerTick.Resolve(r) + config.AIO.Subsystems.QueuingDST.Config.Queue = make(chan *queuing.ConnectionSubmissionDST, reqsPerTickSize) queuing, err := queuing.NewDST(config.AIO.Subsystems.QueuingDST.Config, rand.New(rand.NewSource(r.Int63()))) if err != nil { return err @@ -191,10 +194,11 @@ func RunDSTCmd() *cobra.Command { dst := dst.New(&dst.Config{ Scenario: dstScenario, + TaskQueue: config.AIO.Subsystems.QueuingDST.Config.Queue, Ticks: ticks, TimeElapsedPerTick: 50_000, // milliseconds Reqs: func() int { - return reqsPerTick.Resolve(r) + return reqsPerTickSize }, Ids: ids.Resolve(r), IdempotencyKeys: idempotencyKeys.Resolve(r), diff --git a/internal/app/subsystems/aio/queuing/queuing_dst.go b/internal/app/subsystems/aio/queuing/queuing_dst.go index 086c145a..6e4b7fef 100644 --- a/internal/app/subsystems/aio/queuing/queuing_dst.go +++ b/internal/app/subsystems/aio/queuing/queuing_dst.go @@ -11,7 +11,8 @@ import ( type ( ConfigDST struct { - P float32 + P float32 + Queue chan *ConnectionSubmissionDST } QueuingSubsystemDST struct { @@ -23,6 +24,12 @@ type ( config *ConfigDST r *rand.Rand } + + ConnectionSubmissionDST struct { + Queue string `json:"queue"` + TaskId string `json:"taskid"` + Counter int `json:"counter"` + } ) // NewDST is a simple helper functions that wraps New and returns a pre-configured QueuingSubsystem. @@ -49,6 +56,7 @@ func (q *QueuingSubsystemDST) Start() error { } func (q *QueuingSubsystemDST) Stop() error { + close(q.config.Queue) return nil } @@ -69,6 +77,12 @@ func (w *QueuingWorkerDST) Process(sqes []*bus.SQE[t_aio.Submission, t_aio.Compl for i, sqe := range sqes { util.Assert(sqe.Submission.Queuing != nil, "submission must not be nil") + w.config.Queue <- &ConnectionSubmissionDST{ + Queue: "analytics", + TaskId: sqe.Submission.Queuing.TaskId, + Counter: sqe.Submission.Queuing.Counter, + } + cqe := &bus.CQE[t_aio.Submission, t_aio.Completion]{ Metadata: sqe.Metadata, Callback: sqe.Callback, diff --git a/test/dst/dst.go b/test/dst/dst.go index 43076e04..426a5961 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -2,12 +2,12 @@ package dst import ( "fmt" - "log/slog" "math/rand" // nosemgrep "strconv" "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/api" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/system" @@ -20,6 +20,7 @@ type DST struct { type Config struct { Scenario *Scenario + TaskQueue chan *queuing.ConnectionSubmissionDST Ticks int64 TimeElapsedPerTick int64 Reqs func() int @@ -66,7 +67,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System, generator := NewGenerator(r, d.config) // model - model := NewModel(d.config.Scenario) + model := NewModel(d.config.Scenario, d.config.TaskQueue) // add req/res for _, req := range reqs { @@ -155,7 +156,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System, if modelErr != nil { errs = append(errs, modelErr) } - slog.Info("DST", "t", fmt.Sprintf("%d|%d", reqTime, resTime), "tid", metadata.TransactionId, "req", req, "res", res, "err", err, "ok", modelErr == nil) + // slog.Info("DST", "t", fmt.Sprintf("%d|%d", reqTime, resTime), "tid", metadata.TransactionId, "req", req, "res", res, "err", err, "ok", modelErr == nil) }, }) diff --git a/test/dst/dst_test.go b/test/dst/dst_test.go index 73337412..a42cdff7 100644 --- a/test/dst/dst_test.go +++ b/test/dst/dst_test.go @@ -23,19 +23,19 @@ func TestDST(t *testing.T) { test(t, &Scenario{Kind: Default}) } -func TestDSTFaultInjection(t *testing.T) { - test(t, &Scenario{ - Kind: FaultInjection, - FaultInjection: &FaultInjectionScenario{P: 0.5}, - }) -} - -func TestDSTLazyTimeout(t *testing.T) { - test(t, &Scenario{ - Kind: LazyTimeout, - LazyTimeout: &LazyTimeoutScenario{}, - }) -} +// func TestDSTFaultInjection(t *testing.T) { +// test(t, &Scenario{ +// Kind: FaultInjection, +// FaultInjection: &FaultInjectionScenario{P: 0.5}, +// }) +// } + +// func TestDSTLazyTimeout(t *testing.T) { +// test(t, &Scenario{ +// Kind: LazyTimeout, +// LazyTimeout: &LazyTimeoutScenario{}, +// }) +// } func test(t *testing.T, scenario *Scenario) { r := rand.New(rand.NewSource(0)) @@ -69,7 +69,10 @@ func test(t *testing.T, scenario *Scenario) { if err != nil { t.Fatal(err) } - queuing, err := queuing.NewDST(&queuing.ConfigDST{P: 0.5}, r) + + reqsPerTickSize := 100 + taskQueue := make(chan *queuing.ConnectionSubmissionDST, reqsPerTickSize) + queuing, err := queuing.NewDST(&queuing.ConfigDST{P: 0.5, Queue: taskQueue}, r) if err != nil { t.Fatal(err) } @@ -147,9 +150,9 @@ func test(t *testing.T, scenario *Scenario) { } dst := New(&Config{ - Ticks: 1000, + Ticks: 3_000, TimeElapsedPerTick: 50_000, // milliseconds - Reqs: func() int { return 100 }, + Reqs: func() int { return reqsPerTickSize }, Ids: 100, IdempotencyKeys: 100, Headers: 100, @@ -158,6 +161,7 @@ func test(t *testing.T, scenario *Scenario) { Urls: 100, Retries: 100, Scenario: scenario, + TaskQueue: taskQueue, }) if errs := dst.Run(r, api, aio, system, reqs); len(errs) > 0 { diff --git a/test/dst/model.go b/test/dst/model.go index 8846aac1..f69241d7 100644 --- a/test/dst/model.go +++ b/test/dst/model.go @@ -5,6 +5,7 @@ package dst import ( "errors" "fmt" + "log/slog" "regexp" "strings" @@ -21,13 +22,15 @@ import ( // Model type Model struct { - scenario *Scenario - promises Promises - schedules Schedules - locks Locks - tasks Tasks - cursors []*t_api.Request - responses map[t_api.Kind]ResponseValidator + scenario *Scenario + promises Promises + schedules Schedules + locks Locks + tasks Tasks + taskQueueSubmissions TaskQueueSubmissions + cursors []*t_api.Request + responses map[t_api.Kind]ResponseValidator + taskQueue chan *queuing.ConnectionSubmissionDST } type PromiseModel struct { @@ -56,11 +59,17 @@ type TaskModel struct { task *task.Task } +type TaskQueueSubmissionModel struct { + id string + taskQueueSubmission *queuing.ConnectionSubmissionDST +} + type Promises map[string]*PromiseModel type Schedules map[string]*ScheduleModel type Subscriptions map[string]*SubscriptionModel type Locks map[string]*LockModel type Tasks map[string]*TaskModel +type TaskQueueSubmissions map[string]*TaskQueueSubmissionModel type ResponseValidator func(int64, int64, *t_api.Request, *t_api.Response) error func (p Promises) Get(id string) *PromiseModel { @@ -114,14 +123,26 @@ func (t Tasks) Get(id string) *TaskModel { return t[id] } -func NewModel(scenario *Scenario) *Model { +func (t TaskQueueSubmissions) Get(id string) *TaskQueueSubmissionModel { + if _, ok := t[id]; !ok { + t[id] = &TaskQueueSubmissionModel{ + id: id, + } + } + + return t[id] +} + +func NewModel(scenario *Scenario, queue chan *queuing.ConnectionSubmissionDST) *Model { return &Model{ - scenario: scenario, - promises: map[string]*PromiseModel{}, - schedules: map[string]*ScheduleModel{}, - locks: map[string]*LockModel{}, - tasks: map[string]*TaskModel{}, - responses: map[t_api.Kind]ResponseValidator{}, + scenario: scenario, + promises: map[string]*PromiseModel{}, + schedules: map[string]*ScheduleModel{}, + locks: map[string]*LockModel{}, + tasks: map[string]*TaskModel{}, + taskQueueSubmissions: map[string]*TaskQueueSubmissionModel{}, + responses: map[t_api.Kind]ResponseValidator{}, + taskQueue: queue, } } @@ -161,6 +182,8 @@ func (m *Model) Step(reqTime int64, resTime int64, req *t_api.Request, res *t_ap return fmt.Errorf("unexpected response kind '%d' for request kind '%d'", res.Kind, req.Kind) } + m.dequeue() + if f, ok := m.responses[req.Kind]; ok { return f(reqTime, resTime, req, res) } @@ -168,6 +191,38 @@ func (m *Model) Step(reqTime int64, resTime int64, req *t_api.Request, res *t_ap return nil } +// 1) validate number of tasks enqueued are somewhat correct? +func (m *Model) dequeue() { + for { + select { + case task, ok := <-m.taskQueue: + if !ok { + // queue closed + return + } + + slog.Info("dequeue", "task", task) + + submission := m.taskQueueSubmissions.Get(task.TaskId) + if submission.taskQueueSubmission == nil { + if task.Counter != 1 { + panic("unexpected counter-1") + } + m.taskQueueSubmissions[task.TaskId] = &TaskQueueSubmissionModel{ + id: task.TaskId, + taskQueueSubmission: task, + } + continue + } + if submission.taskQueueSubmission.Counter >= task.Counter { + panic("unexpected counter-2") + } + default: // no more requests + return + } + } +} + // PROMISES func (m *Model) ValidateReadPromise(reqTime int64, resTime int64, req *t_api.Request, res *t_api.Response) error { From bab6fc06988ef20daff05c77a9ef4d793a3e22c8 Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Fri, 31 May 2024 11:24:00 -0400 Subject: [PATCH 2/2] feat(back-channel): clean up --- cmd/dst/run.go | 2 +- test/dst/dst.go | 3 ++- test/dst/dst_test.go | 26 +++++++++++++------------- test/dst/model.go | 15 ++++++++------- 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/cmd/dst/run.go b/cmd/dst/run.go index a73e5e1a..c2df3ea4 100644 --- a/cmd/dst/run.go +++ b/cmd/dst/run.go @@ -29,7 +29,7 @@ import ( func RunDSTCmd() *cobra.Command { var ( seed int64 - ticks int64 // make hire... + ticks int64 scenario string reqsPerTick = util.RangeIntFlag{Min: 1, Max: 1000} ids = util.RangeIntFlag{Min: 1, Max: 1000} diff --git a/test/dst/dst.go b/test/dst/dst.go index 426a5961..4e96f871 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -2,6 +2,7 @@ package dst import ( "fmt" + "log/slog" "math/rand" // nosemgrep "strconv" @@ -156,7 +157,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System, if modelErr != nil { errs = append(errs, modelErr) } - // slog.Info("DST", "t", fmt.Sprintf("%d|%d", reqTime, resTime), "tid", metadata.TransactionId, "req", req, "res", res, "err", err, "ok", modelErr == nil) + slog.Info("DST", "t", fmt.Sprintf("%d|%d", reqTime, resTime), "tid", metadata.TransactionId, "req", req, "res", res, "err", err, "ok", modelErr == nil) }, }) diff --git a/test/dst/dst_test.go b/test/dst/dst_test.go index a42cdff7..5fdf4c2a 100644 --- a/test/dst/dst_test.go +++ b/test/dst/dst_test.go @@ -23,19 +23,19 @@ func TestDST(t *testing.T) { test(t, &Scenario{Kind: Default}) } -// func TestDSTFaultInjection(t *testing.T) { -// test(t, &Scenario{ -// Kind: FaultInjection, -// FaultInjection: &FaultInjectionScenario{P: 0.5}, -// }) -// } - -// func TestDSTLazyTimeout(t *testing.T) { -// test(t, &Scenario{ -// Kind: LazyTimeout, -// LazyTimeout: &LazyTimeoutScenario{}, -// }) -// } +func TestDSTFaultInjection(t *testing.T) { + test(t, &Scenario{ + Kind: FaultInjection, + FaultInjection: &FaultInjectionScenario{P: 0.5}, + }) +} + +func TestDSTLazyTimeout(t *testing.T) { + test(t, &Scenario{ + Kind: LazyTimeout, + LazyTimeout: &LazyTimeoutScenario{}, + }) +} func test(t *testing.T, scenario *Scenario) { r := rand.New(rand.NewSource(0)) diff --git a/test/dst/model.go b/test/dst/model.go index f69241d7..c28dd644 100644 --- a/test/dst/model.go +++ b/test/dst/model.go @@ -182,7 +182,9 @@ func (m *Model) Step(reqTime int64, resTime int64, req *t_api.Request, res *t_ap return fmt.Errorf("unexpected response kind '%d' for request kind '%d'", res.Kind, req.Kind) } - m.dequeue() + if err := m.dequeue(); err != nil { + return err + } if f, ok := m.responses[req.Kind]; ok { return f(reqTime, resTime, req, res) @@ -191,14 +193,13 @@ func (m *Model) Step(reqTime int64, resTime int64, req *t_api.Request, res *t_ap return nil } -// 1) validate number of tasks enqueued are somewhat correct? -func (m *Model) dequeue() { +func (m *Model) dequeue() error { for { select { case task, ok := <-m.taskQueue: if !ok { // queue closed - return + return nil } slog.Info("dequeue", "task", task) @@ -206,7 +207,7 @@ func (m *Model) dequeue() { submission := m.taskQueueSubmissions.Get(task.TaskId) if submission.taskQueueSubmission == nil { if task.Counter != 1 { - panic("unexpected counter-1") + return fmt.Errorf("unexpected counter %d, expected 1", task.Counter) } m.taskQueueSubmissions[task.TaskId] = &TaskQueueSubmissionModel{ id: task.TaskId, @@ -215,10 +216,10 @@ func (m *Model) dequeue() { continue } if submission.taskQueueSubmission.Counter >= task.Counter { - panic("unexpected counter-2") + return fmt.Errorf("unexpected counter %d, expected %d", submission.taskQueueSubmission.Counter, task.Counter) } default: // no more requests - return + return nil } } }