Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flags): expose subscriptions api behind feature flag for test harness #110

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
netHttp "net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -162,6 +163,7 @@ var serveCmd = &cobra.Command{

func init() {
// api
serveCmd.Flags().Var(&ExperimentalFeatures{}, "enable-feature", "enable features that are disabled by default since they are considered experimental")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
serveCmd.Flags().Var(&ExperimentalFeatures{}, "enable-feature", "enable features that are disabled by default since they are considered experimental")
serveCmd.Flags().Var(&ExperimentalFeatures{}, "enable-feature", "enable experimental features that are disabled by default")

serveCmd.Flags().Int("api-size", 100, "size of the submission queue buffered channel")
serveCmd.Flags().String("api-http-addr", "0.0.0.0:8001", "http server address")
serveCmd.Flags().Duration("api-http-timeout", 10*time.Second, "http server graceful shutdown timeout")
Expand All @@ -170,6 +172,7 @@ func init() {
_ = viper.BindPFlag("api.size", serveCmd.Flags().Lookup("api-size"))
_ = viper.BindPFlag("api.subsystems.http.addr", serveCmd.Flags().Lookup("api-http-addr"))
_ = viper.BindPFlag("api.subsystems.http.timeout", serveCmd.Flags().Lookup("api-http-timeout"))
_ = viper.BindPFlag("api.subsystems.http.enable", serveCmd.Flags().Lookup("enable-feature"))
_ = viper.BindPFlag("api.subsystems.grpc.addr", serveCmd.Flags().Lookup("api-grpc-addr"))

// aio
Expand Down Expand Up @@ -226,3 +229,28 @@ func init() {
serveCmd.Flags().SortFlags = false
rootCmd.AddCommand(serveCmd)
}

type ExperimentalFeatures []string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this move to the flags.go file?


func (e *ExperimentalFeatures) String() string {
return strings.Join(*e, ",")
}

func (e *ExperimentalFeatures) Set(value string) error {
experimental := strings.Split(value, ",")

for _, feature := range experimental {
switch feature {
case "subscription":
*e = append(*e, feature)
default:
return fmt.Errorf("invalid experimental feature: %s", feature)
}
}

return nil
}

func (e *ExperimentalFeatures) Type() string {
return "stringArray"
}
12 changes: 11 additions & 1 deletion internal/app/subsystems/api/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import (
"context"
"github.com/resonatehq/resonate/internal/app/subsystems/api/service"
"net/http"
"slices"
"time"

"github.com/resonatehq/resonate/internal/app/subsystems/api/service"

"log/slog"

"github.com/gin-gonic/gin"
Expand All @@ -15,6 +17,7 @@
type Config struct {
Addr string
Timeout time.Duration
Enable []string
}

type Http struct {
Expand All @@ -39,6 +42,13 @@
r.POST("/promises/:id/resolve", s.resolvePromise)
r.POST("/promises/:id/reject", s.rejectPromise)

// Subscription API (experimental)
if slices.Contains(config.Enable, "subscription") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if slices.Contains(config.Enable, "subscription") {
if slices.Contains(config.Enable, "subscriptions") {

r.GET("/subscriptions", s.searchSubscriptions)
r.POST("/subscriptions/:id/create", s.createSubscription)
r.POST("/subscriptions/:id/delete", s.deleteSubscription)
}

Check warning on line 50 in internal/app/subsystems/api/http/http.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/http.go#L47-L50

Added lines #L47 - L50 were not covered by tests

return &Http{
config: config,
server: &http.Server{
Expand Down
90 changes: 90 additions & 0 deletions internal/app/subsystems/api/http/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package http

import (
"net/http"

"github.com/gin-gonic/gin"
"github.com/resonatehq/resonate/internal/app/subsystems/api/service"
)

// Search Subscription

func (s *server) searchSubscriptions(c *gin.Context) {
var header service.Header
if err := c.ShouldBindHeader(&header); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": err.Error(),
})
return
}

Check warning on line 19 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L12-L19

Added lines #L12 - L19 were not covered by tests

var params service.SearchSubscriptionsParams
if err := c.ShouldBindQuery(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": err.Error(),
})
return
}

Check warning on line 27 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L21-L27

Added lines #L21 - L27 were not covered by tests

resp, err := s.service.SearchSubscriptions(c.Param("id"), &header, &params)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": err.Error(),
})
return
}

