Skip to content

Commit

Permalink
add sources options property to query
Browse files Browse the repository at this point in the history
  • Loading branch information
kubemq committed Feb 1, 2021
1 parent 27312d0 commit 873ff9c
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 61 deletions.
2 changes: 2 additions & 0 deletions sources/query/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Query source connector configuration properties:
| client_id | no | set client id | "client_id" |
| auth_token | no | set authentication token | jwt token |
| channel | yes | set channel to subscribe | |
| sources | no | set how many query sources to subscribe | 1 |
| group | no | set subscriber group | |
| auto_reconnect | no | set auto reconnect on lost connection | "false", "true" |
| reconnect_interval_seconds | no | set reconnection seconds | "5" |
Expand All @@ -43,6 +44,7 @@ bindings:
auth_token: ""
channel: "query.elastic-search"
group: ""
source: "1"
auto_reconnect: "true"
reconnect_interval_seconds: "1"
max_reconnects: "0"
Expand Down
119 changes: 67 additions & 52 deletions sources/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/kubemq-hub/kubemq-targets/config"
"github.com/kubemq-hub/kubemq-targets/middleware"
"github.com/kubemq-hub/kubemq-targets/pkg/logger"
"github.com/kubemq-hub/kubemq-targets/pkg/uuid"
"github.com/kubemq-hub/kubemq-targets/types"
"github.com/nats-io/nuid"

"github.com/kubemq-io/kubemq-go"
"time"
Expand All @@ -20,10 +20,10 @@ var (
)

type Client struct {
opts options
client *kubemq.Client
log *logger.Logger
target middleware.Middleware
opts options
clients []*kubemq.Client
log *logger.Logger
target middleware.Middleware
}

