From bc1273eda33a46f8ba1eeaa9e9ef35002cb2b8d7 Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Mon, 13 Nov 2023 08:23:32 -0500 Subject: [PATCH 1/2] feat(flags): expose subscriptions api behind feature flag for test harness --- cmd/serve.go | 28 ++++++ internal/app/subsystems/api/http/http.go | 12 ++- .../app/subsystems/api/http/subscription.go | 90 ++++++++++++++++++ .../app/subsystems/api/service/request.go | 38 +++++++- .../subsystems/api/service/subscription.go | 93 +++++++++++++++++++ 5 files changed, 259 insertions(+), 2 deletions(-) create mode 100644 internal/app/subsystems/api/http/subscription.go create mode 100644 internal/app/subsystems/api/service/subscription.go diff --git a/cmd/serve.go b/cmd/serve.go index c860141b..d3733187 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -6,6 +6,7 @@ import ( netHttp "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -162,6 +163,7 @@ var serveCmd = &cobra.Command{ func init() { // api + serveCmd.Flags().Var(&ExperimentalFeatures{}, "api-enable-feature", "enable features that are disabled by default since they are considered experimental") serveCmd.Flags().Int("api-size", 100, "size of the submission queue buffered channel") serveCmd.Flags().String("api-http-addr", "0.0.0.0:8001", "http server address") serveCmd.Flags().Duration("api-http-timeout", 10*time.Second, "http server graceful shutdown timeout") @@ -170,6 +172,7 @@ func init() { _ = viper.BindPFlag("api.size", serveCmd.Flags().Lookup("api-size")) _ = viper.BindPFlag("api.subsystems.http.addr", serveCmd.Flags().Lookup("api-http-addr")) _ = viper.BindPFlag("api.subsystems.http.timeout", serveCmd.Flags().Lookup("api-http-timeout")) + _ = viper.BindPFlag("api.subsystems.http.enable", serveCmd.Flags().Lookup("api-enable-feature")) _ = viper.BindPFlag("api.subsystems.grpc.addr", serveCmd.Flags().Lookup("api-grpc-addr")) // aio @@ -226,3 +229,28 @@ func init() { serveCmd.Flags().SortFlags = false rootCmd.AddCommand(serveCmd) } + +type ExperimentalFeatures []string + +func (e *ExperimentalFeatures) String() string { + return strings.Join(*e, ",") +} + +func (e *ExperimentalFeatures) Set(value string) error { + experimental := strings.Split(value, ",") + + for _, feature := range experimental { + switch feature { + case "subscription": + *e = append(*e, feature) + default: + return fmt.Errorf("invalid experimental feature: %s", feature) + } + } + + return nil +} + +func (e *ExperimentalFeatures) Type() string { + return "stringArray" +} diff --git a/internal/app/subsystems/api/http/http.go b/internal/app/subsystems/api/http/http.go index d180f886..3b6894af 100644 --- a/internal/app/subsystems/api/http/http.go +++ b/internal/app/subsystems/api/http/http.go @@ -2,10 +2,12 @@ package http import ( "context" - "github.com/resonatehq/resonate/internal/app/subsystems/api/service" "net/http" + "slices" "time" + "github.com/resonatehq/resonate/internal/app/subsystems/api/service" + "log/slog" "github.com/gin-gonic/gin" @@ -15,6 +17,7 @@ import ( type Config struct { Addr string Timeout time.Duration + Enable []string } type Http struct { @@ -39,6 +42,13 @@ func New(api api.API, config *Config) api.Subsystem { r.POST("/promises/:id/resolve", s.resolvePromise) r.POST("/promises/:id/reject", s.rejectPromise) + // Subscription API (experimental) + if slices.Contains(config.Enable, "subscription") { + r.GET("/subscriptions", s.searchSubscriptions) + r.POST("/subscriptions/:id/create", s.createSubscription) + r.POST("/subscriptions/:id/delete", s.deleteSubscription) + } + return &Http{ config: config, server: &http.Server{ diff --git a/internal/app/subsystems/api/http/subscription.go b/internal/app/subsystems/api/http/subscription.go new file mode 100644 index 00000000..a07e047a --- /dev/null +++ b/internal/app/subsystems/api/http/subscription.go @@ -0,0 +1,90 @@ +package http + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/resonatehq/resonate/internal/app/subsystems/api/service" +) + +// Search Subscription + +func (s *server) searchSubscriptions(c *gin.Context) { + var header service.Header + if err := c.ShouldBindHeader(&header); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": err.Error(), + }) + return + } + + var params service.SearchSubscriptionsParams + if err := c.ShouldBindQuery(¶ms); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": err.Error(), + }) + return + } + + resp, err := s.service.SearchSubscriptions(c.Param("id"), &header, ¶ms) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "error": err.Error(), + }) + return + } + + c.JSON(int(resp.Status), resp.Subscriptions) +} + +// Create Subscription + +func (s *server) createSubscription(c *gin.Context) { + var header service.CreateSubscriptionHeader + if err := c.ShouldBindHeader(&header); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": err.Error(), + }) + return + } + + var body *service.CreateSubscriptionBody + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": err.Error(), + }) + return + } + + resp, err := s.service.CreateSubscription(c.Param("id"), header, body) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "error": err.Error(), + }) + return + } + + c.JSON(int(resp.Status), resp.Subscription) +} + +// Delete Subscription + +func (s *server) deleteSubscription(c *gin.Context) { + var header service.DeleteSubscriptionHeader + if err := c.ShouldBindHeader(&header); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": err.Error(), + }) + return + } + + resp, err := s.service.DeleteSubscription(c.Param("id"), nil) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{ + "error": err.Error(), + }) + return + } + + c.JSON(int(resp.Status), nil) +} diff --git a/internal/app/subsystems/api/service/request.go b/internal/app/subsystems/api/service/request.go index ed79edf6..680330e1 100644 --- a/internal/app/subsystems/api/service/request.go +++ b/internal/app/subsystems/api/service/request.go @@ -1,6 +1,9 @@ package service -import "github.com/resonatehq/resonate/pkg/promise" +import ( + "github.com/resonatehq/resonate/pkg/promise" + "github.com/resonatehq/resonate/pkg/subscription" +) type ValidationError struct { msg string // description of error @@ -10,6 +13,8 @@ type Header struct { RequestId string `header:"request-id"` } +// Promise + type SearchPromiseParams struct { Q string `form:"q" json:"q"` State string `form:"state" json:"state"` @@ -58,3 +63,34 @@ type RejectPromiseHeader struct { type RejectPromiseBody struct { Value promise.Value `json:"value"` } + +// Subscription + +type SearchSubscriptionsParams struct { + // Q string `form:"q" json:"q"` + PromiseId string `json:"promiseId"` + Limit int `form:"limit" json:"limit"` + // Cursor string `form:"cursor" json:"cursor"` +} + +type CreateSubscriptionHeader struct { + RequestId string `header:"request-id"` // qq: just for grpc? + IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` + Strict bool `header:"strict"` +} + +type CreateSubscriptionBody struct { + PromiseId string `json:"promiseId"` + Url string `json:"url"` + RetryPolicy *subscription.RetryPolicy `json:"retryPolicy"` +} + +type DeleteSubscriptionHeader struct { + RequestId string `header:"request-id"` + IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` + Strict bool `header:"strict"` +} + +type DeleteSubscriptionBody struct { + PromiseId string `json:"promiseId"` +} diff --git a/internal/app/subsystems/api/service/subscription.go b/internal/app/subsystems/api/service/subscription.go new file mode 100644 index 00000000..f8731f30 --- /dev/null +++ b/internal/app/subsystems/api/service/subscription.go @@ -0,0 +1,93 @@ +package service + +import ( + "github.com/resonatehq/resonate/internal/kernel/bus" + "github.com/resonatehq/resonate/internal/kernel/t_api" + "github.com/resonatehq/resonate/internal/util" +) + +// TODO: (experimental) +func (s *Service) SearchSubscriptions(id string, header *Header, params *SearchSubscriptionsParams) (*t_api.ReadSubscriptionsResponse, error) { + cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) + + // TODO: add cursor + + // validation + limit := params.Limit + if limit <= 0 || limit > 100 { + limit = 100 + } + + s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ + Metadata: s.metadata(header.RequestId, "read-subscriptions"), + Submission: &t_api.Request{ + Kind: t_api.ReadSubscriptions, + ReadSubscriptions: &t_api.ReadSubscriptionsRequest{ + // TODO: do we want query? + PromiseId: params.PromiseId, + Limit: limit, + // SortId: nil, + }, + }, + Callback: s.sendOrPanic(cq), + }) + + cqe := <-cq + if cqe.Error != nil { + return nil, cqe.Error + } + + util.Assert(cqe.Completion.ReadSubscriptions != nil, "response must not be nil") + return cqe.Completion.ReadSubscriptions, nil +} + +func (s *Service) CreateSubscription(id string, header CreateSubscriptionHeader, body *CreateSubscriptionBody) (*t_api.CreateSubscriptionResponse, error) { + cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) + + s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ + Metadata: s.metadata(header.RequestId, "create-subscription"), + Submission: &t_api.Request{ + Kind: t_api.CreateSubscription, + CreateSubscription: &t_api.CreateSubscriptionRequest{ + Id: id, + PromiseId: body.PromiseId, + Url: body.Url, + RetryPolicy: body.RetryPolicy, + }, + }, + Callback: s.sendOrPanic(cq), + }) + + cqe := <-cq + if cqe.Error != nil { + return nil, cqe.Error + } + + util.Assert(cqe.Completion.CreateSubscription != nil, "response must not be nil") + return cqe.Completion.CreateSubscription, nil +} + +// TODO: (experimental +func (s *Service) DeleteSubscription(id string, header *Header) (*t_api.DeleteSubscriptionResponse, error) { + cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) + + s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ + Metadata: s.metadata(header.RequestId, "delete-subscription"), + Submission: &t_api.Request{ + Kind: t_api.DeleteSubscription, + DeleteSubscription: &t_api.DeleteSubscriptionRequest{ + Id: id, + // TODO: should not require promise id + }, + }, + Callback: s.sendOrPanic(cq), + }) + + cqe := <-cq + if cqe.Error != nil { + return nil, cqe.Error + } + + util.Assert(cqe.Completion.DeleteSubscription != nil, "response must not be nil") + return cqe.Completion.DeleteSubscription, nil +} From 0c396696ad4e5a13daa242b88553dc4f2cbd98be Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Tue, 14 Nov 2023 10:25:03 -0500 Subject: [PATCH 2/2] feat(flag): changed name --- cmd/serve.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/serve.go b/cmd/serve.go index d3733187..83d642eb 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -163,7 +163,7 @@ var serveCmd = &cobra.Command{ func init() { // api - serveCmd.Flags().Var(&ExperimentalFeatures{}, "api-enable-feature", "enable features that are disabled by default since they are considered experimental") + serveCmd.Flags().Var(&ExperimentalFeatures{}, "enable-feature", "enable features that are disabled by default since they are considered experimental") serveCmd.Flags().Int("api-size", 100, "size of the submission queue buffered channel") serveCmd.Flags().String("api-http-addr", "0.0.0.0:8001", "http server address") serveCmd.Flags().Duration("api-http-timeout", 10*time.Second, "http server graceful shutdown timeout") @@ -172,7 +172,7 @@ func init() { _ = viper.BindPFlag("api.size", serveCmd.Flags().Lookup("api-size")) _ = viper.BindPFlag("api.subsystems.http.addr", serveCmd.Flags().Lookup("api-http-addr")) _ = viper.BindPFlag("api.subsystems.http.timeout", serveCmd.Flags().Lookup("api-http-timeout")) - _ = viper.BindPFlag("api.subsystems.http.enable", serveCmd.Flags().Lookup("api-enable-feature")) + _ = viper.BindPFlag("api.subsystems.http.enable", serveCmd.Flags().Lookup("enable-feature")) _ = viper.BindPFlag("api.subsystems.grpc.addr", serveCmd.Flags().Lookup("api-grpc-addr")) // aio