diff --git a/cmd/promises/complete.go b/cmd/promises/complete.go index 8e4445fe..99d002ff 100644 --- a/cmd/promises/complete.go +++ b/cmd/promises/complete.go @@ -84,7 +84,7 @@ func CompletePromiseCmds(c client.ResonateClient) []*cobra.Command { } else if resp.StatusCode() == 200 { cmd.Printf("%s promise: %s (deduplicated)\n", state.PastT, id) } else { - cmd.PrintErrln(string(resp.Body)) + cmd.PrintErrln(resp.Status(), string(resp.Body)) } }, } diff --git a/cmd/promises/completes_test.go b/cmd/promises/complete_test.go similarity index 100% rename from cmd/promises/completes_test.go rename to cmd/promises/complete_test.go diff --git a/cmd/promises/create.go b/cmd/promises/create.go index d096ffc1..1933860c 100644 --- a/cmd/promises/create.go +++ b/cmd/promises/create.go @@ -77,7 +77,7 @@ func CreatePromiseCmd(c client.ResonateClient) *cobra.Command { } else if resp.StatusCode() == 200 { cmd.Printf("Created promise: %s (deduplicated)\n", id) } else { - cmd.PrintErrln(string(resp.Body)) + cmd.PrintErrln(resp.Status(), string(resp.Body)) } }, } diff --git a/cmd/promises/get.go b/cmd/promises/get.go index 8779c005..0f484a54 100644 --- a/cmd/promises/get.go +++ b/cmd/promises/get.go @@ -36,7 +36,7 @@ func GetPromiseCmd(c client.ResonateClient) *cobra.Command { } if resp.StatusCode() != 200 { - cmd.PrintErrln(string(resp.Body)) + cmd.PrintErrln(resp.Status(), string(resp.Body)) return } diff --git a/cmd/promises/promises.go b/cmd/promises/promises.go index c00d75ea..9eca3da6 100644 --- a/cmd/promises/promises.go +++ b/cmd/promises/promises.go @@ -12,6 +12,11 @@ import ( ) func NewCmd(c client.ResonateClient) *cobra.Command { + var ( + username string + password string + ) + cmd := &cobra.Command{ Use: "promises", Aliases: []string{"promise"}, @@ -19,6 +24,12 @@ func NewCmd(c client.ResonateClient) *cobra.Command { Run: func(cmd *cobra.Command, args []string) { _ = cmd.Help() }, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + // Set basic auth if provided + if username != "" || password != "" { + c.SetBasicAuth(username, password) + } + }, } // Add subcommands @@ -27,6 +38,10 @@ func NewCmd(c client.ResonateClient) *cobra.Command { cmd.AddCommand(CreatePromiseCmd(c)) cmd.AddCommand(CompletePromiseCmds(c)...) + // Flags + cmd.PersistentFlags().StringVarP(&username, "username", "U", "", "Basic auth username") + cmd.PersistentFlags().StringVarP(&password, "password", "P", "", "Basic auth password") + return cmd } diff --git a/cmd/promises/search.go b/cmd/promises/search.go index b1e8c527..1dc75993 100644 --- a/cmd/promises/search.go +++ b/cmd/promises/search.go @@ -69,7 +69,7 @@ func SearchPromisesCmd(c client.ResonateClient) *cobra.Command { } if resp.StatusCode() != 200 { - cmd.PrintErr(string(resp.Body)) + cmd.PrintErrln(resp.Status(), string(resp.Body)) return } diff --git a/cmd/schedules/create.go b/cmd/schedules/create.go index eb094776..80b6742f 100644 --- a/cmd/schedules/create.go +++ b/cmd/schedules/create.go @@ -82,7 +82,7 @@ func CreateScheduleCmd(c client.ResonateClient) *cobra.Command { } else if resp.StatusCode() == 200 { cmd.Printf("Created schedule: %s (deduplicated)\n", id) } else { - cmd.PrintErrln(string(resp.Body)) + cmd.PrintErrln(resp.Status(), string(resp.Body)) } }, } diff --git a/cmd/schedules/delete.go b/cmd/schedules/delete.go index 0c2c6973..b3547d2e 100644 --- a/cmd/schedules/delete.go +++ b/cmd/schedules/delete.go @@ -34,7 +34,7 @@ func DeleteScheduleCmd(c client.ResonateClient) *cobra.Command { } if resp.StatusCode() != 204 { - cmd.PrintErrln(string(resp.Body)) + cmd.PrintErrln(resp.Status(), string(resp.Body)) return } diff --git a/cmd/schedules/get.go b/cmd/schedules/get.go index 683e59be..15b13f93 100644 --- a/cmd/schedules/get.go +++ b/cmd/schedules/get.go @@ -37,7 +37,7 @@ func GetScheduleCmd(c client.ResonateClient) *cobra.Command { } if resp.StatusCode() != 200 { - cmd.PrintErrln(string(resp.Body)) + cmd.PrintErrln(resp.Status(), string(resp.Body)) return } diff --git a/cmd/schedules/schedules.go b/cmd/schedules/schedules.go index 991d5b9c..fcce3a51 100644 --- a/cmd/schedules/schedules.go +++ b/cmd/schedules/schedules.go @@ -12,6 +12,11 @@ import ( ) func NewCmd(c client.ResonateClient) *cobra.Command { + var ( + username string + password string + ) + cmd := &cobra.Command{ Use: "schedules", Aliases: []string{"schedule"}, @@ -19,6 +24,12 @@ func NewCmd(c client.ResonateClient) *cobra.Command { Run: func(cmd *cobra.Command, args []string) { _ = cmd.Help() }, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + // Set basic auth if provided + if username != "" || password != "" { + c.SetBasicAuth(username, password) + } + }, } // Add subcommands @@ -27,6 +38,10 @@ func NewCmd(c client.ResonateClient) *cobra.Command { cmd.AddCommand(CreateScheduleCmd(c)) cmd.AddCommand(DeleteScheduleCmd(c)) + // Flags + cmd.PersistentFlags().StringVarP(&username, "username", "U", "", "Basic auth username") + cmd.PersistentFlags().StringVarP(&password, "password", "P", "", "Basic auth password") + return cmd } diff --git a/cmd/schedules/search.go b/cmd/schedules/search.go index a8b48dca..01b7f3e0 100644 --- a/cmd/schedules/search.go +++ b/cmd/schedules/search.go @@ -54,7 +54,7 @@ func SearchSchedulesCmd(c client.ResonateClient) *cobra.Command { } if resp.StatusCode() != 200 { - cmd.PrintErr(string(resp.Body)) + cmd.PrintErrln(resp.Status(), string(resp.Body)) return } diff --git a/cmd/serve/serve.go b/cmd/serve/serve.go index 8e681f06..45527920 100644 --- a/cmd/serve/serve.go +++ b/cmd/serve/serve.go @@ -184,22 +184,22 @@ func ServeCmd() *cobra.Command { }, } - // assert - cmd.Flags().Bool("ignore-asserts", false, "ignore-asserts mode") - _ = viper.BindPFlag("ignore-asserts", cmd.Flags().Lookup("ignore-asserts")) - // api cmd.Flags().Int("api-size", 100, "size of the submission queue buffered channel") cmd.Flags().String("api-http-addr", "0.0.0.0:8001", "http server address") cmd.Flags().Duration("api-http-timeout", 10*time.Second, "http server graceful shutdown timeout") cmd.Flags().String("api-grpc-addr", "0.0.0.0:50051", "grpc server address") cmd.Flags().String("api-base-url", "http://localhost:8001", "base url to automatically generate absolute URLs for the server's resources") + cmd.Flags().String("api-http-auth-username", "", "username for basic auth") + cmd.Flags().String("api-http-auth-password", "", "password for basic auth") _ = viper.BindPFlag("api.size", cmd.Flags().Lookup("api-size")) _ = viper.BindPFlag("api.subsystems.http.addr", cmd.Flags().Lookup("api-http-addr")) _ = viper.BindPFlag("api.subsystems.http.timeout", cmd.Flags().Lookup("api-http-timeout")) _ = viper.BindPFlag("api.subsystems.grpc.addr", cmd.Flags().Lookup("api-grpc-addr")) _ = viper.BindPFlag("api.baseUrl", cmd.Flags().Lookup("api-base-url")) + _ = viper.BindPFlag("api.subsystems.http.auth.username", cmd.Flags().Lookup("api-http-auth-username")) + _ = viper.BindPFlag("api.subsystems.http.auth.password", cmd.Flags().Lookup("api-http-auth-password")) // aio // Store @@ -272,10 +272,15 @@ func ServeCmd() *cobra.Command { _ = viper.BindPFlag("system.submissionBatchSize", cmd.Flags().Lookup("system-submission-batch-size")) _ = viper.BindPFlag("system.completionBatchSize", cmd.Flags().Lookup("system-completion-batch-size")) _ = viper.BindPFlag("system.scheduleBatchSize", cmd.Flags().Lookup("system-schedule-batch-size")) + // metrics cmd.Flags().Int("metrics-port", 9090, "prometheus metrics server port") _ = viper.BindPFlag("metrics.port", cmd.Flags().Lookup("metrics-port")) + // assert + cmd.Flags().Bool("ignore-asserts", false, "ignore-asserts mode") + _ = viper.BindPFlag("ignore-asserts", cmd.Flags().Lookup("ignore-asserts")) + cmd.Flags().SortFlags = false return cmd diff --git a/internal/app/subsystems/api/http/http.go b/internal/app/subsystems/api/http/http.go index 998df0f7..839c9d40 100644 --- a/internal/app/subsystems/api/http/http.go +++ b/internal/app/subsystems/api/http/http.go @@ -7,6 +7,7 @@ import ( "github.com/go-playground/validator/v10" "github.com/resonatehq/resonate/internal/app/subsystems/api/service" + "github.com/resonatehq/resonate/internal/util" "log/slog" @@ -15,8 +16,14 @@ import ( "github.com/resonatehq/resonate/internal/api" ) +type Auth struct { + Username string + Password string +} + type Config struct { Addr string + Auth *Auth Timeout time.Duration } @@ -39,26 +46,39 @@ func New(api api.API, config *Config) api.Subsystem { // Middleware r.Use(s.log) + // Authentication + authorized := r.Group("/") + if config.Auth.Username != "" || config.Auth.Password != "" { + util.Assert(config.Auth.Username != "", "http basic auth username is required") + util.Assert(config.Auth.Password != "", "http basic auth password is required") + + accounts := gin.Accounts{ + config.Auth.Username: config.Auth.Password, + } + basicAuthMiddleware := gin.BasicAuth(accounts) + authorized.Use(basicAuthMiddleware) + } + // Promises API - r.POST("/promises", s.createPromise) - r.GET("/promises", s.searchPromises) - r.GET("/promises/*id", s.readPromise) - r.PATCH("/promises/*id", s.completePromise) + authorized.POST("/promises", s.createPromise) + authorized.GET("/promises", s.searchPromises) + authorized.GET("/promises/*id", s.readPromise) + authorized.PATCH("/promises/*id", s.completePromise) // Schedules API - r.POST("/schedules", s.createSchedule) - r.GET("/schedules", s.searchSchedules) - r.GET("/schedules/*id", s.readSchedule) - r.DELETE("/schedules/*id", s.deleteSchedule) + authorized.POST("/schedules", s.createSchedule) + authorized.GET("/schedules", s.searchSchedules) + authorized.GET("/schedules/*id", s.readSchedule) + authorized.DELETE("/schedules/*id", s.deleteSchedule) // Distributed Locks API - r.POST("/locks/acquire", s.acquireLock) - r.POST("/locks/heartbeat", s.heartbeatLocks) - r.POST("/locks/release", s.releaseLock) + authorized.POST("/locks/acquire", s.acquireLock) + authorized.POST("/locks/heartbeat", s.heartbeatLocks) + authorized.POST("/locks/release", s.releaseLock) // Task API - r.POST("/tasks/claim", s.claimTask) - r.POST("/tasks/complete", s.completeTask) + authorized.POST("/tasks/claim", s.claimTask) + authorized.POST("/tasks/complete", s.completeTask) return &Http{ config: config, diff --git a/internal/app/subsystems/api/http/http_test.go b/internal/app/subsystems/api/http/http_test.go index 574965e7..9d0eca84 100644 --- a/internal/app/subsystems/api/http/http_test.go +++ b/internal/app/subsystems/api/http/http_test.go @@ -26,11 +26,12 @@ type httpTest struct { client *http.Client } -func setup() *httpTest { +func setup(auth *Auth) *httpTest { api := &test.API{} errors := make(chan error) subsystem := New(api, &Config{ Addr: "127.0.0.1:8888", + Auth: auth, Timeout: 1 * time.Second, }) @@ -53,947 +54,987 @@ func (t *httpTest) teardown() error { } func TestHttpServer(t *testing.T) { - httpTest := setup() - - for _, tc := range []struct { - name string - path string - method string - headers map[string]string - body []byte - req *t_api.Request - res *t_api.Response - status int + for _, ts := range []struct { + name string + auth *Auth + reqUsername string + reqPassword string + statusOveride int }{ { - name: "ReadPromise", - path: "promises/foo", - method: "GET", - req: &t_api.Request{ - Kind: t_api.ReadPromise, - ReadPromise: &t_api.ReadPromiseRequest{ - Id: "foo", - }, - }, - res: &t_api.Response{ - Kind: t_api.ReadPromise, - ReadPromise: &t_api.ReadPromiseResponse{ - Status: t_api.StatusOK, - Promise: &promise.Promise{ - Id: "foo", - State: promise.Pending, - }, - }, - }, - status: 200, - }, - { - name: "ReadPromiseWithSlash", - path: "promises/foo/bar", - method: "GET", - req: &t_api.Request{ - Kind: t_api.ReadPromise, - ReadPromise: &t_api.ReadPromiseRequest{ - Id: "foo/bar", - }, - }, - res: &t_api.Response{ - Kind: t_api.ReadPromise, - ReadPromise: &t_api.ReadPromiseResponse{ - Status: t_api.StatusOK, - Promise: &promise.Promise{ - Id: "foo/bar", - State: promise.Pending, - }, - }, - }, - status: 200, - }, - { - name: "ReadPromiseNotFound", - path: "promises/bar", - method: "GET", - req: &t_api.Request{ - Kind: t_api.ReadPromise, - ReadPromise: &t_api.ReadPromiseRequest{ - Id: "bar", - }, - }, - res: &t_api.Response{ - Kind: t_api.ReadPromise, - ReadPromise: &t_api.ReadPromiseResponse{ - Status: t_api.StatusPromiseNotFound, - Promise: nil, - }, - }, - status: 404, - }, - { - name: "SearchPromises", - path: "promises?id=*&limit=10", - method: "GET", - req: &t_api.Request{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesRequest{ - Id: "*", - States: []promise.State{ - promise.Pending, - promise.Resolved, - promise.Rejected, - promise.Timedout, - promise.Canceled, - }, - Tags: map[string]string{}, - Limit: 10, - }, - }, - res: &t_api.Response{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesResponse{ - Status: t_api.StatusOK, - Cursor: nil, - Promises: []*promise.Promise{}, - }, - }, - status: 200, - }, - { - name: "SearchPromisesCursor", - path: "promises?cursor=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJOZXh0Ijp7ImlkIjoiKiIsInN0YXRlcyI6WyJQRU5ESU5HIl0sInRhZ3MiOnt9LCJsaW1pdCI6MTAsInNvcnRJZCI6MTAwfX0.XKusWO-Jl4v7QVIwh5Pn3oIElBvtpf0VPOLJkXPvQLk", - method: "GET", - req: &t_api.Request{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesRequest{ - Id: "*", - States: []promise.State{ - promise.Pending, - }, - Tags: map[string]string{}, - Limit: 10, - SortId: util.ToPointer(int64(100)), - }, - }, - res: &t_api.Response{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesResponse{ - Status: t_api.StatusOK, - Cursor: nil, // not checked - Promises: []*promise.Promise{}, - }, - }, - status: 200, - }, - { - name: "SearchPromisesPending", - path: "promises?id=*&state=pending&limit=10", - method: "GET", - req: &t_api.Request{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesRequest{ - Id: "*", - States: []promise.State{ - promise.Pending, - }, - Tags: map[string]string{}, - Limit: 10, - }, - }, - res: &t_api.Response{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesResponse{ - Status: t_api.StatusOK, - Cursor: nil, - Promises: []*promise.Promise{}, - }, - }, - status: 200, - }, - { - name: "SearchPromisesResolved", - path: "promises?id=*&state=resolved&limit=10", - method: "GET", - req: &t_api.Request{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesRequest{ - Id: "*", - States: []promise.State{ - promise.Resolved, - }, - Tags: map[string]string{}, - Limit: 10, - }, - }, - res: &t_api.Response{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesResponse{ - Status: t_api.StatusOK, - Cursor: nil, - Promises: []*promise.Promise{}, - }, - }, - status: 200, - }, - { - name: "SearchPromisesRejected", - path: "promises?id=*&state=rejected&limit=10", - method: "GET", - req: &t_api.Request{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesRequest{ - Id: "*", - States: []promise.State{ - promise.Rejected, - promise.Timedout, - promise.Canceled, - }, - Tags: map[string]string{}, - Limit: 10, - }, - }, - res: &t_api.Response{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesResponse{ - Status: t_api.StatusOK, - Cursor: nil, - Promises: []*promise.Promise{}, - }, - }, - status: 200, - }, - { - name: "SearchPromisesTags", - path: "promises?id=*&tags[resonate:invocation]=true&limit=10", - method: "GET", - req: &t_api.Request{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesRequest{ - Id: "*", - States: []promise.State{ - promise.Pending, - promise.Resolved, - promise.Rejected, - promise.Timedout, - promise.Canceled, - }, - Tags: map[string]string{ - "resonate:invocation": "true", - }, - Limit: 10, - }, - }, - res: &t_api.Response{ - Kind: t_api.SearchPromises, - SearchPromises: &t_api.SearchPromisesResponse{ - Status: t_api.StatusOK, - Cursor: nil, - Promises: []*promise.Promise{}, - }, - }, - status: 200, - }, - { - name: "SearchPromisesInvalidQuery", - path: "promises?id=", - method: "GET", - req: nil, - res: nil, - status: 400, - }, - { - name: "SearchPromisesInvalidLimit", - path: "promises?id=*&limit=0", - method: "GET", - req: nil, - res: nil, - status: 400, - }, - { - name: "SearchPromisesInvalidState", - path: "promises?id=*&state=x", - method: "GET", - req: nil, - res: nil, - status: 400, - }, - { - name: "SearchPromisesInvalidTags", - path: "promises?id=*&tags=x", - method: "GET", - req: nil, - res: nil, - status: 400, - }, - { - name: "CreatePromise", - path: "promises", - method: "POST", - headers: map[string]string{ - "Idempotency-Key": "bar", - "Strict": "true", - }, - body: []byte(`{ - "id": "foo/bar", - "param": { - "headers": {"a":"a","b":"b","c":"c"}, - "data": "cGVuZGluZw==" - }, - "timeout": 1 - }`), - req: &t_api.Request{ - Kind: t_api.CreatePromise, - CreatePromise: &t_api.CreatePromiseRequest{ - Id: "foo/bar", - IdempotencyKey: util.ToPointer(idempotency.Key("bar")), - Strict: true, - Param: promise.Value{ - Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, - Data: []byte("pending"), - }, - Timeout: 1, - }, - }, - res: &t_api.Response{ - Kind: t_api.CreatePromise, - CreatePromise: &t_api.CreatePromiseResponse{ - Status: t_api.StatusCreated, - Promise: &promise.Promise{ - Id: "foo/bar", - State: promise.Pending, - }, - }, - }, - status: 201, - }, - { - name: "CreatePromiseMinimal", - path: "promises", - method: "POST", - body: []byte(`{ - "id": "foo", - "timeout": 1 - }`), - req: &t_api.Request{ - Kind: t_api.CreatePromise, - CreatePromise: &t_api.CreatePromiseRequest{ - Id: "foo", - IdempotencyKey: nil, - Strict: false, - Param: promise.Value{ - Headers: nil, - Data: nil, - }, - Timeout: 1, - }, - }, - res: &t_api.Response{ - Kind: t_api.CreatePromise, - CreatePromise: &t_api.CreatePromiseResponse{ - Status: t_api.StatusCreated, - Promise: &promise.Promise{ - Id: "foo", - State: promise.Pending, - }, - }, - }, - status: 201, - }, - { - name: "CancelPromise", - path: "promises/foo/bar", - method: "PATCH", - headers: map[string]string{ - "Idempotency-Key": "bar", - "Strict": "true", - }, - body: []byte(`{ - "state": "REJECTED_CANCELED", - "value": { - "headers": {"a":"a","b":"b","c":"c"}, - "data": "Y2FuY2Vs" - } - }`), - req: &t_api.Request{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseRequest{ - Id: "foo/bar", - IdempotencyKey: util.ToPointer(idempotency.Key("bar")), - Strict: true, - State: promise.Canceled, - Value: promise.Value{ - Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, - Data: []byte("cancel"), - }, - }, - }, - res: &t_api.Response{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusCreated, - Promise: &promise.Promise{ - Id: "foo/bar", - State: promise.Canceled, - }, - }, - }, - status: 201, - }, - { - name: "CancelPromiseMinimal", - path: "promises/foo", - method: "PATCH", - body: []byte(`{ - "state": "REJECTED_CANCELED" - }`), - req: &t_api.Request{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseRequest{ - Id: "foo", - IdempotencyKey: nil, - Strict: false, - State: promise.Canceled, - Value: promise.Value{ - Headers: nil, - Data: nil, - }, - }, - }, - res: &t_api.Response{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusCreated, - Promise: &promise.Promise{ - Id: "foo", - State: promise.Canceled, - }, - }, - }, - status: 201, - }, - { - name: "ResolvePromise", - path: "promises/foo/bar", - method: "PATCH", - headers: map[string]string{ - "Idempotency-Key": "bar", - "Strict": "true", - }, - body: []byte(`{ - "state": "RESOLVED", - "value": { - "headers": {"a":"a","b":"b","c":"c"}, - "data": "cmVzb2x2ZQ==" - } - }`), - req: &t_api.Request{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseRequest{ - Id: "foo/bar", - IdempotencyKey: util.ToPointer(idempotency.Key("bar")), - Strict: true, - State: promise.Resolved, - Value: promise.Value{ - Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, - Data: []byte("resolve"), - }, - }, - }, - res: &t_api.Response{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusCreated, - Promise: &promise.Promise{ - Id: "foo/bar", - State: promise.Resolved, - }, - }, - }, - status: 201, - }, - { - name: "ResolvePromiseMinimal", - path: "promises/foo", - method: "PATCH", - body: []byte(`{ - "state": "RESOLVED" - }`), - req: &t_api.Request{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseRequest{ - Id: "foo", - IdempotencyKey: nil, - Strict: false, - State: promise.Resolved, - Value: promise.Value{ - Headers: nil, - Data: nil, - }, - }, - }, - res: &t_api.Response{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusCreated, - Promise: &promise.Promise{ - Id: "foo", - State: promise.Resolved, - }, - }, - }, - status: 201, - }, - { - name: "RejectPromise", - path: "promises/foo/bar", - method: "PATCH", - headers: map[string]string{ - "Idempotency-Key": "bar", - "Strict": "true", - }, - body: []byte(`{ - "state": "REJECTED", - "value": { - "headers": {"a":"a","b":"b","c":"c"}, - "data": "cmVqZWN0" - } - }`), - req: &t_api.Request{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseRequest{ - Id: "foo/bar", - IdempotencyKey: util.ToPointer(idempotency.Key("bar")), - Strict: true, - State: promise.Rejected, - Value: promise.Value{ - Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, - Data: []byte("reject"), - }, - }, - }, - res: &t_api.Response{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusCreated, - Promise: &promise.Promise{ - Id: "foo/bar", - State: promise.Rejected, - }, - }, - }, - status: 201, - }, - { - name: "RejectPromiseMinimal", - path: "promises/foo", - method: "PATCH", - body: []byte(`{ - "state": "REJECTED" - }`), - req: &t_api.Request{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseRequest{ - Id: "foo", - IdempotencyKey: nil, - Strict: false, - State: promise.Rejected, - Value: promise.Value{ - Headers: nil, - Data: nil, - }, - }, - }, - res: &t_api.Response{ - Kind: t_api.CompletePromise, - CompletePromise: &t_api.CompletePromiseResponse{ - Status: t_api.StatusCreated, - Promise: &promise.Promise{ - Id: "foo", - State: promise.Rejected, - }, - }, - }, - status: 201, - }, - { - name: "ReadSchedule", - path: "schedules/foo", - method: "GET", - req: &t_api.Request{ - Kind: t_api.ReadSchedule, - ReadSchedule: &t_api.ReadScheduleRequest{ - Id: "foo", - }, - }, - res: &t_api.Response{ - Kind: t_api.ReadSchedule, - ReadSchedule: &t_api.ReadScheduleResponse{ - Status: t_api.StatusOK, - Schedule: &schedule.Schedule{ - Id: "foo", - Description: "", - Cron: "* * * * * *", - PromiseId: "foo.{{.timestamp}}", - PromiseTimeout: 1000000, - }, - }, - }, - status: 200, - }, - { - name: "SearchSchedules", - path: "schedules?id=*&limit=10", - method: "GET", - req: &t_api.Request{ - Kind: t_api.SearchSchedules, - SearchSchedules: &t_api.SearchSchedulesRequest{ - Id: "*", - Tags: map[string]string{}, - Limit: 10, - }, - }, - res: &t_api.Response{ - Kind: t_api.SearchSchedules, - SearchSchedules: &t_api.SearchSchedulesResponse{ - Status: t_api.StatusOK, - Cursor: nil, - Schedules: []*schedule.Schedule{}, - }, - }, - status: 200, - }, - { - name: "SearchSchedulesTags", - path: "schedules?id=*&tags[foo]=bar&limit=10", - method: "GET", - req: &t_api.Request{ - Kind: t_api.SearchSchedules, - SearchSchedules: &t_api.SearchSchedulesRequest{ - Id: "*", - Tags: map[string]string{ - "foo": "bar", - }, - Limit: 10, - }, - }, - res: &t_api.Response{ - Kind: t_api.SearchSchedules, - SearchSchedules: &t_api.SearchSchedulesResponse{ - Status: t_api.StatusOK, - Cursor: nil, - Schedules: []*schedule.Schedule{}, - }, - }, - status: 200, + name: "NoAuth", + auth: &Auth{}, }, { - name: "SearchSchedulesInvalidQuery", - path: "schedules?id=", - method: "GET", - req: nil, - res: nil, - status: 400, + name: "BasicAuthCorrectCredentials", + auth: &Auth{Username: "username", Password: "password"}, + reqUsername: "username", + reqPassword: "password", }, { - name: "SearchSchedulesInvalidLimit", - path: "schedules?id=*&limit=0", - method: "GET", - req: nil, - res: nil, - status: 400, - }, - { - name: "SearchSchedulesInvalidTags", - path: "schedules?id=*&tags=x", - method: "GET", - req: nil, - res: nil, - status: 400, - }, - { - name: "CreateSchedule", - path: "schedules", - method: "POST", - headers: map[string]string{ - "Idempotency-Key": "bar", - }, - body: []byte(`{ - "id": "foo", - "desc": "", - "cron": "* * * * * *", - "promiseId": "foo.{{.timestamp}}", - "promiseTimeout": 1000000 - }`), - req: &t_api.Request{ - Kind: t_api.CreateSchedule, - CreateSchedule: &t_api.CreateScheduleRequest{ - Id: "foo", - IdempotencyKey: util.ToPointer(idempotency.Key("bar")), - Cron: "* * * * * *", - PromiseId: "foo.{{.timestamp}}", - PromiseTimeout: 1000000, - }, - }, - res: &t_api.Response{ - Kind: t_api.CreateSchedule, - CreateSchedule: &t_api.CreateScheduleResponse{ - Status: t_api.StatusCreated, - Schedule: &schedule.Schedule{ - Id: "foo", - Description: "", - Cron: "* * * * * *", - PromiseId: "foo.{{.timestamp}}", - PromiseTimeout: 1000000, - }, - }, - }, - status: 201, - }, - { - name: "DeleteSchedule", - path: "schedules/foo", - method: "DELETE", - req: &t_api.Request{ - Kind: t_api.DeleteSchedule, - DeleteSchedule: &t_api.DeleteScheduleRequest{ - Id: "foo", - }, - }, - res: &t_api.Response{ - Kind: t_api.DeleteSchedule, - DeleteSchedule: &t_api.DeleteScheduleResponse{ - Status: t_api.StatusNoContent, - }, - }, - status: 204, + name: "BasicAuthIncorrectCredentials", + auth: &Auth{Username: "username", Password: "password"}, + reqUsername: "username", + reqPassword: "notthepassword", + statusOveride: 401, }, + } { + // start the server + httpTest := setup(ts.auth) - // Distributed Locks API - { - name: "AcquireLock", - path: "locks/acquire", - method: "POST", - body: []byte(`{ - "resourceId": "foo", - "processId": "bar", - "executionId": "baz", - "expiryInMilliseconds": 10000 - }`), - req: &t_api.Request{ - Kind: t_api.AcquireLock, - AcquireLock: &t_api.AcquireLockRequest{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - ExpiryInMilliseconds: 10_000, - }, - }, - res: &t_api.Response{ - Kind: t_api.AcquireLock, - AcquireLock: &t_api.AcquireLockResponse{ - Status: t_api.StatusCreated, - Lock: &lock.Lock{ - ResourceId: "foo", - ProcessId: "bar", - ExecutionId: "baz", - ExpiryInMilliseconds: 10_000, - }, - }, - }, - status: 201, - }, - { - name: "AcquireLock missing executionIdc", - path: "locks/acquire", - method: "POST", - body: []byte(`{ - "resourceId": "foo", - "processId": "bar", - "executionId": "", - "expiryInMilliseconds": 10000 - }`), - req: nil, - res: nil, - status: 400, - }, - { - name: "HeartbeatLocks", - path: "locks/heartbeat", - method: "POST", - body: []byte(`{ - "processId": "bar" - }`), - req: &t_api.Request{ - Kind: t_api.HeartbeatLocks, - HeartbeatLocks: &t_api.HeartbeatLocksRequest{ - ProcessId: "bar", - }, - }, - res: &t_api.Response{ - Kind: t_api.HeartbeatLocks, - HeartbeatLocks: &t_api.HeartbeatLocksResponse{ - Status: t_api.StatusOK, - LocksAffected: 0, - }, - }, - status: 200, - }, - { - name: "HeartbeatLocks missing processId", - path: "locks/heartbeat", - method: "POST", - body: []byte(`{ - "processId": "", - "timeout": 1736571600000 - }`), - req: nil, - res: nil, - status: 400, - }, - { - name: "ReleaseLock", - path: "locks/release", - method: "POST", - body: []byte(`{ - "resourceId": "foo", - "executionId": "baz" - }`), - req: &t_api.Request{ - Kind: t_api.ReleaseLock, - ReleaseLock: &t_api.ReleaseLockRequest{ - ResourceId: "foo", - ExecutionId: "baz", - }, - }, - res: &t_api.Response{ - Kind: t_api.ReleaseLock, - ReleaseLock: &t_api.ReleaseLockResponse{ - Status: t_api.StatusNoContent, - }, - }, - status: 204, - }, - { - name: "ReleaseLock missing resourceId", - path: "locks/release", - method: "POST", - body: []byte(`{ - "resourceId": "", - "executionId": "baz" - }`), - req: nil, - res: nil, - status: 400, - }, + t.Run(ts.name, func(t *testing.T) { + for _, tc := range []struct { + name string + path string + method string + headers map[string]string + body []byte + req *t_api.Request + res *t_api.Response + status int + }{ + { + name: "ReadPromise", + path: "promises/foo", + method: "GET", + req: &t_api.Request{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseRequest{ + Id: "foo", + }, + }, + res: &t_api.Response{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseResponse{ + Status: t_api.StatusOK, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Pending, + }, + }, + }, + status: 200, + }, + { + name: "ReadPromiseWithSlash", + path: "promises/foo/bar", + method: "GET", + req: &t_api.Request{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseRequest{ + Id: "foo/bar", + }, + }, + res: &t_api.Response{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseResponse{ + Status: t_api.StatusOK, + Promise: &promise.Promise{ + Id: "foo/bar", + State: promise.Pending, + }, + }, + }, + status: 200, + }, + { + name: "ReadPromiseNotFound", + path: "promises/bar", + method: "GET", + req: &t_api.Request{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseRequest{ + Id: "bar", + }, + }, + res: &t_api.Response{ + Kind: t_api.ReadPromise, + ReadPromise: &t_api.ReadPromiseResponse{ + Status: t_api.StatusPromiseNotFound, + Promise: nil, + }, + }, + status: 404, + }, + { + name: "SearchPromises", + path: "promises?id=*&limit=10", + method: "GET", + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Id: "*", + States: []promise.State{ + promise.Pending, + promise.Resolved, + promise.Rejected, + promise.Timedout, + promise.Canceled, + }, + Tags: map[string]string{}, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.StatusOK, + Cursor: nil, + Promises: []*promise.Promise{}, + }, + }, + status: 200, + }, + { + name: "SearchPromisesCursor", + path: "promises?cursor=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJOZXh0Ijp7ImlkIjoiKiIsInN0YXRlcyI6WyJQRU5ESU5HIl0sInRhZ3MiOnt9LCJsaW1pdCI6MTAsInNvcnRJZCI6MTAwfX0.XKusWO-Jl4v7QVIwh5Pn3oIElBvtpf0VPOLJkXPvQLk", + method: "GET", + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Id: "*", + States: []promise.State{ + promise.Pending, + }, + Tags: map[string]string{}, + Limit: 10, + SortId: util.ToPointer(int64(100)), + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.StatusOK, + Cursor: nil, // not checked + Promises: []*promise.Promise{}, + }, + }, + status: 200, + }, + { + name: "SearchPromisesPending", + path: "promises?id=*&state=pending&limit=10", + method: "GET", + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Id: "*", + States: []promise.State{ + promise.Pending, + }, + Tags: map[string]string{}, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.StatusOK, + Cursor: nil, + Promises: []*promise.Promise{}, + }, + }, + status: 200, + }, + { + name: "SearchPromisesResolved", + path: "promises?id=*&state=resolved&limit=10", + method: "GET", + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Id: "*", + States: []promise.State{ + promise.Resolved, + }, + Tags: map[string]string{}, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.StatusOK, + Cursor: nil, + Promises: []*promise.Promise{}, + }, + }, + status: 200, + }, + { + name: "SearchPromisesRejected", + path: "promises?id=*&state=rejected&limit=10", + method: "GET", + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Id: "*", + States: []promise.State{ + promise.Rejected, + promise.Timedout, + promise.Canceled, + }, + Tags: map[string]string{}, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.StatusOK, + Cursor: nil, + Promises: []*promise.Promise{}, + }, + }, + status: 200, + }, + { + name: "SearchPromisesTags", + path: "promises?id=*&tags[resonate:invocation]=true&limit=10", + method: "GET", + req: &t_api.Request{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesRequest{ + Id: "*", + States: []promise.State{ + promise.Pending, + promise.Resolved, + promise.Rejected, + promise.Timedout, + promise.Canceled, + }, + Tags: map[string]string{ + "resonate:invocation": "true", + }, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchPromises, + SearchPromises: &t_api.SearchPromisesResponse{ + Status: t_api.StatusOK, + Cursor: nil, + Promises: []*promise.Promise{}, + }, + }, + status: 200, + }, + { + name: "SearchPromisesInvalidQuery", + path: "promises?id=", + method: "GET", + req: nil, + res: nil, + status: 400, + }, + { + name: "SearchPromisesInvalidLimit", + path: "promises?id=*&limit=0", + method: "GET", + req: nil, + res: nil, + status: 400, + }, + { + name: "SearchPromisesInvalidState", + path: "promises?id=*&state=x", + method: "GET", + req: nil, + res: nil, + status: 400, + }, + { + name: "SearchPromisesInvalidTags", + path: "promises?id=*&tags=x", + method: "GET", + req: nil, + res: nil, + status: 400, + }, + { + name: "CreatePromise", + path: "promises", + method: "POST", + headers: map[string]string{ + "Idempotency-Key": "bar", + "Strict": "true", + }, + body: []byte(`{ + "id": "foo/bar", + "param": { + "headers": {"a":"a","b":"b","c":"c"}, + "data": "cGVuZGluZw==" + }, + "timeout": 1 + }`), + req: &t_api.Request{ + Kind: t_api.CreatePromise, + CreatePromise: &t_api.CreatePromiseRequest{ + Id: "foo/bar", + IdempotencyKey: util.ToPointer(idempotency.Key("bar")), + Strict: true, + Param: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("pending"), + }, + Timeout: 1, + }, + }, + res: &t_api.Response{ + Kind: t_api.CreatePromise, + CreatePromise: &t_api.CreatePromiseResponse{ + Status: t_api.StatusCreated, + Promise: &promise.Promise{ + Id: "foo/bar", + State: promise.Pending, + }, + }, + }, + status: 201, + }, + { + name: "CreatePromiseMinimal", + path: "promises", + method: "POST", + body: []byte(`{ + "id": "foo", + "timeout": 1 + }`), + req: &t_api.Request{ + Kind: t_api.CreatePromise, + CreatePromise: &t_api.CreatePromiseRequest{ + Id: "foo", + IdempotencyKey: nil, + Strict: false, + Param: promise.Value{ + Headers: nil, + Data: nil, + }, + Timeout: 1, + }, + }, + res: &t_api.Response{ + Kind: t_api.CreatePromise, + CreatePromise: &t_api.CreatePromiseResponse{ + Status: t_api.StatusCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Pending, + }, + }, + }, + status: 201, + }, + { + name: "CancelPromise", + path: "promises/foo/bar", + method: "PATCH", + headers: map[string]string{ + "Idempotency-Key": "bar", + "Strict": "true", + }, + body: []byte(`{ + "state": "REJECTED_CANCELED", + "value": { + "headers": {"a":"a","b":"b","c":"c"}, + "data": "Y2FuY2Vs" + } + }`), + req: &t_api.Request{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ + Id: "foo/bar", + IdempotencyKey: util.ToPointer(idempotency.Key("bar")), + Strict: true, + State: promise.Canceled, + Value: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("cancel"), + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ + Status: t_api.StatusCreated, + Promise: &promise.Promise{ + Id: "foo/bar", + State: promise.Canceled, + }, + }, + }, + status: 201, + }, + { + name: "CancelPromiseMinimal", + path: "promises/foo", + method: "PATCH", + body: []byte(`{ + "state": "REJECTED_CANCELED" + }`), + req: &t_api.Request{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ + Id: "foo", + IdempotencyKey: nil, + Strict: false, + State: promise.Canceled, + Value: promise.Value{ + Headers: nil, + Data: nil, + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ + Status: t_api.StatusCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Canceled, + }, + }, + }, + status: 201, + }, + { + name: "ResolvePromise", + path: "promises/foo/bar", + method: "PATCH", + headers: map[string]string{ + "Idempotency-Key": "bar", + "Strict": "true", + }, + body: []byte(`{ + "state": "RESOLVED", + "value": { + "headers": {"a":"a","b":"b","c":"c"}, + "data": "cmVzb2x2ZQ==" + } + }`), + req: &t_api.Request{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ + Id: "foo/bar", + IdempotencyKey: util.ToPointer(idempotency.Key("bar")), + Strict: true, + State: promise.Resolved, + Value: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("resolve"), + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ + Status: t_api.StatusCreated, + Promise: &promise.Promise{ + Id: "foo/bar", + State: promise.Resolved, + }, + }, + }, + status: 201, + }, + { + name: "ResolvePromiseMinimal", + path: "promises/foo", + method: "PATCH", + body: []byte(`{ + "state": "RESOLVED" + }`), + req: &t_api.Request{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ + Id: "foo", + IdempotencyKey: nil, + Strict: false, + State: promise.Resolved, + Value: promise.Value{ + Headers: nil, + Data: nil, + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ + Status: t_api.StatusCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Resolved, + }, + }, + }, + status: 201, + }, + { + name: "RejectPromise", + path: "promises/foo/bar", + method: "PATCH", + headers: map[string]string{ + "Idempotency-Key": "bar", + "Strict": "true", + }, + body: []byte(`{ + "state": "REJECTED", + "value": { + "headers": {"a":"a","b":"b","c":"c"}, + "data": "cmVqZWN0" + } + }`), + req: &t_api.Request{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ + Id: "foo/bar", + IdempotencyKey: util.ToPointer(idempotency.Key("bar")), + Strict: true, + State: promise.Rejected, + Value: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("reject"), + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ + Status: t_api.StatusCreated, + Promise: &promise.Promise{ + Id: "foo/bar", + State: promise.Rejected, + }, + }, + }, + status: 201, + }, + { + name: "RejectPromiseMinimal", + path: "promises/foo", + method: "PATCH", + body: []byte(`{ + "state": "REJECTED" + }`), + req: &t_api.Request{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseRequest{ + Id: "foo", + IdempotencyKey: nil, + Strict: false, + State: promise.Rejected, + Value: promise.Value{ + Headers: nil, + Data: nil, + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.CompletePromise, + CompletePromise: &t_api.CompletePromiseResponse{ + Status: t_api.StatusCreated, + Promise: &promise.Promise{ + Id: "foo", + State: promise.Rejected, + }, + }, + }, + status: 201, + }, + { + name: "ReadSchedule", + path: "schedules/foo", + method: "GET", + req: &t_api.Request{ + Kind: t_api.ReadSchedule, + ReadSchedule: &t_api.ReadScheduleRequest{ + Id: "foo", + }, + }, + res: &t_api.Response{ + Kind: t_api.ReadSchedule, + ReadSchedule: &t_api.ReadScheduleResponse{ + Status: t_api.StatusOK, + Schedule: &schedule.Schedule{ + Id: "foo", + Description: "", + Cron: "* * * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseTimeout: 1000000, + }, + }, + }, + status: 200, + }, + { + name: "SearchSchedules", + path: "schedules?id=*&limit=10", + method: "GET", + req: &t_api.Request{ + Kind: t_api.SearchSchedules, + SearchSchedules: &t_api.SearchSchedulesRequest{ + Id: "*", + Tags: map[string]string{}, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchSchedules, + SearchSchedules: &t_api.SearchSchedulesResponse{ + Status: t_api.StatusOK, + Cursor: nil, + Schedules: []*schedule.Schedule{}, + }, + }, + status: 200, + }, + { + name: "SearchSchedulesTags", + path: "schedules?id=*&tags[foo]=bar&limit=10", + method: "GET", + req: &t_api.Request{ + Kind: t_api.SearchSchedules, + SearchSchedules: &t_api.SearchSchedulesRequest{ + Id: "*", + Tags: map[string]string{ + "foo": "bar", + }, + Limit: 10, + }, + }, + res: &t_api.Response{ + Kind: t_api.SearchSchedules, + SearchSchedules: &t_api.SearchSchedulesResponse{ + Status: t_api.StatusOK, + Cursor: nil, + Schedules: []*schedule.Schedule{}, + }, + }, + status: 200, + }, + { + name: "SearchSchedulesInvalidQuery", + path: "schedules?id=", + method: "GET", + req: nil, + res: nil, + status: 400, + }, + { + name: "SearchSchedulesInvalidLimit", + path: "schedules?id=*&limit=0", + method: "GET", + req: nil, + res: nil, + status: 400, + }, + { + name: "SearchSchedulesInvalidTags", + path: "schedules?id=*&tags=x", + method: "GET", + req: nil, + res: nil, + status: 400, + }, + { + name: "CreateSchedule", + path: "schedules", + method: "POST", + headers: map[string]string{ + "Idempotency-Key": "bar", + }, + body: []byte(`{ + "id": "foo", + "desc": "", + "cron": "* * * * * *", + "promiseId": "foo.{{.timestamp}}", + "promiseTimeout": 1000000 + }`), + req: &t_api.Request{ + Kind: t_api.CreateSchedule, + CreateSchedule: &t_api.CreateScheduleRequest{ + Id: "foo", + IdempotencyKey: util.ToPointer(idempotency.Key("bar")), + Cron: "* * * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseTimeout: 1000000, + }, + }, + res: &t_api.Response{ + Kind: t_api.CreateSchedule, + CreateSchedule: &t_api.CreateScheduleResponse{ + Status: t_api.StatusCreated, + Schedule: &schedule.Schedule{ + Id: "foo", + Description: "", + Cron: "* * * * * *", + PromiseId: "foo.{{.timestamp}}", + PromiseTimeout: 1000000, + }, + }, + }, + status: 201, + }, + { + name: "DeleteSchedule", + path: "schedules/foo", + method: "DELETE", + req: &t_api.Request{ + Kind: t_api.DeleteSchedule, + DeleteSchedule: &t_api.DeleteScheduleRequest{ + Id: "foo", + }, + }, + res: &t_api.Response{ + Kind: t_api.DeleteSchedule, + DeleteSchedule: &t_api.DeleteScheduleResponse{ + Status: t_api.StatusNoContent, + }, + }, + status: 204, + }, - // Tasks API - { - name: "ClaimTask", - path: "tasks/claim", - method: "POST", - body: []byte(`{ - "taskId": "foo", - "counter": 1, - "processId": "bar", - "executionId": "baz", - "expiryInMilliseconds": 10000 - }`), - req: &t_api.Request{ - Kind: t_api.ClaimTask, - ClaimTask: &t_api.ClaimTaskRequest{ - TaskId: "foo", - Counter: 1, - ProcessId: "bar", - ExecutionId: "baz", - ExpiryInMilliseconds: 10_000, - }, - }, - res: &t_api.Response{ - Kind: t_api.ClaimTask, - ClaimTask: &t_api.ClaimTaskResponse{ - Status: t_api.StatusOK, - Promise: &promise.Promise{ - Id: "foo/bar", - State: promise.Pending, - }, - }, - }, - status: 200, - }, - { - name: "CompleteTask", - path: "tasks/complete", - method: "POST", - body: []byte(`{ - "taskId": "foo", - "counter": 1, - "executionId": "baz", - "state": "RESOLVED", - "value": { - "headers": {"a":"a","b":"b","c":"c"}, - "data": "cGVuZGluZw==" - } - }`), - req: &t_api.Request{ - Kind: t_api.CompleteTask, - CompleteTask: &t_api.CompleteTaskRequest{ - TaskId: "foo", - Counter: 1, - ExecutionId: "baz", - State: promise.Resolved, - Value: promise.Value{ - Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, - Data: []byte("pending"), - }, - }, - }, - res: &t_api.Response{ - Kind: t_api.CompleteTask, - CompleteTask: &t_api.CompleteTaskResponse{ - Status: t_api.StatusOK, - }, - }, - status: 200, - }, - } { - t.Run(tc.name, func(t *testing.T) { - httpTest.Load(t, tc.req, tc.res) + // Distributed Locks API + { + name: "AcquireLock", + path: "locks/acquire", + method: "POST", + body: []byte(`{ + "resourceId": "foo", + "processId": "bar", + "executionId": "baz", + "expiryInMilliseconds": 10000 + }`), + req: &t_api.Request{ + Kind: t_api.AcquireLock, + AcquireLock: &t_api.AcquireLockRequest{ + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInMilliseconds: 10_000, + }, + }, + res: &t_api.Response{ + Kind: t_api.AcquireLock, + AcquireLock: &t_api.AcquireLockResponse{ + Status: t_api.StatusCreated, + Lock: &lock.Lock{ + ResourceId: "foo", + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInMilliseconds: 10_000, + }, + }, + }, + status: 201, + }, + { + name: "AcquireLock missing executionIdc", + path: "locks/acquire", + method: "POST", + body: []byte(`{ + "resourceId": "foo", + "processId": "bar", + "executionId": "", + "expiryInMilliseconds": 10000 + }`), + req: nil, + res: nil, + status: 400, + }, + { + name: "HeartbeatLocks", + path: "locks/heartbeat", + method: "POST", + body: []byte(`{ + "processId": "bar" + }`), + req: &t_api.Request{ + Kind: t_api.HeartbeatLocks, + HeartbeatLocks: &t_api.HeartbeatLocksRequest{ + ProcessId: "bar", + }, + }, + res: &t_api.Response{ + Kind: t_api.HeartbeatLocks, + HeartbeatLocks: &t_api.HeartbeatLocksResponse{ + Status: t_api.StatusOK, + LocksAffected: 0, + }, + }, + status: 200, + }, + { + name: "HeartbeatLocks missing processId", + path: "locks/heartbeat", + method: "POST", + body: []byte(`{ + "processId": "", + "timeout": 1736571600000 + }`), + req: nil, + res: nil, + status: 400, + }, + { + name: "ReleaseLock", + path: "locks/release", + method: "POST", + body: []byte(`{ + "resourceId": "foo", + "executionId": "baz" + }`), + req: &t_api.Request{ + Kind: t_api.ReleaseLock, + ReleaseLock: &t_api.ReleaseLockRequest{ + ResourceId: "foo", + ExecutionId: "baz", + }, + }, + res: &t_api.Response{ + Kind: t_api.ReleaseLock, + ReleaseLock: &t_api.ReleaseLockResponse{ + Status: t_api.StatusNoContent, + }, + }, + status: 204, + }, + { + name: "ReleaseLock missing resourceId", + path: "locks/release", + method: "POST", + body: []byte(`{ + "resourceId": "", + "executionId": "baz" + }`), + req: nil, + res: nil, + status: 400, + }, - req, err := http.NewRequest(tc.method, fmt.Sprintf("http://127.0.0.1:8888/%s", tc.path), bytes.NewBuffer(tc.body)) - if err != nil { - t.Fatal(err) - } + // Tasks API + { + name: "ClaimTask", + path: "tasks/claim", + method: "POST", + body: []byte(`{ + "taskId": "foo", + "counter": 1, + "processId": "bar", + "executionId": "baz", + "expiryInMilliseconds": 10000 + }`), + req: &t_api.Request{ + Kind: t_api.ClaimTask, + ClaimTask: &t_api.ClaimTaskRequest{ + TaskId: "foo", + Counter: 1, + ProcessId: "bar", + ExecutionId: "baz", + ExpiryInMilliseconds: 10_000, + }, + }, + res: &t_api.Response{ + Kind: t_api.ClaimTask, + ClaimTask: &t_api.ClaimTaskResponse{ + Status: t_api.StatusOK, + Promise: &promise.Promise{ + Id: "foo/bar", + State: promise.Pending, + }, + }, + }, + status: 200, + }, + { + name: "CompleteTask", + path: "tasks/complete", + method: "POST", + body: []byte(`{ + "taskId": "foo", + "counter": 1, + "executionId": "baz", + "state": "RESOLVED", + "value": { + "headers": {"a":"a","b":"b","c":"c"}, + "data": "cGVuZGluZw==" + } + }`), + req: &t_api.Request{ + Kind: t_api.CompleteTask, + CompleteTask: &t_api.CompleteTaskRequest{ + TaskId: "foo", + Counter: 1, + ExecutionId: "baz", + State: promise.Resolved, + Value: promise.Value{ + Headers: map[string]string{"a": "a", "b": "b", "c": "c"}, + Data: []byte("pending"), + }, + }, + }, + res: &t_api.Response{ + Kind: t_api.CompleteTask, + CompleteTask: &t_api.CompleteTaskResponse{ + Status: t_api.StatusOK, + }, + }, + status: 200, + }, + } { + t.Run(tc.name, func(t *testing.T) { + httpTest.Load(t, tc.req, tc.res) - // set headers - req.Header.Set("Content-Type", "application/json") - for key, val := range tc.headers { - req.Header.Set(key, val) - } + req, err := http.NewRequest(tc.method, fmt.Sprintf("http://127.0.0.1:8888/%s", tc.path), bytes.NewBuffer(tc.body)) + if err != nil { + t.Fatal(err) + } - res, err := httpTest.client.Do(req) - if err != nil { - t.Fatal(err) - } - defer res.Body.Close() + // set headers + req.Header.Set("Content-Type", "application/json") + for key, val := range tc.headers { + req.Header.Set(key, val) + } - body, err := io.ReadAll(res.Body) - if err != nil { - t.Fatal(err) - } + // set authorization + if ts.auth != nil { + req.SetBasicAuth(ts.reqUsername, ts.reqPassword) + } + + res, err := httpTest.client.Do(req) + if err != nil { + t.Fatal(err) + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + t.Fatal(err) + } - assert.Equal(t, tc.status, res.StatusCode, string(body)) + // apply override status if applicable + status := tc.status + if ts.statusOveride != 0 { + status = ts.statusOveride + } - // TODO: assert body + assert.Equal(t, status, res.StatusCode, string(body)) - select { - case err := <-httpTest.errors: - t.Fatal(err) - default: + // TODO: assert body + + select { + case err := <-httpTest.errors: + t.Fatal(err) + default: + } + }) } }) - } - // stop the server - if err := httpTest.teardown(); err != nil { - t.Fatal(err) + // stop the server + if err := httpTest.teardown(); err != nil { + t.Fatal(err) + } } } diff --git a/pkg/client/client.go b/pkg/client/client.go index 4828a969..5650f94d 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -1,6 +1,10 @@ package client import ( + "context" + "encoding/base64" + "net/http" + "github.com/resonatehq/resonate/pkg/client/promises" "github.com/resonatehq/resonate/pkg/client/schedules" ) @@ -8,21 +12,33 @@ import ( type ResonateClient interface { PromisesV1Alpha1() promises.ClientWithResponsesInterface SchedulesV1Alpha1() schedules.ClientWithResponsesInterface + SetBasicAuth(username, password string) } type ClientSet struct { Server *string + auth *auth + promisesV1alpha1 promises.ClientWithResponsesInterface schedulesV1alpha1 schedules.ClientWithResponsesInterface } +type auth struct { + username string + password string +} + func NewOrDie(server *string) ResonateClient { return &ClientSet{ Server: server, } } +func (c *ClientSet) SetBasicAuth(username, password string) { + c.auth = &auth{username, password} +} + func (c *ClientSet) SetPromisesV1Alpha1(client promises.ClientWithResponsesInterface) { c.promisesV1alpha1 = client } @@ -36,11 +52,22 @@ func (c *ClientSet) PromisesV1Alpha1() promises.ClientWithResponsesInterface { return c.promisesV1alpha1 } + opts := []promises.ClientOption{} + + // set basic auth if provided + if c.auth != nil { + opts = append(opts, func(client *promises.Client) error { + client.RequestEditors = append(client.RequestEditors, c.basicAuthRequestEditor()) + return nil + }) + } + var err error - c.promisesV1alpha1, err = promises.NewClientWithResponses(*c.Server) + c.promisesV1alpha1, err = promises.NewClientWithResponses(*c.Server, opts...) if err != nil { panic(err) } + return c.promisesV1alpha1 } @@ -49,10 +76,29 @@ func (c *ClientSet) SchedulesV1Alpha1() schedules.ClientWithResponsesInterface { return c.schedulesV1alpha1 } + opts := []schedules.ClientOption{} + + // set basic auth if provided + if c.auth != nil { + opts = append(opts, func(client *schedules.Client) error { + client.RequestEditors = append(client.RequestEditors, c.basicAuthRequestEditor()) + return nil + }) + } + var err error - c.schedulesV1alpha1, err = schedules.NewClientWithResponses(*c.Server) + c.schedulesV1alpha1, err = schedules.NewClientWithResponses(*c.Server, opts...) if err != nil { panic(err) } return c.schedulesV1alpha1 } + +func (c *ClientSet) basicAuthRequestEditor() func(ctx context.Context, req *http.Request) error { + return func(ctx context.Context, req *http.Request) error { + authHeader := "Basic " + base64.StdEncoding.EncodeToString([]byte(c.auth.username+":"+c.auth.password)) + req.Header.Set("Authorization", authHeader) + + return nil + } +}