Skip to content

Commit

Permalink
Add metadata to sqes, cqes, and coroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
dfarr committed Oct 20, 2023
1 parent 0a5497f commit 5d62744
Show file tree
Hide file tree
Showing 37 changed files with 503 additions and 266 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/gin-gonic/gin v1.9.1
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/google/uuid v1.3.1
github.com/lib/pq v1.10.9
github.com/mattn/go-sqlite3 v1.14.17
github.com/mitchellh/mapstructure v1.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
Expand Down
5 changes: 2 additions & 3 deletions internal/aio/aio.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package aio
import (
"fmt"
"log/slog"
"strings"

"github.com/resonatehq/resonate/internal/kernel/t_aio"

Expand Down Expand Up @@ -109,7 +108,7 @@ func (a *aio) Enqueue(sqe *bus.SQE[t_aio.Submission, t_aio.Completion]) {
select {
case subsystem.sq <- sqe:
slog.Debug("aio:enqueue", "sqe", sqe)
a.metrics.AioInFlight.WithLabelValues(strings.Split(sqe.Tags, ",")...).Inc()
a.metrics.AioInFlight.WithLabelValues(sqe.Metadata.Tags.Split("aio")...).Inc()
default:
sqe.Callback(nil, fmt.Errorf("aio:subsystem:%s submission queue full", subsystem))
}
Expand Down Expand Up @@ -137,7 +136,7 @@ func (a *aio) Dequeue(n int) []*bus.CQE[t_aio.Submission, t_aio.Completion] {
status = "success"
}

tags := strings.Split(cqe.Tags, ",")
tags := cqe.Metadata.Tags.Split("aio")
a.metrics.AioTotal.WithLabelValues(append(tags, status)...).Inc()
a.metrics.AioInFlight.WithLabelValues(tags...).Dec()

Expand Down
5 changes: 2 additions & 3 deletions internal/aio/aio_dst.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"log/slog"
"math/rand" // nosemgrep
"strings"

"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/metrics"
Expand Down Expand Up @@ -57,7 +56,7 @@ func (a *aioDST) Shutdown() {}

func (a *aioDST) Enqueue(sqe *bus.SQE[t_aio.Submission, t_aio.Completion]) {
slog.Debug("aio:enqueue", "sqe", sqe)
a.metrics.AioInFlight.WithLabelValues(strings.Split(sqe.Tags, ",")...).Inc()
a.metrics.AioInFlight.WithLabelValues(sqe.Metadata.Tags.Split("aio")...).Inc()

i := a.r.Intn(len(a.sqes) + 1)
a.sqes = append(a.sqes[:i], append([]*bus.SQE[t_aio.Submission, t_aio.Completion]{sqe}, a.sqes[i:]...)...)
Expand All @@ -77,7 +76,7 @@ func (a *aioDST) Dequeue(n int) []*bus.CQE[t_aio.Submission, t_aio.Completion] {
status = "success"
}

tags := strings.Split(cqe.Tags, ",")
tags := cqe.Metadata.Tags.Split("aio")
a.metrics.AioTotal.WithLabelValues(append(tags, status)...).Inc()
a.metrics.AioInFlight.WithLabelValues(tags...).Dec()
}
Expand Down
5 changes: 1 addition & 4 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package api
import (
"fmt"
"strconv"
"strings"
"time"

"log/slog"

"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/metrics"
"github.com/resonatehq/resonate/internal/util"
)

type API interface {
Expand Down Expand Up @@ -75,7 +73,7 @@ func (a *api) Errors() <-chan error {
}

func (a *api) Enqueue(sqe *bus.SQE[t_api.Request, t_api.Response]) {
tags := strings.Split(sqe.Tags, ",")
tags := sqe.Metadata.Tags.Split("api")
a.metrics.ApiInFlight.WithLabelValues(tags...).Inc()

// replace sqe.Callback with a callback that wraps the original
Expand Down Expand Up @@ -133,7 +131,6 @@ func (a *api) Enqueue(sqe *bus.SQE[t_api.Request, t_api.Response]) {
}

func (a *api) Dequeue(n int, timeoutCh <-chan time.Time) []*bus.SQE[t_api.Request, t_api.Response] {
util.Assert(n > 0, "submission batch size must be greater than 0")
sqes := []*bus.SQE[t_api.Request, t_api.Response]{}

if timeoutCh != nil {
Expand Down
9 changes: 5 additions & 4 deletions internal/app/coroutines/cancelPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package coroutines
import (
"log/slog"

"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/scheduler"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/promise"
)

func CancelPromise(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine("CancelPromise", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
func CancelPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
if req.CancelPromise.Value.Headers == nil {
req.CancelPromise.Value.Headers = map[string]string{}
}
Expand Down Expand Up @@ -63,7 +64,7 @@ func CancelPromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu

if p.State == promise.Pending {
if c.Time() >= p.Timeout {
c.Scheduler.Add(TimeoutPromise(p, CancelPromise(req, res), func(err error) {
c.Scheduler.Add(TimeoutPromise(metadata, p, CancelPromise(metadata, req, res), func(err error) {
if err != nil {
slog.Error("failed to timeout promise", "req", req, "err", err)
res(nil, err)
Expand Down Expand Up @@ -158,7 +159,7 @@ func CancelPromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu
},
}, nil)
} else {
c.Scheduler.Add(CancelPromise(req, res))
c.Scheduler.Add(CancelPromise(metadata, req, res))
}
}
} else {
Expand Down
9 changes: 5 additions & 4 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package coroutines
import (
"log/slog"

"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/scheduler"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/promise"
)

func CreatePromise(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine("CreatePromise", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
if req.CreatePromise.Param.Headers == nil {
req.CreatePromise.Param.Headers = map[string]string{}
}
Expand Down Expand Up @@ -100,7 +101,7 @@ func CreatePromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu
},
}, nil)
} else {
c.Scheduler.Add(CreatePromise(req, res))
c.Scheduler.Add(CreatePromise(metadata, req, res))
}
} else {
p, err := result.Records[0].Promise()
Expand All @@ -118,7 +119,7 @@ func CreatePromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu
}

if p.State == promise.Pending && c.Time() >= p.Timeout {
c.Scheduler.Add(TimeoutPromise(p, CreatePromise(req, res), func(err error) {
c.Scheduler.Add(TimeoutPromise(metadata, p, CreatePromise(metadata, req, res), func(err error) {

Check warning on line 122 in internal/app/coroutines/createPromise.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createPromise.go#L122

Added line #L122 was not covered by tests
if err != nil {
slog.Error("failed to timeout promise", "req", req, "err", err)
res(nil, err)
Expand Down
7 changes: 4 additions & 3 deletions internal/app/coroutines/createSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package coroutines
import (
"log/slog"

"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/scheduler"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/subscription"
)

func CreateSubscription(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine("CreateSubscription", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
func CreateSubscription(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
// default retry policy
if req.CreateSubscription.RetryPolicy == nil {
req.CreateSubscription.RetryPolicy = &subscription.RetryPolicy{
Expand Down Expand Up @@ -111,7 +112,7 @@ func CreateSubscription(req *t_api.Request, res func(*t_api.Response, error)) *s
},
}, nil)
} else {
c.Scheduler.Add(CreateSubscription(req, res))
c.Scheduler.Add(CreateSubscription(metadata, req, res))
}
}
})
Expand Down
5 changes: 3 additions & 2 deletions internal/app/coroutines/deleteSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package coroutines
import (
"log/slog"

"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/scheduler"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
)

func DeleteSubscription(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine("DeleteSubscription", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
func DeleteSubscription(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
completion, err := c.Yield(&t_aio.Submission{
Kind: t_aio.Store,
Store: &t_aio.StoreSubmission{
Expand Down
5 changes: 3 additions & 2 deletions internal/app/coroutines/echo.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package coroutines

import (
"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/scheduler"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
)

func Echo(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine("Echo", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
func Echo(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
submission := &t_aio.Submission{
Kind: t_aio.Echo,
Echo: &t_aio.EchoSubmission{
Expand Down
17 changes: 12 additions & 5 deletions internal/app/coroutines/notifiySubscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"net/http"

"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/scheduler"
"github.com/resonatehq/resonate/internal/kernel/system"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
Expand All @@ -30,8 +31,11 @@ func (i inflight) remove(id string) {
delete(i, id)
}

func NotifySubscriptions(config *system.Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine("NotifySubscriptions", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
func NotifySubscriptions(t int64, config *system.Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
metadata := metadata.New(fmt.Sprintf("tick:%d:notify", t))
metadata.Tags.Set("name", "notify-subscriptions")

return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
completion, err := c.Yield(&t_aio.Submission{
Kind: t_aio.Store,
Store: &t_aio.StoreSubmission{
Expand Down Expand Up @@ -64,14 +68,17 @@ func NotifySubscriptions(config *system.Config) *scheduler.Coroutine[*t_aio.Comp
}

if c.Time() >= record.Time && !inflights.get(id(notification)) {
c.Scheduler.Add(notifySubscription(notification))
c.Scheduler.Add(notifySubscription(metadata.TransactionId, notification))
}
}
})
}

func notifySubscription(notification *notification.Notification) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine("NotifySubscription", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
func notifySubscription(tid string, notification *notification.Notification) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
metadata := metadata.New(tid)
metadata.Tags.Set("name", "notify-subscription")

return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
// handle inflight cache
inflights.add(id(notification))
c.OnDone(func() { inflights.remove(id(notification)) })
Expand Down
7 changes: 4 additions & 3 deletions internal/app/coroutines/readPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package coroutines
import (
"log/slog"

"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/scheduler"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/promise"
)

func ReadPromise(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine("ReadPromise", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
func ReadPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
completion, err := c.Yield(&t_aio.Submission{
Kind: t_aio.Store,
Store: &t_aio.StoreSubmission{
Expand Down Expand Up @@ -55,7 +56,7 @@ func ReadPromise(req *t_api.Request, res func(*t_api.Response, error)) *schedule
}

if p.State == promise.Pending && c.Time() >= p.Timeout {
c.Scheduler.Add(TimeoutPromise(p, ReadPromise(req, res), func(err error) {
c.Scheduler.Add(TimeoutPromise(metadata, p, ReadPromise(metadata, req, res), func(err error) {

Check warning on line 59 in internal/app/coroutines/readPromise.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/readPromise.go#L59

Added line #L59 was not covered by tests
if err != nil {
slog.Error("failed to timeout promise", "req", req, "err", err)
res(nil, err)
Expand Down
5 changes: 3 additions & 2 deletions internal/app/coroutines/readSubscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package coroutines
import (
"log/slog"

"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/scheduler"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/subscription"
)

func ReadSubscriptions(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine("ReadSubscriptions", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
func ReadSubscriptions(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
completion, err := c.Yield(&t_aio.Submission{
Kind: t_aio.Store,
Store: &t_aio.StoreSubmission{
Expand Down
9 changes: 5 additions & 4 deletions internal/app/coroutines/rejectPromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ package coroutines
import (
"log/slog"

"github.com/resonatehq/resonate/internal/kernel/metadata"
"github.com/resonatehq/resonate/internal/kernel/scheduler"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/promise"
)

func RejectPromise(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine("RejectPromise", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] {
return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) {
if req.RejectPromise.Value.Headers == nil {
req.RejectPromise.Value.Headers = map[string]string{}
}
Expand Down Expand Up @@ -63,7 +64,7 @@ func RejectPromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu

if p.State == promise.Pending {
if c.Time() >= p.Timeout {
c.Scheduler.Add(TimeoutPromise(p, RejectPromise(req, res), func(err error) {
c.Scheduler.Add(TimeoutPromise(metadata, p, RejectPromise(metadata, req, res), func(err error) {
if err != nil {
slog.Error("failed to timeout promise", "req", req, "err", err)
res(nil, err)
Expand Down Expand Up @@ -158,7 +159,7 @@ func RejectPromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu
},
}, nil)
} else {
c.Scheduler.Add(RejectPromise(req, res))
c.Scheduler.Add(RejectPromise(metadata, req, res))
}
}
} else {
Expand Down
Loading

0 comments on commit 5d62744

Please sign in to comment.