Skip to content

Commit

Permalink
Merge pull request #62 from eitam-ring/master
Browse files Browse the repository at this point in the history
messaging - target - added nats
  • Loading branch information
kubemq authored Dec 4, 2020
2 parents 76ef6ff + f9d4051 commit 903869a
Show file tree
Hide file tree
Showing 14 changed files with 603 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ A list of supported targets is below.
| | [Couchbase](https://www.couchbase.com/) |stores.couchbase | [Usage](targets/stores/couchbase) | [Example](examples/stores/couchbase) |
| Messaging | | | | |
| | [Kafka](https://kafka.apache.org/) |messaging.kafka | [Usage](targets/messaging/kafka) | [Example](examples/messaging/kafka) |
| | [Nats](https://nats.io/) |messaging.nats | [Usage](targets/messaging/nats) | [Example](examples/messaging/nats) |
| | [RabbitMQ](https://www.rabbitmq.com/) |messaging.rabbitmq | [Usage](targets/messaging/rabbitmq) | [Example](examples/messaging/rabbitmq) |
| | [MQTT](http://mqtt.org/) |messaging.mqtt | [Usage](targets/messaging/mqtt) | [Example](examples/messaging/mqtt) |
| | [ActiveMQ](http://activemq.apache.org/) |messaging.activemq | [Usage](targets/messaging/activemq) | [Example](examples/messaging/activemq) |
Expand Down
20 changes: 20 additions & 0 deletions examples/messaging/nats/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
bindings:
- name: nats
source:
kind: kubemq.events
properties:
address: localhost:50000
channel: event.messaging.nats
target:
kind: messaging.nats
properties:
cert_file: |-
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
cert_key: |-
-----BEGIN PRIVATE KEY-----
jrwkrWynmfwXr1ctWeb7O4W9Ng==
-----END PRIVATE KEY-----
dynamic_mapping: "false"
url: nats://localhost:4222
properties: {}
37 changes: 37 additions & 0 deletions examples/messaging/nats/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"context"
"fmt"
"github.com/kubemq-hub/kubemq-targets/types"
"github.com/kubemq-io/kubemq-go"
"github.com/nats-io/nuid"
"log"
"time"
)

func main() {
client, err := kubemq.NewClient(context.Background(),
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId(nuid.Next()),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
}
publishRequest := types.NewRequest().
SetMetadataKeyValue("destination", "some-destination").
SetData([]byte("some-data"))
queryPublishResponse, err := client.SetQuery(publishRequest.ToQuery()).
SetChannel("query.activemq").
SetTimeout(10 * time.Second).Send(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("%+v \n", queryPublishResponse)
publishResponse, err := types.ParseResponse(queryPublishResponse.Body)
if err != nil {
log.Fatal(err)
}
log.Println(fmt.Sprintf("publish message, response: %+v", publishResponse))

}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/lib/pq v1.8.0
github.com/minio/minio-go/v7 v7.0.0-20200623213017-e5cd2d129325
github.com/mitchellh/mapstructure v1.3.3 // indirect
github.com/nats-io/nats.go v1.10.0
github.com/nats-io/nuid v1.0.1
github.com/olivere/elastic/v7 v7.0.20
github.com/prometheus/client_golang v1.7.1
Expand Down
2 changes: 1 addition & 1 deletion targets-manifest-hash.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
26b9e2455d250c41417733c1aff61baab0f4b97a0bdf7c40215b729d3e77a4c3
565eb6ca495589f6e3aff99ce3995a1270b9792147a3dbac9e181493b4e6ceda
2 changes: 1 addition & 1 deletion targets-manifest.json

Large diffs are not rendered by default.

72 changes: 72 additions & 0 deletions targets/messaging/nats/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Kubemq Nats Target Connector

Kubemq Nats target connector allows services using kubemq server to access Nats messaging services.

## Prerequisites
The following are required to run the Nats target connector:

- kubemq cluster
- Nats server
- kubemq-targets deployment

## Configuration

Nats target connector configuration properties:

| Properties Key | Required | Description | Example |
|:--------------------------------|:---------|:--------------------------------------------------------|:-----------------------------------------------------------------------|
| url | yes | nats connection host | "localhost:1883" |
| subject | yes | set subject name | any string |
| username | no | set nats username | "username" |
| password | no | set nats password | "password" |
| token | no | set nats token | "my_token" |
| tls | no | set if tls is needed | "false","true" |
| cert_file | no | tls certificate file in string format | "my_file" |
| cert_key | no | tls certificate key in string format | "my_key" |
| timeout | no | connection timeout in seconds | "130" |


Example:

```yaml
bindings:
- name: nats
source:
kind: kubemq.events
properties:
address: localhost:50000
channel: event.messaging.nats
target:
kind: messaging.nats
properties:
cert_file: |-
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
cert_key: |-
-----BEGIN PRIVATE KEY-----
-----END PRIVATE KEY-----
dynamic_mapping: "false"
url: nats://localhost:4222
properties: {}

```

## Usage

### Request


Query request data setting:

| Data Key | Required | Description | Possible values |
|:------------------|:---------|:------------------------------------------|:-------------------|
| data | yes | data to publish | base64 bytes array |

Example:


```json
{
"data": "U0VMRUNUIGlkLHRpdGxlLGNvbnRlbnQgRlJPTSBwb3N0Ow=="
}
```
100 changes: 100 additions & 0 deletions targets/messaging/nats/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package nats

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"github.com/kubemq-hub/builder/connector/common"
"github.com/kubemq-hub/kubemq-targets/config"
"github.com/kubemq-hub/kubemq-targets/types"
"github.com/nats-io/nats.go"
"time"
)

type Client struct {
name string
opts options
client *nats.Conn
}

func New() *Client {
return &Client{}
}

func (c *Client) Connector() *common.Connector {
return Connector()
}
func (c *Client) Init(ctx context.Context, cfg config.Spec) error {
c.name = cfg.Name
var err error
c.opts, err = parseOptions(cfg)
if err != nil {
return err
}
o := setOptions(c.opts.certFile, c.opts.certKey, c.opts.username, c.opts.password, c.opts.token, c.opts.tls, c.opts.timeout)
c.client, err = nats.Connect(c.opts.url, o)
if err != nil {
return err
}
return nil
}

func (c *Client) Do(ctx context.Context, req *types.Request) (*types.Response, error) {
meta, err := parseMetadata(req.Metadata)
if err != nil {
return nil, err
}
err = c.client.Publish(meta.subject, req.Data)
if err != nil {
return nil, err
}
return types.NewResponse().SetMetadataKeyValue("result", "ok"), nil
}


func (c *Client) Stop() error {
if c.client != nil {
c.client.Close()
}
return nil
}

func setOptions(sslcertificatefile string, sslcertificatekey string, username string, password string, token string, useTls bool, timeout int) nats.Option {
return func(o *nats.Options) error {
if useTls {
if sslcertificatefile != "" && sslcertificatekey != "" {
cert, err := tls.X509KeyPair([]byte(sslcertificatefile), []byte(sslcertificatekey))
if err != nil {
return fmt.Errorf("nats: error parsing client certificate: %v", err)
}
cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
if err != nil {
return fmt.Errorf("nats: error parsing client certificate: %v", err)
}
if o.TLSConfig == nil {
o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
}
o.TLSConfig.Certificates = []tls.Certificate{cert}
o.Secure = true
} else {
return errors.New("when using tls make sure to pass file and key")
}
}
if username != "" {
o.User = username
}
if password != "" {
o.Password = password
}
if token != "" {
o.Token = token
}
if timeout != 0 {
o.Timeout = time.Duration(timeout) * time.Second
}

return nil
}
}
Loading

0 comments on commit 903869a

Please sign in to comment.