diff --git a/cmd/dst/run.go b/cmd/dst/run.go index 64d8ed8e..a4aa3b8b 100644 --- a/cmd/dst/run.go +++ b/cmd/dst/run.go @@ -110,9 +110,7 @@ func RunDSTCmd() *cobra.Command { system.AddOnRequest(t_api.ReadPromise, coroutines.ReadPromise) system.AddOnRequest(t_api.SearchPromises, coroutines.SearchPromises) system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise) - system.AddOnRequest(t_api.CancelPromise, coroutines.CancelPromise) - system.AddOnRequest(t_api.ResolvePromise, coroutines.ResolvePromise) - system.AddOnRequest(t_api.RejectPromise, coroutines.RejectPromise) + system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise) system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule) system.AddOnRequest(t_api.SearchSchedules, coroutines.SearchSchedules) system.AddOnRequest(t_api.CreateSchedule, coroutines.CreateSchedule) @@ -133,9 +131,7 @@ func RunDSTCmd() *cobra.Command { t_api.ReadPromise, t_api.SearchPromises, t_api.CreatePromise, - t_api.CancelPromise, - t_api.ResolvePromise, - t_api.RejectPromise, + t_api.CompletePromise, // SCHEDULE t_api.ReadSchedule, diff --git a/cmd/serve/serve.go b/cmd/serve/serve.go index 4210c779..70d04852 100644 --- a/cmd/serve/serve.go +++ b/cmd/serve/serve.go @@ -89,9 +89,7 @@ func ServeCmd() *cobra.Command { system.AddOnRequest(t_api.ReadPromise, coroutines.ReadPromise) system.AddOnRequest(t_api.SearchPromises, coroutines.SearchPromises) system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise) - system.AddOnRequest(t_api.ResolvePromise, coroutines.ResolvePromise) - system.AddOnRequest(t_api.RejectPromise, coroutines.RejectPromise) - system.AddOnRequest(t_api.CancelPromise, coroutines.CancelPromise) + system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise) system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule) system.AddOnRequest(t_api.SearchSchedules, coroutines.SearchSchedules) system.AddOnRequest(t_api.CreateSchedule, coroutines.CreateSchedule) diff --git a/internal/api/api.go b/internal/api/api.go index b8eac01e..6179f245 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -96,12 +96,8 @@ func (a *api) Enqueue(sqe *bus.SQE[t_api.Request, t_api.Response]) { status = int(res.SearchPromises.Status) case t_api.CreatePromise: status = int(res.CreatePromise.Status) - case t_api.CancelPromise: - status = int(res.CancelPromise.Status) - case t_api.ResolvePromise: - status = int(res.ResolvePromise.Status) - case t_api.RejectPromise: - status = int(res.RejectPromise.Status) + case t_api.CompletePromise: + status = int(res.CompletePromise.Status) case t_api.ReadSchedule: status = int(res.ReadSchedule.Status) case t_api.SearchSchedules: diff --git a/internal/app/coroutines/cancelPromise.go b/internal/app/coroutines/cancelPromise.go deleted file mode 100644 index 11848877..00000000 --- a/internal/app/coroutines/cancelPromise.go +++ /dev/null @@ -1,185 +0,0 @@ -package coroutines - -import ( - "log/slog" - - "github.com/resonatehq/resonate/internal/kernel/metadata" - "github.com/resonatehq/resonate/internal/kernel/scheduler" - "github.com/resonatehq/resonate/internal/kernel/t_aio" - "github.com/resonatehq/resonate/internal/kernel/t_api" - "github.com/resonatehq/resonate/internal/util" - "github.com/resonatehq/resonate/pkg/promise" -) - -func CancelPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { - if req.CancelPromise.Value.Headers == nil { - req.CancelPromise.Value.Headers = map[string]string{} - } - if req.CancelPromise.Value.Data == nil { - req.CancelPromise.Value.Data = []byte{} - } - - completion, err := c.Yield(&t_aio.Submission{ - Kind: t_aio.Store, - Store: &t_aio.StoreSubmission{ - Transaction: &t_aio.Transaction{ - Commands: []*t_aio.Command{ - { - Kind: t_aio.ReadPromise, - ReadPromise: &t_aio.ReadPromiseCommand{ - Id: req.CancelPromise.Id, - }, - }, - }, - }, - }, - }) - - if err != nil { - slog.Error("failed to read promise", "req", req, "err", err) - // put in original error - res(nil, t_api.NewResonateError(t_api.ErrAIOStoreFailure, "failed to read promise", err)) - // store submission is failing -- single AIOStoreSubmissionFailure -- metadata information on that guy - return - } - - util.Assert(completion.Store != nil, "completion must not be nil") - - result := completion.Store.Results[0].ReadPromise - util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows") - - if result.RowsReturned == 0 { - res(&t_api.Response{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusPromiseNotFound, - }, - }, nil) - } else { - p, err := result.Records[0].Promise() - if err != nil { - slog.Error("failed to parse promise record", "record", result.Records[0], "err", err) - res(nil, t_api.NewResonateError(t_api.ErrAIOStoreSerializationFailure, "failed to parse promise record", err)) - return - } - - if p.State == promise.Pending { - if c.Time() >= p.Timeout { - c.Scheduler.Add(TimeoutPromise(metadata, p, CancelPromise(metadata, req, res), func(err error) { - if err != nil { - slog.Error("failed to timeout promise", "req", req, "err", err) - res(nil, t_api.NewResonateError(t_api.ErrAIOStoreFailure, "failed to timeout promise", err)) - return - } - - res(&t_api.Response{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusPromiseAlreadyTimedOut, - Promise: &promise.Promise{ - Id: p.Id, - State: promise.Timedout, - Param: p.Param, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, - }, - Timeout: p.Timeout, - IdempotencyKeyForCreate: p.IdempotencyKeyForCreate, - IdempotencyKeyForComplete: p.IdempotencyKeyForComplete, - Tags: p.Tags, - CreatedOn: p.CreatedOn, - CompletedOn: &p.Timeout, - }, - }, - }, nil) - })) - } else { - completedOn := c.Time() - completion, err := c.Yield(&t_aio.Submission{ - Kind: t_aio.Store, - Store: &t_aio.StoreSubmission{ - Transaction: &t_aio.Transaction{ - Commands: []*t_aio.Command{ - { - Kind: t_aio.UpdatePromise, - UpdatePromise: &t_aio.UpdatePromiseCommand{ - Id: req.CancelPromise.Id, - State: promise.Canceled, - Value: req.CancelPromise.Value, - IdempotencyKey: req.CancelPromise.IdempotencyKey, - CompletedOn: completedOn, - }, - }, - { - Kind: t_aio.CreateNotifications, - CreateNotifications: &t_aio.CreateNotificationsCommand{ - PromiseId: req.CancelPromise.Id, - Time: completedOn, - }, - }, - { - Kind: t_aio.DeleteSubscriptions, - DeleteSubscriptions: &t_aio.DeleteSubscriptionsCommand{ - PromiseId: req.CancelPromise.Id, - }, - }, - }, - }, - }, - }) - - if err != nil { - slog.Error("failed to update promise", "req", req, "err", err) - res(nil, t_api.NewResonateError(t_api.ErrAIOStoreFailure, "failed to update promise", err)) - return - } - - util.Assert(completion.Store != nil, "completion must not be nil") - - result := completion.Store.Results[0].UpdatePromise - util.Assert(result.RowsAffected == 0 || result.RowsAffected == 1, "result must return 0 or 1 rows") - - if result.RowsAffected == 1 { - res(&t_api.Response{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusCreated, - Promise: &promise.Promise{ - Id: p.Id, - State: promise.Canceled, - Param: p.Param, - Value: req.CancelPromise.Value, - Timeout: p.Timeout, - IdempotencyKeyForCreate: p.IdempotencyKeyForCreate, - IdempotencyKeyForComplete: req.CancelPromise.IdempotencyKey, - Tags: p.Tags, - CreatedOn: p.CreatedOn, - CompletedOn: &completedOn, - }, - }, - }, nil) - } else { - c.Scheduler.Add(CancelPromise(metadata, req, res)) - } - } - } else { - status := t_api.ForbiddenStatus(p.State) - strict := req.CancelPromise.Strict && p.State != promise.Canceled - - if !strict && p.IdempotencyKeyForComplete.Match(req.CancelPromise.IdempotencyKey) { - status = t_api.StatusOK - } - - res(&t_api.Response{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CompletePromiseResponse{ - Status: status, - Promise: p, - }, - }, nil) - } - } - }) -} diff --git a/internal/app/coroutines/rejectPromise.go b/internal/app/coroutines/completePromise.go similarity index 74% rename from internal/app/coroutines/rejectPromise.go rename to internal/app/coroutines/completePromise.go index 82804dd3..ddd5561e 100644 --- a/internal/app/coroutines/rejectPromise.go +++ b/internal/app/coroutines/completePromise.go @@ -11,13 +11,13 @@ import ( "github.com/resonatehq/resonate/pkg/promise" ) -func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { +func CompletePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { - if req.RejectPromise.Value.Headers == nil { - req.RejectPromise.Value.Headers = map[string]string{} + if req.CompletePromise.Value.Headers == nil { + req.CompletePromise.Value.Headers = map[string]string{} } - if req.RejectPromise.Value.Data == nil { - req.RejectPromise.Value.Data = []byte{} + if req.CompletePromise.Value.Data == nil { + req.CompletePromise.Value.Data = []byte{} } completion, err := c.Yield(&t_aio.Submission{ @@ -28,7 +28,7 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ { Kind: t_aio.ReadPromise, ReadPromise: &t_aio.ReadPromiseCommand{ - Id: req.RejectPromise.Id, + Id: req.CompletePromise.Id, }, }, }, @@ -49,8 +49,8 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ if result.RowsReturned == 0 { res(&t_api.Response{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.CompletePromiseResponse{ + Kind: req.Kind, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusPromiseNotFound, }, }, nil) @@ -64,7 +64,7 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ if p.State == promise.Pending { if c.Time() >= p.Timeout { - c.Scheduler.Add(TimeoutPromise(metadata, p, RejectPromise(metadata, req, res), func(err error) { + c.Scheduler.Add(TimeoutPromise(metadata, p, CompletePromise(metadata, req, res), func(err error) { if err != nil { slog.Error("failed to timeout promise", "req", req, "err", err) res(nil, t_api.NewResonateError(t_api.ErrAIOStoreFailure, "failed to timeout promise", err)) @@ -72,8 +72,8 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ } res(&t_api.Response{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.CompletePromiseResponse{ + Kind: req.Kind, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusPromiseAlreadyTimedOut, Promise: &promise.Promise{ Id: p.Id, @@ -103,24 +103,24 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ { Kind: t_aio.UpdatePromise, UpdatePromise: &t_aio.UpdatePromiseCommand{ - Id: req.RejectPromise.Id, - State: promise.Rejected, - Value: req.RejectPromise.Value, - IdempotencyKey: req.RejectPromise.IdempotencyKey, + Id: req.CompletePromise.Id, + State: req.CompletePromise.State, + Value: req.CompletePromise.Value, + IdempotencyKey: req.CompletePromise.IdempotencyKey, CompletedOn: completedOn, }, }, { Kind: t_aio.CreateNotifications, CreateNotifications: &t_aio.CreateNotificationsCommand{ - PromiseId: req.RejectPromise.Id, + PromiseId: req.CompletePromise.Id, Time: completedOn, }, }, { Kind: t_aio.DeleteSubscriptions, DeleteSubscriptions: &t_aio.DeleteSubscriptionsCommand{ - PromiseId: req.RejectPromise.Id, + PromiseId: req.CompletePromise.Id, }, }, }, @@ -141,17 +141,17 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ if result.RowsAffected == 1 { res(&t_api.Response{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.CompletePromiseResponse{ + Kind: req.Kind, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: p.Id, - State: promise.Rejected, + State: req.CompletePromise.State, Param: p.Param, - Value: req.RejectPromise.Value, + Value: req.CompletePromise.Value, Timeout: p.Timeout, IdempotencyKeyForCreate: p.IdempotencyKeyForCreate, - IdempotencyKeyForComplete: req.RejectPromise.IdempotencyKey, + IdempotencyKeyForComplete: req.CompletePromise.IdempotencyKey, Tags: p.Tags, CreatedOn: p.CreatedOn, CompletedOn: &completedOn, @@ -159,20 +159,20 @@ func RejectPromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_ }, }, nil) } else { - c.Scheduler.Add(RejectPromise(metadata, req, res)) + c.Scheduler.Add(CompletePromise(metadata, req, res)) } } } else { status := t_api.ForbiddenStatus(p.State) - strict := req.RejectPromise.Strict && p.State != promise.Rejected + strict := req.CompletePromise.Strict && p.State != req.CompletePromise.State - if !strict && p.IdempotencyKeyForComplete.Match(req.RejectPromise.IdempotencyKey) { + if !strict && p.IdempotencyKeyForComplete.Match(req.CompletePromise.IdempotencyKey) { status = t_api.StatusOK } res(&t_api.Response{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.CompletePromiseResponse{ + Kind: req.Kind, + CompletePromise: &t_api.CompletePromiseResponse{ Status: status, Promise: p, }, diff --git a/internal/app/coroutines/resolvePromise.go b/internal/app/coroutines/resolvePromise.go deleted file mode 100644 index 295686b7..00000000 --- a/internal/app/coroutines/resolvePromise.go +++ /dev/null @@ -1,182 +0,0 @@ -package coroutines - -import ( - "log/slog" - - "github.com/resonatehq/resonate/internal/kernel/metadata" - "github.com/resonatehq/resonate/internal/kernel/scheduler" - "github.com/resonatehq/resonate/internal/kernel/t_aio" - "github.com/resonatehq/resonate/internal/kernel/t_api" - "github.com/resonatehq/resonate/internal/util" - "github.com/resonatehq/resonate/pkg/promise" -) - -func ResolvePromise(metadata *metadata.Metadata, req *t_api.Request, res func(*t_api.Response, error)) *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission] { - return scheduler.NewCoroutine(metadata, func(c *scheduler.Coroutine[*t_aio.Completion, *t_aio.Submission]) { - if req.ResolvePromise.Value.Headers == nil { - req.ResolvePromise.Value.Headers = map[string]string{} - } - if req.ResolvePromise.Value.Data == nil { - req.ResolvePromise.Value.Data = []byte{} - } - - completion, err := c.Yield(&t_aio.Submission{ - Kind: t_aio.Store, - Store: &t_aio.StoreSubmission{ - Transaction: &t_aio.Transaction{ - Commands: []*t_aio.Command{ - { - Kind: t_aio.ReadPromise, - ReadPromise: &t_aio.ReadPromiseCommand{ - Id: req.ResolvePromise.Id, - }, - }, - }, - }, - }, - }) - - if err != nil { - slog.Error("failed to read promise", "req", req, "err", err) - res(nil, t_api.NewResonateError(t_api.ErrAIOStoreFailure, "failed to read promise", err)) - return - } - - util.Assert(completion.Store != nil, "completion must not be nil") - - result := completion.Store.Results[0].ReadPromise - util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows") - - if result.RowsReturned == 0 { - res(&t_api.Response{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusPromiseNotFound, - }, - }, nil) - } else { - p, err := result.Records[0].Promise() - if err != nil { - slog.Error("failed to parse promise record", "record", result.Records[0], "err", err) - res(nil, t_api.NewResonateError(t_api.ErrAIOStoreSerializationFailure, "failed to parse promise record", err)) - return - } - - if p.State == promise.Pending { - if c.Time() >= p.Timeout { - c.Scheduler.Add(TimeoutPromise(metadata, p, ResolvePromise(metadata, req, res), func(err error) { - if err != nil { - slog.Error("failed to timeout promise", "req", req, "err", err) - res(nil, t_api.NewResonateError(t_api.ErrAIOStoreFailure, "failed to timeout promise", err)) - return - } - - res(&t_api.Response{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusPromiseAlreadyTimedOut, - Promise: &promise.Promise{ - Id: p.Id, - State: promise.Timedout, - Param: p.Param, - Value: promise.Value{ - Headers: map[string]string{}, - Data: []byte{}, - }, - Timeout: p.Timeout, - IdempotencyKeyForCreate: p.IdempotencyKeyForCreate, - IdempotencyKeyForComplete: p.IdempotencyKeyForComplete, - Tags: p.Tags, - CreatedOn: p.CreatedOn, - CompletedOn: &p.Timeout, - }, - }, - }, nil) - })) - } else { - completedOn := c.Time() - completion, err := c.Yield(&t_aio.Submission{ - Kind: t_aio.Store, - Store: &t_aio.StoreSubmission{ - Transaction: &t_aio.Transaction{ - Commands: []*t_aio.Command{ - { - Kind: t_aio.UpdatePromise, - UpdatePromise: &t_aio.UpdatePromiseCommand{ - Id: req.ResolvePromise.Id, - State: promise.Resolved, - Value: req.ResolvePromise.Value, - IdempotencyKey: req.ResolvePromise.IdempotencyKey, - CompletedOn: completedOn, - }, - }, - { - Kind: t_aio.CreateNotifications, - CreateNotifications: &t_aio.CreateNotificationsCommand{ - PromiseId: req.ResolvePromise.Id, - Time: completedOn, - }, - }, - { - Kind: t_aio.DeleteSubscriptions, - DeleteSubscriptions: &t_aio.DeleteSubscriptionsCommand{ - PromiseId: req.ResolvePromise.Id, - }, - }, - }, - }, - }, - }) - - if err != nil { - slog.Error("failed to update promise", "req", req, "err", err) - res(nil, t_api.NewResonateError(t_api.ErrAIOStoreFailure, "failed to update promise", err)) - return - } - - util.Assert(completion.Store != nil, "completion must not be nil") - - result := completion.Store.Results[0].UpdatePromise - util.Assert(result.RowsAffected == 0 || result.RowsAffected == 1, "result must return 0 or 1 rows") - - if result.RowsAffected == 1 { - res(&t_api.Response{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusCreated, - Promise: &promise.Promise{ - Id: p.Id, - State: promise.Resolved, - Param: p.Param, - Value: req.ResolvePromise.Value, - Timeout: p.Timeout, - IdempotencyKeyForCreate: p.IdempotencyKeyForCreate, - IdempotencyKeyForComplete: req.ResolvePromise.IdempotencyKey, - CreatedOn: p.CreatedOn, - CompletedOn: &completedOn, - }, - }, - }, nil) - } else { - c.Scheduler.Add(ResolvePromise(metadata, req, res)) - } - } - } else { - status := t_api.ForbiddenStatus(p.State) - strict := req.ResolvePromise.Strict && p.State != promise.Resolved - - if !strict && p.IdempotencyKeyForComplete.Match(req.ResolvePromise.IdempotencyKey) { - status = t_api.StatusOK - } - - res(&t_api.Response{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.CompletePromiseResponse{ - Status: status, - Promise: p, - }, - }, nil) - } - } - }) -} diff --git a/internal/app/subsystems/api/grpc/grpc.go b/internal/app/subsystems/api/grpc/grpc.go index f51aaed6..93b91bde 100644 --- a/internal/app/subsystems/api/grpc/grpc.go +++ b/internal/app/subsystems/api/grpc/grpc.go @@ -224,7 +224,7 @@ func (s *server) CancelPromise(ctx context.Context, req *grpcApi.CancelPromiseRe Data: data, }, } - resp, err := s.service.CancelPromise(req.Id, header, body) + resp, err := s.service.CompletePromise(req.Id, promise.Canceled, header, body) if err != nil { var apiErr *api.APIErrorResponse util.Assert(errors.As(err, &apiErr), "err must be api error") @@ -266,7 +266,7 @@ func (s *server) ResolvePromise(ctx context.Context, req *grpcApi.ResolvePromise }, } - resp, err := s.service.ResolvePromise(req.Id, header, body) + resp, err := s.service.CompletePromise(req.Id, promise.Resolved, header, body) if err != nil { var apiErr *api.APIErrorResponse util.Assert(errors.As(err, &apiErr), "err must be api error") @@ -308,7 +308,7 @@ func (s *server) RejectPromise(ctx context.Context, req *grpcApi.RejectPromiseRe }, } - resp, err := s.service.RejectPromise(req.Id, header, body) + resp, err := s.service.CompletePromise(req.Id, promise.Rejected, header, body) if err != nil { var apiErr *api.APIErrorResponse util.Assert(errors.As(err, &apiErr), "err must be api error") diff --git a/internal/app/subsystems/api/grpc/grpc_test.go b/internal/app/subsystems/api/grpc/grpc_test.go index ed428fa7..91f8c483 100644 --- a/internal/app/subsystems/api/grpc/grpc_test.go +++ b/internal/app/subsystems/api/grpc/grpc_test.go @@ -544,11 +544,12 @@ func TestCancelPromise(t *testing.T) { }, }, req: &t_api.Request{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CancelPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: util.ToPointer(idempotency.Key("bar")), Strict: true, + State: promise.Canceled, Value: promise.Value{ Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, Data: []byte("cancel"), @@ -556,8 +557,8 @@ func TestCancelPromise(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo", @@ -573,11 +574,12 @@ func TestCancelPromise(t *testing.T) { Id: "foo", }, req: &t_api.Request{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CancelPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: nil, Strict: false, + State: promise.Canceled, Value: promise.Value{ Headers: nil, Data: nil, @@ -585,8 +587,8 @@ func TestCancelPromise(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo", @@ -602,11 +604,12 @@ func TestCancelPromise(t *testing.T) { Id: "foo", }, req: &t_api.Request{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CancelPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: nil, Strict: false, + State: promise.Canceled, Value: promise.Value{ Headers: nil, Data: nil, @@ -614,8 +617,8 @@ func TestCancelPromise(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusPromiseAlreadyResolved, Promise: &promise.Promise{ Id: "foo", @@ -684,11 +687,12 @@ func TestResolvePromise(t *testing.T) { }, }, req: &t_api.Request{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.ResolvePromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: util.ToPointer(idempotency.Key("bar")), Strict: true, + State: promise.Resolved, Value: promise.Value{ Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, Data: []byte("cancel"), @@ -696,8 +700,8 @@ func TestResolvePromise(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo", @@ -713,11 +717,12 @@ func TestResolvePromise(t *testing.T) { Id: "foo", }, req: &t_api.Request{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.ResolvePromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: nil, Strict: false, + State: promise.Resolved, Value: promise.Value{ Headers: nil, Data: nil, @@ -725,8 +730,8 @@ func TestResolvePromise(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo", @@ -742,11 +747,12 @@ func TestResolvePromise(t *testing.T) { Id: "foo", }, req: &t_api.Request{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.ResolvePromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: nil, Strict: false, + State: promise.Resolved, Value: promise.Value{ Headers: nil, Data: nil, @@ -754,8 +760,8 @@ func TestResolvePromise(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusPromiseAlreadyRejected, Promise: &promise.Promise{ Id: "foo", @@ -824,11 +830,12 @@ func TestRejectPromise(t *testing.T) { }, }, req: &t_api.Request{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.RejectPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: util.ToPointer(idempotency.Key("bar")), Strict: true, + State: promise.Rejected, Value: promise.Value{ Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, Data: []byte("cancel"), @@ -836,8 +843,8 @@ func TestRejectPromise(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo", @@ -853,11 +860,12 @@ func TestRejectPromise(t *testing.T) { Id: "foo", }, req: &t_api.Request{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.RejectPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: nil, Strict: false, + State: promise.Rejected, Value: promise.Value{ Headers: nil, Data: nil, @@ -865,8 +873,8 @@ func TestRejectPromise(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo", @@ -882,11 +890,12 @@ func TestRejectPromise(t *testing.T) { Id: "foo", }, req: &t_api.Request{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.RejectPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: nil, Strict: false, + State: promise.Rejected, Value: promise.Value{ Headers: nil, Data: nil, @@ -894,8 +903,8 @@ func TestRejectPromise(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusPromiseAlreadyResolved, Promise: &promise.Promise{ Id: "foo", diff --git a/internal/app/subsystems/api/http/http_test.go b/internal/app/subsystems/api/http/http_test.go index 904b6dd4..1912a11e 100644 --- a/internal/app/subsystems/api/http/http_test.go +++ b/internal/app/subsystems/api/http/http_test.go @@ -413,11 +413,12 @@ func TestHttpServer(t *testing.T) { } }`), req: &t_api.Request{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CancelPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo/bar", IdempotencyKey: util.ToPointer(idempotency.Key("bar")), Strict: true, + State: promise.Canceled, Value: promise.Value{ Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, Data: []byte("cancel"), @@ -425,8 +426,8 @@ func TestHttpServer(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo/bar", @@ -444,11 +445,12 @@ func TestHttpServer(t *testing.T) { "state": "REJECTED_CANCELED" }`), req: &t_api.Request{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CancelPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: nil, Strict: false, + State: promise.Canceled, Value: promise.Value{ Headers: nil, Data: nil, @@ -456,8 +458,8 @@ func TestHttpServer(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo", @@ -483,11 +485,12 @@ func TestHttpServer(t *testing.T) { } }`), req: &t_api.Request{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.ResolvePromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo/bar", IdempotencyKey: util.ToPointer(idempotency.Key("bar")), Strict: true, + State: promise.Resolved, Value: promise.Value{ Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, Data: []byte("resolve"), @@ -495,8 +498,8 @@ func TestHttpServer(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo/bar", @@ -514,11 +517,12 @@ func TestHttpServer(t *testing.T) { "state": "RESOLVED" }`), req: &t_api.Request{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.ResolvePromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: nil, Strict: false, + State: promise.Resolved, Value: promise.Value{ Headers: nil, Data: nil, @@ -526,8 +530,8 @@ func TestHttpServer(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo", @@ -553,11 +557,12 @@ func TestHttpServer(t *testing.T) { } }`), req: &t_api.Request{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.RejectPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo/bar", IdempotencyKey: util.ToPointer(idempotency.Key("bar")), Strict: true, + State: promise.Rejected, Value: promise.Value{ Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, Data: []byte("reject"), @@ -565,8 +570,8 @@ func TestHttpServer(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo/bar", @@ -584,11 +589,12 @@ func TestHttpServer(t *testing.T) { "state": "REJECTED" }`), req: &t_api.Request{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.RejectPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: "foo", IdempotencyKey: nil, Strict: false, + State: promise.Rejected, Value: promise.Value{ Headers: nil, Data: nil, @@ -596,8 +602,8 @@ func TestHttpServer(t *testing.T) { }, }, res: &t_api.Response{ - Kind: t_api.ResolvePromise, - RejectPromise: &t_api.CompletePromiseResponse{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ Status: t_api.StatusCreated, Promise: &promise.Promise{ Id: "foo", diff --git a/internal/app/subsystems/api/http/promise.go b/internal/app/subsystems/api/http/promise.go index 0979ad2e..c4fe3a19 100644 --- a/internal/app/subsystems/api/http/promise.go +++ b/internal/app/subsystems/api/http/promise.go @@ -3,7 +3,6 @@ package http import ( "errors" "net/http" - "strings" "github.com/resonatehq/resonate/internal/api" "github.com/resonatehq/resonate/internal/app/subsystems/api/service" @@ -122,17 +121,12 @@ func (s *server) completePromise(c *gin.Context) { err error ) - switch strings.ToUpper(body.State) { - case promise.Resolved.String(): - resp, err = s.service.ResolvePromise(id, &header, body) - case promise.Rejected.String(): - resp, err = s.service.RejectPromise(id, &header, body) - case promise.Canceled.String(): - resp, err = s.service.CancelPromise(id, &header, body) - default: + if !body.State.In(promise.Resolved | promise.Rejected | promise.Canceled) { c.JSON(http.StatusBadRequest, api.HandleValidationError(errors.New("invalid state"))) + return } + resp, err = s.service.CompletePromise(id, body.State, &header, body) if err != nil { var apiErr *api.APIErrorResponse if errors.As(err, &apiErr) { diff --git a/internal/app/subsystems/api/service/promise.go b/internal/app/subsystems/api/service/promise.go index da04c5eb..b7de7ac3 100644 --- a/internal/app/subsystems/api/service/promise.go +++ b/internal/app/subsystems/api/service/promise.go @@ -172,91 +172,19 @@ func (s *Service) CreatePromise(header *CreatePromiseHeader, body *promise.Promi return cqe.Completion.CreatePromise, nil } -// Cancel Promise - -func (s *Service) CancelPromise(id string, header *CompletePromiseHeader, body *CompletePromiseBody) (*t_api.CompletePromiseResponse, error) { - cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) - - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Metadata: s.metadata(header.RequestId, "cancel-promise"), - Submission: &t_api.Request{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CancelPromiseRequest{ - Id: id, - IdempotencyKey: header.IdempotencyKey, - Strict: header.Strict, - Value: body.Value, - }, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { - var resErr *t_api.ResonateError - util.Assert(errors.As(cqe.Error, &resErr), "err must be a ResonateError") - return nil, api.HandleResonateError(resErr) - } - - util.Assert(cqe.Completion.CancelPromise != nil, "response must not be nil") - - if api.IsRequestError(cqe.Completion.CancelPromise.Status) { - return nil, api.HandleRequestError(cqe.Completion.CancelPromise.Status) - } - - // success - return cqe.Completion.CancelPromise, nil -} - -// Resolve Promise - -func (s *Service) ResolvePromise(id string, header *CompletePromiseHeader, body *CompletePromiseBody) (*t_api.CompletePromiseResponse, error) { - cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) - - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Metadata: s.metadata(header.RequestId, "resolve-promise"), - Submission: &t_api.Request{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.ResolvePromiseRequest{ - Id: id, - IdempotencyKey: header.IdempotencyKey, - Strict: header.Strict, - Value: body.Value, - }, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { - var resErr *t_api.ResonateError - util.Assert(errors.As(cqe.Error, &resErr), "err must be a ResonateError") - return nil, api.HandleResonateError(resErr) - } - - util.Assert(cqe.Completion.ResolvePromise != nil, "response must not be nil") - - if api.IsRequestError(cqe.Completion.ResolvePromise.Status) { - return nil, api.HandleRequestError(cqe.Completion.ResolvePromise.Status) - } - - // success - return cqe.Completion.ResolvePromise, nil -} - -// Reject Promise - -func (s *Service) RejectPromise(id string, header *CompletePromiseHeader, body *CompletePromiseBody) (*t_api.CompletePromiseResponse, error) { +// Complete Promise +func (s *Service) CompletePromise(id string, state promise.State, header *CompletePromiseHeader, body *CompletePromiseBody) (*t_api.CompletePromiseResponse, error) { cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Metadata: s.metadata(header.RequestId, "reject-promise"), + Metadata: s.metadata(header.RequestId, "complete-promise"), Submission: &t_api.Request{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.RejectPromiseRequest{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ Id: id, IdempotencyKey: header.IdempotencyKey, Strict: header.Strict, + State: state, Value: body.Value, }, }, @@ -270,12 +198,12 @@ func (s *Service) RejectPromise(id string, header *CompletePromiseHeader, body * return nil, api.HandleResonateError(resErr) } - util.Assert(cqe.Completion.RejectPromise != nil, "response must not be nil") + util.Assert(cqe.Completion.CompletePromise != nil, "response must not be nil") - if api.IsRequestError(cqe.Completion.RejectPromise.Status) { - return nil, api.HandleRequestError(cqe.Completion.RejectPromise.Status) + if api.IsRequestError(cqe.Completion.CompletePromise.Status) { + return nil, api.HandleRequestError(cqe.Completion.CompletePromise.Status) } // success - return cqe.Completion.RejectPromise, nil + return cqe.Completion.CompletePromise, nil } diff --git a/internal/app/subsystems/api/service/request.go b/internal/app/subsystems/api/service/request.go index 9174b9ef..0ced0800 100644 --- a/internal/app/subsystems/api/service/request.go +++ b/internal/app/subsystems/api/service/request.go @@ -32,7 +32,7 @@ type CompletePromiseHeader struct { } type CompletePromiseBody struct { - State string `json:"state" binding:"required,oneofcaseinsensitive=resolved rejected rejected_canceled"` + State promise.State `json:"state" binding:"required"` Value promise.Value `json:"value"` } diff --git a/internal/kernel/t_api/api.go b/internal/kernel/t_api/api.go index 4dbb757e..ff6493d8 100644 --- a/internal/kernel/t_api/api.go +++ b/internal/kernel/t_api/api.go @@ -7,9 +7,7 @@ const ( ReadPromise Kind = iota SearchPromises CreatePromise - CancelPromise - ResolvePromise - RejectPromise + CompletePromise // SCHEDULES ReadSchedule @@ -40,12 +38,8 @@ func (k Kind) String() string { return "search-promises" case CreatePromise: return "create-promise" - case CancelPromise: - return "cancel-promise" - case ResolvePromise: - return "resolve-promise" - case RejectPromise: - return "reject-promise" + case CompletePromise: + return "complete-promise" // SCHEDULES case ReadSchedule: return "read-schedule" diff --git a/internal/kernel/t_api/request.go b/internal/kernel/t_api/request.go index 933e26a4..18b4124e 100644 --- a/internal/kernel/t_api/request.go +++ b/internal/kernel/t_api/request.go @@ -14,12 +14,10 @@ type Request struct { Kind Kind // PROMISES - ReadPromise *ReadPromiseRequest - SearchPromises *SearchPromisesRequest - CreatePromise *CreatePromiseRequest - CancelPromise *CancelPromiseRequest - ResolvePromise *ResolvePromiseRequest - RejectPromise *RejectPromiseRequest + ReadPromise *ReadPromiseRequest + SearchPromises *SearchPromisesRequest + CreatePromise *CreatePromiseRequest + CompletePromise *CompletePromiseRequest // SCHEDULES ReadSchedule *ReadScheduleRequest @@ -64,6 +62,14 @@ type CreatePromiseRequest struct { Tags map[string]string `json:"tags,omitempty"` } +type CompletePromiseRequest struct { + Id string `json:"id"` + IdempotencyKey *idempotency.Key `json:"idemptencyKey,omitempty"` + Strict bool `json:"strict"` + State promise.State `json:"state"` + Value promise.Value `json:"value,omitempty"` +} + type CancelPromiseRequest struct { Id string `json:"id"` IdempotencyKey *idempotency.Key `json:"idemptencyKey,omitempty"` @@ -186,26 +192,13 @@ func (r *Request) String() string { r.CreatePromise.Timeout, r.CreatePromise.Strict, ) - case CancelPromise: - return fmt.Sprintf( - "CancelPromise(id=%s, idempotencyKey=%s, strict=%t)", - r.CancelPromise.Id, - r.CancelPromise.IdempotencyKey, - r.CancelPromise.Strict, - ) - case ResolvePromise: - return fmt.Sprintf( - "ResolvePromise(id=%s, idempotencyKey=%s, strict=%t)", - r.ResolvePromise.Id, - r.ResolvePromise.IdempotencyKey, - r.ResolvePromise.Strict, - ) - case RejectPromise: + case CompletePromise: return fmt.Sprintf( - "RejectPromise(id=%s, idempotencyKey=%s, strict=%t)", - r.RejectPromise.Id, - r.RejectPromise.IdempotencyKey, - r.RejectPromise.Strict, + "CompletePromise(id=%s, state=%s, idempotencyKey=%s, strict=%t)", + r.CompletePromise.Id, + r.CompletePromise.State, + r.CompletePromise.IdempotencyKey, + r.CompletePromise.Strict, ) // SCHEDULES case ReadSchedule: diff --git a/internal/kernel/t_api/response.go b/internal/kernel/t_api/response.go index f2ee0c23..9ac77431 100644 --- a/internal/kernel/t_api/response.go +++ b/internal/kernel/t_api/response.go @@ -12,13 +12,11 @@ import ( type Response struct { Kind Kind - // PROMISES - CreatePromise *CreatePromiseResponse - ReadPromise *ReadPromiseResponse - SearchPromises *SearchPromisesResponse - CancelPromise *CompletePromiseResponse - ResolvePromise *CompletePromiseResponse - RejectPromise *CompletePromiseResponse + // Promises + CreatePromise *CreatePromiseResponse + ReadPromise *ReadPromiseResponse + SearchPromises *SearchPromisesResponse + CompletePromise *CompletePromiseResponse // SCHEDULES CreateSchedule *CreateScheduleResponse @@ -142,26 +140,14 @@ func (r *Response) String() string { case CreatePromise: return fmt.Sprintf( "CreatePromise(status=%d, promise=%s)", - r.CreatePromise.Status, - r.CreatePromise.Promise, - ) - case CancelPromise: - return fmt.Sprintf( - "CancelPromise(status=%d, promise=%s)", - r.CancelPromise.Status, - r.CancelPromise.Promise, - ) - case ResolvePromise: - return fmt.Sprintf( - "ResolvePromise(status=%d, promise=%s)", - r.ResolvePromise.Status, - r.ResolvePromise.Promise, + r.CompletePromise.Status, + r.CompletePromise.Promise, ) - case RejectPromise: + case CompletePromise: return fmt.Sprintf( - "RejectPromise(status=%d, promise=%s)", - r.RejectPromise.Status, - r.RejectPromise.Promise, + "CompletePromise(status=%d, promise=%s)", + r.CompletePromise.Status, + r.CompletePromise.Promise, ) // SCHEDULES diff --git a/test/dst/dst.go b/test/dst/dst.go index 9ff1a09b..68e664c3 100644 --- a/test/dst/dst.go +++ b/test/dst/dst.go @@ -57,15 +57,9 @@ func (d *DST) Run(r *rand.Rand, api api.API, aio aio.AIO, system *system.System, case t_api.CreatePromise: generator.AddRequest(generator.GenerateCreatePromise) model.AddResponse(t_api.CreatePromise, model.ValidatCreatePromise) - case t_api.CancelPromise: + case t_api.CompletePromise: generator.AddRequest(generator.GenerateCancelPromise) - model.AddResponse(t_api.CancelPromise, model.ValidateCancelPromise) - case t_api.ResolvePromise: - generator.AddRequest(generator.GenerateResolvePromise) - model.AddResponse(t_api.ResolvePromise, model.ValidateResolvePromise) - case t_api.RejectPromise: - generator.AddRequest(generator.GenerateRejectPromise) - model.AddResponse(t_api.RejectPromise, model.ValidateRejectPromise) + model.AddResponse(t_api.CompletePromise, model.ValidateCompletePromise) // SCHEDULES case t_api.ReadSchedule: diff --git a/test/dst/dst_test.go b/test/dst/dst_test.go index 8f8c1054..46824976 100644 --- a/test/dst/dst_test.go +++ b/test/dst/dst_test.go @@ -51,9 +51,7 @@ func TestDST(t *testing.T) { system.AddOnRequest(t_api.ReadPromise, coroutines.ReadPromise) system.AddOnRequest(t_api.SearchPromises, coroutines.SearchPromises) system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise) - system.AddOnRequest(t_api.CancelPromise, coroutines.CancelPromise) - system.AddOnRequest(t_api.ResolvePromise, coroutines.ResolvePromise) - system.AddOnRequest(t_api.RejectPromise, coroutines.RejectPromise) + system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise) system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule) system.AddOnRequest(t_api.SearchSchedules, coroutines.SearchSchedules) system.AddOnRequest(t_api.CreateSchedule, coroutines.CreateSchedule) @@ -75,9 +73,7 @@ func TestDST(t *testing.T) { t_api.ReadPromise, t_api.SearchPromises, t_api.CreatePromise, - t_api.CancelPromise, - t_api.ResolvePromise, - t_api.RejectPromise, + t_api.CompletePromise, // SCHEDULE t_api.ReadSchedule, diff --git a/test/dst/generator.go b/test/dst/generator.go index b3509cd2..de47d077 100644 --- a/test/dst/generator.go +++ b/test/dst/generator.go @@ -201,9 +201,10 @@ func (g *Generator) GenerateCancelPromise(r *rand.Rand, t int64) *t_api.Request strict := r.Intn(2) == 0 return &t_api.Request{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CancelPromiseRequest{ - Id: id, + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ + Id: id, + State: promise.Canceled, Value: promise.Value{ Headers: headers, Data: data, @@ -222,9 +223,10 @@ func (g *Generator) GenerateResolvePromise(r *rand.Rand, t int64) *t_api.Request strict := r.Intn(2) == 0 return &t_api.Request{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.ResolvePromiseRequest{ - Id: id, + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ + Id: id, + State: promise.Resolved, Value: promise.Value{ Headers: headers, Data: data, @@ -243,9 +245,10 @@ func (g *Generator) GenerateRejectPromise(r *rand.Rand, t int64) *t_api.Request strict := r.Intn(2) == 0 return &t_api.Request{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.RejectPromiseRequest{ - Id: id, + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ + Id: id, + State: promise.Rejected, Value: promise.Value{ Headers: headers, Data: data, diff --git a/test/dst/model.go b/test/dst/model.go index da03ac33..94649053 100644 --- a/test/dst/model.go +++ b/test/dst/model.go @@ -246,15 +246,15 @@ func (m *Model) ValidatCreatePromise(t int64, req *t_api.Request, res *t_api.Res } } -func (m *Model) ValidateCancelPromise(t int64, req *t_api.Request, res *t_api.Response) error { - pm := m.promises.Get(req.CancelPromise.Id) +func (m *Model) ValidateCompletePromise(t int64, req *t_api.Request, res *t_api.Response) error { + pm := m.promises.Get(req.CompletePromise.Id) - switch res.CancelPromise.Status { + switch res.CompletePromise.Status { case t_api.StatusOK: if pm.completed() { - if !pm.idempotencyKeyForCompleteMatch(res.CancelPromise.Promise) { - return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForComplete, res.CancelPromise.Promise.IdempotencyKeyForComplete) - } else if req.CancelPromise.Strict && pm.promise.State != promise.Canceled { + if !pm.idempotencyKeyForCompleteMatch(res.CompletePromise.Promise) { + return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForComplete, res.CompletePromise.Promise.IdempotencyKeyForComplete) + } else if req.CompletePromise.Strict && pm.promise.State != promise.Canceled { return fmt.Errorf("unexpected state %s when strict true", pm.promise.State) } } @@ -265,11 +265,11 @@ func (m *Model) ValidateCancelPromise(t int64, req *t_api.Request, res *t_api.Re } // update model state - pm.promise = res.CancelPromise.Promise + pm.promise = res.CompletePromise.Promise return nil case t_api.StatusCreated: - if res.CancelPromise.Promise.State != promise.Canceled { - return fmt.Errorf("unexpected state %s after cancel promise", res.CancelPromise.Promise.State) + if res.CompletePromise.Promise.State != promise.Canceled { + return fmt.Errorf("unexpected state %s after cancel promise", res.CompletePromise.Promise.State) } if pm.completed() { return fmt.Errorf("invalid state transition (%s -> %s)", pm.promise.State, promise.Canceled) @@ -281,7 +281,7 @@ func (m *Model) ValidateCancelPromise(t int64, req *t_api.Request, res *t_api.Re } // update model state - pm.promise = res.CancelPromise.Promise + pm.promise = res.CompletePromise.Promise return nil case t_api.StatusPromiseAlreadyResolved, t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled, t_api.StatusPromiseAlreadyTimedOut: return nil @@ -291,105 +291,7 @@ func (m *Model) ValidateCancelPromise(t int64, req *t_api.Request, res *t_api.Re } return nil default: - return fmt.Errorf("unexpected resonse status '%d'", res.CancelPromise.Status) - } -} - -func (m *Model) ValidateResolvePromise(t int64, req *t_api.Request, res *t_api.Response) error { - pm := m.promises.Get(req.ResolvePromise.Id) - - switch res.ResolvePromise.Status { - case t_api.StatusOK: - if pm.completed() { - if !pm.idempotencyKeyForCompleteMatch(res.ResolvePromise.Promise) { - return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForComplete, res.ResolvePromise.Promise.IdempotencyKeyForComplete) - } else if req.ResolvePromise.Strict && pm.promise.State != promise.Resolved { - return fmt.Errorf("unexpected state %s when strict true", pm.promise.State) - } - } - - // delete all subscriptions - for _, sm := range pm.subscriptions { - sm.subscription = nil - } - - // update model state - pm.promise = res.ResolvePromise.Promise - return nil - case t_api.StatusCreated: - if res.ResolvePromise.Promise.State != promise.Resolved { - return fmt.Errorf("unexpected state %s after resolve promise", res.ResolvePromise.Promise.State) - } - if pm.completed() { - return fmt.Errorf("invalid state transition (%s -> %s)", pm.promise.State, promise.Resolved) - } - - // delete all subscriptions - for _, sm := range pm.subscriptions { - sm.subscription = nil - } - - // update model state - pm.promise = res.ResolvePromise.Promise - return nil - case t_api.StatusPromiseAlreadyResolved, t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled, t_api.StatusPromiseAlreadyTimedOut: - return nil - case t_api.StatusPromiseNotFound: - if pm.promise != nil { - return fmt.Errorf("promise exists %s", pm.promise) - } - return nil - default: - return fmt.Errorf("unexpected resonse status '%d'", res.ResolvePromise.Status) - } -} - -func (m *Model) ValidateRejectPromise(t int64, req *t_api.Request, res *t_api.Response) error { - pm := m.promises.Get(req.RejectPromise.Id) - - switch res.RejectPromise.Status { - case t_api.StatusOK: // dst use the 200 for idempotency,, uggghhh - if pm.completed() { - if !pm.idempotencyKeyForCompleteMatch(res.RejectPromise.Promise) { - return fmt.Errorf("ikey mismatch (%s, %s)", pm.promise.IdempotencyKeyForComplete, res.RejectPromise.Promise.IdempotencyKeyForComplete) - } else if req.RejectPromise.Strict && pm.promise.State != promise.Rejected { - return fmt.Errorf("unexpected state %s when strict true", pm.promise.State) - } - } - - // delete all subscriptions - for _, sm := range pm.subscriptions { - sm.subscription = nil - } - - // update model state - pm.promise = res.RejectPromise.Promise - return nil - case t_api.StatusCreated: - if res.RejectPromise.Promise.State != promise.Rejected { - return fmt.Errorf("unexpected state %s after reject promise", res.RejectPromise.Promise.State) - } - if pm.completed() { - return fmt.Errorf("invalid state transition (%s -> %s)", pm.promise.State, promise.Rejected) - } - - // delete all subscriptions - for _, sm := range pm.subscriptions { - sm.subscription = nil - } - - // update model state - pm.promise = res.RejectPromise.Promise - return nil - case t_api.StatusPromiseAlreadyResolved, t_api.StatusPromiseAlreadyRejected, t_api.StatusPromiseAlreadyCanceled, t_api.StatusPromiseAlreadyTimedOut: - return nil - case t_api.StatusPromiseNotFound: - if pm.promise != nil { - return fmt.Errorf("promise exists %s", pm.promise) - } - return nil - default: - return fmt.Errorf("unexpected resonse status '%d'", res.RejectPromise.Status) + return fmt.Errorf("unexpected resonse status '%d'", res.CompletePromise.Status) } }