From cf8a686d8fc26b52c89aaaee03b8f2547bf9bfea Mon Sep 17 00:00:00 2001 From: David Farr Date: Wed, 27 Sep 2023 22:02:28 -0700 Subject: [PATCH 1/3] Timeout promises in batch --- internal/app/coroutines/cancelPromise.go | 75 +-- internal/app/coroutines/completePromise.go | 75 +-- internal/app/coroutines/createPromise.go | 9 +- internal/app/coroutines/readPromise.go | 2 +- internal/app/coroutines/rejectPromise.go | 75 +-- internal/app/coroutines/resolvePromise.go | 73 +-- internal/app/coroutines/searchPromises.go | 57 +- internal/app/coroutines/timeoutPromise.go | 140 ++-- internal/app/coroutines/timeoutPromises.go | 120 +--- .../subsystems/aio/store/postgres/postgres.go | 150 ++++- .../app/subsystems/aio/store/sqlite/sqlite.go | 157 ++++- .../app/subsystems/aio/store/test/util.go | 616 ++++++++++++++---- internal/kernel/types/aio_store.go | 120 ++-- 13 files changed, 1053 insertions(+), 616 deletions(-) diff --git a/internal/app/coroutines/cancelPromise.go b/internal/app/coroutines/cancelPromise.go index 71a64002..f6ae15a2 100644 --- a/internal/app/coroutines/cancelPromise.go +++ b/internal/app/coroutines/cancelPromise.go @@ -30,12 +30,6 @@ func CancelPromise(t int64, req *types.Request, res func(int64, *types.Response, Id: req.CancelPromise.Id, }, }, - { - Kind: types.StoreReadSubscriptions, - ReadSubscriptions: &types.ReadSubscriptionsCommand{ - PromiseIds: []string{req.CancelPromise.Id}, - }, - }, }, }, }, @@ -49,11 +43,8 @@ func CancelPromise(t int64, req *types.Request, res func(int64, *types.Response, } util.Assert(completion.Store != nil, "completion must not be nil") - util.Assert(len(completion.Store.Results) == 2, "completion must contain two results") result := completion.Store.Results[0].ReadPromise - records := completion.Store.Results[1].ReadSubscriptions.Records - util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows") if result.RowsReturned == 0 { @@ -73,7 +64,7 @@ func CancelPromise(t int64, req *types.Request, res func(int64, *types.Response, if p.State == promise.Pending { if t >= p.Timeout { - s.Add(TimeoutPromise(t, p, records, CancelPromise(t, req, res), func(t int64, err error) { + s.Add(TimeoutPromise(t, p, CancelPromise(t, req, res), func(t int64, err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(t, nil, err) @@ -102,55 +93,41 @@ func CancelPromise(t int64, req *types.Request, res func(int64, *types.Response, }, nil) })) } else { - commands := []*types.Command{ - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: req.CancelPromise.Id, - State: promise.Canceled, - Value: req.CancelPromise.Value, - CompletedOn: t, - }, - }, - { - Kind: types.StoreDeleteTimeout, - DeleteTimeout: &types.DeleteTimeoutCommand{ - Id: req.CancelPromise.Id, - }, - }, - { - Kind: types.StoreDeleteSubscriptions, - DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ - PromiseId: req.CancelPromise.Id, - }, - }, - } - - for _, record := range records { - commands = append(commands, &types.Command{ - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ - Id: record.Id, - PromiseId: record.PromiseId, - Url: record.Url, - RetryPolicy: record.RetryPolicy, - Time: t, - }, - }) - } - submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ Transaction: &types.Transaction{ - Commands: commands, + Commands: []*types.Command{ + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: req.CancelPromise.Id, + State: promise.Canceled, + Value: req.CancelPromise.Value, + CompletedOn: t, + }, + }, + { + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.CreateNotificationsCommand{ + PromiseId: req.CancelPromise.Id, + Time: t, + }, + }, + { + Kind: types.StoreDeleteSubscriptions, + DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ + PromiseId: req.CancelPromise.Id, + }, + }, + }, }, }, } c.Yield(submission, func(t int64, completion *types.Completion, err error) { if err != nil { - slog.Error("failed to update state", "req", req, "err", err) + slog.Error("failed to update promise", "req", req, "err", err) res(t, nil, err) return } diff --git a/internal/app/coroutines/completePromise.go b/internal/app/coroutines/completePromise.go index 5fa01644..70a183f0 100644 --- a/internal/app/coroutines/completePromise.go +++ b/internal/app/coroutines/completePromise.go @@ -32,12 +32,6 @@ func CompletePromise(t int64, req *types.Request, res func(int64, *types.Respons Id: req.CompletePromise.Id, }, }, - { - Kind: types.StoreReadSubscriptions, - ReadSubscriptions: &types.ReadSubscriptionsCommand{ - PromiseIds: []string{req.CompletePromise.Id}, - }, - }, }, }, }, @@ -51,11 +45,8 @@ func CompletePromise(t int64, req *types.Request, res func(int64, *types.Respons } util.Assert(completion.Store != nil, "completion must not be nil") - util.Assert(len(completion.Store.Results) == 2, "completion must contain two results") result := completion.Store.Results[0].ReadPromise - records := completion.Store.Results[1].ReadSubscriptions.Records - util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows") if result.RowsReturned == 0 { @@ -75,7 +66,7 @@ func CompletePromise(t int64, req *types.Request, res func(int64, *types.Respons if p.State == promise.Pending { if t >= p.Timeout { - s.Add(TimeoutPromise(t, p, records, CompletePromise(t, req, res), func(t int64, err error) { + s.Add(TimeoutPromise(t, p, CompletePromise(t, req, res), func(t int64, err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(t, nil, err) @@ -104,55 +95,41 @@ func CompletePromise(t int64, req *types.Request, res func(int64, *types.Respons }, nil) })) } else { - commands := []*types.Command{ - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: req.CompletePromise.Id, - State: req.CompletePromise.State, - Value: req.CompletePromise.Value, - CompletedOn: t, - }, - }, - { - Kind: types.StoreDeleteTimeout, - DeleteTimeout: &types.DeleteTimeoutCommand{ - Id: req.CompletePromise.Id, - }, - }, - { - Kind: types.StoreDeleteSubscriptions, - DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ - PromiseId: req.CompletePromise.Id, - }, - }, - } - - for _, record := range records { - commands = append(commands, &types.Command{ - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ - Id: record.Id, - PromiseId: record.PromiseId, - Url: record.Url, - RetryPolicy: record.RetryPolicy, - Time: t, - }, - }) - } - submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ Transaction: &types.Transaction{ - Commands: commands, + Commands: []*types.Command{ + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: req.CompletePromise.Id, + State: req.CompletePromise.State, + Value: req.CompletePromise.Value, + CompletedOn: t, + }, + }, + { + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.CreateNotificationsCommand{ + PromiseId: req.CompletePromise.Id, + Time: t, + }, + }, + { + Kind: types.StoreDeleteSubscriptions, + DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ + PromiseId: req.CompletePromise.Id, + }, + }, + }, }, }, } c.Yield(submission, func(t int64, completion *types.Completion, err error) { if err != nil { - slog.Error("failed to update state", "req", req, "err", err) + slog.Error("failed to update promise", "req", req, "err", err) res(t, nil, err) return } diff --git a/internal/app/coroutines/createPromise.go b/internal/app/coroutines/createPromise.go index 768f3703..e6b5a078 100644 --- a/internal/app/coroutines/createPromise.go +++ b/internal/app/coroutines/createPromise.go @@ -66,13 +66,6 @@ func CreatePromise(t int64, req *types.Request, res func(int64, *types.Response, CreatedOn: t, }, }, - { - Kind: types.StoreCreateTimeout, - CreateTimeout: &types.CreateTimeoutCommand{ - Id: req.CreatePromise.Id, - Time: req.CreatePromise.Timeout, - }, - }, }, }, }, @@ -125,7 +118,7 @@ func CreatePromise(t int64, req *types.Request, res func(int64, *types.Response, } if p.State == promise.Pending && t >= p.Timeout { - s.Add(TimeoutPromise(t, p, nil, CreatePromise(t, req, res), func(t int64, err error) { + s.Add(TimeoutPromise(t, p, CreatePromise(t, req, res), func(t int64, err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(t, nil, err) diff --git a/internal/app/coroutines/readPromise.go b/internal/app/coroutines/readPromise.go index 1655aab3..1995dea8 100644 --- a/internal/app/coroutines/readPromise.go +++ b/internal/app/coroutines/readPromise.go @@ -56,7 +56,7 @@ func ReadPromise(t int64, req *types.Request, res func(int64, *types.Response, e } if p.State == promise.Pending && t >= p.Timeout { - s.Add(TimeoutPromise(t, p, nil, ReadPromise(t, req, res), func(t int64, err error) { + s.Add(TimeoutPromise(t, p, ReadPromise(t, req, res), func(t int64, err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(t, nil, err) diff --git a/internal/app/coroutines/rejectPromise.go b/internal/app/coroutines/rejectPromise.go index 5eb27034..0908c5aa 100644 --- a/internal/app/coroutines/rejectPromise.go +++ b/internal/app/coroutines/rejectPromise.go @@ -30,12 +30,6 @@ func RejectPromise(t int64, req *types.Request, res func(int64, *types.Response, Id: req.RejectPromise.Id, }, }, - { - Kind: types.StoreReadSubscriptions, - ReadSubscriptions: &types.ReadSubscriptionsCommand{ - PromiseIds: []string{req.RejectPromise.Id}, - }, - }, }, }, }, @@ -49,11 +43,8 @@ func RejectPromise(t int64, req *types.Request, res func(int64, *types.Response, } util.Assert(completion.Store != nil, "completion must not be nil") - util.Assert(len(completion.Store.Results) == 2, "completion must contain two results") result := completion.Store.Results[0].ReadPromise - records := completion.Store.Results[1].ReadSubscriptions.Records - util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows") if result.RowsReturned == 0 { @@ -73,7 +64,7 @@ func RejectPromise(t int64, req *types.Request, res func(int64, *types.Response, if p.State == promise.Pending { if t >= p.Timeout { - s.Add(TimeoutPromise(t, p, records, RejectPromise(t, req, res), func(t int64, err error) { + s.Add(TimeoutPromise(t, p, RejectPromise(t, req, res), func(t int64, err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(t, nil, err) @@ -102,55 +93,41 @@ func RejectPromise(t int64, req *types.Request, res func(int64, *types.Response, }, nil) })) } else { - commands := []*types.Command{ - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: req.RejectPromise.Id, - State: promise.Rejected, - Value: req.RejectPromise.Value, - CompletedOn: t, - }, - }, - { - Kind: types.StoreDeleteTimeout, - DeleteTimeout: &types.DeleteTimeoutCommand{ - Id: req.RejectPromise.Id, - }, - }, - { - Kind: types.StoreDeleteSubscriptions, - DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ - PromiseId: req.RejectPromise.Id, - }, - }, - } - - for _, record := range records { - commands = append(commands, &types.Command{ - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ - Id: record.Id, - PromiseId: record.PromiseId, - Url: record.Url, - RetryPolicy: record.RetryPolicy, - Time: t, - }, - }) - } - submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ Transaction: &types.Transaction{ - Commands: commands, + Commands: []*types.Command{ + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: req.RejectPromise.Id, + State: promise.Rejected, + Value: req.RejectPromise.Value, + CompletedOn: t, + }, + }, + { + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.CreateNotificationsCommand{ + PromiseId: req.RejectPromise.Id, + Time: t, + }, + }, + { + Kind: types.StoreDeleteSubscriptions, + DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ + PromiseId: req.RejectPromise.Id, + }, + }, + }, }, }, } c.Yield(submission, func(t int64, completion *types.Completion, err error) { if err != nil { - slog.Error("failed to update state", "req", req, "err", err) + slog.Error("failed to update promise", "req", req, "err", err) res(t, nil, err) return } diff --git a/internal/app/coroutines/resolvePromise.go b/internal/app/coroutines/resolvePromise.go index c2a8e6c6..0fa3c328 100644 --- a/internal/app/coroutines/resolvePromise.go +++ b/internal/app/coroutines/resolvePromise.go @@ -30,12 +30,6 @@ func ResolvePromise(t int64, req *types.Request, res func(int64, *types.Response Id: req.ResolvePromise.Id, }, }, - { - Kind: types.StoreReadSubscriptions, - ReadSubscriptions: &types.ReadSubscriptionsCommand{ - PromiseIds: []string{req.ResolvePromise.Id}, - }, - }, }, }, }, @@ -49,11 +43,8 @@ func ResolvePromise(t int64, req *types.Request, res func(int64, *types.Response } util.Assert(completion.Store != nil, "completion must not be nil") - util.Assert(len(completion.Store.Results) == 2, "completion must contain two results") result := completion.Store.Results[0].ReadPromise - records := completion.Store.Results[1].ReadSubscriptions.Records - util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows") if result.RowsReturned == 0 { @@ -73,7 +64,7 @@ func ResolvePromise(t int64, req *types.Request, res func(int64, *types.Response if p.State == promise.Pending { if t >= p.Timeout { - s.Add(TimeoutPromise(t, p, records, ResolvePromise(t, req, res), func(t int64, err error) { + s.Add(TimeoutPromise(t, p, ResolvePromise(t, req, res), func(t int64, err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(t, nil, err) @@ -102,48 +93,34 @@ func ResolvePromise(t int64, req *types.Request, res func(int64, *types.Response }, nil) })) } else { - commands := []*types.Command{ - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: req.ResolvePromise.Id, - State: promise.Resolved, - Value: req.ResolvePromise.Value, - CompletedOn: t, - }, - }, - { - Kind: types.StoreDeleteTimeout, - DeleteTimeout: &types.DeleteTimeoutCommand{ - Id: req.ResolvePromise.Id, - }, - }, - { - Kind: types.StoreDeleteSubscriptions, - DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ - PromiseId: req.ResolvePromise.Id, - }, - }, - } - - for _, record := range records { - commands = append(commands, &types.Command{ - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ - Id: record.Id, - PromiseId: record.PromiseId, - Url: record.Url, - RetryPolicy: record.RetryPolicy, - Time: t, - }, - }) - } - submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ Transaction: &types.Transaction{ - Commands: commands, + Commands: []*types.Command{ + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: req.ResolvePromise.Id, + State: promise.Resolved, + Value: req.ResolvePromise.Value, + CompletedOn: t, + }, + }, + { + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.CreateNotificationsCommand{ + PromiseId: req.ResolvePromise.Id, + Time: t, + }, + }, + { + Kind: types.StoreDeleteSubscriptions, + DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ + PromiseId: req.ResolvePromise.Id, + }, + }, + }, }, }, } diff --git a/internal/app/coroutines/searchPromises.go b/internal/app/coroutines/searchPromises.go index bc612889..3088e610 100644 --- a/internal/app/coroutines/searchPromises.go +++ b/internal/app/coroutines/searchPromises.go @@ -10,13 +10,31 @@ import ( "github.com/resonatehq/resonate/pkg/promise" ) -func SearchPromises(t1 int64, req *types.Request, res func(int64, *types.Response, error)) *scheduler.Coroutine { +func SearchPromises(t int64, req *types.Request, res func(int64, *types.Response, error)) *scheduler.Coroutine { return scheduler.NewCoroutine(fmt.Sprintf("SearchPromises(q=%s)", req.SearchPromises.Q), "SearchPromises", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { submission := &types.Submission{ Kind: types.Store, Store: &types.StoreSubmission{ Transaction: &types.Transaction{ Commands: []*types.Command{ + { + Kind: types.StoreTimeoutCreateNotifications, + TimeoutCreateNotifications: &types.TimeoutCreateNotificationsCommand{ + Time: t, + }, + }, + { + Kind: types.StoreTimeoutDeleteSubscriptions, + TimeoutDeleteSubscriptions: &types.TimeoutDeleteSubscriptionsCommand{ + Time: t, + }, + }, + { + Kind: types.StoreTimeoutPromises, + TimeoutPromises: &types.TimeoutPromisesCommand{ + Time: t, + }, + }, { Kind: types.StoreSearchPromises, SearchPromises: &types.SearchPromisesCommand{ @@ -31,50 +49,27 @@ func SearchPromises(t1 int64, req *types.Request, res func(int64, *types.Respons }, } - c.Yield(submission, func(t2 int64, completion *types.Completion, err error) { + c.Yield(submission, func(t int64, completion *types.Completion, err error) { if err != nil { slog.Error("failed to search promises", "req", req, "err", err) - res(t2, nil, err) + res(t, nil, err) return } util.Assert(completion.Store != nil, "completion must not be nil") + util.Assert(len(completion.Store.Results) == 4, "must have four results") - result := completion.Store.Results[0].SearchPromises + result := completion.Store.Results[3].SearchPromises promises := []*promise.Promise{} - var timedoutInSearch bool - for _, state := range req.SearchPromises.States { - if state == promise.Timedout { - timedoutInSearch = true - break - } - } - for _, record := range result.Records { - p, err := record.Promise() + promise, err := record.Promise() if err != nil { slog.Warn("failed to parse promise record", "record", record, "err", err) continue } - if p.State == promise.Pending && t1 >= p.Timeout { - // ignore "timedout" promise if not in search - if !timedoutInSearch { - continue - } - - // set to "timedout" if (request) time is greater than timeout - p.State = promise.Timedout - p.Value = promise.Value{ - Headers: map[string]string{}, - Ikey: nil, - Data: nil, - } - p.CompletedOn = &p.Timeout - } - - promises = append(promises, p) + promises = append(promises, promise) } // set cursor only if there are more results @@ -90,7 +85,7 @@ func SearchPromises(t1 int64, req *types.Request, res func(int64, *types.Respons } } - res(t2, &types.Response{ + res(t, &types.Response{ Kind: types.SearchPromises, SearchPromises: &types.SearchPromisesResponse{ Status: types.ResponseOK, diff --git a/internal/app/coroutines/timeoutPromise.go b/internal/app/coroutines/timeoutPromise.go index f119767a..96d7cdbf 100644 --- a/internal/app/coroutines/timeoutPromise.go +++ b/internal/app/coroutines/timeoutPromise.go @@ -8,113 +8,63 @@ import ( "github.com/resonatehq/resonate/internal/kernel/types" "github.com/resonatehq/resonate/internal/util" "github.com/resonatehq/resonate/pkg/promise" - "github.com/resonatehq/resonate/pkg/subscription" ) -func TimeoutPromise(t int64, p *promise.Promise, subscriptions []*subscription.SubscriptionRecord, retry *scheduler.Coroutine, res func(int64, error)) *scheduler.Coroutine { +func TimeoutPromise(t int64, p *promise.Promise, retry *scheduler.Coroutine, res func(int64, error)) *scheduler.Coroutine { return scheduler.NewCoroutine(fmt.Sprintf("TimeoutPromise(id=%s)", p.Id), "TimeoutPromise", func(s *scheduler.Scheduler, c *scheduler.Coroutine) { - if subscriptions != nil { - timeoutPromise(s, c, t, p, subscriptions, retry, res) - } else { - submission := &types.Submission{ - Kind: types.Store, - Store: &types.StoreSubmission{ - Transaction: &types.Transaction{ - Commands: []*types.Command{ - { - Kind: types.StoreReadSubscriptions, - ReadSubscriptions: &types.ReadSubscriptionsCommand{ - PromiseIds: []string{p.Id}, + submission := &types.Submission{ + Kind: types.Store, + Store: &types.StoreSubmission{ + Transaction: &types.Transaction{ + Commands: []*types.Command{ + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: p.Id, + State: promise.Timedout, + Value: promise.Value{ + Headers: map[string]string{}, + Ikey: nil, + Data: []byte{}, }, + CompletedOn: p.Timeout, + }, + }, + { + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.CreateNotificationsCommand{ + PromiseId: p.Id, + Time: t, + }, + }, + { + Kind: types.StoreDeleteSubscriptions, + DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ + PromiseId: p.Id, }, }, }, }, - } - - c.Yield(submission, func(t int64, completion *types.Completion, err error) { - if err != nil { - slog.Error("failed to read subscriptions", "id", p.Id, "err", err) - res(t, err) - return - } - - util.Assert(completion.Store != nil, "completion must not be nil") - subscriptions := completion.Store.Results[0].ReadSubscriptions.Records - - timeoutPromise(s, c, t, p, subscriptions, retry, res) - }) - } - }) -} - -func timeoutPromise(s *scheduler.Scheduler, c *scheduler.Coroutine, t int64, p *promise.Promise, subscriptions []*subscription.SubscriptionRecord, retry *scheduler.Coroutine, res func(int64, error)) { - commands := []*types.Command{ - { - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: p.Id, - State: promise.Timedout, - Value: promise.Value{ - Headers: map[string]string{}, - Ikey: nil, - Data: []byte{}, - }, - CompletedOn: p.Timeout, }, - }, - { - Kind: types.StoreDeleteTimeout, - DeleteTimeout: &types.DeleteTimeoutCommand{ - Id: p.Id, - }, - }, - { - Kind: types.StoreDeleteSubscriptions, - DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ - PromiseId: p.Id, - }, - }, - } - - for _, record := range subscriptions { - commands = append(commands, &types.Command{ - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ - Id: record.Id, - PromiseId: record.PromiseId, - Url: record.Url, - RetryPolicy: record.RetryPolicy, - Time: t, - }, - }) - } - - submission := &types.Submission{ - Kind: types.Store, - Store: &types.StoreSubmission{ - Transaction: &types.Transaction{ - Commands: commands, - }, - }, - } - - c.Yield(submission, func(t int64, completion *types.Completion, err error) { - if err != nil { - slog.Error("failed to update state", "id", p.Id, "err", err) - res(t, err) - return } - util.Assert(completion.Store != nil, "completion must not be nil") + c.Yield(submission, func(t int64, completion *types.Completion, err error) { + if err != nil { + slog.Error("failed to update promise", "id", p.Id, "err", err) + res(t, err) + return + } - result := completion.Store.Results[0].UpdatePromise - util.Assert(result.RowsAffected == 0 || result.RowsAffected == 1, "result must return 0 or 1 rows") + util.Assert(completion.Store != nil, "completion must not be nil") - if result.RowsAffected == 1 { - res(t, nil) - } else { - s.Add(retry) - } + result := completion.Store.Results[0].UpdatePromise + util.Assert(result.RowsAffected == 0 || result.RowsAffected == 1, "result must return 0 or 1 rows") + + if result.RowsAffected == 1 { + res(t, nil) + } else { + s.Add(retry) + } + }) }) } diff --git a/internal/app/coroutines/timeoutPromises.go b/internal/app/coroutines/timeoutPromises.go index 8d855904..3c1d7723 100644 --- a/internal/app/coroutines/timeoutPromises.go +++ b/internal/app/coroutines/timeoutPromises.go @@ -8,8 +8,6 @@ import ( "github.com/resonatehq/resonate/internal/kernel/system" "github.com/resonatehq/resonate/internal/kernel/types" "github.com/resonatehq/resonate/internal/util" - "github.com/resonatehq/resonate/pkg/promise" - "github.com/resonatehq/resonate/pkg/timeout" ) func TimeoutPromises(t int64, config *system.Config) *scheduler.Coroutine { @@ -20,9 +18,21 @@ func TimeoutPromises(t int64, config *system.Config) *scheduler.Coroutine { Transaction: &types.Transaction{ Commands: []*types.Command{ { - Kind: types.StoreReadTimeouts, - ReadTimeouts: &types.ReadTimeoutsCommand{ - N: config.TimeoutCacheSize, + Kind: types.StoreTimeoutCreateNotifications, + TimeoutCreateNotifications: &types.TimeoutCreateNotificationsCommand{ + Time: t, + }, + }, + { + Kind: types.StoreTimeoutDeleteSubscriptions, + TimeoutDeleteSubscriptions: &types.TimeoutDeleteSubscriptionsCommand{ + Time: t, + }, + }, + { + Kind: types.StoreTimeoutPromises, + TimeoutPromises: &types.TimeoutPromisesCommand{ + Time: t, }, }, }, @@ -37,101 +47,15 @@ func TimeoutPromises(t int64, config *system.Config) *scheduler.Coroutine { } util.Assert(completion.Store != nil, "completion must not be nil") + util.Assert(len(completion.Store.Results) == 3, "completion must have three results") - records := completion.Store.Results[0].ReadTimeouts.Records - timeouts := []*timeout.TimeoutRecord{} - promiseIds := []string{} - - for _, record := range records { - if t >= record.Time { - timeouts = append(timeouts, record) - promiseIds = append(promiseIds, record.Id) - } - } - - if len(promiseIds) > 0 { - submission := &types.Submission{ - Kind: types.Store, - Store: &types.StoreSubmission{ - Transaction: &types.Transaction{ - Commands: []*types.Command{ - { - Kind: types.StoreReadSubscriptions, - ReadSubscriptions: &types.ReadSubscriptionsCommand{ - PromiseIds: promiseIds, - }, - }, - }, - }, - }, - } - - c.Yield(submission, func(t int64, completion *types.Completion, err error) { - if err != nil { - slog.Error("failed to read subscriptions", "err", err) - return - } - - util.Assert(completion.Store != nil, "completion must not be nil") - - records := completion.Store.Results[0].ReadSubscriptions.Records - commands := []*types.Command{} - - for _, timeout := range timeouts { - commands = append(commands, &types.Command{ - Kind: types.StoreUpdatePromise, - UpdatePromise: &types.UpdatePromiseCommand{ - Id: timeout.Id, - State: promise.Timedout, - Value: promise.Value{ - Headers: map[string]string{}, - Ikey: nil, - Data: []byte{}, - }, - CompletedOn: timeout.Time, - }, - }, &types.Command{ - Kind: types.StoreDeleteTimeout, - DeleteTimeout: &types.DeleteTimeoutCommand{ - Id: timeout.Id, - }, - }, &types.Command{ - Kind: types.StoreDeleteSubscriptions, - DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ - PromiseId: timeout.Id, - }, - }) - } - - for _, record := range records { - commands = append(commands, &types.Command{ - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ - Id: record.Id, - PromiseId: record.PromiseId, - Url: record.Url, - RetryPolicy: record.RetryPolicy, - Time: t, - }, - }) - } - - submission := &types.Submission{ - Kind: types.Store, - Store: &types.StoreSubmission{ - Transaction: &types.Transaction{ - Commands: commands, - }, - }, - } + notifications := completion.Store.Results[0].TimeoutCreateNotifications.RowsAffected + subscriptions := completion.Store.Results[1].TimeoutDeleteSubscriptions.RowsAffected + promises := completion.Store.Results[2].TimeoutPromises.RowsAffected - c.Yield(submission, func(t int64, completion *types.Completion, err error) { - if err != nil { - slog.Error("failed to update state", "err", err) - return - } - }) - }) + util.Assert(notifications == subscriptions, "must create the same number of notifications as subscriptions deleted") + if promises == 0 { + util.Assert(subscriptions == 0 && notifications == 0, "must not create notifications when no promises timed out") } }) }) diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index fe7c0f3d..eeac7d91 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -110,6 +110,14 @@ const ( WHERE id = $6 AND state = 1` + PROMISE_UPDATE_TIMEOUT_STATEMENT = ` + UPDATE + promises + SET + state = 8, completed_on = timeout + WHERE + state = 1 AND timeout <= $1` + TIMEOUT_SELECT_STATEMENT = ` SELECT id, time @@ -160,6 +168,12 @@ const ( SUBSCRIPTION_DELETE_ALL_STATEMENT = ` DELETE FROM subscriptions WHERE promise_id = $1` + SUBSCRIPTION_DELETE_ALL_TIMEOUT_STATEMENT = ` + DELETE FROM + subscriptions + WHERE + promise_id IN (SELECT id FROM promises WHERE state = 1 AND timeout <= $1)` + NOTIFICATION_SELECT_STATEMENT = ` SELECT id, promise_id, url, retry_policy, time, attempt @@ -171,9 +185,24 @@ const ( NOTIFICATION_INSERT_STATEMENT = ` INSERT INTO notifications - (id, promise_id, url, retry_policy, time, attempt) - VALUES - ($1, $2, $3, $4, $5, 0) + (id, promise_id, url, retry_policy, time, attempt) + SELECT + id, promise_id, url, retry_policy, $1, 0 + FROM + subscriptions + WHERE + promise_id = $2 + ON CONFLICT(id, promise_id) DO NOTHING` + + NOTIFICATION_INSERT_TIMEOUT_STATEMENT = ` + INSERT INTO notifications + (id, promise_id, url, retry_policy, time, attempt) + SELECT + id, promise_id, url, retry_policy, $1, 0 + FROM + subscriptions + WHERE + promise_id IN (SELECT id FROM promises WHERE state = 1 AND timeout <= $1) ON CONFLICT(id, promise_id) DO NOTHING` NOTIFICATION_UPDATE_STATEMENT = ` @@ -302,6 +331,12 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*types. } defer promiseUpdateStmt.Close() + promiseUpdateTimeoutStmt, err := tx.Prepare(PROMISE_UPDATE_TIMEOUT_STATEMENT) + if err != nil { + return nil, err + } + defer promiseUpdateTimeoutStmt.Close() + timeoutInsertStmt, err := tx.Prepare(TIMEOUT_INSERT_STATEMENT) if err != nil { return nil, err @@ -332,12 +367,24 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*types. } defer subscriptionDeleteAllStmt.Close() + subscriptionDeleteAllTimeoutStmt, err := tx.Prepare(SUBSCRIPTION_DELETE_ALL_TIMEOUT_STATEMENT) + if err != nil { + return nil, err + } + defer subscriptionDeleteAllTimeoutStmt.Close() + notificationInsertStmt, err := tx.Prepare(NOTIFICATION_INSERT_STATEMENT) if err != nil { return nil, err } defer notificationInsertStmt.Close() + notificationInsertTimeoutStmt, err := tx.Prepare(NOTIFICATION_INSERT_TIMEOUT_STATEMENT) + if err != nil { + return nil, err + } + defer notificationInsertTimeoutStmt.Close() + notificationUpdateStmt, err := tx.Prepare(NOTIFICATION_UPDATE_STATEMENT) if err != nil { return nil, err @@ -373,6 +420,9 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*types. case types.StoreUpdatePromise: util.Assert(command.UpdatePromise != nil, "command must not be nil") results[i][j], err = w.updatePromise(tx, promiseUpdateStmt, command.UpdatePromise) + case types.StoreTimeoutPromises: + util.Assert(command.TimeoutPromises != nil, "command must not be nil") + results[i][j], err = w.timeoutPromises(tx, promiseUpdateTimeoutStmt, command.TimeoutPromises) // Timeout case types.StoreReadTimeouts: @@ -401,20 +451,27 @@ func (w *PostgresStoreWorker) performCommands(tx *sql.Tx, transactions []*types. case types.StoreDeleteSubscriptions: util.Assert(command.DeleteSubscriptions != nil, "command must not be nil") results[i][j], err = w.deleteSubscriptions(tx, subscriptionDeleteAllStmt, command.DeleteSubscriptions) + case types.StoreTimeoutDeleteSubscriptions: + util.Assert(command.TimeoutDeleteSubscriptions != nil, "command must not be nil") + results[i][j], err = w.timeoutDeleteSubscriptions(tx, subscriptionDeleteAllTimeoutStmt, command.TimeoutDeleteSubscriptions) // Notification case types.StoreReadNotifications: util.Assert(command.ReadNotifications != nil, "command must not be nil") results[i][j], err = w.readNotifications(tx, command.ReadNotifications) - case types.StoreCreateNotification: - util.Assert(command.CreateNotification != nil, "command must not be nil") - results[i][j], err = w.createNotification(tx, notificationInsertStmt, command.CreateNotification) + case types.StoreCreateNotifications: + util.Assert(command.CreateNotifications != nil, "command must not be nil") + results[i][j], err = w.createNotifications(tx, notificationInsertStmt, command.CreateNotifications) case types.StoreUpdateNotification: util.Assert(command.UpdateNotification != nil, "command must not be nil") results[i][j], err = w.updateNotification(tx, notificationUpdateStmt, command.UpdateNotification) case types.StoreDeleteNotification: util.Assert(command.DeleteNotification != nil, "command must not be nil") results[i][j], err = w.deleteNotification(tx, notificationDeleteStmt, command.DeleteNotification) + case types.StoreTimeoutCreateNotifications: + util.Assert(command.TimeoutCreateNotifications != nil, "command must not be nil") + results[i][j], err = w.timeoutCreateNotifications(tx, notificationInsertTimeoutStmt, command.TimeoutCreateNotifications) + default: panic("invalid command") } @@ -585,6 +642,28 @@ func (w *PostgresStoreWorker) updatePromise(tx *sql.Tx, stmt *sql.Stmt, cmd *typ }, nil } +func (w *PostgresStoreWorker) timeoutPromises(tx *sql.Tx, stmt *sql.Stmt, cmd *types.TimeoutPromisesCommand) (*types.Result, error) { + util.Assert(cmd.Time >= 0, "time must be non-negative") + + // udpate promises + res, err := stmt.Exec(cmd.Time) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &types.Result{ + Kind: types.StoreTimeoutPromises, + TimeoutPromises: &types.AlterPromisesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + func (w *PostgresStoreWorker) readTimeouts(tx *sql.Tx, cmd *types.ReadTimeoutsCommand) (*types.Result, error) { // select rows, err := tx.Query(TIMEOUT_SELECT_STATEMENT, cmd.N) @@ -738,7 +817,7 @@ func (w *PostgresStoreWorker) createSubscription(tx *sql.Tx, stmt *sql.Stmt, cmd return &types.Result{ Kind: types.StoreCreateSubscription, - CreateSubscription: &types.AlterSubscriptionResult{ + CreateSubscription: &types.AlterSubscriptionsResult{ RowsAffected: rowsAffected, }, }, nil @@ -758,7 +837,7 @@ func (w *PostgresStoreWorker) deleteSubscription(tx *sql.Tx, stmt *sql.Stmt, cmd return &types.Result{ Kind: types.StoreDeleteSubscription, - DeleteSubscription: &types.AlterSubscriptionResult{ + DeleteSubscription: &types.AlterSubscriptionsResult{ RowsAffected: rowsAffected, }, }, nil @@ -778,7 +857,29 @@ func (w *PostgresStoreWorker) deleteSubscriptions(tx *sql.Tx, stmt *sql.Stmt, cm return &types.Result{ Kind: types.StoreDeleteSubscriptions, - DeleteSubscriptions: &types.AlterSubscriptionResult{ + DeleteSubscriptions: &types.AlterSubscriptionsResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +func (w *PostgresStoreWorker) timeoutDeleteSubscriptions(tx *sql.Tx, stmt *sql.Stmt, cmd *types.TimeoutDeleteSubscriptionsCommand) (*types.Result, error) { + util.Assert(cmd.Time >= 0, "time must be non-negative") + + // udpate promises + res, err := stmt.Exec(cmd.Time) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &types.Result{ + Kind: types.StoreTimeoutDeleteSubscriptions, + TimeoutDeleteSubscriptions: &types.AlterSubscriptionsResult{ RowsAffected: rowsAffected, }, }, nil @@ -814,12 +915,11 @@ func (w *PostgresStoreWorker) readNotifications(tx *sql.Tx, cmd *types.ReadNotif }, nil } -func (w *PostgresStoreWorker) createNotification(tx *sql.Tx, stmt *sql.Stmt, cmd *types.CreateNotificationCommand) (*types.Result, error) { +func (w *PostgresStoreWorker) createNotifications(tx *sql.Tx, stmt *sql.Stmt, cmd *types.CreateNotificationsCommand) (*types.Result, error) { util.Assert(cmd.Time >= 0, "time must be non-negative") - util.Assert(cmd.RetryPolicy != nil, "retry policy must not be nil") // insert - res, err := stmt.Exec(cmd.Id, cmd.PromiseId, cmd.Url, cmd.RetryPolicy, cmd.Time) + res, err := stmt.Exec(cmd.Time, cmd.PromiseId) if err != nil { return nil, err } @@ -830,8 +930,8 @@ func (w *PostgresStoreWorker) createNotification(tx *sql.Tx, stmt *sql.Stmt, cmd } return &types.Result{ - Kind: types.StoreCreateNotification, - CreateNotification: &types.AlterNotificationsResult{ + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.AlterNotificationsResult{ RowsAffected: rowsAffected, }, }, nil @@ -876,3 +976,25 @@ func (w *PostgresStoreWorker) deleteNotification(tx *sql.Tx, stmt *sql.Stmt, cmd }, }, nil } + +func (w *PostgresStoreWorker) timeoutCreateNotifications(tx *sql.Tx, stmt *sql.Stmt, cmd *types.TimeoutCreateNotificationsCommand) (*types.Result, error) { + util.Assert(cmd.Time >= 0, "time must be non-negative") + + // udpate promises + res, err := stmt.Exec(cmd.Time) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &types.Result{ + Kind: types.StoreTimeoutCreateNotifications, + TimeoutCreateNotifications: &types.AlterNotificationsResult{ + RowsAffected: rowsAffected, + }, + }, nil +} diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 31df778f..be060373 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -103,6 +103,14 @@ const ( WHERE id = ? AND state = 1` + PROMISE_UPDATE_TIMEOUT_STATEMENT = ` + UPDATE + promises + SET + state = 8, completed_on = timeout + WHERE + state = 1 AND timeout <= ?` + TIMEOUT_SELECT_STATEMENT = ` SELECT id, time @@ -153,6 +161,12 @@ const ( SUBSCRIPTION_DELETE_ALL_STATEMENT = ` DELETE FROM subscriptions WHERE promise_id = ?` + SUBSCRIPTION_DELETE_ALL_TIMEOUT_STATEMENT = ` + DELETE FROM + subscriptions + WHERE + promise_id IN (SELECT id FROM promises WHERE state = 1 AND timeout <= ?)` + NOTIFICATION_SELECT_STATEMENT = ` SELECT id, promise_id, url, retry_policy, time, attempt @@ -165,14 +179,32 @@ const ( NOTIFICATION_INSERT_STATEMENT = ` INSERT INTO notifications (id, promise_id, url, retry_policy, time, attempt) - VALUES - (?, ?, ?, ?, ?, 0) + SELECT + id, promise_id, url, retry_policy, ?, 0 + FROM + subscriptions + WHERE + promise_id = ? + ON CONFLICT(id, promise_id) DO NOTHING` + + NOTIFICATION_INSERT_TIMEOUT_STATEMENT = ` + INSERT INTO notifications + (id, promise_id, url, retry_policy, time, attempt) + SELECT + id, promise_id, url, retry_policy, ?, 0 + FROM + subscriptions + WHERE + promise_id IN (SELECT id FROM promises WHERE state = 1 AND timeout <= ?) ON CONFLICT(id, promise_id) DO NOTHING` NOTIFICATION_UPDATE_STATEMENT = ` - UPDATE notifications - SET time = ?, attempt = ? - WHERE id = ? AND promise_id = ?` + UPDATE + notifications + SET + time = ?, attempt = ? + WHERE + id = ? AND promise_id = ?` NOTIFICATION_DELETE_STATEMENT = ` DELETE FROM notifications WHERE id = ? AND promise_id = ?` @@ -275,6 +307,12 @@ func (w *SqliteStoreWorker) performCommands(tx *sql.Tx, transactions []*types.Tr } defer promiseUpdateStmt.Close() + promiseUpdateTimeoutStmt, err := tx.Prepare(PROMISE_UPDATE_TIMEOUT_STATEMENT) + if err != nil { + return nil, err + } + defer promiseUpdateTimeoutStmt.Close() + timeoutInsertStmt, err := tx.Prepare(TIMEOUT_INSERT_STATEMENT) if err != nil { return nil, err @@ -305,12 +343,24 @@ func (w *SqliteStoreWorker) performCommands(tx *sql.Tx, transactions []*types.Tr } defer subscriptionDeleteAllStmt.Close() + subscriptionDeleteAllTimeoutStmt, err := tx.Prepare(SUBSCRIPTION_DELETE_ALL_TIMEOUT_STATEMENT) + if err != nil { + return nil, err + } + defer subscriptionDeleteAllTimeoutStmt.Close() + notificationInsertStmt, err := tx.Prepare(NOTIFICATION_INSERT_STATEMENT) if err != nil { return nil, err } defer notificationInsertStmt.Close() + notificationInsertTimeoutStmt, err := tx.Prepare(NOTIFICATION_INSERT_TIMEOUT_STATEMENT) + if err != nil { + return nil, err + } + defer notificationInsertTimeoutStmt.Close() + notificationUpdateStmt, err := tx.Prepare(NOTIFICATION_UPDATE_STATEMENT) if err != nil { return nil, err @@ -346,6 +396,9 @@ func (w *SqliteStoreWorker) performCommands(tx *sql.Tx, transactions []*types.Tr case types.StoreUpdatePromise: util.Assert(command.UpdatePromise != nil, "command must not be nil") results[i][j], err = w.updatePromise(tx, promiseUpdateStmt, command.UpdatePromise) + case types.StoreTimeoutPromises: + util.Assert(command.TimeoutPromises != nil, "command must not be nil") + results[i][j], err = w.timeoutPromises(tx, promiseUpdateTimeoutStmt, command.TimeoutPromises) // Timeout case types.StoreReadTimeouts: @@ -374,20 +427,27 @@ func (w *SqliteStoreWorker) performCommands(tx *sql.Tx, transactions []*types.Tr case types.StoreDeleteSubscriptions: util.Assert(command.DeleteSubscriptions != nil, "command must not be nil") results[i][j], err = w.deleteSubscriptions(tx, subscriptionDeleteAllStmt, command.DeleteSubscriptions) + case types.StoreTimeoutDeleteSubscriptions: + util.Assert(command.TimeoutDeleteSubscriptions != nil, "command must not be nil") + results[i][j], err = w.timeoutDeleteSubscriptions(tx, subscriptionDeleteAllTimeoutStmt, command.TimeoutDeleteSubscriptions) // Notification case types.StoreReadNotifications: util.Assert(command.ReadNotifications != nil, "command must not be nil") results[i][j], err = w.readNotifications(tx, command.ReadNotifications) - case types.StoreCreateNotification: - util.Assert(command.CreateNotification != nil, "command must not be nil") - results[i][j], err = w.createNotification(tx, notificationInsertStmt, command.CreateNotification) + case types.StoreCreateNotifications: + util.Assert(command.CreateNotifications != nil, "command must not be nil") + results[i][j], err = w.createNotifications(tx, notificationInsertStmt, command.CreateNotifications) case types.StoreUpdateNotification: util.Assert(command.UpdateNotification != nil, "command must not be nil") results[i][j], err = w.updateNotification(tx, notificationUpdateStmt, command.UpdateNotification) case types.StoreDeleteNotification: util.Assert(command.DeleteNotification != nil, "command must not be nil") results[i][j], err = w.deleteNotification(tx, notificationDeleteStmt, command.DeleteNotification) + case types.StoreTimeoutCreateNotifications: + util.Assert(command.TimeoutCreateNotifications != nil, "command must not be nil") + results[i][j], err = w.timeoutCreateNotifications(tx, notificationInsertTimeoutStmt, command.TimeoutCreateNotifications) + default: panic("invalid command") } @@ -558,6 +618,28 @@ func (w *SqliteStoreWorker) updatePromise(tx *sql.Tx, stmt *sql.Stmt, cmd *types }, nil } +func (w *SqliteStoreWorker) timeoutPromises(tx *sql.Tx, stmt *sql.Stmt, cmd *types.TimeoutPromisesCommand) (*types.Result, error) { + util.Assert(cmd.Time >= 0, "time must be non-negative") + + // udpate promises + res, err := stmt.Exec(cmd.Time) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &types.Result{ + Kind: types.StoreTimeoutPromises, + TimeoutPromises: &types.AlterPromisesResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + func (w *SqliteStoreWorker) readTimeouts(tx *sql.Tx, cmd *types.ReadTimeoutsCommand) (*types.Result, error) { // select rows, err := tx.Query(TIMEOUT_SELECT_STATEMENT, cmd.N) @@ -725,7 +807,7 @@ func (w *SqliteStoreWorker) createSubscription(tx *sql.Tx, stmt *sql.Stmt, cmd * return &types.Result{ Kind: types.StoreCreateSubscription, - CreateSubscription: &types.AlterSubscriptionResult{ + CreateSubscription: &types.AlterSubscriptionsResult{ RowsAffected: rowsAffected, }, }, nil @@ -745,7 +827,7 @@ func (w *SqliteStoreWorker) deleteSubscription(tx *sql.Tx, stmt *sql.Stmt, cmd * return &types.Result{ Kind: types.StoreDeleteSubscription, - DeleteSubscription: &types.AlterSubscriptionResult{ + DeleteSubscription: &types.AlterSubscriptionsResult{ RowsAffected: rowsAffected, }, }, nil @@ -765,7 +847,29 @@ func (w *SqliteStoreWorker) deleteSubscriptions(tx *sql.Tx, stmt *sql.Stmt, cmd return &types.Result{ Kind: types.StoreDeleteSubscriptions, - DeleteSubscriptions: &types.AlterSubscriptionResult{ + DeleteSubscriptions: &types.AlterSubscriptionsResult{ + RowsAffected: rowsAffected, + }, + }, nil +} + +func (w *SqliteStoreWorker) timeoutDeleteSubscriptions(tx *sql.Tx, stmt *sql.Stmt, cmd *types.TimeoutDeleteSubscriptionsCommand) (*types.Result, error) { + util.Assert(cmd.Time >= 0, "time must be non-negative") + + // udpate promises + res, err := stmt.Exec(cmd.Time) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &types.Result{ + Kind: types.StoreTimeoutDeleteSubscriptions, + TimeoutDeleteSubscriptions: &types.AlterSubscriptionsResult{ RowsAffected: rowsAffected, }, }, nil @@ -801,12 +905,11 @@ func (w *SqliteStoreWorker) readNotifications(tx *sql.Tx, cmd *types.ReadNotific }, nil } -func (w *SqliteStoreWorker) createNotification(tx *sql.Tx, stmt *sql.Stmt, cmd *types.CreateNotificationCommand) (*types.Result, error) { +func (w *SqliteStoreWorker) createNotifications(tx *sql.Tx, stmt *sql.Stmt, cmd *types.CreateNotificationsCommand) (*types.Result, error) { util.Assert(cmd.Time >= 0, "time must be non-negative") - util.Assert(cmd.RetryPolicy != nil, "retry policy must not be nil") // insert - res, err := stmt.Exec(cmd.Id, cmd.PromiseId, cmd.Url, cmd.RetryPolicy, cmd.Time) + res, err := stmt.Exec(cmd.Time, cmd.PromiseId) if err != nil { return nil, err } @@ -817,8 +920,8 @@ func (w *SqliteStoreWorker) createNotification(tx *sql.Tx, stmt *sql.Stmt, cmd * } return &types.Result{ - Kind: types.StoreCreateNotification, - CreateNotification: &types.AlterNotificationsResult{ + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.AlterNotificationsResult{ RowsAffected: rowsAffected, }, }, nil @@ -863,3 +966,25 @@ func (w *SqliteStoreWorker) deleteNotification(tx *sql.Tx, stmt *sql.Stmt, cmd * }, }, nil } + +func (w *SqliteStoreWorker) timeoutCreateNotifications(tx *sql.Tx, stmt *sql.Stmt, cmd *types.TimeoutCreateNotificationsCommand) (*types.Result, error) { + util.Assert(cmd.Time >= 0, "time must be non-negative") + + // udpate promises + res, err := stmt.Exec(cmd.Time, cmd.Time) + if err != nil { + return nil, err + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return nil, err + } + + return &types.Result{ + Kind: types.StoreTimeoutCreateNotifications, + TimeoutCreateNotifications: &types.AlterNotificationsResult{ + RowsAffected: rowsAffected, + }, + }, nil +} diff --git a/internal/app/subsystems/aio/store/test/util.go b/internal/app/subsystems/aio/store/test/util.go index ad410d1f..2e434a8e 100644 --- a/internal/app/subsystems/aio/store/test/util.go +++ b/internal/app/subsystems/aio/store/test/util.go @@ -1690,7 +1690,7 @@ var TestCases = []*testCase{ expected: []*types.Result{ { Kind: types.StoreCreateSubscription, - CreateSubscription: &types.AlterSubscriptionResult{ + CreateSubscription: &types.AlterSubscriptionsResult{ RowsAffected: 1, }, }, @@ -1721,13 +1721,13 @@ var TestCases = []*testCase{ expected: []*types.Result{ { Kind: types.StoreCreateSubscription, - CreateSubscription: &types.AlterSubscriptionResult{ + CreateSubscription: &types.AlterSubscriptionsResult{ RowsAffected: 1, }, }, { Kind: types.StoreCreateSubscription, - CreateSubscription: &types.AlterSubscriptionResult{ + CreateSubscription: &types.AlterSubscriptionsResult{ RowsAffected: 0, }, }, @@ -1756,13 +1756,13 @@ var TestCases = []*testCase{ expected: []*types.Result{ { Kind: types.StoreCreateSubscription, - CreateSubscription: &types.AlterSubscriptionResult{ + CreateSubscription: &types.AlterSubscriptionsResult{ RowsAffected: 1, }, }, { Kind: types.StoreDeleteSubscription, - DeleteSubscription: &types.AlterSubscriptionResult{ + DeleteSubscription: &types.AlterSubscriptionsResult{ RowsAffected: 1, }, }, @@ -1791,7 +1791,7 @@ var TestCases = []*testCase{ expected: []*types.Result{ { Kind: types.StoreCreateSubscription, - CreateSubscription: &types.AlterSubscriptionResult{ + CreateSubscription: &types.AlterSubscriptionsResult{ RowsAffected: 1, }, }, @@ -1851,19 +1851,19 @@ var TestCases = []*testCase{ expected: []*types.Result{ { Kind: types.StoreCreateSubscription, - CreateSubscription: &types.AlterSubscriptionResult{ + CreateSubscription: &types.AlterSubscriptionsResult{ RowsAffected: 1, }, }, { Kind: types.StoreCreateSubscription, - CreateSubscription: &types.AlterSubscriptionResult{ + CreateSubscription: &types.AlterSubscriptionsResult{ RowsAffected: 1, }, }, { Kind: types.StoreCreateSubscription, - CreateSubscription: &types.AlterSubscriptionResult{ + CreateSubscription: &types.AlterSubscriptionsResult{ RowsAffected: 1, }, }, @@ -1896,63 +1896,386 @@ var TestCases = []*testCase{ }, }, { - name: "CreateNotification", + name: "TimeoutPromises", commands: []*types.Command{ { - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ + Kind: types.StoreCreatePromise, + CreatePromise: &types.CreatePromiseCommand{ + Id: "foo", + Timeout: 2, + Param: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + Tags: map[string]string{}, + CreatedOn: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.CreateSubscriptionCommand{ Id: "foo", PromiseId: "foo", Url: "https://foo.com", - Time: 0, - RetryPolicy: []byte("{}"), + RetryPolicy: &subscription.RetryPolicy{Delay: 1, Attempts: 1}, + }, + }, + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.CreatePromiseCommand{ + Id: "bar", + Timeout: 2, + Param: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + Tags: map[string]string{}, + CreatedOn: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.CreateSubscriptionCommand{ + Id: "bar", + PromiseId: "bar", + Url: "https://bar.com", + RetryPolicy: &subscription.RetryPolicy{Delay: 2, Attempts: 2}, + }, + }, + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.CreatePromiseCommand{ + Id: "baz", + Timeout: 2, + Param: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + Tags: map[string]string{}, + CreatedOn: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.CreateSubscriptionCommand{ + Id: "baz", + PromiseId: "baz", + Url: "https://baz.com", + RetryPolicy: &subscription.RetryPolicy{Delay: 3, Attempts: 3}, + }, + }, + { + Kind: types.StoreTimeoutCreateNotifications, + TimeoutCreateNotifications: &types.TimeoutCreateNotificationsCommand{ + Time: 2, + }, + }, + { + Kind: types.StoreTimeoutDeleteSubscriptions, + TimeoutDeleteSubscriptions: &types.TimeoutDeleteSubscriptionsCommand{ + Time: 2, + }, + }, + { + Kind: types.StoreTimeoutPromises, + TimeoutPromises: &types.TimeoutPromisesCommand{ + Time: 2, + }, + }, + { + Kind: types.StoreReadNotifications, + ReadNotifications: &types.ReadNotificationsCommand{ + N: 5, + }, + }, + { + Kind: types.StoreReadSubscriptions, + ReadSubscriptions: &types.ReadSubscriptionsCommand{ + PromiseIds: []string{"foo", "bar", "baz"}, + }, + }, + { + Kind: types.StoreSearchPromises, + SearchPromises: &types.SearchPromisesCommand{ + Q: "*", + States: []promise.State{promise.Timedout}, + Limit: 5, }, }, }, expected: []*types.Result{ { - Kind: types.StoreCreateNotification, - CreateNotification: &types.AlterNotificationsResult{ + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ RowsAffected: 1, }, }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.AlterSubscriptionsResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.AlterSubscriptionsResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.AlterSubscriptionsResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreTimeoutCreateNotifications, + TimeoutCreateNotifications: &types.AlterNotificationsResult{ + RowsAffected: 3, + }, + }, + { + Kind: types.StoreTimeoutDeleteSubscriptions, + TimeoutDeleteSubscriptions: &types.AlterSubscriptionsResult{ + RowsAffected: 3, + }, + }, + { + Kind: types.StoreTimeoutPromises, + TimeoutPromises: &types.AlterPromisesResult{ + RowsAffected: 3, + }, + }, + { + Kind: types.StoreReadNotifications, + ReadNotifications: &types.QueryNotificationsResult{ + RowsReturned: 3, + Records: []*notification.NotificationRecord{ + { + Id: "bar", + PromiseId: "bar", + Url: "https://bar.com", + RetryPolicy: []byte("{\"delay\":2,\"attempts\":2}"), + Time: 2, + Attempt: 0, + }, + { + Id: "baz", + PromiseId: "baz", + Url: "https://baz.com", + RetryPolicy: []byte("{\"delay\":3,\"attempts\":3}"), + Time: 2, + Attempt: 0, + }, + { + Id: "foo", + PromiseId: "foo", + Url: "https://foo.com", + RetryPolicy: []byte("{\"delay\":1,\"attempts\":1}"), + Time: 2, + Attempt: 0, + }, + }, + }, + }, + { + Kind: types.StoreReadSubscriptions, + ReadSubscriptions: &types.QuerySubscriptionsResult{ + RowsReturned: 0, + }, + }, + { + Kind: types.StoreSearchPromises, + SearchPromises: &types.QueryPromisesResult{ + RowsReturned: 3, + LastSortId: 1, + Records: []*promise.PromiseRecord{ + { + Id: "baz", + State: 8, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + Timeout: 2, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + Tags: []byte("{}"), + SortId: 3, + }, + { + Id: "bar", + State: 8, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + Timeout: 2, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + Tags: []byte("{}"), + SortId: 2, + }, + { + Id: "foo", + State: 8, + ParamHeaders: []byte("{}"), + ParamData: []byte{}, + Timeout: 2, + CreatedOn: int64ToPointer(1), + CompletedOn: int64ToPointer(2), + Tags: []byte("{}"), + SortId: 1, + }, + }, + }, + }, }, }, { - name: "CreateNotificationTwice", + name: "CreateNotifications", commands: []*types.Command{ { - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ + Kind: types.StoreCreatePromise, + CreatePromise: &types.CreatePromiseCommand{ + Id: "foo", + Timeout: 1, + Param: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + Tags: map[string]string{}, + CreatedOn: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.CreateSubscriptionCommand{ Id: "foo", PromiseId: "foo", Url: "https://foo.com", - Time: 0, - RetryPolicy: []byte("{}"), + RetryPolicy: &subscription.RetryPolicy{Delay: 1, Attempts: 1}, }, }, { - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ - Id: "foo", + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.CreateSubscriptionCommand{ + Id: "bar", PromiseId: "foo", - Url: "https://foo.com", - Time: 1, - RetryPolicy: []byte("{}"), + Url: "https://bar.com", + RetryPolicy: &subscription.RetryPolicy{Delay: 2, Attempts: 2}, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.CreateSubscriptionCommand{ + Id: "baz", + PromiseId: "foo", + Url: "https://baz.com", + RetryPolicy: &subscription.RetryPolicy{Delay: 3, Attempts: 3}, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "foo", + State: 2, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, + }, + }, + { + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.CreateNotificationsCommand{ + PromiseId: "foo", + Time: 2, + }, + }, + { + Kind: types.StoreReadNotifications, + ReadNotifications: &types.ReadNotificationsCommand{ + N: 3, }, }, }, expected: []*types.Result{ { - Kind: types.StoreCreateNotification, - CreateNotification: &types.AlterNotificationsResult{ + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ RowsAffected: 1, }, }, { - Kind: types.StoreCreateNotification, - CreateNotification: &types.AlterNotificationsResult{ - RowsAffected: 0, + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.AlterSubscriptionsResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.AlterSubscriptionsResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.AlterSubscriptionsResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.AlterNotificationsResult{ + RowsAffected: 3, + }, + }, + { + Kind: types.StoreReadNotifications, + ReadNotifications: &types.QueryNotificationsResult{ + RowsReturned: 3, + Records: []*notification.NotificationRecord{ + { + Id: "bar", + PromiseId: "foo", + Url: "https://bar.com", + RetryPolicy: []byte("{\"delay\":2,\"attempts\":2}"), + Time: 2, + Attempt: 0, + }, + { + Id: "baz", + PromiseId: "foo", + Url: "https://baz.com", + RetryPolicy: []byte("{\"delay\":3,\"attempts\":3}"), + Time: 2, + Attempt: 0, + }, + { + Id: "foo", + PromiseId: "foo", + Url: "https://foo.com", + RetryPolicy: []byte("{\"delay\":1,\"attempts\":1}"), + Time: 2, + Attempt: 0, + }, + }, }, }, }, @@ -1961,13 +2284,44 @@ var TestCases = []*testCase{ name: "UpdateNotification", commands: []*types.Command{ { - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ + Kind: types.StoreCreatePromise, + CreatePromise: &types.CreatePromiseCommand{ + Id: "foo", + Timeout: 1, + Param: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + Tags: map[string]string{}, + CreatedOn: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.CreateSubscriptionCommand{ Id: "foo", PromiseId: "foo", Url: "https://foo.com", - Time: 0, - RetryPolicy: []byte("{}"), + RetryPolicy: &subscription.RetryPolicy{Delay: 1, Attempts: 1}, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "foo", + State: 2, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, + }, + }, + { + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.CreateNotificationsCommand{ + PromiseId: "foo", + Time: 2, }, }, { @@ -1975,15 +2329,39 @@ var TestCases = []*testCase{ UpdateNotification: &types.UpdateNotificationCommand{ Id: "foo", PromiseId: "foo", - Time: 1, + Time: 4, Attempt: 1, }, }, + { + Kind: types.StoreReadNotifications, + ReadNotifications: &types.ReadNotificationsCommand{ + N: 1, + }, + }, }, expected: []*types.Result{ { - Kind: types.StoreCreateNotification, - CreateNotification: &types.AlterNotificationsResult{ + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.AlterSubscriptionsResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, + }, + }, + { + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.AlterNotificationsResult{ RowsAffected: 1, }, }, @@ -1993,19 +2371,72 @@ var TestCases = []*testCase{ RowsAffected: 1, }, }, + { + Kind: types.StoreReadNotifications, + ReadNotifications: &types.QueryNotificationsResult{ + RowsReturned: 1, + Records: []*notification.NotificationRecord{ + { + Id: "foo", + PromiseId: "foo", + Url: "https://foo.com", + RetryPolicy: []byte("{\"delay\":1,\"attempts\":1}"), + Time: 4, + Attempt: 1, + }, + }, + }, + }, }, }, { name: "DeleteNotification", commands: []*types.Command{ { - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ + Kind: types.StoreCreatePromise, + CreatePromise: &types.CreatePromiseCommand{ + Id: "foo", + Timeout: 1, + Param: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + Tags: map[string]string{}, + CreatedOn: 1, + }, + }, + { + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.CreateSubscriptionCommand{ Id: "foo", PromiseId: "foo", Url: "https://foo.com", - Time: 0, - RetryPolicy: []byte("{}"), + RetryPolicy: &subscription.RetryPolicy{Delay: 1, Attempts: 1}, + }, + }, + { + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.UpdatePromiseCommand{ + Id: "foo", + State: 2, + Value: promise.Value{ + Headers: map[string]string{}, + Data: []byte{}, + }, + CompletedOn: 2, + }, + }, + { + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.CreateNotificationsCommand{ + PromiseId: "foo", + Time: 2, + }, + }, + { + Kind: types.StoreDeleteSubscriptions, + DeleteSubscriptions: &types.DeleteSubscriptionsCommand{ + PromiseId: "foo", }, }, { @@ -2015,111 +2446,66 @@ var TestCases = []*testCase{ PromiseId: "foo", }, }, - }, - expected: []*types.Result{ { - Kind: types.StoreCreateNotification, - CreateNotification: &types.AlterNotificationsResult{ - RowsAffected: 1, + Kind: types.StoreReadSubscriptions, + ReadSubscriptions: &types.ReadSubscriptionsCommand{ + PromiseIds: []string{"foo"}, }, }, { - Kind: types.StoreDeleteNotification, - DeleteNotification: &types.AlterNotificationsResult{ - RowsAffected: 1, + Kind: types.StoreReadNotifications, + ReadNotifications: &types.ReadNotificationsCommand{ + N: 1, }, }, }, - }, - { - name: "ReadNotification", - commands: []*types.Command{ + expected: []*types.Result{ { - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ - Id: "foo", - PromiseId: "foo", - Url: "https://foo.com", - Time: 0, - RetryPolicy: []byte("{\"delay\":1,\"attempts\":1}"), + Kind: types.StoreCreatePromise, + CreatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, }, }, { - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ - Id: "bar", - PromiseId: "bar", - Url: "https://bar.com", - Time: 1, - RetryPolicy: []byte("{\"delay\":2,\"attempts\":2}"), + Kind: types.StoreCreateSubscription, + CreateSubscription: &types.AlterSubscriptionsResult{ + RowsAffected: 1, }, }, { - Kind: types.StoreCreateNotification, - CreateNotification: &types.CreateNotificationCommand{ - Id: "baz", - PromiseId: "baz", - Url: "https://baz.com", - Time: 2, - RetryPolicy: []byte("{\"delay\":3,\"attempts\":3}"), + Kind: types.StoreUpdatePromise, + UpdatePromise: &types.AlterPromisesResult{ + RowsAffected: 1, }, }, { - Kind: types.StoreReadNotifications, - ReadNotifications: &types.ReadNotificationsCommand{ - N: 3, + Kind: types.StoreCreateNotifications, + CreateNotifications: &types.AlterNotificationsResult{ + RowsAffected: 1, }, }, - }, - expected: []*types.Result{ { - Kind: types.StoreCreateNotification, - CreateNotification: &types.AlterNotificationsResult{ + Kind: types.StoreDeleteSubscriptions, + DeleteSubscriptions: &types.AlterSubscriptionsResult{ RowsAffected: 1, }, }, { - Kind: types.StoreCreateNotification, - CreateNotification: &types.AlterNotificationsResult{ + Kind: types.StoreDeleteNotification, + DeleteNotification: &types.AlterNotificationsResult{ RowsAffected: 1, }, }, { - Kind: types.StoreCreateNotification, - CreateNotification: &types.AlterNotificationsResult{ - RowsAffected: 1, + Kind: types.StoreReadSubscriptions, + ReadSubscriptions: &types.QuerySubscriptionsResult{ + RowsReturned: 0, }, }, { Kind: types.StoreReadNotifications, ReadNotifications: &types.QueryNotificationsResult{ - RowsReturned: 3, - Records: []*notification.NotificationRecord{ - { - Id: "foo", - PromiseId: "foo", - Url: "https://foo.com", - Time: 0, - Attempt: 0, - RetryPolicy: []byte("{\"delay\":1,\"attempts\":1}"), - }, - { - Id: "bar", - PromiseId: "bar", - Url: "https://bar.com", - Time: 1, - Attempt: 0, - RetryPolicy: []byte("{\"delay\":2,\"attempts\":2}"), - }, - { - Id: "baz", - PromiseId: "baz", - Url: "https://baz.com", - Time: 2, - Attempt: 0, - RetryPolicy: []byte("{\"delay\":3,\"attempts\":3}"), - }, - }, + RowsReturned: 0, }, }, }, diff --git a/internal/kernel/types/aio_store.go b/internal/kernel/types/aio_store.go index cbc1c0af..59d1e695 100644 --- a/internal/kernel/types/aio_store.go +++ b/internal/kernel/types/aio_store.go @@ -23,9 +23,12 @@ const ( StoreDeleteSubscription StoreDeleteSubscriptions StoreReadNotifications - StoreCreateNotification + StoreCreateNotifications StoreUpdateNotification StoreDeleteNotification + StoreTimeoutPromises + StoreTimeoutDeleteSubscriptions + StoreTimeoutCreateNotifications ) type StoreSubmission struct { @@ -41,44 +44,52 @@ type Transaction struct { } type Command struct { - Kind StoreKind - ReadPromise *ReadPromiseCommand - SearchPromises *SearchPromisesCommand - CreatePromise *CreatePromiseCommand - UpdatePromise *UpdatePromiseCommand - ReadTimeouts *ReadTimeoutsCommand - CreateTimeout *CreateTimeoutCommand - DeleteTimeout *DeleteTimeoutCommand - ReadSubscription *ReadSubscriptionCommand - ReadSubscriptions *ReadSubscriptionsCommand - CreateSubscription *CreateSubscriptionCommand - DeleteSubscription *DeleteSubscriptionCommand - DeleteSubscriptions *DeleteSubscriptionsCommand - ReadNotifications *ReadNotificationsCommand - CreateNotification *CreateNotificationCommand - UpdateNotification *UpdateNotificationCommand - DeleteNotification *DeleteNotificationCommand + Kind StoreKind + ReadPromise *ReadPromiseCommand + SearchPromises *SearchPromisesCommand + CreatePromise *CreatePromiseCommand + UpdatePromise *UpdatePromiseCommand + ReadTimeouts *ReadTimeoutsCommand + CreateTimeout *CreateTimeoutCommand + DeleteTimeout *DeleteTimeoutCommand + ReadSubscription *ReadSubscriptionCommand + ReadSubscriptions *ReadSubscriptionsCommand + CreateSubscription *CreateSubscriptionCommand + DeleteSubscription *DeleteSubscriptionCommand + DeleteSubscriptions *DeleteSubscriptionsCommand + ReadNotifications *ReadNotificationsCommand + CreateNotifications *CreateNotificationsCommand + UpdateNotification *UpdateNotificationCommand + DeleteNotification *DeleteNotificationCommand + TimeoutPromises *TimeoutPromisesCommand + TimeoutDeleteSubscriptions *TimeoutDeleteSubscriptionsCommand + TimeoutCreateNotifications *TimeoutCreateNotificationsCommand } type Result struct { - Kind StoreKind - ReadPromise *QueryPromisesResult - SearchPromises *QueryPromisesResult - CreatePromise *AlterPromisesResult - UpdatePromise *AlterPromisesResult - ReadTimeouts *QueryTimeoutsResult - CreateTimeout *AlterTimeoutsResult - DeleteTimeout *AlterTimeoutsResult - ReadSubscription *QuerySubscriptionsResult - ReadSubscriptions *QuerySubscriptionsResult - CreateSubscription *AlterSubscriptionResult - DeleteSubscription *AlterSubscriptionResult - DeleteSubscriptions *AlterSubscriptionResult - ReadNotifications *QueryNotificationsResult - CreateNotification *AlterNotificationsResult - UpdateNotification *AlterNotificationsResult - DeleteNotification *AlterNotificationsResult -} + Kind StoreKind + ReadPromise *QueryPromisesResult + SearchPromises *QueryPromisesResult + CreatePromise *AlterPromisesResult + UpdatePromise *AlterPromisesResult + ReadTimeouts *QueryTimeoutsResult + CreateTimeout *AlterTimeoutsResult + DeleteTimeout *AlterTimeoutsResult + ReadSubscription *QuerySubscriptionsResult + ReadSubscriptions *QuerySubscriptionsResult + CreateSubscription *AlterSubscriptionsResult + DeleteSubscription *AlterSubscriptionsResult + DeleteSubscriptions *AlterSubscriptionsResult + ReadNotifications *QueryNotificationsResult + CreateNotifications *AlterNotificationsResult + UpdateNotification *AlterNotificationsResult + DeleteNotification *AlterNotificationsResult + TimeoutPromises *AlterPromisesResult + TimeoutDeleteSubscriptions *AlterSubscriptionsResult + TimeoutCreateNotifications *AlterNotificationsResult +} + +// Promise commands type ReadPromiseCommand struct { Id string @@ -107,6 +118,8 @@ type UpdatePromiseCommand struct { CompletedOn int64 } +// Promise results + type QueryPromisesResult struct { RowsReturned int64 LastSortId int64 @@ -117,6 +130,8 @@ type AlterPromisesResult struct { RowsAffected int64 } +// Timeout commands + type ReadTimeoutsCommand struct { N int } @@ -130,6 +145,20 @@ type DeleteTimeoutCommand struct { Id string } +type TimeoutPromisesCommand struct { + Time int64 +} + +type TimeoutDeleteSubscriptionsCommand struct { + Time int64 +} + +type TimeoutCreateNotificationsCommand struct { + Time int64 +} + +// Timeout results + type QueryTimeoutsResult struct { RowsReturned int64 Records []*timeout.TimeoutRecord @@ -139,6 +168,8 @@ type AlterTimeoutsResult struct { RowsAffected int64 } +// Subscription commands + type ReadSubscriptionCommand struct { Id string PromiseId string @@ -165,25 +196,26 @@ type DeleteSubscriptionsCommand struct { PromiseId string } +// Subscription results + type QuerySubscriptionsResult struct { RowsReturned int64 Records []*subscription.SubscriptionRecord } -type AlterSubscriptionResult struct { +type AlterSubscriptionsResult struct { RowsAffected int64 } +// Notification commands + type ReadNotificationsCommand struct { N int } -type CreateNotificationCommand struct { - Id string - PromiseId string - Url string - RetryPolicy []byte - Time int64 +type CreateNotificationsCommand struct { + PromiseId string + Time int64 } type UpdateNotificationCommand struct { @@ -198,6 +230,8 @@ type DeleteNotificationCommand struct { PromiseId string } +// Notification results + type QueryNotificationsResult struct { RowsReturned int64 Records []*notification.NotificationRecord From 1fa43c040c64de9b66917ff365c603886117ddae Mon Sep 17 00:00:00 2001 From: David Farr Date: Wed, 27 Sep 2023 22:17:19 -0700 Subject: [PATCH 2/3] Tweak error logs --- internal/app/coroutines/createPromise.go | 2 +- internal/app/coroutines/resolvePromise.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/app/coroutines/createPromise.go b/internal/app/coroutines/createPromise.go index e6b5a078..c838753e 100644 --- a/internal/app/coroutines/createPromise.go +++ b/internal/app/coroutines/createPromise.go @@ -73,7 +73,7 @@ func CreatePromise(t int64, req *types.Request, res func(int64, *types.Response, c.Yield(submission, func(t int64, completion *types.Completion, err error) { if err != nil { - slog.Error("failed to update state", "req", req, "err", err) + slog.Error("failed to update promise", "req", req, "err", err) res(t, nil, err) return } diff --git a/internal/app/coroutines/resolvePromise.go b/internal/app/coroutines/resolvePromise.go index 0fa3c328..fb9cbb36 100644 --- a/internal/app/coroutines/resolvePromise.go +++ b/internal/app/coroutines/resolvePromise.go @@ -127,7 +127,7 @@ func ResolvePromise(t int64, req *types.Request, res func(int64, *types.Response c.Yield(submission, func(t int64, completion *types.Completion, err error) { if err != nil { - slog.Error("failed to update state", "req", req, "err", err) + slog.Error("failed to update promise", "req", req, "err", err) res(t, nil, err) return } From fe11001a4438e88a3aff76e623324e10937fbd4d Mon Sep 17 00:00:00 2001 From: David Farr Date: Thu, 28 Sep 2023 09:01:53 -0700 Subject: [PATCH 3/3] Update docker-compose.yml --- docker-compose.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 247d9e1c..244c468f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,6 +42,8 @@ services: command: - dst - run + - --aio-store + - postgres - --aio-store-postgres-host - postgres-dst - --aio-store-postgres-username