-
Notifications
You must be signed in to change notification settings - Fork 31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(flags): expose subscriptions api behind feature flag for test harness #110
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ import ( | |
netHttp "net/http" | ||
"os" | ||
"os/signal" | ||
"strings" | ||
"syscall" | ||
"time" | ||
|
||
|
@@ -162,6 +163,7 @@ var serveCmd = &cobra.Command{ | |
|
||
func init() { | ||
// api | ||
serveCmd.Flags().Var(&ExperimentalFeatures{}, "enable-feature", "enable features that are disabled by default since they are considered experimental") | ||
serveCmd.Flags().Int("api-size", 100, "size of the submission queue buffered channel") | ||
serveCmd.Flags().String("api-http-addr", "0.0.0.0:8001", "http server address") | ||
serveCmd.Flags().Duration("api-http-timeout", 10*time.Second, "http server graceful shutdown timeout") | ||
|
@@ -170,6 +172,7 @@ func init() { | |
_ = viper.BindPFlag("api.size", serveCmd.Flags().Lookup("api-size")) | ||
_ = viper.BindPFlag("api.subsystems.http.addr", serveCmd.Flags().Lookup("api-http-addr")) | ||
_ = viper.BindPFlag("api.subsystems.http.timeout", serveCmd.Flags().Lookup("api-http-timeout")) | ||
_ = viper.BindPFlag("api.subsystems.http.enable", serveCmd.Flags().Lookup("enable-feature")) | ||
_ = viper.BindPFlag("api.subsystems.grpc.addr", serveCmd.Flags().Lookup("api-grpc-addr")) | ||
|
||
// aio | ||
|
@@ -226,3 +229,28 @@ func init() { | |
serveCmd.Flags().SortFlags = false | ||
rootCmd.AddCommand(serveCmd) | ||
} | ||
|
||
type ExperimentalFeatures []string | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could this move to the |
||
|
||
func (e *ExperimentalFeatures) String() string { | ||
return strings.Join(*e, ",") | ||
} | ||
|
||
func (e *ExperimentalFeatures) Set(value string) error { | ||
experimental := strings.Split(value, ",") | ||
|
||
for _, feature := range experimental { | ||
switch feature { | ||
case "subscription": | ||
*e = append(*e, feature) | ||
default: | ||
return fmt.Errorf("invalid experimental feature: %s", feature) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (e *ExperimentalFeatures) Type() string { | ||
return "stringArray" | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -2,10 +2,12 @@ | |||||
|
||||||
import ( | ||||||
"context" | ||||||
"github.com/resonatehq/resonate/internal/app/subsystems/api/service" | ||||||
"net/http" | ||||||
"slices" | ||||||
"time" | ||||||
|
||||||
"github.com/resonatehq/resonate/internal/app/subsystems/api/service" | ||||||
|
||||||
"log/slog" | ||||||
|
||||||
"github.com/gin-gonic/gin" | ||||||
|
@@ -15,6 +17,7 @@ | |||||
type Config struct { | ||||||
Addr string | ||||||
Timeout time.Duration | ||||||
Enable []string | ||||||
} | ||||||
|
||||||
type Http struct { | ||||||
|
@@ -39,6 +42,13 @@ | |||||
r.POST("/promises/:id/resolve", s.resolvePromise) | ||||||
r.POST("/promises/:id/reject", s.rejectPromise) | ||||||
|
||||||
// Subscription API (experimental) | ||||||
if slices.Contains(config.Enable, "subscription") { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
r.GET("/subscriptions", s.searchSubscriptions) | ||||||
r.POST("/subscriptions/:id/create", s.createSubscription) | ||||||
r.POST("/subscriptions/:id/delete", s.deleteSubscription) | ||||||
} | ||||||
|
||||||
return &Http{ | ||||||
config: config, | ||||||
server: &http.Server{ | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
package http | ||
|
||
import ( | ||
"net/http" | ||
|
||
"github.com/gin-gonic/gin" | ||
"github.com/resonatehq/resonate/internal/app/subsystems/api/service" | ||
) | ||
|
||
// Search Subscription | ||
|
||
func (s *server) searchSubscriptions(c *gin.Context) { | ||
var header service.Header | ||
if err := c.ShouldBindHeader(&header); err != nil { | ||
c.JSON(http.StatusBadRequest, gin.H{ | ||
"error": err.Error(), | ||
}) | ||
return | ||
} | ||
|
||
var params service.SearchSubscriptionsParams | ||
if err := c.ShouldBindQuery(¶ms); err != nil { | ||
c.JSON(http.StatusBadRequest, gin.H{ | ||
"error": err.Error(), | ||
}) | ||
return | ||
} | ||
|
||
resp, err := s.service.SearchSubscriptions(c.Param("id"), &header, ¶ms) | ||
if err != nil { | ||
c.JSON(http.StatusInternalServerError, gin.H{ | ||
"error": err.Error(), | ||
}) | ||
return | ||
} | ||
|
||
c.JSON(int(resp.Status), resp.Subscriptions) | ||
} | ||
|
||
// Create Subscription | ||
|
||
func (s *server) createSubscription(c *gin.Context) { | ||
var header service.CreateSubscriptionHeader | ||
if err := c.ShouldBindHeader(&header); err != nil { | ||
c.JSON(http.StatusBadRequest, gin.H{ | ||
"error": err.Error(), | ||
}) | ||
return | ||
} | ||
|
||
var body *service.CreateSubscriptionBody | ||
if err := c.ShouldBindJSON(&body); err != nil { | ||
c.JSON(http.StatusBadRequest, gin.H{ | ||
"error": err.Error(), | ||
}) | ||
return | ||
} | ||
|
||
resp, err := s.service.CreateSubscription(c.Param("id"), header, body) | ||
if err != nil { | ||
c.JSON(http.StatusInternalServerError, gin.H{ | ||
"error": err.Error(), | ||
}) | ||
return | ||
} | ||
|
||
c.JSON(int(resp.Status), resp.Subscription) | ||
} | ||
|
||
// Delete Subscription | ||
|
||
func (s *server) deleteSubscription(c *gin.Context) { | ||
var header service.DeleteSubscriptionHeader | ||
if err := c.ShouldBindHeader(&header); err != nil { | ||
c.JSON(http.StatusBadRequest, gin.H{ | ||
"error": err.Error(), | ||
}) | ||
return | ||
} | ||
|
||
resp, err := s.service.DeleteSubscription(c.Param("id"), nil) | ||
if err != nil { | ||
c.JSON(http.StatusInternalServerError, gin.H{ | ||
"error": err.Error(), | ||
}) | ||
return | ||
} | ||
|
||
c.JSON(int(resp.Status), nil) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,9 @@ | ||
package service | ||
|
||
import "github.com/resonatehq/resonate/pkg/promise" | ||
import ( | ||
"github.com/resonatehq/resonate/pkg/promise" | ||
"github.com/resonatehq/resonate/pkg/subscription" | ||
) | ||
|
||
type ValidationError struct { | ||
msg string // description of error | ||
|
@@ -10,6 +13,8 @@ type Header struct { | |
RequestId string `header:"request-id"` | ||
} | ||
|
||
// Promise | ||
|
||
type SearchPromiseParams struct { | ||
Q string `form:"q" json:"q"` | ||
State string `form:"state" json:"state"` | ||
|
@@ -58,3 +63,34 @@ type RejectPromiseHeader struct { | |
type RejectPromiseBody struct { | ||
Value promise.Value `json:"value"` | ||
} | ||
|
||
// Subscription | ||
|
||
type SearchSubscriptionsParams struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably won't have a search subscriptions API, just a list all subscriptions on a promise (which is also paginated), |
||
// Q string `form:"q" json:"q"` | ||
PromiseId string `json:"promiseId"` | ||
Limit int `form:"limit" json:"limit"` | ||
// Cursor string `form:"cursor" json:"cursor"` | ||
} | ||
|
||
type CreateSubscriptionHeader struct { | ||
RequestId string `header:"request-id"` // qq: just for grpc? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an optional parameter for tracking requests cross-systems. If provided this field is used in corresponding logs, if not provided I believe we generate a uuid for each request. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can set in both http/grpc |
||
IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` | ||
Strict bool `header:"strict"` | ||
} | ||
|
||
type CreateSubscriptionBody struct { | ||
PromiseId string `json:"promiseId"` | ||
Url string `json:"url"` | ||
RetryPolicy *subscription.RetryPolicy `json:"retryPolicy"` | ||
} | ||
|
||
type DeleteSubscriptionHeader struct { | ||
RequestId string `header:"request-id"` | ||
IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"` | ||
Strict bool `header:"strict"` | ||
} | ||
|
||
type DeleteSubscriptionBody struct { | ||
PromiseId string `json:"promiseId"` | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
package service | ||
|
||
import ( | ||
"github.com/resonatehq/resonate/internal/kernel/bus" | ||
"github.com/resonatehq/resonate/internal/kernel/t_api" | ||
"github.com/resonatehq/resonate/internal/util" | ||
) | ||
|
||
// TODO: (experimental) | ||
func (s *Service) SearchSubscriptions(id string, header *Header, params *SearchSubscriptionsParams) (*t_api.ReadSubscriptionsResponse, error) { | ||
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) | ||
|
||
// TODO: add cursor | ||
|
||
// validation | ||
limit := params.Limit | ||
if limit <= 0 || limit > 100 { | ||
limit = 100 | ||
} | ||
|
||
s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ | ||
Metadata: s.metadata(header.RequestId, "read-subscriptions"), | ||
Submission: &t_api.Request{ | ||
Kind: t_api.ReadSubscriptions, | ||
ReadSubscriptions: &t_api.ReadSubscriptionsRequest{ | ||
// TODO: do we want query? | ||
PromiseId: params.PromiseId, | ||
Limit: limit, | ||
// SortId: nil, | ||
}, | ||
}, | ||
Callback: s.sendOrPanic(cq), | ||
}) | ||
|
||
cqe := <-cq | ||
if cqe.Error != nil { | ||
return nil, cqe.Error | ||
} | ||
|
||
util.Assert(cqe.Completion.ReadSubscriptions != nil, "response must not be nil") | ||
return cqe.Completion.ReadSubscriptions, nil | ||
} | ||
|
||
func (s *Service) CreateSubscription(id string, header CreateSubscriptionHeader, body *CreateSubscriptionBody) (*t_api.CreateSubscriptionResponse, error) { | ||
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) | ||
|
||
s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ | ||
Metadata: s.metadata(header.RequestId, "create-subscription"), | ||
Submission: &t_api.Request{ | ||
Kind: t_api.CreateSubscription, | ||
CreateSubscription: &t_api.CreateSubscriptionRequest{ | ||
Id: id, | ||
PromiseId: body.PromiseId, | ||
Url: body.Url, | ||
RetryPolicy: body.RetryPolicy, | ||
}, | ||
}, | ||
Callback: s.sendOrPanic(cq), | ||
}) | ||
|
||
cqe := <-cq | ||
if cqe.Error != nil { | ||
return nil, cqe.Error | ||
} | ||
|
||
util.Assert(cqe.Completion.CreateSubscription != nil, "response must not be nil") | ||
return cqe.Completion.CreateSubscription, nil | ||
} | ||
|
||
// TODO: (experimental | ||
func (s *Service) DeleteSubscription(id string, header *Header) (*t_api.DeleteSubscriptionResponse, error) { | ||
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1) | ||
|
||
s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{ | ||
Metadata: s.metadata(header.RequestId, "delete-subscription"), | ||
Submission: &t_api.Request{ | ||
Kind: t_api.DeleteSubscription, | ||
DeleteSubscription: &t_api.DeleteSubscriptionRequest{ | ||
Id: id, | ||
// TODO: should not require promise id | ||
}, | ||
}, | ||
Callback: s.sendOrPanic(cq), | ||
}) | ||
|
||
cqe := <-cq | ||
if cqe.Error != nil { | ||
return nil, cqe.Error | ||
} | ||
|
||
util.Assert(cqe.Completion.DeleteSubscription != nil, "response must not be nil") | ||
return cqe.Completion.DeleteSubscription, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.