From 7c9cb178749f9daa7b23379205abeb265e8fd53c Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Fri, 12 Apr 2024 13:56:50 -0400 Subject: [PATCH 1/2] test(unit): add more unit test coverage --- .github/workflows/cicd.yaml | 1 + cmd/promises/completes_test.go | 130 ++++++++++++++++++ codecov.yml | 2 + .../aio/queuing/connections/connections.go | 41 +++++- .../queuing/connections/connections_test.go | 89 ++++++++++++ .../aio/queuing/connections/http/http.go | 50 +++++-- .../aio/queuing/connections/http/http_test.go | 120 ++++++++++++++++ .../queuing/connections/t_conn/connections.go | 4 +- .../connections/t_conn/connections_test.go | 113 +++++++++++++++ .../aio/queuing/metadata/metadata_test.go | 51 +++++++ .../app/subsystems/aio/queuing/queuing.go | 7 +- .../subsystems/aio/queuing/queuing_test.go | 122 ++++++++++++++++ .../aio/queuing/routes/pattern/pattern.go | 31 +++-- .../queuing/routes/pattern/pattern_test.go | 43 ++++-- .../subsystems/aio/queuing/routes/routes.go | 26 ++-- .../aio/queuing/routes/routes_test.go | 30 ++++ .../app/subsystems/aio/queuing/worker_test.go | 115 ++++++++++++++++ 17 files changed, 929 insertions(+), 46 deletions(-) create mode 100644 cmd/promises/completes_test.go create mode 100644 internal/app/subsystems/aio/queuing/connections/connections_test.go create mode 100644 internal/app/subsystems/aio/queuing/connections/http/http_test.go create mode 100644 internal/app/subsystems/aio/queuing/connections/t_conn/connections_test.go create mode 100644 internal/app/subsystems/aio/queuing/metadata/metadata_test.go create mode 100644 internal/app/subsystems/aio/queuing/queuing_test.go create mode 100644 internal/app/subsystems/aio/queuing/worker_test.go diff --git a/.github/workflows/cicd.yaml b/.github/workflows/cicd.yaml index 939ec7ad..c48d0f05 100644 --- a/.github/workflows/cicd.yaml +++ b/.github/workflows/cicd.yaml @@ -61,6 +61,7 @@ jobs: uses: codecov/codecov-action@v4 # nosemgrep with: token: ${{ secrets.CODECOV_TOKEN }} + codecov_yml_path: codecov.yml - name: Build resonate run: go build -o resonate diff --git a/cmd/promises/completes_test.go b/cmd/promises/completes_test.go new file mode 100644 index 00000000..00b88f97 --- /dev/null +++ b/cmd/promises/completes_test.go @@ -0,0 +1,130 @@ +package promises + +import ( + "bytes" + "io" + "net/http" + "strings" + "testing" + + "github.com/golang/mock/gomock" + "github.com/resonatehq/resonate/pkg/client" + "github.com/resonatehq/resonate/pkg/client/promises" + "github.com/spf13/cobra" + "github.com/stretchr/testify/assert" +) + +var ( + patchPromiseResponse = &promises.PatchPromisesIdResponse{ + HTTPResponse: &http.Response{ + StatusCode: 201, + Body: io.NopCloser(strings.NewReader("")), + }, + } +) + +func TestCompletePromiseCmd(t *testing.T) { + // Set Gomock controller + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // Set test cases + tcs := []struct { + name string + mockPromiseClient promises.ClientWithResponsesInterface + args []string + wantStdout string + wantStderr string + }{ + { + name: "resolve a promise", + mockPromiseClient: func() *promises.MockClientWithResponsesInterface { + mock := promises.NewMockClientWithResponsesInterface(ctrl) + mock. + EXPECT(). + PatchPromisesIdWithResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(patchPromiseResponse, nil). + Times(1) + return mock + }(), + args: []string{"resolve", "foo", "--data", `{"foo": "bar"}`, "--header", "foo=bar"}, + wantStdout: "Resolved promise: foo\n", + }, + { + name: "reject a promise", + mockPromiseClient: func() *promises.MockClientWithResponsesInterface { + mock := promises.NewMockClientWithResponsesInterface(ctrl) + mock. + EXPECT(). + PatchPromisesIdWithResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(patchPromiseResponse, nil). + Times(1) + return mock + }(), + args: []string{"reject", "bar", "--data", `{"foo": "bar"}`}, + wantStdout: "Rejected promise: bar\n", + }, + { + name: "cancel a promise", + mockPromiseClient: func() *promises.MockClientWithResponsesInterface { + mock := promises.NewMockClientWithResponsesInterface(ctrl) + mock. + EXPECT(). + PatchPromisesIdWithResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(patchPromiseResponse, nil). + Times(1) + return mock + }(), + args: []string{"cancel", "baz"}, + wantStdout: "Canceled promise: baz\n", + }, + { + name: "Missing ID arg", + mockPromiseClient: func() *promises.MockClientWithResponsesInterface { + mock := promises.NewMockClientWithResponsesInterface(ctrl) + return mock + }(), + args: []string{"resolve"}, + wantStderr: "Must specify promise id\n", + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + // Create buffer writer + stdout := &bytes.Buffer{} + stderr := &bytes.Buffer{} + + // Wire up client set + clientSet := &client.ClientSet{} + clientSet.SetPromisesV1Alpha1(tc.mockPromiseClient) + + // Create commands in test + cmds := CompletePromiseCmds(clientSet) + + // Find the appropriate command based on the first argument + var cmd *cobra.Command + for _, c := range cmds { + if c.Name() == tc.args[0] { + cmd = c + break + } + } + + // Set streams for command + cmd.SetOut(stdout) + cmd.SetErr(stderr) + + // Set args for command + cmd.SetArgs(tc.args[1:]) + + // Execute command + if err := cmd.Execute(); err != nil { + t.Fatalf("Received unexpected error: %v", err) + } + + assert.Equal(t, tc.wantStdout, stdout.String()) + assert.Equal(t, tc.wantStderr, stderr.String()) + }) + } +} diff --git a/codecov.yml b/codecov.yml index c809ca3e..b717a750 100644 --- a/codecov.yml +++ b/codecov.yml @@ -1,3 +1,5 @@ ignore: + - "cmd/*" - "pkg/client" - "*.pb.go" + - "test/*" diff --git a/internal/app/subsystems/aio/queuing/connections/connections.go b/internal/app/subsystems/aio/queuing/connections/connections.go index 8f5bc017..222dcf8f 100644 --- a/internal/app/subsystems/aio/queuing/connections/connections.go +++ b/internal/app/subsystems/aio/queuing/connections/connections.go @@ -1,23 +1,47 @@ package connections import ( + "errors" "fmt" + "net/http" http_conn "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/http" "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn" + "github.com/resonatehq/resonate/internal/util" +) + +var ( + ErrMissingConnectionConfig = errors.New("connection config is nil") + ErrMissingFieldName = errors.New("missing field 'name'") + ErrMissingFieldKind = errors.New("missing field 'kind'") + ErrMissingMetadata = errors.New("missing field `metadata`") + ErrMissingMetadataProperties = errors.New("missing field `metadata.properties`") + ErrInvalidConnectionKind = errors.New("invalid connection kind") ) func NewConnection(tasks <-chan *t_conn.ConnectionSubmission, cfg *t_conn.ConnectionConfig) (t_conn.Connection, error) { - // Validate all required fields are present. + // Validate all common required fields are present. if cfg == nil { - return nil, fmt.Errorf("connection config is empty") + return nil, ErrMissingConnectionConfig } if cfg.Name == "" { - return nil, fmt.Errorf("field 'name' is empty for connection '%s'", cfg.Name) + return nil, ErrMissingFieldName } if cfg.Kind == "" { - return nil, fmt.Errorf("field 'kind' is empty for connection '%s'", cfg.Name) + return nil, fmt.Errorf("validation error for connection '%s': %w", cfg.Name, ErrMissingFieldKind) + } + if cfg.Metadata == nil { + return nil, fmt.Errorf("validation error for connection '%s': %w", cfg.Name, ErrMissingMetadata) } + if cfg.Metadata.Properties == nil { + return nil, fmt.Errorf("validation error for connection '%s': %w", cfg.Name, ErrMissingMetadataProperties) + } + + util.Assert(cfg != nil, "config must not be nil") + util.Assert(cfg.Name != "", "name must not be empty") + util.Assert(cfg.Kind != "", "kind must not be empty") + util.Assert(cfg.Metadata != nil, "metadata must not be nil") + util.Assert(cfg.Metadata.Properties != nil, "metadata properties must not be nil") var ( conn t_conn.Connection @@ -26,15 +50,18 @@ func NewConnection(tasks <-chan *t_conn.ConnectionSubmission, cfg *t_conn.Connec switch cfg.Kind { case t_conn.HTTP: - conn = http_conn.New() - err = conn.Init(tasks, cfg.Metadata) + param := &http.Client{} + conn = http_conn.New(param) + err = conn.Init(tasks, cfg) default: - return nil, fmt.Errorf("invalid queuing kind: %s", cfg.Kind) + return nil, fmt.Errorf("validation error for connection '%s': %w", cfg.Name, ErrInvalidConnectionKind) } if err != nil { return nil, err } + util.Assert(conn != nil, "connection must not be nil") + return conn, nil } diff --git a/internal/app/subsystems/aio/queuing/connections/connections_test.go b/internal/app/subsystems/aio/queuing/connections/connections_test.go new file mode 100644 index 00000000..515fa24c --- /dev/null +++ b/internal/app/subsystems/aio/queuing/connections/connections_test.go @@ -0,0 +1,89 @@ +package connections + +import ( + "errors" + "testing" + + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/metadata" +) + +func TestNewConnection(t *testing.T) { + testCases := []struct { + name string + config *t_conn.ConnectionConfig + expectedError error + }{ + { + name: "nil config", + config: nil, + expectedError: ErrMissingConnectionConfig, + }, + { + name: "empty name", + config: &t_conn.ConnectionConfig{}, + expectedError: ErrMissingFieldName, + }, + { + name: "empty kind", + config: &t_conn.ConnectionConfig{Name: "test"}, + expectedError: ErrMissingFieldKind, + }, + { + name: "nil metadata", + config: &t_conn.ConnectionConfig{Name: "test", Kind: t_conn.HTTP}, + expectedError: ErrMissingMetadata, + }, + { + name: "nil metadata properties", + config: &t_conn.ConnectionConfig{ + Name: "test", + Kind: t_conn.HTTP, + Metadata: &metadata.Metadata{}, + }, + expectedError: ErrMissingMetadataProperties, + }, + { + name: "invalid connection kind", + config: &t_conn.ConnectionConfig{ + Name: "test", + Kind: "invalid", + Metadata: &metadata.Metadata{ + Properties: map[string]interface{}{}, + }, + }, + expectedError: ErrInvalidConnectionKind, + }, + { + name: "valid config", + config: &t_conn.ConnectionConfig{ + Name: "test", + Kind: t_conn.HTTP, + Metadata: &metadata.Metadata{ + Properties: map[string]interface{}{ + "url": "http://example.com", + }, + }, + }, + expectedError: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tasks := make(chan *t_conn.ConnectionSubmission) + _, err := NewConnection(tasks, tc.config) + if tc.expectedError != nil { + if err == nil { + t.Errorf("expected error: %s, got nil", tc.expectedError) + } else if !errors.Is(err, tc.expectedError) { + t.Errorf("expected error: %s, got: %s", tc.expectedError, err.Error()) + } + } else { + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + } + }) + } +} diff --git a/internal/app/subsystems/aio/queuing/connections/http/http.go b/internal/app/subsystems/aio/queuing/connections/http/http.go index ef98809c..8caa26cc 100644 --- a/internal/app/subsystems/aio/queuing/connections/http/http.go +++ b/internal/app/subsystems/aio/queuing/connections/http/http.go @@ -8,15 +8,24 @@ import ( "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn" "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/metadata" + "github.com/resonatehq/resonate/internal/util" +) + +var ( + ErrMissingURL = fmt.Errorf("missing field 'url'") ) type ( // HTTP is a connection to an HTTP endpoint. It implements the Connection interface and // is the only connection type that does not require a queue. HTTP struct { - client *http.Client + client HTTPClient tasks <-chan *t_conn.ConnectionSubmission - meta Metadata + meta *Metadata + } + + HTTPClient interface { + Do(req *http.Request) (*http.Response, error) } Metadata struct { @@ -36,20 +45,36 @@ type ( } ) -func New() t_conn.Connection { - return &HTTP{} +// New creates a new connection with the type specific client. +func New(c HTTPClient) t_conn.Connection { + return &HTTP{ + client: c, + meta: &Metadata{}, + } } -func (c *HTTP) Init(tasks <-chan *t_conn.ConnectionSubmission, meta *metadata.Metadata) error { - c.client = &http.Client{} +// Init initializes the connection with the generic & type specific connection configuration. +func (c *HTTP) Init(tasks <-chan *t_conn.ConnectionSubmission, cfg *t_conn.ConnectionConfig) error { + util.Assert(c.client != nil, "client must not be nil") + util.Assert(c.meta != nil, "meta must not be nil") + util.Assert(tasks != nil, "tasks must not be nil") + util.Assert(cfg != nil, "config must not be nil") + util.Assert(cfg.Metadata != nil, "metadata must not be nil") + util.Assert(cfg.Metadata.Properties != nil, "metadata properties must not be nil") + c.tasks = tasks - md := Metadata{} - if err := metadata.Decode(meta.Properties, &md); err != nil { + if err := metadata.Decode(cfg.Metadata.Properties, c.meta); err != nil { return err } - c.meta = md + if c.meta.URL == "" { + return fmt.Errorf("validation error for connection '%s': %w", cfg.Name, ErrMissingURL) + } + + util.Assert(c.client != nil, "client must not be nil") + util.Assert(c.tasks != nil, "tasks must not be nil") + util.Assert(c.meta.URL != "", "url must not be empty") return nil } @@ -59,6 +84,13 @@ func (c *HTTP) Task() <-chan *t_conn.ConnectionSubmission { } func (c *HTTP) Execute(sub *t_conn.ConnectionSubmission) error { + util.Assert(sub != nil, "submission must not be nil") + util.Assert(sub.Queue != "", "queue must not be empty") + util.Assert(sub.TaskId != "", "task id must not be empty") + util.Assert(sub.Counter >= 0, "counter must be greater than or equal to 0") + util.Assert(sub.Links.Claim != "", "claim link must not be empty") + util.Assert(sub.Links.Complete != "", "complete link must not be empty") + // Form payload. payload := Payload{ Queue: sub.Queue, diff --git a/internal/app/subsystems/aio/queuing/connections/http/http_test.go b/internal/app/subsystems/aio/queuing/connections/http/http_test.go new file mode 100644 index 00000000..c8067f67 --- /dev/null +++ b/internal/app/subsystems/aio/queuing/connections/http/http_test.go @@ -0,0 +1,120 @@ +package http + +import ( + "bytes" + "errors" + "fmt" + "io" + "net/http" + "testing" + + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/metadata" +) + +type ( + MockHTTPClient struct { + HTTPClient + + expectedURL string + expectedBody string + mockResponse *http.Response + } +) + +func (m *MockHTTPClient) Do(req *http.Request) (*http.Response, error) { + if req.Header.Get("Content-Type") != "application/json" { + return nil, fmt.Errorf("expected application/json, got %s", req.Header.Get("Content-Type")) + } + + if req.URL.String() != m.expectedURL { + return nil, fmt.Errorf("expected %s, got %s", m.expectedURL, req.URL.String()) + } + + bs, err := io.ReadAll(req.Body) + if err != nil { + return nil, err + } + + if string(bs) != m.expectedBody { + return nil, fmt.Errorf("expected %s, got %s", m.expectedBody, string(bs)) + } + + return m.mockResponse, nil +} + +func TestHTTPConnection(t *testing.T) { + tcs := []struct { + name string + client HTTPClient + tasks chan *t_conn.ConnectionSubmission + config *t_conn.ConnectionConfig + submission *t_conn.ConnectionSubmission + str string + expectedErr error + }{ + { + name: "valid", + client: &MockHTTPClient{ + expectedURL: "http://example.com", + expectedBody: `{"queue":"queue","taskId":"taskid","counter":1,"links":{"claim":"http://example.com/task/claim","complete":"http://example.com/task/complete"}}`, + mockResponse: &http.Response{StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewBufferString("ok"))}, + }, + tasks: make(chan *t_conn.ConnectionSubmission), + config: &t_conn.ConnectionConfig{ + Kind: t_conn.HTTP, + Name: "http", + Metadata: &metadata.Metadata{Properties: map[string]interface{}{"url": "http://example.com"}}, + }, + submission: &t_conn.ConnectionSubmission{Queue: "queue", TaskId: "taskid", Counter: 1, Links: t_conn.Links{Claim: "http://example.com/task/claim", Complete: "http://example.com/task/complete"}}, + str: "http::http://example.com", + }, + { + name: "invalid-missing-url", + client: &MockHTTPClient{}, + tasks: make(chan *t_conn.ConnectionSubmission), + config: &t_conn.ConnectionConfig{ + Kind: t_conn.HTTP, + Name: "http", + Metadata: &metadata.Metadata{Properties: map[string]interface{}{"random": "http://example.com"}}, + }, + expectedErr: ErrMissingURL, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + conn := New(tc.client) + + if err := conn.Init(tc.tasks, tc.config); err != nil { + if tc.expectedErr == nil { + t.Fatalf("expected nil, got %v", err) + } + + if !errors.Is(err, tc.expectedErr) { + t.Fatalf("expected %v, got %v", tc.expectedErr, err) + } + + return + } + + if conn.Task() != tc.tasks { + t.Fatalf("expected %v, got %v", tc.tasks, conn.Task()) + } + + if err := conn.Execute(tc.submission); err != nil { + if tc.expectedErr == nil { + t.Fatalf("expected nil, got %v", err) + } + + if !errors.Is(err, tc.expectedErr) { + t.Fatalf("expected %v, got %v", tc.expectedErr, err) + } + } + + if conn.String() != tc.str { + t.Fatalf("expected %s, got %s", tc.str, conn.String()) + } + }) + } +} diff --git a/internal/app/subsystems/aio/queuing/connections/t_conn/connections.go b/internal/app/subsystems/aio/queuing/connections/t_conn/connections.go index 4003d2f3..27f9f54a 100644 --- a/internal/app/subsystems/aio/queuing/connections/t_conn/connections.go +++ b/internal/app/subsystems/aio/queuing/connections/t_conn/connections.go @@ -47,8 +47,8 @@ type ( } Connection interface { - // Init initializes the connection with the given tasks channel and metadata. - Init(tasks <-chan *ConnectionSubmission, meta *metadata.Metadata) error + // Init initializes the connection with the given tasks channel and configuration. + Init(tasks <-chan *ConnectionSubmission, cfg *ConnectionConfig) error // Tasks returns the tasks channel. Task() <-chan *ConnectionSubmission diff --git a/internal/app/subsystems/aio/queuing/connections/t_conn/connections_test.go b/internal/app/subsystems/aio/queuing/connections/t_conn/connections_test.go new file mode 100644 index 00000000..7b1b0e45 --- /dev/null +++ b/internal/app/subsystems/aio/queuing/connections/t_conn/connections_test.go @@ -0,0 +1,113 @@ +package t_conn + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/metadata" +) + +var ( + mockExecuteError = errors.New("execute error") +) + +type ( + mockConnection struct { + initCalled bool + taskCh <-chan *ConnectionSubmission + executeCalled bool + expectExecuteError bool + executeError error + } +) + +func (m *mockConnection) Init(tasks <-chan *ConnectionSubmission, cfg *ConnectionConfig) error { + m.initCalled = true + m.taskCh = tasks + return nil +} + +func (m *mockConnection) Task() <-chan *ConnectionSubmission { + return m.taskCh +} + +func (m *mockConnection) Execute(sub *ConnectionSubmission) error { + m.executeCalled = true + + if m.expectExecuteError { + m.executeError = mockExecuteError + return m.executeError + } + + return nil +} + +func (m *mockConnection) String() string { + return "mock" +} + +func TestConnectionEventLoop(t *testing.T) { + testCases := []struct { + name string + connection *mockConnection + tasks []*ConnectionSubmission + expectedCalls int + expectExecuteError bool + expectedError error + }{ + { + name: "Success", + connection: &mockConnection{}, + tasks: []*ConnectionSubmission{ + {TaskId: "task1"}, + {TaskId: "task2"}, + }, + expectedCalls: 2, + expectedError: nil, + }, + { + name: "ExecuteError", + connection: &mockConnection{expectExecuteError: true, executeError: errors.New("execute error")}, + tasks: []*ConnectionSubmission{ + {TaskId: "task1"}, + }, + expectedCalls: 1, + expectExecuteError: true, + expectedError: mockExecuteError, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + taskCh := make(chan *ConnectionSubmission, len(tc.tasks)) + for _, task := range tc.tasks { + taskCh <- task + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() // simulate a cancelled context from queuing subsystem. + + tc.connection.Init(taskCh, &ConnectionConfig{ + Kind: HTTP, + Name: "test", + Metadata: &metadata.Metadata{}, + }) + + Start(ctx, tc.connection) + + if !tc.connection.initCalled { + t.Error("Expected Init to be called") + } + + if tc.connection.executeCalled != (tc.expectedCalls > 0) { + t.Errorf("Expected Execute to be called: %v, but got: %v", tc.expectedCalls > 0, tc.connection.executeCalled) + } + + if tc.expectedError != nil && !errors.Is(tc.connection.executeError, tc.expectedError) { + t.Errorf("Expected execute error: %v, but got: %v", tc.expectedError, tc.connection.executeError) + } + }) + } +} diff --git a/internal/app/subsystems/aio/queuing/metadata/metadata_test.go b/internal/app/subsystems/aio/queuing/metadata/metadata_test.go new file mode 100644 index 00000000..964533b4 --- /dev/null +++ b/internal/app/subsystems/aio/queuing/metadata/metadata_test.go @@ -0,0 +1,51 @@ +package metadata + +import "testing" + +type MockMetadataImpl struct { + Name string `mapstructure:"name"` + Value int `mapstructure:"value"` +} + +func TestMetadata(t *testing.T) { + tcs := []struct { + name string + metadata Metadata + expected MockMetadataImpl + }{ + { + name: "Test decoding metadata", + metadata: Metadata{ + Properties: map[string]interface{}{ + "name": "Test", + "value": 42, + }, + }, + expected: MockMetadataImpl{ + Name: "Test", + Value: 42, + }, + }, + { + name: "Test decoding empty metadata", + metadata: Metadata{ + Properties: map[string]interface{}{}, + }, + expected: MockMetadataImpl{}, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + var result MockMetadataImpl + err := Decode(tc.metadata.Properties, &result) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if result != tc.expected { + t.Errorf("Expected %+v, but got %+v", tc.expected, result) + } + }) + } +} diff --git a/internal/app/subsystems/aio/queuing/queuing.go b/internal/app/subsystems/aio/queuing/queuing.go index cc8e1d68..60c1825b 100644 --- a/internal/app/subsystems/aio/queuing/queuing.go +++ b/internal/app/subsystems/aio/queuing/queuing.go @@ -2,6 +2,7 @@ package queuing import ( "context" + "errors" "fmt" "sync" @@ -13,6 +14,10 @@ import ( "github.com/resonatehq/resonate/internal/util" ) +var ( + ErrConnectionNotFound = errors.New("connection not found") +) + type ( // Config is the configuration for the queuing subsystem. Config struct { @@ -69,7 +74,7 @@ func New(baseURL string, config *Config) (aio.Subsystem, error) { // Check if target connection exists. if _, ok := conns[cfg.Target.Connection]; !ok { - return nil, fmt.Errorf("connection %q not found for routing %q", cfg.Target.Connection, cfg.Name) + return nil, fmt.Errorf("validation error for route '%s': %w", cfg.Name, ErrConnectionNotFound) } route, err := routes.NewRoute(cfg) diff --git a/internal/app/subsystems/aio/queuing/queuing_test.go b/internal/app/subsystems/aio/queuing/queuing_test.go new file mode 100644 index 00000000..88fa2561 --- /dev/null +++ b/internal/app/subsystems/aio/queuing/queuing_test.go @@ -0,0 +1,122 @@ +package queuing + +import ( + "errors" + "testing" + + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/metadata" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes/t_route" +) + +func TestQueuingSubsystem(t *testing.T) { + testCases := []struct { + name string + config *Config + expectedError error + }{ + { + name: "valid config", + config: &Config{ + Connections: []*t_conn.ConnectionConfig{ + { + Name: "test_connection", + Kind: t_conn.HTTP, + Metadata: &metadata.Metadata{ + Properties: map[string]interface{}{ + "url": "http://example.com", + }, + }, + }, + }, + Routes: []*t_route.RoutingConfig{ + { + Name: "test_route", + Kind: t_route.Pattern, + Target: &t_route.Target{ + Connection: "test_connection", + Queue: "test_queue", + }, + Metadata: &metadata.Metadata{ + Properties: map[string]interface{}{ + "pattern": "/test/*", + }, + }, + }, + }, + }, + expectedError: nil, + }, + { + name: "missing connection for route", + config: &Config{ + Connections: []*t_conn.ConnectionConfig{}, + Routes: []*t_route.RoutingConfig{ + { + Name: "test_route", + Kind: t_route.Pattern, + Target: &t_route.Target{ + Connection: "missing_connection", + Queue: "test_queue", + }, + Metadata: &metadata.Metadata{ + Properties: map[string]interface{}{ + "pattern": "/test/*", + }, + }, + }, + }, + }, + expectedError: ErrConnectionNotFound, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, err := New("", tc.config) + if tc.expectedError != nil { + if err == nil { + t.Errorf("expected error: %s, got nil", tc.expectedError) + } else if !errors.Is(err, tc.expectedError) { + t.Errorf("expected error: %s, got: %s", tc.expectedError, err.Error()) + } + } else { + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + } + }) + } +} + +func TestQueuingSubsystem_StartStop(t *testing.T) { + config := &Config{ + Connections: []*t_conn.ConnectionConfig{ + { + Name: "test_connection", + Kind: t_conn.HTTP, + Metadata: &metadata.Metadata{ + Properties: map[string]interface{}{ + "url": "http://example.com", + }, + }, + }, + }, + Routes: []*t_route.RoutingConfig{}, + } + + qs, err := New("", config) + if err != nil { + t.Fatalf("unexpected error: %s", err.Error()) + } + + err = qs.Start() + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + + err = qs.Stop() + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } +} diff --git a/internal/app/subsystems/aio/queuing/routes/pattern/pattern.go b/internal/app/subsystems/aio/queuing/routes/pattern/pattern.go index 50c314bd..9a7b19a0 100644 --- a/internal/app/subsystems/aio/queuing/routes/pattern/pattern.go +++ b/internal/app/subsystems/aio/queuing/routes/pattern/pattern.go @@ -2,18 +2,20 @@ package pattern import ( "errors" + "fmt" "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/metadata" "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes/t_route" + "github.com/resonatehq/resonate/internal/util" ) var ( - ErrMissingPattern = errors.New("pattern is required") + ErrMissingPattern = errors.New("missing field 'pattern'") ) type ( Pattern struct { - meta Metadata + meta *Metadata } Metadata struct { @@ -21,19 +23,32 @@ type ( } ) -func New(meta *metadata.Metadata) (t_route.Route, error) { +func New(cfg *t_route.RoutingConfig) (t_route.Route, error) { + util.Assert(cfg != nil, "routing config must not be nil") + util.Assert(cfg.Name != "", "name must not be empty") + util.Assert(cfg.Kind != "", "kind must not be empty") + util.Assert(cfg.Target != nil, "target must not be nil") + util.Assert(cfg.Target.Connection != "", "connection must not be empty") + util.Assert(cfg.Target.Queue != "", "queue must not be empty") + util.Assert(cfg.Metadata != nil, "metadata must not be nil") + util.Assert(cfg.Metadata.Properties != nil, "metadata properties must not be nil") + p := &Pattern{} - md := Metadata{} + md := &Metadata{} - if err := metadata.Decode(meta.Properties, &md); err != nil { + if err := metadata.Decode(cfg.Metadata.Properties, md); err != nil { return nil, err } + if md.Pattern == "" { + return nil, fmt.Errorf("validation error for route '%s': %w", cfg.Name, ErrMissingPattern) + } + p.meta = md - if p.meta.Pattern == "" { - return nil, ErrMissingPattern - } + util.Assert(p != nil, "pattern must not be nil") + util.Assert(p.meta != nil, "meta must not be nil") + util.Assert(p.meta.Pattern != "", "pattern must not be empty") return p, nil } diff --git a/internal/app/subsystems/aio/queuing/routes/pattern/pattern_test.go b/internal/app/subsystems/aio/queuing/routes/pattern/pattern_test.go index 6549b743..1794ef78 100644 --- a/internal/app/subsystems/aio/queuing/routes/pattern/pattern_test.go +++ b/internal/app/subsystems/aio/queuing/routes/pattern/pattern_test.go @@ -5,36 +5,59 @@ import ( "testing" "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/metadata" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes/t_route" ) func TestPattern(t *testing.T) { tcs := []struct { name string - meta *metadata.Metadata + config *t_route.RoutingConfig expectedError error }{ { name: "empty pattern", - meta: &metadata.Metadata{ - Properties: map[string]interface{}{}, + config: &t_route.RoutingConfig{ + Kind: "pattern", + Name: "test", + Target: &t_route.Target{ + Connection: "test", + Queue: "test", + }, + Metadata: &metadata.Metadata{ + Properties: map[string]interface{}{}, + }, }, expectedError: ErrMissingPattern, }, { name: "missing pattern", - meta: &metadata.Metadata{ - Properties: map[string]interface{}{ - "pattern": "", + config: &t_route.RoutingConfig{ + Kind: "pattern", + Name: "test", + Target: &t_route.Target{ + Connection: "test", + Queue: "test", }, + Metadata: &metadata.Metadata{ + Properties: map[string]interface{}{ + "pattern": "", + }}, }, expectedError: ErrMissingPattern, }, { name: "normal", - meta: &metadata.Metadata{ - Properties: map[string]interface{}{ - "pattern": "/gpu/summarize/*", + config: &t_route.RoutingConfig{ + Kind: "pattern", + Name: "test", + Target: &t_route.Target{ + Connection: "test", + Queue: "test", }, + Metadata: &metadata.Metadata{ + Properties: map[string]interface{}{ + "pattern": "/gpu/summarize/*", + }}, }, expectedError: nil, }, @@ -42,7 +65,7 @@ func TestPattern(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - _, err := New(tc.meta) + _, err := New(tc.config) if tc.expectedError != nil { if err == nil { t.Errorf("expected error: %v, got nil", tc.expectedError) diff --git a/internal/app/subsystems/aio/queuing/routes/routes.go b/internal/app/subsystems/aio/queuing/routes/routes.go index dd1b8af3..ecec52d9 100644 --- a/internal/app/subsystems/aio/queuing/routes/routes.go +++ b/internal/app/subsystems/aio/queuing/routes/routes.go @@ -9,17 +9,19 @@ import ( ) var ( - ErrMissingRoutingConfig = errors.New("routing config is nil") - ErrMissingFieldName = errors.New("missing field 'name'") - ErrMissingFieldKind = errors.New("missing field 'kind'") - ErrMissingFieldTarget = errors.New("missing field 'target'") - ErrMissingFieldConn = errors.New("missing field 'target.connection'") - ErrMissingFieldQueue = errors.New("missing field 'target.queue'") - ErrInvalidRoutingKind = errors.New("invalid routing kind") + ErrMissingRoutingConfig = errors.New("routing config is nil") + ErrMissingFieldName = errors.New("missing field 'name'") + ErrMissingFieldKind = errors.New("missing field 'kind'") + ErrMissingFieldTarget = errors.New("missing field 'target'") + ErrMissingFieldConn = errors.New("missing field 'target.connection'") + ErrMissingFieldQueue = errors.New("missing field 'target.queue'") + ErrMissingMetadata = errors.New("missing field `metadata`") + ErrMissingMetadataProperties = errors.New("missing field `metadata.properties`") + ErrInvalidRoutingKind = errors.New("invalid routing kind") ) func NewRoute(cfg *t_route.RoutingConfig) (t_route.Route, error) { - // Validate all required fields are present. + // Validate all common required fields are present. if cfg == nil { return nil, ErrMissingRoutingConfig } @@ -38,6 +40,12 @@ func NewRoute(cfg *t_route.RoutingConfig) (t_route.Route, error) { if cfg.Target.Queue == "" { return nil, fmt.Errorf("validation error for route '%s': %w", cfg.Name, ErrMissingFieldQueue) } + if cfg.Metadata == nil { + return nil, fmt.Errorf("validation error for route '%s': %w", cfg.Name, ErrMissingMetadata) + } + if cfg.Metadata.Properties == nil { + return nil, fmt.Errorf("validation error for route '%s': %w", cfg.Name, ErrMissingMetadataProperties) + } var ( route t_route.Route @@ -46,7 +54,7 @@ func NewRoute(cfg *t_route.RoutingConfig) (t_route.Route, error) { switch cfg.Kind { case t_route.Pattern: - route, err = pattern.New(cfg.Metadata) + route, err = pattern.New(cfg) default: return nil, fmt.Errorf("validation error for route '%s': %w", cfg.Name, ErrInvalidRoutingKind) } diff --git a/internal/app/subsystems/aio/queuing/routes/routes_test.go b/internal/app/subsystems/aio/queuing/routes/routes_test.go index b1609878..afa03c12 100644 --- a/internal/app/subsystems/aio/queuing/routes/routes_test.go +++ b/internal/app/subsystems/aio/queuing/routes/routes_test.go @@ -54,6 +54,31 @@ func TestNewRoute(t *testing.T) { }, expectedError: ErrMissingFieldQueue, }, + { + name: "nil metadata", + config: &t_route.RoutingConfig{ + Name: "test", + Kind: t_route.Pattern, + Target: &t_route.Target{ + Connection: "amqp://localhost", + Queue: "test_queue", + }, + }, + expectedError: ErrMissingMetadata, + }, + { + name: "nil metadata properties", + config: &t_route.RoutingConfig{ + Name: "test", + Kind: t_route.Pattern, + Target: &t_route.Target{ + Connection: "amqp://localhost", + Queue: "test_queue", + }, + Metadata: &metadata.Metadata{}, + }, + expectedError: ErrMissingMetadataProperties, + }, { name: "invalid routing kind", config: &t_route.RoutingConfig{ @@ -63,6 +88,11 @@ func TestNewRoute(t *testing.T) { Connection: "amqp://localhost", Queue: "test_queue", }, + Metadata: &metadata.Metadata{ + Properties: map[string]interface{}{ + "pattern": "/test/*", + }, + }, }, expectedError: ErrInvalidRoutingKind, }, diff --git a/internal/app/subsystems/aio/queuing/worker_test.go b/internal/app/subsystems/aio/queuing/worker_test.go new file mode 100644 index 00000000..3f1dc953 --- /dev/null +++ b/internal/app/subsystems/aio/queuing/worker_test.go @@ -0,0 +1,115 @@ +package queuing + +import ( + "testing" + + "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn" + "github.com/resonatehq/resonate/internal/kernel/bus" + "github.com/resonatehq/resonate/internal/kernel/t_aio" +) + +func TestQueuingWorker_Process(t *testing.T) { + testCases := []struct { + name string + submissions []*bus.SQE[t_aio.Submission, t_aio.Completion] + completions []*bus.CQE[t_aio.Submission, t_aio.Completion] + }{ + { + name: "valid submissions", + submissions: []*bus.SQE[t_aio.Submission, t_aio.Completion]{ + { + Submission: &t_aio.Submission{ + Queuing: &t_aio.QueuingSubmission{ + TaskId: "/test/task1", + Counter: 1, + }, + }, + }, + { + Submission: &t_aio.Submission{ + Queuing: &t_aio.QueuingSubmission{ + TaskId: "/test/task2", + Counter: 2, + }, + }, + }, + }, + completions: []*bus.CQE[t_aio.Submission, t_aio.Completion]{ + { + Completion: &t_aio.Completion{ + Queuing: &t_aio.QueuingCompletion{ + Result: t_aio.Success, + }, + }, + }, + { + Completion: &t_aio.Completion{ + Queuing: &t_aio.QueuingCompletion{ + Result: t_aio.Success, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + router := NewRouter() + router.Handle("/test/*", &RouteHandler{ + Connection: "test_connection", + Queue: "test_queue", + }) + + connectionsSQ := make(map[string]chan *t_conn.ConnectionSubmission) + connectionsSQ["test_connection"] = make(chan *t_conn.ConnectionSubmission, 10) + + worker := &QueuingWorker{ + BaseURL: "http://localhost:8080", + ConnectionRouter: router, + ConnectionsSQ: connectionsSQ, + } + + cqes := worker.Process(tc.submissions) + + if len(tc.completions) != len(tc.submissions) { + t.Errorf("expected completions length: %d, got: %d", len(tc.submissions), len(cqes)) + } + + for _, cqe := range cqes { + if cqe.Completion.Queuing.Result != t_aio.Success { + t.Errorf("expected success, got: %v", cqe.Completion.Queuing.Result) + } + } + }) + } +} + +func TestQueuingWorker_ProcessInvalidTaskId(t *testing.T) { + router := NewRouter() + + worker := &QueuingWorker{ + BaseURL: "http://localhost:8080", + ConnectionRouter: router, + ConnectionsSQ: make(map[string]chan *t_conn.ConnectionSubmission), + } + + sqes := []*bus.SQE[t_aio.Submission, t_aio.Completion]{ + { + Submission: &t_aio.Submission{ + Queuing: &t_aio.QueuingSubmission{ + TaskId: "invalid_task", + Counter: 1, + }, + }, + }, + } + + defer func() { + if r := recover(); r == nil { + t.Errorf("expected panic, got nil") + } + }() + + worker.Process(sqes) +} From f7a1fe15d57b190aa60275cdc347a63132fe51a8 Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Fri, 12 Apr 2024 13:59:04 -0400 Subject: [PATCH 2/2] fix(lint): linting gods strike again --- .../aio/queuing/connections/t_conn/connections_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/app/subsystems/aio/queuing/connections/t_conn/connections_test.go b/internal/app/subsystems/aio/queuing/connections/t_conn/connections_test.go index 7b1b0e45..4f977aaf 100644 --- a/internal/app/subsystems/aio/queuing/connections/t_conn/connections_test.go +++ b/internal/app/subsystems/aio/queuing/connections/t_conn/connections_test.go @@ -89,11 +89,14 @@ func TestConnectionEventLoop(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // simulate a cancelled context from queuing subsystem. - tc.connection.Init(taskCh, &ConnectionConfig{ + err := tc.connection.Init(taskCh, &ConnectionConfig{ Kind: HTTP, Name: "test", Metadata: &metadata.Metadata{}, }) + if err != nil { + t.Fatalf("Failed to init connection: %v", err) + } Start(ctx, tc.connection)