Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(back-channel): verify the task submissions #338

Merged
merged 2 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func RunDSTCmd() *cobra.Command {
if err != nil {
return err
}

reqsPerTickSize := reqsPerTick.Resolve(r)
config.AIO.Subsystems.QueuingDST.Config.Queue = make(chan *queuing.ConnectionSubmissionDST, reqsPerTickSize)
queuing, err := queuing.NewDST(config.AIO.Subsystems.QueuingDST.Config, rand.New(rand.NewSource(r.Int63())))
if err != nil {
return err
Expand Down Expand Up @@ -191,10 +194,11 @@ func RunDSTCmd() *cobra.Command {

dst := dst.New(&dst.Config{
Scenario: dstScenario,
TaskQueue: config.AIO.Subsystems.QueuingDST.Config.Queue,
Ticks: ticks,
TimeElapsedPerTick: 50_000, // milliseconds
Reqs: func() int {
return reqsPerTick.Resolve(r)
return reqsPerTickSize
},
Ids: ids.Resolve(r),
IdempotencyKeys: idempotencyKeys.Resolve(r),
Expand Down
16 changes: 15 additions & 1 deletion internal/app/subsystems/aio/queuing/queuing_dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (

type (
ConfigDST struct {
P float32
P float32
Queue chan *ConnectionSubmissionDST
}

QueuingSubsystemDST struct {
Expand All @@ -23,6 +24,12 @@ type (
config *ConfigDST
r *rand.Rand
}

ConnectionSubmissionDST struct {
Queue string `json:"queue"`
TaskId string `json:"taskid"`
Counter int `json:"counter"`
}
)

// NewDST is a simple helper functions that wraps New and returns a pre-configured QueuingSubsystem.
Expand All @@ -49,6 +56,7 @@ func (q *QueuingSubsystemDST) Start() error {
}

func (q *QueuingSubsystemDST) Stop() error {
close(q.config.Queue)
return nil
}

Expand All @@ -69,6 +77,12 @@ func (w *QueuingWorkerDST) Process(sqes []*bus.SQE[t_aio.Submission, t_aio.Compl
for i, sqe := range sqes {
util.Assert(sqe.Submission.Queuing != nil, "submission must not be nil")

w.config.Queue <- &ConnectionSubmissionDST{
Queue: "analytics",
TaskId: sqe.Submission.Queuing.TaskId,
Counter: sqe.Submission.Queuing.Counter,
}

cqe := &bus.CQE[t_aio.Submission, t_aio.Completion]{
Metadata: sqe.Metadata,
Callback: sqe.Callback,
Expand Down
4 changes: 3 additions & 1 deletion test/dst/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing"
"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/system"
Expand All @@ -20,6 +21,7 @@ type DST struct {

type Config struct {
Scenario *Scenario
TaskQueue chan *queuing.ConnectionSubmissionDST
Ticks int64
TimeElapsedPerTick int64
Reqs func() int
Expand Down Expand Up @@ -66,7 +68,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System,
generator := NewGenerator(r, d.config)

// model
model := NewModel(d.config.Scenario)
model := NewModel(d.config.Scenario, d.config.TaskQueue)

// add req/res
for _, req := range reqs {
Expand Down
10 changes: 7 additions & 3 deletions test/dst/dst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ func test(t *testing.T, scenario *Scenario) {
if err != nil {
t.Fatal(err)
}
queuing, err := queuing.NewDST(&queuing.ConfigDST{P: 0.5}, r)

reqsPerTickSize := 100
taskQueue := make(chan *queuing.ConnectionSubmissionDST, reqsPerTickSize)
queuing, err := queuing.NewDST(&queuing.ConfigDST{P: 0.5, Queue: taskQueue}, r)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -147,9 +150,9 @@ func test(t *testing.T, scenario *Scenario) {
}

dst := New(&Config{
Ticks: 1000,
Ticks: 3_000,
TimeElapsedPerTick: 50_000, // milliseconds
Reqs: func() int { return 100 },
Reqs: func() int { return reqsPerTickSize },
Ids: 100,
IdempotencyKeys: 100,
Headers: 100,
Expand All @@ -158,6 +161,7 @@ func test(t *testing.T, scenario *Scenario) {
Urls: 100,
Retries: 100,
Scenario: scenario,
TaskQueue: taskQueue,
})

if errs := dst.Run(r, api, aio, system, reqs); len(errs) > 0 {
Expand Down
84 changes: 70 additions & 14 deletions test/dst/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import (
"errors"
"fmt"
"log/slog"
"regexp"
"strings"

Expand All @@ -21,13 +22,15 @@
// Model

type Model struct {
scenario *Scenario
promises Promises
schedules Schedules
locks Locks
tasks Tasks
cursors []*t_api.Request
responses map[t_api.Kind]ResponseValidator
scenario *Scenario
promises Promises
schedules Schedules
locks Locks
tasks Tasks
taskQueueSubmissions TaskQueueSubmissions
cursors []*t_api.Request
responses map[t_api.Kind]ResponseValidator
taskQueue chan *queuing.ConnectionSubmissionDST
}

type PromiseModel struct {
Expand Down Expand Up @@ -56,11 +59,17 @@
task *task.Task
}

type TaskQueueSubmissionModel struct {
id string
taskQueueSubmission *queuing.ConnectionSubmissionDST
}

type Promises map[string]*PromiseModel
type Schedules map[string]*ScheduleModel
type Subscriptions map[string]*SubscriptionModel
type Locks map[string]*LockModel
type Tasks map[string]*TaskModel
type TaskQueueSubmissions map[string]*TaskQueueSubmissionModel
type ResponseValidator func(int64, int64, *t_api.Request, *t_api.Response) error

func (p Promises) Get(id string) *PromiseModel {
Expand Down Expand Up @@ -114,14 +123,26 @@
return t[id]
}

func NewModel(scenario *Scenario) *Model {
func (t TaskQueueSubmissions) Get(id string) *TaskQueueSubmissionModel {
if _, ok := t[id]; !ok {
t[id] = &TaskQueueSubmissionModel{
id: id,
}
}

return t[id]
}

func NewModel(scenario *Scenario, queue chan *queuing.ConnectionSubmissionDST) *Model {
return &Model{
scenario: scenario,
promises: map[string]*PromiseModel{},
schedules: map[string]*ScheduleModel{},
locks: map[string]*LockModel{},
tasks: map[string]*TaskModel{},
responses: map[t_api.Kind]ResponseValidator{},
scenario: scenario,
promises: map[string]*PromiseModel{},
schedules: map[string]*ScheduleModel{},
locks: map[string]*LockModel{},
tasks: map[string]*TaskModel{},
taskQueueSubmissions: map[string]*TaskQueueSubmissionModel{},
responses: map[t_api.Kind]ResponseValidator{},
taskQueue: queue,
}
}

Expand Down Expand Up @@ -161,13 +182,48 @@
return fmt.Errorf("unexpected response kind '%d' for request kind '%d'", res.Kind, req.Kind)
}

if err := m.dequeue(); err != nil {
return err

Check warning on line 186 in test/dst/model.go

View check run for this annotation

Codecov / codecov/patch

test/dst/model.go#L186

Added line #L186 was not covered by tests
}

if f, ok := m.responses[req.Kind]; ok {
return f(reqTime, resTime, req, res)
}

return nil
}

func (m *Model) dequeue() error {
for {
select {
case task, ok := <-m.taskQueue:
if !ok {
// queue closed
return nil

Check warning on line 202 in test/dst/model.go

View check run for this annotation

Codecov / codecov/patch

test/dst/model.go#L202

Added line #L202 was not covered by tests
}

slog.Info("dequeue", "task", task)

submission := m.taskQueueSubmissions.Get(task.TaskId)
if submission.taskQueueSubmission == nil {
if task.Counter != 1 {
return fmt.Errorf("unexpected counter %d, expected 1", task.Counter)

Check warning on line 210 in test/dst/model.go

View check run for this annotation

Codecov / codecov/patch

test/dst/model.go#L210

Added line #L210 was not covered by tests
}
m.taskQueueSubmissions[task.TaskId] = &TaskQueueSubmissionModel{
id: task.TaskId,
taskQueueSubmission: task,
}
continue
}
if submission.taskQueueSubmission.Counter >= task.Counter {
return fmt.Errorf("unexpected counter %d, expected %d", submission.taskQueueSubmission.Counter, task.Counter)

Check warning on line 219 in test/dst/model.go

View check run for this annotation

Codecov / codecov/patch

test/dst/model.go#L219

Added line #L219 was not covered by tests
}
default: // no more requests
return nil
}
}
}

// PROMISES

func (m *Model) ValidateReadPromise(reqTime int64, resTime int64, req *t_api.Request, res *t_api.Response) error {
Expand Down