Skip to content

Commit

Permalink
Aerospike WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
eitam-ring committed Dec 10, 2020
1 parent 9f42d0c commit e543058
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/Azure/go-autorest/autorest/validation v0.3.0 // indirect
github.com/GoogleCloudPlatform/cloudsql-proxy v1.18.0
github.com/Shopify/sarama v1.27.0
github.com/aerospike/aerospike-client-go v4.0.0+incompatible
github.com/apache/thrift v0.13.0 // indirect
github.com/aws/aws-sdk-go v1.34.31
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
Expand Down Expand Up @@ -59,6 +60,7 @@ require (
github.com/spf13/viper v1.7.1
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.6.1
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
go.mongodb.org/mongo-driver v1.4.1
go.uber.org/atomic v1.7.0
go.uber.org/zap v1.16.0
Expand Down
121 changes: 121 additions & 0 deletions targets/stores/aerospike/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package aerospike

import (
"context"
"encoding/json"
"fmt"
aero "github.com/aerospike/aerospike-client-go"
"github.com/kubemq-hub/builder/connector/common"
"github.com/kubemq-hub/kubemq-targets/config"
"github.com/kubemq-hub/kubemq-targets/types"
)

type Client struct {
name string
opts options
client *aero.Client
}

func New() *Client {
return &Client{}
}
func (c *Client) Connector() *common.Connector {
return Connector()
}
func (c *Client) Init(ctx context.Context, cfg config.Spec) error {
c.name = cfg.Name
var err error
c.opts, err = parseOptions(cfg)
if err != nil {
return err
}
c.client, err = aero.NewClient(c.opts.host, c.opts.port)
if err != nil {
return fmt.Errorf("error in creating aerospike client: %s", err)
}
err = c.client.CreateUser(&aero.AdminPolicy{
Timeout: c.opts.timeout,
}, c.opts.username, c.opts.password, nil)
if err != nil {
return err
}
return nil
}

func (c *Client) Do(ctx context.Context, req *types.Request) (*types.Response, error) {
meta, err := parseMetadata(req.Metadata)
if err != nil {
return nil, err
}
switch meta.method {
case "get":
return c.Get(meta)
case "set":
return c.Put(meta, req.Data)
case "delete":
return c.Delete(meta)

}
return nil, nil
}

//
func (c *Client) Get(meta metadata) (*types.Response, error) {
key, err := aero.NewKey(meta.namespace, meta.key, nil)
if err != nil {
return nil, err
}
rec, err := c.client.Get(nil, key)
if err != nil {
return nil, err
}
if rec == nil {
return nil, fmt.Errorf("no data found for key %s", key.String())
}
b, err := json.Marshal(rec)
if err != nil {
return nil, err
}
return types.NewResponse().
SetData(b).
SetMetadataKeyValue("result", "ok").
SetMetadataKeyValue("key", meta.key), nil
}

//
func (c *Client) Put(meta metadata, data []byte) (*types.Response, error) {
key, err := aero.NewKey(meta.namespace, meta.key, data)
if err != nil {
return nil, err
}
err = c.client.Put(nil, key, nil)
if err != nil {
return nil, err
}
return types.NewResponse().
SetMetadataKeyValue("result", "ok").
SetMetadataKeyValue("key", meta.key), nil
}

//
func (c *Client) Delete(meta metadata) (*types.Response, error) {
key, err := aero.NewKey(meta.namespace, meta.key, nil)
if err != nil {
return nil, err
}
del, err := c.client.Delete(nil, key)
if err != nil {
return nil, err
}
if !del {
return nil, fmt.Errorf("failed to delete %s", key.String())
}
return types.NewResponse().
SetMetadataKeyValue("result", "ok").
SetMetadataKeyValue("key", meta.key), nil
}

func (c *Client) Stop() error {
c.client.Close()
return nil
}
104 changes: 104 additions & 0 deletions targets/stores/aerospike/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package aerospike

import (
"github.com/kubemq-hub/builder/connector/common"
"math"
)

func Connector() *common.Connector {
return common.NewConnector().
SetKind("stores.mongodb").
SetDescription("MongoDB Target").
AddProperty(
common.NewProperty().
SetKind("string").
SetName("host").
SetDescription("Set MongoDB host address").
SetMust(true).
SetDefault(""),
).
AddProperty(
common.NewProperty().
SetKind("string").
SetName("username").
SetDescription("Set MongoDB username").
SetMust(false).
SetDefault(""),
).
AddProperty(
common.NewProperty().
SetKind("string").
SetName("password").
SetDescription("Set MongoDB password").
SetMust(false).
SetDefault(""),
).
AddProperty(
common.NewProperty().
SetKind("string").
SetName("database").
SetDescription("Set MongoDB database").
SetMust(true).
SetDefault(""),
).
AddProperty(
common.NewProperty().
SetKind("string").
SetName("collection").
SetDescription("Set MongoDB collection").
SetMust(true).
SetDefault(""),
).
AddProperty(
common.NewProperty().
SetKind("string").
SetName("params").
SetDescription("Set MongoDB params").
SetMust(false).
SetDefault(""),
).
AddProperty(
common.NewProperty().
SetKind("string").
SetName("read_concurrency").
SetDescription("Set MongoDB read concurrency").
SetOptions([]string{"local", "majority", "available", "linearizable", "snapshot"}).
SetMust(false).
SetDefault("local"),
).
AddProperty(
common.NewProperty().
SetKind("string").
SetName("write_concurrency").
SetDescription("Set MongoDB write concurrency").
SetOptions([]string{"majority", "Other"}).
SetMust(false).
SetDefault("majority"),
).
AddProperty(
common.NewProperty().
SetKind("int").
SetName("operation_timeout_seconds").
SetDescription("Set MongoDB operation timeout seconds").
SetMust(false).
SetDefault("30").
SetMin(0).
SetMax(math.MaxInt32),
).
AddMetadata(
common.NewMetadata().
SetName("method").
SetKind("string").
SetDescription("Set MongoDB execution method").
SetOptions([]string{"get", "set", "delete"}).
SetDefault("get").
SetMust(true),
).
AddMetadata(
common.NewMetadata().
SetName("key").
SetKind("string").
SetDescription("Set MongoDB key").
SetMust(true),
)
}
1 change: 1 addition & 0 deletions targets/stores/aerospike/deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker run -it -d --name mongo -p 27017:27017 -e MONGO_INITDB_ROOT_USERNAME=admin -e MONGO_INITDB_ROOT_PASSWORD=password mongo:4.0.8
34 changes: 34 additions & 0 deletions targets/stores/aerospike/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package aerospike

import (
"fmt"
"github.com/kubemq-hub/kubemq-targets/types"
)

var methodsMap = map[string]string{
"get": "get",
"set": "set",
"delete": "delete",
}

type metadata struct {
method string
key string
namespace string
}

func parseMetadata(meta types.Metadata) (metadata, error) {
m := metadata{}
var err error
m.method, err = meta.ParseStringMap("method", methodsMap)
if err != nil {
return metadata{}, fmt.Errorf("error parsing method, %w", err)
}

m.key, err = meta.MustParseString("key")
if err != nil {
return metadata{}, fmt.Errorf("error on parsing key value, %w", err)
}

return m, nil
}
45 changes: 45 additions & 0 deletions targets/stores/aerospike/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package aerospike

import (
"fmt"
"github.com/kubemq-hub/kubemq-targets/config"
"math"
"time"
)

type options struct {
host string
port int
password string
username string
timeout time.Duration
}

func parseOptions(cfg config.Spec) (options, error) {
o := options{}
var err error
o.host, err = cfg.Properties.MustParseString("host")
if err != nil {
return options{}, fmt.Errorf("error parsing host, %w", err)
}
o.username = cfg.Properties.ParseString("username", "")
o.password = cfg.Properties.ParseString("password", "")
o.database, err = cfg.Properties.MustParseString("database")
if err != nil {
return options{}, fmt.Errorf("error parsing database name, %w", err)
}
o.collection, err = cfg.Properties.MustParseString("collection")
if err != nil {
return options{}, fmt.Errorf("error parsing collection name, %w", err)
}
o.writeConcurrency = cfg.Properties.ParseString("write_concurrency", "")
o.readConcurrency = cfg.Properties.ParseString("read_concurrency", "")

o.params = cfg.Properties.ParseString("params", "")
operationTimeoutSeconds, err := cfg.Properties.ParseIntWithRange("operation_timeout_seconds", 2, 0, math.MaxInt32)
if err != nil {
return options{}, fmt.Errorf("error operation timeout seconds, %w", err)
}
o.operationTimeout = time.Duration(operationTimeoutSeconds) * time.Second
return o, nil
}

0 comments on commit e543058

Please sign in to comment.