From d08b07100b8bdf48673dccb966d4289df7faca1d Mon Sep 17 00:00:00 2001 From: David Farr Date: Tue, 29 Aug 2023 17:55:46 -0700 Subject: [PATCH 1/2] Add tags and createdOn/completedOn timestamps to promises --- internal/aio/aio.go | 6 +- internal/api/api.go | 68 +++++---- internal/app/coroutines/cancelPromise.go | 50 +++--- internal/app/coroutines/createPromise.go | 57 +++---- internal/app/coroutines/createSubscription.go | 1 + internal/app/coroutines/readPromise.go | 34 ++--- internal/app/coroutines/rejectPromise.go | 50 +++--- internal/app/coroutines/resolvePromise.go | 49 +++--- internal/app/coroutines/timeoutPromise.go | 15 +- internal/app/coroutines/timeoutPromises.go | 10 +- internal/app/subsystems/aio/echo/echo.go | 1 + .../app/subsystems/aio/network/network.go | 1 + .../app/subsystems/aio/network/network_dst.go | 1 + .../app/subsystems/aio/store/sqlite/sqlite.go | 40 +++-- internal/app/subsystems/aio/store/store.go | 1 + .../app/subsystems/aio/store/test/util.go | 144 ++++++++++++++++++ internal/app/subsystems/api/grpc/grpc.go | 28 ++-- internal/app/subsystems/api/http/http.go | 1 + internal/app/subsystems/api/http/promise.go | 18 ++- .../app/subsystems/api/http/subscription.go | 9 +- internal/kernel/bus/bus.go | 2 + internal/kernel/scheduler/scheduler.go | 1 + internal/kernel/types/aio_store.go | 10 +- internal/kernel/types/api_request.go | 1 + pkg/promise/promise.go | 13 +- pkg/promise/record.go | 41 +++-- pkg/subscription/record.go | 2 + pkg/subscription/subscription.go | 1 + test/dst/dst.go | 2 +- 29 files changed, 438 insertions(+), 219 deletions(-) diff --git a/internal/aio/aio.go b/internal/aio/aio.go index 85f0bd13..124f6f13 100644 --- a/internal/aio/aio.go +++ b/internal/aio/aio.go @@ -109,7 +109,7 @@ func (a *aio) Enqueue(sqe *bus.SQE[types.Submission, types.Completion]) { select { case subsystem.sq <- sqe: slog.Debug("aio:enqueue", "sqe", sqe.Submission) - a.metrics.AioInFlight.WithLabelValues(sqe.Submission.Kind.String()).Inc() + a.metrics.AioInFlight.WithLabelValues(sqe.Kind).Inc() default: sqe.Callback(nil, fmt.Errorf("aio:subsystem:%s submission queue full", subsystem)) } @@ -138,8 +138,8 @@ func (a *aio) Dequeue(n int) []*bus.CQE[types.Submission, types.Completion] { } slog.Debug("aio:dequeue", "cqe", cqe.Completion) - a.metrics.AioTotal.WithLabelValues(cqe.Completion.Kind.String(), status).Inc() - a.metrics.AioInFlight.WithLabelValues(cqe.Completion.Kind.String()).Dec() + a.metrics.AioTotal.WithLabelValues(cqe.Kind, status).Inc() + a.metrics.AioInFlight.WithLabelValues(cqe.Kind).Dec() cqes = append(cqes, cqe) default: diff --git a/internal/api/api.go b/internal/api/api.go index af9984a1..ef478efe 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -11,7 +11,7 @@ import ( ) type API interface { - Enqueue(string, *bus.SQE[types.Request, types.Response]) + Enqueue(*bus.SQE[types.Request, types.Response]) Dequeue(int, <-chan time.Time) []*bus.SQE[types.Request, types.Response] Done() bool } @@ -66,40 +66,42 @@ func (a *api) Errors() <-chan error { return a.errors } -func (a *api) Enqueue(kind string, sqe *bus.SQE[types.Request, types.Response]) { - select { - case a.sq <- sqe: - a.metrics.ApiInFlight.WithLabelValues(kind).Inc() - - callback := sqe.Callback - sqe.Callback = func(res *types.Response, err error) { - var status types.ResponseStatus - switch res.Kind { - case types.ReadPromise: - status = res.ReadPromise.Status - case types.SearchPromises: - status = res.SearchPromises.Status - case types.CreatePromise: - status = res.CreatePromise.Status - case types.CancelPromise: - status = res.CancelPromise.Status - case types.ResolvePromise: - status = res.ResolvePromise.Status - case types.RejectPromise: - status = res.RejectPromise.Status - case types.ReadSubscriptions: - status = res.ReadSubscriptions.Status - case types.CreateSubscription: - status = res.CreateSubscription.Status - case types.DeleteSubscription: - status = res.DeleteSubscription.Status - } +func (a *api) Enqueue(sqe *bus.SQE[types.Request, types.Response]) { + // replace sqe.Callback with a callback that wraps the original + // function and emits metrics + callback := sqe.Callback + sqe.Callback = func(res *types.Response, err error) { + var status types.ResponseStatus + switch res.Kind { + case types.ReadPromise: + status = res.ReadPromise.Status + case types.SearchPromises: + status = res.SearchPromises.Status + case types.CreatePromise: + status = res.CreatePromise.Status + case types.CancelPromise: + status = res.CancelPromise.Status + case types.ResolvePromise: + status = res.ResolvePromise.Status + case types.RejectPromise: + status = res.RejectPromise.Status + case types.ReadSubscriptions: + status = res.ReadSubscriptions.Status + case types.CreateSubscription: + status = res.CreateSubscription.Status + case types.DeleteSubscription: + status = res.DeleteSubscription.Status + } - a.metrics.ApiTotal.WithLabelValues(kind, strconv.Itoa(int(status))).Inc() - a.metrics.ApiInFlight.WithLabelValues(kind).Dec() + a.metrics.ApiTotal.WithLabelValues(sqe.Kind, strconv.Itoa(int(status))).Inc() + a.metrics.ApiInFlight.WithLabelValues(sqe.Kind).Dec() - callback(res, err) - } + callback(res, err) + } + + select { + case a.sq <- sqe: + a.metrics.ApiInFlight.WithLabelValues(sqe.Kind).Inc() default: sqe.Callback(nil, fmt.Errorf("api submission queue full")) } diff --git a/internal/app/coroutines/cancelPromise.go b/internal/app/coroutines/cancelPromise.go index 17428917..8f28f6db 100644 --- a/internal/app/coroutines/cancelPromise.go +++ b/internal/app/coroutines/cancelPromise.go @@ -52,18 +52,16 @@ func CancelPromise(t int64, req *types.Request, res func(*types.Response, error) }, }, nil) } else { - record := result.Records[0] - - param, err := record.Param() + p, err := result.Records[0].Promise() if err != nil { - slog.Error("failed to parse promise record param", "record", record, "err", err) + slog.Error("failed to parse promise record", "record", result.Records[0], "err", err) res(nil, err) return } - if record.State == promise.Pending { - if t >= record.Timeout { - s.Add(TimeoutPromise(t, req.CancelPromise.Id, CancelPromise(t, req, res), func(err error) { + if p.State == promise.Pending { + if t >= p.Timeout { + s.Add(TimeoutPromise(t, p, CancelPromise(t, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, err) @@ -75,15 +73,18 @@ func CancelPromise(t int64, req *types.Request, res func(*types.Response, error) CancelPromise: &types.CancelPromiseResponse{ Status: types.ResponseForbidden, Promise: &promise.Promise{ - Id: record.Id, + Id: p.Id, State: promise.Timedout, - Param: param, + Param: p.Param, Value: promise.Value{ Headers: map[string]string{}, Ikey: nil, Data: nil, }, - Timeout: record.Timeout, + Timeout: p.Timeout, + Tags: p.Tags, + CreatedOn: p.CreatedOn, + CompletedOn: &p.Timeout, }, }, }, nil) @@ -119,9 +120,10 @@ func CancelPromise(t int64, req *types.Request, res func(*types.Response, error) { Kind: types.StoreUpdatePromise, UpdatePromise: &types.UpdatePromiseCommand{ - Id: req.CancelPromise.Id, - State: promise.Canceled, - Value: req.CancelPromise.Value, + Id: req.CancelPromise.Id, + State: promise.Canceled, + Value: req.CancelPromise.Value, + CompletedOn: t, }, }, { @@ -171,11 +173,14 @@ func CancelPromise(t int64, req *types.Request, res func(*types.Response, error) CancelPromise: &types.CancelPromiseResponse{ Status: types.ResponseCreated, Promise: &promise.Promise{ - Id: record.Id, - State: promise.Canceled, - Timeout: record.Timeout, - Param: param, - Value: req.CancelPromise.Value, + Id: p.Id, + State: promise.Canceled, + Param: p.Param, + Value: req.CancelPromise.Value, + Timeout: p.Timeout, + Tags: p.Tags, + CreatedOn: p.CreatedOn, + CompletedOn: &t, }, }, }, nil) @@ -186,15 +191,8 @@ func CancelPromise(t int64, req *types.Request, res func(*types.Response, error) }) } } else { - p, err := record.Promise() - if err != nil { - slog.Error("failed to parse promise record", "record", record, "err", err) - res(nil, err) - return - } - var status types.ResponseStatus - if record.State == promise.Canceled && record.ValueIkey.Match(req.CancelPromise.Value.Ikey) { + if p.State == promise.Canceled && p.Value.Ikey.Match(req.CancelPromise.Value.Ikey) { status = types.ResponseOK } else { status = types.ResponseForbidden diff --git a/internal/app/coroutines/createPromise.go b/internal/app/coroutines/createPromise.go index 8f0c6067..e154af71 100644 --- a/internal/app/coroutines/createPromise.go +++ b/internal/app/coroutines/createPromise.go @@ -16,6 +16,9 @@ func CreatePromise(t int64, req *types.Request, res func(*types.Response, error) if req.CreatePromise.Param.Headers == nil { req.CreatePromise.Param.Headers = map[string]string{} } + if req.CreatePromise.Tags == nil { + req.CreatePromise.Tags = map[string]string{} + } submission := &types.Submission{ Kind: types.Store, @@ -50,9 +53,11 @@ func CreatePromise(t int64, req *types.Request, res func(*types.Response, error) { Kind: types.StoreCreatePromise, CreatePromise: &types.CreatePromiseCommand{ - Id: req.CreatePromise.Id, - Timeout: req.CreatePromise.Timeout, - Param: req.CreatePromise.Param, + Id: req.CreatePromise.Id, + Param: req.CreatePromise.Param, + Timeout: req.CreatePromise.Timeout, + Tags: req.CreatePromise.Tags, + CreatedOn: t, }, }, { @@ -79,6 +84,7 @@ func CreatePromise(t int64, req *types.Request, res func(*types.Response, error) PromiseId: req.CreatePromise.Id, Url: s.Url, RetryPolicy: s.RetryPolicy, + CreatedOn: t, }, }) } @@ -110,10 +116,12 @@ func CreatePromise(t int64, req *types.Request, res func(*types.Response, error) CreatePromise: &types.CreatePromiseResponse{ Status: types.ResponseCreated, Promise: &promise.Promise{ - Id: req.CreatePromise.Id, - State: promise.Pending, - Timeout: req.CreatePromise.Timeout, - Param: req.CreatePromise.Param, + Id: req.CreatePromise.Id, + State: promise.Pending, + Param: req.CreatePromise.Param, + Timeout: req.CreatePromise.Timeout, + Tags: req.CreatePromise.Tags, + CreatedOn: &t, }, }, }, nil) @@ -122,55 +130,50 @@ func CreatePromise(t int64, req *types.Request, res func(*types.Response, error) } }) } else { - record := result.Records[0] + p, err := result.Records[0].Promise() + if err != nil { + slog.Error("failed to parse promise record", "record", result.Records[0], "err", err) + res(nil, err) + return + } var status types.ResponseStatus - if record.ParamIkey.Match(req.CreatePromise.Param.Ikey) { + if p.Param.Ikey.Match(req.CreatePromise.Param.Ikey) { status = types.ResponseOK } else { status = types.ResponseForbidden } - if record.State == promise.Pending && t >= record.Timeout { - s.Add(TimeoutPromise(t, req.CreatePromise.Id, CreatePromise(t, req, res), func(err error) { + if p.State == promise.Pending && t >= p.Timeout { + s.Add(TimeoutPromise(t, p, CreatePromise(t, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, err) return } - param, err := record.Param() - if err != nil { - res(nil, err) - return - } - res(&types.Response{ Kind: types.CreatePromise, CreatePromise: &types.CreatePromiseResponse{ Status: status, Promise: &promise.Promise{ - Id: record.Id, + Id: p.Id, State: promise.Timedout, - Param: param, + Param: p.Param, Value: promise.Value{ Headers: map[string]string{}, Ikey: nil, Data: nil, }, - Timeout: record.Timeout, + Timeout: p.Timeout, + Tags: p.Tags, + CreatedOn: p.CreatedOn, + CompletedOn: &p.Timeout, }, }, }, nil) })) } else { - p, err := record.Promise() - if err != nil { - slog.Error("failed to parse promise record", "record", record, "err", err) - res(nil, err) - return - } - res(&types.Response{ Kind: types.CreatePromise, CreatePromise: &types.CreatePromiseResponse{ diff --git a/internal/app/coroutines/createSubscription.go b/internal/app/coroutines/createSubscription.go index 1ad472c7..48f4019c 100644 --- a/internal/app/coroutines/createSubscription.go +++ b/internal/app/coroutines/createSubscription.go @@ -31,6 +31,7 @@ func CreateSubscription(t int64, req *types.Request, res func(*types.Response, e PromiseId: req.CreateSubscription.PromiseId, Url: req.CreateSubscription.Url, RetryPolicy: req.CreateSubscription.RetryPolicy, + CreatedOn: t, }, }, }, diff --git a/internal/app/coroutines/readPromise.go b/internal/app/coroutines/readPromise.go index 23848efb..f7b7f87a 100644 --- a/internal/app/coroutines/readPromise.go +++ b/internal/app/coroutines/readPromise.go @@ -48,49 +48,43 @@ func ReadPromise(t int64, req *types.Request, res func(*types.Response, error)) }, }, nil) } else { - record := result.Records[0] + p, err := result.Records[0].Promise() + if err != nil { + slog.Error("failed to parse promise record", "record", result.Records[0], "err", err) + res(nil, err) + return + } - if record.State == promise.Pending && t >= record.Timeout { - s.Add(TimeoutPromise(t, req.ReadPromise.Id, ReadPromise(t, req, res), func(err error) { + if p.State == promise.Pending && t >= p.Timeout { + s.Add(TimeoutPromise(t, p, ReadPromise(t, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, err) return } - param, err := record.Param() - if err != nil { - slog.Error("failed to parse promise record param", "record", record, "err", err) - res(nil, err) - return - } - res(&types.Response{ Kind: types.ReadPromise, ReadPromise: &types.ReadPromiseResponse{ Status: types.ResponseOK, Promise: &promise.Promise{ - Id: record.Id, + Id: p.Id, State: promise.Timedout, - Param: param, + Param: p.Param, Value: promise.Value{ Headers: map[string]string{}, Ikey: nil, Data: nil, }, - Timeout: record.Timeout, + Timeout: p.Timeout, + Tags: p.Tags, + CreatedOn: p.CreatedOn, + CompletedOn: &p.Timeout, }, }, }, nil) })) } else { - p, err := record.Promise() - if err != nil { - slog.Error("failed to parse promise record", "record", record, "err", err) - res(nil, err) - return - } - res(&types.Response{ Kind: types.ReadPromise, ReadPromise: &types.ReadPromiseResponse{ diff --git a/internal/app/coroutines/rejectPromise.go b/internal/app/coroutines/rejectPromise.go index 72e2282c..1578d47a 100644 --- a/internal/app/coroutines/rejectPromise.go +++ b/internal/app/coroutines/rejectPromise.go @@ -52,18 +52,16 @@ func RejectPromise(t int64, req *types.Request, res func(*types.Response, error) }, }, nil) } else { - record := result.Records[0] - - param, err := record.Param() + p, err := result.Records[0].Promise() if err != nil { - slog.Error("failed to parse promise record param", "record", record, "err", err) + slog.Error("failed to parse promise record", "record", result.Records[0], "err", err) res(nil, err) return } - if record.State == promise.Pending { - if t >= record.Timeout { - s.Add(TimeoutPromise(t, req.RejectPromise.Id, RejectPromise(t, req, res), func(err error) { + if p.State == promise.Pending { + if t >= p.Timeout { + s.Add(TimeoutPromise(t, p, RejectPromise(t, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, err) @@ -75,15 +73,18 @@ func RejectPromise(t int64, req *types.Request, res func(*types.Response, error) RejectPromise: &types.RejectPromiseResponse{ Status: types.ResponseForbidden, Promise: &promise.Promise{ - Id: record.Id, + Id: p.Id, State: promise.Timedout, - Param: param, + Param: p.Param, Value: promise.Value{ Headers: map[string]string{}, Ikey: nil, Data: nil, }, - Timeout: record.Timeout, + Timeout: p.Timeout, + Tags: p.Tags, + CreatedOn: p.CreatedOn, + CompletedOn: &p.Timeout, }, }, }, nil) @@ -119,9 +120,10 @@ func RejectPromise(t int64, req *types.Request, res func(*types.Response, error) { Kind: types.StoreUpdatePromise, UpdatePromise: &types.UpdatePromiseCommand{ - Id: req.RejectPromise.Id, - State: promise.Rejected, - Value: req.RejectPromise.Value, + Id: req.RejectPromise.Id, + State: promise.Rejected, + Value: req.RejectPromise.Value, + CompletedOn: t, }, }, { @@ -171,11 +173,14 @@ func RejectPromise(t int64, req *types.Request, res func(*types.Response, error) RejectPromise: &types.RejectPromiseResponse{ Status: types.ResponseCreated, Promise: &promise.Promise{ - Id: record.Id, - State: promise.Rejected, - Timeout: record.Timeout, - Param: param, - Value: req.RejectPromise.Value, + Id: p.Id, + State: promise.Rejected, + Param: p.Param, + Value: req.RejectPromise.Value, + Timeout: p.Timeout, + Tags: p.Tags, + CreatedOn: p.CreatedOn, + CompletedOn: &t, }, }, }, nil) @@ -186,15 +191,8 @@ func RejectPromise(t int64, req *types.Request, res func(*types.Response, error) }) } } else { - p, err := record.Promise() - if err != nil { - slog.Error("failed to parse promise record", "record", record, "err", err) - res(nil, err) - return - } - var status types.ResponseStatus - if record.State == promise.Rejected && record.ValueIkey.Match(req.RejectPromise.Value.Ikey) { + if p.State == promise.Rejected && p.Value.Ikey.Match(req.RejectPromise.Value.Ikey) { status = types.ResponseOK } else { status = types.ResponseForbidden diff --git a/internal/app/coroutines/resolvePromise.go b/internal/app/coroutines/resolvePromise.go index 59c1c4c9..d0386bb7 100644 --- a/internal/app/coroutines/resolvePromise.go +++ b/internal/app/coroutines/resolvePromise.go @@ -52,18 +52,16 @@ func ResolvePromise(t int64, req *types.Request, res func(*types.Response, error }, }, nil) } else { - record := result.Records[0] - - param, err := record.Param() + p, err := result.Records[0].Promise() if err != nil { - slog.Error("failed to parse promise record param", "record", record, "err", err) + slog.Error("failed to parse promise record", "record", result.Records[0], "err", err) res(nil, err) return } - if record.State == promise.Pending { - if t >= record.Timeout { - s.Add(TimeoutPromise(t, req.ResolvePromise.Id, ResolvePromise(t, req, res), func(err error) { + if p.State == promise.Pending { + if t >= p.Timeout { + s.Add(TimeoutPromise(t, p, ResolvePromise(t, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, err) @@ -75,15 +73,18 @@ func ResolvePromise(t int64, req *types.Request, res func(*types.Response, error ResolvePromise: &types.ResolvePromiseResponse{ Status: types.ResponseForbidden, Promise: &promise.Promise{ - Id: record.Id, + Id: p.Id, State: promise.Timedout, - Param: param, + Param: p.Param, Value: promise.Value{ Headers: map[string]string{}, Ikey: nil, Data: nil, }, - Timeout: record.Timeout, + Timeout: p.Timeout, + Tags: p.Tags, + CreatedOn: p.CreatedOn, + CompletedOn: &p.Timeout, }, }, }, nil) @@ -119,9 +120,10 @@ func ResolvePromise(t int64, req *types.Request, res func(*types.Response, error { Kind: types.StoreUpdatePromise, UpdatePromise: &types.UpdatePromiseCommand{ - Id: req.ResolvePromise.Id, - State: promise.Resolved, - Value: req.ResolvePromise.Value, + Id: req.ResolvePromise.Id, + State: promise.Resolved, + Value: req.ResolvePromise.Value, + CompletedOn: t, }, }, { @@ -171,11 +173,13 @@ func ResolvePromise(t int64, req *types.Request, res func(*types.Response, error ResolvePromise: &types.ResolvePromiseResponse{ Status: types.ResponseCreated, Promise: &promise.Promise{ - Id: record.Id, - State: promise.Resolved, - Timeout: record.Timeout, - Param: param, - Value: req.ResolvePromise.Value, + Id: p.Id, + State: promise.Resolved, + Param: p.Param, + Value: req.ResolvePromise.Value, + Timeout: p.Timeout, + CreatedOn: p.CreatedOn, + CompletedOn: &t, }, }, }, nil) @@ -186,15 +190,8 @@ func ResolvePromise(t int64, req *types.Request, res func(*types.Response, error }) } } else { - p, err := record.Promise() - if err != nil { - slog.Error("failed to parse promise record", "record", record, "err", err) - res(nil, err) - return - } - var status types.ResponseStatus - if record.State == promise.Resolved && record.ValueIkey.Match(req.ResolvePromise.Value.Ikey) { + if p.State == promise.Resolved && p.Value.Ikey.Match(req.ResolvePromise.Value.Ikey) { status = types.ResponseOK } else { status = types.ResponseForbidden diff --git a/internal/app/coroutines/timeoutPromise.go b/internal/app/coroutines/timeoutPromise.go index a230f2f5..a899cba0 100644 --- a/internal/app/coroutines/timeoutPromise.go +++ b/internal/app/coroutines/timeoutPromise.go @@ -10,8 +10,8 @@ import ( "github.com/resonatehq/resonate/pkg/promise" ) -func TimeoutPromise(t int64, id string, retry *scheduler.Coroutine, res func(error)) *scheduler.Coroutine { - return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromise(id=%s)", id), "TimeoutPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { +func TimeoutPromise(t int64, p *promise.Promise, retry *scheduler.Coroutine, res func(error)) *scheduler.Coroutine { + return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromise(id=%s)", p.Id), "TimeoutPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ @@ -20,7 +20,7 @@ func TimeoutPromise(t int64, id string, retry *scheduler.Coroutine, res func(err { Kind: types.StoreReadSubscriptions, ReadSubscriptions: &types.ReadSubscriptionsCommand{ - PromiseIds: []string{id}, + PromiseIds: []string{p.Id}, }, }, }, @@ -30,7 +30,7 @@ func TimeoutPromise(t int64, id string, retry *scheduler.Coroutine, res func(err c.Yield(submission, func(completion *types.Completion, err error) { if err != nil { - slog.Error("failed to read subscriptions", "id", id, "err", err) + slog.Error("failed to read subscriptions", "id", p.Id, "err", err) res(err) return } @@ -42,19 +42,20 @@ func TimeoutPromise(t int64, id string, retry *scheduler.Coroutine, res func(err { Kind: types.StoreUpdatePromise, UpdatePromise: &types.UpdatePromiseCommand{ - Id: id, + Id: p.Id, State: promise.Timedout, Value: promise.Value{ Headers: map[string]string{}, Ikey: nil, Data: nil, }, + CompletedOn: p.Timeout, }, }, { Kind: types.StoreDeleteTimeout, DeleteTimeout: &types.DeleteTimeoutCommand{ - Id: id, + Id: p.Id, }, }, } @@ -82,7 +83,7 @@ func TimeoutPromise(t int64, id string, retry *scheduler.Coroutine, res func(err c.Yield(submission, func(completion *types.Completion, err error) { if err != nil { - slog.Error("failed to update state", "id", id, "err", err) + slog.Error("failed to update state", "id", p.Id, "err", err) res(err) return } diff --git a/internal/app/coroutines/timeoutPromises.go b/internal/app/coroutines/timeoutPromises.go index 542b625f..cca2eca3 100644 --- a/internal/app/coroutines/timeoutPromises.go +++ b/internal/app/coroutines/timeoutPromises.go @@ -9,6 +9,7 @@ import ( "github.com/resonatehq/resonate/internal/kernel/types" "github.com/resonatehq/resonate/internal/util" "github.com/resonatehq/resonate/pkg/promise" + "github.com/resonatehq/resonate/pkg/timeout" ) func TimeoutPromises(t int64, cfg *system.Config) *scheduler.Coroutine { @@ -38,10 +39,12 @@ func TimeoutPromises(t int64, cfg *system.Config) *scheduler.Coroutine { util.Assert(completion.Store != nil, "completion must not be nil") records := completion.Store.Results[0].ReadTimeouts.Records + timeouts := []*timeout.TimeoutRecord{} promiseIds := []string{} for _, record := range records { if t >= record.Time { + timeouts = append(timeouts, record) promiseIds = append(promiseIds, record.Id) } } @@ -74,22 +77,23 @@ func TimeoutPromises(t int64, cfg *system.Config) *scheduler.Coroutine { records := completion.Store.Results[0].ReadSubscriptions.Records commands := []*types.Command{} - for _, promiseId := range promiseIds { + for _, timeout := range timeouts { commands = append(commands, &types.Command{ Kind: types.StoreUpdatePromise, UpdatePromise: &types.UpdatePromiseCommand{ - Id: promiseId, + Id: timeout.Id, State: promise.Timedout, Value: promise.Value{ Headers: map[string]string{}, Ikey: nil, Data: nil, }, + CompletedOn: timeout.Time, }, }, &types.Command{ Kind: types.StoreDeleteTimeout, DeleteTimeout: &types.DeleteTimeoutCommand{ - Id: promiseId, + Id: timeout.Id, }, }) } diff --git a/internal/app/subsystems/aio/echo/echo.go b/internal/app/subsystems/aio/echo/echo.go index db27bf87..1b644da0 100644 --- a/internal/app/subsystems/aio/echo/echo.go +++ b/internal/app/subsystems/aio/echo/echo.go @@ -39,6 +39,7 @@ func (d *EchoDevice) Process(sqes []*bus.SQE[types.Submission, types.Completion] for i, sqe := range sqes { cqes[i] = &bus.CQE[types.Submission, types.Completion]{ + Kind: sqe.Kind, Completion: &types.Completion{ Echo: &types.EchoCompletion{ Data: sqe.Submission.Echo.Data, diff --git a/internal/app/subsystems/aio/network/network.go b/internal/app/subsystems/aio/network/network.go index 7557fb5f..cd6f49a3 100644 --- a/internal/app/subsystems/aio/network/network.go +++ b/internal/app/subsystems/aio/network/network.go @@ -58,6 +58,7 @@ func (d *NetworkDevice) Process(sqes []*bus.SQE[types.Submission, types.Completi switch sqe.Submission.Network.Kind { case types.Http: cqe := &bus.CQE[types.Submission, types.Completion]{ + Kind: sqe.Kind, Callback: sqe.Callback, } diff --git a/internal/app/subsystems/aio/network/network_dst.go b/internal/app/subsystems/aio/network/network_dst.go index eacc0eb9..e3187698 100644 --- a/internal/app/subsystems/aio/network/network_dst.go +++ b/internal/app/subsystems/aio/network/network_dst.go @@ -55,6 +55,7 @@ func (d *NetworkDSTDevice) Process(sqes []*bus.SQE[types.Submission, types.Compl switch sqe.Submission.Network.Kind { case types.Http: cqe := &bus.CQE[types.Submission, types.Completion]{ + Kind: sqe.Kind, Callback: sqe.Callback, } diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 66d78c55..68f4eb97 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -31,6 +31,9 @@ const ( "valueIkey" TEXT, "valueData" BLOB, "timeout" INTEGER, + "tags" BLOB, + "createdOn" INTEGER, + "completedOn" INGEGER, PRIMARY KEY("id") ); CREATE TABLE IF NOT EXISTS "Timeouts" ( @@ -43,6 +46,7 @@ const ( "promiseId" TEXT, "url" TEXT, "retryPolicy" BLOB, + "createdOn" INTEGER, UNIQUE("promiseId", "url"), FOREIGN KEY("promiseId") REFERENCES Promises("id") ); @@ -59,7 +63,7 @@ const ( PROMISE_SELECT_STATEMENT = ` SELECT - id, state, paramHeaders, paramIkey, paramData, valueHeaders, valueIkey, valueData, timeout + id, state, paramHeaders, paramIkey, paramData, valueHeaders, valueIkey, valueData, timeout, tags, createdOn, completedOn FROM Promises WHERE @@ -67,7 +71,7 @@ const ( PROMISE_SEARCH_STATEMENT = ` SELECT - id, state, paramHeaders, paramIkey, paramData, valueHeaders, valueIkey, valueData, timeout + id, state, paramHeaders, paramIkey, paramData, valueHeaders, valueIkey, valueData, timeout, tags, createdOn, completedOn FROM Promises WHERE @@ -75,13 +79,13 @@ const ( PROMISE_INSERT_STATEMENT = ` INSERT INTO Promises - (id, state, paramHeaders, paramIkey, paramData, timeout) + (id, state, paramHeaders, paramIkey, paramData, timeout, tags, createdOn) VALUES - (?, ?, ?, ?, ?, ?)` + (?, ?, ?, ?, ?, ?, ?, ?)` PROMISE_UPDATE_STATMENT = ` UPDATE Promises - SET state = ?, valueHeaders = ?, valueIkey = ?, valueData = ? + SET state = ?, valueHeaders = ?, valueIkey = ?, valueData = ?, completedOn = ? WHERE id = ? AND state = 1` TIMEOUT_SELECT_STATEMENT = ` @@ -104,7 +108,7 @@ const ( SUBSCRIPTION_SELECT_STATEMENT = ` SELECT - id, promiseId, url, retryPolicy + id, promiseId, url, retryPolicy, createdOn FROM Subscriptions WHERE @@ -112,9 +116,9 @@ const ( SUBSCRIPTION_INSERT_STATEMENT = ` INSERT INTO Subscriptions - (promiseId, url, retryPolicy) + (promiseId, url, retryPolicy, createdOn) VALUES - (?, ?, ?)` + (?, ?, ?, ?)` SUBSCRIPTION_DELETE_STATEMENT = ` DELETE FROM Subscriptions WHERE id = ?` @@ -362,6 +366,9 @@ func (d *SqliteStoreDevice) readPromise(tx *sql.Tx, cmd *types.ReadPromiseComman &record.ValueIkey, &record.ValueData, &record.Timeout, + &record.Tags, + &record.CreatedOn, + &record.CompletedOn, ); err != nil { if err == sql.ErrNoRows { rowsReturned = 0 @@ -409,6 +416,9 @@ func (d *SqliteStoreDevice) searchPromises(tx *sql.Tx, cmd *types.SearchPromises &record.ValueIkey, &record.ValueData, &record.Timeout, + &record.Tags, + &record.CreatedOn, + &record.CompletedOn, ); err != nil { return nil, err } @@ -428,14 +438,20 @@ func (d *SqliteStoreDevice) searchPromises(tx *sql.Tx, cmd *types.SearchPromises func (d *SqliteStoreDevice) createPromise(tx *sql.Tx, stmt *sql.Stmt, cmd *types.CreatePromiseCommand) (*types.Result, error) { util.Assert(cmd.Param.Headers != nil, "headers must not be nil") + util.Assert(cmd.Tags != nil, "tags must not be nil") headers, err := json.Marshal(cmd.Param.Headers) if err != nil { return nil, err } + tags, err := json.Marshal(cmd.Tags) + if err != nil { + return nil, err + } + // insert - res, err := stmt.Exec(cmd.Id, promise.Pending, headers, cmd.Param.Ikey, cmd.Param.Data, cmd.Timeout) + res, err := stmt.Exec(cmd.Id, promise.Pending, headers, cmd.Param.Ikey, cmd.Param.Data, cmd.Timeout, tags, cmd.CreatedOn) var rowsAffected int64 if err != nil { @@ -468,7 +484,7 @@ func (d *SqliteStoreDevice) updatePromise(tx *sql.Tx, stmt *sql.Stmt, cmd *types } // update - res, err := stmt.Exec(cmd.State, headers, cmd.Value.Ikey, cmd.Value.Data, cmd.Id) + res, err := stmt.Exec(cmd.State, headers, cmd.Value.Ikey, cmd.Value.Data, cmd.CompletedOn, cmd.Id) if err != nil { return nil, err } @@ -591,7 +607,7 @@ func (d *SqliteStoreDevice) readSubscriptions(tx *sql.Tx, cmd *types.ReadSubscri for rows.Next() { record := &subscription.SubscriptionRecord{} - if err := rows.Scan(&record.Id, &record.PromiseId, &record.Url, &record.RetryPolicy); err != nil { + if err := rows.Scan(&record.Id, &record.PromiseId, &record.Url, &record.RetryPolicy, &record.CreatedOn); err != nil { return nil, err } @@ -620,7 +636,7 @@ func (d *SqliteStoreDevice) createSubscription(tx *sql.Tx, stmt *sql.Stmt, cmd * var lastInsertId int64 // insert - res, err := stmt.Exec(cmd.PromiseId, cmd.Url, retryPolicy) + res, err := stmt.Exec(cmd.PromiseId, cmd.Url, retryPolicy, cmd.CreatedOn) if err != nil { sqliteErr, ok := err.(sqlite3.Error) if !ok || sqliteErr.ExtendedCode != sqlite3.ErrConstraintUnique { diff --git a/internal/app/subsystems/aio/store/store.go b/internal/app/subsystems/aio/store/store.go index 31aa1179..a5acfb79 100644 --- a/internal/app/subsystems/aio/store/store.go +++ b/internal/app/subsystems/aio/store/store.go @@ -26,6 +26,7 @@ func Process(store Store, sqes []*bus.SQE[types.Submission, types.Completion]) [ for i, sqe := range sqes { cqe := &bus.CQE[types.Submission, types.Completion]{ + Kind: sqe.Kind, Callback: sqe.Callback, } diff --git a/internal/app/subsystems/aio/store/test/util.go b/internal/app/subsystems/aio/store/test/util.go index 1750f859..83649327 100644 --- a/internal/app/subsystems/aio/store/test/util.go +++ b/internal/app/subsystems/aio/store/test/util.go @@ -67,6 +67,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -92,6 +94,8 @@ var TestCases = []*testCase{ State: 1, ParamHeaders: []byte("{}"), Timeout: 1, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), }}, }, }, @@ -109,6 +113,8 @@ var TestCases = []*testCase{ Headers: map[string]string{}, Ikey: ikey("bar"), }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -135,6 +141,8 @@ var TestCases = []*testCase{ ParamHeaders: []byte("{}"), ParamIkey: ikey("bar"), Timeout: 2, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), }}, }, }, @@ -153,6 +161,8 @@ var TestCases = []*testCase{ Ikey: ikey("baz"), Data: []byte("baz"), }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -180,6 +190,8 @@ var TestCases = []*testCase{ ParamIkey: ikey("baz"), ParamData: []byte("baz"), Timeout: 3, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), }}, }, }, @@ -202,6 +214,8 @@ var TestCases = []*testCase{ Ikey: ikey("baz"), Data: []byte("baz"), }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -229,6 +243,65 @@ var TestCases = []*testCase{ ParamIkey: ikey("baz"), ParamData: []byte("baz"), Timeout: 3, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + }}, + }, + }, + }, + }, + { + name: "CreatePromiseWithIdKeyAndParamAndHeadersAndTags", + commands: []*types.Command{ + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.CreatePromiseCommand{ + Id: "baz", + Timeout: 3, + Param: promise.Value{ + Headers: map[string]string{ + "a": "a", + "b": "b", + "c": "c", + }, + Ikey: ikey("baz"), + Data: []byte("baz"), + }, + Tags: map[string]string{ + "x": "x", + "y": "y", + "z": "z", + }, + CreatedOn: 1, + }, + }, + { + Kind: types.StoreReadPromise, + ReadPromise: &types.ReadPromiseCommand{ + Id: "baz", + }, + }, + }, + expected: []*types.Result{ + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreReadPromise, + ReadPromise: &types.QueryPromisesResult{ + RowsReturned: 1, + Records: []*promise.PromiseRecord{{ + Id: "baz", + State: 1, + ParamHeaders: []byte(`{"a":"a","b":"b","c":"c"}`), + ParamIkey: ikey("baz"), + ParamData: []byte("baz"), + Timeout: 3, + Tags: []byte(`{"x":"x","y":"y","z":"z"}`), + CreatedOn: int64ToPointer(1), }}, }, }, @@ -244,6 +317,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -253,6 +328,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, }, @@ -282,6 +359,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -292,6 +371,7 @@ var TestCases = []*testCase{ Value: promise.Value{ Headers: map[string]string{}, }, + CompletedOn: 2, }, }, { @@ -308,6 +388,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -318,6 +400,7 @@ var TestCases = []*testCase{ Value: promise.Value{ Headers: map[string]string{}, }, + CompletedOn: 2, }, }, { @@ -350,6 +433,9 @@ var TestCases = []*testCase{ ParamHeaders: []byte("{}"), ValueHeaders: []byte("{}"), Timeout: 1, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), }}, }, }, @@ -375,6 +461,9 @@ var TestCases = []*testCase{ ParamHeaders: []byte("{}"), ValueHeaders: []byte("{}"), Timeout: 2, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), }}, }, }, @@ -391,6 +480,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -402,6 +493,7 @@ var TestCases = []*testCase{ Headers: map[string]string{}, Ikey: ikey("foo"), }, + CompletedOn: 2, }, }, { @@ -418,6 +510,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -429,6 +523,7 @@ var TestCases = []*testCase{ Headers: map[string]string{}, Ikey: ikey("bar"), }, + CompletedOn: 2, }, }, { @@ -462,6 +557,9 @@ var TestCases = []*testCase{ ValueHeaders: []byte("{}"), ValueIkey: ikey("foo"), Timeout: 1, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), }}, }, }, @@ -488,6 +586,9 @@ var TestCases = []*testCase{ ValueHeaders: []byte("{}"), ValueIkey: ikey("bar"), Timeout: 2, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), }}, }, }, @@ -504,6 +605,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -516,6 +619,7 @@ var TestCases = []*testCase{ Ikey: ikey("foo"), Data: []byte("foo"), }, + CompletedOn: 2, }, }, { @@ -532,6 +636,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -544,6 +650,7 @@ var TestCases = []*testCase{ Ikey: ikey("bar"), Data: []byte("bar"), }, + CompletedOn: 2, }, }, { @@ -578,6 +685,9 @@ var TestCases = []*testCase{ ValueIkey: ikey("foo"), ValueData: []byte("foo"), Timeout: 1, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), }}, }, }, @@ -605,6 +715,9 @@ var TestCases = []*testCase{ ValueIkey: ikey("bar"), ValueData: []byte("bar"), Timeout: 2, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), }}, }, }, @@ -621,6 +734,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -637,6 +752,7 @@ var TestCases = []*testCase{ Ikey: ikey("foo"), Data: []byte("foo"), }, + CompletedOn: 2, }, }, { @@ -653,6 +769,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -669,6 +787,7 @@ var TestCases = []*testCase{ Ikey: ikey("bar"), Data: []byte("bar"), }, + CompletedOn: 2, }, }, { @@ -703,6 +822,9 @@ var TestCases = []*testCase{ ValueIkey: ikey("foo"), ValueData: []byte("foo"), Timeout: 1, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), }}, }, }, @@ -730,6 +852,9 @@ var TestCases = []*testCase{ ValueIkey: ikey("bar"), ValueData: []byte("bar"), Timeout: 2, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), }}, }, }, @@ -745,6 +870,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -755,6 +882,7 @@ var TestCases = []*testCase{ Value: promise.Value{ Headers: map[string]string{}, }, + CompletedOn: 2, }, }, { @@ -765,6 +893,7 @@ var TestCases = []*testCase{ Value: promise.Value{ Headers: map[string]string{}, }, + CompletedOn: 2, }, }, { @@ -774,6 +903,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -784,6 +915,7 @@ var TestCases = []*testCase{ Value: promise.Value{ Headers: map[string]string{}, }, + CompletedOn: 2, }, }, { @@ -794,6 +926,7 @@ var TestCases = []*testCase{ Value: promise.Value{ Headers: map[string]string{}, }, + CompletedOn: 2, }, }, }, @@ -847,6 +980,7 @@ var TestCases = []*testCase{ Value: promise.Value{ Headers: map[string]string{}, }, + CompletedOn: 2, }, }, { @@ -857,6 +991,7 @@ var TestCases = []*testCase{ Value: promise.Value{ Headers: map[string]string{}, }, + CompletedOn: 2, }, }, }, @@ -886,6 +1021,8 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: map[string]string{}, }, + Tags: map[string]string{}, + CreatedOn: 1, }, }, { @@ -911,6 +1048,8 @@ var TestCases = []*testCase{ State: 1, ParamHeaders: []byte("{}"), Timeout: 1, + Tags: []byte("{}"), + CreatedOn: int64ToPointer(1), }}, }, }, @@ -1585,6 +1724,7 @@ var TestCases = []*testCase{ Param: promise.Value{ Headers: nil, }, + Tags: map[string]string{}, }, }}, }, @@ -1619,3 +1759,7 @@ func ikey(s string) *promise.Ikey { ikey := promise.Ikey(s) return &ikey } + +func int64ToPointer(i int64) *int64 { + return &i +} diff --git a/internal/app/subsystems/api/grpc/grpc.go b/internal/app/subsystems/api/grpc/grpc.go index ee78155f..a6890f92 100644 --- a/internal/app/subsystems/api/grpc/grpc.go +++ b/internal/app/subsystems/api/grpc/grpc.go @@ -63,6 +63,7 @@ type server struct { func (s *server) sendOrPanic(cq chan *bus.CQE[types.Request, types.Response]) func(completion *types.Response, err error) { return func(completion *types.Response, err error) { cqe := &bus.CQE[types.Request, types.Response]{ + Kind: "grpc", Completion: completion, Error: err, } @@ -79,7 +80,8 @@ func (s *server) ReadPromise(ctx context.Context, req *grpcApi.ReadPromiseReques cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "grpc", Submission: &types.Request{ Kind: types.ReadPromise, ReadPromise: &types.ReadPromiseRequest{ @@ -114,7 +116,8 @@ func (s *server) SearchPromises(ctx context.Context, req *grpcApi.SearchPromises return nil, grpcStatus.Error(codes.InvalidArgument, "") } - s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "grpc", Submission: &types.Request{ Kind: types.SearchPromises, SearchPromises: &types.SearchPromisesRequest{ @@ -173,7 +176,8 @@ func (s *server) CreatePromise(ctx context.Context, req *grpcApi.CreatePromiseRe } } - s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "grpc", Submission: &types.Request{ Kind: types.CreatePromise, CreatePromise: &types.CreatePromiseRequest{ @@ -225,7 +229,8 @@ func (s *server) CancelPromise(ctx context.Context, req *grpcApi.CancelPromiseRe data = req.Value.Data } - s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "grpc", Submission: &types.Request{ Kind: types.CancelPromise, CancelPromise: &types.CancelPromiseRequest{ @@ -275,7 +280,8 @@ func (s *server) ResolvePromise(ctx context.Context, req *grpcApi.ResolvePromise data = req.Value.Data } - s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "grpc", Submission: &types.Request{ Kind: types.ResolvePromise, ResolvePromise: &types.ResolvePromiseRequest{ @@ -325,7 +331,8 @@ func (s *server) RejectPromise(ctx context.Context, req *grpcApi.RejectPromiseRe data = req.Value.Data } - s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "grpc", Submission: &types.Request{ Kind: types.RejectPromise, RejectPromise: &types.RejectPromiseRequest{ @@ -357,7 +364,8 @@ func (s *server) ReadSubscriptions(ctx context.Context, req *grpcApi.ReadSubscri cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "grpc", Submission: &types.Request{ Kind: types.ReadSubscriptions, ReadSubscriptions: &types.ReadSubscriptionsRequest{ @@ -389,7 +397,8 @@ func (s *server) CreateSubscription(ctx context.Context, req *grpcApi.CreateSubs cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "grpc", Submission: &types.Request{ Kind: types.CreateSubscription, CreateSubscription: &types.CreateSubscriptionRequest{ @@ -417,7 +426,8 @@ func (s *server) DeleteSubscription(ctx context.Context, req *grpcApi.DeleteSubs cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("grpc", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "grpc", Submission: &types.Request{ Kind: types.DeleteSubscription, DeleteSubscription: &types.DeleteSubscriptionRequest{ diff --git a/internal/app/subsystems/api/http/http.go b/internal/app/subsystems/api/http/http.go index bc973170..262c047f 100644 --- a/internal/app/subsystems/api/http/http.go +++ b/internal/app/subsystems/api/http/http.go @@ -72,6 +72,7 @@ type server struct { func (s *server) sendOrPanic(cq chan *bus.CQE[types.Request, types.Response]) func(completion *types.Response, err error) { return func(completion *types.Response, err error) { cqe := &bus.CQE[types.Request, types.Response]{ + Kind: "http", Completion: completion, Error: err, } diff --git a/internal/app/subsystems/api/http/promise.go b/internal/app/subsystems/api/http/promise.go index 55ad1a17..6b925300 100644 --- a/internal/app/subsystems/api/http/promise.go +++ b/internal/app/subsystems/api/http/promise.go @@ -14,7 +14,8 @@ func (s *server) readPromise(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "http", Submission: &types.Request{ Kind: types.ReadPromise, ReadPromise: &types.ReadPromiseRequest{ @@ -49,7 +50,8 @@ func (s *server) searchPromises(c *gin.Context) { return } - s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "http", Submission: &types.Request{ Kind: types.SearchPromises, SearchPromises: &types.SearchPromisesRequest{ @@ -86,7 +88,8 @@ func (s *server) createPromise(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "http", Submission: &types.Request{ Kind: types.CreatePromise, CreatePromise: createPromise, @@ -120,7 +123,8 @@ func (s *server) resolvePromise(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "http", Submission: &types.Request{ Kind: types.ResolvePromise, ResolvePromise: resolvePromise, @@ -154,7 +158,8 @@ func (s *server) rejectPromise(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "http", Submission: &types.Request{ Kind: types.RejectPromise, RejectPromise: rejectPromise, @@ -188,7 +193,8 @@ func (s *server) cancelPromise(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "http", Submission: &types.Request{ Kind: types.CancelPromise, CancelPromise: cancelPromise, diff --git a/internal/app/subsystems/api/http/subscription.go b/internal/app/subsystems/api/http/subscription.go index b1e05046..b165be36 100644 --- a/internal/app/subsystems/api/http/subscription.go +++ b/internal/app/subsystems/api/http/subscription.go @@ -14,7 +14,8 @@ func (s *server) readSubscriptions(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "http", Submission: &types.Request{ Kind: types.ReadSubscriptions, ReadSubscriptions: &types.ReadSubscriptionsRequest{ @@ -48,7 +49,8 @@ func (s *server) createSubscription(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "http", Submission: &types.Request{ Kind: types.CreateSubscription, CreateSubscription: createSubscription, @@ -78,7 +80,8 @@ func (s *server) deleteSubscription(c *gin.Context) { cq := make(chan *bus.CQE[types.Request, types.Response]) defer close(cq) - s.api.Enqueue("http", &bus.SQE[types.Request, types.Response]{ + s.api.Enqueue(&bus.SQE[types.Request, types.Response]{ + Kind: "http", Submission: &types.Request{ Kind: types.DeleteSubscription, DeleteSubscription: &types.DeleteSubscriptionRequest{ diff --git a/internal/kernel/bus/bus.go b/internal/kernel/bus/bus.go index 8f6f2ffb..8a086689 100644 --- a/internal/kernel/bus/bus.go +++ b/internal/kernel/bus/bus.go @@ -13,11 +13,13 @@ type Completion interface { } type SQE[S Submission, C Completion] struct { + Kind string Submission *S Callback func(*C, error) } type CQE[S Submission, C Completion] struct { + Kind string Completion *C Callback func(*C, error) Error error diff --git a/internal/kernel/scheduler/scheduler.go b/internal/kernel/scheduler/scheduler.go index de414ddb..50350d5d 100644 --- a/internal/kernel/scheduler/scheduler.go +++ b/internal/kernel/scheduler/scheduler.go @@ -38,6 +38,7 @@ func (s *Scheduler) Tick(t int64, batchSize int) { for _, coroutine := range s.coroutines { if submission := coroutine.next(); submission != nil { s.aio.Enqueue(&bus.SQE[types.Submission, types.Completion]{ + Kind: submission.Kind.String(), Submission: submission, Callback: coroutine.resume, }) diff --git a/internal/kernel/types/aio_store.go b/internal/kernel/types/aio_store.go index 1d7ac412..ae53061c 100644 --- a/internal/kernel/types/aio_store.go +++ b/internal/kernel/types/aio_store.go @@ -88,12 +88,15 @@ type CreatePromiseCommand struct { Timeout int64 Param promise.Value Subscriptions []*CreateSubscriptionCommand + Tags map[string]string + CreatedOn int64 } type UpdatePromiseCommand struct { - Id string - State promise.State - Value promise.Value + Id string + State promise.State + Value promise.Value + CompletedOn int64 } type QueryPromisesResult struct { @@ -135,6 +138,7 @@ type CreateSubscriptionCommand struct { PromiseId string Url string RetryPolicy *subscription.RetryPolicy + CreatedOn int64 } type DeleteSubscriptionCommand struct { diff --git a/internal/kernel/types/api_request.go b/internal/kernel/types/api_request.go index 7f09dd6f..a16ba9ad 100644 --- a/internal/kernel/types/api_request.go +++ b/internal/kernel/types/api_request.go @@ -34,6 +34,7 @@ type CreatePromiseRequest struct { Param promise.Value `json:"param,omitempty"` Timeout int64 `json:"timeout"` Subscriptions []*CreateSubscriptionRequest `json:"subscriptions,omitempty"` + Tags map[string]string `json:"tags,omitempty"` } type CancelPromiseRequest struct { diff --git a/pkg/promise/promise.go b/pkg/promise/promise.go index 632b7ec4..713cc1c2 100644 --- a/pkg/promise/promise.go +++ b/pkg/promise/promise.go @@ -6,11 +6,14 @@ import ( ) type Promise struct { - Id string `json:"id"` - State State `json:"state"` - Param Value `json:"param,omitempty"` - Value Value `json:"value,omitempty"` - Timeout int64 `json:"timeout"` + Id string `json:"id"` + State State `json:"state"` + Param Value `json:"param,omitempty"` + Value Value `json:"value,omitempty"` + Timeout int64 `json:"timeout"` + CreatedOn *int64 `json:"createdOn,omitempty"` + CompletedOn *int64 `json:"completedOn,omitempty"` + Tags map[string]string `json:"tags"` } func (p *Promise) String() string { diff --git a/pkg/promise/record.go b/pkg/promise/record.go index 6075093c..4dc9309a 100644 --- a/pkg/promise/record.go +++ b/pkg/promise/record.go @@ -14,29 +14,40 @@ type PromiseRecord struct { ValueIkey *Ikey ValueData []byte Timeout int64 + CreatedOn *int64 + CompletedOn *int64 + Tags []byte } func (r *PromiseRecord) Promise() (*Promise, error) { - param, err := r.Param() + param, err := r.param() if err != nil { return nil, err } - value, err := r.Value() + value, err := r.value() + if err != nil { + return nil, err + } + + tags, err := r.tags() if err != nil { return nil, err } return &Promise{ - Id: r.Id, - State: r.State, - Timeout: r.Timeout, - Param: param, - Value: value, + Id: r.Id, + State: r.State, + Timeout: r.Timeout, + Param: param, + Value: value, + CreatedOn: r.CreatedOn, + CompletedOn: r.CompletedOn, + Tags: tags, }, nil } -func (r *PromiseRecord) Param() (Value, error) { +func (r *PromiseRecord) param() (Value, error) { var headers map[string]string if r.ParamHeaders != nil { @@ -52,7 +63,7 @@ func (r *PromiseRecord) Param() (Value, error) { }, nil } -func (r *PromiseRecord) Value() (Value, error) { +func (r *PromiseRecord) value() (Value, error) { var headers map[string]string if r.ValueHeaders != nil { @@ -67,3 +78,15 @@ func (r *PromiseRecord) Value() (Value, error) { Data: r.ValueData, }, nil } + +func (r *PromiseRecord) tags() (map[string]string, error) { + var tags map[string]string + + if r.Tags != nil { + if err := json.Unmarshal(r.Tags, &tags); err != nil { + return nil, err + } + } + + return tags, nil +} diff --git a/pkg/subscription/record.go b/pkg/subscription/record.go index 4c9d02df..b13f6ec3 100644 --- a/pkg/subscription/record.go +++ b/pkg/subscription/record.go @@ -7,6 +7,7 @@ type SubscriptionRecord struct { Id int64 Url string RetryPolicy []byte + CreatedOn int64 } func (r *SubscriptionRecord) Subscription() (*Subscription, error) { @@ -20,5 +21,6 @@ func (r *SubscriptionRecord) Subscription() (*Subscription, error) { Id: r.Id, Url: r.Url, RetryPolicy: retryPolicy, + CreatedOn: r.CreatedOn, }, nil } diff --git a/pkg/subscription/subscription.go b/pkg/subscription/subscription.go index a6aa64ba..c54b34b7 100644 --- a/pkg/subscription/subscription.go +++ b/pkg/subscription/subscription.go @@ -7,6 +7,7 @@ type Subscription struct { Id int64 `json:"id"` Url string `json:"url"` RetryPolicy *RetryPolicy `json:"retryPolicy"` + CreatedOn int64 `json:"createdOn"` } type RetryPolicy struct { diff --git a/test/dst/dst.go b/test/dst/dst.go index 73e27b76..a83d8109 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -122,7 +122,7 @@ func (d *DST) Run(t *testing.T, r *rand.Rand, seed int64) { for _, req := range generator.Generate(r, time, r.Intn(d.SQEsPerTick)) { req := req - api.Enqueue("dst", &bus.SQE[types.Request, types.Response]{ + api.Enqueue(&bus.SQE[types.Request, types.Response]{ Submission: req, Callback: func(res *types.Response, err error) { var errMsg string From 44ae88902a9ea2a663a2b39a8b7e47345a06a155 Mon Sep 17 00:00:00 2001 From: David Farr Date: Thu, 31 Aug 2023 11:57:08 -0700 Subject: [PATCH 2/2] Add tags to dst --- test/dst/dst.go | 3 ++- test/dst/dst_test.go | 1 + test/dst/generator.go | 45 +++++++++++++++++++++++++++++++------------ 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/test/dst/dst.go b/test/dst/dst.go index a83d8109..da0c1924 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -32,6 +32,7 @@ type DST struct { Ikeys int Data int Headers int + Tags int Retries int Subscriptions int PromiseCacheSize int @@ -91,7 +92,7 @@ func (d *DST) Run(t *testing.T, r *rand.Rand, seed int64) { system.AddOnTick(1, coroutines.NotifySubscriptions) // generator - generator := NewGenerator(r, d.Ids, d.Ikeys, d.Data, d.Headers, d.Retries, d.Subscriptions, d.Time(d.Ticks)) + generator := NewGenerator(r, d.Ids, d.Ikeys, d.Data, d.Headers, d.Tags, d.Retries, d.Subscriptions, d.Time(d.Ticks)) generator.AddRequest(generator.GenerateReadPromise) generator.AddRequest(generator.GenerateSearchPromises) generator.AddRequest(generator.GenerateCreatePromise) diff --git a/test/dst/dst_test.go b/test/dst/dst_test.go index ea607b2c..ab2a388a 100644 --- a/test/dst/dst_test.go +++ b/test/dst/dst_test.go @@ -45,6 +45,7 @@ func TestDST(t *testing.T) { Ikeys: test.RangeIntn(r, 1, 100), Data: test.RangeIntn(r, 1, 100), Headers: test.RangeIntn(r, 1, 100), + Tags: test.RangeIntn(r, 1, 100), Retries: test.RangeIntn(r, 1, 100), Subscriptions: test.RangeIntn(r, 1, 100), PromiseCacheSize: cs(i), diff --git a/test/dst/generator.go b/test/dst/generator.go index 770ac86d..f0abefd9 100644 --- a/test/dst/generator.go +++ b/test/dst/generator.go @@ -16,6 +16,7 @@ type Generator struct { ikeySet []*promise.Ikey dataSet [][]byte headersSet []map[string]string + tagsSet []map[string]string retrySet []int subscriptionSet []string time int64 @@ -25,7 +26,7 @@ type Generator struct { type RequestGenerator func(*rand.Rand, int64) *types.Request -func NewGenerator(r *rand.Rand, ids int, ikeys int, data int, headers int, retries int, subscriptions int, time int64) *Generator { +func NewGenerator(r *rand.Rand, ids int, ikeys int, data int, headers int, tags int, retries int, subscriptions int, time int64) *Generator { idSet := make([]string, ids) for i := 0; i < ids; i++ { idSet[i] = strconv.Itoa(i) @@ -53,6 +54,16 @@ func NewGenerator(r *rand.Rand, ids int, ikeys int, data int, headers int, retri headersSet = append(headersSet, headers, nil) // half of all headers are nil } + tagsSet := []map[string]string{} + for i := 0; i < tags; i++ { + tags := map[string]string{} + for j := 0; j < r.Intn(3)+1; j++ { + tags[strconv.Itoa(j)] = fmt.Sprintf("%d.%d", i, j) + } + + tagsSet = append(tagsSet, tags, nil) // half of all tags are nil + } + retrySet := make([]int, retries) for i := 0; i < retries; i++ { retrySet[i] = i @@ -68,6 +79,7 @@ func NewGenerator(r *rand.Rand, ids int, ikeys int, data int, headers int, retri ikeySet: ikeySet, dataSet: dataSet, headersSet: headersSet, + tagsSet: tagsSet, retrySet: retrySet, subscriptionSet: subscriptionSet, time: time, @@ -117,12 +129,26 @@ func (g *Generator) GenerateCreatePromise(r *rand.Rand, t int64) *types.Request ikey := g.ikeySet[r.Intn(len(g.ikeySet))] data := g.dataSet[r.Intn(len(g.dataSet))] headers := g.headersSet[r.Intn(len(g.headersSet))] + tags := g.tagsSet[r.Intn(len(g.tagsSet))] timeout := test.RangeInt63n(r, t, g.time) - url := g.subscriptionSet[r.Intn(len(g.subscriptionSet))] - delay := g.retrySet[r.Intn(len(g.retrySet))] - attempts := test.RangeIntn(r, 1, 4) - g.subscriptions++ + // include up to 3 subscriptions + n := r.Intn(4) + g.subscriptions += int64(n) + subscriptions := make([]*types.CreateSubscriptionRequest, n) + for i := 0; i < n; i++ { + url := g.subscriptionSet[r.Intn(len(g.subscriptionSet))] + delay := g.retrySet[r.Intn(len(g.retrySet))] + attempts := test.RangeIntn(r, 1, 4) + + subscriptions[i] = &types.CreateSubscriptionRequest{ + Url: url, + RetryPolicy: &subscription.RetryPolicy{ + Delay: int64(delay), + Attempts: int64(attempts), + }, + } + } return &types.Request{ Kind: types.CreatePromise, @@ -134,13 +160,8 @@ func (g *Generator) GenerateCreatePromise(r *rand.Rand, t int64) *types.Request Ikey: ikey, Data: data, }, - Subscriptions: []*types.CreateSubscriptionRequest{{ - Url: url, - RetryPolicy: &subscription.RetryPolicy{ - Delay: int64(delay), - Attempts: int64(attempts), - }, - }}, + Tags: tags, + Subscriptions: subscriptions, }, } }