From 5d62744df8e5462c124461b47ec4dc65df00b1c6 Mon Sep 17 00:00:00 2001 From: David Farr Date: Fri, 20 Oct 2023 11:43:12 -0700 Subject: [PATCH] Add metadata to sqes, cqes, and coroutines --- go.mod | 1 + go.sum | 2 + internal/aio/aio.go | 5 +- internal/aio/aio_dst.go | 5 +- internal/api/api.go | 5 +- internal/app/coroutines/cancelPromise.go | 9 +- internal/app/coroutines/createPromise.go | 9 +- internal/app/coroutines/createSubscription.go | 7 +- internal/app/coroutines/deleteSubscription.go | 5 +- internal/app/coroutines/echo.go | 5 +- .../app/coroutines/notifiySubscriptions.go | 17 +- internal/app/coroutines/readPromise.go | 7 +- internal/app/coroutines/readSubscriptions.go | 5 +- internal/app/coroutines/rejectPromise.go | 9 +- internal/app/coroutines/resolvePromise.go | 9 +- internal/app/coroutines/searchPromises.go | 5 +- internal/app/coroutines/timeoutPromise.go | 5 +- internal/app/coroutines/timeoutPromises.go | 9 +- internal/app/subsystems/aio/echo/echo.go | 2 +- .../app/subsystems/aio/network/network.go | 2 +- .../app/subsystems/aio/network/network_dst.go | 2 +- internal/app/subsystems/aio/store/store.go | 2 +- .../app/subsystems/api/grpc/api/promise.pb.go | 347 ++++++++++-------- .../app/subsystems/api/grpc/api/promise.proto | 6 + internal/app/subsystems/api/grpc/grpc.go | 21 +- internal/app/subsystems/api/http/promise.go | 24 +- .../app/subsystems/api/service/request.go | 8 + .../app/subsystems/api/service/service.go | 45 ++- .../subsystems/api/service/service_test.go | 7 +- internal/kernel/bus/bus.go | 9 +- internal/kernel/metadata/metadata.go | 36 ++ internal/kernel/scheduler/coroutine.go | 38 +- internal/kernel/scheduler/scheduler.go | 29 +- internal/kernel/system/system.go | 19 +- internal/kernel/t_api/api.go | 25 ++ test/dst/dst.go | 14 +- test/system/system_test.go | 14 +- 37 files changed, 503 insertions(+), 266 deletions(-) create mode 100644 internal/kernel/metadata/metadata.go diff --git a/go.mod b/go.mod index ceefac4e..b13db319 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/gin-gonic/gin v1.9.1 github.com/golang-jwt/jwt v3.2.2+incompatible + github.com/google/uuid v1.3.1 github.com/lib/pq v1.10.9 github.com/mattn/go-sqlite3 v1.14.17 github.com/mitchellh/mapstructure v1.5.0 diff --git a/go.sum b/go.sum index 6434a4f8..58a0c04d 100644 --- a/go.sum +++ b/go.sum @@ -153,6 +153,8 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= diff --git a/internal/aio/aio.go b/internal/aio/aio.go index 6fdf4f79..7a4f554b 100644 --- a/internal/aio/aio.go +++ b/internal/aio/aio.go @@ -3,7 +3,6 @@ package aio import ( "fmt" "log/slog" - "strings" "github.com/resonatehq/resonate/internal/kernel/t_aio" @@ -109,7 +108,7 @@ func (a *aio) Enqueue(sqe *bus.SQE[t_aio.Submission, t_aio.Completion]) { select { case subsystem.sq <- sqe: slog.Debug("aio:enqueue", "sqe", sqe) - a.metrics.AioInFlight.WithLabelValues(strings.Split(sqe.Tags, ",")...).Inc() + a.metrics.AioInFlight.WithLabelValues(sqe.Metadata.Tags.Split("aio")...).Inc() default: sqe.Callback(nil, fmt.Errorf("aio:subsystem:%s submission queue full", subsystem)) } @@ -137,7 +136,7 @@ func (a *aio) Dequeue(n int) []*bus.CQE[t_aio.Submission, t_aio.Completion] { status = "success" } - tags := strings.Split(cqe.Tags, ",") + tags := cqe.Metadata.Tags.Split("aio") a.metrics.AioTotal.WithLabelValues(append(tags, status)...).Inc() a.metrics.AioInFlight.WithLabelValues(tags...).Dec() diff --git a/internal/aio/aio_dst.go b/internal/aio/aio_dst.go index 23e20941..bd7c525f 100644 --- a/internal/aio/aio_dst.go +++ b/internal/aio/aio_dst.go @@ -4,7 +4,6 @@ import ( "fmt" "log/slog" "math/rand" // nosemgrep - "strings" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/metrics" @@ -57,7 +56,7 @@ func (a *aioDST) Shutdown() {} func (a *aioDST) Enqueue(sqe *bus.SQE[t_aio.Submission, t_aio.Completion]) { slog.Debug("aio:enqueue", "sqe", sqe) - a.metrics.AioInFlight.WithLabelValues(strings.Split(sqe.Tags, ",")...).Inc() + a.metrics.AioInFlight.WithLabelValues(sqe.Metadata.Tags.Split("aio")...).Inc() i := a.r.Intn(len(a.sqes) + 1) a.sqes = append(a.sqes[:i], append([]*bus.SQE[t_aio.Submission, t_aio.Completion]{sqe}, a.sqes[i:]...)...) @@ -77,7 +76,7 @@ func (a *aioDST) Dequeue(n int) []*bus.CQE[t_aio.Submission, t_aio.Completion] { status = "success" } - tags := strings.Split(cqe.Tags, ",") + tags := cqe.Metadata.Tags.Split("aio") a.metrics.AioTotal.WithLabelValues(append(tags, status)...).Inc() a.metrics.AioInFlight.WithLabelValues(tags...).Dec() } diff --git a/internal/api/api.go b/internal/api/api.go index 46d7c750..f288e4d6 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -3,7 +3,6 @@ package api import ( "fmt" "strconv" - "strings" "time" "log/slog" @@ -11,7 +10,6 @@ import ( "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/t_api" "github.com/resonatehq/resonate/internal/metrics" - "github.com/resonatehq/resonate/internal/util" ) type API interface { @@ -75,7 +73,7 @@ func (a *api) Errors() <-chan error { } func (a *api) Enqueue(sqe *bus.SQE[t_api.Request, t_api.Response]) { - tags := strings.Split(sqe.Tags, ",") + tags := sqe.Metadata.Tags.Split("api") a.metrics.ApiInFlight.WithLabelValues(tags...).Inc() // replace sqe.Callback with a callback that wraps the original @@ -133,7 +131,6 @@ func (a *api) Enqueue(sqe *bus.SQE[t_api.Request, t_api.Response]) { } func (a *api) Dequeue(n int, timeoutCh <-chan time.Time) []*bus.SQE[t_api.Request, t_api.Response] { - util.Assert(n > 0, "submission batch size must be greater than 0") sqes := []*bus.SQE[t_api.Request, t_api.Response]{} if timeoutCh != nil { diff --git a/internal/app/coroutines/cancelPromise.go b/internal/app/coroutines/cancelPromise.go index 2024fe6f..95337c88 100644 --- a/internal/app/coroutines/cancelPromise.go +++ b/internal/app/coroutines/cancelPromise.go @@ -3,6 +3,7 @@ package coroutines import ( "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -10,8 +11,8 @@ import ( "github.com/resonatehq/resonate/pkg/promise" ) -func CancelPromise(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("CancelPromise", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func CancelPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { if req.CancelPromise.Value.Headers == nil { req.CancelPromise.Value.Headers = map[string]string{} } @@ -63,7 +64,7 @@ func CancelPromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu if p.State == promise.Pending { if c.Time() >= p.Timeout { - c.Scheduler.Add(TimeoutPromise(p, CancelPromise(req, res), func(err error) { + c.Scheduler.Add(TimeoutPromise(metadata, p, CancelPromise(metadata, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, err) @@ -158,7 +159,7 @@ func CancelPromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu }, }, nil) } else { - c.Scheduler.Add(CancelPromise(req, res)) + c.Scheduler.Add(CancelPromise(metadata, req, res)) } } } else { diff --git a/internal/app/coroutines/createPromise.go b/internal/app/coroutines/createPromise.go index b78c2fb8..d71887ff 100644 --- a/internal/app/coroutines/createPromise.go +++ b/internal/app/coroutines/createPromise.go @@ -3,6 +3,7 @@ package coroutines import ( "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -10,8 +11,8 @@ import ( "github.com/resonatehq/resonate/pkg/promise" ) -func CreatePromise(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("CreatePromise", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func CreatePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { if req.CreatePromise.Param.Headers == nil { req.CreatePromise.Param.Headers = map[string]string{} } @@ -100,7 +101,7 @@ func CreatePromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu }, }, nil) } else { - c.Scheduler.Add(CreatePromise(req, res)) + c.Scheduler.Add(CreatePromise(metadata, req, res)) } } else { p, err := result.Records[0].Promise() @@ -118,7 +119,7 @@ func CreatePromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu } if p.State == promise.Pending && c.Time() >= p.Timeout { - c.Scheduler.Add(TimeoutPromise(p, CreatePromise(req, res), func(err error) { + c.Scheduler.Add(TimeoutPromise(metadata, p, CreatePromise(metadata, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, err) diff --git a/internal/app/coroutines/createSubscription.go b/internal/app/coroutines/createSubscription.go index 9020f407..753fe152 100644 --- a/internal/app/coroutines/createSubscription.go +++ b/internal/app/coroutines/createSubscription.go @@ -3,6 +3,7 @@ package coroutines import ( "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -10,8 +11,8 @@ import ( "github.com/resonatehq/resonate/pkg/subscription" ) -func CreateSubscription(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("CreateSubscription", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func CreateSubscription(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { // default retry policy if req.CreateSubscription.RetryPolicy == nil { req.CreateSubscription.RetryPolicy = &subscription.RetryPolicy{ @@ -111,7 +112,7 @@ func CreateSubscription(req *t_api.Request, res func(*t_api.Response, error)) *s }, }, nil) } else { - c.Scheduler.Add(CreateSubscription(req, res)) + c.Scheduler.Add(CreateSubscription(metadata, req, res)) } } }) diff --git a/internal/app/coroutines/deleteSubscription.go b/internal/app/coroutines/deleteSubscription.go index 1d8de8d4..e41e18ac 100644 --- a/internal/app/coroutines/deleteSubscription.go +++ b/internal/app/coroutines/deleteSubscription.go @@ -3,14 +3,15 @@ package coroutines import ( "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" "github.com/resonatehq/resonate/internal/util" ) -func DeleteSubscription(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("DeleteSubscription", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func DeleteSubscription(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { completion, err := c.Yield(&t_aio.Submission{ Kind: t_aio.Store, Store: &t_aio.StoreSubmission{ diff --git a/internal/app/coroutines/echo.go b/internal/app/coroutines/echo.go index 5528f1e0..9ee42dba 100644 --- a/internal/app/coroutines/echo.go +++ b/internal/app/coroutines/echo.go @@ -1,13 +1,14 @@ package coroutines import ( + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" ) -func Echo(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("Echo", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func Echo(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { submission := &t_aio.Submission{ Kind: t_aio.Echo, Echo: &t_aio.EchoSubmission{ diff --git a/internal/app/coroutines/notifiySubscriptions.go b/internal/app/coroutines/notifiySubscriptions.go index 5c07c1c0..cb2a97f1 100644 --- a/internal/app/coroutines/notifiySubscriptions.go +++ b/internal/app/coroutines/notifiySubscriptions.go @@ -7,6 +7,7 @@ import ( "math" "net/http" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/system" "github.com/resonatehq/resonate/internal/kernel/t_aio" @@ -30,8 +31,11 @@ func (i inflight) remove(id string) { delete(i, id) } -func NotifySubscriptions(config *system.Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("NotifySubscriptions", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func NotifySubscriptions(t int64, config *system.Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + metadata := metadata.New(fmt.Sprintf("tick:%d:notify", t)) + metadata.Tags.Set("name", "notify-subscriptions") + + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { completion, err := c.Yield(&t_aio.Submission{ Kind: t_aio.Store, Store: &t_aio.StoreSubmission{ @@ -64,14 +68,17 @@ func NotifySubscriptions(config *system.Config) *scheduler.Coroutine[*t_aio.Comp } if c.Time() >= record.Time && !inflights.get(id(notification)) { - c.Scheduler.Add(notifySubscription(notification)) + c.Scheduler.Add(notifySubscription(metadata.TransactionId, notification)) } } }) } -func notifySubscription(notification *notification.Notification) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("NotifySubscription", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func notifySubscription(tid string, notification *notification.Notification) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + metadata := metadata.New(tid) + metadata.Tags.Set("name", "notify-subscription") + + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { // handle inflight cache inflights.add(id(notification)) c.OnDone(func() { inflights.remove(id(notification)) }) diff --git a/internal/app/coroutines/readPromise.go b/internal/app/coroutines/readPromise.go index 7d003373..dd37fc81 100644 --- a/internal/app/coroutines/readPromise.go +++ b/internal/app/coroutines/readPromise.go @@ -3,6 +3,7 @@ package coroutines import ( "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -10,8 +11,8 @@ import ( "github.com/resonatehq/resonate/pkg/promise" ) -func ReadPromise(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("ReadPromise", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func ReadPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { completion, err := c.Yield(&t_aio.Submission{ Kind: t_aio.Store, Store: &t_aio.StoreSubmission{ @@ -55,7 +56,7 @@ func ReadPromise(req *t_api.Request, res func(*t_api.Response, error)) *schedule } if p.State == promise.Pending && c.Time() >= p.Timeout { - c.Scheduler.Add(TimeoutPromise(p, ReadPromise(req, res), func(err error) { + c.Scheduler.Add(TimeoutPromise(metadata, p, ReadPromise(metadata, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, err) diff --git a/internal/app/coroutines/readSubscriptions.go b/internal/app/coroutines/readSubscriptions.go index 808943c3..46383196 100644 --- a/internal/app/coroutines/readSubscriptions.go +++ b/internal/app/coroutines/readSubscriptions.go @@ -3,6 +3,7 @@ package coroutines import ( "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -10,8 +11,8 @@ import ( "github.com/resonatehq/resonate/pkg/subscription" ) -func ReadSubscriptions(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("ReadSubscriptions", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func ReadSubscriptions(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { completion, err := c.Yield(&t_aio.Submission{ Kind: t_aio.Store, Store: &t_aio.StoreSubmission{ diff --git a/internal/app/coroutines/rejectPromise.go b/internal/app/coroutines/rejectPromise.go index d3d18646..942483ba 100644 --- a/internal/app/coroutines/rejectPromise.go +++ b/internal/app/coroutines/rejectPromise.go @@ -3,6 +3,7 @@ package coroutines import ( "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -10,8 +11,8 @@ import ( "github.com/resonatehq/resonate/pkg/promise" ) -func RejectPromise(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("RejectPromise", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { if req.RejectPromise.Value.Headers == nil { req.RejectPromise.Value.Headers = map[string]string{} } @@ -63,7 +64,7 @@ func RejectPromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu if p.State == promise.Pending { if c.Time() >= p.Timeout { - c.Scheduler.Add(TimeoutPromise(p, RejectPromise(req, res), func(err error) { + c.Scheduler.Add(TimeoutPromise(metadata, p, RejectPromise(metadata, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, err) @@ -158,7 +159,7 @@ func RejectPromise(req *t_api.Request, res func(*t_api.Response, error)) *schedu }, }, nil) } else { - c.Scheduler.Add(RejectPromise(req, res)) + c.Scheduler.Add(RejectPromise(metadata, req, res)) } } } else { diff --git a/internal/app/coroutines/resolvePromise.go b/internal/app/coroutines/resolvePromise.go index 10973c70..cc320b9f 100644 --- a/internal/app/coroutines/resolvePromise.go +++ b/internal/app/coroutines/resolvePromise.go @@ -3,6 +3,7 @@ package coroutines import ( "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -10,8 +11,8 @@ import ( "github.com/resonatehq/resonate/pkg/promise" ) -func ResolvePromise(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("ResolvePromise", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func ResolvePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { if req.ResolvePromise.Value.Headers == nil { req.ResolvePromise.Value.Headers = map[string]string{} } @@ -63,7 +64,7 @@ func ResolvePromise(req *t_api.Request, res func(*t_api.Response, error)) *sched if p.State == promise.Pending { if c.Time() >= p.Timeout { - c.Scheduler.Add(TimeoutPromise(p, ResolvePromise(req, res), func(err error) { + c.Scheduler.Add(TimeoutPromise(metadata, p, ResolvePromise(metadata, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, err) @@ -157,7 +158,7 @@ func ResolvePromise(req *t_api.Request, res func(*t_api.Response, error)) *sched }, }, nil) } else { - c.Scheduler.Add(ResolvePromise(req, res)) + c.Scheduler.Add(ResolvePromise(metadata, req, res)) } } } else { diff --git a/internal/app/coroutines/searchPromises.go b/internal/app/coroutines/searchPromises.go index 0729604e..885a3e9a 100644 --- a/internal/app/coroutines/searchPromises.go +++ b/internal/app/coroutines/searchPromises.go @@ -3,6 +3,7 @@ package coroutines import ( "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -10,8 +11,8 @@ import ( "github.com/resonatehq/resonate/pkg/promise" ) -func SearchPromises(req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("SearchPromises", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func SearchPromises(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { util.Assert(req.SearchPromises.Q != "", "query must not be empty") util.Assert(req.SearchPromises.Limit > 0, "limit must be greater than zero") diff --git a/internal/app/coroutines/timeoutPromise.go b/internal/app/coroutines/timeoutPromise.go index 1c66e165..f5f70b24 100644 --- a/internal/app/coroutines/timeoutPromise.go +++ b/internal/app/coroutines/timeoutPromise.go @@ -3,14 +3,15 @@ package coroutines import ( "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/util" "github.com/resonatehq/resonate/pkg/promise" ) -func TimeoutPromise(p *promise.Promise, retry *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission], res func(error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("TimeoutPromise", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func TimeoutPromise(metadata *metadata.Metadata, p *promise.Promise, retry *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission], res func(error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { completion, err := c.Yield(&t_aio.Submission{ Kind: t_aio.Store, Store: &t_aio.StoreSubmission{ diff --git a/internal/app/coroutines/timeoutPromises.go b/internal/app/coroutines/timeoutPromises.go index b0d732de..c46993fd 100644 --- a/internal/app/coroutines/timeoutPromises.go +++ b/internal/app/coroutines/timeoutPromises.go @@ -1,16 +1,21 @@ package coroutines import ( + "fmt" "log/slog" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/system" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/util" ) -func TimeoutPromises(config *system.Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine("TimeoutPromises", func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func TimeoutPromises(t int64, config *system.Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { + metadata := metadata.New(fmt.Sprintf("tick:%d:timeout", t)) + metadata.Tags.Set("name", "timeout-promises") + + return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { completion, err := c.Yield(&t_aio.Submission{ Kind: t_aio.Store, Store: &t_aio.StoreSubmission{ diff --git a/internal/app/subsystems/aio/echo/echo.go b/internal/app/subsystems/aio/echo/echo.go index 9ac8044e..522ff20b 100644 --- a/internal/app/subsystems/aio/echo/echo.go +++ b/internal/app/subsystems/aio/echo/echo.go @@ -39,7 +39,7 @@ func (d *EchoDevice) Process(sqes []*bus.SQE[t_aio.Submission, t_aio.Completion] for i, sqe := range sqes { cqes[i] = &bus.CQE[t_aio.Submission, t_aio.Completion]{ - Tags: sqe.Tags, + Metadata: sqe.Metadata, Completion: &t_aio.Completion{ Echo: &t_aio.EchoCompletion{ Data: sqe.Submission.Echo.Data, diff --git a/internal/app/subsystems/aio/network/network.go b/internal/app/subsystems/aio/network/network.go index 14ad65db..7e1b99cb 100644 --- a/internal/app/subsystems/aio/network/network.go +++ b/internal/app/subsystems/aio/network/network.go @@ -62,7 +62,7 @@ func (d *NetworkDevice) Process(sqes []*bus.SQE[t_aio.Submission, t_aio.Completi switch sqe.Submission.Network.Kind { case t_aio.Http: cqe := &bus.CQE[t_aio.Submission, t_aio.Completion]{ - Tags: sqe.Tags, + Metadata: sqe.Metadata, Callback: sqe.Callback, } diff --git a/internal/app/subsystems/aio/network/network_dst.go b/internal/app/subsystems/aio/network/network_dst.go index 6e1d1739..08f5d191 100644 --- a/internal/app/subsystems/aio/network/network_dst.go +++ b/internal/app/subsystems/aio/network/network_dst.go @@ -59,7 +59,7 @@ func (d *NetworkDSTDevice) Process(sqes []*bus.SQE[t_aio.Submission, t_aio.Compl switch sqe.Submission.Network.Kind { case t_aio.Http: cqe := &bus.CQE[t_aio.Submission, t_aio.Completion]{ - Tags: sqe.Tags, + Metadata: sqe.Metadata, Callback: sqe.Callback, } diff --git a/internal/app/subsystems/aio/store/store.go b/internal/app/subsystems/aio/store/store.go index 2021c052..de27ee62 100644 --- a/internal/app/subsystems/aio/store/store.go +++ b/internal/app/subsystems/aio/store/store.go @@ -26,7 +26,7 @@ func Process(store Store, sqes []*bus.SQE[t_aio.Submission, t_aio.Completion]) [ for i, sqe := range sqes { cqe := &bus.CQE[t_aio.Submission, t_aio.Completion]{ - Tags: sqe.Tags, + Metadata: sqe.Metadata, Callback: sqe.Callback, } diff --git a/internal/app/subsystems/api/grpc/api/promise.pb.go b/internal/app/subsystems/api/grpc/api/promise.pb.go index aab690bf..d4107182 100644 --- a/internal/app/subsystems/api/grpc/api/promise.pb.go +++ b/internal/app/subsystems/api/grpc/api/promise.pb.go @@ -343,7 +343,8 @@ type ReadPromiseRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + RequestId string `protobuf:"bytes,2,opt,name=requestId,proto3" json:"requestId,omitempty"` } func (x *ReadPromiseRequest) Reset() { @@ -385,6 +386,13 @@ func (x *ReadPromiseRequest) GetId() string { return "" } +func (x *ReadPromiseRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + type ReadPromiseResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -445,10 +453,11 @@ type SearchPromisesRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Q string `protobuf:"bytes,1,opt,name=q,proto3" json:"q,omitempty"` - State SearchState `protobuf:"varint,2,opt,name=state,proto3,enum=promise.SearchState" json:"state,omitempty"` - Limit int32 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"` - Cursor string `protobuf:"bytes,4,opt,name=cursor,proto3" json:"cursor,omitempty"` + Q string `protobuf:"bytes,1,opt,name=q,proto3" json:"q,omitempty"` + State SearchState `protobuf:"varint,2,opt,name=state,proto3,enum=promise.SearchState" json:"state,omitempty"` + Limit int32 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"` + Cursor string `protobuf:"bytes,4,opt,name=cursor,proto3" json:"cursor,omitempty"` + RequestId string `protobuf:"bytes,5,opt,name=requestId,proto3" json:"requestId,omitempty"` } func (x *SearchPromisesRequest) Reset() { @@ -511,6 +520,13 @@ func (x *SearchPromisesRequest) GetCursor() string { return "" } +func (x *SearchPromisesRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + type SearchPromisesResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -584,6 +600,7 @@ type CreatePromiseRequest struct { Strict bool `protobuf:"varint,3,opt,name=strict,proto3" json:"strict,omitempty"` Param *Value `protobuf:"bytes,4,opt,name=param,proto3" json:"param,omitempty"` Timeout int64 `protobuf:"varint,5,opt,name=timeout,proto3" json:"timeout,omitempty"` + RequestId string `protobuf:"bytes,6,opt,name=requestId,proto3" json:"requestId,omitempty"` } func (x *CreatePromiseRequest) Reset() { @@ -653,6 +670,13 @@ func (x *CreatePromiseRequest) GetTimeout() int64 { return 0 } +func (x *CreatePromiseRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + type CreatePromiseResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -717,6 +741,7 @@ type CancelPromiseRequest struct { IdempotencyKey string `protobuf:"bytes,2,opt,name=idempotencyKey,proto3" json:"idempotencyKey,omitempty"` Strict bool `protobuf:"varint,3,opt,name=strict,proto3" json:"strict,omitempty"` Value *Value `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` + RequestId string `protobuf:"bytes,5,opt,name=requestId,proto3" json:"requestId,omitempty"` } func (x *CancelPromiseRequest) Reset() { @@ -779,6 +804,13 @@ func (x *CancelPromiseRequest) GetValue() *Value { return nil } +func (x *CancelPromiseRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + type CancelPromiseResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -843,6 +875,7 @@ type ResolvePromiseRequest struct { IdempotencyKey string `protobuf:"bytes,2,opt,name=idempotencyKey,proto3" json:"idempotencyKey,omitempty"` Strict bool `protobuf:"varint,3,opt,name=strict,proto3" json:"strict,omitempty"` Value *Value `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` + RequestId string `protobuf:"bytes,5,opt,name=requestId,proto3" json:"requestId,omitempty"` } func (x *ResolvePromiseRequest) Reset() { @@ -905,6 +938,13 @@ func (x *ResolvePromiseRequest) GetValue() *Value { return nil } +func (x *ResolvePromiseRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + type ResolvePromiseResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -969,6 +1009,7 @@ type RejectPromiseRequest struct { IdempotencyKey string `protobuf:"bytes,2,opt,name=idempotencyKey,proto3" json:"idempotencyKey,omitempty"` Strict bool `protobuf:"varint,3,opt,name=strict,proto3" json:"strict,omitempty"` Value *Value `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"` + RequestId string `protobuf:"bytes,5,opt,name=requestId,proto3" json:"requestId,omitempty"` } func (x *RejectPromiseRequest) Reset() { @@ -1031,6 +1072,13 @@ func (x *RejectPromiseRequest) GetValue() *Value { return nil } +func (x *RejectPromiseRequest) GetRequestId() string { + if x != nil { + return x.RequestId + } + return "" +} + type RejectPromiseResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1119,154 +1167,165 @@ var file_internal_app_subsystems_api_grpc_api_promise_proto_rawDesc = []byte{ 0x74, 0x61, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x24, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x42, 0x0a, 0x12, 0x52, 0x65, 0x61, 0x64, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x02, 0x69, 0x64, 0x22, 0x6a, 0x0a, 0x13, 0x52, 0x65, 0x61, 0x64, 0x50, 0x72, 0x6f, 0x6d, + 0x52, 0x02, 0x69, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x49, 0x64, 0x22, 0x6a, 0x0a, 0x13, 0x52, 0x65, 0x61, 0x64, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, + 0x69, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x12, 0x2a, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x50, 0x72, + 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x22, 0x9d, + 0x01, 0x0a, 0x15, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0c, 0x0a, 0x01, 0x71, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x01, 0x71, 0x12, 0x2a, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, + 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, + 0x74, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, + 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, + 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x22, 0x87, + 0x01, 0x0a, 0x16, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, + 0x69, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x08, 0x70, 0x72, + 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, + 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x08, + 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x73, 0x22, 0xc4, 0x01, 0x0a, 0x14, 0x43, 0x72, 0x65, + 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x26, 0x0a, 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, + 0x4b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, + 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x72, + 0x69, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x73, 0x74, 0x72, 0x69, 0x63, + 0x74, 0x12, 0x24, 0x0a, 0x05, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, + 0x52, 0x05, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, + 0x75, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, + 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x22, + 0x6c, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, + 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x12, 0x2a, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x50, 0x72, 0x6f, + 0x6d, 0x69, 0x73, 0x65, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x22, 0xaa, 0x01, + 0x0a, 0x14, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, 0x6f, + 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, + 0x69, 0x64, 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x12, 0x16, + 0x0a, 0x06, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, + 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x12, 0x24, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1c, 0x0a, 0x09, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x22, 0x6c, 0x0a, 0x15, 0x43, 0x61, + 0x6e, 0x63, 0x65, 0x6c, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2a, 0x0a, 0x07, + 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, + 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, + 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x22, 0xab, 0x01, 0x0a, 0x15, 0x52, 0x65, 0x73, + 0x6f, 0x6c, 0x76, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x12, 0x26, 0x0a, 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, + 0x79, 0x4b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x69, 0x64, 0x65, 0x6d, + 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, + 0x72, 0x69, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x73, 0x74, 0x72, 0x69, + 0x63, 0x74, 0x12, 0x24, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x49, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x22, 0x6d, 0x0a, 0x16, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, + 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, + 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2a, 0x0a, 0x07, 0x70, 0x72, 0x6f, + 0x6d, 0x69, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x72, 0x6f, + 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x07, 0x70, 0x72, + 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x22, 0xaa, 0x01, 0x0a, 0x14, 0x52, 0x65, 0x6a, 0x65, 0x63, 0x74, + 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, + 0x0a, 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, + 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x12, 0x24, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, + 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, + 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x49, 0x64, 0x22, 0x6c, 0x0a, 0x15, 0x52, 0x65, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2a, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, - 0x22, 0x7f, 0x0a, 0x15, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, - 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0c, 0x0a, 0x01, 0x71, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x01, 0x71, 0x12, 0x2a, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, - 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, - 0x61, 0x74, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x75, 0x72, - 0x73, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, - 0x72, 0x22, 0x87, 0x01, 0x0a, 0x16, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x50, 0x72, 0x6f, 0x6d, - 0x69, 0x73, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x06, - 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x70, - 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x2c, 0x0a, - 0x08, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, - 0x65, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x73, 0x22, 0xa6, 0x01, 0x0a, 0x14, - 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, - 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x69, 0x64, - 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x12, 0x16, 0x0a, 0x06, - 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x73, 0x74, - 0x72, 0x69, 0x63, 0x74, 0x12, 0x24, 0x0a, 0x05, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x56, 0x61, - 0x6c, 0x75, 0x65, 0x52, 0x05, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, - 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, - 0x65, 0x6f, 0x75, 0x74, 0x22, 0x6c, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, - 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, - 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, - 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, - 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2a, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, - 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, - 0x73, 0x65, 0x22, 0x8c, 0x01, 0x0a, 0x14, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x50, 0x72, 0x6f, - 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, - 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x0e, 0x69, - 0x64, 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, - 0x4b, 0x65, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x08, 0x52, 0x06, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x12, 0x24, 0x0a, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x72, 0x6f, - 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x22, 0x6c, 0x0a, 0x15, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x50, 0x72, 0x6f, 0x6d, 0x69, - 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, - 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x2a, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x50, - 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x22, - 0x8d, 0x01, 0x0a, 0x15, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, - 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x0e, 0x69, 0x64, 0x65, - 0x6d, 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, - 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x06, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x12, 0x24, 0x0a, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, - 0x73, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, - 0x6d, 0x0a, 0x16, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, - 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, - 0x69, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x12, 0x2a, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x50, 0x72, - 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x22, 0x8c, - 0x01, 0x0a, 0x14, 0x52, 0x65, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x26, 0x0a, 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, - 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0e, 0x69, 0x64, 0x65, 0x6d, 0x70, 0x6f, 0x74, 0x65, 0x6e, 0x63, 0x79, 0x4b, 0x65, 0x79, 0x12, - 0x16, 0x0a, 0x06, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x06, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x12, 0x24, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, - 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x6c, 0x0a, - 0x15, 0x52, 0x65, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, - 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, - 0x2a, 0x0a, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x50, 0x72, 0x6f, 0x6d, 0x69, - 0x73, 0x65, 0x52, 0x07, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2a, 0x5e, 0x0a, 0x05, 0x53, - 0x74, 0x61, 0x74, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, - 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x45, 0x53, 0x4f, 0x4c, 0x56, 0x45, 0x44, 0x10, 0x01, 0x12, - 0x0c, 0x0a, 0x08, 0x52, 0x45, 0x4a, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x02, 0x12, 0x15, 0x0a, - 0x11, 0x52, 0x45, 0x4a, 0x45, 0x43, 0x54, 0x45, 0x44, 0x5f, 0x54, 0x49, 0x4d, 0x45, 0x44, 0x4f, - 0x55, 0x54, 0x10, 0x03, 0x12, 0x15, 0x0a, 0x11, 0x52, 0x45, 0x4a, 0x45, 0x43, 0x54, 0x45, 0x44, - 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x45, 0x44, 0x10, 0x04, 0x2a, 0x5b, 0x0a, 0x0b, 0x53, - 0x65, 0x61, 0x72, 0x63, 0x68, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0e, 0x0a, 0x0a, 0x53, 0x45, - 0x41, 0x52, 0x43, 0x48, 0x5f, 0x41, 0x4c, 0x4c, 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x45, - 0x41, 0x52, 0x43, 0x48, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x13, - 0x0a, 0x0f, 0x53, 0x45, 0x41, 0x52, 0x43, 0x48, 0x5f, 0x52, 0x45, 0x53, 0x4f, 0x4c, 0x56, 0x45, - 0x44, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x41, 0x52, 0x43, 0x48, 0x5f, 0x52, 0x45, - 0x4a, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x03, 0x2a, 0x6a, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, - 0x07, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0xc8, 0x01, 0x12, 0x0c, 0x0a, 0x07, 0x43, 0x52, 0x45, 0x41, - 0x54, 0x45, 0x44, 0x10, 0xc9, 0x01, 0x12, 0x0e, 0x0a, 0x09, 0x4e, 0x4f, 0x43, 0x4f, 0x4e, 0x54, - 0x45, 0x4e, 0x54, 0x10, 0xcc, 0x01, 0x12, 0x0e, 0x0a, 0x09, 0x46, 0x4f, 0x52, 0x42, 0x49, 0x44, - 0x44, 0x45, 0x4e, 0x10, 0x93, 0x03, 0x12, 0x0d, 0x0a, 0x08, 0x4e, 0x4f, 0x54, 0x46, 0x4f, 0x55, - 0x4e, 0x44, 0x10, 0x94, 0x03, 0x12, 0x0d, 0x0a, 0x08, 0x43, 0x4f, 0x4e, 0x46, 0x4c, 0x49, 0x43, - 0x54, 0x10, 0x99, 0x03, 0x32, 0xfc, 0x03, 0x0a, 0x0e, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4a, 0x0a, 0x0b, 0x52, 0x65, 0x61, 0x64, 0x50, - 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x12, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, - 0x2e, 0x52, 0x65, 0x61, 0x64, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x52, 0x65, - 0x61, 0x64, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x53, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x50, 0x72, 0x6f, - 0x6d, 0x69, 0x73, 0x65, 0x73, 0x12, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, - 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x73, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, - 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x50, 0x0a, 0x0d, 0x43, 0x72, 0x65, 0x61, - 0x74, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x12, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x6d, - 0x69, 0x73, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, - 0x73, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x50, 0x0a, 0x0d, 0x43, 0x61, - 0x6e, 0x63, 0x65, 0x6c, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x12, 0x1d, 0x2e, 0x70, 0x72, - 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x50, 0x72, 0x6f, 0x6d, - 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x72, 0x6f, - 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x50, 0x72, 0x6f, 0x6d, 0x69, - 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x53, 0x0a, 0x0e, - 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x12, 0x1e, - 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, - 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, - 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, + 0x2a, 0x5e, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x50, 0x45, 0x4e, + 0x44, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x45, 0x53, 0x4f, 0x4c, 0x56, + 0x45, 0x44, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x45, 0x4a, 0x45, 0x43, 0x54, 0x45, 0x44, + 0x10, 0x02, 0x12, 0x15, 0x0a, 0x11, 0x52, 0x45, 0x4a, 0x45, 0x43, 0x54, 0x45, 0x44, 0x5f, 0x54, + 0x49, 0x4d, 0x45, 0x44, 0x4f, 0x55, 0x54, 0x10, 0x03, 0x12, 0x15, 0x0a, 0x11, 0x52, 0x45, 0x4a, + 0x45, 0x43, 0x54, 0x45, 0x44, 0x5f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x45, 0x44, 0x10, 0x04, + 0x2a, 0x5b, 0x0a, 0x0b, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, + 0x0e, 0x0a, 0x0a, 0x53, 0x45, 0x41, 0x52, 0x43, 0x48, 0x5f, 0x41, 0x4c, 0x4c, 0x10, 0x00, 0x12, + 0x12, 0x0a, 0x0e, 0x53, 0x45, 0x41, 0x52, 0x43, 0x48, 0x5f, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, + 0x47, 0x10, 0x01, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x41, 0x52, 0x43, 0x48, 0x5f, 0x52, 0x45, + 0x53, 0x4f, 0x4c, 0x56, 0x45, 0x44, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x41, 0x52, + 0x43, 0x48, 0x5f, 0x52, 0x45, 0x4a, 0x45, 0x43, 0x54, 0x45, 0x44, 0x10, 0x03, 0x2a, 0x6a, 0x0a, + 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, + 0x57, 0x4e, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0xc8, 0x01, 0x12, 0x0c, 0x0a, + 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0xc9, 0x01, 0x12, 0x0e, 0x0a, 0x09, 0x4e, + 0x4f, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x4e, 0x54, 0x10, 0xcc, 0x01, 0x12, 0x0e, 0x0a, 0x09, 0x46, + 0x4f, 0x52, 0x42, 0x49, 0x44, 0x44, 0x45, 0x4e, 0x10, 0x93, 0x03, 0x12, 0x0d, 0x0a, 0x08, 0x4e, + 0x4f, 0x54, 0x46, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x94, 0x03, 0x12, 0x0d, 0x0a, 0x08, 0x43, 0x4f, + 0x4e, 0x46, 0x4c, 0x49, 0x43, 0x54, 0x10, 0x99, 0x03, 0x32, 0xfc, 0x03, 0x0a, 0x0e, 0x50, 0x72, + 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4a, 0x0a, 0x0b, + 0x52, 0x65, 0x61, 0x64, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x12, 0x1b, 0x2e, 0x70, 0x72, + 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, + 0x73, 0x65, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x53, 0x0a, 0x0e, 0x53, 0x65, 0x61, 0x72, + 0x63, 0x68, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x73, 0x12, 0x1e, 0x2e, 0x70, 0x72, 0x6f, + 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x50, 0x72, 0x6f, 0x6d, 0x69, + 0x73, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x70, 0x72, 0x6f, + 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x50, 0x72, 0x6f, 0x6d, 0x69, + 0x73, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x50, 0x0a, + 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x12, 0x1d, + 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, + 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, + 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, + 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x50, 0x0a, 0x0d, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, + 0x12, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, + 0x6c, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x50, 0x0a, 0x0d, 0x52, 0x65, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x6d, 0x69, - 0x73, 0x65, 0x12, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x6a, - 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x6a, 0x65, - 0x63, 0x74, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x42, 0x45, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x72, 0x65, 0x73, 0x6f, 0x6e, 0x61, 0x74, 0x65, 0x68, 0x71, 0x2f, 0x72, 0x65, 0x73, - 0x6f, 0x6e, 0x61, 0x74, 0x65, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x61, - 0x70, 0x70, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, 0x2f, 0x61, 0x70, - 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x00, 0x12, 0x53, 0x0a, 0x0e, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x50, 0x72, 0x6f, 0x6d, + 0x69, 0x73, 0x65, 0x12, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x52, 0x65, + 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x2e, 0x52, 0x65, + 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x50, 0x0a, 0x0d, 0x52, 0x65, 0x6a, 0x65, 0x63, 0x74, + 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x12, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, + 0x65, 0x2e, 0x52, 0x65, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, + 0x2e, 0x52, 0x65, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x72, 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x45, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x65, 0x73, 0x6f, 0x6e, 0x61, 0x74, 0x65, 0x68, + 0x71, 0x2f, 0x72, 0x65, 0x73, 0x6f, 0x6e, 0x61, 0x74, 0x65, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x79, 0x73, 0x74, 0x65, + 0x6d, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x70, 0x69, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/app/subsystems/api/grpc/api/promise.proto b/internal/app/subsystems/api/grpc/api/promise.proto index b3051151..26b5233a 100644 --- a/internal/app/subsystems/api/grpc/api/promise.proto +++ b/internal/app/subsystems/api/grpc/api/promise.proto @@ -46,6 +46,7 @@ message Value { message ReadPromiseRequest { string id = 1; + string requestId = 2; } message ReadPromiseResponse { @@ -58,6 +59,7 @@ message SearchPromisesRequest { SearchState state = 2; int32 limit = 3; string cursor = 4; + string requestId = 5; } message SearchPromisesResponse { @@ -72,6 +74,7 @@ message CreatePromiseRequest { bool strict = 3; Value param = 4; int64 timeout = 5; + string requestId = 6; } message CreatePromiseResponse { @@ -84,6 +87,7 @@ message CancelPromiseRequest { string idempotencyKey = 2; bool strict = 3; Value value = 4; + string requestId = 5; } message CancelPromiseResponse { @@ -96,6 +100,7 @@ message ResolvePromiseRequest { string idempotencyKey = 2; bool strict = 3; Value value = 4; + string requestId = 5; } message ResolvePromiseResponse { @@ -108,6 +113,7 @@ message RejectPromiseRequest { string idempotencyKey = 2; bool strict = 3; Value value = 4; + string requestId = 5; } message RejectPromiseResponse { diff --git a/internal/app/subsystems/api/grpc/grpc.go b/internal/app/subsystems/api/grpc/grpc.go index 1580a9ec..4fb46c5d 100644 --- a/internal/app/subsystems/api/grpc/grpc.go +++ b/internal/app/subsystems/api/grpc/grpc.go @@ -2,10 +2,11 @@ package grpc import ( "context" - "github.com/resonatehq/resonate/internal/app/subsystems/api/service" "log/slog" "net" + "github.com/resonatehq/resonate/internal/app/subsystems/api/service" + "github.com/resonatehq/resonate/internal/api" grpcApi "github.com/resonatehq/resonate/internal/app/subsystems/api/grpc/api" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -73,7 +74,11 @@ func (s *server) log(ctx context.Context, req interface{}, info *grpc.UnaryServe } func (s *server) ReadPromise(ctx context.Context, req *grpcApi.ReadPromiseRequest) (*grpcApi.ReadPromiseResponse, error) { - resp, err := s.service.ReadPromise(req.Id) + header := &service.Header{ + RequestId: req.RequestId, + } + + resp, err := s.service.ReadPromise(req.Id, header) if err != nil { return nil, grpcStatus.Error(codes.Internal, err.Error()) } @@ -85,13 +90,18 @@ func (s *server) ReadPromise(ctx context.Context, req *grpcApi.ReadPromiseReques } func (s *server) SearchPromises(ctx context.Context, req *grpcApi.SearchPromisesRequest) (*grpcApi.SearchPromisesResponse, error) { + header := &service.Header{ + RequestId: req.RequestId, + } + params := &service.SearchPromiseParams{ Q: req.Q, State: searchState(req.State), Limit: int(req.Limit), Cursor: req.Cursor, } - resp, err := s.service.SearchPromises(params) + + resp, err := s.service.SearchPromises(header, params) if err != nil { if verr, ok := err.(*service.ValidationError); ok { return nil, grpcStatus.Error(codes.InvalidArgument, verr.Error()) @@ -139,6 +149,7 @@ func (s *server) CreatePromise(ctx context.Context, req *grpcApi.CreatePromiseRe } header := &service.CreatePromiseHeader{ + RequestId: req.RequestId, Strict: req.Strict, IdempotencyKey: idempotencyKey, } @@ -180,6 +191,7 @@ func (s *server) CancelPromise(ctx context.Context, req *grpcApi.CancelPromiseRe } header := &service.CancelPromiseHeader{ + RequestId: req.RequestId, Strict: req.Strict, IdempotencyKey: idempotencyKey, } @@ -219,6 +231,7 @@ func (s *server) ResolvePromise(ctx context.Context, req *grpcApi.ResolvePromise } header := &service.ResolvePromiseHeader{ + RequestId: req.RequestId, Strict: req.Strict, IdempotencyKey: idempotencyKey, } @@ -242,7 +255,6 @@ func (s *server) ResolvePromise(ctx context.Context, req *grpcApi.ResolvePromise } func (s *server) RejectPromise(ctx context.Context, req *grpcApi.RejectPromiseRequest) (*grpcApi.RejectPromiseResponse, error) { - var idempotencyKey *promise.IdempotencyKey if req.IdempotencyKey != "" { i := promise.IdempotencyKey(req.IdempotencyKey) @@ -260,6 +272,7 @@ func (s *server) RejectPromise(ctx context.Context, req *grpcApi.RejectPromiseRe } header := &service.RejectPromiseHeader{ + RequestId: req.RequestId, Strict: req.Strict, IdempotencyKey: idempotencyKey, } diff --git a/internal/app/subsystems/api/http/promise.go b/internal/app/subsystems/api/http/promise.go index f8bfb7d0..bca71166 100644 --- a/internal/app/subsystems/api/http/promise.go +++ b/internal/app/subsystems/api/http/promise.go @@ -1,16 +1,25 @@ package http import ( - "github.com/resonatehq/resonate/internal/app/subsystems/api/service" "net/http" + "github.com/resonatehq/resonate/internal/app/subsystems/api/service" + "github.com/gin-gonic/gin" ) // Read Promise func (s *server) readPromise(c *gin.Context) { - resp, err := s.service.ReadPromise(c.Param("id")) + var header service.Header + if err := c.ShouldBindHeader(&header); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": err.Error(), + }) + return + } + + resp, err := s.service.ReadPromise(c.Param("id"), &header) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "error": err.Error(), @@ -24,8 +33,15 @@ func (s *server) readPromise(c *gin.Context) { // Search Promise func (s *server) searchPromises(c *gin.Context) { - var params service.SearchPromiseParams + var header service.Header + if err := c.ShouldBindHeader(&header); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": err.Error(), + }) + return + } + var params service.SearchPromiseParams if err := c.ShouldBindQuery(¶ms); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": err.Error(), @@ -33,7 +49,7 @@ func (s *server) searchPromises(c *gin.Context) { return } - resp, err := s.service.SearchPromises(¶ms) + resp, err := s.service.SearchPromises(&header, ¶ms) if err != nil { if verr, ok := err.(*service.ValidationError); ok { diff --git a/internal/app/subsystems/api/service/request.go b/internal/app/subsystems/api/service/request.go index 6e0ef144..ed79edf6 100644 --- a/internal/app/subsystems/api/service/request.go +++ b/internal/app/subsystems/api/service/request.go @@ -6,6 +6,10 @@ type ValidationError struct { msg string // description of error } +type Header struct { + RequestId string `header:"request-id"` +} + type SearchPromiseParams struct { Q string `form:"q" json:"q"` State string `form:"state" json:"state"` @@ -14,6 +18,7 @@ type SearchPromiseParams struct { } type CreatePromiseHeader struct { + RequestId string `header:"request-id"` IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` Strict bool `header:"strict"` } @@ -25,6 +30,7 @@ type CreatePromiseBody struct { } type CancelPromiseHeader struct { + RequestId string `header:"request-id"` IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` Strict bool `header:"strict"` } @@ -34,6 +40,7 @@ type CancelPromiseBody struct { } type ResolvePromiseHeader struct { + RequestId string `header:"request-id"` IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` Strict bool `header:"strict"` } @@ -43,6 +50,7 @@ type ResolvePromiseBody struct { } type RejectPromiseHeader struct { + RequestId string `header:"request-id"` IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` Strict bool `header:"strict"` } diff --git a/internal/app/subsystems/api/service/service.go b/internal/app/subsystems/api/service/service.go index 880c5c38..51b1848f 100644 --- a/internal/app/subsystems/api/service/service.go +++ b/internal/app/subsystems/api/service/service.go @@ -1,40 +1,51 @@ package service import ( + "strings" + + "github.com/google/uuid" "github.com/resonatehq/resonate/internal/api" "github.com/resonatehq/resonate/internal/kernel/bus" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/t_api" "github.com/resonatehq/resonate/internal/util" "github.com/resonatehq/resonate/pkg/promise" - "strings" ) func (e *ValidationError) Error() string { return e.msg } type Service struct { - api api.API - serverProtocol string + api api.API + protocol string } -func New(api api.API, serverProtocol string) *Service { +func New(api api.API, protocol string) *Service { return &Service{ - api: api, - serverProtocol: serverProtocol, + api: api, + protocol: protocol, } } -func (s *Service) protocol() string { - return s.serverProtocol +func (s *Service) metadata(id string, name string) *metadata.Metadata { + if id == "" { + id = uuid.New().String() + } + + metadata := metadata.New(id) + metadata.Tags.Set("name", name) + metadata.Tags.Set("api", s.protocol) + + return metadata } // Read Promise -func (s *Service) ReadPromise(id string) (*t_api.ReadPromiseResponse, error) { +func (s *Service) ReadPromise(id string, header *Header) (*t_api.ReadPromiseResponse, error) { cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) defer close(cq) s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: s.protocol(), + Metadata: s.metadata(header.RequestId, "read-promise"), Submission: &t_api.Request{ Kind: t_api.ReadPromise, ReadPromise: &t_api.ReadPromiseRequest{ @@ -56,7 +67,7 @@ func (s *Service) ReadPromise(id string) (*t_api.ReadPromiseResponse, error) { // Search Promise -func (s *Service) SearchPromises(params *SearchPromiseParams) (*t_api.SearchPromisesResponse, error) { +func (s *Service) SearchPromises(header *Header, params *SearchPromiseParams) (*t_api.SearchPromisesResponse, error) { var searchPromises *t_api.SearchPromisesRequest if params.Cursor != "" { cursor, err := t_api.NewCursor[t_api.SearchPromisesRequest](params.Cursor) @@ -114,7 +125,7 @@ func (s *Service) SearchPromises(params *SearchPromiseParams) (*t_api.SearchProm defer close(cq) s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: s.protocol(), + Metadata: s.metadata(header.RequestId, "search-promises"), Submission: &t_api.Request{ Kind: t_api.SearchPromises, SearchPromises: searchPromises, @@ -138,7 +149,7 @@ func (s *Service) CreatePromise(id string, header *CreatePromiseHeader, body *Cr defer close(cq) s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: s.protocol(), + Metadata: s.metadata(header.RequestId, "create-promise"), Submission: &t_api.Request{ Kind: t_api.CreatePromise, CreatePromise: &t_api.CreatePromiseRequest{ @@ -169,7 +180,7 @@ func (s *Service) CancelPromise(id string, header *CancelPromiseHeader, body *Ca defer close(cq) s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: s.protocol(), + Metadata: s.metadata(header.RequestId, "cancel-promise"), Submission: &t_api.Request{ Kind: t_api.CancelPromise, CancelPromise: &t_api.CancelPromiseRequest{ @@ -198,7 +209,7 @@ func (s *Service) ResolvePromise(id string, header *ResolvePromiseHeader, body * defer close(cq) s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: s.protocol(), + Metadata: s.metadata(header.RequestId, "resolve-promise"), Submission: &t_api.Request{ Kind: t_api.ResolvePromise, ResolvePromise: &t_api.ResolvePromiseRequest{ @@ -227,7 +238,7 @@ func (s *Service) RejectPromise(id string, header *RejectPromiseHeader, body *Re defer close(cq) s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: s.protocol(), + Metadata: s.metadata(header.RequestId, "reject-promise"), Submission: &t_api.Request{ Kind: t_api.RejectPromise, RejectPromise: &t_api.RejectPromiseRequest{ @@ -252,7 +263,7 @@ func (s *Service) RejectPromise(id string, header *RejectPromiseHeader, body *Re func (s *Service) sendOrPanic(cq chan *bus.CQE[t_api.Request, t_api.Response]) func(*t_api.Response, error) { return func(completion *t_api.Response, err error) { cqe := &bus.CQE[t_api.Request, t_api.Response]{ - Tags: s.protocol(), + // Tags: s.protocol(), Completion: completion, Error: err, } diff --git a/internal/app/subsystems/api/service/service_test.go b/internal/app/subsystems/api/service/service_test.go index 02d9e2e6..8d3dc3e7 100644 --- a/internal/app/subsystems/api/service/service_test.go +++ b/internal/app/subsystems/api/service/service_test.go @@ -2,9 +2,10 @@ package service import ( "fmt" + "testing" + "github.com/resonatehq/resonate/internal/app/subsystems/api/test" "github.com/resonatehq/resonate/pkg/promise" - "testing" "github.com/resonatehq/resonate/internal/kernel/t_api" "github.com/stretchr/testify/assert" @@ -75,7 +76,7 @@ func TestReadPromise(t *testing.T) { t.Run(tc.name, func(t *testing.T) { serviceTest.Load(t, tc.req, tc.res) - res, err := serviceTest.service.ReadPromise(tc.id) + res, err := serviceTest.service.ReadPromise(tc.id, &Header{}) if err != nil { t.Fatal(err) } @@ -244,7 +245,7 @@ func TestSearchPromises(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { serviceTest.Load(t, tc.req, tc.res) - res, err := serviceTest.service.SearchPromises(tc.serviceReq) + res, err := serviceTest.service.SearchPromises(&Header{}, tc.serviceReq) if err != nil { fmt.Println("we are here bro", res.Status) t.Fatal(err) diff --git a/internal/kernel/bus/bus.go b/internal/kernel/bus/bus.go index b03a6f67..3a2499bf 100644 --- a/internal/kernel/bus/bus.go +++ b/internal/kernel/bus/bus.go @@ -3,6 +3,7 @@ package bus import ( "fmt" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" ) @@ -16,22 +17,22 @@ type Output interface { } type SQE[I Input, O Output] struct { - Tags string + Metadata *metadata.Metadata Submission *I Callback func(*O, error) } func (sqe *SQE[I, O]) String() string { - return fmt.Sprintf("SQE(tags=%s, submission=%v)", sqe.Tags, sqe.Submission) + return fmt.Sprintf("SQE(metadata=%s, submission=%v)", sqe.Metadata, sqe.Submission) } type CQE[I Input, O Output] struct { - Tags string + Metadata *metadata.Metadata Completion *O Callback func(*O, error) Error error } func (cqe *CQE[I, O]) String() string { - return fmt.Sprintf("CQE(tags=%s, completion=%v, error=%v)", cqe.Tags, cqe.Completion, cqe.Error) + return fmt.Sprintf("CQE(metadata=%s, completion=%v, error=%v)", cqe.Metadata, cqe.Completion, cqe.Error) } diff --git a/internal/kernel/metadata/metadata.go b/internal/kernel/metadata/metadata.go new file mode 100644 index 00000000..36523015 --- /dev/null +++ b/internal/kernel/metadata/metadata.go @@ -0,0 +1,36 @@ +package metadata + +import ( + "fmt" + "strings" +) + +type Metadata struct { + TransactionId string + Tags Tags +} + +func New(id string) *Metadata { + return &Metadata{ + TransactionId: id, + Tags: Tags{}, + } +} + +func (m Metadata) String() string { + return fmt.Sprintf("Metadata(tid=%s, tags=%s)", m.TransactionId, m.Tags) +} + +type Tags map[string]string + +func (t Tags) Set(key string, val string) { + t[key] = val +} + +func (t Tags) Get(key string) string { + return t[key] +} + +func (t Tags) Split(key string) []string { + return strings.Split(t[key], ",") +} diff --git a/internal/kernel/scheduler/coroutine.go b/internal/kernel/scheduler/coroutine.go index f7a14ca0..8f7a6da9 100644 --- a/internal/kernel/scheduler/coroutine.go +++ b/internal/kernel/scheduler/coroutine.go @@ -1,18 +1,30 @@ package scheduler +import ( + "fmt" + + "github.com/resonatehq/resonate/internal/kernel/metadata" +) + type Coroutine[I, O any] struct { - name string - f func(*Coroutine[I, O]) - c_i chan *WrapI[I] - c_o chan *WrapO[O] + f func(*Coroutine[I, O]) + c_i chan *WrapI[I] + c_o chan *WrapO[O] // lifecycle onDone []func() + // metadata + metadata *metadata.Metadata + // scheduler Scheduler S } +func (c *Coroutine[I, O]) String() string { + return fmt.Sprintf("Coroutine(metadata=%s)", c.metadata) +} + type WrapI[I any] struct { Value I Error error @@ -23,12 +35,12 @@ type WrapO[O any] struct { Done bool } -func NewCoroutine[I, O any](name string, f func(*Coroutine[I, O])) *Coroutine[I, O] { +func NewCoroutine[I, O any](metadata *metadata.Metadata, f func(*Coroutine[I, O])) *Coroutine[I, O] { c := &Coroutine[I, O]{ - f: f, - c_i: make(chan *WrapI[I]), - c_o: make(chan *WrapO[O]), - name: name, + f: f, + c_i: make(chan *WrapI[I]), + c_o: make(chan *WrapO[O]), + metadata: metadata, } go func() { @@ -49,14 +61,16 @@ func (c *Coroutine[I, O]) OnDone(f func()) { func (c *Coroutine[I, O]) Yield(o O) (I, error) { c.c_o <- &WrapO[O]{Value: o, Done: false} - i := <-c.c_i + i := <-c.c_i return i.Value, i.Error } -func (c *Coroutine[I, O]) Resume(i I, e error) *WrapO[O] { +func (c *Coroutine[I, O]) Resume(i I, e error) (O, bool) { c.c_i <- &WrapI[I]{Value: i, Error: e} - return <-c.c_o + + o := <-c.c_o + return o.Value, o.Done } func (c *Coroutine[I, O]) Time() int64 { diff --git a/internal/kernel/scheduler/scheduler.go b/internal/kernel/scheduler/scheduler.go index 1914dfd9..3cdf02f4 100644 --- a/internal/kernel/scheduler/scheduler.go +++ b/internal/kernel/scheduler/scheduler.go @@ -36,9 +36,9 @@ func NewScheduler(aio aio.AIO, metrics *metrics.Metrics) *Scheduler { } func (s *Scheduler) Add(coroutine *Coroutine[*t_aio.Completion, *t_aio.Submission]) { - slog.Debug("scheduler:add", "coroutine", coroutine.name) - s.metrics.CoroutinesTotal.WithLabelValues(coroutine.name).Inc() - s.metrics.CoroutinesInFlight.WithLabelValues(coroutine.name).Inc() + slog.Debug("scheduler:add", "coroutine", coroutine) + s.metrics.CoroutinesTotal.WithLabelValues(coroutine.metadata.Tags.Get("name")).Inc() + s.metrics.CoroutinesInFlight.WithLabelValues(coroutine.metadata.Tags.Get("name")).Inc() // add reference to scheduler coroutine.Scheduler = s @@ -58,16 +58,21 @@ func (s *Scheduler) Tick(t int64, batchSize int) { } // enqueue sqes - for _, coroutine := range s.runnable { - coroutine := coroutine // bind to local variable for callback + n := len(s.runnable) + for i := 0; i < n; i++ { + coroutine := s.runnable[i] // bind to local variable for callback - if submission := coroutine.Resume(coroutine.next, coroutine.error); !submission.Done { + if submission, done := coroutine.Resume(coroutine.next, coroutine.error); !done { // suspend s.suspended = append(s.suspended, coroutine.Coroutine) + // metadata + metadata := coroutine.metadata + metadata.Tags.Set("aio", submission.Kind.String()) + s.aio.Enqueue(&bus.SQE[t_aio.Submission, t_aio.Completion]{ - Tags: submission.Value.Kind.String(), - Submission: submission.Value, + Metadata: metadata, + Submission: submission, Callback: func(completion *t_aio.Completion, err error) { // unsuspend s.runnable = append(s.runnable, &runnableCoroutine{ @@ -85,8 +90,8 @@ func (s *Scheduler) Tick(t int64, batchSize int) { }, }) } else { - slog.Debug("scheduler:rmv", "coroutine", coroutine.name) - s.metrics.CoroutinesInFlight.WithLabelValues(coroutine.name).Dec() + slog.Debug("scheduler:rmv", "coroutine", coroutine) + s.metrics.CoroutinesInFlight.WithLabelValues(coroutine.metadata.Tags.Get("name")).Dec() // call onDone functions for _, f := range coroutine.onDone { @@ -98,8 +103,8 @@ func (s *Scheduler) Tick(t int64, batchSize int) { // flush s.aio.Flush(t) - // clear runnable - s.runnable = nil + // clear runnable (new coroutines may have been appended) + s.runnable = s.runnable[n:] } func (s *Scheduler) Time() int64 { diff --git a/internal/kernel/system/system.go b/internal/kernel/system/system.go index bd9bff80..4a41c293 100644 --- a/internal/kernel/system/system.go +++ b/internal/kernel/system/system.go @@ -8,6 +8,7 @@ import ( "github.com/resonatehq/resonate/internal/api" "github.com/resonatehq/resonate/internal/metrics" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/scheduler" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -35,8 +36,8 @@ type System struct { config *Config metrics *metrics.Metrics scheduler *scheduler.Scheduler - onRequest map[t_api.Kind]func(*t_api.Request, func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] - onTick map[int][]func(*Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] + onRequest map[t_api.Kind]func(*metadata.Metadata, *t_api.Request, func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] + onTick map[int][]func(int64, *Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] ticks int64 } @@ -47,8 +48,8 @@ func New(api api.API, aio aio.AIO, config *Config, metrics *metrics.Metrics) *Sy config: config, metrics: metrics, scheduler: scheduler.NewScheduler(aio, metrics), - onRequest: map[t_api.Kind]func(*t_api.Request, func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]{}, - onTick: map[int][]func(*Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]{}, + onRequest: map[t_api.Kind]func(*metadata.Metadata, *t_api.Request, func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]{}, + onTick: map[int][]func(int64, *Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]{}, } } @@ -73,7 +74,7 @@ func (s *System) Tick(t int64, timeoutCh <-chan time.Time) { // add request coroutines for _, sqe := range s.api.Dequeue(s.config.SubmissionBatchSize, timeoutCh) { if coroutine, ok := s.onRequest[sqe.Submission.Kind]; ok { - s.scheduler.Add(coroutine(sqe.Submission, sqe.Callback)) + s.scheduler.Add(coroutine(sqe.Metadata, sqe.Submission, sqe.Callback)) } else { panic("invalid api request") } @@ -83,7 +84,7 @@ func (s *System) Tick(t int64, timeoutCh <-chan time.Time) { for _, coroutines := range util.OrderedRangeKV(s.onTick) { if s.ticks%int64(coroutines.Key) == 0 { for _, coroutine := range coroutines.Value { - s.scheduler.Add(coroutine(s.config)) + s.scheduler.Add(coroutine(t, s.config)) } } } @@ -98,12 +99,12 @@ func (s *System) Shutdown() { s.aio.Shutdown() } -func (s *System) AddOnRequest(kind t_api.Kind, constructor func(*t_api.Request, func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { +func (s *System) AddOnRequest(kind t_api.Kind, constructor func(*metadata.Metadata, *t_api.Request, func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { s.onRequest[kind] = constructor } -func (s *System) AddOnTick(n int, constructor func(*Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { - util.Assert(n > 0, "n must be greater than 0") +func (s *System) AddOnTick(n int, constructor func(int64, *Config) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { + util.Assert(n > 0, "n must be greater than zero") s.onTick[n] = append(s.onTick[n], constructor) } diff --git a/internal/kernel/t_api/api.go b/internal/kernel/t_api/api.go index 24f76d83..7d5b3ff2 100644 --- a/internal/kernel/t_api/api.go +++ b/internal/kernel/t_api/api.go @@ -19,3 +19,28 @@ const ( // Echo Echo ) + +func (k Kind) String() string { + switch k { + case ReadPromise: + return "read-promise" + case SearchPromises: + return "search-promises" + case CreatePromise: + return "create-promise" + case CancelPromise: + return "cancel-promise" + case ResolvePromise: + return "resolve-promise" + case RejectPromise: + return "reject-promise" + case ReadSubscriptions: + return "read-subscriptions" + case CreateSubscription: + return "create-subscription" + case DeleteSubscription: + return "delete-subscription" + default: + panic("invalid api") + } +} diff --git a/test/dst/dst.go b/test/dst/dst.go index b7cb31c7..3ef3b63e 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -4,10 +4,12 @@ import ( "fmt" "log/slog" "math/rand" + "strconv" "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/api" "github.com/resonatehq/resonate/internal/kernel/bus" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/system" "github.com/resonatehq/resonate/internal/kernel/t_api" ) @@ -75,6 +77,7 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System, } // errors + var i int64 var errs []error // test loop @@ -82,8 +85,13 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System, for _, req := range generator.Generate(r, t, d.config.Reqs(), model.cursors) { req := req reqTime := t + + metadata := metadata.New(strconv.FormatInt(i, 10)) + metadata.Tags.Set("name", req.Kind.String()) + metadata.Tags.Set("api", "dst") + api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "dst", + Metadata: metadata, Submission: req, Callback: func(res *t_api.Response, err error) { modelErr := model.Step(req, res, err) @@ -91,9 +99,11 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System, errs = append(errs, modelErr) } - slog.Info("DST", "t", fmt.Sprintf("%d|%d", reqTime, t), "req", req, "res", res, "err", err, "ok", modelErr == nil) + slog.Info("DST", "t", fmt.Sprintf("%d|%d", reqTime, t), "tid", metadata.TransactionId, "req", req, "res", res, "err", err, "ok", modelErr == nil) }, }) + + i++ } system.Tick(t, nil) diff --git a/test/system/system_test.go b/test/system/system_test.go index 1fa13914..8fb6733e 100644 --- a/test/system/system_test.go +++ b/test/system/system_test.go @@ -1,6 +1,7 @@ package system import ( + "fmt" "strconv" "testing" @@ -10,6 +11,7 @@ import ( "github.com/resonatehq/resonate/internal/app/coroutines" "github.com/resonatehq/resonate/internal/app/subsystems/aio/echo" "github.com/resonatehq/resonate/internal/kernel/bus" + "github.com/resonatehq/resonate/internal/kernel/metadata" "github.com/resonatehq/resonate/internal/kernel/system" "github.com/resonatehq/resonate/internal/kernel/t_aio" "github.com/resonatehq/resonate/internal/kernel/t_api" @@ -52,8 +54,12 @@ func TestSystemLoop(t *testing.T) { for i := 0; i < 5; i++ { data := strconv.Itoa(i) + metadata := metadata.New(fmt.Sprintf("a.%d", i)) + metadata.Tags.Set("name", "echo") + metadata.Tags.Set("api", "test") + api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "test", + Metadata: metadata, Submission: &t_api.Request{ Kind: t_api.Echo, Echo: &t_api.EchoRequest{ @@ -74,8 +80,12 @@ func TestSystemLoop(t *testing.T) { // all requests made after shutdown should fail for i := 0; i < 5; i++ { + metadata := metadata.New(fmt.Sprintf("a.%d", i)) + metadata.Tags.Set("name", "echo") + metadata.Tags.Set("api", "test") + api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "test", + Metadata: metadata, Submission: &t_api.Request{ Kind: t_api.Echo, Echo: &t_api.EchoRequest{