diff --git a/internal/app/subsystems/api/grpc/grpc.go b/internal/app/subsystems/api/grpc/grpc.go index 9ef9251b..1580a9ec 100644 --- a/internal/app/subsystems/api/grpc/grpc.go +++ b/internal/app/subsystems/api/grpc/grpc.go @@ -2,15 +2,13 @@ package grpc import ( "context" - "net" - + "github.com/resonatehq/resonate/internal/app/subsystems/api/service" "log/slog" + "net" "github.com/resonatehq/resonate/internal/api" grpcApi "github.com/resonatehq/resonate/internal/app/subsystems/api/grpc/api" - "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/t_api" - "github.com/resonatehq/resonate/internal/util" "github.com/resonatehq/resonate/pkg/promise" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -27,7 +25,7 @@ type Grpc struct { } func New(api api.API, config *Config) api.Subsystem { - s := &server{api: api} + s := &server{service: service.New(api, "grpc")} server := grpc.NewServer(grpc.UnaryInterceptor(s.log)) // nosemgrep grpcApi.RegisterPromiseServiceServer(server, s) @@ -64,7 +62,7 @@ func (g *Grpc) String() string { type server struct { grpcApi.UnimplementedPromiseServiceServer - api api.API + service *service.Service } func (s *server) log(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { @@ -75,133 +73,55 @@ func (s *server) log(ctx context.Context, req interface{}, info *grpc.UnaryServe } func (s *server) ReadPromise(ctx context.Context, req *grpcApi.ReadPromiseRequest) (*grpcApi.ReadPromiseResponse, error) { - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "grpc", - Submission: &t_api.Request{ - Kind: t_api.ReadPromise, - ReadPromise: &t_api.ReadPromiseRequest{ - Id: req.Id, - }, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { - return nil, grpcStatus.Error(codes.Internal, cqe.Error.Error()) + resp, err := s.service.ReadPromise(req.Id) + if err != nil { + return nil, grpcStatus.Error(codes.Internal, err.Error()) } - util.Assert(cqe.Completion.ReadPromise != nil, "response must not be nil") - return &grpcApi.ReadPromiseResponse{ - Status: protoStatus(cqe.Completion.ReadPromise.Status), - Promise: protoPromise(cqe.Completion.ReadPromise.Promise), + Status: protoStatus(resp.Status), + Promise: protoPromise(resp.Promise), }, nil } func (s *server) SearchPromises(ctx context.Context, req *grpcApi.SearchPromisesRequest) (*grpcApi.SearchPromisesResponse, error) { - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - - var searchPromises *t_api.SearchPromisesRequest - - if req.Cursor != "" { - cursor, err := t_api.NewCursor[t_api.SearchPromisesRequest](req.Cursor) - if err != nil { - return nil, grpcStatus.Error(codes.InvalidArgument, err.Error()) - } - searchPromises = cursor.Next - } else { - // validate - if req.Q == "" { - return nil, grpcStatus.Error(codes.InvalidArgument, "invalid query") - } - - var states []promise.State - switch req.State { - case grpcApi.SearchState_SEARCH_ALL: - states = []promise.State{ - promise.Pending, - promise.Resolved, - promise.Rejected, - promise.Timedout, - promise.Canceled, - } - case grpcApi.SearchState_SEARCH_PENDING: - states = []promise.State{ - promise.Pending, - } - case grpcApi.SearchState_SEARCH_RESOLVED: - states = []promise.State{ - promise.Resolved, - } - case grpcApi.SearchState_SEARCH_REJECTED: - states = []promise.State{ - promise.Rejected, - promise.Timedout, - promise.Canceled, - } - default: - return nil, grpcStatus.Error(codes.InvalidArgument, "invalid state") - } - - limit := int(req.Limit) - if limit <= 0 || limit > 100 { - limit = 100 - } - - searchPromises = &t_api.SearchPromisesRequest{ - Q: req.Q, - States: states, - Limit: limit, - } + params := &service.SearchPromiseParams{ + Q: req.Q, + State: searchState(req.State), + Limit: int(req.Limit), + Cursor: req.Cursor, } - - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "grpc", - Submission: &t_api.Request{ - Kind: t_api.SearchPromises, - SearchPromises: searchPromises, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { - return nil, grpcStatus.Error(codes.Internal, cqe.Error.Error()) + resp, err := s.service.SearchPromises(params) + if err != nil { + if verr, ok := err.(*service.ValidationError); ok { + return nil, grpcStatus.Error(codes.InvalidArgument, verr.Error()) + } else { + return nil, grpcStatus.Error(codes.Internal, err.Error()) + } } - util.Assert(cqe.Completion.SearchPromises != nil, "response must not be nil") - - promises := make([]*grpcApi.Promise, len(cqe.Completion.SearchPromises.Promises)) - for i, promise := range cqe.Completion.SearchPromises.Promises { + promises := make([]*grpcApi.Promise, len(resp.Promises)) + for i, promise := range resp.Promises { promises[i] = protoPromise(promise) } cursor := "" - if cqe.Completion.SearchPromises.Cursor != nil { + if resp.Cursor != nil { var err error - cursor, err = cqe.Completion.SearchPromises.Cursor.Encode() + cursor, err = resp.Cursor.Encode() if err != nil { - return nil, grpcStatus.Error(codes.Internal, cqe.Error.Error()) + return nil, grpcStatus.Error(codes.Internal, err.Error()) } - } return &grpcApi.SearchPromisesResponse{ - Status: protoStatus(cqe.Completion.SearchPromises.Status), + Status: protoStatus(resp.Status), Cursor: cursor, Promises: promises, }, nil } func (s *server) CreatePromise(ctx context.Context, req *grpcApi.CreatePromiseRequest) (*grpcApi.CreatePromiseResponse, error) { - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - var idempotencyKey *promise.IdempotencyKey if req.IdempotencyKey != "" { i := promise.IdempotencyKey(req.IdempotencyKey) @@ -218,41 +138,31 @@ func (s *server) CreatePromise(ctx context.Context, req *grpcApi.CreatePromiseRe data = req.Param.Data } - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "grpc", - Submission: &t_api.Request{ - Kind: t_api.CreatePromise, - CreatePromise: &t_api.CreatePromiseRequest{ - Id: req.Id, - IdempotencyKey: idempotencyKey, - Strict: req.Strict, - Param: promise.Value{ - Headers: headers, - Data: data, - }, - Timeout: req.Timeout, - }, - }, - Callback: s.sendOrPanic(cq), - }) + header := &service.CreatePromiseHeader{ + Strict: req.Strict, + IdempotencyKey: idempotencyKey, + } - cqe := <-cq - if cqe.Error != nil { - return nil, grpcStatus.Error(codes.Internal, cqe.Error.Error()) + body := &service.CreatePromiseBody{ + Param: promise.Value{ + Headers: headers, + Data: data, + }, + Timeout: req.Timeout, } - util.Assert(cqe.Completion.CreatePromise != nil, "response must not be nil") + resp, err := s.service.CreatePromise(req.Id, header, body) + if err != nil { + return nil, grpcStatus.Error(codes.Internal, err.Error()) + } return &grpcApi.CreatePromiseResponse{ - Status: protoStatus(cqe.Completion.CreatePromise.Status), - Promise: protoPromise(cqe.Completion.CreatePromise.Promise), + Status: protoStatus(resp.Status), + Promise: protoPromise(resp.Promise), }, nil } func (s *server) CancelPromise(ctx context.Context, req *grpcApi.CancelPromiseRequest) (*grpcApi.CancelPromiseResponse, error) { - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - var idempotencyKey *promise.IdempotencyKey if req.IdempotencyKey != "" { i := promise.IdempotencyKey(req.IdempotencyKey) @@ -269,40 +179,29 @@ func (s *server) CancelPromise(ctx context.Context, req *grpcApi.CancelPromiseRe data = req.Value.Data } - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "grpc", - Submission: &t_api.Request{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CancelPromiseRequest{ - Id: req.Id, - IdempotencyKey: idempotencyKey, - Strict: req.Strict, - Value: promise.Value{ - Headers: headers, - Data: data, - }, - }, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { - return nil, grpcStatus.Error(codes.Internal, cqe.Error.Error()) + header := &service.CancelPromiseHeader{ + Strict: req.Strict, + IdempotencyKey: idempotencyKey, } - util.Assert(cqe.Completion.CancelPromise != nil, "response must not be nil") + body := &service.CancelPromiseBody{ + Value: promise.Value{ + Headers: headers, + Data: data, + }, + } + resp, err := s.service.CancelPromise(req.Id, header, body) + if err != nil { + return nil, grpcStatus.Error(codes.Internal, err.Error()) + } return &grpcApi.CancelPromiseResponse{ - Status: protoStatus(cqe.Completion.CancelPromise.Status), - Promise: protoPromise(cqe.Completion.CancelPromise.Promise), + Status: protoStatus(resp.Status), + Promise: protoPromise(resp.Promise), }, nil } func (s *server) ResolvePromise(ctx context.Context, req *grpcApi.ResolvePromiseRequest) (*grpcApi.ResolvePromiseResponse, error) { - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - var idempotencyKey *promise.IdempotencyKey if req.IdempotencyKey != "" { i := promise.IdempotencyKey(req.IdempotencyKey) @@ -319,39 +218,30 @@ func (s *server) ResolvePromise(ctx context.Context, req *grpcApi.ResolvePromise data = req.Value.Data } - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "grpc", - Submission: &t_api.Request{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.ResolvePromiseRequest{ - Id: req.Id, - IdempotencyKey: idempotencyKey, - Strict: req.Strict, - Value: promise.Value{ - Headers: headers, - Data: data, - }, - }, - }, - Callback: s.sendOrPanic(cq), - }) + header := &service.ResolvePromiseHeader{ + Strict: req.Strict, + IdempotencyKey: idempotencyKey, + } - cqe := <-cq - if cqe.Error != nil { - return nil, grpcStatus.Error(codes.Internal, cqe.Error.Error()) + body := &service.ResolvePromiseBody{ + Value: promise.Value{ + Headers: headers, + Data: data, + }, } - util.Assert(cqe.Completion.ResolvePromise != nil, "response must not be nil") + resp, err := s.service.ResolvePromise(req.Id, header, body) + if err != nil { + return nil, grpcStatus.Error(codes.Internal, err.Error()) + } return &grpcApi.ResolvePromiseResponse{ - Status: protoStatus(cqe.Completion.ResolvePromise.Status), - Promise: protoPromise(cqe.Completion.ResolvePromise.Promise), + Status: protoStatus(resp.Status), + Promise: protoPromise(resp.Promise), }, nil } func (s *server) RejectPromise(ctx context.Context, req *grpcApi.RejectPromiseRequest) (*grpcApi.RejectPromiseResponse, error) { - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) var idempotencyKey *promise.IdempotencyKey if req.IdempotencyKey != "" { @@ -369,52 +259,29 @@ func (s *server) RejectPromise(ctx context.Context, req *grpcApi.RejectPromiseRe data = req.Value.Data } - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "grpc", - Submission: &t_api.Request{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.RejectPromiseRequest{ - Id: req.Id, - IdempotencyKey: idempotencyKey, - Strict: req.Strict, - Value: promise.Value{ - Headers: headers, - Data: data, - }, - }, - }, - Callback: s.sendOrPanic(cq), - }) + header := &service.RejectPromiseHeader{ + Strict: req.Strict, + IdempotencyKey: idempotencyKey, + } - cqe := <-cq - if cqe.Error != nil { - return nil, grpcStatus.Error(codes.Internal, cqe.Error.Error()) + body := &service.RejectPromiseBody{ + Value: promise.Value{ + Headers: headers, + Data: data, + }, } - util.Assert(cqe.Completion.RejectPromise != nil, "response must not be nil") + resp, err := s.service.RejectPromise(req.Id, header, body) + if err != nil { + return nil, grpcStatus.Error(codes.Internal, err.Error()) + } return &grpcApi.RejectPromiseResponse{ - Status: protoStatus(cqe.Completion.RejectPromise.Status), - Promise: protoPromise(cqe.Completion.RejectPromise.Promise), + Status: protoStatus(resp.Status), + Promise: protoPromise(resp.Promise), }, nil } -func (s *server) sendOrPanic(cq chan *bus.CQE[t_api.Request, t_api.Response]) func(*t_api.Response, error) { - return func(completion *t_api.Response, err error) { - cqe := &bus.CQE[t_api.Request, t_api.Response]{ - Tags: "grpc", - Completion: completion, - Error: err, - } - - select { - case cq <- cqe: - default: - panic("response channel must not block") - } - } -} - func protoStatus(status t_api.ResponseStatus) grpcApi.Status { switch status { case t_api.ResponseOK: @@ -478,3 +345,18 @@ func protoState(state promise.State) grpcApi.State { panic("invalid state") } } + +func searchState(searchState grpcApi.SearchState) string { + switch searchState { + case grpcApi.SearchState_SEARCH_ALL: + return "" + case grpcApi.SearchState_SEARCH_RESOLVED: + return "resolved" + case grpcApi.SearchState_SEARCH_REJECTED: + return "rejected" + case grpcApi.SearchState_SEARCH_PENDING: + return "pending" + default: + panic("invalid state") + } +} diff --git a/internal/app/subsystems/api/http/http.go b/internal/app/subsystems/api/http/http.go index 3887693e..d180f886 100644 --- a/internal/app/subsystems/api/http/http.go +++ b/internal/app/subsystems/api/http/http.go @@ -2,6 +2,7 @@ package http import ( "context" + "github.com/resonatehq/resonate/internal/app/subsystems/api/service" "net/http" "time" @@ -9,8 +10,6 @@ import ( "github.com/gin-gonic/gin" "github.com/resonatehq/resonate/internal/api" - "github.com/resonatehq/resonate/internal/kernel/bus" - "github.com/resonatehq/resonate/internal/kernel/t_api" ) type Config struct { @@ -27,7 +26,7 @@ func New(api api.API, config *Config) api.Subsystem { gin.SetMode(gin.ReleaseMode) r := gin.New() - s := &server{api: api} + s := &server{service: service.New(api, "http")} // Middleware r.Use(s.log) @@ -68,26 +67,10 @@ func (h *Http) String() string { } type server struct { - api api.API + service *service.Service } func (s *server) log(c *gin.Context) { c.Next() slog.Debug("http", "method", c.Request.Method, "url", c.Request.RequestURI, "status", c.Writer.Status()) } - -func (s *server) sendOrPanic(cq chan *bus.CQE[t_api.Request, t_api.Response]) func(*t_api.Response, error) { - return func(completion *t_api.Response, err error) { - cqe := &bus.CQE[t_api.Request, t_api.Response]{ - Tags: "http", - Completion: completion, - Error: err, - } - - select { - case cq <- cqe: - default: - panic("response channel must not block") - } - } -} diff --git a/internal/app/subsystems/api/http/promise.go b/internal/app/subsystems/api/http/promise.go index ea89d806..f8bfb7d0 100644 --- a/internal/app/subsystems/api/http/promise.go +++ b/internal/app/subsystems/api/http/promise.go @@ -1,57 +1,30 @@ package http import ( + "github.com/resonatehq/resonate/internal/app/subsystems/api/service" "net/http" - "strings" "github.com/gin-gonic/gin" - "github.com/resonatehq/resonate/internal/kernel/bus" - "github.com/resonatehq/resonate/internal/kernel/t_api" - "github.com/resonatehq/resonate/internal/util" - "github.com/resonatehq/resonate/pkg/promise" ) // Read Promise func (s *server) readPromise(c *gin.Context) { - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "http", - Submission: &t_api.Request{ - Kind: t_api.ReadPromise, - ReadPromise: &t_api.ReadPromiseRequest{ - Id: c.Param("id"), - }, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { + resp, err := s.service.ReadPromise(c.Param("id")) + if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ - "error": cqe.Error.Error(), + "error": err.Error(), }) return } - util.Assert(cqe.Completion.ReadPromise != nil, "response must not be nil") - c.JSON(int(cqe.Completion.ReadPromise.Status), cqe.Completion.ReadPromise.Promise) + c.JSON(int(resp.Status), resp.Promise) } // Search Promise -type SearchPromiseParams struct { - Q string `form:"q" json:"q"` - State string `form:"state" json:"state"` - Limit int `form:"limit" json:"limit"` - Cursor string `form:"cursor" json:"cursor"` -} - func (s *server) searchPromises(c *gin.Context) { - var params SearchPromiseParams - var searchPromises *t_api.SearchPromisesRequest + var params service.SearchPromiseParams if err := c.ShouldBindQuery(¶ms); err != nil { c.JSON(http.StatusBadRequest, gin.H{ @@ -60,109 +33,29 @@ func (s *server) searchPromises(c *gin.Context) { return } - if params.Cursor != "" { - cursor, err := t_api.NewCursor[t_api.SearchPromisesRequest](params.Cursor) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{ - "error": err.Error(), - }) - return - } - searchPromises = cursor.Next - } else { - // validate - if params.Q == "" { - c.JSON(http.StatusBadRequest, gin.H{ - "error": "query must be provided", - }) - return - } + resp, err := s.service.SearchPromises(¶ms) - var states []promise.State - switch strings.ToLower(params.State) { - case "": - states = []promise.State{ - promise.Pending, - promise.Resolved, - promise.Rejected, - promise.Timedout, - promise.Canceled, - } - case "pending": - states = []promise.State{ - promise.Pending, - } - case "resolved": - states = []promise.State{ - promise.Resolved, - } - case "rejected": - states = []promise.State{ - promise.Rejected, - promise.Timedout, - promise.Canceled, - } - default: + if err != nil { + if verr, ok := err.(*service.ValidationError); ok { c.JSON(http.StatusBadRequest, gin.H{ - "error": "state must be one of: pending, resolved, rejected", + "error": verr.Error(), + }) + } else { + c.JSON(http.StatusInternalServerError, gin.H{ + "error": err.Error(), }) - return - } - - limit := params.Limit - if limit <= 0 || limit > 100 { - limit = 100 - } - - searchPromises = &t_api.SearchPromisesRequest{ - Q: params.Q, - States: states, - Limit: limit, } - } - - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "http", - Submission: &t_api.Request{ - Kind: t_api.SearchPromises, - SearchPromises: searchPromises, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { - c.JSON(http.StatusInternalServerError, gin.H{ - "error": cqe.Error.Error(), - }) return } - - util.Assert(cqe.Completion.SearchPromises != nil, "response must not be nil") - c.JSON(int(cqe.Completion.SearchPromises.Status), gin.H{ - "cursor": cqe.Completion.SearchPromises.Cursor, - "promises": cqe.Completion.SearchPromises.Promises, + c.JSON(int(resp.Status), gin.H{ + "cursor": resp.Cursor, + "promises": resp.Promises, }) } // Create Promise - -type createPromiseHeader struct { - IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` - Strict bool `header:"strict"` -} - -type createPromiseBody struct { - Param promise.Value `json:"param"` - Timeout int64 `json:"timeout"` - Tags map[string]string `json:"tags"` -} - func (s *server) createPromise(c *gin.Context) { - var header createPromiseHeader + var header service.CreatePromiseHeader if err := c.ShouldBindHeader(&header); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": err.Error(), @@ -170,7 +63,7 @@ func (s *server) createPromise(c *gin.Context) { return } - var body *createPromiseBody + var body *service.CreatePromiseBody if err := c.ShouldBindJSON(&body); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": err.Error(), @@ -178,50 +71,20 @@ func (s *server) createPromise(c *gin.Context) { return } - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "http", - Submission: &t_api.Request{ - Kind: t_api.CreatePromise, - CreatePromise: &t_api.CreatePromiseRequest{ - Id: c.Param("id"), - IdempotencyKey: header.IdempotencyKey, - Strict: header.Strict, - Param: body.Param, - Timeout: body.Timeout, - Tags: body.Tags, - }, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { + resp, err := s.service.CreatePromise(c.Param("id"), &header, body) + if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ - "error": cqe.Error.Error(), + "error": err.Error(), }) return } - util.Assert(cqe.Completion.CreatePromise != nil, "response must not be nil") - c.JSON(int(cqe.Completion.CreatePromise.Status), cqe.Completion.CreatePromise.Promise) + c.JSON(int(resp.Status), resp.Promise) } // Cancel Promise - -type cancelPromiseHeader struct { - IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` - Strict bool `header:"strict"` -} - -type cancelPromiseBody struct { - Value promise.Value `json:"value"` -} - func (s *server) cancelPromise(c *gin.Context) { - var header cancelPromiseHeader + var header service.CancelPromiseHeader if err := c.ShouldBindHeader(&header); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": err.Error(), @@ -229,56 +92,28 @@ func (s *server) cancelPromise(c *gin.Context) { return } - var body *cancelPromiseBody + var body *service.CancelPromiseBody if err := c.ShouldBindJSON(&body); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": err.Error(), }) return } - - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "http", - Submission: &t_api.Request{ - Kind: t_api.CancelPromise, - CancelPromise: &t_api.CancelPromiseRequest{ - Id: c.Param("id"), - IdempotencyKey: header.IdempotencyKey, - Strict: header.Strict, - Value: body.Value, - }, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { + resp, err := s.service.CancelPromise(c.Param("id"), &header, body) + if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ - "error": cqe.Error.Error(), + "error": err.Error(), }) return } - util.Assert(cqe.Completion.CancelPromise != nil, "response must not be nil") - c.JSON(int(cqe.Completion.CancelPromise.Status), cqe.Completion.CancelPromise.Promise) + c.JSON(int(resp.Status), resp.Promise) } // Resolve Promise -type resolvePromiseHeader struct { - IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` - Strict bool `header:"strict"` -} - -type resolvePromiseBody struct { - Value promise.Value `json:"value"` -} - func (s *server) resolvePromise(c *gin.Context) { - var header resolvePromiseHeader + var header service.ResolvePromiseHeader if err := c.ShouldBindHeader(&header); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": err.Error(), @@ -286,7 +121,7 @@ func (s *server) resolvePromise(c *gin.Context) { return } - var body *resolvePromiseBody + var body *service.ResolvePromiseBody if err := c.ShouldBindJSON(&body); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": err.Error(), @@ -294,48 +129,19 @@ func (s *server) resolvePromise(c *gin.Context) { return } - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "http", - Submission: &t_api.Request{ - Kind: t_api.ResolvePromise, - ResolvePromise: &t_api.ResolvePromiseRequest{ - Id: c.Param("id"), - IdempotencyKey: header.IdempotencyKey, - Strict: header.Strict, - Value: body.Value, - }, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { + resp, err := s.service.ResolvePromise(c.Param("id"), &header, body) + if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ - "error": cqe.Error.Error(), + "error": err.Error(), }) return } - - util.Assert(cqe.Completion.ResolvePromise != nil, "response must not be nil") - c.JSON(int(cqe.Completion.ResolvePromise.Status), cqe.Completion.ResolvePromise.Promise) + c.JSON(int(resp.Status), resp.Promise) } // Reject Promise - -type rejectPromiseHeader struct { - IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` - Strict bool `header:"strict"` -} - -type rejectPromiseBody struct { - Value promise.Value `json:"value"` -} - func (s *server) rejectPromise(c *gin.Context) { - var header rejectPromiseHeader + var header service.RejectPromiseHeader if err := c.ShouldBindHeader(&header); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": err.Error(), @@ -343,39 +149,19 @@ func (s *server) rejectPromise(c *gin.Context) { return } - var body *rejectPromiseBody + var body *service.RejectPromiseBody if err := c.ShouldBindJSON(&body); err != nil { c.JSON(http.StatusBadRequest, gin.H{ "error": err.Error(), }) return } - - cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) - defer close(cq) - - s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ - Tags: "http", - Submission: &t_api.Request{ - Kind: t_api.RejectPromise, - RejectPromise: &t_api.RejectPromiseRequest{ - Id: c.Param("id"), - IdempotencyKey: header.IdempotencyKey, - Strict: header.Strict, - Value: body.Value, - }, - }, - Callback: s.sendOrPanic(cq), - }) - - cqe := <-cq - if cqe.Error != nil { + resp, err := s.service.RejectPromise(c.Param("id"), &header, body) + if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ - "error": cqe.Error.Error(), + "error": err.Error(), }) return } - - util.Assert(cqe.Completion.RejectPromise != nil, "response must not be nil") - c.JSON(int(cqe.Completion.RejectPromise.Status), cqe.Completion.RejectPromise.Promise) + c.JSON(int(resp.Status), resp.Promise) } diff --git a/internal/app/subsystems/api/service/request.go b/internal/app/subsystems/api/service/request.go new file mode 100644 index 00000000..6e0ef144 --- /dev/null +++ b/internal/app/subsystems/api/service/request.go @@ -0,0 +1,52 @@ +package service + +import "github.com/resonatehq/resonate/pkg/promise" + +type ValidationError struct { + msg string // description of error +} + +type SearchPromiseParams struct { + Q string `form:"q" json:"q"` + State string `form:"state" json:"state"` + Limit int `form:"limit" json:"limit"` + Cursor string `form:"cursor" json:"cursor"` +} + +type CreatePromiseHeader struct { + IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` + Strict bool `header:"strict"` +} + +type CreatePromiseBody struct { + Param promise.Value `json:"param"` + Timeout int64 `json:"timeout"` + Tags map[string]string `json:"tags"` +} + +type CancelPromiseHeader struct { + IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` + Strict bool `header:"strict"` +} + +type CancelPromiseBody struct { + Value promise.Value `json:"value"` +} + +type ResolvePromiseHeader struct { + IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` + Strict bool `header:"strict"` +} + +type ResolvePromiseBody struct { + Value promise.Value `json:"value"` +} + +type RejectPromiseHeader struct { + IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` + Strict bool `header:"strict"` +} + +type RejectPromiseBody struct { + Value promise.Value `json:"value"` +} diff --git a/internal/app/subsystems/api/service/service.go b/internal/app/subsystems/api/service/service.go new file mode 100644 index 00000000..880c5c38 --- /dev/null +++ b/internal/app/subsystems/api/service/service.go @@ -0,0 +1,266 @@ +package service + +import ( + "github.com/resonatehq/resonate/internal/api" + "github.com/resonatehq/resonate/internal/kernel/bus" + "github.com/resonatehq/resonate/internal/kernel/t_api" + "github.com/resonatehq/resonate/internal/util" + "github.com/resonatehq/resonate/pkg/promise" + "strings" +) + +func (e *ValidationError) Error() string { return e.msg } + +type Service struct { + api api.API + serverProtocol string +} + +func New(api api.API, serverProtocol string) *Service { + return &Service{ + api: api, + serverProtocol: serverProtocol, + } +} + +func (s *Service) protocol() string { + return s.serverProtocol +} + +// Read Promise + +func (s *Service) ReadPromise(id string) (*t_api.ReadPromiseResponse, error) { + cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) + defer close(cq) + + s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ + Tags: s.protocol(), + Submission: &t_api.Request{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseRequest{ + Id: id, + }, + }, + Callback: s.sendOrPanic(cq), + }) + + cqe := <-cq + if cqe.Error != nil { + return nil, cqe.Error + } + + util.Assert(cqe.Completion.ReadPromise != nil, "response must not be nil") + return cqe.Completion.ReadPromise, nil + +} + +// Search Promise + +func (s *Service) SearchPromises(params *SearchPromiseParams) (*t_api.SearchPromisesResponse, error) { + var searchPromises *t_api.SearchPromisesRequest + if params.Cursor != "" { + cursor, err := t_api.NewCursor[t_api.SearchPromisesRequest](params.Cursor) + if err != nil { + return nil, &ValidationError{msg: err.Error()} + } + searchPromises = cursor.Next + } else { + // validate + if params.Q == "" { + return nil, &ValidationError{msg: "query must be provided"} + } + + var states []promise.State + switch strings.ToLower(params.State) { + case "": + states = []promise.State{ + promise.Pending, + promise.Resolved, + promise.Rejected, + promise.Timedout, + promise.Canceled, + } + case "pending": + states = []promise.State{ + promise.Pending, + } + case "resolved": + states = []promise.State{ + promise.Resolved, + } + case "rejected": + states = []promise.State{ + promise.Rejected, + promise.Timedout, + promise.Canceled, + } + default: + return nil, &ValidationError{"state must be one of: pending, resolved, rejected"} + } + + limit := params.Limit + if limit <= 0 || limit > 100 { + limit = 100 + } + + searchPromises = &t_api.SearchPromisesRequest{ + Q: params.Q, + States: states, + Limit: limit, + } + } + + cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) + defer close(cq) + + s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ + Tags: s.protocol(), + Submission: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: searchPromises, + }, + Callback: s.sendOrPanic(cq), + }) + + cqe := <-cq + if cqe.Error != nil { + return nil, cqe.Error + } + + util.Assert(cqe.Completion.SearchPromises != nil, "response must not be nil") + return cqe.Completion.SearchPromises, nil +} + +// Create Promise + +func (s *Service) CreatePromise(id string, header *CreatePromiseHeader, body *CreatePromiseBody) (*t_api.CreatePromiseResponse, error) { + cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) + defer close(cq) + + s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ + Tags: s.protocol(), + Submission: &t_api.Request{ + Kind: t_api.CreatePromise, + CreatePromise: &t_api.CreatePromiseRequest{ + Id: id, + IdempotencyKey: header.IdempotencyKey, + Strict: header.Strict, + Param: body.Param, + Timeout: body.Timeout, + Tags: body.Tags, + }, + }, + Callback: s.sendOrPanic(cq), + }) + + cqe := <-cq + if cqe.Error != nil { + return nil, cqe.Error + } + + util.Assert(cqe.Completion.CreatePromise != nil, "response must not be nil") + return cqe.Completion.CreatePromise, nil +} + +// Cancel Promise + +func (s *Service) CancelPromise(id string, header *CancelPromiseHeader, body *CancelPromiseBody) (*t_api.CancelPromiseResponse, error) { + cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) + defer close(cq) + + s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ + Tags: s.protocol(), + Submission: &t_api.Request{ + Kind: t_api.CancelPromise, + CancelPromise: &t_api.CancelPromiseRequest{ + Id: id, + IdempotencyKey: header.IdempotencyKey, + Strict: header.Strict, + Value: body.Value, + }, + }, + Callback: s.sendOrPanic(cq), + }) + + cqe := <-cq + if cqe.Error != nil { + return nil, cqe.Error + } + + util.Assert(cqe.Completion.CancelPromise != nil, "response must not be nil") + return cqe.Completion.CancelPromise, nil +} + +// Resolve Promise + +func (s *Service) ResolvePromise(id string, header *ResolvePromiseHeader, body *ResolvePromiseBody) (*t_api.ResolvePromiseResponse, error) { + cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) + defer close(cq) + + s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ + Tags: s.protocol(), + Submission: &t_api.Request{ + Kind: t_api.ResolvePromise, + ResolvePromise: &t_api.ResolvePromiseRequest{ + Id: id, + IdempotencyKey: header.IdempotencyKey, + Strict: header.Strict, + Value: body.Value, + }, + }, + Callback: s.sendOrPanic(cq), + }) + + cqe := <-cq + if cqe.Error != nil { + return nil, cqe.Error + } + + util.Assert(cqe.Completion.ResolvePromise != nil, "response must not be nil") + return cqe.Completion.ResolvePromise, nil +} + +// Reject Promise + +func (s *Service) RejectPromise(id string, header *RejectPromiseHeader, body *RejectPromiseBody) (*t_api.RejectPromiseResponse, error) { + cq := make(chan *bus.CQE[t_api.Request, t_api.Response]) + defer close(cq) + + s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ + Tags: s.protocol(), + Submission: &t_api.Request{ + Kind: t_api.RejectPromise, + RejectPromise: &t_api.RejectPromiseRequest{ + Id: id, + IdempotencyKey: header.IdempotencyKey, + Strict: header.Strict, + Value: body.Value, + }, + }, + Callback: s.sendOrPanic(cq), + }) + + cqe := <-cq + if cqe.Error != nil { + return nil, cqe.Error + } + + util.Assert(cqe.Completion.RejectPromise != nil, "response must not be nil") + return cqe.Completion.RejectPromise, nil +} + +func (s *Service) sendOrPanic(cq chan *bus.CQE[t_api.Request, t_api.Response]) func(*t_api.Response, error) { + return func(completion *t_api.Response, err error) { + cqe := &bus.CQE[t_api.Request, t_api.Response]{ + Tags: s.protocol(), + Completion: completion, + Error: err, + } + + select { + case cq <- cqe: + default: + panic("response channel must not block") + } + } +} diff --git a/internal/app/subsystems/api/service/service_test.go b/internal/app/subsystems/api/service/service_test.go new file mode 100644 index 00000000..02d9e2e6 --- /dev/null +++ b/internal/app/subsystems/api/service/service_test.go @@ -0,0 +1,648 @@ +package service + +import ( + "fmt" + "github.com/resonatehq/resonate/internal/app/subsystems/api/test" + "github.com/resonatehq/resonate/pkg/promise" + "testing" + + "github.com/resonatehq/resonate/internal/kernel/t_api" + "github.com/stretchr/testify/assert" +) + +type serviceTest struct { + *test.API + service *Service +} + +func setup() *serviceTest { + api := &test.API{} + service := New(api, "local") + + return &serviceTest{ + API: api, + service: service, + } +} + +func TestReadPromise(t *testing.T) { + serviceTest := setup() + + for _, tc := range []struct { + name string + id string + req *t_api.Request + res *t_api.Response + }{ + { + name: "ReadPromise", + id: "foo", + req: &t_api.Request{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseRequest{ + Id: "foo", + }, + }, + res: &t_api.Response{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseResponse{ + Status: t_api.ResponseOK, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Pending, + }, + }, + }, + }, + { + name: "ReadPromiseNotFound", + id: "bar", + req: &t_api.Request{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseRequest{ + Id: "bar", + }, + }, + res: &t_api.Response{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseResponse{ + Status: t_api.ResponseNotFound, + Promise: nil, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + serviceTest.Load(t, tc.req, tc.res) + + res, err := serviceTest.service.ReadPromise(tc.id) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, tc.res.ReadPromise, res) + }) + } +} + +func TestSearchPromises(t *testing.T) { + serviceTest := setup() + + for _, tc := range []struct { + name string + serviceReq *SearchPromiseParams + req *t_api.Request + res *t_api.Response + }{ + { + name: "SearchPromises", + serviceReq: &SearchPromiseParams{ + Q: "*", + Limit: 10, + }, + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Q: "*", + States: []promise.State{ + promise.Pending, + promise.Resolved, + promise.Rejected, + promise.Timedout, + promise.Canceled, + }, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.ResponseOK, + Cursor: nil, + Promises: []*promise.Promise{}, + }, + }, + }, + { + name: "SearchPromisesCursor", + serviceReq: &SearchPromiseParams{ + Cursor: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJOZXh0Ijp7InEiOiIqIiwic3RhdGVzIjpbIlBFTkRJTkciXSwibGltaXQiOjEwLCJzb3J0SWQiOjEwMH19.yQxXjIxRmxdTQcBDHFv8PyXxrkGa90e4OcIzDqPP1rY", + }, + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Q: "*", + States: []promise.State{ + promise.Pending, + }, + Limit: 10, + SortId: test.Int64ToPointer(100), + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.ResponseOK, + Cursor: &t_api.Cursor[t_api.SearchPromisesRequest]{ + Next: &t_api.SearchPromisesRequest{ + Q: "*", + States: []promise.State{ + promise.Pending, + promise.Resolved, + promise.Rejected, + promise.Timedout, + promise.Canceled, + }, + Limit: 10, + SortId: test.Int64ToPointer(10), + }, + }, + Promises: []*promise.Promise{}, + }, + }, + }, + { + name: "SearchPromisesPending", + serviceReq: &SearchPromiseParams{ + Q: "*", + State: "pending", + Limit: 10, + }, + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Q: "*", + States: []promise.State{ + promise.Pending, + }, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.ResponseOK, + Cursor: nil, + Promises: []*promise.Promise{}, + }, + }, + }, + { + name: "SearchPromisesResolved", + serviceReq: &SearchPromiseParams{ + Q: "*", + State: "resolved", + Limit: 10, + }, + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Q: "*", + States: []promise.State{ + promise.Resolved, + }, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.ResponseOK, + Cursor: nil, + Promises: []*promise.Promise{}, + }, + }, + }, + { + name: "SearchPromisesRejected", + serviceReq: &SearchPromiseParams{ + Q: "*", + State: "rejected", + Limit: 10, + }, + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Q: "*", + States: []promise.State{ + promise.Rejected, + promise.Timedout, + promise.Canceled, + }, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.ResponseOK, + Cursor: nil, + Promises: []*promise.Promise{}, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + serviceTest.Load(t, tc.req, tc.res) + res, err := serviceTest.service.SearchPromises(tc.serviceReq) + if err != nil { + fmt.Println("we are here bro", res.Status) + t.Fatal(err) + } + + assert.Equal(t, tc.res.SearchPromises, res) + }) + } +} + +func TestCreatePromise(t *testing.T) { + serviceTest := setup() + + for _, tc := range []struct { + name string + id string + serviceReqHeader *CreatePromiseHeader + serviceReqBody *CreatePromiseBody + req *t_api.Request + res *t_api.Response + }{ + { + name: "CreatePromise", + id: "foo", + serviceReqHeader: &CreatePromiseHeader{ + IdempotencyKey: test.IdempotencyKeyToPointer("bar"), + Strict: true, + }, + serviceReqBody: &CreatePromiseBody{ + Param: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("pending"), + }, + Timeout: 1, + }, + + req: &t_api.Request{ + Kind: t_api.CreatePromise, + CreatePromise: &t_api.CreatePromiseRequest{ + Id: "foo", + IdempotencyKey: test.IdempotencyKeyToPointer("bar"), + Strict: true, + Param: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("pending"), + }, + Timeout: 1, + }, + }, + res: &t_api.Response{ + Kind: t_api.CreatePromise, + CreatePromise: &t_api.CreatePromiseResponse{ + Status: t_api.ResponseCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Pending, + }, + }, + }, + }, + { + name: "CreatePromiseMinimal", + id: "foo", + serviceReqHeader: &CreatePromiseHeader{ + IdempotencyKey: nil, + Strict: false, + }, + serviceReqBody: &CreatePromiseBody{ + Param: promise.Value{ + Headers: nil, + Data: nil, + }, + Timeout: 1, + }, + + req: &t_api.Request{ + Kind: t_api.CreatePromise, + CreatePromise: &t_api.CreatePromiseRequest{ + Id: "foo", + IdempotencyKey: nil, + Strict: false, + Param: promise.Value{ + Headers: nil, + Data: nil, + }, + Timeout: 1, + }, + }, + res: &t_api.Response{ + Kind: t_api.CreatePromise, + CreatePromise: &t_api.CreatePromiseResponse{ + Status: t_api.ResponseCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Pending, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + serviceTest.Load(t, tc.req, tc.res) + res, err := serviceTest.service.CreatePromise(tc.id, tc.serviceReqHeader, tc.serviceReqBody) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, tc.res.CreatePromise, res) + }) + } +} + +func TestCancelPromise(t *testing.T) { + serviceTest := setup() + + for _, tc := range []struct { + name string + id string + serviceReqHeader *CancelPromiseHeader + serviceReqBody *CancelPromiseBody + req *t_api.Request + res *t_api.Response + }{ + { + name: "CancelPromise", + id: "foo", + serviceReqHeader: &CancelPromiseHeader{ + IdempotencyKey: test.IdempotencyKeyToPointer("bar"), + Strict: true, + }, + serviceReqBody: &CancelPromiseBody{ + Value: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("cancel"), + }, + }, + req: &t_api.Request{ + Kind: t_api.CancelPromise, + CancelPromise: &t_api.CancelPromiseRequest{ + Id: "foo", + IdempotencyKey: test.IdempotencyKeyToPointer("bar"), + Strict: true, + Value: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("cancel"), + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.CancelPromise, + CancelPromise: &t_api.CancelPromiseResponse{ + Status: t_api.ResponseCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Canceled, + }, + }, + }, + }, + { + name: "CancelPromiseMinimal", + id: "foo", + serviceReqHeader: &CancelPromiseHeader{ + IdempotencyKey: nil, + Strict: false, + }, + serviceReqBody: &CancelPromiseBody{ + Value: promise.Value{ + Headers: nil, + Data: nil, + }, + }, + req: &t_api.Request{ + Kind: t_api.CancelPromise, + CancelPromise: &t_api.CancelPromiseRequest{ + Id: "foo", + IdempotencyKey: nil, + Strict: false, + Value: promise.Value{ + Headers: nil, + Data: nil, + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.CancelPromise, + CancelPromise: &t_api.CancelPromiseResponse{ + Status: t_api.ResponseCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Canceled, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + serviceTest.Load(t, tc.req, tc.res) + + res, err := serviceTest.service.CancelPromise(tc.id, tc.serviceReqHeader, tc.serviceReqBody) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, tc.res.CancelPromise, res) + + }) + } +} + +func TestResolvePromise(t *testing.T) { + serviceTest := setup() + for _, tc := range []struct { + name string + id string + serviceReqHeader *ResolvePromiseHeader + serviceReqBody *ResolvePromiseBody + req *t_api.Request + res *t_api.Response + }{ + { + name: "ResolvePromise", + id: "foo", + serviceReqHeader: &ResolvePromiseHeader{ + IdempotencyKey: test.IdempotencyKeyToPointer("bar"), + Strict: true, + }, + serviceReqBody: &ResolvePromiseBody{ + Value: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("cancel"), + }, + }, + req: &t_api.Request{ + Kind: t_api.ResolvePromise, + ResolvePromise: &t_api.ResolvePromiseRequest{ + Id: "foo", + IdempotencyKey: test.IdempotencyKeyToPointer("bar"), + Strict: true, + Value: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("cancel"), + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.ResolvePromise, + ResolvePromise: &t_api.ResolvePromiseResponse{ + Status: t_api.ResponseCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Resolved, + }, + }, + }, + }, + { + name: "ResolvePromiseMinimal", + id: "foo", + serviceReqHeader: &ResolvePromiseHeader{ + IdempotencyKey: nil, + Strict: false, + }, + serviceReqBody: &ResolvePromiseBody{ + Value: promise.Value{ + Headers: nil, + Data: nil, + }, + }, + req: &t_api.Request{ + Kind: t_api.ResolvePromise, + ResolvePromise: &t_api.ResolvePromiseRequest{ + Id: "foo", + IdempotencyKey: nil, + Strict: false, + Value: promise.Value{ + Headers: nil, + Data: nil, + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.ResolvePromise, + ResolvePromise: &t_api.ResolvePromiseResponse{ + Status: t_api.ResponseCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Resolved, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + serviceTest.Load(t, tc.req, tc.res) + res, err := serviceTest.service.ResolvePromise(tc.id, tc.serviceReqHeader, tc.serviceReqBody) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, tc.res.ResolvePromise, res) + }) + } +} + +func TestRejectPromise(t *testing.T) { + serviceTest := setup() + + for _, tc := range []struct { + name string + id string + serviceReqHeader *RejectPromiseHeader + serviceReqBody *RejectPromiseBody + req *t_api.Request + res *t_api.Response + }{ + { + name: "RejectPromise", + id: "foo", + serviceReqHeader: &RejectPromiseHeader{ + IdempotencyKey: test.IdempotencyKeyToPointer("bar"), + Strict: true, + }, + serviceReqBody: &RejectPromiseBody{ + Value: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("cancel"), + }, + }, + req: &t_api.Request{ + Kind: t_api.RejectPromise, + RejectPromise: &t_api.RejectPromiseRequest{ + Id: "foo", + IdempotencyKey: test.IdempotencyKeyToPointer("bar"), + Strict: true, + Value: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("cancel"), + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.RejectPromise, + RejectPromise: &t_api.RejectPromiseResponse{ + Status: t_api.ResponseCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Rejected, + }, + }, + }, + }, + { + name: "RejectPromiseMinimal", + id: "foo", + serviceReqHeader: &RejectPromiseHeader{ + IdempotencyKey: nil, + Strict: false, + }, + serviceReqBody: &RejectPromiseBody{ + Value: promise.Value{ + Headers: nil, + Data: nil, + }, + }, + req: &t_api.Request{ + Kind: t_api.RejectPromise, + RejectPromise: &t_api.RejectPromiseRequest{ + Id: "foo", + IdempotencyKey: nil, + Strict: false, + Value: promise.Value{ + Headers: nil, + Data: nil, + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.RejectPromise, + RejectPromise: &t_api.RejectPromiseResponse{ + Status: t_api.ResponseCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Rejected, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + serviceTest.Load(t, tc.req, tc.res) + + res, err := serviceTest.service.RejectPromise(tc.id, tc.serviceReqHeader, tc.serviceReqBody) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, tc.res.RejectPromise, res) + }) + } +}