Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tasks): several small updates #270

Merged
merged 2 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion cmd/serve/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import (
"encoding/json"

"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes/t_route"
)

type (
// ConnectionSlice is a slice of connections a user can define to configure the queueing subsystem.
ConnectionSlice []*t_conn.ConnectionConfig

// RouteSlice is a slice of routing configurations a user can define to configure the queueing subsystem.
RouteSlice []*t_route.RoutingConfig
)

func (c *ConnectionSlice) String() string {
Expand All @@ -28,6 +32,27 @@ func (c *ConnectionSlice) Set(v string) error {
return nil
}

func (c *ConnectionSlice) Type() string {
func (r *ConnectionSlice) Type() string {
return "ConnectionSlice"
}

func (r *RouteSlice) String() string {
if r == nil || len(*r) == 0 {
return ""
}
jsonStr, _ := json.Marshal(r)
return string(jsonStr)
}

func (r *RouteSlice) Set(v string) error {
routings := make([]*t_route.RoutingConfig, 0)
if err := json.Unmarshal([]byte(v), &routings); err != nil {
return err
}
*r = RouteSlice(routings)
return nil
}

func (r *RouteSlice) Type() string {
return "RouteSlice"
}
9 changes: 7 additions & 2 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func ServeCmd() *cobra.Command {
if err != nil {
return err
}
queuing, err := queuing.NewSubsytemOrDie(config.AIO.Subsystems.Queuing.Config)
queuing, err := queuing.NewSubsytemOrDie(config.API.BaseURL, config.AIO.Subsystems.Queuing.Config)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,11 +189,13 @@ func ServeCmd() *cobra.Command {
cmd.Flags().String("api-http-addr", "0.0.0.0:8001", "http server address")
cmd.Flags().Duration("api-http-timeout", 10*time.Second, "http server graceful shutdown timeout")
cmd.Flags().String("api-grpc-addr", "0.0.0.0:50051", "grpc server address")
cmd.Flags().String("api-base-url", "http://localhost:8001", "base url to automatically generate absolute URLs for the server's resources")

_ = viper.BindPFlag("api.size", cmd.Flags().Lookup("api-size"))
_ = viper.BindPFlag("api.subsystems.http.addr", cmd.Flags().Lookup("api-http-addr"))
_ = viper.BindPFlag("api.subsystems.http.timeout", cmd.Flags().Lookup("api-http-timeout"))
_ = viper.BindPFlag("api.subsystems.grpc.addr", cmd.Flags().Lookup("api-grpc-addr"))
_ = viper.BindPFlag("api.baseUrl", cmd.Flags().Lookup("api-base-url"))

// aio
// Store
Expand All @@ -219,9 +221,11 @@ func ServeCmd() *cobra.Command {
cmd.Flags().Duration("aio-network-timeout", 10*time.Second, "network request timeout")
// Queuing
cmd.Flags().Int("aio-queuing-size", 100, "size of queuing submission queue buffered channel")
cmd.Flags().Int("aio-queuing-workers", 1, "number of queuing workers") // must be 1.
cmd.Flags().Int("aio-queuing-workers", 1, "number of queuing workers")
cmd.Flags().Lookup("aio-queuing-workers").Hidden = true // must be 1.
cmd.Flags().Int("aio-queuing-batch-size", 100, "max submissions processed each tick by a queuing worker")
cmd.Flags().Var(&ConnectionSlice{}, "aio-queuing-connections", "queuing subsystem connections")
cmd.Flags().Var(&RouteSlice{}, "aio-queuing-routes", "queuing subsystem routes")

// Store
_ = viper.BindPFlag("aio.size", cmd.Flags().Lookup("aio-size"))
Expand Down Expand Up @@ -250,6 +254,7 @@ func ServeCmd() *cobra.Command {
_ = viper.BindPFlag("aio.subsystems.queuing.subsystem.workers", cmd.Flags().Lookup("aio-queuing-workers"))
_ = viper.BindPFlag("aio.subsystems.queuing.subsystem.batchSize", cmd.Flags().Lookup("aio-queuing-batch-size"))
_ = viper.BindPFlag("aio.subsystems.queuing.config.connections", cmd.Flags().Lookup("aio-queuing-connections"))
_ = viper.BindPFlag("aio.subsystems.queuing.config.routes", cmd.Flags().Lookup("aio-queuing-routes"))

// system
cmd.Flags().Int("system-notification-cache-size", 100, "max number of notifications to keep in cache")
Expand Down
1 change: 1 addition & 0 deletions cmd/util/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
}

type APIConfig struct {
BaseURL string
Size int
Subsystems *APISubsystems
}
Expand Down
11 changes: 11 additions & 0 deletions internal/app/subsystems/aio/queuing/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ import (
)

func NewConnection(tasks <-chan *t_conn.ConnectionSubmission, cfg *t_conn.ConnectionConfig) (t_conn.Connection, error) {
// Validate all required fields are present.
if cfg == nil {
return nil, fmt.Errorf("connection config is empty")
}
if cfg.Name == "" {
return nil, fmt.Errorf("field 'name' is empty for connection '%s'", cfg.Name)
}
if cfg.Kind == "" {
return nil, fmt.Errorf("field 'kind' is empty for connection '%s'", cfg.Name)
}

var (
conn t_conn.Connection
err error
Expand Down
33 changes: 30 additions & 3 deletions internal/app/subsystems/aio/queuing/connections/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"bytes"
"encoding/json"
"fmt"
"net/http"

Expand All @@ -21,6 +22,18 @@ type (
Metadata struct {
URL string `mapstructure:"url"`
}

Payload struct {
Queue string `json:"queue"`
TaskId string `json:"taskId"`
Counter int `json:"counter"`
Links Links `json:"links"`
}

Links struct {
Claim string `json:"claim"`
Complete string `json:"complete"`
}
)

func New() t_conn.Connection {
Expand All @@ -46,10 +59,24 @@ func (c *HTTP) Task() <-chan *t_conn.ConnectionSubmission {
}

func (c *HTTP) Execute(sub *t_conn.ConnectionSubmission) error {
// Form request.
payload := fmt.Sprintf(`{"taskId":"%s", "counter":%d}`, sub.TaskId, sub.Counter)
// Form payload.
payload := Payload{
Queue: sub.Queue,
TaskId: sub.TaskId,
Counter: sub.Counter,
Links: Links{
Claim: sub.Links.Claim,
Complete: sub.Links.Complete,
},
}

// Marshal payload.
bs, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal payload: %v", err)
}

req, err := http.NewRequest("POST", c.meta.URL, bytes.NewBuffer([]byte(payload)))
req, err := http.NewRequest("POST", c.meta.URL, bytes.NewBuffer(bs))
if err != nil {
return fmt.Errorf("failed to create request: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@ type (
// kind identifies the type of queueing system to connect to (e.g. http, aws.sqs, etc.)
Kind ConnectionKind

// Pattern is the pattern to match the task id.
Pattern string

// Name identifies the existing connection to the queueing system.
Name string

// Queue identifies the task queue within the queueing system.
Queue string

// Metadata is the any additional information or configuration for the connection.
Metadata *metadata.Metadata
}

Links struct {
Claim string `json:"claim"`
Complete string `json:"complete"`
}

ConnectionSubmission struct {
// Queue is the task queue within the queueing system.
Queue string `json:"queue"`
Expand All @@ -42,6 +41,9 @@ type (

// Counter is the version of the task for claiming purposes.
Counter int `json:"counter"`

// Links are the links to claim and complete the specific task.
Links Links `json:"links"`
}

Connection interface {
Expand Down
49 changes: 36 additions & 13 deletions internal/app/subsystems/aio/queuing/queuing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,28 @@ package queuing

import (
"context"
"fmt"
"sync"

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes/t_route"
"github.com/resonatehq/resonate/internal/util"
)

// Config is the configuration for the queuing subsystem.
type Config struct {
Connections []*t_conn.ConnectionConfig
Routes []*t_route.RoutingConfig
}

// QueuingSubsystem is a subsystem that dispatches tasks to user defined connections.
type QueuingSubsystem struct {
// baseURL is the base URL for the API to use by the workers.
baseURL string

// router contains the information for routing requests to connections.
connectionRouter Router

Expand All @@ -37,37 +44,52 @@ type QueuingSubsystem struct {
}

// NewSubsytemOrDie creates a new queuing subsystem with the given config.
func NewSubsytemOrDie(config *Config) (*QueuingSubsystem, error) {
// TODO: if nil, no need to do anything.
func NewSubsytemOrDie(baseURL string, config *Config) (*QueuingSubsystem, error) {
var (
conns = make(map[string]t_conn.Connection, len(config.Connections))
connSQ = make(map[string]chan *t_conn.ConnectionSubmission, len(config.Connections))
connRouter = NewRouter()
err error
)

// Create a connection for each connection configuration.
for _, cfg := range config.Connections {
tsq := make(chan *t_conn.ConnectionSubmission, 100) // TODO: make this configurable

connRouter.Handle(cfg.Pattern, &RouteHandler{ // for routing requests to the appropriate connection
Connection: cfg.Name,
Queue: cfg.Queue,
})
// Workaround for no dependency injection in coroutines. (TODO: consider alternatives)
CoroutineRouter().Handle(cfg.Pattern, &RouteHandler{
Connection: cfg.Name,
Queue: cfg.Queue,
})
tsq := make(chan *t_conn.ConnectionSubmission, 100) // TODO: make this configurable
connSQ[cfg.Name] = tsq // for communication between worker and connection
conns[cfg.Name], err = connections.NewConnection(tsq, cfg) // for starting the connection
if err != nil {
return nil, err
}
}

// Register the routes with the desired router (right now only the default pattern router is supported).
for _, cfg := range config.Routes {

// Check if target connection exists.
if _, ok := conns[cfg.Target.Connection]; !ok {
return nil, fmt.Errorf("connection %q not found for routing %q", cfg.Target.Connection, cfg.Name)
}

route, err := routes.NewRoute(cfg)
if err != nil {
return nil, err
}

connRouter.Handle(route.Route(), &RouteHandler{ // for routing requests to the appropriate connection
Connection: cfg.Target.Connection,
Queue: cfg.Target.Queue,
})
// Workaround for no dependency injection in coroutines. (TODO: consider alternatives)
CoroutineRouter().Handle(route.Route(), &RouteHandler{
Connection: cfg.Target.Connection,
Queue: cfg.Target.Queue,
})
}

ctx, cancel := context.WithCancel(context.Background())

return &QueuingSubsystem{
baseURL: baseURL,
connectionRouter: connRouter,
connections: conns,
connectionsSQ: connSQ,
Expand All @@ -85,6 +107,7 @@ func (t *QueuingSubsystem) String() string {
// NewWorker creates a new worker for the queuing subsystem with the submission queues for each connection.
func (t *QueuingSubsystem) NewWorker(i int) aio.Worker {
return &QueuingWorker{
BaseURL: t.baseURL,
ConnectionRouter: t.connectionRouter,
ConnectionsSQ: t.connectionsSQ,
i: i,
Expand Down
39 changes: 39 additions & 0 deletions internal/app/subsystems/aio/queuing/routes/pattern/pattern.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package pattern

import (
"fmt"

"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/metadata"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes/t_route"
)

type (
Pattern struct {
meta Metadata
}

Metadata struct {
Pattern string `mapstructure:"pattern"`
}
)

func New(meta *metadata.Metadata) (t_route.Route, error) {
p := &Pattern{}
md := Metadata{}

if err := metadata.Decode(meta.Properties, &md); err != nil {
return nil, err
}

p.meta = md

if p.meta.Pattern == "" {
return nil, fmt.Errorf("pattern is required")
}

return p, nil
}

func (p *Pattern) Route() string {
return p.meta.Pattern
}
48 changes: 48 additions & 0 deletions internal/app/subsystems/aio/queuing/routes/routes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package routes

import (
"fmt"

"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes/pattern"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes/t_route"
)

func NewRoute(cfg *t_route.RoutingConfig) (t_route.Route, error) {
// Validate all required fields are present.
if cfg == nil {
return nil, fmt.Errorf("routing config is nil")
}
if cfg.Name == "" {
return nil, fmt.Errorf("field 'name' is empty")
}
if cfg.Kind == "" {
return nil, fmt.Errorf("field 'kind' is empty for route '%s'", cfg.Name)
}
if cfg.Target == nil {
return nil, fmt.Errorf("field 'target' is empty for route' %s'", cfg.Name)
}
if cfg.Target.Connection == "" {
return nil, fmt.Errorf("field 'target.connection' is empty for route '%s'", cfg.Name)
}
if cfg.Target.Queue == "" {
return nil, fmt.Errorf("field 'target.queue' is empty for route '%s'", cfg.Name)
}

var (
route t_route.Route
err error
)

switch cfg.Kind {
case t_route.Pattern:
route, err = pattern.New(cfg.Metadata)
default:
return nil, fmt.Errorf("invalid routing kind: %s", cfg.Kind)
}

if err != nil {
return nil, err
}

return route, nil
}
Loading
Loading