Skip to content

Commit

Permalink
Merge pull request #66 from eitam-ring/master
Browse files Browse the repository at this point in the history
Added Percona
  • Loading branch information
kubemq authored Dec 10, 2020
2 parents 520be86 + 97ea279 commit e800502
Show file tree
Hide file tree
Showing 12 changed files with 1,221 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ A list of supported targets is below.
| | [Elastic Search](https://www.elastic.co/) |stores.elastic-search | [Usage](targets/stores/elastic) | [Example](examples/stores/elastic) |
| | [Cassandra](https://cassandra.apache.org/) |stores.cassandra | [Usage](targets/stores/cassandra) | [Example](examples/stores/cassandra) |
| | [Couchbase](https://www.couchbase.com/) |stores.couchbase | [Usage](targets/stores/couchbase) | [Example](examples/stores/couchbase) |
| | [Percona](https://www.percona.com/) |stores.percona | [Usage](targets/stores/percona) | [Example](examples/stores/percona) |
| Messaging | | | | |
| | [Kafka](https://kafka.apache.org/) |messaging.kafka | [Usage](targets/messaging/kafka) | [Example](examples/messaging/kafka) |
| | [Nats](https://nats.io/) |messaging.nats | [Usage](targets/messaging/nats) | [Example](examples/messaging/nats) |
Expand Down
8 changes: 4 additions & 4 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
bindings:
- name: cockroachdb
- name: percona
source:
kind: kubemq.query
properties:
address: localhost:50000
channel: query.cockroachdb
channel: query.percona
target:
kind: stores.cockroachdb
kind: stores.percona
properties:
connection: postgres://root:postgres@localhost:26257/postgres?sslmode=disable
connection: root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local
properties: {}
12 changes: 12 additions & 0 deletions examples/stores/percona/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
bindings:
- name: percona
source:
kind: kubemq.query
properties:
address: localhost:50000
channel: query.percona
target:
kind: stores.percona
properties:
connection: root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local
properties: {}
68 changes: 68 additions & 0 deletions examples/stores/percona/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"context"
"fmt"
"github.com/kubemq-hub/kubemq-targets/types"
"github.com/kubemq-io/kubemq-go"
"github.com/nats-io/nuid"
"log"
"time"
)

var (
transactionString = `DROP TABLE IF EXISTS post;
CREATE TABLE post (
ID bigint,
TITLE varchar(40),
CONTENT varchar(255),
BIGNUMBER bigint,
BOOLVALUE boolean,
CONSTRAINT pk_post PRIMARY KEY(ID)
);
INSERT INTO post(ID,TITLE,CONTENT,BIGNUMBER,BOOLVALUE) VALUES
(0,NULL,'Content One',1231241241231231123,true),
(1,'Title Two','Content Two',123125241231231123,false);`
queryString = `SELECT id,title,content,bignumber,boolvalue FROM post;`
)

func main() {
client, err := kubemq.NewClient(context.Background(),
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId(nuid.Next()),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
}

transactionRequest := types.NewRequest().
SetMetadataKeyValue("method", "transaction").
SetData([]byte(transactionString))
queryTransactionResponse, err := client.SetQuery(transactionRequest.ToQuery()).
SetChannel("query.percona").
SetTimeout(10 * time.Second).Send(context.Background())
if err != nil {
log.Fatal(err)
}
transactionResponse, err := types.ParseResponse(queryTransactionResponse.Body)
if err != nil {
log.Fatal(err)
}
log.Println(fmt.Sprintf("transaction request result: %s ", transactionResponse.Metadata.String()))

queryRequest := types.NewRequest().
SetMetadataKeyValue("method", "query").
SetData([]byte(queryString))

queryResponse, err := client.SetQuery(queryRequest.ToQuery()).
SetChannel("query.percona").
SetTimeout(10 * time.Second).Send(context.Background())
if err != nil {
log.Fatal(err)
}
response, err := types.ParseResponse(queryResponse.Body)
if err != nil {
log.Fatal(err)
}
log.Println(fmt.Sprintf("query request results: %s ", string(response.Data)))
}
153 changes: 153 additions & 0 deletions targets/stores/percona/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# Kubemq percona Target Connector

Kubemq percona target connector allows services using kubemq server to access percona database services.

## Prerequisites
The following are required to run the percona target connector:

- kubemq cluster
- percona server
- kubemq-targets deployment

## Configuration

percona target connector configuration properties:

| Properties Key | Required | Description | Example |
|:--------------------------------|:---------|:--------------------------------------------|:-----------------------------------------------------------------------|
| connection | yes | percona connection string address | "root:percona@(localhost:3306)/store?charset=utf8&parseTime=True&loc=Local" |
| max_idle_connections | no | set max idle connections | "10" |
| max_open_connections | no | set max open connections | "100" |
| connection_max_lifetime_seconds | no | set max lifetime for connections in seconds | "3600" |


Example:

```yaml
bindings:
- name: percona
source:
kind: kubemq.query
properties:
address: localhost:50000
channel: query.percona
target:
kind: stores.percona
properties:
connection: root:root@(localhost:3306)/percona?charset=utf8&parseTime=True&loc=Local
properties: {}

```

## Usage

### Query Request

Query request metadata setting:

| Metadata Key | Required | Description | Possible values |
|:-------------|:---------|:-----------------|:----------------|
| method | yes | set type of request | "query" |

Query request data setting:

| Data Key | Required | Description | Possible values |
|:---------|:---------|:-------------|:-------------------|
| data | yes | query string | base64 bytes array |

Example:

Query string: `SELECT id,title,content,bignumber,boolvalue FROM post;`

```json
{
"metadata": {
"method": "query"
},
"data": "U0VMRUNUIGlkLHRpdGxlLGNvbnRlbnQsYmlnbnVtYmVyLGJvb2x2YWx1ZSBGUk9NIHBvc3Q7"
}
```

### Exec Request

Exec request metadata setting:

| Metadata Key | Required | Description | Possible values |
|:----------------|:---------|:---------------------------------------|:-------------------|
| method | yes | set type of request | "exec" |
| isolation_level | no | set isolation level for exec operation | "" |
| | | | "read_uncommitted" |
| | | | "read_committed" |
| | | | "repeatable_read" |
| | | | "serializable" |
| | | | |


Exec request data setting:

| Data Key | Required | Description | Possible values |
|:---------|:---------|:------------------------------|:--------------------|
| data | yes | exec string | base64 bytes array |

Example:

Exec string:
```sql
INSERT INTO post(ID,TITLE,CONTENT,BIGNUMBER,BOOLVALUE) VALUES
(0,NULL,'Content One',1231241241231231123,true),
(1,'Title Two','Content Two',123125241231231123,false);
```

```json
{
"metadata": {
"method": "exec",
"isolation_level": "read_uncommitted"
},
"data": "SU5TRVJUIElOVE8gcG9zdChJRCxUSVRMRSxDT05URU5ULEJJR05VTUJFUixCT09MVkFMVUUpIFZBTFVFUwoJICAgICAgICAgICAgICAgICAgICAgICAoMCxOVUxMLCdDb250ZW50IE9uZScsMTIzMTI0MTI0MTIzMTIzMTEyMyx0cnVlKSwKCSAgICAgICAgICAgICAgICAgICAgICAgKDEsJ1RpdGxlIFR3bycsJ0NvbnRlbnQgVHdvJywxMjMxMjUyNDEyMzEyMzExMjMsZmFsc2UpOw=="
}
```

### Transaction Request

Transaction request metadata setting:

| Metadata Key | Required | Description | Possible values |
|:----------------|:---------|:---------------------------------------|:-------------------|
| method | yes | set type of request | "transaction" |
| isolation_level | no | set isolation level for exec operation | "" |
| | | | "read_uncommitted" |
| | | | "read_committed" |
| | | | "repeatable_read" |
| | | | "serializable" |


Transaction request data setting:

| Data Key | Required | Description | Possible values |
|:---------|:---------|:------------------------------|:--------------------|
| data | yes | string string | base64 bytes array |

Example:

Transaction string:
```sql
DROP TABLE IF EXISTS post;
CREATE TABLE post (
ID bigint,
TITLE varchar(40),
CONTENT varchar(255),
BIGNUMBER bigint,
BOOLVALUE boolean,
CONSTRAINT pk_post PRIMARY KEY(ID)
);
```
```json
{
"metadata": {
"key": "your-percona-key",
"method": "delete"
},
"data": "RFJPUCBUQUJMRSBJRiBFWElTVFMgcG9zdDsKCSAgICAgICBDUkVBVEUgVEFCTEUgcG9zdCAoCgkgICAgICAgICBJRCBiaWdpbnQsCgkgICAgICAgICBUSVRMRSB2YXJjaGFyKDQwKSwKCSAgICAgICAgIENPTlRFTlQgdmFyY2hhcigyNTUpLAoJCQkgQklHTlVNQkVSIGJpZ2ludCwKCQkJIEJPT0xWQUxVRSBib29sZWFuLAoJICAgICAgICAgQ09OU1RSQUlOVCBwa19wb3N0IFBSSU1BUlkgS0VZKElEKQoJICAgICAgICk7"
}
```
Loading

0 comments on commit e800502

Please sign in to comment.