Skip to content

Commit

Permalink
Finished aws target - redshift - made separation between client and s…
Browse files Browse the repository at this point in the history
…ervice
  • Loading branch information
eitam-ring committed Aug 27, 2020
1 parent 69b23aa commit fd35019
Show file tree
Hide file tree
Showing 15 changed files with 121 additions and 17 deletions.
File renamed without changes.
File renamed without changes.
23 changes: 23 additions & 0 deletions examples/aws/redshift/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
bindings:
- name: kubemq-query-aws-redshift-service
source:
kind: source.query
name: kubemq-query
properties:
host: "localhost"
port: "50000"
client_id: "kubemq-query-aws-redshift-connector"
auth_token: ""
channel: "query.aws.redshift.service"
group: ""
concurrency: "1"
auto_reconnect: "true"
reconnect_interval_seconds: "1"
max_reconnects: "0"
target:
kind: target.aws.redshift.service
name: target-aws-redshift-service
properties:
aws_key: "id"
aws_secret_key: 'json'
region: "region"
68 changes: 68 additions & 0 deletions examples/aws/redshift/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"context"
"encoding/json"
"fmt"
"github.com/kubemq-hub/kubemq-targets/types"
"github.com/kubemq-io/kubemq-go"
"github.com/nats-io/nuid"
"io/ioutil"
"log"
"time"
)

func main() {
dat, err := ioutil.ReadFile("./credentials/aws/rds/redshift/resourceARN.txt")
if err != nil {
log.Fatal(err)
}
resourceARN := fmt.Sprintf("%s", dat)
client, err := kubemq.NewClient(context.Background(),
kubemq.WithAddress("localhost", 50000),
kubemq.WithClientId(nuid.Next()),
kubemq.WithTransportType(kubemq.TransportTypeGRPC))
if err != nil {
log.Fatal(err)
}

// Create tag
tags := make(map[string]string)
tags["test2-key"] = "test2-value"
b, err := json.Marshal(tags)
if err != nil {
log.Fatal(err)
}
createRequest := types.NewRequest().
SetMetadataKeyValue("method", "create_tags").
SetMetadataKeyValue("resource_arn", resourceARN).
SetData(b)

getCreate, err := client.SetQuery(createRequest.ToQuery()).
SetChannel("query.aws.redshift.service").
SetTimeout(10 * time.Second).Send(context.Background())
if err != nil {
log.Fatal(err)
}
createResponse, err := types.ParseResponse(getCreate.Body)
if err != nil {
log.Fatal(err)
}
log.Println(fmt.Sprintf("create tag executed, error: %v", createResponse.IsError))

// listRequest
listRequest := types.NewRequest().
SetMetadataKeyValue("method", "list_tags")
queryListResponse, err := client.SetQuery(listRequest.ToQuery()).
SetChannel("query.aws.redshift.service").
SetTimeout(10 * time.Second).Send(context.Background())
if err != nil {
log.Fatal(err)
}
listResponse, err := types.ParseResponse(queryListResponse.Body)
if err != nil {
log.Fatal(err)
}
log.Println(fmt.Sprintf("list tags executed, response: %s", listResponse.Data))

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Kubemq redshift Target Connector
# Kubemq redshift Target Connector (RDS Connector)

Kubemq redshift target connector allows services using kubemq server to access redshift database services.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client
package redshift

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client
package redshift

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client
package redshift

import (
"database/sql"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package client
package redshift

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Kubemq redshift target Connector

# Kubemq redshift target Connector (Service admin)
Kubemq aws-redshift target connector allows services using kubemq server to access aws redshift service.

## Prerequisites
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package service
package redshift

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package service
package redshift

import (
"context"
Expand All @@ -25,29 +25,29 @@ type testStructure struct {

func getTestStructure() (*testStructure, error) {
t := &testStructure{}
dat, err := ioutil.ReadFile("./../../../../../credentials/aws/awsKey.txt")
dat, err := ioutil.ReadFile("./../../../credentials/aws/awsKey.txt")
if err != nil {
return nil, err
}
t.awsKey = string(dat)
dat, err = ioutil.ReadFile("./../../../../../credentials/aws/awsSecretKey.txt")
dat, err = ioutil.ReadFile("./../../../credentials/aws/awsSecretKey.txt")
if err != nil {
return nil, err
}
t.awsSecretKey = string(dat)
dat, err = ioutil.ReadFile("./../../../../../credentials/aws/region.txt")
dat, err = ioutil.ReadFile("./../../../credentials/aws/region.txt")
if err != nil {
return nil, err
}
t.region = fmt.Sprintf("%s", dat)
t.token = ""

dat, err = ioutil.ReadFile("./../../../../../credentials/aws/rds/redshift/resourceName.txt")
dat, err = ioutil.ReadFile("./../../../credentials/aws/rds/redshift/resourceName.txt")
if err != nil {
return nil, err
}
t.resourceName = fmt.Sprintf("%s", dat)
dat, err = ioutil.ReadFile("./../../../../../credentials/aws/rds/redshift/resourceARN.txt")
dat, err = ioutil.ReadFile("./../../../credentials/aws/rds/redshift/resourceARN.txt")
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package service
package redshift

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package service
package redshift

import (
"fmt"
Expand Down
16 changes: 15 additions & 1 deletion targets/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/kubemq-hub/kubemq-targets/targets/aws/sns"
"github.com/kubemq-hub/kubemq-targets/targets/gcp/firebase"
"github.com/kubemq-hub/kubemq-targets/targets/stores/elastic"

"github.com/kubemq-hub/kubemq-targets/config"
awsmariadb "github.com/kubemq-hub/kubemq-targets/targets/aws/rds/mariadb"
awsmssql "github.com/kubemq-hub/kubemq-targets/targets/aws/rds/mssql"
awsmysql "github.com/kubemq-hub/kubemq-targets/targets/aws/rds/mysql"
awspostgres "github.com/kubemq-hub/kubemq-targets/targets/aws/rds/postgres"
"github.com/kubemq-hub/kubemq-targets/targets/aws/rds/redshift"
redshiftsvc "github.com/kubemq-hub/kubemq-targets/targets/aws/redshift"
"github.com/kubemq-hub/kubemq-targets/targets/aws/sqs"
"github.com/kubemq-hub/kubemq-targets/targets/cache/memcached"
"github.com/kubemq-hub/kubemq-targets/targets/cache/redis"
Expand Down Expand Up @@ -150,6 +152,18 @@ func Init(ctx context.Context, cfg config.Spec) (Target, error) {
return nil, err
}
return target, nil
case "target.aws.rds.redshift":
target := redshift.New()
if err := target.Init(ctx, cfg); err != nil {
return nil, err
}
return target, nil
case "target.aws.redshift.service":
target := redshiftsvc.New()
if err := target.Init(ctx, cfg); err != nil {
return nil, err
}
return target, nil
case "target.cache.redis":
target := redis.New()
if err := target.Init(ctx, cfg); err != nil {
Expand Down

0 comments on commit fd35019

Please sign in to comment.