func New() *Client {
Expand All @@ -41,17 +41,24 @@ func (c *Client) Init(ctx context.Context, cfg config.Spec) error {
if err != nil {
return err
}
c.client, err = kubemq.NewClient(ctx,
kubemq.WithAddress(c.opts.host, c.opts.port),
kubemq.WithClientId(c.opts.clientId),
kubemq.WithTransportType(kubemq.TransportTypeGRPC),
kubemq.WithCheckConnection(true),
kubemq.WithAuthToken(c.opts.authToken),
kubemq.WithMaxReconnects(c.opts.maxReconnects),
kubemq.WithAutoReconnect(c.opts.autoReconnect),
kubemq.WithReconnectInterval(c.opts.reconnectIntervalSeconds))
if err != nil {
return err
for i := 0; i < c.opts.sources; i++ {
clientId := c.opts.clientId
if c.opts.sources > 1 {
clientId = fmt.Sprintf("%s-%d", clientId, i)
}
client, err := kubemq.NewClient(ctx,
kubemq.WithAddress(c.opts.host, c.opts.port),
kubemq.WithClientId(c.opts.clientId),
kubemq.WithTransportType(kubemq.TransportTypeGRPC),
kubemq.WithCheckConnection(true),
kubemq.WithAuthToken(c.opts.authToken),
kubemq.WithMaxReconnects(c.opts.maxReconnects),
kubemq.WithAutoReconnect(c.opts.autoReconnect),
kubemq.WithReconnectInterval(c.opts.reconnectIntervalSeconds))
if err != nil {
return err
}
c.clients = append(c.clients, client)
}
return nil
}
Expand All @@ -62,52 +69,57 @@ func (c *Client) Start(ctx context.Context, target middleware.Middleware) error
} else {
c.target = target
}
group := nuid.Next()
if c.opts.group != "" {
group = c.opts.group
if c.opts.sources > 1 && c.opts.group == "" {
c.opts.group = uuid.New().String()
}

for i := 0; i < len(c.clients); i++ {
err := c.runClient(ctx, c.clients[i])
if err != nil {
return fmt.Errorf("error during start of client %d: %s", i, err.Error())
}
}

return nil
}
func (c *Client) runClient(ctx context.Context, client *kubemq.Client) error {
errCh := make(chan error, 1)
queriesCh, err := c.client.SubscribeToQueries(ctx, c.opts.channel, group, errCh)
queriesCh, err := client.SubscribeToQueries(ctx, c.opts.channel, c.opts.group, errCh)
if err != nil {
return fmt.Errorf("error on subscribing to query channel, %w", err)
}
go func(ctx context.Context, queryCh <-chan *kubemq.QueryReceive, errCh chan error) {
c.run(ctx, queriesCh, errCh)
}(ctx, queriesCh, errCh)
for {
select {
case query := <-queryCh:

return nil
}

func (c *Client) run(ctx context.Context, queryCh <-chan *kubemq.QueryReceive, errCh chan error) {
for {
select {
case query := <-queryCh:
go func(q *kubemq.QueryReceive) {
queryResponse := client.R().
SetRequestId(query.Id).
SetResponseTo(query.ResponseTo)
resp, err := c.processQuery(ctx, query)
if err != nil {
resp = types.NewResponse().SetError(err)
}
queryResponse.SetExecutedAt(time.Now()).
SetBody(resp.MarshalBinary())
err = queryResponse.Send(ctx)
if err != nil {
c.log.Errorf("error sending query response %s", err.Error())
}
}(query)

go func(q *kubemq.QueryReceive) {
queryResponse := c.client.R().
SetRequestId(query.Id).
SetResponseTo(query.ResponseTo)
resp, err := c.processQuery(ctx, query)
if err != nil {
resp = types.NewResponse().SetError(err)
}
queryResponse.SetExecutedAt(time.Now()).
SetBody(resp.MarshalBinary())
err = queryResponse.Send(ctx)
if err != nil {
c.log.Errorf("error sending query response %s", err.Error())
}
}(query)

case err := <-errCh:
c.log.Errorf("error received from kuebmq server, %s", err.Error())
return
case <-ctx.Done():
return
case err := <-errCh:
c.log.Errorf("error received from kuebmq server, %s", err.Error())
return
case <-ctx.Done():
return

}
}
}

}(ctx, queriesCh, errCh)
return nil
}

func (c *Client) processQuery(ctx context.Context, query *kubemq.QueryReceive) (*types.Response, error) {
Expand All @@ -122,5 +134,8 @@ func (c *Client) processQuery(ctx context.Context, query *kubemq.QueryReceive) (
return resp, nil
}
func (c *Client) Stop() error {
return c.client.Close()
for _, client := range c.clients {
_ = client.Close()
}
return nil
}
9 changes: 6 additions & 3 deletions sources/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"
"github.com/kubemq-hub/kubemq-targets/config"
"github.com/kubemq-hub/kubemq-targets/middleware"
"github.com/kubemq-hub/kubemq-targets/pkg/uuid"
"github.com/kubemq-hub/kubemq-targets/targets/null"
"github.com/kubemq-hub/kubemq-targets/types"
"github.com/kubemq-io/kubemq-go"
"github.com/nats-io/nuid"
"github.com/stretchr/testify/require"
"testing"
"time"
Expand All @@ -18,7 +18,6 @@ import (

func setupClient(ctx context.Context, target middleware.Middleware) (*Client, error) {
c := New()

err := c.Init(ctx, config.Spec{
Name: "kubemq-rpc",
Kind: "",
Expand All @@ -31,6 +30,7 @@ func setupClient(ctx context.Context, target middleware.Middleware) (*Client, er
"auto_reconnect": "true",
"reconnect_interval_seconds": "1",
"max_reconnects": "0",
"sources": "2",
},
})
if err != nil {
Expand All @@ -46,7 +46,7 @@ func setupClient(ctx context.Context, target middleware.Middleware) (*Client, er
func sendQuery(ctx context.Context, req *types.Request, sendChannel string, timeout time.Duration) (*types.Response, error) {
client, err := kubemq.NewClient(ctx,
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId(nuid.Next()),
kubemq.WithClientId(uuid.New().String()),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
return nil, err
Expand Down Expand Up @@ -164,6 +164,7 @@ func TestClient_Init(t *testing.T) {
"auto_reconnect": "true",
"reconnect_interval_seconds": "1",
"max_reconnects": "0",
"sources": "2",
},
},
wantErr: false,
Expand Down Expand Up @@ -221,6 +222,7 @@ func TestClient_Start(t *testing.T) {
"auto_reconnect": "false",
"reconnect_interval_seconds": "1",
"max_reconnects": "0",
"sources": "2",
},
},
wantErr: false,
Expand All @@ -240,6 +242,7 @@ func TestClient_Start(t *testing.T) {
"auto_reconnect": "true",
"reconnect_interval_seconds": "1",
"max_reconnects": "0",
"sources": "2",
},
},
wantErr: true,
Expand Down
32 changes: 32 additions & 0 deletions sources/query/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func Connector() *common.Connector {
SetMust(false).
SetDefault(""),
).
AddProperty(
common.NewProperty().
SetKind("string").
SetName("sources").
SetDescription("Set how many concurrent Query sources to run").
SetMust(false).
SetDefault("1"),
).
AddProperty(
common.NewProperty().
SetKind("string").
Expand All @@ -48,5 +56,29 @@ func Connector() *common.Connector {
SetDescription("Set Query connection authentication token").
SetMust(false).
SetDefault(""),
).
AddProperty(
common.NewProperty().
SetKind("bool").
SetName("auto_reconnect").
SetDescription("Set Query auto reconnection ").
SetMust(false).
SetDefault("true"),
).
AddProperty(
common.NewProperty().
SetKind("int").
SetName("reconnect_interval_seconds").
SetDescription("Set Query auto reconnection interval in seconds ").
SetMust(false).
SetDefault("0"),
).
AddProperty(
common.NewProperty().
SetKind("int").
SetName("max_reconnects").
SetDescription("Set Query auto reconnection max reconnects").
SetMust(false).
SetDefault("0"),
)
}
15 changes: 10 additions & 5 deletions sources/query/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package query
import (
"fmt"
"github.com/kubemq-hub/kubemq-targets/config"
"github.com/nats-io/nuid"
"github.com/kubemq-hub/kubemq-targets/pkg/uuid"
"time"
)

const (
defaultAddress = "localhost:50000"
defaultAutoReconnect = true
defaultSources = 1
)

type options struct {
Expand All @@ -22,27 +22,32 @@ type options struct {
autoReconnect bool
reconnectIntervalSeconds time.Duration
maxReconnects int
sources int
}

func parseOptions(cfg config.Spec) (options, error) {
o := options{}
var err error
o.host, o.port, err = cfg.Properties.MustParseAddress("address", defaultAddress)
o.host, o.port, err = cfg.Properties.MustParseAddress("address", "")
if err != nil {
return options{}, fmt.Errorf("error parsing address value, %w", err)
}
o.authToken = cfg.Properties.ParseString("auth_token", "")

o.clientId = cfg.Properties.ParseString("client_id", nuid.Next())
o.clientId = cfg.Properties.ParseString("client_id", uuid.New().String())

o.channel, err = cfg.Properties.MustParseString("channel")
if err != nil {
return o, fmt.Errorf("error parsing channel value, %w", err)
}
o.sources, err = cfg.Properties.ParseIntWithRange("sources", defaultSources, 1, 1024)
if err != nil {
return options{}, fmt.Errorf("error parsing batch size value, %w", err)
}

o.group = cfg.Properties.ParseString("group", "")
o.autoReconnect = cfg.Properties.ParseBool("auto_reconnect", defaultAutoReconnect)
interval, err := cfg.Properties.ParseIntWithRange("reconnect_interval_seconds",0, 0, 1000000)
interval, err := cfg.Properties.ParseIntWithRange("reconnect_interval_seconds", 0, 0, 1000000)
if err != nil {
return o, fmt.Errorf("error parsing reconnect interval seconds value, %w", err)
}
Expand Down
Loading

0 comments on commit 873ff9c

Please sign in to comment.