From 97ea2790f78106531656612cce93addc2ae03a06 Mon Sep 17 00:00:00 2001 From: eitam-ring Date: Thu, 10 Dec 2020 11:39:45 +0200 Subject: [PATCH] Merge branch 'master' of https://github.com/eitam-ring/kubemq-targets - Added Percona --- README.md | 1 + config.yaml | 8 +- examples/stores/percona/config.yaml | 12 + examples/stores/percona/main.go | 68 +++ targets/stores/percona/README.md | 153 +++++++ targets/stores/percona/client.go | 206 +++++++++ targets/stores/percona/client_test.go | 597 ++++++++++++++++++++++++++ targets/stores/percona/connector.go | 69 +++ targets/stores/percona/deploy.yaml | 1 + targets/stores/percona/metadata.go | 56 +++ targets/stores/percona/options.go | 46 ++ targets/target.go | 11 +- 12 files changed, 1221 insertions(+), 7 deletions(-) create mode 100644 examples/stores/percona/config.yaml create mode 100644 examples/stores/percona/main.go create mode 100644 targets/stores/percona/README.md create mode 100644 targets/stores/percona/client.go create mode 100644 targets/stores/percona/client_test.go create mode 100644 targets/stores/percona/connector.go create mode 100644 targets/stores/percona/deploy.yaml create mode 100644 targets/stores/percona/metadata.go create mode 100644 targets/stores/percona/options.go diff --git a/README.md b/README.md index db4e06ca..62297fd8 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ A list of supported targets is below. | | [Elastic Search](https://www.elastic.co/) |stores.elastic-search | [Usage](targets/stores/elastic) | [Example](examples/stores/elastic) | | | [Cassandra](https://cassandra.apache.org/) |stores.cassandra | [Usage](targets/stores/cassandra) | [Example](examples/stores/cassandra) | | | [Couchbase](https://www.couchbase.com/) |stores.couchbase | [Usage](targets/stores/couchbase) | [Example](examples/stores/couchbase) | +| | [Percona](https://www.percona.com/) |stores.percona | [Usage](targets/stores/percona) | [Example](examples/stores/percona) | | Messaging | | | | | | | [Kafka](https://kafka.apache.org/) |messaging.kafka | [Usage](targets/messaging/kafka) | [Example](examples/messaging/kafka) | | | [Nats](https://nats.io/) |messaging.nats | [Usage](targets/messaging/nats) | [Example](examples/messaging/nats) | diff --git a/config.yaml b/config.yaml index 04c64fdc..d2a05451 100644 --- a/config.yaml +++ b/config.yaml @@ -1,12 +1,12 @@ bindings: -- name: cockroachdb +- name: percona source: kind: kubemq.query properties: address: localhost:50000 - channel: query.cockroachdb + channel: query.percona target: - kind: stores.cockroachdb + kind: stores.percona properties: - connection: postgres://root:postgres@localhost:26257/postgres?sslmode=disable + connection: root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local properties: {} diff --git a/examples/stores/percona/config.yaml b/examples/stores/percona/config.yaml new file mode 100644 index 00000000..9d48ac42 --- /dev/null +++ b/examples/stores/percona/config.yaml @@ -0,0 +1,12 @@ +bindings: + - name: percona + source: + kind: kubemq.query + properties: + address: localhost:50000 + channel: query.percona + target: + kind: stores.percona + properties: + connection: root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local + properties: {} diff --git a/examples/stores/percona/main.go b/examples/stores/percona/main.go new file mode 100644 index 00000000..dd90a39b --- /dev/null +++ b/examples/stores/percona/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + "github.com/kubemq-hub/kubemq-targets/types" + "github.com/kubemq-io/kubemq-go" + "github.com/nats-io/nuid" + "log" + "time" +) + +var ( + transactionString = `DROP TABLE IF EXISTS post; + CREATE TABLE post ( + ID bigint, + TITLE varchar(40), + CONTENT varchar(255), + BIGNUMBER bigint, + BOOLVALUE boolean, + CONSTRAINT pk_post PRIMARY KEY(ID) + ); + INSERT INTO post(ID,TITLE,CONTENT,BIGNUMBER,BOOLVALUE) VALUES + (0,NULL,'Content One',1231241241231231123,true), + (1,'Title Two','Content Two',123125241231231123,false);` + queryString = `SELECT id,title,content,bignumber,boolvalue FROM post;` +) + +func main() { + client, err := kubemq.NewClient(context.Background(), + kubemq.WithAddress("localhost", 50000), + kubemq.WithClientId(nuid.Next()), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + } + + transactionRequest := types.NewRequest(). + SetMetadataKeyValue("method", "transaction"). + SetData([]byte(transactionString)) + queryTransactionResponse, err := client.SetQuery(transactionRequest.ToQuery()). + SetChannel("query.percona"). + SetTimeout(10 * time.Second).Send(context.Background()) + if err != nil { + log.Fatal(err) + } + transactionResponse, err := types.ParseResponse(queryTransactionResponse.Body) + if err != nil { + log.Fatal(err) + } + log.Println(fmt.Sprintf("transaction request result: %s ", transactionResponse.Metadata.String())) + + queryRequest := types.NewRequest(). + SetMetadataKeyValue("method", "query"). + SetData([]byte(queryString)) + + queryResponse, err := client.SetQuery(queryRequest.ToQuery()). + SetChannel("query.percona"). + SetTimeout(10 * time.Second).Send(context.Background()) + if err != nil { + log.Fatal(err) + } + response, err := types.ParseResponse(queryResponse.Body) + if err != nil { + log.Fatal(err) + } + log.Println(fmt.Sprintf("query request results: %s ", string(response.Data))) +} diff --git a/targets/stores/percona/README.md b/targets/stores/percona/README.md new file mode 100644 index 00000000..bc10e69b --- /dev/null +++ b/targets/stores/percona/README.md @@ -0,0 +1,153 @@ +# Kubemq percona Target Connector + +Kubemq percona target connector allows services using kubemq server to access percona database services. + +## Prerequisites +The following are required to run the percona target connector: + +- kubemq cluster +- percona server +- kubemq-targets deployment + +## Configuration + +percona target connector configuration properties: + +| Properties Key | Required | Description | Example | +|:--------------------------------|:---------|:--------------------------------------------|:-----------------------------------------------------------------------| +| connection | yes | percona connection string address | "root:percona@(localhost:3306)/store?charset=utf8&parseTime=True&loc=Local" | +| max_idle_connections | no | set max idle connections | "10" | +| max_open_connections | no | set max open connections | "100" | +| connection_max_lifetime_seconds | no | set max lifetime for connections in seconds | "3600" | + + +Example: + +```yaml +bindings: +- name: percona + source: + kind: kubemq.query + properties: + address: localhost:50000 + channel: query.percona + target: + kind: stores.percona + properties: + connection: root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local + properties: {} + +``` + +## Usage + +### Query Request + +Query request metadata setting: + +| Metadata Key | Required | Description | Possible values | +|:-------------|:---------|:-----------------|:----------------| +| method | yes | set type of request | "query" | + +Query request data setting: + +| Data Key | Required | Description | Possible values | +|:---------|:---------|:-------------|:-------------------| +| data | yes | query string | base64 bytes array | + +Example: + +Query string: `SELECT id,title,content,bignumber,boolvalue FROM post;` + +```json +{ + "metadata": { + "method": "query" + }, + "data": "U0VMRUNUIGlkLHRpdGxlLGNvbnRlbnQsYmlnbnVtYmVyLGJvb2x2YWx1ZSBGUk9NIHBvc3Q7" +} +``` + +### Exec Request + +Exec request metadata setting: + +| Metadata Key | Required | Description | Possible values | +|:----------------|:---------|:---------------------------------------|:-------------------| +| method | yes | set type of request | "exec" | +| isolation_level | no | set isolation level for exec operation | "" | +| | | | "read_uncommitted" | +| | | | "read_committed" | +| | | | "repeatable_read" | +| | | | "serializable" | +| | | | | + + +Exec request data setting: + +| Data Key | Required | Description | Possible values | +|:---------|:---------|:------------------------------|:--------------------| +| data | yes | exec string | base64 bytes array | + +Example: + +Exec string: +```sql +INSERT INTO post(ID,TITLE,CONTENT,BIGNUMBER,BOOLVALUE) VALUES + (0,NULL,'Content One',1231241241231231123,true), + (1,'Title Two','Content Two',123125241231231123,false); +``` + +```json +{ + "metadata": { + "method": "exec", + "isolation_level": "read_uncommitted" + }, + "data": "SU5TRVJUIElOVE8gcG9zdChJRCxUSVRMRSxDT05URU5ULEJJR05VTUJFUixCT09MVkFMVUUpIFZBTFVFUwoJICAgICAgICAgICAgICAgICAgICAgICAoMCxOVUxMLCdDb250ZW50IE9uZScsMTIzMTI0MTI0MTIzMTIzMTEyMyx0cnVlKSwKCSAgICAgICAgICAgICAgICAgICAgICAgKDEsJ1RpdGxlIFR3bycsJ0NvbnRlbnQgVHdvJywxMjMxMjUyNDEyMzEyMzExMjMsZmFsc2UpOw==" +} +``` + +### Transaction Request + +Transaction request metadata setting: + +| Metadata Key | Required | Description | Possible values | +|:----------------|:---------|:---------------------------------------|:-------------------| +| method | yes | set type of request | "transaction" | +| isolation_level | no | set isolation level for exec operation | "" | +| | | | "read_uncommitted" | +| | | | "read_committed" | +| | | | "repeatable_read" | +| | | | "serializable" | + + +Transaction request data setting: + +| Data Key | Required | Description | Possible values | +|:---------|:---------|:------------------------------|:--------------------| +| data | yes | string string | base64 bytes array | + +Example: + +Transaction string: +```sql +DROP TABLE IF EXISTS post; +CREATE TABLE post ( + ID bigint, + TITLE varchar(40), + CONTENT varchar(255), + BIGNUMBER bigint, + BOOLVALUE boolean, + CONSTRAINT pk_post PRIMARY KEY(ID) + ); +``` +```json +{ + "metadata": { + "key": "your-percona-key", + "method": "delete" + }, + "data": "RFJPUCBUQUJMRSBJRiBFWElTVFMgcG9zdDsKCSAgICAgICBDUkVBVEUgVEFCTEUgcG9zdCAoCgkgICAgICAgICBJRCBiaWdpbnQsCgkgICAgICAgICBUSVRMRSB2YXJjaGFyKDQwKSwKCSAgICAgICAgIENPTlRFTlQgdmFyY2hhcigyNTUpLAoJCQkgQklHTlVNQkVSIGJpZ2ludCwKCQkJIEJPT0xWQUxVRSBib29sZWFuLAoJICAgICAgICAgQ09OU1RSQUlOVCBwa19wb3N0IFBSSU1BUlkgS0VZKElEKQoJICAgICAgICk7" +} +``` diff --git a/targets/stores/percona/client.go b/targets/stores/percona/client.go new file mode 100644 index 00000000..b172123f --- /dev/null +++ b/targets/stores/percona/client.go @@ -0,0 +1,206 @@ +package percona + +import ( + "context" + "database/sql" + "fmt" + _ "github.com/go-sql-driver/mysql" + jsoniter "github.com/json-iterator/go" + "github.com/kubemq-hub/builder/connector/common" + "github.com/kubemq-hub/kubemq-targets/config" + "github.com/kubemq-hub/kubemq-targets/types" + "strconv" + "strings" + "time" +) + +var json = jsoniter.ConfigCompatibleWithStandardLibrary + +// Client is a Client state store +type Client struct { + name string + db *sql.DB + opts options +} + +func New() *Client { + return &Client{} +} +func (c *Client) Connector() *common.Connector { + return Connector() +} +func (c *Client) Init(ctx context.Context, cfg config.Spec) error { + c.name = cfg.Name + var err error + c.opts, err = parseOptions(cfg) + if err != nil { + return err + } + c.db, err = sql.Open("mysql", c.opts.connection) + if err != nil { + return err + } + err = c.db.PingContext(ctx) + if err != nil { + _ = c.db.Close() + return fmt.Errorf("error reaching mysql at %s: %w", c.opts.connection, err) + } + c.db.SetMaxOpenConns(c.opts.maxOpenConnections) + c.db.SetMaxIdleConns(c.opts.maxIdleConnections) + c.db.SetConnMaxLifetime(time.Duration(c.opts.connectionMaxLifetimeSeconds) * time.Second) + return nil +} + +func (c *Client) Do(ctx context.Context, req *types.Request) (*types.Response, error) { + meta, err := parseMetadata(req.Metadata) + if err != nil { + return nil, err + } + switch meta.method { + case "query": + return c.Query(ctx, meta, req.Data) + case "exec": + return c.Exec(ctx, meta, req.Data) + case "transaction": + return c.Transaction(ctx, meta, req.Data) + } + + return nil, nil +} +func (c *Client) Exec(ctx context.Context, meta metadata, value []byte) (*types.Response, error) { + stmts := getStatements(value) + if stmts == nil { + return nil, fmt.Errorf("no exec statement found") + } + for i, stmt := range stmts { + if stmt != "" { + _, err := c.db.ExecContext(ctx, stmt) + if err != nil { + return nil, fmt.Errorf("error on statement %d, %w", i, err) + } + } + } + + return types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + nil +} +func getStatements(data []byte) []string { + if data == nil { + return nil + } + return strings.Split(string(data), ";") +} +func (c *Client) Transaction(ctx context.Context, meta metadata, value []byte) (*types.Response, error) { + stmts := getStatements(value) + if stmts == nil { + return nil, fmt.Errorf("no transaction statements found") + } + + tx, err := c.db.BeginTx(ctx, &sql.TxOptions{ + Isolation: meta.isolationLevel, + ReadOnly: false, + }) + if err != nil { + return nil, err + } + defer func() { + if r := recover(); r != nil { + _ = tx.Rollback() + } + }() + for i, stmt := range stmts { + if stmt != "" { + _, err := tx.ExecContext(ctx, stmt) + if err != nil { + rollBackErr := tx.Rollback() + if rollBackErr != nil { + return nil, rollBackErr + } + return nil, fmt.Errorf("error on statement %d, %w", i, err) + } + } + } + + err = tx.Commit() + if err != nil { + return nil, err + } + return types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + nil +} + +func (c *Client) Query(ctx context.Context, meta metadata, value []byte) (*types.Response, error) { + stmt := string(value) + if stmt == "" { + return nil, fmt.Errorf("no query statement found") + } + rows, err := c.db.QueryContext(ctx, stmt) + if err != nil { + return nil, err + } + defer rows.Close() + return types.NewResponse(). + SetData(c.rowsToMap(rows)). + SetMetadataKeyValue("result", "ok"), nil + +} + +func (c *Client) rowsToMap(rows *sql.Rows) []byte { + + cols, _ := rows.Columns() + colsTypes, err := rows.ColumnTypes() + if err != nil { + return nil + } + var results []map[string]interface{} + for rows.Next() { + results = append(results, parseWithRawBytes(rows, cols, colsTypes)) + } + if results == nil { + return nil + } + b, _ := json.Marshal(results) + return b +} + +func parseWithRawBytes(rows *sql.Rows, cols []string, colsTypes []*sql.ColumnType) map[string]interface{} { + vals := make([]sql.RawBytes, len(cols)) + scanArgs := make([]interface{}, len(vals)) + for i := range vals { + scanArgs[i] = &vals[i] + } + if err := rows.Scan(scanArgs...); err != nil { + panic(err) + } + m := make(map[string]interface{}) + for i, col := range vals { + if col == nil { + continue + } + switch colsTypes[i].DatabaseTypeName() { + case "TINYINT", "BOOLEAN": + m[cols[i]], _ = strconv.ParseBool(string(col)) + case "SMALLINT", "MEDIUMINT": + m[cols[i]], _ = strconv.Atoi(string(col)) + case "BIGINT": + m[cols[i]], _ = strconv.ParseInt(string(col), 10, 64) + case "FLOAT": + val, _ := strconv.ParseFloat(string(col), 32) + m[cols[i]] = float32(val) + case "DOUBLE", "DECIMAL": + m[cols[i]], _ = strconv.ParseFloat(string(col), 64) + default: + m[cols[i]] = string(col) + } + } + return m +} + +func (c *Client) Stop() error { + if c.db != nil { + return c.db.Close() + } + return nil +} diff --git a/targets/stores/percona/client_test.go b/targets/stores/percona/client_test.go new file mode 100644 index 00000000..6bfa9ff6 --- /dev/null +++ b/targets/stores/percona/client_test.go @@ -0,0 +1,597 @@ +package percona + +import ( + "context" + "github.com/kubemq-hub/kubemq-targets/config" + "github.com/kubemq-hub/kubemq-targets/types" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +type post struct { + Id int64 `json:"id"` + Title string `json:"title,omitempty"` + Content string `json:"content,omitempty"` + BigNumber int64 `json:"bignumber,omitempty"` + BoolValue bool `json:"boolvalue"` +} +type posts []*post + +func (p *posts) marshal() []byte { + b, _ := json.Marshal(p) + return b +} +func unmarshal(data []byte) *posts { + if data == nil { + return nil + } + p := &posts{} + _ = json.Unmarshal(data, p) + return p +} + +var allPosts = posts{ + &post{ + Id: 0, + Content: "Content One", + BigNumber: 1231241241231231123, + BoolValue: true, + }, + &post{ + Id: 1, + Title: "Title Two", + Content: "Content Two", + BigNumber: 123125241231231123, + BoolValue: false, + }, +} + +const ( + createPostTable = `DROP TABLE IF EXISTS post; + CREATE TABLE post ( + ID bigint, + TITLE varchar(40), + CONTENT varchar(255), + BIGNUMBER bigint, + BOOLVALUE boolean, + CONSTRAINT pk_post PRIMARY KEY(ID) + ); + INSERT INTO post(ID,TITLE,CONTENT,BIGNUMBER,BOOLVALUE) VALUES + (0,NULL,'Content One',1231241241231231123,true), + (1,'Title Two','Content Two',123125241231231123,false);` + selectPostTable = `SELECT id,title,content,bignumber,boolvalue FROM post;` +) + +func TestClient_Init(t *testing.T) { + tests := []struct { + name string + cfg config.Spec + wantErr bool + }{ + { + name: "init", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + wantErr: false, + }, + { + name: "init - bad connection string", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "bad connection string", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + wantErr: true, + }, + { + name: "init - bad port connection string", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:5678)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + wantErr: true, + }, + { + name: "init - no connection string", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + wantErr: true, + }, + { + name: "init - bad max idle connections", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "-1", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + wantErr: true, + }, + { + name: "init - bad max open connections", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "-1", + "connection_max_lifetime_seconds": "", + }, + }, + wantErr: true, + }, + { + name: "init - bad connection max lifetime seconds", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "-1", + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + c := New() + + if err := c.Init(ctx, tt.cfg); (err != nil) != tt.wantErr { + t.Errorf("Init() error = %v, wantExecErr %v", err, tt.wantErr) + return + } + + }) + } +} + +func TestClient_Query_Exec_Transaction(t *testing.T) { + tests := []struct { + name string + cfg config.Spec + execRequest *types.Request + queryRequest *types.Request + wantExecResponse *types.Response + wantQueryResponse *types.Response + wantExecErr bool + wantQueryErr bool + }{ + { + name: "valid exec query request", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + execRequest: types.NewRequest(). + SetMetadataKeyValue("method", "exec"). + SetData([]byte(createPostTable)), + queryRequest: types.NewRequest(). + SetMetadataKeyValue("method", "query"). + SetData([]byte(selectPostTable)), + wantExecResponse: types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + wantQueryResponse: types.NewResponse(). + SetMetadataKeyValue("result", "ok"). + SetData(allPosts.marshal()), + wantExecErr: false, + wantQueryErr: false, + }, + { + name: "empty exec request", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + execRequest: types.NewRequest(). + SetMetadataKeyValue("method", "exec"), + + queryRequest: nil, + wantExecResponse: nil, + wantQueryResponse: nil, + wantExecErr: true, + wantQueryErr: false, + }, + { + name: "invalid exec request", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + execRequest: types.NewRequest(). + SetMetadataKeyValue("method", "exec"). + SetData([]byte("bad statement")), + queryRequest: nil, + wantExecResponse: nil, + wantQueryResponse: nil, + wantExecErr: true, + wantQueryErr: false, + }, + { + name: "valid exec empty query request", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + execRequest: types.NewRequest(). + SetMetadataKeyValue("method", "exec"). + SetData([]byte(createPostTable)), + queryRequest: types.NewRequest(). + SetMetadataKeyValue("method", "query"). + SetData([]byte("")), + wantExecResponse: types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + wantQueryResponse: nil, + wantExecErr: false, + wantQueryErr: true, + }, + { + name: "valid exec bad query request", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + execRequest: types.NewRequest(). + SetMetadataKeyValue("method", "exec"). + SetData([]byte(createPostTable)), + queryRequest: types.NewRequest(). + SetMetadataKeyValue("method", "query"). + SetData([]byte("some bad query")), + wantExecResponse: types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + wantQueryResponse: nil, + wantExecErr: false, + wantQueryErr: true, + }, + { + name: "valid exec valid query - no results", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + execRequest: types.NewRequest(). + SetMetadataKeyValue("method", "exec"). + SetData([]byte(createPostTable)), + queryRequest: types.NewRequest(). + SetMetadataKeyValue("method", "query"). + SetData([]byte("SELECT id,title,content FROM post where id=100")), + wantExecResponse: types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + wantQueryResponse: types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + wantExecErr: false, + wantQueryErr: false, + }, + { + name: "valid exec query request", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + execRequest: types.NewRequest(). + SetMetadataKeyValue("method", "exec"). + SetData([]byte(createPostTable)), + queryRequest: types.NewRequest(). + SetMetadataKeyValue("method", "query"). + SetData([]byte(selectPostTable)), + wantExecResponse: types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + wantQueryResponse: types.NewResponse(). + SetMetadataKeyValue("result", "ok"). + SetData(allPosts.marshal()), + wantExecErr: false, + wantQueryErr: false, + }, + { + name: "empty transaction request", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + execRequest: types.NewRequest(). + SetMetadataKeyValue("method", "transaction"), + queryRequest: nil, + wantExecResponse: nil, + wantQueryResponse: nil, + wantExecErr: true, + wantQueryErr: false, + }, + { + name: "invalid transaction request", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + execRequest: types.NewRequest(). + SetMetadataKeyValue("method", "transaction"). + SetData([]byte("bad statement")), + queryRequest: nil, + wantExecResponse: nil, + wantQueryResponse: nil, + wantExecErr: true, + wantQueryErr: false, + }, + { + name: "valid transaction empty query request", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + execRequest: types.NewRequest(). + SetMetadataKeyValue("method", "transaction"). + SetData([]byte(createPostTable)), + queryRequest: types.NewRequest(). + SetMetadataKeyValue("method", "query"), + + wantExecResponse: types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + wantQueryResponse: nil, + wantExecErr: false, + wantQueryErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := New() + err := c.Init(ctx, tt.cfg) + require.NoError(t, err) + gotSetResponse, err := c.Do(ctx, tt.execRequest) + if tt.wantExecErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.NotNil(t, gotSetResponse) + require.EqualValues(t, tt.wantExecResponse, gotSetResponse) + gotGetResponse, err := c.Do(ctx, tt.queryRequest) + if tt.wantQueryErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.NotNil(t, gotGetResponse) + + if tt.wantQueryResponse != nil { + wantPosts := unmarshal(tt.wantQueryResponse.Data) + var gotPosts *posts + if gotGetResponse != nil { + gotPosts = unmarshal(gotGetResponse.Data) + } + require.EqualValues(t, wantPosts, gotPosts) + } else { + require.EqualValues(t, tt.wantQueryResponse, gotGetResponse) + } + + }) + } +} + +func TestClient_Do(t *testing.T) { + tests := []struct { + name string + cfg config.Spec + request *types.Request + wantErr bool + }{ + { + name: "valid request", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + request: types.NewRequest(). + SetMetadataKeyValue("method", "transaction"). + SetMetadataKeyValue("isolation_level", "read_uncommitted"). + SetData([]byte(createPostTable)), + wantErr: false, + }, + { + name: "valid request - 2", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + request: types.NewRequest(). + SetMetadataKeyValue("method", "transaction"). + SetMetadataKeyValue("isolation_level", "read_committed"). + SetData([]byte(createPostTable)), + wantErr: false, + }, + { + name: "valid request - 3", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + request: types.NewRequest(). + SetMetadataKeyValue("method", "transaction"). + SetMetadataKeyValue("isolation_level", "repeatable_read"). + SetData([]byte(createPostTable)), + wantErr: false, + }, + { + name: "valid request - 3", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + request: types.NewRequest(). + SetMetadataKeyValue("method", "transaction"). + SetMetadataKeyValue("isolation_level", "serializable"). + SetData([]byte(createPostTable)), + wantErr: false, + }, + { + name: "invalid request - bad method", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + request: types.NewRequest(). + SetMetadataKeyValue("method", "bad-method"), + wantErr: true, + }, + { + name: "invalid request - bad isolation level", + cfg: config.Spec{ + Name: "percona-target", + Kind: "percona.target", + Properties: map[string]string{ + "connection": "root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local", + "max_idle_connections": "", + "max_open_connections": "", + "connection_max_lifetime_seconds": "", + }, + }, + request: types.NewRequest(). + SetMetadataKeyValue("method", "transaction"). + SetMetadataKeyValue("isolation_level", "bad_level"). + SetData([]byte(createPostTable)), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := New() + err := c.Init(ctx, tt.cfg) + require.NoError(t, err) + _, err = c.Do(ctx, tt.request) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + }) + } +} diff --git a/targets/stores/percona/connector.go b/targets/stores/percona/connector.go new file mode 100644 index 00000000..7513f1db --- /dev/null +++ b/targets/stores/percona/connector.go @@ -0,0 +1,69 @@ +package percona + +import ( + "github.com/kubemq-hub/builder/connector/common" + "math" +) + +func Connector() *common.Connector { + return common.NewConnector(). + SetKind("stores.percona"). + SetDescription("Percona Target"). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("connection"). + SetDescription("Set Percona connection string"). + SetMust(true). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("int"). + SetName("max_idle_connections"). + SetDescription("Set Percona max idle connections"). + SetMust(false). + SetDefault("10"). + SetMin(1). + SetMax(math.MaxInt32), + ). + AddProperty( + common.NewProperty(). + SetKind("int"). + SetName("max_open_connections"). + SetDescription("Set Percona max open connections"). + SetMust(false). + SetDefault("100"). + SetMin(1). + SetMax(math.MaxInt32), + ). + AddProperty( + common.NewProperty(). + SetKind("int"). + SetName("connection_max_lifetime_seconds"). + SetDescription("Set Percona connection max lifetime seconds"). + SetMust(false). + SetDefault("3600"). + SetMin(1). + SetMax(math.MaxInt32), + ). + AddMetadata( + common.NewMetadata(). + SetName("method"). + SetKind("string"). + SetDescription("Set Percona execution method"). + SetOptions([]string{"query", "exec", "transaction"}). + SetDefault("query"). + SetMust(true), + ). + AddMetadata( + common.NewMetadata(). + SetName("isolation_level"). + SetKind("string"). + SetDescription("Set Percona isolation level"). + SetOptions([]string{"Default", "ReadUncommitted", "ReadCommitted", "RepeatableRead", "Serializable"}). + SetDefault("Default"). + SetMust(false), + ) + +} diff --git a/targets/stores/percona/deploy.yaml b/targets/stores/percona/deploy.yaml new file mode 100644 index 00000000..435e2e34 --- /dev/null +++ b/targets/stores/percona/deploy.yaml @@ -0,0 +1 @@ +docker run -d --publish 3306:3306 --name percona -e MYSQL_ROOT_PASSWORD=root percona/percona-server:8.0 \ No newline at end of file diff --git a/targets/stores/percona/metadata.go b/targets/stores/percona/metadata.go new file mode 100644 index 00000000..6bcb3b2b --- /dev/null +++ b/targets/stores/percona/metadata.go @@ -0,0 +1,56 @@ +package percona + +import ( + "database/sql" + "fmt" + "github.com/kubemq-hub/kubemq-targets/types" +) + +var methodsMap = map[string]string{ + "query": "query", + "exec": "exec", + "transaction": "transaction", +} + +var isolationLevelsMap = map[string]string{ + "read_uncommitted": "ReadUncommitted", + "read_committed": "ReadCommitted", + "repeatable_read": "RepeatableRead", + "serializable": "Serializable", + "": "Default", +} + +type metadata struct { + method string + isolationLevel sql.IsolationLevel +} + +func parseMetadata(meta types.Metadata) (metadata, error) { + m := metadata{} + var err error + m.method, err = meta.ParseStringMap("method", methodsMap) + if err != nil { + return metadata{}, fmt.Errorf("error parsing method, %w", err) + } + isolationLevel, err := meta.ParseStringMap("isolation_level", isolationLevelsMap) + if err != nil { + return metadata{}, fmt.Errorf("error parsing isolation_level, %w", err) + } + m.isolationLevel = convertToSqlIsolationLevel(isolationLevel) + return m, nil +} + +func convertToSqlIsolationLevel(value string) sql.IsolationLevel { + switch value { + case "ReadUncommitted": + return sql.LevelReadCommitted + case "ReadCommitted": + return sql.LevelReadCommitted + case "RepeatableRead": + return sql.LevelRepeatableRead + case "Serializable": + return sql.LevelSerializable + default: + return sql.LevelDefault + } +} diff --git a/targets/stores/percona/options.go b/targets/stores/percona/options.go new file mode 100644 index 00000000..4734a86a --- /dev/null +++ b/targets/stores/percona/options.go @@ -0,0 +1,46 @@ +package percona + +import ( + "fmt" + "github.com/kubemq-hub/kubemq-targets/config" + "math" +) + +const ( + defaultMaxIdleConnections = 10 + defaultMaxOpenConnections = 100 + defaultConnectionMaxLifetimeSeconds = 3600 +) + +type options struct { + connection string + // maxIdleConnections sets the maximum number of connections in the idle connection pool + maxIdleConnections int + //maxOpenConnections sets the maximum number of open connections to the database. + maxOpenConnections int + // connectionMaxLifetimeSeconds sets the maximum amount of time a connection may be reused. + connectionMaxLifetimeSeconds int +} + +func parseOptions(cfg config.Spec) (options, error) { + o := options{} + var err error + o.connection, err = cfg.Properties.MustParseString("connection") + if err != nil { + return options{}, fmt.Errorf("error parsing connection string, %w", err) + } + + o.maxIdleConnections, err = cfg.Properties.ParseIntWithRange("max_idle_connections", defaultMaxIdleConnections, 1, math.MaxInt32) + if err != nil { + return options{}, fmt.Errorf("error parsing max idle connections value, %w", err) + } + o.maxOpenConnections, err = cfg.Properties.ParseIntWithRange("max_open_connections", defaultMaxOpenConnections, 1, math.MaxInt32) + if err != nil { + return options{}, fmt.Errorf("error parsing max open connections value, %w", err) + } + o.connectionMaxLifetimeSeconds, err = cfg.Properties.ParseIntWithRange("connection_max_lifetime_seconds", defaultConnectionMaxLifetimeSeconds, 1, math.MaxInt32) + if err != nil { + return options{}, fmt.Errorf("error parsing connection max lifetime seconds value, %w", err) + } + return o, nil +} diff --git a/targets/target.go b/targets/target.go index 5bcc5400..e257f46a 100644 --- a/targets/target.go +++ b/targets/target.go @@ -31,6 +31,7 @@ import ( "github.com/kubemq-hub/kubemq-targets/targets/messaging/nats" "github.com/kubemq-hub/kubemq-targets/targets/stores/cockroachdb" "github.com/kubemq-hub/kubemq-targets/targets/stores/elastic" + "github.com/kubemq-hub/kubemq-targets/targets/stores/percona" "github.com/kubemq-hub/kubemq-targets/config" awsmariadb "github.com/kubemq-hub/kubemq-targets/targets/aws/rds/mariadb" @@ -379,6 +380,12 @@ func Init(ctx context.Context, cfg config.Spec) (Target, error) { return nil, err } return target, nil + case "stores.percona": + target := percona.New() + if err := target.Init(ctx, cfg); err != nil { + return nil, err + } + return target, nil case "serverless.openfaas": target := openfaas.New() if err := target.Init(ctx, cfg); err != nil { @@ -440,7 +447,6 @@ func Init(ctx context.Context, cfg config.Spec) (Target, error) { } return target, nil - default: return nil, fmt.Errorf("invalid kind %s for target %s", cfg.Kind, cfg.Name) } @@ -462,6 +468,7 @@ func Connectors() common.Connectors { cassandra.Connector(), couchbase.Connector(), cockroachdb.Connector(), + percona.Connector(), // http http.Connector(), @@ -531,7 +538,5 @@ func Connectors() common.Connectors { blob.Connector(), servicebus.Connector(), eventhubs.Connector(), - - } }