Skip to content

Commit

Permalink
aws - minor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
eitam-ring committed Sep 2, 2020
1 parent dec381e commit f65d77d
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 27 deletions.
1 change: 1 addition & 0 deletions targets/aws/amazonmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The following are required to run the AmazonMQ target connector:


- Please note the connector uses connection with stomp+ssl, when finishing handling messages need to call Close().

## Configuration

AmazonMQ target connector configuration properties:
Expand Down
2 changes: 1 addition & 1 deletion targets/aws/athena/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Kubemq athena target Connector

Kubemq athena target connector allows services using kubemq server to access aws athena service.
Kubemq athena target connector allows services using kubemq server to access aws athena service.

## Prerequisites
The following required to run the aws-dynamodb target connector:
Expand Down
2 changes: 1 addition & 1 deletion targets/aws/cloudwatch/events/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestClient_Init(t *testing.T) {
wantErr bool
}{
{
name: "init ",
name: "init",
cfg: config.Spec{
Name: "aws-cloudwatch-events",
Kind: "aws.cloudwatch.events",
Expand Down
2 changes: 0 additions & 2 deletions targets/aws/cloudwatch/logs/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type testStructure struct {
policyName string
policyDocument string

limit int64
sequenceToken string
}

Expand Down Expand Up @@ -81,7 +80,6 @@ func getTestStructure() (*testStructure, error) {
}
t.sequenceToken = fmt.Sprintf("%s", dat)

t.limit = 10

return t, nil
}
Expand Down
10 changes: 5 additions & 5 deletions targets/aws/keyspaces/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
)

const (
DefaultKey = ""
DefaultTable = ""
DefaultKey = ""
DefaultTable = ""
DefaultKeyspace = ""
)

Expand All @@ -20,10 +20,10 @@ var methodsMap = map[string]string{
}

var consistencyMap = map[string]string{
"one": "one",
"local_one": "local_one",
"one": "one",
"local_one": "local_one",
"local_quorum": "local_quorum",
"": "",
"": "",
}

type metadata struct {
Expand Down
32 changes: 18 additions & 14 deletions targets/aws/keyspaces/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,23 @@ const (
defaultReplicationFactor = 1
defaultConsistency = gocql.LocalQuorum
defaultPort = 9142
defaultUsername = ""
defaultPassword = ""
defaultKeyspace = ""
defaultTable = ""
)

type options struct {
hosts []string
port int
protoVersion int
replicationFactor int
username string
password string
consistency gocql.Consistency
defaultTable string
defaultKeyspace string
tls string
hosts []string
port int
protoVersion int
replicationFactor int
username string
password string
consistency gocql.Consistency
defaultTable string
defaultKeyspace string
tls string
timeoutSeconds time.Duration
connectTimeoutSeconds time.Duration
}
Expand Down Expand Up @@ -53,14 +57,14 @@ func parseOptions(cfg config.Spec) (options, error) {
if err != nil {
return options{}, fmt.Errorf("error parsing replication factor value, %w", err)
}
o.username = cfg.ParseString("username", "")
o.password = cfg.ParseString("password", "")
o.username = cfg.ParseString("username", defaultUsername)
o.password = cfg.ParseString("password", defaultPassword)
o.consistency, err = getConsistency(cfg.ParseString("consistency", "local_quorum"))
if err != nil {
return options{}, fmt.Errorf("error parsing consistency value, %w", err)
}
o.defaultTable = cfg.ParseString("default_table", "")
o.defaultKeyspace = cfg.ParseString("default_keyspace", "")
o.defaultTable = cfg.ParseString("default_table", defaultTable)
o.defaultKeyspace = cfg.ParseString("default_keyspace", defaultKeyspace)
connectTimeout, err := cfg.ParseIntWithRange("connect_timeout_seconds", 60, 1, math.MaxInt32)
if err != nil {
return options{}, fmt.Errorf("error parsing connect timeout seconds timeout value, %w", err)
Expand Down
2 changes: 1 addition & 1 deletion targets/aws/lambda/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The following required to run the aws-lambda target connector:

## Configuration

sns target connector configuration properties:
lambda target connector configuration properties:

| Properties Key | Required | Description | Example |
|:---------------|:---------|:-------------------------------------------|:----------------------------|
Expand Down
9 changes: 7 additions & 2 deletions targets/aws/msk/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
"github.com/kubemq-hub/kubemq-targets/types"
)

const (
defaultKey = ""
defaultHeaders = ""
)

type metadata struct {
Headers []kafka.RecordHeader
Key []byte
Expand All @@ -18,11 +23,11 @@ type metadata struct {
func parseMetadata(meta types.Metadata, opts options) (metadata, error) {
m := metadata{}
var err error
err = m.parseHeaders(meta.ParseString("headers", ""))
err = m.parseHeaders(meta.ParseString("headers", defaultHeaders))
if err != nil {
return metadata{}, fmt.Errorf("error parsing headers, %w", err)
}
k := meta.ParseString("key", "")
k := meta.ParseString("key", defaultKey)
err = m.parseKey(k)
if err != nil {
return metadata{}, fmt.Errorf("error parsing Key, %w", err)
Expand Down
2 changes: 1 addition & 1 deletion targets/aws/redshift/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ bindings:
properties:
host: "localhost"
port: "50000"
client_id: "kubemq-query-aws-redshift-connector"
client_id: "kubemq-query-aws-redshift-connector-svc"
auth_token: ""
channel: "query.aws.redshift.service"
group: ""
Expand Down

0 comments on commit f65d77d

Please sign in to comment.