Skip to content

Commit

Permalink
Add poll plugin (#425)
Browse files Browse the repository at this point in the history
* Add poll plugin

* Add create task and create callback to create promise request

* Refactor http and grpc apis

* Add seperate routes for create promise and task/callback

* Add dst for createpromiseandtask and createpromiseandcallback

* Add grpc implementation for
- createpromiseandtask
- createpromiseandcallback

* Add poll plugin tests
  • Loading branch information
dfarr authored Oct 1, 2024
1 parent 85cf02d commit e46733c
Show file tree
Hide file tree
Showing 104 changed files with 11,330 additions and 10,427 deletions.
13 changes: 8 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
.PHONY: gen-proto
gen-proto:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/api/promise.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/api/callback.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/api/schedule.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/api/lock.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/api/task.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/promise_t.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/promise.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/callback_t.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/callback.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/schedule_t.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/schedule.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/lock.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/task.proto

.PHONY: deps
deps:
Expand Down
146 changes: 59 additions & 87 deletions cmd/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type config[T t_api, U t_aio] struct {
System system.Config `flag:"system"`
API T `flag:"api"`
AIO U `flag:"aio"`
MetricsPort int `flag:"metrics-port" desc:"prometheus metrics server port" default:"9090"`
MetricsAddr string `flag:"metrics-addr" desc:"prometheus metrics server address" default:":9090"`
LogLevel string `flag:"log-level" desc:"can be one of: debug, info, warn, error" default:"info"`
}

Expand All @@ -64,35 +64,6 @@ func (c *Config) Parse() error {
return err
}

// TODO: rethink defaults

if c.System.Url == "" {
host := c.API.Subsystems.Http.Config.Host
if host == "0.0.0.0" {
host = "127.0.0.1"
}
c.System.Url = fmt.Sprintf("http://%s:%d", host, c.API.Subsystems.Http.Config.Port)
}

if c.AIO.Subsystems.Router.Enabled {
var found bool

for _, source := range c.AIO.Subsystems.Router.Config.Sources {
if source.Name == "default" {
found = true
break
}
}

if !found {
c.AIO.Subsystems.Router.Config.Sources = append(c.AIO.Subsystems.Router.Config.Sources, router.SourceConfig{
Name: "default",
Type: "tag",
Data: []byte(`{"key": "resonate:invoke"}`),
})
}
}

return nil
}

Expand All @@ -109,27 +80,6 @@ func (c *ConfigDST) Parse(r *rand.Rand) error {
return err
}

// TODO: rethink defaults

if c.AIO.Subsystems.Router.Enabled {
var found bool

for _, source := range c.AIO.Subsystems.Router.Config.Sources {
if source.Name == "default" {
found = true
break
}
}

if !found {
c.AIO.Subsystems.Router.Config.Sources = append(c.AIO.Subsystems.Router.Config.Sources, router.SourceConfig{
Name: "default",
Type: "tag",
Data: []byte(`{"key": "resonate:invoke"}`),
})
}
}

return nil
}

Expand Down Expand Up @@ -176,46 +126,56 @@ type DisabledSubsystem[T any] struct {
Config T `flag:"-"`
}

