Skip to content

Commit

Permalink
feat(back-channel): verify the task submissions (#338)
Browse files Browse the repository at this point in the history
* feat(back-channel): wip

* feat(back-channel): clean up

---------

Co-authored-by: Gabriel Guerra <[email protected]>
  • Loading branch information
guergabo and Gabriel Guerra authored May 31, 2024
1 parent 936c333 commit 32bd3b7
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 20 deletions.
6 changes: 5 additions & 1 deletion cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func RunDSTCmd() *cobra.Command {
if err != nil {
return err
}

reqsPerTickSize := reqsPerTick.Resolve(r)
config.AIO.Subsystems.QueuingDST.Config.Queue = make(chan *queuing.ConnectionSubmissionDST, reqsPerTickSize)
queuing, err := queuing.NewDST(config.AIO.Subsystems.QueuingDST.Config, rand.New(rand.NewSource(r.Int63())))
if err != nil {
return err
Expand Down Expand Up @@ -191,10 +194,11 @@ func RunDSTCmd() *cobra.Command {

dst := dst.New(&dst.Config{
Scenario: dstScenario,
TaskQueue: config.AIO.Subsystems.QueuingDST.Config.Queue,
Ticks: ticks,
TimeElapsedPerTick: 50_000, // milliseconds
Reqs: func() int {
return reqsPerTick.Resolve(r)
return reqsPerTickSize
},
Ids: ids.Resolve(r),
IdempotencyKeys: idempotencyKeys.Resolve(r),
Expand Down
16 changes: 15 additions & 1 deletion internal/app/subsystems/aio/queuing/queuing_dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (

type (
ConfigDST struct {
P float32
P float32
Queue chan *ConnectionSubmissionDST
}

QueuingSubsystemDST struct {
Expand All @@ -23,6 +24,12 @@ type (
config *ConfigDST
r *rand.Rand
}

ConnectionSubmissionDST struct {
Queue string `json:"queue"`
TaskId string `json:"taskid"`
Counter int `json:"counter"`
}
)

// NewDST is a simple helper functions that wraps New and returns a pre-configured QueuingSubsystem.
Expand All @@ -49,6 +56,7 @@ func (q *QueuingSubsystemDST) Start() error {
}

func (q *QueuingSubsystemDST) Stop() error {
close(q.config.Queue)
return nil
}

Expand All @@ -69,6 +77,12 @@ func (w *QueuingWorkerDST) Process(sqes []*bus.SQE[t_aio.Submission, t_aio.Compl
for i, sqe := range sqes {
util.Assert(sqe.Submission.Queuing != nil, "submission must not be nil")

w.config.Queue <- &ConnectionSubmissionDST{
Queue: "analytics",
TaskId: sqe.Submission.Queuing.TaskId,
Counter: sqe.Submission.Queuing.Counter,
}

cqe := &bus.CQE[t_aio.Submission, t_aio.Completion]{
Metadata: sqe.Metadata,
Callback: sqe.Callback,
Expand Down
4 changes: 3 additions & 1 deletion test/dst/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing"
"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/system"
Expand All @@ -20,6 +21,7 @@ type DST struct {

type Config struct {
Scenario *Scenario
TaskQueue chan *queuing.ConnectionSubmissionDST
Ticks int64
TimeElapsedPerTick int64
Reqs func() int
Expand Down Expand Up @@ -66,7 +68,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System,
generator := NewGenerator(r, d.config)

// model
model := NewModel(d.config.Scenario)
model := NewModel(d.config.Scenario, d.config.TaskQueue)

// add req/res
for _, req := range reqs {
Expand Down
10 changes: 7 additions & 3 deletions test/dst/dst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ func test(t *testing.T, scenario *Scenario) {
if err != nil {
t.Fatal(err)
}
queuing, err := queuing.NewDST(&queuing.ConfigDST{P: 0.5}, r)

reqsPerTickSize := 100
taskQueue := make(chan *queuing.ConnectionSubmissionDST, reqsPerTickSize)
queuing, err := queuing.NewDST(&queuing.ConfigDST{P: 0.5, Queue: taskQueue}, r)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -147,9 +150,9 @@ func test(t *testing.T, scenario *Scenario) {
}

dst := New(&Config{
Ticks: 1000,
Ticks: 3_000,
TimeElapsedPerTick: 50_000, // milliseconds
Reqs: func() int { return 100 },
Reqs: func() int { return reqsPerTickSize },
Ids: 100,
IdempotencyKeys: 100,
Headers: 100,
Expand All @@ -158,6 +161,7 @@ func test(t *testing.T, scenario *Scenario) {
Urls: 100,
Retries: 100,
Scenario: scenario,
TaskQueue: taskQueue,
})

if errs := dst.Run(r, api, aio, system, reqs); len(errs) > 0 {
Expand Down
84 changes: 70 additions & 14 deletions test/dst/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package dst
import (
"errors"
"fmt"
"log/slog"
"regexp"
"strings"

Expand All @@ -21,13 +22,15 @@ import (
// Model

type Model struct {
scenario *Scenario
promises Promises
schedules Schedules
locks Locks
tasks Tasks
cursors []*t_api.Request
responses map[t_api.Kind]ResponseValidator
scenario *Scenario
promises Promises
schedules Schedules
locks Locks
tasks Tasks
taskQueueSubmissions TaskQueueSubmissions
cursors []*t_api.Request
responses map[t_api.Kind]ResponseValidator
taskQueue chan *queuing.ConnectionSubmissionDST
}

type PromiseModel struct {
Expand Down Expand Up @@ -56,11 +59,17 @@ type TaskModel struct {
task *task.Task
}

type TaskQueueSubmissionModel struct {
id string
taskQueueSubmission *queuing.ConnectionSubmissionDST
}

type Promises map[string]*PromiseModel
type Schedules map[string]*ScheduleModel
type Subscriptions map[string]*SubscriptionModel
type Locks map[string]*LockModel
type Tasks map[string]*TaskModel
type TaskQueueSubmissions map[string]*TaskQueueSubmissionModel
type ResponseValidator func(int64, int64, *t_api.Request, *t_api.Response) error

func (p Promises) Get(id string) *PromiseModel {
Expand Down Expand Up @@ -114,14 +123,26 @@ func (t Tasks) Get(id string) *TaskModel {
return t[id]
}

func NewModel(scenario *Scenario) *Model {
func (t TaskQueueSubmissions) Get(id string) *TaskQueueSubmissionModel {
if _, ok := t[id]; !ok {
t[id] = &TaskQueueSubmissionModel{
id: id,
}
}

return t[id]
}

func NewModel(scenario *Scenario, queue chan *queuing.ConnectionSubmissionDST) *Model {
return &Model{
scenario: scenario,
promises: map[string]*PromiseModel{},
schedules: map[string]*ScheduleModel{},
locks: map[string]*LockModel{},
tasks: map[string]*TaskModel{},
responses: map[t_api.Kind]ResponseValidator{},
scenario: scenario,
promises: map[string]*PromiseModel{},
schedules: map[string]*ScheduleModel{},
locks: map[string]*LockModel{},
tasks: map[string]*TaskModel{},
taskQueueSubmissions: map[string]*TaskQueueSubmissionModel{},
responses: map[t_api.Kind]ResponseValidator{},
taskQueue: queue,
}
}

Expand Down Expand Up @@ -161,13 +182,48 @@ func (m *Model) Step(reqTime int64, resTime int64, req *t_api.Request, res *t_ap
return fmt.Errorf("unexpected response kind '%d' for request kind '%d'", res.Kind, req.Kind)
}

if err := m.dequeue(); err != nil {
return err
}

if f, ok := m.responses[req.Kind]; ok {
return f(reqTime, resTime, req, res)
}

return nil
}

func (m *Model) dequeue() error {
for {
select {
case task, ok := <-m.taskQueue:
if !ok {
// queue closed
return nil
}

slog.Info("dequeue", "task", task)

submission := m.taskQueueSubmissions.Get(task.TaskId)
if submission.taskQueueSubmission == nil {
if task.Counter != 1 {
return fmt.Errorf("unexpected counter %d, expected 1", task.Counter)
}
m.taskQueueSubmissions[task.TaskId] = &TaskQueueSubmissionModel{
id: task.TaskId,
taskQueueSubmission: task,
}
continue
}
if submission.taskQueueSubmission.Counter >= task.Counter {
return fmt.Errorf("unexpected counter %d, expected %d", submission.taskQueueSubmission.Counter, task.Counter)
}
default: // no more requests
return nil
}
}
}

// PROMISES

func (m *Model) ValidateReadPromise(reqTime int64, resTime int64, req *t_api.Request, res *t_api.Response) error {
Expand Down

0 comments on commit 32bd3b7

Please sign in to comment.