From 873ff9cf7180f84a083b9e76f0d25cf903bad544 Mon Sep 17 00:00:00 2001 From: kubemq Date: Mon, 1 Feb 2021 13:58:01 +0200 Subject: [PATCH] add sources options property to query --- sources/query/README.md | 2 + sources/query/client.go | 119 +++++++++++++++++++--------------- sources/query/client_test.go | 9 ++- sources/query/connector.go | 32 +++++++++ sources/query/options.go | 15 +++-- sources/query/options_test.go | 44 ++++++++++++- sources/queue/options_test.go | 11 ++++ 7 files changed, 171 insertions(+), 61 deletions(-) diff --git a/sources/query/README.md b/sources/query/README.md index 35790915..7f6b4e48 100644 --- a/sources/query/README.md +++ b/sources/query/README.md @@ -19,6 +19,7 @@ Query source connector configuration properties: | client_id | no | set client id | "client_id" | | auth_token | no | set authentication token | jwt token | | channel | yes | set channel to subscribe | | +| sources | no | set how many query sources to subscribe | 1 | | group | no | set subscriber group | | | auto_reconnect | no | set auto reconnect on lost connection | "false", "true" | | reconnect_interval_seconds | no | set reconnection seconds | "5" | @@ -43,6 +44,7 @@ bindings: auth_token: "" channel: "query.elastic-search" group: "" + source: "1" auto_reconnect: "true" reconnect_interval_seconds: "1" max_reconnects: "0" diff --git a/sources/query/client.go b/sources/query/client.go index 95c95d5b..ba23502d 100644 --- a/sources/query/client.go +++ b/sources/query/client.go @@ -8,8 +8,8 @@ import ( "github.com/kubemq-hub/kubemq-targets/config" "github.com/kubemq-hub/kubemq-targets/middleware" "github.com/kubemq-hub/kubemq-targets/pkg/logger" + "github.com/kubemq-hub/kubemq-targets/pkg/uuid" "github.com/kubemq-hub/kubemq-targets/types" - "github.com/nats-io/nuid" "github.com/kubemq-io/kubemq-go" "time" @@ -20,10 +20,10 @@ var ( ) type Client struct { - opts options - client *kubemq.Client - log *logger.Logger - target middleware.Middleware + opts options + clients []*kubemq.Client + log *logger.Logger + target middleware.Middleware } func New() *Client { @@ -41,17 +41,24 @@ func (c *Client) Init(ctx context.Context, cfg config.Spec) error { if err != nil { return err } - c.client, err = kubemq.NewClient(ctx, - kubemq.WithAddress(c.opts.host, c.opts.port), - kubemq.WithClientId(c.opts.clientId), - kubemq.WithTransportType(kubemq.TransportTypeGRPC), - kubemq.WithCheckConnection(true), - kubemq.WithAuthToken(c.opts.authToken), - kubemq.WithMaxReconnects(c.opts.maxReconnects), - kubemq.WithAutoReconnect(c.opts.autoReconnect), - kubemq.WithReconnectInterval(c.opts.reconnectIntervalSeconds)) - if err != nil { - return err + for i := 0; i < c.opts.sources; i++ { + clientId := c.opts.clientId + if c.opts.sources > 1 { + clientId = fmt.Sprintf("%s-%d", clientId, i) + } + client, err := kubemq.NewClient(ctx, + kubemq.WithAddress(c.opts.host, c.opts.port), + kubemq.WithClientId(c.opts.clientId), + kubemq.WithTransportType(kubemq.TransportTypeGRPC), + kubemq.WithCheckConnection(true), + kubemq.WithAuthToken(c.opts.authToken), + kubemq.WithMaxReconnects(c.opts.maxReconnects), + kubemq.WithAutoReconnect(c.opts.autoReconnect), + kubemq.WithReconnectInterval(c.opts.reconnectIntervalSeconds)) + if err != nil { + return err + } + c.clients = append(c.clients, client) } return nil } @@ -62,52 +69,57 @@ func (c *Client) Start(ctx context.Context, target middleware.Middleware) error } else { c.target = target } - group := nuid.Next() - if c.opts.group != "" { - group = c.opts.group + if c.opts.sources > 1 && c.opts.group == "" { + c.opts.group = uuid.New().String() + } + + for i := 0; i < len(c.clients); i++ { + err := c.runClient(ctx, c.clients[i]) + if err != nil { + return fmt.Errorf("error during start of client %d: %s", i, err.Error()) + } } + return nil +} +func (c *Client) runClient(ctx context.Context, client *kubemq.Client) error { errCh := make(chan error, 1) - queriesCh, err := c.client.SubscribeToQueries(ctx, c.opts.channel, group, errCh) + queriesCh, err := client.SubscribeToQueries(ctx, c.opts.channel, c.opts.group, errCh) if err != nil { return fmt.Errorf("error on subscribing to query channel, %w", err) } go func(ctx context.Context, queryCh <-chan *kubemq.QueryReceive, errCh chan error) { - c.run(ctx, queriesCh, errCh) - }(ctx, queriesCh, errCh) + for { + select { + case query := <-queryCh: - return nil -} - -func (c *Client) run(ctx context.Context, queryCh <-chan *kubemq.QueryReceive, errCh chan error) { - for { - select { - case query := <-queryCh: + go func(q *kubemq.QueryReceive) { + queryResponse := client.R(). + SetRequestId(query.Id). + SetResponseTo(query.ResponseTo) + resp, err := c.processQuery(ctx, query) + if err != nil { + resp = types.NewResponse().SetError(err) + } + queryResponse.SetExecutedAt(time.Now()). + SetBody(resp.MarshalBinary()) + err = queryResponse.Send(ctx) + if err != nil { + c.log.Errorf("error sending query response %s", err.Error()) + } + }(query) - go func(q *kubemq.QueryReceive) { - queryResponse := c.client.R(). - SetRequestId(query.Id). - SetResponseTo(query.ResponseTo) - resp, err := c.processQuery(ctx, query) - if err != nil { - resp = types.NewResponse().SetError(err) - } - queryResponse.SetExecutedAt(time.Now()). - SetBody(resp.MarshalBinary()) - err = queryResponse.Send(ctx) - if err != nil { - c.log.Errorf("error sending query response %s", err.Error()) - } - }(query) - - case err := <-errCh: - c.log.Errorf("error received from kuebmq server, %s", err.Error()) - return - case <-ctx.Done(): - return + case err := <-errCh: + c.log.Errorf("error received from kuebmq server, %s", err.Error()) + return + case <-ctx.Done(): + return + } } - } + + }(ctx, queriesCh, errCh) + return nil } func (c *Client) processQuery(ctx context.Context, query *kubemq.QueryReceive) (*types.Response, error) { @@ -122,5 +134,8 @@ func (c *Client) processQuery(ctx context.Context, query *kubemq.QueryReceive) ( return resp, nil } func (c *Client) Stop() error { - return c.client.Close() + for _, client := range c.clients { + _ = client.Close() + } + return nil } diff --git a/sources/query/client_test.go b/sources/query/client_test.go index 87aefc1a..22969bd2 100644 --- a/sources/query/client_test.go +++ b/sources/query/client_test.go @@ -5,10 +5,10 @@ import ( "fmt" "github.com/kubemq-hub/kubemq-targets/config" "github.com/kubemq-hub/kubemq-targets/middleware" + "github.com/kubemq-hub/kubemq-targets/pkg/uuid" "github.com/kubemq-hub/kubemq-targets/targets/null" "github.com/kubemq-hub/kubemq-targets/types" "github.com/kubemq-io/kubemq-go" - "github.com/nats-io/nuid" "github.com/stretchr/testify/require" "testing" "time" @@ -18,7 +18,6 @@ import ( func setupClient(ctx context.Context, target middleware.Middleware) (*Client, error) { c := New() - err := c.Init(ctx, config.Spec{ Name: "kubemq-rpc", Kind: "", @@ -31,6 +30,7 @@ func setupClient(ctx context.Context, target middleware.Middleware) (*Client, er "auto_reconnect": "true", "reconnect_interval_seconds": "1", "max_reconnects": "0", + "sources": "2", }, }) if err != nil { @@ -46,7 +46,7 @@ func setupClient(ctx context.Context, target middleware.Middleware) (*Client, er func sendQuery(ctx context.Context, req *types.Request, sendChannel string, timeout time.Duration) (*types.Response, error) { client, err := kubemq.NewClient(ctx, kubemq.WithAddress("localhost", 50000), - kubemq.WithClientId(nuid.Next()), + kubemq.WithClientId(uuid.New().String()), kubemq.WithTransportType(kubemq.TransportTypeGRPC)) if err != nil { return nil, err @@ -164,6 +164,7 @@ func TestClient_Init(t *testing.T) { "auto_reconnect": "true", "reconnect_interval_seconds": "1", "max_reconnects": "0", + "sources": "2", }, }, wantErr: false, @@ -221,6 +222,7 @@ func TestClient_Start(t *testing.T) { "auto_reconnect": "false", "reconnect_interval_seconds": "1", "max_reconnects": "0", + "sources": "2", }, }, wantErr: false, @@ -240,6 +242,7 @@ func TestClient_Start(t *testing.T) { "auto_reconnect": "true", "reconnect_interval_seconds": "1", "max_reconnects": "0", + "sources": "2", }, }, wantErr: true, diff --git a/sources/query/connector.go b/sources/query/connector.go index 8fa1950e..91755503 100644 --- a/sources/query/connector.go +++ b/sources/query/connector.go @@ -33,6 +33,14 @@ func Connector() *common.Connector { SetMust(false). SetDefault(""), ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("sources"). + SetDescription("Set how many concurrent Query sources to run"). + SetMust(false). + SetDefault("1"), + ). AddProperty( common.NewProperty(). SetKind("string"). @@ -48,5 +56,29 @@ func Connector() *common.Connector { SetDescription("Set Query connection authentication token"). SetMust(false). SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("bool"). + SetName("auto_reconnect"). + SetDescription("Set Query auto reconnection "). + SetMust(false). + SetDefault("true"), + ). + AddProperty( + common.NewProperty(). + SetKind("int"). + SetName("reconnect_interval_seconds"). + SetDescription("Set Query auto reconnection interval in seconds "). + SetMust(false). + SetDefault("0"), + ). + AddProperty( + common.NewProperty(). + SetKind("int"). + SetName("max_reconnects"). + SetDescription("Set Query auto reconnection max reconnects"). + SetMust(false). + SetDefault("0"), ) } diff --git a/sources/query/options.go b/sources/query/options.go index 692475df..e61b966f 100644 --- a/sources/query/options.go +++ b/sources/query/options.go @@ -3,13 +3,13 @@ package query import ( "fmt" "github.com/kubemq-hub/kubemq-targets/config" - "github.com/nats-io/nuid" + "github.com/kubemq-hub/kubemq-targets/pkg/uuid" "time" ) const ( - defaultAddress = "localhost:50000" defaultAutoReconnect = true + defaultSources = 1 ) type options struct { @@ -22,27 +22,32 @@ type options struct { autoReconnect bool reconnectIntervalSeconds time.Duration maxReconnects int + sources int } func parseOptions(cfg config.Spec) (options, error) { o := options{} var err error - o.host, o.port, err = cfg.Properties.MustParseAddress("address", defaultAddress) + o.host, o.port, err = cfg.Properties.MustParseAddress("address", "") if err != nil { return options{}, fmt.Errorf("error parsing address value, %w", err) } o.authToken = cfg.Properties.ParseString("auth_token", "") - o.clientId = cfg.Properties.ParseString("client_id", nuid.Next()) + o.clientId = cfg.Properties.ParseString("client_id", uuid.New().String()) o.channel, err = cfg.Properties.MustParseString("channel") if err != nil { return o, fmt.Errorf("error parsing channel value, %w", err) } + o.sources, err = cfg.Properties.ParseIntWithRange("sources", defaultSources, 1, 1024) + if err != nil { + return options{}, fmt.Errorf("error parsing batch size value, %w", err) + } o.group = cfg.Properties.ParseString("group", "") o.autoReconnect = cfg.Properties.ParseBool("auto_reconnect", defaultAutoReconnect) - interval, err := cfg.Properties.ParseIntWithRange("reconnect_interval_seconds",0, 0, 1000000) + interval, err := cfg.Properties.ParseIntWithRange("reconnect_interval_seconds", 0, 0, 1000000) if err != nil { return o, fmt.Errorf("error parsing reconnect interval seconds value, %w", err) } diff --git a/sources/query/options_test.go b/sources/query/options_test.go index 50bbfce3..b0a117cb 100644 --- a/sources/query/options_test.go +++ b/sources/query/options_test.go @@ -27,10 +27,30 @@ func TestOptions_parseOptions(t *testing.T) { "auto_reconnect": "true", "reconnect_interval_seconds": "1", "max_reconnects": "0", + "sources": "2", }, }, wantErr: false, }, + { + name: "invalid options - bad address", + cfg: config.Spec{ + Name: "kubemq-rpc", + Kind: "", + Properties: map[string]string{ + "address": "bad-address", + "client_id": "", + "auth_token": "some-auth token", + "channel": "some-channel", + "group": "", + "auto_reconnect": "true", + "reconnect_interval_seconds": "1", + "max_reconnects": "0", + "sources": "2", + }, + }, + wantErr: true, + }, { name: "invalid options - no channel", cfg: config.Spec{ @@ -45,6 +65,7 @@ func TestOptions_parseOptions(t *testing.T) { "auto_reconnect": "true", "reconnect_interval_seconds": "1", "max_reconnects": "0", + "sources": "2", }, }, wantErr: true, @@ -63,6 +84,7 @@ func TestOptions_parseOptions(t *testing.T) { "auto_reconnect": "some-bad-error", "reconnect_interval_seconds": "1", "max_reconnects": "0", + "sources": "2", }, }, wantErr: false, @@ -79,8 +101,28 @@ func TestOptions_parseOptions(t *testing.T) { "channel": "some-channel", "group": "", "auto_reconnect": "true", - "reconnect_interval_seconds": "bad value", + "reconnect_interval_seconds": "-1", + "max_reconnects": "0", + "sources": "2", + }, + }, + wantErr: true, + }, + { + name: "valid options - bad sources", + cfg: config.Spec{ + Name: "kubemq-rpc", + Kind: "", + Properties: map[string]string{ + "address": "localhost:50000", + "client_id": "", + "auth_token": "some-auth token", + "channel": "some-channel", + "group": "", + "auto_reconnect": "true", + "reconnect_interval_seconds": "-1", "max_reconnects": "0", + "sources": "-1", }, }, wantErr: true, diff --git a/sources/queue/options_test.go b/sources/queue/options_test.go index 298d36c1..9c1b9e14 100644 --- a/sources/queue/options_test.go +++ b/sources/queue/options_test.go @@ -43,6 +43,17 @@ func TestOptions_parseOptions(t *testing.T) { }, wantErr: false, }, + { + name: "invalid options - bad address", + cfg: config.Spec{ + Name: "kubemq-rpc", + Kind: "", + Properties: map[string]string{ + "address": "bad-address", + }, + }, + wantErr: true, + }, { name: "invalid options - bad channel", cfg: config.Spec{