Check warning on line 35 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L29-L35

Added lines #L29 - L35 were not covered by tests

c.JSON(int(resp.Status), resp.Subscriptions)

Check warning on line 37 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L37

Added line #L37 was not covered by tests
}

// Create Subscription

func (s *server) createSubscription(c *gin.Context) {
var header service.CreateSubscriptionHeader
if err := c.ShouldBindHeader(&header); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": err.Error(),
})
return
}

Check warning on line 49 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L42-L49

Added lines #L42 - L49 were not covered by tests

var body *service.CreateSubscriptionBody
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": err.Error(),
})
return
}

Check warning on line 57 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L51-L57

Added lines #L51 - L57 were not covered by tests

resp, err := s.service.CreateSubscription(c.Param("id"), header, body)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": err.Error(),
})
return
}

Check warning on line 65 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L59-L65

Added lines #L59 - L65 were not covered by tests

c.JSON(int(resp.Status), resp.Subscription)

Check warning on line 67 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L67

Added line #L67 was not covered by tests
}

// Delete Subscription

func (s *server) deleteSubscription(c *gin.Context) {
var header service.DeleteSubscriptionHeader
if err := c.ShouldBindHeader(&header); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": err.Error(),
})
return
}

Check warning on line 79 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L72-L79

Added lines #L72 - L79 were not covered by tests

resp, err := s.service.DeleteSubscription(c.Param("id"), nil)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": err.Error(),
})
return
}

Check warning on line 87 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L81-L87

Added lines #L81 - L87 were not covered by tests

c.JSON(int(resp.Status), nil)

Check warning on line 89 in internal/app/subsystems/api/http/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/http/subscription.go#L89

Added line #L89 was not covered by tests
}
38 changes: 37 additions & 1 deletion internal/app/subsystems/api/service/request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package service

import "github.com/resonatehq/resonate/pkg/promise"
import (
"github.com/resonatehq/resonate/pkg/promise"
"github.com/resonatehq/resonate/pkg/subscription"
)

