diff --git a/go.mod b/go.mod index 3f49a05c..b766705c 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/Azure/go-autorest/autorest/validation v0.3.0 // indirect github.com/GoogleCloudPlatform/cloudsql-proxy v1.18.0 github.com/Shopify/sarama v1.27.0 + github.com/aerospike/aerospike-client-go v4.0.0+incompatible github.com/apache/thrift v0.13.0 // indirect github.com/aws/aws-sdk-go v1.34.31 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b @@ -59,6 +60,7 @@ require ( github.com/spf13/viper v1.7.1 github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.6.1 + github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect go.mongodb.org/mongo-driver v1.4.1 go.uber.org/atomic v1.7.0 go.uber.org/zap v1.16.0 diff --git a/targets/stores/aerospike/client.go b/targets/stores/aerospike/client.go new file mode 100644 index 00000000..c72a3f7f --- /dev/null +++ b/targets/stores/aerospike/client.go @@ -0,0 +1,121 @@ +package aerospike + +import ( + "context" + "encoding/json" + "fmt" + aero "github.com/aerospike/aerospike-client-go" + "github.com/kubemq-hub/builder/connector/common" + "github.com/kubemq-hub/kubemq-targets/config" + "github.com/kubemq-hub/kubemq-targets/types" +) + +type Client struct { + name string + opts options + client *aero.Client +} + +func New() *Client { + return &Client{} +} +func (c *Client) Connector() *common.Connector { + return Connector() +} +func (c *Client) Init(ctx context.Context, cfg config.Spec) error { + c.name = cfg.Name + var err error + c.opts, err = parseOptions(cfg) + if err != nil { + return err + } + c.client, err = aero.NewClient(c.opts.host, c.opts.port) + if err != nil { + return fmt.Errorf("error in creating aerospike client: %s", err) + } + err = c.client.CreateUser(&aero.AdminPolicy{ + Timeout: c.opts.timeout, + }, c.opts.username, c.opts.password, nil) + if err != nil { + return err + } + return nil +} + +func (c *Client) Do(ctx context.Context, req *types.Request) (*types.Response, error) { + meta, err := parseMetadata(req.Metadata) + if err != nil { + return nil, err + } + switch meta.method { + case "get": + return c.Get(meta) + case "set": + return c.Put(meta, req.Data) + case "delete": + return c.Delete(meta) + + } + return nil, nil +} + +// +func (c *Client) Get(meta metadata) (*types.Response, error) { + key, err := aero.NewKey(meta.namespace, meta.key, nil) + if err != nil { + return nil, err + } + rec, err := c.client.Get(nil, key) + if err != nil { + return nil, err + } + if rec == nil { + return nil, fmt.Errorf("no data found for key %s", key.String()) + } + b, err := json.Marshal(rec) + if err != nil { + return nil, err + } + return types.NewResponse(). + SetData(b). + SetMetadataKeyValue("result", "ok"). + SetMetadataKeyValue("key", meta.key), nil +} + +// +func (c *Client) Put(meta metadata, data []byte) (*types.Response, error) { + key, err := aero.NewKey(meta.namespace, meta.key, data) + if err != nil { + return nil, err + } + err = c.client.Put(nil, key, nil) + if err != nil { + return nil, err + } + return types.NewResponse(). + SetMetadataKeyValue("result", "ok"). + SetMetadataKeyValue("key", meta.key), nil +} + +// +func (c *Client) Delete(meta metadata) (*types.Response, error) { + key, err := aero.NewKey(meta.namespace, meta.key, nil) + if err != nil { + return nil, err + } + del, err := c.client.Delete(nil, key) + if err != nil { + return nil, err + } + if !del { + return nil, fmt.Errorf("failed to delete %s", key.String()) + } + return types.NewResponse(). + SetMetadataKeyValue("result", "ok"). + SetMetadataKeyValue("key", meta.key), nil +} + +func (c *Client) Stop() error { + c.client.Close() + return nil +} diff --git a/targets/stores/aerospike/connector.go b/targets/stores/aerospike/connector.go new file mode 100644 index 00000000..34ee9935 --- /dev/null +++ b/targets/stores/aerospike/connector.go @@ -0,0 +1,104 @@ +package aerospike + +import ( + "github.com/kubemq-hub/builder/connector/common" + "math" +) + +func Connector() *common.Connector { + return common.NewConnector(). + SetKind("stores.mongodb"). + SetDescription("MongoDB Target"). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("host"). + SetDescription("Set MongoDB host address"). + SetMust(true). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("username"). + SetDescription("Set MongoDB username"). + SetMust(false). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("password"). + SetDescription("Set MongoDB password"). + SetMust(false). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("database"). + SetDescription("Set MongoDB database"). + SetMust(true). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("collection"). + SetDescription("Set MongoDB collection"). + SetMust(true). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("params"). + SetDescription("Set MongoDB params"). + SetMust(false). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("read_concurrency"). + SetDescription("Set MongoDB read concurrency"). + SetOptions([]string{"local", "majority", "available", "linearizable", "snapshot"}). + SetMust(false). + SetDefault("local"), + ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("write_concurrency"). + SetDescription("Set MongoDB write concurrency"). + SetOptions([]string{"majority", "Other"}). + SetMust(false). + SetDefault("majority"), + ). + AddProperty( + common.NewProperty(). + SetKind("int"). + SetName("operation_timeout_seconds"). + SetDescription("Set MongoDB operation timeout seconds"). + SetMust(false). + SetDefault("30"). + SetMin(0). + SetMax(math.MaxInt32), + ). + AddMetadata( + common.NewMetadata(). + SetName("method"). + SetKind("string"). + SetDescription("Set MongoDB execution method"). + SetOptions([]string{"get", "set", "delete"}). + SetDefault("get"). + SetMust(true), + ). + AddMetadata( + common.NewMetadata(). + SetName("key"). + SetKind("string"). + SetDescription("Set MongoDB key"). + SetMust(true), + ) +} diff --git a/targets/stores/aerospike/deploy.sh b/targets/stores/aerospike/deploy.sh new file mode 100644 index 00000000..7d8397d3 --- /dev/null +++ b/targets/stores/aerospike/deploy.sh @@ -0,0 +1 @@ +docker run -it -d --name mongo -p 27017:27017 -e MONGO_INITDB_ROOT_USERNAME=admin -e MONGO_INITDB_ROOT_PASSWORD=password mongo:4.0.8 diff --git a/targets/stores/aerospike/metadata.go b/targets/stores/aerospike/metadata.go new file mode 100644 index 00000000..f50cc55e --- /dev/null +++ b/targets/stores/aerospike/metadata.go @@ -0,0 +1,34 @@ +package aerospike + +import ( + "fmt" + "github.com/kubemq-hub/kubemq-targets/types" +) + +var methodsMap = map[string]string{ + "get": "get", + "set": "set", + "delete": "delete", +} + +type metadata struct { + method string + key string + namespace string +} + +func parseMetadata(meta types.Metadata) (metadata, error) { + m := metadata{} + var err error + m.method, err = meta.ParseStringMap("method", methodsMap) + if err != nil { + return metadata{}, fmt.Errorf("error parsing method, %w", err) + } + + m.key, err = meta.MustParseString("key") + if err != nil { + return metadata{}, fmt.Errorf("error on parsing key value, %w", err) + } + + return m, nil +} diff --git a/targets/stores/aerospike/options.go b/targets/stores/aerospike/options.go new file mode 100644 index 00000000..3e432bfe --- /dev/null +++ b/targets/stores/aerospike/options.go @@ -0,0 +1,45 @@ +package aerospike + +import ( + "fmt" + "github.com/kubemq-hub/kubemq-targets/config" + "math" + "time" +) + +type options struct { + host string + port int + password string + username string + timeout time.Duration +} + +func parseOptions(cfg config.Spec) (options, error) { + o := options{} + var err error + o.host, err = cfg.Properties.MustParseString("host") + if err != nil { + return options{}, fmt.Errorf("error parsing host, %w", err) + } + o.username = cfg.Properties.ParseString("username", "") + o.password = cfg.Properties.ParseString("password", "") + o.database, err = cfg.Properties.MustParseString("database") + if err != nil { + return options{}, fmt.Errorf("error parsing database name, %w", err) + } + o.collection, err = cfg.Properties.MustParseString("collection") + if err != nil { + return options{}, fmt.Errorf("error parsing collection name, %w", err) + } + o.writeConcurrency = cfg.Properties.ParseString("write_concurrency", "") + o.readConcurrency = cfg.Properties.ParseString("read_concurrency", "") + + o.params = cfg.Properties.ParseString("params", "") + operationTimeoutSeconds, err := cfg.Properties.ParseIntWithRange("operation_timeout_seconds", 2, 0, math.MaxInt32) + if err != nil { + return options{}, fmt.Errorf("error operation timeout seconds, %w", err) + } + o.operationTimeout = time.Duration(operationTimeoutSeconds) * time.Second + return o, nil +}