Skip to content

Commit

Permalink
Add prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dfarr committed Aug 29, 2023
1 parent f87d945 commit 82ecbda
Show file tree
Hide file tree
Showing 27 changed files with 242 additions and 58 deletions.
19 changes: 15 additions & 4 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/app/coroutines"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/resonatehq/resonate/internal/app/subsystems/api/http"
"github.com/resonatehq/resonate/internal/kernel/system"
"github.com/resonatehq/resonate/internal/kernel/types"
"github.com/resonatehq/resonate/internal/metrics"
"github.com/spf13/cobra"
)

Expand All @@ -28,6 +30,10 @@ var serveCmd = &cobra.Command{
Use: "serve",
Short: "Start the durable promises server",
RunE: func(cmd *cobra.Command, args []string) error {
// logger
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
slog.SetDefault(logger)

// config
cfg := &system.Config{
PromiseCacheSize: 100,
Expand All @@ -37,12 +43,16 @@ var serveCmd = &cobra.Command{
CompletionBatchSize: 100,
}

// instantiate metrics
reg := prometheus.NewRegistry()
metrics := metrics.New(reg)

// instatiate api/aio
api := api.New(100)
aio := aio.New(100)
api := api.New(100, metrics)
aio := aio.New(100, metrics)

// instatiate api subsystems
http := http.New(api, httpAddr, 10*time.Second)
http := http.New(api, httpAddr, 10*time.Second, reg)
grpc := grpc.New(api, grpcAddr)

// instatiate aio subsystems
Expand All @@ -69,7 +79,7 @@ var serveCmd = &cobra.Command{
}

// instantiate system
system := system.New(cfg, api, aio)
system := system.New(cfg, api, aio, metrics)
system.AddOnRequest(types.ReadPromise, coroutines.ReadPromise)
system.AddOnRequest(types.SearchPromises, coroutines.SearchPromises)
system.AddOnRequest(types.CreatePromise, coroutines.CreatePromise)
Expand All @@ -82,6 +92,7 @@ var serveCmd = &cobra.Command{
system.AddOnTick(2, coroutines.TimeoutPromises)
system.AddOnTick(1, coroutines.NotifySubscriptions)

// listen for shutdown signal
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
Expand Down
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ require (
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
Expand All @@ -27,10 +29,15 @@ require (
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
Expand Down
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s=
github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
Expand All @@ -24,6 +28,8 @@ github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg
github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
Expand All @@ -44,6 +50,8 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand All @@ -53,6 +61,14 @@ github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZ
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8=
github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM=
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
Expand Down Expand Up @@ -80,6 +96,7 @@ golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
Expand Down
20 changes: 19 additions & 1 deletion internal/aio/aio.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package aio

import (
"fmt"
"log/slog"

"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/types"
"github.com/resonatehq/resonate/internal/metrics"
)

type AIO interface {
Expand All @@ -19,6 +21,7 @@ type aio struct {
subsystems map[types.AIOKind]*subsystemWrapper
done bool
errors chan error
metrics *metrics.Metrics
}

type subsystemWrapper struct {
Expand All @@ -35,11 +38,12 @@ type workerWrapper struct {
flushCh chan int64
}

func New(size int) *aio {
func New(size int, metrics *metrics.Metrics) *aio {

Check warning on line 41 in internal/aio/aio.go

View check run for this annotation

Codecov / codecov/patch

internal/aio/aio.go#L41

Added line #L41 was not covered by tests
return &aio{
cq: make(chan *bus.CQE[types.Submission, types.Completion], size),
subsystems: map[types.AIOKind]*subsystemWrapper{},
errors: make(chan error),
metrics: metrics,

Check warning on line 46 in internal/aio/aio.go

View check run for this annotation

Codecov / codecov/patch

internal/aio/aio.go#L46

Added line #L46 was not covered by tests
}
}

Expand Down Expand Up @@ -104,6 +108,8 @@ func (a *aio) Enqueue(sqe *bus.SQE[types.Submission, types.Completion]) {
if subsystem, ok := a.subsystems[sqe.Submission.Kind]; ok {
select {
case subsystem.sq <- sqe:
slog.Debug("aio:enqueue", "sqe", sqe.Submission)
a.metrics.AioInFlight.WithLabelValues(sqe.Submission.Kind.String()).Inc()

Check warning on line 112 in internal/aio/aio.go

View check run for this annotation

Codecov / codecov/patch

internal/aio/aio.go#L111-L112

Added lines #L111 - L112 were not covered by tests
default:
sqe.Callback(nil, fmt.Errorf("aio:subsystem:%s submission queue full", subsystem))
}
Expand All @@ -123,6 +129,18 @@ func (a *aio) Dequeue(n int) []*bus.CQE[types.Submission, types.Completion] {
if !ok {
return cqes
}

var status string
if cqe.Error != nil {
status = "failure"
} else {
status = "success"
}

Check warning on line 138 in internal/aio/aio.go

View check run for this annotation

Codecov / codecov/patch

internal/aio/aio.go#L133-L138

Added lines #L133 - L138 were not covered by tests

slog.Debug("aio:dequeue", "cqe", cqe.Completion)
a.metrics.AioTotal.WithLabelValues(cqe.Completion.Kind.String(), status).Inc()
a.metrics.AioInFlight.WithLabelValues(cqe.Completion.Kind.String()).Dec()

Check warning on line 143 in internal/aio/aio.go

View check run for this annotation

Codecov / codecov/patch

internal/aio/aio.go#L140-L143

Added lines #L140 - L143 were not covered by tests
cqes = append(cqes, cqe)
default:
return cqes
Expand Down
45 changes: 40 additions & 5 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package api

import (
"fmt"
"strconv"
"time"

"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/types"
"github.com/resonatehq/resonate/internal/metrics"
)

type API interface {
Enqueue(*bus.SQE[types.Request, types.Response])
Enqueue(string, *bus.SQE[types.Request, types.Response])
Dequeue(int, <-chan time.Time) []*bus.SQE[types.Request, types.Response]
Done() bool
}
Expand All @@ -19,12 +21,14 @@ type api struct {
subsystems []Subsystem
done bool
errors chan error
metrics *metrics.Metrics
}

func New(size int) *api {
func New(size int, metrics *metrics.Metrics) *api {
return &api{
sq: make(chan *bus.SQE[types.Request, types.Response], size),
errors: make(chan error),
sq: make(chan *bus.SQE[types.Request, types.Response], size),
errors: make(chan error),
metrics: metrics,
}
}

Expand Down Expand Up @@ -62,9 +66,40 @@ func (a *api) Errors() <-chan error {
return a.errors
}

func (a *api) Enqueue(sqe *bus.SQE[types.Request, types.Response]) {
func (a *api) Enqueue(kind string, sqe *bus.SQE[types.Request, types.Response]) {
select {
case a.sq <- sqe:
a.metrics.ApiInFlight.WithLabelValues(kind).Inc()

callback := sqe.Callback
sqe.Callback = func(res *types.Response, err error) {
var status types.ResponseStatus
switch res.Kind {
case types.ReadPromise:
status = res.ReadPromise.Status
case types.SearchPromises:
status = res.SearchPromises.Status
case types.CreatePromise:
status = res.CreatePromise.Status
case types.CancelPromise:
status = res.CancelPromise.Status
case types.ResolvePromise:
status = res.ResolvePromise.Status
case types.RejectPromise:
status = res.RejectPromise.Status
case types.ReadSubscriptions:
status = res.ReadSubscriptions.Status
case types.CreateSubscription:
status = res.CreateSubscription.Status
case types.DeleteSubscription:
status = res.DeleteSubscription.Status

Check warning on line 95 in internal/api/api.go

View check run for this annotation

Codecov / codecov/patch

internal/api/api.go#L94-L95

Added lines #L94 - L95 were not covered by tests
}

a.metrics.ApiTotal.WithLabelValues(kind, strconv.Itoa(int(status))).Inc()
a.metrics.ApiInFlight.WithLabelValues(kind).Dec()

callback(res, err)
}
default:
sqe.Callback(nil, fmt.Errorf("api submission queue full"))
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/cancelPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func CancelPromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("CancelPromise(id=%s)", req.CancelPromise.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("CancelPromise(id=%s)", req.CancelPromise.Id), "CancelPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
if req.CancelPromise.Value.Headers == nil {
req.CancelPromise.Value.Headers = map[string]string{}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func CreatePromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("CreatePromise(id=%s)", req.CreatePromise.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("CreatePromise(id=%s)", req.CreatePromise.Id), "CreatePromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
if req.CreatePromise.Param.Headers == nil {
req.CreatePromise.Param.Headers = map[string]string{}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/createSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func CreateSubscription(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("CreateSubscription(promiseId=%s, url=%s)", req.CreateSubscription.PromiseId, req.CreateSubscription.Url), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("CreateSubscription(promiseId=%s, url=%s)", req.CreateSubscription.PromiseId, req.CreateSubscription.Url), "CreateSubscription", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
// default retry policy
if req.CreateSubscription.RetryPolicy == nil {
req.CreateSubscription.RetryPolicy = &subscription.RetryPolicy{
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/deleteSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func DeleteSubscription(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("DeleteSubscription(id=%d)", req.DeleteSubscription.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("DeleteSubscription(id=%d)", req.DeleteSubscription.Id), "DeleteSubscription", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {

Check warning on line 13 in internal/app/coroutines/deleteSubscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/deleteSubscription.go#L13

Added line #L13 was not covered by tests
submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Expand Down
4 changes: 2 additions & 2 deletions internal/app/coroutines/notifiySubscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (i inflight) remove(id int64) {
}

func NotifySubscriptions(t int64, cfg *system.Config) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("NotifySubscriptions(t=%d)", t), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("NotifySubscriptions(t=%d)", t), "NotifySubscriptions", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Expand Down Expand Up @@ -72,7 +72,7 @@ func NotifySubscriptions(t int64, cfg *system.Config) *scheduler.Coroutine {
}

func notifySubscription(notification *notification.Notification) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("NotifySubscription:%d", notification.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("NotifySubscription:%d", notification.Id), "NotifySubscription", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
// handle inflight cache
inflights.add(notification.Id)
c.OnDone(func() { inflights.remove(notification.Id) })
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/readPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func ReadPromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("ReadPromise(id=%s)", req.ReadPromise.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("ReadPromise(id=%s)", req.ReadPromise.Id), "ReadPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/readSubscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func ReadSubscriptions(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("ReadSubscriptions(promiseId=%s)", req.ReadSubscriptions.PromiseId), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("ReadSubscriptions(promiseId=%s)", req.ReadSubscriptions.PromiseId), "ReadSubscriptions", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/rejectPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func RejectPromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("RejectPromise(id=%s)", req.RejectPromise.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("RejectPromise(id=%s)", req.RejectPromise.Id), "RejectPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
if req.RejectPromise.Value.Headers == nil {
req.RejectPromise.Value.Headers = map[string]string{}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/resolvePromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func ResolvePromise(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("ResolvePromise(id=%s)", req.ResolvePromise.Id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("ResolvePromise(id=%s)", req.ResolvePromise.Id), "ResolvePromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
if req.ResolvePromise.Value.Headers == nil {
req.ResolvePromise.Value.Headers = map[string]string{}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/searchPromises.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func SearchPromises(t int64, req *types.Request, res func(*types.Response, error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("SearchPromises(q=%s)", req.SearchPromises.Q), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("SearchPromises(q=%s)", req.SearchPromises.Q), "SearchPromises", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/timeoutPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func TimeoutPromise(t int64, id string, retry *scheduler.Coroutine, res func(error)) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromise(id=%s)", id), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromise(id=%s)", id), "TimeoutPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Expand Down
2 changes: 1 addition & 1 deletion internal/app/coroutines/timeoutPromises.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func TimeoutPromises(t int64, cfg *system.Config) *scheduler.Coroutine {
return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromises(t=%d)", t), func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromises(t=%d)", t), "TimeoutPromises", func(s *scheduler.Scheduler, c *scheduler.Coroutine) {
submission := &types.Submission{
Kind: types.Store,
Store: &types.StoreSubmission{
Expand Down
Loading

0 comments on commit 82ecbda

Please sign in to comment.