Skip to content

Commit

Permalink
Refactor (#76)
Browse files Browse the repository at this point in the history
types -> t_api and t_aio
  • Loading branch information
dfarr authored Oct 4, 2023
1 parent 929fbcf commit 3884dc6
Show file tree
Hide file tree
Showing 50 changed files with 1,895 additions and 1,923 deletions.
42 changes: 21 additions & 21 deletions cmd/dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/network"
"github.com/resonatehq/resonate/internal/kernel/system"
"github.com/resonatehq/resonate/internal/kernel/types"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/metrics"
"github.com/resonatehq/resonate/test/dst"
"github.com/spf13/cobra"
Expand All @@ -29,15 +29,15 @@ var (
seed int64

// run command
ticks int64
reqsPerTick = rangeIntFlag{Min: 1, Max: 1000}
ids = rangeIntFlag{Min: 1, Max: 1000}
ikeys = rangeIntFlag{Min: 1, Max: 1000}
data = rangeIntFlag{Min: 1, Max: 1000}
headers = rangeIntFlag{Min: 1, Max: 1000}
tags = rangeIntFlag{Min: 1, Max: 1000}
urls = rangeIntFlag{Min: 1, Max: 1000}
retries = rangeIntFlag{Min: 1, Max: 1000}
ticks int64
reqsPerTick = rangeIntFlag{Min: 1, Max: 1000}
ids = rangeIntFlag{Min: 1, Max: 1000}
idempotencyKeys = rangeIntFlag{Min: 1, Max: 1000}
headers = rangeIntFlag{Min: 1, Max: 1000}
data = rangeIntFlag{Min: 1, Max: 1000}
tags = rangeIntFlag{Min: 1, Max: 1000}
urls = rangeIntFlag{Min: 1, Max: 1000}
retries = rangeIntFlag{Min: 1, Max: 1000}

// issue command
store string
Expand Down Expand Up @@ -128,8 +128,8 @@ var dstRunCmd = &cobra.Command{
}

// add api subsystems
aio.AddSubsystem(types.Network, network)
aio.AddSubsystem(types.Store, store)
aio.AddSubsystem(t_aio.Network, network)
aio.AddSubsystem(t_aio.Store, store)

// start api/aio
if err := api.Start(); err != nil {
Expand All @@ -147,13 +147,13 @@ var dstRunCmd = &cobra.Command{
Reqs: func() int {
return reqsPerTick.Resolve(r)
},
Ids: ids.Resolve(r),
Ikeys: ikeys.Resolve(r),
Data: data.Resolve(r),
Headers: headers.Resolve(r),
Tags: tags.Resolve(r),
Urls: urls.Resolve(r),
Retries: retries.Resolve(r),
Ids: ids.Resolve(r),
IdempotencyKeys: idempotencyKeys.Resolve(r),
Headers: headers.Resolve(r),
Data: data.Resolve(r),
Tags: tags.Resolve(r),
Urls: urls.Resolve(r),
Retries: retries.Resolve(r),
})

slog.Info("DST", "seed", seed, "ticks", ticks, "reqs", reqsPerTick.String(), "dst", dst, "system", system)
Expand Down Expand Up @@ -215,9 +215,9 @@ func init() {
// dst related values
dstRunCmd.Flags().Var(&reqsPerTick, "reqs-per-tick", "number of requests per tick")
dstRunCmd.Flags().Var(&ids, "ids", "number promise ids")
dstRunCmd.Flags().Var(&ikeys, "ikeys", "number promise idempotency keys")
dstRunCmd.Flags().Var(&data, "data", "number promise data byte arrays")
dstRunCmd.Flags().Var(&idempotencyKeys, "idempotency-keys", "number promise idempotency keys")
dstRunCmd.Flags().Var(&headers, "headers", "number promise headers")
dstRunCmd.Flags().Var(&data, "data", "number promise data byte arrays")
dstRunCmd.Flags().Var(&tags, "tags", "number promise tags")
dstRunCmd.Flags().Var(&urls, "urls", "number subscription urls")
dstRunCmd.Flags().Var(&retries, "retries", "number subscription retries")
Expand Down
25 changes: 13 additions & 12 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
"github.com/resonatehq/resonate/internal/app/subsystems/api/grpc"
"github.com/resonatehq/resonate/internal/app/subsystems/api/http"
"github.com/resonatehq/resonate/internal/kernel/system"
"github.com/resonatehq/resonate/internal/kernel/types"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/metrics"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -62,8 +63,8 @@ var serveCmd = &cobra.Command{
api.AddSubsystem(grpc)

// add api subsystems
aio.AddSubsystem(types.Network, network, config.AIO.Subsystems.Network.Size, config.AIO.Subsystems.Network.BatchSize, config.AIO.Subsystems.Network.Workers)
aio.AddSubsystem(types.Store, store, config.AIO.Subsystems.Store.Size, config.AIO.Subsystems.Store.BatchSize, config.AIO.Subsystems.Store.Workers)
aio.AddSubsystem(t_aio.Network, network, config.AIO.Subsystems.Network.Size, config.AIO.Subsystems.Network.BatchSize, config.AIO.Subsystems.Network.Workers)
aio.AddSubsystem(t_aio.Store, store, config.AIO.Subsystems.Store.Size, config.AIO.Subsystems.Store.BatchSize, config.AIO.Subsystems.Store.Workers)

// start api/aio
if err := api.Start(); err != nil {
Expand All @@ -77,15 +78,15 @@ var serveCmd = &cobra.Command{

// instantiate system
system := system.New(api, aio, config.System, metrics)
system.AddOnRequest(types.ReadPromise, coroutines.ReadPromise)
system.AddOnRequest(types.SearchPromises, coroutines.SearchPromises)
system.AddOnRequest(types.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(types.ResolvePromise, coroutines.ResolvePromise)
system.AddOnRequest(types.RejectPromise, coroutines.RejectPromise)
system.AddOnRequest(types.CancelPromise, coroutines.CancelPromise)
system.AddOnRequest(types.ReadSubscriptions, coroutines.ReadSubscriptions)
system.AddOnRequest(types.CreateSubscription, coroutines.CreateSubscription)
system.AddOnRequest(types.DeleteSubscription, coroutines.DeleteSubscription)
system.AddOnRequest(t_api.ReadPromise, coroutines.ReadPromise)
system.AddOnRequest(t_api.SearchPromises, coroutines.SearchPromises)
system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(t_api.ResolvePromise, coroutines.ResolvePromise)
system.AddOnRequest(t_api.RejectPromise, coroutines.RejectPromise)
system.AddOnRequest(t_api.CancelPromise, coroutines.CancelPromise)
system.AddOnRequest(t_api.ReadSubscriptions, coroutines.ReadSubscriptions)
system.AddOnRequest(t_api.CreateSubscription, coroutines.CreateSubscription)
system.AddOnRequest(t_api.DeleteSubscription, coroutines.DeleteSubscription)
system.AddOnTick(2, coroutines.TimeoutPromises)
system.AddOnTick(1, coroutines.NotifySubscriptions)

Expand Down
41 changes: 21 additions & 20 deletions internal/aio/aio.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,54 @@ import (
"fmt"
"log/slog"

"github.com/resonatehq/resonate/internal/kernel/t_aio"

"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/types"
"github.com/resonatehq/resonate/internal/metrics"
"github.com/resonatehq/resonate/internal/util"
)

type AIO interface {
String() string
Enqueue(*bus.SQE[types.Submission, types.Completion])
Dequeue(int) []*bus.CQE[types.Submission, types.Completion]
Enqueue(*bus.SQE[t_aio.Submission, t_aio.Completion])
Dequeue(int) []*bus.CQE[t_aio.Submission, t_aio.Completion]
Done() bool
Flush(int64)
}

type aio struct {
cq chan *bus.CQE[types.Submission, types.Completion]
subsystems map[types.AIOKind]*subsystemWrapper
cq chan *bus.CQE[t_aio.Submission, t_aio.Completion]
subsystems map[t_aio.Kind]*subsystemWrapper
done bool
errors chan error
metrics *metrics.Metrics
}

type subsystemWrapper struct {
Subsystem
sq chan<- *bus.SQE[types.Submission, types.Completion]
sq chan<- *bus.SQE[t_aio.Submission, t_aio.Completion]
workers []*workerWrapper
}

type workerWrapper struct {
Worker
sq <-chan *bus.SQE[types.Submission, types.Completion]
cq chan<- *bus.CQE[types.Submission, types.Completion]
sq <-chan *bus.SQE[t_aio.Submission, t_aio.Completion]
cq chan<- *bus.CQE[t_aio.Submission, t_aio.Completion]
max int
flushCh chan int64
}

func New(size int, metrics *metrics.Metrics) *aio {
return &aio{
cq: make(chan *bus.CQE[types.Submission, types.Completion], size),
subsystems: map[types.AIOKind]*subsystemWrapper{},
cq: make(chan *bus.CQE[t_aio.Submission, t_aio.Completion], size),
subsystems: map[t_aio.Kind]*subsystemWrapper{},
errors: make(chan error),
metrics: metrics,
}
}

func (a *aio) AddSubsystem(kind types.AIOKind, subsystem Subsystem, size int, max int, n int) {
sq := make(chan *bus.SQE[types.Submission, types.Completion], size)
func (a *aio) AddSubsystem(kind t_aio.Kind, subsystem Subsystem, size int, max int, n int) {
sq := make(chan *bus.SQE[t_aio.Submission, t_aio.Completion], size)
workers := make([]*workerWrapper, n)

for i := 0; i < n; i++ {
Expand Down Expand Up @@ -106,12 +107,12 @@ func (a *aio) Errors() <-chan error {
return a.errors
}

func (a *aio) Enqueue(sqe *bus.SQE[types.Submission, types.Completion]) {
func (a *aio) Enqueue(sqe *bus.SQE[t_aio.Submission, t_aio.Completion]) {
if subsystem, ok := a.subsystems[sqe.Submission.Kind]; ok {
select {
case subsystem.sq <- sqe:
slog.Debug("aio:enqueue", "sqe", sqe.Submission)
a.metrics.AioInFlight.WithLabelValues(sqe.Kind).Inc()
a.metrics.AioInFlight.WithLabelValues(sqe.Tags).Inc()
default:
sqe.Callback(0, nil, fmt.Errorf("aio:subsystem:%s submission queue full", subsystem))
}
Expand All @@ -120,8 +121,8 @@ func (a *aio) Enqueue(sqe *bus.SQE[types.Submission, types.Completion]) {
}
}

func (a *aio) Dequeue(n int) []*bus.CQE[types.Submission, types.Completion] {
cqes := []*bus.CQE[types.Submission, types.Completion]{}
func (a *aio) Dequeue(n int) []*bus.CQE[t_aio.Submission, t_aio.Completion] {
cqes := []*bus.CQE[t_aio.Submission, t_aio.Completion]{}

// collects n entries or until the channel is
// exhausted, whichever happens first
Expand All @@ -140,8 +141,8 @@ func (a *aio) Dequeue(n int) []*bus.CQE[types.Submission, types.Completion] {
}

slog.Debug("aio:dequeue", "cqe", cqe.Completion)
a.metrics.AioTotal.WithLabelValues(cqe.Kind, status).Inc()
a.metrics.AioInFlight.WithLabelValues(cqe.Kind).Dec()
a.metrics.AioTotal.WithLabelValues(cqe.Tags, status).Inc()
a.metrics.AioInFlight.WithLabelValues(cqe.Tags).Dec()

cqes = append(cqes, cqe)
default:
Expand Down Expand Up @@ -193,8 +194,8 @@ func (w *workerWrapper) flush(t int64) {
}
}

func (w *workerWrapper) collect() ([]*bus.SQE[types.Submission, types.Completion], bool) {
sqes := []*bus.SQE[types.Submission, types.Completion]{}
func (w *workerWrapper) collect() ([]*bus.SQE[t_aio.Submission, t_aio.Completion], bool) {
sqes := []*bus.SQE[t_aio.Submission, t_aio.Completion]{}

for i := 0; i < w.max; i++ {
select {
Expand Down
25 changes: 13 additions & 12 deletions internal/aio/aio_dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,28 @@ import (
"fmt"
"math/rand" // nosemgrep

"github.com/resonatehq/resonate/internal/kernel/t_aio"

"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/types"
"github.com/resonatehq/resonate/internal/util"
)

type aioDST struct {
r *rand.Rand
sqes []*bus.SQE[types.Submission, types.Completion]
cqes []*bus.CQE[types.Submission, types.Completion]
subsystems map[types.AIOKind]Subsystem
sqes []*bus.SQE[t_aio.Submission, t_aio.Completion]
cqes []*bus.CQE[t_aio.Submission, t_aio.Completion]
subsystems map[t_aio.Kind]Subsystem
done bool
}

func NewDST(r *rand.Rand) *aioDST {
return &aioDST{
r: r,
subsystems: map[types.AIOKind]Subsystem{},
subsystems: map[t_aio.Kind]Subsystem{},
}
}

func (a *aioDST) AddSubsystem(kind types.AIOKind, subsystem Subsystem) {
func (a *aioDST) AddSubsystem(kind t_aio.Kind, subsystem Subsystem) {
a.subsystems[kind] = subsystem
}

Expand Down Expand Up @@ -56,20 +57,20 @@ func (a *aioDST) Done() bool {
return a.done
}

func (a *aioDST) Enqueue(sqe *bus.SQE[types.Submission, types.Completion]) {
func (a *aioDST) Enqueue(sqe *bus.SQE[t_aio.Submission, t_aio.Completion]) {
i := a.r.Intn(len(a.sqes) + 1)
a.sqes = append(a.sqes[:i], append([]*bus.SQE[types.Submission, types.Completion]{sqe}, a.sqes[i:]...)...)
a.sqes = append(a.sqes[:i], append([]*bus.SQE[t_aio.Submission, t_aio.Completion]{sqe}, a.sqes[i:]...)...)
}

func (a *aioDST) Dequeue(n int) []*bus.CQE[types.Submission, types.Completion] {
func (a *aioDST) Dequeue(n int) []*bus.CQE[t_aio.Submission, t_aio.Completion] {
cqes := a.cqes[:min(n, len(a.cqes))]
a.cqes = a.cqes[min(n, len(a.cqes)):]

return cqes
}

func (a *aioDST) Flush(t int64) {
flush := map[types.AIOKind][]*bus.SQE[types.Submission, types.Completion]{}
flush := map[t_aio.Kind][]*bus.SQE[t_aio.Submission, t_aio.Completion]{}
for _, sqe := range a.sqes {
flush[sqe.Submission.Kind] = append(flush[sqe.Submission.Kind], sqe)
}
Expand All @@ -87,8 +88,8 @@ func (a *aioDST) Flush(t int64) {

func (a *aioDST) String() string {
// use subsystem keys so that we can compare cross-store dst runs
subsystems := make([]types.AIOKind, len(a.subsystems))
for i, subsystem := range util.OrderedRangeKV[types.AIOKind](a.subsystems) {
subsystems := make([]t_aio.Kind, len(a.subsystems))
for i, subsystem := range util.OrderedRangeKV[t_aio.Kind](a.subsystems) {
subsystems[i] = subsystem.Key
}

Expand Down
4 changes: 2 additions & 2 deletions internal/aio/subsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package aio

import (
"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/types"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
)

type Subsystem interface {
Expand All @@ -14,5 +14,5 @@ type Subsystem interface {
}

type Worker interface {
Process([]*bus.SQE[types.Submission, types.Completion]) []*bus.CQE[types.Submission, types.Completion]
Process([]*bus.SQE[t_aio.Submission, t_aio.Completion]) []*bus.CQE[t_aio.Submission, t_aio.Completion]
}
Loading

0 comments on commit 3884dc6

Please sign in to comment.