func (s *APISubsystems) Instantiate(a api.API) []api.Subsystem {
func (c *Config) APISubsystems(a api.API) ([]api.Subsystem, error) {
subsystems := []api.Subsystem{}
if s.Http.Enabled {
subsystems = append(subsystems, http.New(a, &s.Http.Config))
if c.API.Subsystems.Http.Enabled {
subsystem, err := http.New(a, &c.API.Subsystems.Http.Config)
if err != nil {
return nil, err
}

subsystems = append(subsystems, subsystem)
}
if s.Grpc.Enabled {
subsystems = append(subsystems, grpc.New(a, &s.Grpc.Config))
if c.API.Subsystems.Grpc.Enabled {
subsystem, err := grpc.New(a, &c.API.Subsystems.Grpc.Config)
if err != nil {
return nil, err
}

subsystems = append(subsystems, subsystem)
}

return subsystems
return subsystems, nil
}

func (s *AIOSubsystems) Instantiate(a aio.AIO, metrics *metrics.Metrics) ([]aio.Subsystem, error) {
func (c *Config) AIOSubsystems(a aio.AIO, metrics *metrics.Metrics) ([]aio.Subsystem, error) {
subsystems := []aio.Subsystem{}
if s.Echo.Enabled {
subsystem, err := echo.New(a, metrics, &s.Echo.Config)
if c.AIO.Subsystems.Echo.Enabled {
subsystem, err := echo.New(a, metrics, &c.AIO.Subsystems.Echo.Config)
if err != nil {
return nil, err
}

subsystems = append(subsystems, subsystem)
}
if s.Router.Enabled {
subsystem, err := router.New(a, metrics, &s.Router.Config)
if c.AIO.Subsystems.Router.Enabled {
subsystem, err := router.New(a, metrics, &c.AIO.Subsystems.Router.Config)
if err != nil {
return nil, err
}

subsystems = append(subsystems, subsystem)
}
if s.Sender.Enabled {
subsystem, err := sender.New(a, metrics, &s.Sender.Config)
if c.AIO.Subsystems.Sender.Enabled {
subsystem, err := sender.New(a, metrics, &c.AIO.Subsystems.Sender.Config)
if err != nil {
return nil, err
}

subsystems = append(subsystems, subsystem)
}

subsystem, err := s.instantiateStore(a, metrics)
subsystem, err := c.store(a, metrics)
if err != nil {
return nil, err
}
Expand All @@ -224,12 +184,13 @@ func (s *AIOSubsystems) Instantiate(a aio.AIO, metrics *metrics.Metrics) ([]aio.
return subsystems, nil
}

func (s *AIOSubsystems) instantiateStore(a aio.AIO, metrics *metrics.Metrics) (aio.Subsystem, error) {
if s.StorePostgres.Enabled {
return postgres.New(a, metrics, &s.StorePostgres.Config)
} else if s.StoreSqlite.Enabled {
return sqlite.New(a, metrics, &s.StoreSqlite.Config)
func (c *Config) store(a aio.AIO, metrics *metrics.Metrics) (aio.Subsystem, error) {
if c.AIO.Subsystems.StorePostgres.Enabled {
return postgres.New(a, metrics, &c.AIO.Subsystems.StorePostgres.Config)
} else if c.AIO.Subsystems.StoreSqlite.Enabled {
return sqlite.New(a, metrics, &c.AIO.Subsystems.StoreSqlite.Config)
}

return nil, fmt.Errorf("no store enabled")
}

Expand All @@ -245,38 +206,48 @@ type AIODSTSubsystems struct {
StoreSqlite EnabledSubsystem[sqlite.Config] `flag:"store-sqlite"`
}

func (s *APIDSTSubsystems) Instantiate(a api.API) []api.Subsystem {
func (c *ConfigDST) APISubsystems(a api.API) ([]api.Subsystem, error) {
subsystems := []api.Subsystem{}
if s.Http.Enabled {
subsystems = append(subsystems, http.New(a, &s.Http.Config))
if c.API.Subsystems.Http.Enabled {
subsystem, err := http.New(a, &c.API.Subsystems.Http.Config)
if err != nil {
return nil, err
}

subsystems = append(subsystems, subsystem)
}
if s.Grpc.Enabled {
subsystems = append(subsystems, grpc.New(a, &s.Grpc.Config))
if c.API.Subsystems.Grpc.Enabled {
subsystem, err := grpc.New(a, &c.API.Subsystems.Grpc.Config)
if err != nil {
return nil, err
}

subsystems = append(subsystems, subsystem)
}

return subsystems
return subsystems, nil
}

func (s *AIODSTSubsystems) Instantiate(a aio.AIO, metrics *metrics.Metrics, r *rand.Rand, backchannel chan interface{}) ([]aio.SubsystemDST, error) {
func (c *ConfigDST) AIOSubsystems(a aio.AIO, metrics *metrics.Metrics, r *rand.Rand, backchannel chan interface{}) ([]aio.SubsystemDST, error) {
subsystems := []aio.SubsystemDST{}
if s.Router.Enabled {
subsystem, err := router.New(a, metrics, &s.Router.Config)
if c.AIO.Subsystems.Router.Enabled {
subsystem, err := router.New(a, metrics, &c.AIO.Subsystems.Router.Config)
if err != nil {
return nil, err
}

subsystems = append(subsystems, subsystem)
}
if s.Sender.Enabled {
subsystem, err := sender.NewDST(r, backchannel, &s.Sender.Config)
if c.AIO.Subsystems.Sender.Enabled {
subsystem, err := sender.NewDST(r, backchannel, &c.AIO.Subsystems.Sender.Config)
if err != nil {
return nil, err
}

subsystems = append(subsystems, subsystem)
}

subsystem, err := s.instantiateStore(a, metrics)
subsystem, err := c.store(a, metrics)
if err != nil {
return nil, err
}
Expand All @@ -285,12 +256,13 @@ func (s *AIODSTSubsystems) Instantiate(a aio.AIO, metrics *metrics.Metrics, r *r
return subsystems, nil
}

func (s *AIODSTSubsystems) instantiateStore(a aio.AIO, metrics *metrics.Metrics) (aio.SubsystemDST, error) {
if s.StorePostgres.Enabled {
return postgres.New(a, metrics, &s.StorePostgres.Config)
} else if s.StoreSqlite.Enabled {
return sqlite.New(a, metrics, &s.StoreSqlite.Config)
func (c *ConfigDST) store(a aio.AIO, metrics *metrics.Metrics) (aio.SubsystemDST, error) {
if c.AIO.Subsystems.StorePostgres.Enabled {
return postgres.New(a, metrics, &c.AIO.Subsystems.StorePostgres.Config)
} else if c.AIO.Subsystems.StoreSqlite.Enabled {
return sqlite.New(a, metrics, &c.AIO.Subsystems.StoreSqlite.Config)
}

return nil, fmt.Errorf("no store enabled")
}

Expand Down
20 changes: 10 additions & 10 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func RunDSTCmd() *cobra.Command {
headers = util.NewRangeIntFlag(1, 25)
data = util.NewRangeIntFlag(1, 25)
tags = util.NewRangeIntFlag(1, 25)
searches = util.NewRangeIntFlag(1, 10)
backchannelSize = util.NewRangeIntFlag(1, 1000)
)

Expand Down Expand Up @@ -80,10 +79,7 @@ func RunDSTCmd() *cobra.Command {

mux := netHttp.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
metricsServer := &netHttp.Server{
Addr: ":9090",
Handler: mux,
}
metricsServer := &netHttp.Server{Addr: ":9090", Handler: mux}

go metricsServer.ListenAndServe() // nolint: errcheck

Expand Down Expand Up @@ -113,16 +109,20 @@ func RunDSTCmd() *cobra.Command {
aio := aio.NewDST(r, p, metrics)

// api subsystems
for _, subsystem := range config.API.Subsystems.Instantiate(api) {
apiSubsystems, err := config.APISubsystems(api)
if err != nil {
return err
}
for _, subsystem := range apiSubsystems {
api.AddSubsystem(subsystem)
}

// aio subsystems
subsystems, err := config.AIO.Subsystems.Instantiate(aio, metrics, r, backchannel)
aioSubsystems, err := config.AIOSubsystems(aio, metrics, r, backchannel)
if err != nil {
return err
}
for _, subsystem := range subsystems {
for _, subsystem := range aioSubsystems {
aio.AddSubsystem(subsystem)
}

Expand All @@ -141,6 +141,8 @@ func RunDSTCmd() *cobra.Command {
system.AddOnRequest(t_api.ReadPromise, coroutines.ReadPromise)
system.AddOnRequest(t_api.SearchPromises, coroutines.SearchPromises)
system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(t_api.CreatePromiseAndTask, coroutines.CreatePromiseAndTask)
system.AddOnRequest(t_api.CreatePromiseAndCallback, coroutines.CreatePromiseAndCallback)
system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise)
system.AddOnRequest(t_api.CreateCallback, coroutines.CreateCallback)
system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule)
Expand Down Expand Up @@ -176,7 +178,6 @@ func RunDSTCmd() *cobra.Command {
Headers: headers.Resolve(r),
Data: data.Resolve(r),
Tags: tags.Resolve(r),
Searches: searches.Resolve(r),
FaultInjection: p != 0,
Backchannel: backchannel,
})
Expand Down Expand Up @@ -213,7 +214,6 @@ func RunDSTCmd() *cobra.Command {
cmd.Flags().Var(headers, "headers", "promise header set size")
cmd.Flags().Var(data, "data", "promise data set size")
cmd.Flags().Var(tags, "tags", "promise tags set size")
cmd.Flags().Var(searches, "searches", "search set size")
cmd.Flags().Var(backchannelSize, "backchannel-size", "backchannel size")

// bind config
Expand Down
Loading

0 comments on commit e46733c

Please sign in to comment.