Skip to content

Commit

Permalink
Merge pull request #33 from eitam-ring/master
Browse files Browse the repository at this point in the history
target - aws - finished amazonmq
  • Loading branch information
kubemq authored Sep 1, 2020
2 parents 2fc5aa9 + f6cf145 commit f5b427c
Show file tree
Hide file tree
Showing 13 changed files with 638 additions and 151 deletions.
23 changes: 23 additions & 0 deletions examples/aws/amazonmq/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
bindings:
- name: kubemq-query-amazonmq
source:
kind: source.query
name: kubemq-query
properties:
host: "localhost"
port: "50000"
client_id: "kubemq-query-amazonmq-connector"
auth_token: ""
channel: "query.amazonmq"
group: ""
concurrency: "1"
auto_reconnect: "true"
reconnect_interval_seconds: "1"
max_reconnects: "0"
target:
kind: target.aws.amazonmq
name: target-aws-amazonmq
properties:
host: "localhost:61613"
username: "admin"
password: "admin"
37 changes: 37 additions & 0 deletions examples/aws/amazonmq/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"context"
"fmt"
"github.com/kubemq-hub/kubemq-targets/types"
"github.com/kubemq-io/kubemq-go"
"github.com/nats-io/nuid"
"log"
"time"
)

func main() {
client, err := kubemq.NewClient(context.Background(),
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId(nuid.Next()),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
}
publishRequest := types.NewRequest().
SetMetadataKeyValue("destination", "some-destination").
SetData([]byte("some-data"))
queryPublishResponse, err := client.SetQuery(publishRequest.ToQuery()).
SetChannel("query.amazonmq").
SetTimeout(10 * time.Second).Send(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("%+v \n", queryPublishResponse)
publishResponse, err := types.ParseResponse(queryPublishResponse.Body)
if err != nil {
log.Fatal(err)
}
log.Println(fmt.Sprintf("publish message, response: %+v", publishResponse))

}
24 changes: 24 additions & 0 deletions examples/aws/sqs/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
bindings:
- name: kubemq-query-aws-sqs
source:
kind: source.query
name: kubemq-query
properties:
host: "localhost"
port: "50000"
client_id: "kubemq-query-aws-sqs-connector"
auth_token: ""
channel: "query.aws.sqs"
group: ""
concurrency: "1"
auto_reconnect: "true"
reconnect_interval_seconds: "1"
max_reconnects: "0"
target:
kind: target.aws.sqs
name: target-aws-sqs
properties:
aws_key: "id"
aws_secret_key: 'json'
region: "instance"
retries: "1"
50 changes: 50 additions & 0 deletions examples/aws/sqs/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"context"
"encoding/json"
"fmt"
"github.com/kubemq-hub/kubemq-targets/types"
"github.com/kubemq-io/kubemq-go"
"github.com/nats-io/nuid"
"io/ioutil"
"log"
"time"
)

func main() {
dat, err := ioutil.ReadFile("./credentials/aws/sqs/queue.txt")
if err != nil {
log.Fatal(err)
}
queue := string(dat)
validBody, err := json.Marshal("valid body2")
if err != nil {
log.Fatal(err)
}
client, err := kubemq.NewClient(context.Background(),
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId(nuid.Next()),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
}
// sendRequest
sendRequest := types.NewRequest().
SetMetadataKeyValue("tags", `{"tag-1":"test","tag-2":"test2"}`).
SetMetadataKeyValue("delay", "0").
SetMetadataKeyValue("queue", queue).
SetData(validBody)

querySendResponse, err := client.SetQuery(sendRequest.ToQuery()).
SetChannel("query.aws.sqs").
SetTimeout(10 * time.Second).Send(context.Background())
if err != nil {
log.Fatal(err)
}
sendResponse, err := types.ParseResponse(querySendResponse.Body)
if err != nil {
log.Fatal(err)
}
log.Println(fmt.Sprintf("send executed, response: %s", sendResponse.Data))
}
82 changes: 81 additions & 1 deletion targets/aws/amazonmq/README.md
Original file line number Diff line number Diff line change
@@ -1 +1,81 @@
//TODO
# Kubemq amazonMQ Target Connector

Kubemq AmazonMQ target connector allows services using kubemq server to access AmazonMQ messaging services.

## Prerequisites
The following are required to run the AmazonMQ target connector:

- kubemq cluster
- AmazonMQ server - with access
- kubemq-targets deployment


- Please note the connector uses connection with stomp+ssl, when finishing handling messages need to call Close().
## Configuration

AmazonMQ target connector configuration properties:

| Properties Key | Required | Description | Example |
|:--------------------------------|:---------|:--------------------------------------------|:-----------------------------------------------------------------------|
| host | yes | AmazonMQ connection host (stomp+ssl endpoint)| "localhost:1883" |
| username | no | set AmazonMQ username | "username" |
| password | no | set AmazonMQ password | "password" |


Example:

```yaml
bindings:
- name: kubemq-query-amazonmq
source:
kind: source.query
name: kubemq-query
properties:
host: "localhost"
port: "50000"
client_id: "kubemq-query-amazonmq-connector"
auth_token: ""
channel: "query.amazonmq"
group: ""
concurrency: "1"
auto_reconnect: "true"
reconnect_interval_seconds: "1"
max_reconnects: "0"
target:
kind: target.aws.amazonmq
name: target-aws-amazonmq
properties:
host: "localhost:61613"
username: "admin"
password: "admin"
```
## Usage
### Request
Request metadata setting:
| Metadata Key | Required | Description | Possible values |
|:---------------|:---------|:--------------------|:----------------|
| destination | yes | set destination name | "destination" |
Query request data setting:
| Data Key | Required | Description | Possible values |
|:---------|:---------|:-------------|:-------------------|
| data | yes | data to publish | base64 bytes array |
Example:
```json
{
"metadata": {
"destination": "destination"
},
"data": "U0VMRUNUIGlkLHRpdGxlLGNvbnRlbnQgRlJPTSBwb3N0Ow=="
}
```
58 changes: 58 additions & 0 deletions targets/aws/amazonmq/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package amazonmq

import (
"context"
"crypto/tls"
"github.com/go-stomp/stomp"
"github.com/kubemq-hub/kubemq-targets/config"
"github.com/kubemq-hub/kubemq-targets/types"
)

type Client struct {
name string
opts options
conn *stomp.Conn
}

func New() *Client {
return &Client{}
}
func (c *Client) Name() string {
return c.name
}
func (c *Client) Init(ctx context.Context, cfg config.Spec) error {
c.name = cfg.Name
var err error
c.opts, err = parseOptions(cfg)
if err != nil {
return err
}

netConn, err := tls.Dial("tcp", c.opts.host, &tls.Config{})
if err != nil {
return err
}

c.conn, err = stomp.Connect(netConn, stomp.ConnOpt.Login(c.opts.username, c.opts.password))
if err != nil {
return err
}

return nil
}

func (c *Client) Do(ctx context.Context, req *types.Request) (*types.Response, error) {
meta, err := parseMetadata(req.Metadata)
if err != nil {
return nil, err
}
err = c.conn.Send(meta.destination, "text/plain", req.Data)
if err != nil {
return nil, err
}
return types.NewResponse().SetMetadataKeyValue("result", "ok"), nil
}

func (c *Client) Close() error {
return c.conn.Disconnect()
}
Loading

0 comments on commit f5b427c

Please sign in to comment.