diff --git a/cmd/serve.go b/cmd/serve.go index b7b8b29d..c16f6bd2 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -7,6 +7,7 @@ import ( "syscall" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/api" "github.com/resonatehq/resonate/internal/app/coroutines" @@ -16,6 +17,7 @@ import ( "github.com/resonatehq/resonate/internal/app/subsystems/api/http" "github.com/resonatehq/resonate/internal/kernel/system" "github.com/resonatehq/resonate/internal/kernel/types" + "github.com/resonatehq/resonate/internal/metrics" "github.com/spf13/cobra" ) @@ -28,6 +30,10 @@ var serveCmd = &cobra.Command{ Use: "serve", Short: "Start the durable promises server", RunE: func(cmd *cobra.Command, args []string) error { + // logger + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) + slog.SetDefault(logger) + // config cfg := &system.Config{ PromiseCacheSize: 100, @@ -37,12 +43,16 @@ var serveCmd = &cobra.Command{ CompletionBatchSize: 100, } + // instantiate metrics + reg := prometheus.NewRegistry() + metrics := metrics.New(reg) + // instatiate api/aio - api := api.New(100) - aio := aio.New(100) + api := api.New(100, metrics) + aio := aio.New(100, metrics) // instatiate api subsystems - http := http.New(api, httpAddr, 10*time.Second) + http := http.New(api, httpAddr, 10*time.Second, reg) grpc := grpc.New(api, grpcAddr) // instatiate aio subsystems @@ -69,7 +79,7 @@ var serveCmd = &cobra.Command{ } // instantiate system - system := system.New(cfg, api, aio) + system := system.New(cfg, api, aio, metrics) system.AddOnRequest(types.ReadPromise, coroutines.ReadPromise) system.AddOnRequest(types.SearchPromises, coroutines.SearchPromises) system.AddOnRequest(types.CreatePromise, coroutines.CreatePromise) @@ -82,6 +92,7 @@ var serveCmd = &cobra.Command{ system.AddOnTick(2, coroutines.TimeoutPromises) system.AddOnTick(1, coroutines.NotifySubscriptions) + // listen for shutdown signal go func() { sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) diff --git a/go.mod b/go.mod index c5f65ebd..d9d0a72f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,9 @@ require ( ) require ( + github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect @@ -27,10 +29,15 @@ require ( github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/leodido/go-urn v1.2.4 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.16.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/procfs v0.10.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect diff --git a/go.sum b/go.sum index 1898c45e..b58c0601 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,10 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= @@ -24,6 +28,8 @@ github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= @@ -44,6 +50,8 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -53,6 +61,14 @@ github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZ github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= +github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= +github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= @@ -80,6 +96,7 @@ golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= diff --git a/internal/aio/aio.go b/internal/aio/aio.go index 87b9371b..85f0bd13 100644 --- a/internal/aio/aio.go +++ b/internal/aio/aio.go @@ -2,9 +2,11 @@ package aio import ( "fmt" + "log/slog" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/types" + "github.com/resonatehq/resonate/internal/metrics" ) type AIO interface { @@ -19,6 +21,7 @@ type aio struct { subsystems map[types.AIOKind]*subsystemWrapper done bool errors chan error + metrics *metrics.Metrics } type subsystemWrapper struct { @@ -35,11 +38,12 @@ type workerWrapper struct { flushCh chan int64 } -func New(size int) *aio { +func New(size int, metrics *metrics.Metrics) *aio { return &aio{ cq: make(chan *bus.CQE[types.Submission, types.Completion], size), subsystems: map[types.AIOKind]*subsystemWrapper{}, errors: make(chan error), + metrics: metrics, } } @@ -104,6 +108,8 @@ func (a *aio) Enqueue(sqe *bus.SQE[types.Submission, types.Completion]) { if subsystem, ok := a.subsystems[sqe.Submission.Kind]; ok { select { case subsystem.sq <- sqe: + slog.Debug("aio:enqueue", "sqe", sqe.Submission) + a.metrics.AioInFlight.WithLabelValues(sqe.Submission.Kind.String()).Inc() default: sqe.Callback(nil, fmt.Errorf("aio:subsystem:%s submission queue full", subsystem)) } @@ -123,6 +129,18 @@ func (a *aio) Dequeue(n int) []*bus.CQE[types.Submission, types.Completion] { if !ok { return cqes } + + var status string + if cqe.Error != nil { + status = "failure" + } else { + status = "success" + } + + slog.Debug("aio:dequeue", "cqe", cqe.Completion) + a.metrics.AioTotal.WithLabelValues(cqe.Completion.Kind.String(), status).Inc() + a.metrics.AioInFlight.WithLabelValues(cqe.Completion.Kind.String()).Dec() + cqes = append(cqes, cqe) default: return cqes diff --git a/internal/api/api.go b/internal/api/api.go index 94e2d4b6..af9984a1 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -2,14 +2,16 @@ package api import ( "fmt" + "strconv" "time" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/types" + "github.com/resonatehq/resonate/internal/metrics" ) type API interface { - Enqueue(*bus.SQE[types.Request, types.Response]) + Enqueue(string, *bus.SQE[types.Request, types.Response]) Dequeue(int, <-chan time.Time) []*bus.SQE[types.Request, types.Response] Done() bool } @@ -19,12 +21,14 @@ type api struct { subsystems []Subsystem done bool errors chan error + metrics *metrics.Metrics } -func New(size int) *api { +func New(size int, metrics *metrics.Metrics) *api { return &api{ - sq: make(chan *bus.SQE[types.Request, types.Response], size), - errors: make(chan error), + sq: make(chan *bus.SQE[types.Request, types.Response], size), + errors: make(chan error), + metrics: metrics, } } @@ -62,9 +66,40 @@ func (a *api) Errors() <-chan error { return a.errors } -func (a *api) Enqueue(sqe *bus.SQE[types.Request, types.Response]) { +func (a *api) Enqueue(kind string, sqe *bus.SQE[types.Request, types.Response]) { select { case a.sq <- sqe: + a.metrics.ApiInFlight.WithLabelValues(kind).Inc() + + callback := sqe.Callback + sqe.Callback = func(res *types.Response, err error) { + var status types.ResponseStatus + switch res.Kind { + case types.ReadPromise: + status = res.ReadPromise.Status + case types.SearchPromises: + status = res.SearchPromises.Status + case types.CreatePromise: + status = res.CreatePromise.Status + case types.CancelPromise: + status = res.CancelPromise.Status + case types.ResolvePromise: + status = res.ResolvePromise.Status + case types.RejectPromise: + status = res.RejectPromise.Status + case types.ReadSubscriptions: + status = res.ReadSubscriptions.Status + case types.CreateSubscription: + status = res.CreateSubscription.Status + case types.DeleteSubscription: + status = res.DeleteSubscription.Status + } + + a.metrics.ApiTotal.WithLabelValues(kind, strconv.Itoa(int(status))).Inc() + a.metrics.ApiInFlight.WithLabelValues(kind).Dec() + + callback(res, err) + } default: sqe.Callback(nil, fmt.Errorf("api submission queue full")) } diff --git a/internal/app/coroutines/cancelPromise.go b/internal/app/coroutines/cancelPromise.go index b5af4e21..17428917 100644 --- a/internal/app/coroutines/cancelPromise.go +++ b/internal/app/coroutines/cancelPromise.go @@ -11,7 +11,7 @@ import ( ) func CancelPromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("CancelPromise(id=%s)", req.CancelPromise.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("CancelPromise(id=%s)", req.CancelPromise.Id), "CancelPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { if req.CancelPromise.Value.Headers == nil { req.CancelPromise.Value.Headers = map[string]string{} } diff --git a/internal/app/coroutines/createPromise.go b/internal/app/coroutines/createPromise.go index ff3fe6ac..8f0c6067 100644 --- a/internal/app/coroutines/createPromise.go +++ b/internal/app/coroutines/createPromise.go @@ -12,7 +12,7 @@ import ( ) func CreatePromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("CreatePromise(id=%s)", req.CreatePromise.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("CreatePromise(id=%s)", req.CreatePromise.Id), "CreatePromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { if req.CreatePromise.Param.Headers == nil { req.CreatePromise.Param.Headers = map[string]string{} } diff --git a/internal/app/coroutines/createSubscription.go b/internal/app/coroutines/createSubscription.go index eabe6587..1ad472c7 100644 --- a/internal/app/coroutines/createSubscription.go +++ b/internal/app/coroutines/createSubscription.go @@ -11,7 +11,7 @@ import ( ) func CreateSubscription(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("CreateSubscription(promiseId=%s, url=%s)", req.CreateSubscription.PromiseId, req.CreateSubscription.Url), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("CreateSubscription(promiseId=%s, url=%s)", req.CreateSubscription.PromiseId, req.CreateSubscription.Url), "CreateSubscription", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { // default retry policy if req.CreateSubscription.RetryPolicy == nil { req.CreateSubscription.RetryPolicy = &subscription.RetryPolicy{ diff --git a/internal/app/coroutines/deleteSubscription.go b/internal/app/coroutines/deleteSubscription.go index 67c1021f..add85ff1 100644 --- a/internal/app/coroutines/deleteSubscription.go +++ b/internal/app/coroutines/deleteSubscription.go @@ -10,7 +10,7 @@ import ( ) func DeleteSubscription(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("DeleteSubscription(id=%d)", req.DeleteSubscription.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("DeleteSubscription(id=%d)", req.DeleteSubscription.Id), "DeleteSubscription", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ diff --git a/internal/app/coroutines/notifiySubscriptions.go b/internal/app/coroutines/notifiySubscriptions.go index fbd21384..67662a1d 100644 --- a/internal/app/coroutines/notifiySubscriptions.go +++ b/internal/app/coroutines/notifiySubscriptions.go @@ -31,7 +31,7 @@ func (i inflight) remove(id int64) { } func NotifySubscriptions(t int64, cfg *system.Config) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("NotifySubscriptions(t=%d)", t), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("NotifySubscriptions(t=%d)", t), "NotifySubscriptions", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ @@ -72,7 +72,7 @@ func NotifySubscriptions(t int64, cfg *system.Config) *scheduler.Coroutine { } func notifySubscription(notification *notification.Notification) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("NotifySubscription:%d", notification.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("NotifySubscription:%d", notification.Id), "NotifySubscription", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { // handle inflight cache inflights.add(notification.Id) c.OnDone(func() { inflights.remove(notification.Id) }) diff --git a/internal/app/coroutines/readPromise.go b/internal/app/coroutines/readPromise.go index 196fdf4f..23848efb 100644 --- a/internal/app/coroutines/readPromise.go +++ b/internal/app/coroutines/readPromise.go @@ -11,7 +11,7 @@ import ( ) func ReadPromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("ReadPromise(id=%s)", req.ReadPromise.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("ReadPromise(id=%s)", req.ReadPromise.Id), "ReadPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ diff --git a/internal/app/coroutines/readSubscriptions.go b/internal/app/coroutines/readSubscriptions.go index d0b8ca76..c62f6b50 100644 --- a/internal/app/coroutines/readSubscriptions.go +++ b/internal/app/coroutines/readSubscriptions.go @@ -11,7 +11,7 @@ import ( ) func ReadSubscriptions(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("ReadSubscriptions(promiseId=%s)", req.ReadSubscriptions.PromiseId), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("ReadSubscriptions(promiseId=%s)", req.ReadSubscriptions.PromiseId), "ReadSubscriptions", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ diff --git a/internal/app/coroutines/rejectPromise.go b/internal/app/coroutines/rejectPromise.go index 4356ccee..72e2282c 100644 --- a/internal/app/coroutines/rejectPromise.go +++ b/internal/app/coroutines/rejectPromise.go @@ -11,7 +11,7 @@ import ( ) func RejectPromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("RejectPromise(id=%s)", req.RejectPromise.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("RejectPromise(id=%s)", req.RejectPromise.Id), "RejectPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { if req.RejectPromise.Value.Headers == nil { req.RejectPromise.Value.Headers = map[string]string{} } diff --git a/internal/app/coroutines/resolvePromise.go b/internal/app/coroutines/resolvePromise.go index 193260d7..59c1c4c9 100644 --- a/internal/app/coroutines/resolvePromise.go +++ b/internal/app/coroutines/resolvePromise.go @@ -11,7 +11,7 @@ import ( ) func ResolvePromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("ResolvePromise(id=%s)", req.ResolvePromise.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("ResolvePromise(id=%s)", req.ResolvePromise.Id), "ResolvePromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { if req.ResolvePromise.Value.Headers == nil { req.ResolvePromise.Value.Headers = map[string]string{} } diff --git a/internal/app/coroutines/searchPromises.go b/internal/app/coroutines/searchPromises.go index 60938d37..65528be5 100644 --- a/internal/app/coroutines/searchPromises.go +++ b/internal/app/coroutines/searchPromises.go @@ -11,7 +11,7 @@ import ( ) func SearchPromises(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("SearchPromises(q=%s)", req.SearchPromises.Q), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("SearchPromises(q=%s)", req.SearchPromises.Q), "SearchPromises", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ diff --git a/internal/app/coroutines/timeoutPromise.go b/internal/app/coroutines/timeoutPromise.go index acaedab8..a230f2f5 100644 --- a/internal/app/coroutines/timeoutPromise.go +++ b/internal/app/coroutines/timeoutPromise.go @@ -11,7 +11,7 @@ import ( ) func TimeoutPromise(t int64, id string, retry *scheduler.Coroutine, res func(error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromise(id=%s)", id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromise(id=%s)", id), "TimeoutPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ diff --git a/internal/app/coroutines/timeoutPromises.go b/internal/app/coroutines/timeoutPromises.go index 7f5c8727..542b625f 100644 --- a/internal/app/coroutines/timeoutPromises.go +++ b/internal/app/coroutines/timeoutPromises.go @@ -12,7 +12,7 @@ import ( ) func TimeoutPromises(t int64, cfg *system.Config) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromises(t=%d)", t), func(s *scheduler.Scheduler, c *scheduler.Coroutine) { + return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromises(t=%d)", t), "TimeoutPromises", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ diff --git a/internal/app/subsystems/api/grpc/grpc.go b/internal/app/subsystems/api/grpc/grpc.go index 1f1a317c..ee78155f 100644 --- a/internal/app/subsystems/api/grpc/grpc.go +++ b/internal/app/subsystems/api/grpc/grpc.go @@ -79,7 +79,7 @@ func (s *server) ReadPromise(ctx context.Context, req *grpcApi.ReadPromiseReques cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.ReadPromise, ReadPromise: &types.ReadPromiseRequest{ @@ -114,7 +114,7 @@ func (s *server) SearchPromises(ctx context.Context, req *grpcApi.SearchPromises return nil, grpcStatus.Error(codes.InvalidArgument, "") } - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.SearchPromises, SearchPromises: &types.SearchPromisesRequest{ @@ -173,7 +173,7 @@ func (s *server) CreatePromise(ctx context.Context, req *grpcApi.CreatePromiseRe } } - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.CreatePromise, CreatePromise: &types.CreatePromiseRequest{ @@ -225,7 +225,7 @@ func (s *server) CancelPromise(ctx context.Context, req *grpcApi.CancelPromiseRe data = req.Value.Data } - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.CancelPromise, CancelPromise: &types.CancelPromiseRequest{ @@ -275,7 +275,7 @@ func (s *server) ResolvePromise(ctx context.Context, req *grpcApi.ResolvePromise data = req.Value.Data } - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.ResolvePromise, ResolvePromise: &types.ResolvePromiseRequest{ @@ -325,7 +325,7 @@ func (s *server) RejectPromise(ctx context.Context, req *grpcApi.RejectPromiseRe data = req.Value.Data } - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.RejectPromise, RejectPromise: &types.RejectPromiseRequest{ @@ -357,7 +357,7 @@ func (s *server) ReadSubscriptions(ctx context.Context, req *grpcApi.ReadSubscri cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.ReadSubscriptions, ReadSubscriptions: &types.ReadSubscriptionsRequest{ @@ -389,7 +389,7 @@ func (s *server) CreateSubscription(ctx context.Context, req *grpcApi.CreateSubs cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.CreateSubscription, CreateSubscription: &types.CreateSubscriptionRequest{ @@ -417,7 +417,7 @@ func (s *server) DeleteSubscription(ctx context.Context, req *grpcApi.DeleteSubs cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.DeleteSubscription, DeleteSubscription: &types.DeleteSubscriptionRequest{ diff --git a/internal/app/subsystems/api/http/http.go b/internal/app/subsystems/api/http/http.go index f155e9eb..bc973170 100644 --- a/internal/app/subsystems/api/http/http.go +++ b/internal/app/subsystems/api/http/http.go @@ -8,6 +8,8 @@ import ( "log/slog" "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/resonatehq/resonate/internal/api" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/types" @@ -19,7 +21,7 @@ type Http struct { server *http.Server } -func New(api api.API, addr string, timeout time.Duration) api.Subsystem { +func New(api api.API, addr string, timeout time.Duration, reg *prometheus.Registry) api.Subsystem { r := gin.Default() s := &server{api: api} @@ -36,6 +38,9 @@ func New(api api.API, addr string, timeout time.Duration) api.Subsystem { r.POST("/subscriptions", s.createSubscription) r.DELETE("/subscriptions/:id", s.deleteSubscription) + // Metrics + r.GET("/metrics", gin.WrapH(promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))) + return &Http{ addr: addr, timeout: timeout, diff --git a/internal/app/subsystems/api/http/promise.go b/internal/app/subsystems/api/http/promise.go index 82c2e5f0..55ad1a17 100644 --- a/internal/app/subsystems/api/http/promise.go +++ b/internal/app/subsystems/api/http/promise.go @@ -14,7 +14,7 @@ func (s *server) readPromise(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.ReadPromise, ReadPromise: &types.ReadPromiseRequest{ @@ -49,7 +49,7 @@ func (s *server) searchPromises(c *gin.Context) { return } - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.SearchPromises, SearchPromises: &types.SearchPromisesRequest{ @@ -86,7 +86,7 @@ func (s *server) createPromise(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.CreatePromise, CreatePromise: createPromise, @@ -120,7 +120,7 @@ func (s *server) resolvePromise(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.ResolvePromise, ResolvePromise: resolvePromise, @@ -154,7 +154,7 @@ func (s *server) rejectPromise(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.RejectPromise, RejectPromise: rejectPromise, @@ -188,7 +188,7 @@ func (s *server) cancelPromise(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.CancelPromise, CancelPromise: cancelPromise, diff --git a/internal/app/subsystems/api/http/subscription.go b/internal/app/subsystems/api/http/subscription.go index a63e6d80..b1e05046 100644 --- a/internal/app/subsystems/api/http/subscription.go +++ b/internal/app/subsystems/api/http/subscription.go @@ -14,7 +14,7 @@ func (s *server) readSubscriptions(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.ReadSubscriptions, ReadSubscriptions: &types.ReadSubscriptionsRequest{ @@ -48,7 +48,7 @@ func (s *server) createSubscription(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.CreateSubscription, CreateSubscription: createSubscription, @@ -78,7 +78,7 @@ func (s *server) deleteSubscription(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ Submission: &types.Request{ Kind: types.DeleteSubscription, DeleteSubscription: &types.DeleteSubscriptionRequest{ diff --git a/internal/kernel/scheduler/coroutine.go b/internal/kernel/scheduler/coroutine.go index 45dd4440..4b688c60 100644 --- a/internal/kernel/scheduler/coroutine.go +++ b/internal/kernel/scheduler/coroutine.go @@ -4,15 +4,17 @@ import "github.com/resonatehq/resonate/internal/kernel/types" type Coroutine struct { name string + kind string init func(*Scheduler, *Coroutine) onDone []func() submission *types.Submission continuation func(*types.Completion, error) } -func NewCoroutine(name string, init func(*Scheduler, *Coroutine)) *Coroutine { +func NewCoroutine(name string, kind string, init func(*Scheduler, *Coroutine)) *Coroutine { return &Coroutine{ name: name, + kind: kind, init: init, } } @@ -21,6 +23,10 @@ func (c *Coroutine) String() string { return c.name } +func (c *Coroutine) Kind() string { + return c.kind +} + func (c *Coroutine) Yield(submission *types.Submission, continuation func(*types.Completion, error)) { c.submission = submission c.continuation = continuation diff --git a/internal/kernel/scheduler/scheduler.go b/internal/kernel/scheduler/scheduler.go index 2c06d3fb..de414ddb 100644 --- a/internal/kernel/scheduler/scheduler.go +++ b/internal/kernel/scheduler/scheduler.go @@ -6,22 +6,27 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/types" + "github.com/resonatehq/resonate/internal/metrics" ) type Scheduler struct { aio aio.AIO + metrics *metrics.Metrics coroutines []*Coroutine } -func NewScheduler(aio aio.AIO) *Scheduler { +func NewScheduler(aio aio.AIO, metrics *metrics.Metrics) *Scheduler { return &Scheduler{ aio: aio, + metrics: metrics, coroutines: []*Coroutine{}, } } func (s *Scheduler) Add(coroutine *Coroutine) { - slog.Info("scheduler:add", "coroutine", coroutine.String()) + slog.Debug("scheduler:add", "coroutine", coroutine.String()) + s.metrics.CoroutinesTotal.WithLabelValues(coroutine.Kind()).Inc() + s.metrics.CoroutinesInFlight.WithLabelValues(coroutine.Kind()).Inc() coroutine.init(s, coroutine) s.coroutines = append(s.coroutines, coroutine) @@ -32,19 +37,16 @@ func (s *Scheduler) Tick(t int64, batchSize int) { for _, coroutine := range s.coroutines { if submission := coroutine.next(); submission != nil { - sqe := &bus.SQE[types.Submission, types.Completion]{ + s.aio.Enqueue(&bus.SQE[types.Submission, types.Completion]{ Submission: submission, Callback: coroutine.resume, - } - - slog.Info("aio:enqueue", "sqe", sqe.Submission) - s.aio.Enqueue(sqe) + }) } if !coroutine.done() { coroutines = append(coroutines, coroutine) } else { - slog.Info("scheduler:rmv", "coroutine", coroutine.String()) + s.metrics.CoroutinesInFlight.WithLabelValues(coroutine.Kind()).Dec() } } @@ -56,7 +58,6 @@ func (s *Scheduler) Tick(t int64, batchSize int) { // callback cqes for _, cqe := range s.aio.Dequeue(batchSize) { - slog.Info("aio:dequeue", "cqe", cqe.Completion) cqe.Callback(cqe.Completion, cqe.Error) } } diff --git a/internal/kernel/system/system.go b/internal/kernel/system/system.go index 3e7b9fa8..3da07a53 100644 --- a/internal/kernel/system/system.go +++ b/internal/kernel/system/system.go @@ -6,6 +6,7 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/api" + "github.com/resonatehq/resonate/internal/metrics" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/types" @@ -16,6 +17,7 @@ type System struct { cfg *Config api api.API aio aio.AIO + metrics *metrics.Metrics scheduler *scheduler.Scheduler onRequest map[types.APIKind]func(int64, *types.Request, func(*types.Response, error)) *scheduler.Coroutine onTick map[int][]func(int64, *Config) *scheduler.Coroutine @@ -30,12 +32,13 @@ type Config struct { CompletionBatchSize int } -func New(cfg *Config, api api.API, aio aio.AIO) *System { +func New(cfg *Config, api api.API, aio aio.AIO, metrics *metrics.Metrics) *System { return &System{ cfg: cfg, api: api, aio: aio, - scheduler: scheduler.NewScheduler(aio), + metrics: metrics, + scheduler: scheduler.NewScheduler(aio, metrics), onRequest: map[types.APIKind]func(int64, *types.Request, func(*types.Response, error)) *scheduler.Coroutine{}, onTick: map[int][]func(int64, *Config) *scheduler.Coroutine{}, } @@ -59,7 +62,7 @@ func (s *System) Tick(t int64, timeoutCh <-chan time.Time) { // add request coroutines for _, sqe := range s.api.Dequeue(s.cfg.SubmissionBatchSize, timeoutCh) { if coroutine, ok := s.onRequest[sqe.Submission.Kind]; ok { - slog.Info("api:dequeue", "sqe", sqe.Submission) + slog.Debug("api:dequeue", "sqe", sqe.Submission) s.scheduler.Add(coroutine(t, sqe.Submission, sqe.Callback)) } else { panic("invalid api request") diff --git a/internal/kernel/types/aio.go b/internal/kernel/types/aio.go index c1276da8..2d157f6f 100644 --- a/internal/kernel/types/aio.go +++ b/internal/kernel/types/aio.go @@ -10,6 +10,19 @@ const ( Store ) +func (k AIOKind) String() string { + switch k { + case Echo: + return "echo" + case Network: + return "network" + case Store: + return "store" + default: + panic("invalid aio") + } +} + type Submission struct { Kind AIOKind Echo *EchoSubmission diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go new file mode 100644 index 00000000..e41b1820 --- /dev/null +++ b/internal/metrics/metrics.go @@ -0,0 +1,62 @@ +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +type Metrics struct { + AioTotal *prometheus.CounterVec + AioInFlight *prometheus.GaugeVec + ApiTotal *prometheus.CounterVec + ApiInFlight *prometheus.GaugeVec + CoroutinesTotal *prometheus.CounterVec + CoroutinesInFlight *prometheus.GaugeVec +} + +func New(reg prometheus.Registerer) *Metrics { + metrics := &Metrics{ + AioTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "aio_total_submissions", + Help: "Total number of aio submissions", + }, []string{"type", "status"}), + AioInFlight: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "aio_in_flight_submissions", + Help: "Number of in flight aio submissions", + }, []string{"type"}), + ApiTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "api_total_requests", + Help: "Total number of api requests", + }, []string{"type", "status"}), + ApiInFlight: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "api_in_flight_requests", + Help: "Number of in flight api requests", + }, []string{"type"}), + CoroutinesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "coroutines_total", + Help: "Total number of coroutines", + }, []string{"type"}), + CoroutinesInFlight: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "coroutines_in_flight", + Help: "Number of in flight coroutines", + }, []string{"type"}), + } + + metrics.Enable(reg) + return metrics +} + +func (m *Metrics) Enable(reg prometheus.Registerer) { + reg.MustRegister(m.AioTotal) + reg.MustRegister(m.AioInFlight) + reg.MustRegister(m.ApiTotal) + reg.MustRegister(m.ApiInFlight) + reg.MustRegister(m.CoroutinesTotal) + reg.MustRegister(m.CoroutinesInFlight) +} + +func (m *Metrics) Disable(reg prometheus.Registerer) { + reg.Unregister(m.AioTotal) + reg.Unregister(m.AioInFlight) + reg.Unregister(m.ApiTotal) + reg.Unregister(m.ApiInFlight) + reg.Unregister(m.CoroutinesTotal) + reg.Unregister(m.CoroutinesInFlight) +} diff --git a/test/dst/dst.go b/test/dst/dst.go index f1ab8910..cf27d333 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -5,6 +5,7 @@ import ( "math/rand" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/api" "github.com/resonatehq/resonate/internal/app/coroutines" @@ -13,6 +14,7 @@ import ( "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/system" "github.com/resonatehq/resonate/internal/kernel/types" + "github.com/resonatehq/resonate/internal/metrics" "github.com/resonatehq/resonate/test" ) @@ -47,8 +49,12 @@ func (d *DST) Run(t *testing.T, r *rand.Rand, seed int64) { CompletionBatchSize: test.RangeIntn(r, 1, d.SQEsPerTick*Q_RATIO), } + // metrics + reg := prometheus.NewRegistry() + metrics := metrics.New(reg) + // instatiate api/aio - api := api.New(SQ_SIZE) + api := api.New(SQ_SIZE, metrics) aio := aio.NewDST() // instatiate aio subsystems @@ -71,7 +77,7 @@ func (d *DST) Run(t *testing.T, r *rand.Rand, seed int64) { } // instatiate system - system := system.New(cfg, api, aio) + system := system.New(cfg, api, aio, metrics) system.AddOnRequest(types.ReadPromise, coroutines.ReadPromise) system.AddOnRequest(types.SearchPromises, coroutines.SearchPromises) system.AddOnRequest(types.CreatePromise, coroutines.CreatePromise) @@ -116,7 +122,7 @@ func (d *DST) Run(t *testing.T, r *rand.Rand, seed int64) { for _, req := range generator.Generate(r, time, r.Intn(d.SQEsPerTick)) { req := req - api.Enqueue(&bus.SQE[types.Request, types.Response]{ + api.Enqueue("dst", &bus.SQE[types.Request, types.Response]{ Submission: req, Callback: func(res *types.Response, err error) { var errMsg string