type ValidationError struct {
msg string // description of error
Expand All @@ -10,6 +13,8 @@ type Header struct {
RequestId string `header:"request-id"`
}

// Promise

type SearchPromiseParams struct {
Q string `form:"q" json:"q"`
State string `form:"state" json:"state"`
Expand Down Expand Up @@ -58,3 +63,34 @@ type RejectPromiseHeader struct {
type RejectPromiseBody struct {
Value promise.Value `json:"value"`
}

// Subscription

type SearchSubscriptionsParams struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably won't have a search subscriptions API, just a list all subscriptions on a promise (which is also paginated),

// Q string `form:"q" json:"q"`
PromiseId string `json:"promiseId"`
Limit int `form:"limit" json:"limit"`
// Cursor string `form:"cursor" json:"cursor"`
}

type CreateSubscriptionHeader struct {
RequestId string `header:"request-id"` // qq: just for grpc?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an optional parameter for tracking requests cross-systems. If provided this field is used in corresponding logs, if not provided I believe we generate a uuid for each request.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can set in both http/grpc

IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"`
Strict bool `header:"strict"`
}

type CreateSubscriptionBody struct {
PromiseId string `json:"promiseId"`
Url string `json:"url"`
RetryPolicy *subscription.RetryPolicy `json:"retryPolicy"`
}

type DeleteSubscriptionHeader struct {
RequestId string `header:"request-id"`
IdempotencyKey *promise.IdempotencyKey `header:"idempotency-key"`
Strict bool `header:"strict"`
}

type DeleteSubscriptionBody struct {
PromiseId string `json:"promiseId"`
}
93 changes: 93 additions & 0 deletions internal/app/subsystems/api/service/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package service

import (
"github.com/resonatehq/resonate/internal/kernel/bus"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
)

// TODO: (experimental)
func (s *Service) SearchSubscriptions(id string, header *Header, params *SearchSubscriptionsParams) (*t_api.ReadSubscriptionsResponse, error) {
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1)

// TODO: add cursor

// validation
limit := params.Limit
if limit <= 0 || limit > 100 {
limit = 100
}

Check warning on line 19 in internal/app/subsystems/api/service/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/service/subscription.go#L10-L19

Added lines #L10 - L19 were not covered by tests

s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{
Metadata: s.metadata(header.RequestId, "read-subscriptions"),
Submission: &t_api.Request{
Kind: t_api.ReadSubscriptions,
ReadSubscriptions: &t_api.ReadSubscriptionsRequest{
// TODO: do we want query?
PromiseId: params.PromiseId,
Limit: limit,
// SortId: nil,
},
},
Callback: s.sendOrPanic(cq),
})

cqe := <-cq
if cqe.Error != nil {
return nil, cqe.Error
}

Check warning on line 38 in internal/app/subsystems/api/service/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/service/subscription.go#L21-L38

Added lines #L21 - L38 were not covered by tests

util.Assert(cqe.Completion.ReadSubscriptions != nil, "response must not be nil")
return cqe.Completion.ReadSubscriptions, nil

Check warning on line 41 in internal/app/subsystems/api/service/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/service/subscription.go#L40-L41

Added lines #L40 - L41 were not covered by tests
}

func (s *Service) CreateSubscription(id string, header CreateSubscriptionHeader, body *CreateSubscriptionBody) (*t_api.CreateSubscriptionResponse, error) {
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1)

s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{
Metadata: s.metadata(header.RequestId, "create-subscription"),
Submission: &t_api.Request{
Kind: t_api.CreateSubscription,
CreateSubscription: &t_api.CreateSubscriptionRequest{
Id: id,
PromiseId: body.PromiseId,
Url: body.Url,
RetryPolicy: body.RetryPolicy,
},
},
Callback: s.sendOrPanic(cq),
})

cqe := <-cq
if cqe.Error != nil {
return nil, cqe.Error
}

Check warning on line 64 in internal/app/subsystems/api/service/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/service/subscription.go#L44-L64

Added lines #L44 - L64 were not covered by tests

util.Assert(cqe.Completion.CreateSubscription != nil, "response must not be nil")
return cqe.Completion.CreateSubscription, nil

Check warning on line 67 in internal/app/subsystems/api/service/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/service/subscription.go#L66-L67

Added lines #L66 - L67 were not covered by tests
}

// TODO: (experimental
func (s *Service) DeleteSubscription(id string, header *Header) (*t_api.DeleteSubscriptionResponse, error) {
cq := make(chan *bus.CQE[t_api.Request, t_api.Response], 1)

s.api.Enqueue(&bus.SQE[t_api.Request, t_api.Response]{
Metadata: s.metadata(header.RequestId, "delete-subscription"),
Submission: &t_api.Request{
Kind: t_api.DeleteSubscription,
DeleteSubscription: &t_api.DeleteSubscriptionRequest{
Id: id,
// TODO: should not require promise id
},
},
Callback: s.sendOrPanic(cq),
})

cqe := <-cq
if cqe.Error != nil {
return nil, cqe.Error
}

Check warning on line 89 in internal/app/subsystems/api/service/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/service/subscription.go#L71-L89

Added lines #L71 - L89 were not covered by tests

util.Assert(cqe.Completion.DeleteSubscription != nil, "response must not be nil")
return cqe.Completion.DeleteSubscription, nil

Check warning on line 92 in internal/app/subsystems/api/service/subscription.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/api/service/subscription.go#L91-L92

Added lines #L91 - L92 were not covered by tests
}