From 36916ecde012102a0b2f42ae6f3f84e441f58379 Mon Sep 17 00:00:00 2001 From: eitam-ring Date: Sun, 20 Dec 2020 17:02:47 +0200 Subject: [PATCH 1/2] Wip target - hdfs --- examples/storage/hdfs/config.yaml | 24 + examples/storage/hdfs/examplefile.txt | 1 + examples/storage/hdfs/main.go | 54 ++ go.mod | 1 + targets/storage/hdfs/client.go | 161 ++++++ targets/storage/hdfs/client_test.go | 698 ++++++++++++++++++++++++++ targets/storage/hdfs/connector.go | 101 ++++ targets/storage/hdfs/metadata.go | 45 ++ targets/storage/hdfs/options.go | 26 + targets/storage/hdfs/stat.go | 38 ++ types/metadata.go | 13 + 11 files changed, 1162 insertions(+) create mode 100644 examples/storage/hdfs/config.yaml create mode 100644 examples/storage/hdfs/examplefile.txt create mode 100644 examples/storage/hdfs/main.go create mode 100644 targets/storage/hdfs/client.go create mode 100644 targets/storage/hdfs/client_test.go create mode 100644 targets/storage/hdfs/connector.go create mode 100644 targets/storage/hdfs/metadata.go create mode 100644 targets/storage/hdfs/options.go create mode 100644 targets/storage/hdfs/stat.go diff --git a/examples/storage/hdfs/config.yaml b/examples/storage/hdfs/config.yaml new file mode 100644 index 00000000..9c7f5e19 --- /dev/null +++ b/examples/storage/hdfs/config.yaml @@ -0,0 +1,24 @@ +bindings: + - name: kubemq-query-aws-s3 + source: + kind: kubemq.query + name: kubemq-query + properties: + address: "kubemq-cluster:50000" + client_id: "kubemq-query-aws-s3-connector" + auth_token: "" + channel: "query.aws.s3" + group: "" + auto_reconnect: "true" + reconnect_interval_seconds: "1" + max_reconnects: "0" + target: + kind: aws.s3 + name: aws-s3 + properties: + aws_key: "id" + aws_secret_key: 'json' + region: "region" + token: "" + downloader: "true" + uploader: "true" diff --git a/examples/storage/hdfs/examplefile.txt b/examples/storage/hdfs/examplefile.txt new file mode 100644 index 00000000..a3c28cfa --- /dev/null +++ b/examples/storage/hdfs/examplefile.txt @@ -0,0 +1 @@ +My example file to upload \ No newline at end of file diff --git a/examples/storage/hdfs/main.go b/examples/storage/hdfs/main.go new file mode 100644 index 00000000..6fb1c83b --- /dev/null +++ b/examples/storage/hdfs/main.go @@ -0,0 +1,54 @@ +package main + +import ( + "context" + "fmt" + "github.com/kubemq-hub/kubemq-targets/types" + "github.com/kubemq-io/kubemq-go" + "github.com/nats-io/nuid" + "log" + "time" +) + +func main() { + client, err := kubemq.NewClient(context.Background(), + kubemq.WithAddress("kubemq-cluster", 50000), + kubemq.WithClientId(nuid.Next()), + kubemq.WithTransportType(kubemq.TransportTypeGRPC)) + if err != nil { + log.Fatal(err) + } + // listRequest + listRequest := types.NewRequest(). + SetMetadataKeyValue("method", "list_buckets") + queryListResponse, err := client.SetQuery(listRequest.ToQuery()). + SetChannel("query.aws.s3"). + SetTimeout(10 * time.Second).Send(context.Background()) + if err != nil { + log.Fatal(err) + } + listResponse, err := types.ParseResponse(queryListResponse.Body) + if err != nil { + log.Fatal(err) + } + log.Println(fmt.Sprintf("list buckets executed, response: %s", listResponse.Data)) + + // Create Bucket + BucketName := "testmykubemqbucketname" + createRequest := types.NewRequest(). + SetMetadataKeyValue("method", "create_bucket"). + SetMetadataKeyValue("bucket_name", BucketName). + SetMetadataKeyValue("wait_for_completion", "true") + + getCreate, err := client.SetQuery(createRequest.ToQuery()). + SetChannel("query.aws.s3"). + SetTimeout(10 * time.Second).Send(context.Background()) + if err != nil { + log.Fatal(err) + } + createResponse, err := types.ParseResponse(getCreate.Body) + if err != nil { + log.Fatal(err) + } + log.Println(fmt.Sprintf("create bucket executed, error: %v", createResponse.IsError)) +} diff --git a/go.mod b/go.mod index 9a3f04be..3cbd4cf1 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/aws/aws-sdk-go v1.34.31 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cockroachdb/cockroach-go v2.0.1+incompatible + github.com/colinmarc/hdfs/v2 v2.1.1 github.com/couchbase/gocb/v2 v2.1.6 github.com/denisenkom/go-mssqldb v0.0.0-20200910202707-1e08a3fab204 github.com/eclipse/paho.mqtt.golang v1.2.0 diff --git a/targets/storage/hdfs/client.go b/targets/storage/hdfs/client.go new file mode 100644 index 00000000..be6d6c92 --- /dev/null +++ b/targets/storage/hdfs/client.go @@ -0,0 +1,161 @@ +package s3 + +import ( + "context" + "errors" + hdfs "github.com/colinmarc/hdfs/v2" + "github.com/kubemq-hub/builder/connector/common" + "github.com/kubemq-hub/kubemq-targets/config" + "github.com/kubemq-hub/kubemq-targets/types" + "io/ioutil" +) + +type Client struct { + name string + opts options + client *hdfs.Client +} + +func New() *Client { + return &Client{} + +} +func (c *Client) Connector() *common.Connector { + return Connector() +} +func (c *Client) Init(ctx context.Context, cfg config.Spec) error { + c.name = cfg.Name + var err error + c.opts, err = parseOptions(cfg) + if err != nil { + return err + } + co := setClientOption(c.opts) + c.client, err = hdfs.NewClient(co) + if err != nil { + return err + } + return nil +} + +func (c *Client) Do(ctx context.Context, req *types.Request) (*types.Response, error) { + meta, err := parseMetadata(req.Metadata) + if err != nil { + return nil, err + } + switch meta.method { + case "read_file": + return c.readFile(meta) + case "write_file": + return c.writeFile(meta, req.Data) + case "remove_file": + return c.removeFile(meta) + case "rename_file": + return c.renameFile(meta) + case "mkdir": + return c.makeDir(meta) + case "stat": + return c.stat(meta) + default: + return nil, errors.New("invalid method type") + } +} + +func (c *Client) writeFile(meta metadata, data []byte) (*types.Response, error) { + + err := c.client.CreateEmptyFile(meta.filePath) + if err != nil { + return nil, err + } + return types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + nil +} + +func (c *Client) makeDir(meta metadata) (*types.Response, error) { + err := c.client.Mkdir(meta.filePath, meta.fileMode) + if err != nil { + return nil, err + } + if err != nil { + return nil, err + } + return types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + nil +} + +func (c *Client) removeFile(meta metadata) (*types.Response, error) { + err := c.client.Remove(meta.filePath) + if err != nil { + return nil, err + } + + return types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + nil +} + +func (c *Client) renameFile(meta metadata) (*types.Response, error) { + err := c.client.Rename(meta.oldFilePath, meta.filePath) + if err != nil { + return nil, err + } + return types.NewResponse(). + SetMetadataKeyValue("result", "ok"), + nil +} + +func (c *Client) stat(meta metadata) (*types.Response, error) { + file, err := c.client.Stat(meta.filePath) + if err != nil { + return nil, err + } + b, err := createStatAsByteArray(file) + if err != nil { + return nil, err + } + return types.NewResponse(). + SetData(b). + SetMetadataKeyValue("result", "ok"), + nil +} + +func (c *Client) readFile(meta metadata) (*types.Response, error) { + file, err := c.client.Open(meta.filePath) + if err != nil { + return nil, err + } + bytes, err := ioutil.ReadAll(file) + if err != nil { + return nil, err + } + return types.NewResponse(). + SetData(bytes). + SetMetadataKeyValue("result", "ok"), + nil +} + +func (c *Client) Stop() error { + if c.client != nil { + return c.client.Close() + } + return nil +} + +func setClientOption(opts options) hdfs.ClientOptions { + c := hdfs.ClientOptions{ + + } + if opts.address != "" { + c.Addresses = append(c.Addresses, opts.address) + } + if opts.user != "" { + c.User = opts.user + } + return c +} + +func createFileStat() { + +} diff --git a/targets/storage/hdfs/client_test.go b/targets/storage/hdfs/client_test.go new file mode 100644 index 00000000..495cdcf1 --- /dev/null +++ b/targets/storage/hdfs/client_test.go @@ -0,0 +1,698 @@ +package s3 + +import ( + "context" + "github.com/kubemq-hub/kubemq-targets/config" + "github.com/kubemq-hub/kubemq-targets/types" + "github.com/stretchr/testify/require" + "io/ioutil" + + "testing" + "time" +) + +type testStructure struct { + address string + file []byte + user string +} + +func getTestStructure() (*testStructure, error) { + t := &testStructure{} + t.address = "localhost:9000" + contents, err := ioutil.ReadFile("./../../../examples/storage/hdfs/exampleFile.txt") + if err != nil { + return nil, err + } + t.file = contents + t.user = "test_user" + return t, nil +} + +func TestClient_Init(t *testing.T) { + dat, err := getTestStructure() + require.NoError(t, err) + tests := []struct { + name string + cfg config.Spec + wantErr bool + }{ + { + name: "init ", + cfg: config.Spec{ + Name: "storage-hdfs", + Kind: "storage.hdfs", + Properties: map[string]string{ + "address": dat.address, + "user": dat.address, + }, + }, + wantErr: false, + }, { + name: "invalid init - incorrect port", + cfg: config.Spec{ + Name: "storage-hdfs", + Kind: "storage.hdfs", + Properties: map[string]string{ + "address": "localhost:123123", + "user": dat.address, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + c := New() + + err := c.Init(ctx, tt.cfg) + if tt.wantErr { + require.Error(t, err) + t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) + return + } + require.NoError(t, err) + + }) + } +} + +func TestClient_Mkdir(t *testing.T) { + dat, err := getTestStructure() + require.NoError(t, err) + cfg := config.Spec{ + Name: "storage-hdfs", + Kind: "storage.hdfs", + Properties: map[string]string{ + "address": dat.address, + "user": dat.address, + }, + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + c := New() + + err = c.Init(ctx, cfg) + require.NoError(t, err) + tests := []struct { + name string + request *types.Request + wantErr bool + }{ + { + name: "valid mkdir", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/hadoop/dfs/name"). + SetMetadataKeyValue("file_mode", "0755"). + SetMetadataKeyValue("method", "mkdir"), + wantErr: false, + }, { + name: "invalid mkdir - missing path", + request: types.NewRequest(). + SetMetadataKeyValue("file_mode", "0755"). + SetMetadataKeyValue("method", "mkdir"), + wantErr: true, + }, { + name: "invalid mkdir invalid file_mode", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test"). + SetMetadataKeyValue("file_mode", "99999"). + SetMetadataKeyValue("method", "mkdir"), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := c.Do(ctx, tt.request) + if tt.wantErr { + require.Error(t, err) + t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) + return + } + require.NoError(t, err) + require.NotNil(t, got) + }) + } +} + +func TestClient_Upload(t *testing.T) { + dat, err := getTestStructure() + require.NoError(t, err) + cfg := config.Spec{ + Name: "storage-hdfs", + Kind: "storage.hdfs", + Properties: map[string]string{ + "address": dat.address, + "user": dat.address, + }, + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + c := New() + + err = c.Init(ctx, cfg) + require.NoError(t, err) + tests := []struct { + name string + request *types.Request + wantErr bool + }{ + { + name: "valid upload", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/foo.txt"). + SetData(dat.file). + SetMetadataKeyValue("method", "write_file"), + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := c.Do(ctx, tt.request) + if tt.wantErr { + require.Error(t, err) + t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) + return + } + require.NoError(t, err) + require.NotNil(t, got) + }) + } +} + +// +//func TestClient_List_Bucket_Items(t *testing.T) { +// dat, err := getTestStructure() +// require.NoError(t, err) +// cfg := config.Spec{ +// Name: "storage-hdfs", +// Kind: "storage.hdfs", +// Properties: map[string]string{ +// "aws_key": dat.awsKey, +// "aws_secret_key": dat.awsSecretKey, +// "region": dat.region, +// "downloader": "false", +// "uploader": "false", +// }, +// } +// ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) +// defer cancel() +// c := New() +// +// err = c.Init(ctx, cfg) +// require.NoError(t, err) +// tests := []struct { +// name string +// request *types.Request +// wantErr bool +// }{ +// { +// name: "valid list", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "list_bucket_items"). +// SetMetadataKeyValue("bucket_name", dat.bucketName), +// wantErr: false, +// }, +// { +// name: "invalid list - missing bucket_name", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "list_bucket_items"), +// wantErr: true, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// got, err := c.Do(ctx, tt.request) +// if tt.wantErr { +// require.Error(t, err) +// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) +// return +// } +// require.NoError(t, err) +// require.NotNil(t, got) +// }) +// } +//} +// +//func TestClient_Create_Bucket(t *testing.T) { +// dat, err := getTestStructure() +// require.NoError(t, err) +// cfg := config.Spec{ +// Name: "storage-hdfs", +// Kind: "storage.hdfs", +// Properties: map[string]string{ +// "aws_key": dat.awsKey, +// "aws_secret_key": dat.awsSecretKey, +// "region": dat.region, +// "downloader": "false", +// "uploader": "false", +// }, +// } +// ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) +// defer cancel() +// c := New() +// +// err = c.Init(ctx, cfg) +// require.NoError(t, err) +// tests := []struct { +// name string +// request *types.Request +// wantErr bool +// }{ +// { +// name: "valid create", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "create_bucket"). +// SetMetadataKeyValue("wait_for_completion", "true"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName), +// wantErr: false, +// }, +// { +// name: "invalid create - missing bucket_name", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "create_bucket"), +// wantErr: true, +// }, +// { +// name: "invalid create - Already exists", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "create_bucket"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName), +// wantErr: true, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// got, err := c.Do(ctx, tt.request) +// if tt.wantErr { +// require.Error(t, err) +// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) +// return +// } +// require.NoError(t, err) +// require.NotNil(t, got) +// }) +// } +//} +// +//func TestClient_Upload_Item(t *testing.T) { +// dat, err := getTestStructure() +// require.NoError(t, err) +// cfg := config.Spec{ +// Name: "storage-hdfs", +// Kind: "storage.hdfs", +// Properties: map[string]string{ +// "aws_key": dat.awsKey, +// "aws_secret_key": dat.awsSecretKey, +// "region": dat.region, +// "downloader": "false", +// "uploader": "true", +// }, +// } +// tests := []struct { +// name string +// request *types.Request +// wantErr bool +// setUploader bool +// }{ +// { +// name: "valid upload item", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "upload_item"). +// SetMetadataKeyValue("wait_for_completion", "true"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName). +// SetMetadataKeyValue("item_name", dat.itemName). +// SetData(dat.file), +// wantErr: false, +// setUploader: true, +// }, +// { +// name: "invalid upload - missing item", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "upload_item"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName). +// SetData(dat.file), +// wantErr: true, +// setUploader: true, +// }, +// { +// name: "invalid upload - missing uploader", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "upload_item"). +// SetMetadataKeyValue("item_name", dat.itemName). +// SetMetadataKeyValue("bucket_name", dat.testBucketName), +// wantErr: true, +// setUploader: false, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +// defer cancel() +// c := New() +// +// if !tt.setUploader { +// cfg.Properties["uploader"] = "false" +// } else { +// cfg.Properties["uploader"] = "true" +// } +// err = c.Init(ctx, cfg) +// require.NoError(t, err) +// got, err := c.Do(ctx, tt.request) +// if tt.wantErr { +// require.Error(t, err) +// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) +// return +// } +// require.NoError(t, err) +// require.NotNil(t, got) +// }) +// } +//} +// +//func TestClient_Get_Item(t *testing.T) { +// dat, err := getTestStructure() +// require.NoError(t, err) +// cfg := config.Spec{ +// Name: "storage-hdfs", +// Kind: "storage.hdfs", +// Properties: map[string]string{ +// "aws_key": dat.awsKey, +// "aws_secret_key": dat.awsSecretKey, +// "region": dat.region, +// "downloader": "true", +// "uploader": "false", +// }, +// } +// tests := []struct { +// name string +// request *types.Request +// wantErr bool +// setDownloader bool +// }{ +// { +// name: "valid get item", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "get_item"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName). +// SetMetadataKeyValue("item_name", dat.itemName), +// wantErr: false, +// setDownloader: true, +// }, +// { +// name: "invalid get - missing item", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "get_item"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName), +// wantErr: true, +// setDownloader: true, +// }, { +// name: "invalid get - item does not exists", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "get_item"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName). +// SetMetadataKeyValue("item_name", "fakeItemName"), +// wantErr: true, +// setDownloader: true, +// }, +// { +// name: "invalid get - missing bucketName", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "get_item"). +// SetMetadataKeyValue("item_name", dat.itemName), +// wantErr: true, +// setDownloader: true, +// }, { +// name: "invalid upload - missing downloader", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "get_item"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName). +// SetMetadataKeyValue("item_name", dat.itemName), +// wantErr: true, +// setDownloader: false, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +// defer cancel() +// c := New() +// +// if !tt.setDownloader { +// cfg.Properties["downloader"] = "false" +// } else { +// cfg.Properties["downloader"] = "true" +// } +// err = c.Init(ctx, cfg) +// require.NoError(t, err) +// got, err := c.Do(ctx, tt.request) +// if tt.wantErr { +// require.Error(t, err) +// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) +// return +// } +// require.NoError(t, err) +// require.NotNil(t, got) +// }) +// } +//} +// +//func TestClient_Delete_Item(t *testing.T) { +// dat, err := getTestStructure() +// require.NoError(t, err) +// cfg := config.Spec{ +// Name: "storage-hdfs", +// Kind: "storage.hdfs", +// Properties: map[string]string{ +// "aws_key": dat.awsKey, +// "aws_secret_key": dat.awsSecretKey, +// "region": dat.region, +// "downloader": "false", +// "uploader": "false", +// }, +// } +// tests := []struct { +// name string +// request *types.Request +// wantErr bool +// }{ +// { +// name: "valid delete item", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "delete_item_from_bucket"). +// SetMetadataKeyValue("wait_for_completion", "true"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName). +// SetMetadataKeyValue("item_name", dat.itemName), +// wantErr: false, +// }, +// { +// name: "invalid delete - missing item", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "delete_item_from_bucket"). +// SetMetadataKeyValue("wait_for_completion", "true"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName), +// wantErr: true, +// }, +// { +// name: "invalid delete - missing bucket_name", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "delete_item_from_bucket"). +// SetMetadataKeyValue("wait_for_completion", "true"). +// SetMetadataKeyValue("item_name", dat.itemName), +// wantErr: true, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +// defer cancel() +// c := New() +// +// err = c.Init(ctx, cfg) +// require.NoError(t, err) +// got, err := c.Do(ctx, tt.request) +// if tt.wantErr { +// require.Error(t, err) +// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) +// return +// } +// require.NoError(t, err) +// require.NotNil(t, got) +// }) +// } +//} +// +//func TestClient_Copy_Items(t *testing.T) { +// dat, err := getTestStructure() +// require.NoError(t, err) +// cfg := config.Spec{ +// Name: "storage-hdfs", +// Kind: "storage.hdfs", +// Properties: map[string]string{ +// "aws_key": dat.awsKey, +// "aws_secret_key": dat.awsSecretKey, +// "region": dat.region, +// "downloader": "false", +// "uploader": "false", +// }, +// } +// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +// defer cancel() +// c := New() +// +// err = c.Init(ctx, cfg) +// require.NoError(t, err) +// tests := []struct { +// name string +// request *types.Request +// wantErr bool +// }{ +// { +// name: "valid copy items", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "copy_item"). +// SetMetadataKeyValue("wait_for_completion", "true"). +// SetMetadataKeyValue("bucket_name", dat.bucketName). +// SetMetadataKeyValue("item_name", dat.itemName). +// SetMetadataKeyValue("copy_source", dat.dstBucketName), +// wantErr: false, +// }, +// { +// name: "invalid copy items - missing copy_source ", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "copy_item"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName). +// SetMetadataKeyValue("item_name", dat.itemName), +// wantErr: true, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// got, err := c.Do(ctx, tt.request) +// if tt.wantErr { +// require.Error(t, err) +// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) +// return +// } +// require.NoError(t, err) +// require.NotNil(t, got) +// }) +// } +//} +// +//func TestClient_Delete_All_Items(t *testing.T) { +// dat, err := getTestStructure() +// require.NoError(t, err) +// cfg := config.Spec{ +// Name: "storage-hdfs", +// Kind: "storage.hdfs", +// Properties: map[string]string{ +// "aws_key": dat.awsKey, +// "aws_secret_key": dat.awsSecretKey, +// "region": dat.region, +// "downloader": "false", +// "uploader": "false", +// }, +// } +// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +// defer cancel() +// c := New() +// +// err = c.Init(ctx, cfg) +// require.NoError(t, err) +// tests := []struct { +// name string +// request *types.Request +// wantErr bool +// }{ +// { +// name: "valid delete all items", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "delete_all_items_from_bucket"). +// SetMetadataKeyValue("wait_for_completion", "true"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName), +// wantErr: false, +// }, +// { +// name: "invalid valid delete all items - missing bucket", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "delete_all_items_from_bucket"), +// wantErr: true, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// got, err := c.Do(ctx, tt.request) +// if tt.wantErr { +// require.Error(t, err) +// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) +// return +// } +// require.NoError(t, err) +// require.NotNil(t, got) +// }) +// } +//} +// +//func TestClient_Delete_Bucket(t *testing.T) { +// dat, err := getTestStructure() +// require.NoError(t, err) +// cfg := config.Spec{ +// Name: "storage-hdfs", +// Kind: "storage.hdfs", +// Properties: map[string]string{ +// "aws_key": dat.awsKey, +// "aws_secret_key": dat.awsSecretKey, +// "region": dat.region, +// "downloader": "false", +// "uploader": "false", +// }, +// } +// ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) +// defer cancel() +// c := New() +// +// err = c.Init(ctx, cfg) +// require.NoError(t, err) +// tests := []struct { +// name string +// request *types.Request +// wantErr bool +// }{ +// { +// name: "valid delete", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "delete_bucket"). +// SetMetadataKeyValue("wait_for_completion", "true"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName), +// wantErr: false, +// }, +// { +// name: "invalid delete - missing bucket_name", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "create_bucket"), +// wantErr: true, +// }, +// { +// name: "invalid delete - bucket does not exists", +// request: types.NewRequest(). +// SetMetadataKeyValue("method", "delete_bucket"). +// SetMetadataKeyValue("bucket_name", dat.testBucketName), +// wantErr: true, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// got, err := c.Do(ctx, tt.request) +// if tt.wantErr { +// require.Error(t, err) +// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) +// return +// } +// require.NoError(t, err) +// require.NotNil(t, got) +// }) +// } +//} diff --git a/targets/storage/hdfs/connector.go b/targets/storage/hdfs/connector.go new file mode 100644 index 00000000..f59da92f --- /dev/null +++ b/targets/storage/hdfs/connector.go @@ -0,0 +1,101 @@ +package s3 + +import ( + "github.com/kubemq-hub/builder/connector/common" +) + +func Connector() *common.Connector { + return common.NewConnector(). + SetKind("aws.s3"). + SetDescription("AWS S3 Target"). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("aws_key"). + SetDescription("Set S3 aws key"). + SetMust(true). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("aws_secret_key"). + SetDescription("Set S3 aws secret key"). + SetMust(true). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("region"). + SetDescription("Set S3 aws region"). + SetMust(true). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("string"). + SetName("token"). + SetDescription("Set S3 token"). + SetMust(false). + SetDefault(""), + ). + AddProperty( + common.NewProperty(). + SetKind("bool"). + SetName("downloader"). + SetDescription("Create S3 downloader instance"). + SetMust(false). + SetDefault("false"), + ). + AddProperty( + common.NewProperty(). + SetKind("bool"). + SetName("uploader"). + SetDescription("Create S3 uploader instance"). + SetMust(false). + SetDefault("false"), + ). + AddMetadata( + common.NewMetadata(). + SetName("method"). + SetKind("string"). + SetDescription("Set S3 execution method"). + SetOptions([]string{"list_buckets", "list_bucket_items", "create_bucket", "delete_bucket", "delete_item_from_bucket", "delete_all_items_from_bucket", "upload_item", "copy_item", "get_item"}). + SetDefault("upload_item"). + SetMust(true), + ). + AddMetadata( + common.NewMetadata(). + SetName("bucket_name"). + SetKind("string"). + SetDescription("Set S3 bucket name"). + SetDefault(""). + SetMust(false), + ). + AddMetadata( + common.NewMetadata(). + SetName("copy_source"). + SetKind("string"). + SetDescription("Set S3 copy source"). + SetDefault(""). + SetMust(false), + ). + AddMetadata( + common.NewMetadata(). + SetKind("bool"). + SetName("wait_for_completion"). + SetDescription("Set S3 wait for completion until retuning response"). + SetMust(false). + SetDefault("false"), + ). + AddMetadata( + common.NewMetadata(). + SetKind("string"). + SetName("item_name"). + SetDescription("Set S3 item name"). + SetMust(false). + SetDefault(""), + ) + +} diff --git a/targets/storage/hdfs/metadata.go b/targets/storage/hdfs/metadata.go new file mode 100644 index 00000000..d8425e14 --- /dev/null +++ b/targets/storage/hdfs/metadata.go @@ -0,0 +1,45 @@ +package s3 + +import ( + "fmt" + "github.com/kubemq-hub/kubemq-targets/types" + "os" +) + +type metadata struct { + method string + + filePath string + oldFilePath string + fileMode os.FileMode +} + +var methodsMap = map[string]string{ + "write_file": "write_file", + "remove_file": "remove_file", + "read_file": "read_file", + "rename_file": "rename_file", + "mkdir": "mkdir", + "stat": "stat", +} + +func parseMetadata(meta types.Metadata) (metadata, error) { + m := metadata{} + var err error + m.method, err = meta.ParseStringMap("method", methodsMap) + if err != nil { + return metadata{}, meta.GetValidMethodTypes(methodsMap) + } + m.filePath, err = meta.MustParseString("file_path") + if err != nil { + return metadata{}, fmt.Errorf("error parsing file_path, %w", err) + } + if m.method == "rename_file" { + m.oldFilePath, err = meta.MustParseString("old_file_path") + if err != nil { + return metadata{}, fmt.Errorf("error parsing old_file_path, %w", err) + } + } + m.fileMode = meta.ParseOSFileMode("file_mode", os.FileMode(0755)) + return m, nil +} diff --git a/targets/storage/hdfs/options.go b/targets/storage/hdfs/options.go new file mode 100644 index 00000000..ba048cba --- /dev/null +++ b/targets/storage/hdfs/options.go @@ -0,0 +1,26 @@ +package s3 + +import ( + "fmt" + "github.com/kubemq-hub/kubemq-targets/config" +) + + +type options struct { + address string + user string +} + +func parseOptions(cfg config.Spec) (options, error) { + o := options{} + var err error + o.address, err = cfg.Properties.MustParseString("address") + if err != nil { + return options{}, fmt.Errorf("error parsing address , %w", err) + } + o.user ,err = cfg.Properties.MustParseString("user") + if err != nil { + return options{}, fmt.Errorf("error parsing user , %w", err) + } + return o, nil +} diff --git a/targets/storage/hdfs/stat.go b/targets/storage/hdfs/stat.go new file mode 100644 index 00000000..9bb837ac --- /dev/null +++ b/targets/storage/hdfs/stat.go @@ -0,0 +1,38 @@ +package s3 + +import ( + "encoding/json" + "os" + "time" +) + +type Stat struct { + Name string `json:"name"` + Size int64 `json:"name"` + ModTime time.Time `json:"name"` + IsDir bool `json:"name"` +} + +func createStat(o os.FileInfo) Stat { + return Stat{ + Name: o.Name(), + Size: o.Size(), + ModTime: o.ModTime(), + IsDir: o.IsDir(), + } +} + +func createStatAsByteArray(o os.FileInfo) ([]byte, error) { + s := Stat{ + Name: o.Name(), + Size: o.Size(), + ModTime: o.ModTime(), + IsDir: o.IsDir(), + } + b, err := json.Marshal(s) + if err != nil { + return nil, err + } + return b, err + +} diff --git a/types/metadata.go b/types/metadata.go index e7bac512..c7bfac81 100644 --- a/types/metadata.go +++ b/types/metadata.go @@ -111,6 +111,19 @@ func (m Metadata) ParseInt(key string, defaultValue int) int { return defaultValue } } + +func (m Metadata) ParseOSFileMode(key string, defaultValue os.FileMode) os.FileMode { + if val, ok := m[key]; ok && val != "" { + parsedVal, err := strconv.ParseUint(val, 10, 32) + if err != nil { + return defaultValue + } else { + return os.FileMode(parsedVal) + } + } else { + return defaultValue + } +} func (m Metadata) ParseIntWithRange(key string, defaultValue, min, max int) (int, error) { val := m.ParseInt(key, defaultValue) if val < min { From c99fd94e42f06f292e9621742f634ad4b979e98e Mon Sep 17 00:00:00 2001 From: eitam-ring Date: Mon, 21 Dec 2020 16:39:21 +0200 Subject: [PATCH 2/2] Added target hadoop --- README.md | 1 + config.yaml | 9 +- examples/storage/hdfs/main.go | 31 +- targets/storage/hdfs/README.md | 201 +++++++++++ targets/storage/hdfs/client.go | 15 +- targets/storage/hdfs/client_test.go | 435 +++++++++++++----------- targets/storage/hdfs/connector.go | 74 +--- targets/storage/hdfs/docker-compose.yml | 75 ++++ targets/storage/hdfs/metadata.go | 2 +- targets/storage/hdfs/options.go | 2 +- targets/storage/hdfs/stat.go | 17 +- targets/target.go | 8 + 12 files changed, 565 insertions(+), 305 deletions(-) create mode 100644 targets/storage/hdfs/README.md create mode 100644 targets/storage/hdfs/docker-compose.yml diff --git a/README.md b/README.md index 839c9e97..e1731cc5 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,7 @@ A list of supported targets is below. | | [IBM-MQ](https://developer.ibm.com/components/ibm-mq) |messaging.ibmmq | [Usage](targets/messaging/ibmmq) | [Example](examples/messaging/ibmmq) | | Storage | | | | | | | [Minio/S3](https://min.io/) |storage.minio | [Usage](targets/storage/minio) | [Example](examples/storage/minio) | +| | [hadoop/hdfs](https://hadoop.apache.org/) |storage.hdfs | [Usage](targets/storage/hdfs) | [Example](examples/storage/hdfs) | | Serverless | | | | | | | [OpenFaas](https://www.openfaas.com/) |serverless.openfaas | [Usage](targets/serverless/openfass) | [Example](examples/serverless/openfaas) | | Http | | | | | diff --git a/config.yaml b/config.yaml index d1a5b52e..a3083e77 100644 --- a/config.yaml +++ b/config.yaml @@ -1,12 +1,13 @@ bindings: -- name: consulkv +- name: hdfs source: kind: kubemq.query properties: address: localhost:50000 - channel: query.consulkv + channel: query.hdfs target: - kind: stores.consulkv + kind: storage.hdfs properties: - address: localhost:8500 + address: localhost:9000 + user: test_user properties: {} diff --git a/examples/storage/hdfs/main.go b/examples/storage/hdfs/main.go index 6fb1c83b..90c1ca2f 100644 --- a/examples/storage/hdfs/main.go +++ b/examples/storage/hdfs/main.go @@ -19,36 +19,19 @@ func main() { log.Fatal(err) } // listRequest - listRequest := types.NewRequest(). - SetMetadataKeyValue("method", "list_buckets") - queryListResponse, err := client.SetQuery(listRequest.ToQuery()). - SetChannel("query.aws.s3"). + statRequest := types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/foo.txt"). + SetMetadataKeyValue("method", "stat") + queryStatResponse, err := client.SetQuery(statRequest.ToQuery()). + SetChannel("query.hdfs"). SetTimeout(10 * time.Second).Send(context.Background()) if err != nil { log.Fatal(err) } - listResponse, err := types.ParseResponse(queryListResponse.Body) + statResponse, err := types.ParseResponse(queryStatResponse.Body) if err != nil { log.Fatal(err) } - log.Println(fmt.Sprintf("list buckets executed, response: %s", listResponse.Data)) + log.Println(fmt.Sprintf("stat executed, response: %s", statResponse.Data)) - // Create Bucket - BucketName := "testmykubemqbucketname" - createRequest := types.NewRequest(). - SetMetadataKeyValue("method", "create_bucket"). - SetMetadataKeyValue("bucket_name", BucketName). - SetMetadataKeyValue("wait_for_completion", "true") - - getCreate, err := client.SetQuery(createRequest.ToQuery()). - SetChannel("query.aws.s3"). - SetTimeout(10 * time.Second).Send(context.Background()) - if err != nil { - log.Fatal(err) - } - createResponse, err := types.ParseResponse(getCreate.Body) - if err != nil { - log.Fatal(err) - } - log.Println(fmt.Sprintf("create bucket executed, error: %v", createResponse.IsError)) } diff --git a/targets/storage/hdfs/README.md b/targets/storage/hdfs/README.md new file mode 100644 index 00000000..2f2511db --- /dev/null +++ b/targets/storage/hdfs/README.md @@ -0,0 +1,201 @@ +# Kubemq hadoop target Connector + +Kubemq -hadoop target connector allows services using kubemq server to access hadoop service. + +## Prerequisites +The following required to run the -hadoop target connector: + +- kubemq cluster +- hadoop active server +- kubemq-source deployment + +## Configuration + +hadoop target connector configuration properties: + +| Properties Key | Required | Description | Example | +|:---------------|:---------|:-------------------------------------------|:----------------------------| +| address | yes | hadoop address | "localhost:9000" | +| user | no | hadoop user | "my_user" | + + +Example: + +```yaml +bindings: + - name: kubemq-query--hadoop + source: + kind: kubemq.query + name: kubemq-query + properties: + address: "kubemq-cluster:50000" + client_id: "kubemq-query--hadoop-connector" + auth_token: "" + channel: "query..hadoop" + group: "" + auto_reconnect: "true" + reconnect_interval_seconds: "1" + max_reconnects: "0" + target: + kind: .hadoop + name: -hadoop + properties: + _key: "id" + _secret_key: 'json' + region: "region" + token: "" + downloader: "true" + uploader: "true" +``` + +## Usage + +### Read File + +Read File: + +| Metadata Key | Required | Description | Possible values | +|:------------------|:---------|:----------------------------------------|:-------------------------------------------| +| file_path | yes | path to file | "/test/foo2.txt" | +| method | yes | type of method | "read_file" | + + + + +Example: + +```json +{ + "metadata": { + "method": "read_file", + "file_path": "/test/foo2.txt" + }, + "data": null +} +``` + + +### Write File + +Write File: + +| Metadata Key | Required | Description | Possible values | +|:------------------|:---------|:----------------------------------------|:-------------------------------------------| +| file_path | yes | path to file | "/test/foo2.txt" | +| method | yes | type of method | "write_file" | +| file_mode | no | os permission mode default(0777) | "0777" | +| data | yes | file as byte array | "TXkgZXhhbXBsZSBmaWxlIHRvIHVwbG9hZA==" | + + + + +Example: + +```json +{ + "metadata": { + "method": "write_file", + "file_path": "/test/foo2.txt" + }, + "data": "TXkgZXhhbXBsZSBmaWxlIHRvIHVwbG9hZA==" +} +``` + +### Remove File + +Remove File: + +| Metadata Key | Required | Description | Possible values | +|:------------------|:---------|:----------------------------------------|:-------------------------------------------| +| file_path | yes | path to file | "/test/foo2.txt" | +| method | yes | type of method | "remove_file" | + + + + +Example: + +```json +{ + "metadata": { + "method": "remove_file", + "file_path": "/test/foo2.txt" + }, + "data": null +} +``` + +### Rename File + +Rename File: + +| Metadata Key | Required | Description | Possible values | +|:------------------|:---------|:----------------------------------------|:-------------------------------------------| +| file_path | yes | new path to file | "/test/foo3.txt" | +| old_file_path | yes | new path to file | "/test/foo2.txt" | +| method | yes | type of method | "rename_file" | + + + + +Example: + +```json +{ + "metadata": { + "method": "rename_file", + "file_path": "/test/foo3.txt", + "old_file_path": "/test/foo2.txt" + }, + "data": null +} +``` + +### Make Dir + +Make Dir : + +| Metadata Key | Required | Description | Possible values | +|:------------------|:---------|:----------------------------------------|:-------------------------------------------| +| file_path | yes | new path to file | "/test_folder" | +| file_mode | no | os permission mode default(0777) | "0777" | +| method | yes | type of method | "mkdir" | + + + + +Example: + +```json +{ + "metadata": { + "method": "mkdir", + "file_path": "/test_folder" + }, + "data": null +} +``` + +### Stat + +Stat : + +| Metadata Key | Required | Description | Possible values | +|:------------------|:---------|:----------------------------------------|:-------------------------------------------| +| file_path | yes | new path to file | "/test/foo3.txt" | +| method | yes | type of method | "stat" | + + + + +Example: + +```json +{ + "metadata": { + "method": "stat", + "file_path": "/test/foo2.txt" + }, + "data": null +} +``` \ No newline at end of file diff --git a/targets/storage/hdfs/client.go b/targets/storage/hdfs/client.go index be6d6c92..62e8dbd9 100644 --- a/targets/storage/hdfs/client.go +++ b/targets/storage/hdfs/client.go @@ -1,4 +1,4 @@ -package s3 +package hdfs import ( "context" @@ -63,7 +63,11 @@ func (c *Client) Do(ctx context.Context, req *types.Request) (*types.Response, e func (c *Client) writeFile(meta metadata, data []byte) (*types.Response, error) { - err := c.client.CreateEmptyFile(meta.filePath) + writer, err := c.client.Create(meta.filePath) + if err != nil { + return nil, err + } + _, err = writer.Write(data) if err != nil { return nil, err } @@ -77,9 +81,6 @@ func (c *Client) makeDir(meta metadata) (*types.Response, error) { if err != nil { return nil, err } - if err != nil { - return nil, err - } return types.NewResponse(). SetMetadataKeyValue("result", "ok"), nil @@ -155,7 +156,3 @@ func setClientOption(opts options) hdfs.ClientOptions { } return c } - -func createFileStat() { - -} diff --git a/targets/storage/hdfs/client_test.go b/targets/storage/hdfs/client_test.go index 495cdcf1..d494a481 100644 --- a/targets/storage/hdfs/client_test.go +++ b/targets/storage/hdfs/client_test.go @@ -1,4 +1,4 @@ -package s3 +package hdfs import ( "context" @@ -44,7 +44,7 @@ func TestClient_Init(t *testing.T) { Kind: "storage.hdfs", Properties: map[string]string{ "address": dat.address, - "user": dat.address, + "user": dat.user, }, }, wantErr: false, @@ -87,7 +87,7 @@ func TestClient_Mkdir(t *testing.T) { Kind: "storage.hdfs", Properties: map[string]string{ "address": dat.address, - "user": dat.address, + "user": dat.user, }, } ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) @@ -104,11 +104,19 @@ func TestClient_Mkdir(t *testing.T) { { name: "valid mkdir", request: types.NewRequest(). - SetMetadataKeyValue("file_path", "/hadoop/dfs/name"). - SetMetadataKeyValue("file_mode", "0755"). + SetMetadataKeyValue("file_path", "/test"). + SetMetadataKeyValue("file_mode", "0777"). SetMetadataKeyValue("method", "mkdir"), wantErr: false, - }, { + }, + { + name: "invalid mkdir already exists", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test"). + SetMetadataKeyValue("file_mode", "0777"). + SetMetadataKeyValue("method", "mkdir"), + wantErr: true, + },{ name: "invalid mkdir - missing path", request: types.NewRequest(). SetMetadataKeyValue("file_mode", "0755"). @@ -145,7 +153,7 @@ func TestClient_Upload(t *testing.T) { Kind: "storage.hdfs", Properties: map[string]string{ "address": dat.address, - "user": dat.address, + "user": dat.user, }, } ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) @@ -162,7 +170,16 @@ func TestClient_Upload(t *testing.T) { { name: "valid upload", request: types.NewRequest(). - SetMetadataKeyValue("file_path", "/test/foo.txt"). + SetMetadataKeyValue("file_path", "/test/foo2.txt"). + SetData(dat.file). + SetMetadataKeyValue("file_mode", "0777"). + SetMetadataKeyValue("method", "write_file"), + wantErr: false, + }, + { + name: "invalid upload", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/foo2.txt"). SetData(dat.file). SetMetadataKeyValue("method", "write_file"), wantErr: false, @@ -182,195 +199,219 @@ func TestClient_Upload(t *testing.T) { } } -// -//func TestClient_List_Bucket_Items(t *testing.T) { -// dat, err := getTestStructure() -// require.NoError(t, err) -// cfg := config.Spec{ -// Name: "storage-hdfs", -// Kind: "storage.hdfs", -// Properties: map[string]string{ -// "aws_key": dat.awsKey, -// "aws_secret_key": dat.awsSecretKey, -// "region": dat.region, -// "downloader": "false", -// "uploader": "false", -// }, -// } -// ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) -// defer cancel() -// c := New() -// -// err = c.Init(ctx, cfg) -// require.NoError(t, err) -// tests := []struct { -// name string -// request *types.Request -// wantErr bool -// }{ -// { -// name: "valid list", -// request: types.NewRequest(). -// SetMetadataKeyValue("method", "list_bucket_items"). -// SetMetadataKeyValue("bucket_name", dat.bucketName), -// wantErr: false, -// }, -// { -// name: "invalid list - missing bucket_name", -// request: types.NewRequest(). -// SetMetadataKeyValue("method", "list_bucket_items"), -// wantErr: true, -// }, -// } -// for _, tt := range tests { -// t.Run(tt.name, func(t *testing.T) { -// got, err := c.Do(ctx, tt.request) -// if tt.wantErr { -// require.Error(t, err) -// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) -// return -// } -// require.NoError(t, err) -// require.NotNil(t, got) -// }) -// } -//} -// -//func TestClient_Create_Bucket(t *testing.T) { -// dat, err := getTestStructure() -// require.NoError(t, err) -// cfg := config.Spec{ -// Name: "storage-hdfs", -// Kind: "storage.hdfs", -// Properties: map[string]string{ -// "aws_key": dat.awsKey, -// "aws_secret_key": dat.awsSecretKey, -// "region": dat.region, -// "downloader": "false", -// "uploader": "false", -// }, -// } -// ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) -// defer cancel() -// c := New() -// -// err = c.Init(ctx, cfg) -// require.NoError(t, err) -// tests := []struct { -// name string -// request *types.Request -// wantErr bool -// }{ -// { -// name: "valid create", -// request: types.NewRequest(). -// SetMetadataKeyValue("method", "create_bucket"). -// SetMetadataKeyValue("wait_for_completion", "true"). -// SetMetadataKeyValue("bucket_name", dat.testBucketName), -// wantErr: false, -// }, -// { -// name: "invalid create - missing bucket_name", -// request: types.NewRequest(). -// SetMetadataKeyValue("method", "create_bucket"), -// wantErr: true, -// }, -// { -// name: "invalid create - Already exists", -// request: types.NewRequest(). -// SetMetadataKeyValue("method", "create_bucket"). -// SetMetadataKeyValue("bucket_name", dat.testBucketName), -// wantErr: true, -// }, -// } -// for _, tt := range tests { -// t.Run(tt.name, func(t *testing.T) { -// got, err := c.Do(ctx, tt.request) -// if tt.wantErr { -// require.Error(t, err) -// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) -// return -// } -// require.NoError(t, err) -// require.NotNil(t, got) -// }) -// } -//} -// -//func TestClient_Upload_Item(t *testing.T) { -// dat, err := getTestStructure() -// require.NoError(t, err) -// cfg := config.Spec{ -// Name: "storage-hdfs", -// Kind: "storage.hdfs", -// Properties: map[string]string{ -// "aws_key": dat.awsKey, -// "aws_secret_key": dat.awsSecretKey, -// "region": dat.region, -// "downloader": "false", -// "uploader": "true", -// }, -// } -// tests := []struct { -// name string -// request *types.Request -// wantErr bool -// setUploader bool -// }{ -// { -// name: "valid upload item", -// request: types.NewRequest(). -// SetMetadataKeyValue("method", "upload_item"). -// SetMetadataKeyValue("wait_for_completion", "true"). -// SetMetadataKeyValue("bucket_name", dat.testBucketName). -// SetMetadataKeyValue("item_name", dat.itemName). -// SetData(dat.file), -// wantErr: false, -// setUploader: true, -// }, -// { -// name: "invalid upload - missing item", -// request: types.NewRequest(). -// SetMetadataKeyValue("method", "upload_item"). -// SetMetadataKeyValue("bucket_name", dat.testBucketName). -// SetData(dat.file), -// wantErr: true, -// setUploader: true, -// }, -// { -// name: "invalid upload - missing uploader", -// request: types.NewRequest(). -// SetMetadataKeyValue("method", "upload_item"). -// SetMetadataKeyValue("item_name", dat.itemName). -// SetMetadataKeyValue("bucket_name", dat.testBucketName), -// wantErr: true, -// setUploader: false, -// }, -// } -// for _, tt := range tests { -// t.Run(tt.name, func(t *testing.T) { -// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) -// defer cancel() -// c := New() -// -// if !tt.setUploader { -// cfg.Properties["uploader"] = "false" -// } else { -// cfg.Properties["uploader"] = "true" -// } -// err = c.Init(ctx, cfg) -// require.NoError(t, err) -// got, err := c.Do(ctx, tt.request) -// if tt.wantErr { -// require.Error(t, err) -// t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) -// return -// } -// require.NoError(t, err) -// require.NotNil(t, got) -// }) -// } -//} + +func TestClient_ReadFile(t *testing.T) { + dat, err := getTestStructure() + require.NoError(t, err) + cfg := config.Spec{ + Name: "storage-hdfs", + Kind: "storage.hdfs", + Properties: map[string]string{ + "address": dat.address, + "user": dat.user, + }, + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + c := New() + + err = c.Init(ctx, cfg) + require.NoError(t, err) + tests := []struct { + name string + request *types.Request + wantErr bool + }{ + { + name: "valid read", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/foo2.txt"). + SetMetadataKeyValue("method", "read_file"), + wantErr: false, + }, + { + name: "invalid read missing file_path", + request: types.NewRequest(). + SetMetadataKeyValue("method", "read_file"), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := c.Do(ctx, tt.request) + if tt.wantErr { + require.Error(t, err) + t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) + return + } + require.NoError(t, err) + require.NotNil(t, got) + }) + } +} + +func TestClient_Remove_File(t *testing.T) { + dat, err := getTestStructure() + require.NoError(t, err) + cfg := config.Spec{ + Name: "storage-hdfs", + Kind: "storage.hdfs", + Properties: map[string]string{ + "address": dat.address, + "user": dat.user, + }, + } + tests := []struct { + name string + request *types.Request + wantErr bool + }{ + { + name: "valid Remove item", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/foo2.txt"). + SetMetadataKeyValue("method", "remove_file"), + wantErr: false, + }, + { + name: "invalid Remove item - file does not exists", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/foo2.txt"). + SetMetadataKeyValue("method", "remove_file"), + wantErr: false, + }, + + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + c := New() + + err = c.Init(ctx, cfg) + require.NoError(t, err) + got, err := c.Do(ctx, tt.request) + if tt.wantErr { + require.Error(t, err) + t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) + return + } + require.NoError(t, err) + require.NotNil(t, got) + }) + } +} + +func TestClient_Stat(t *testing.T) { + dat, err := getTestStructure() + require.NoError(t, err) + cfg := config.Spec{ + Name: "storage-hdfs", + Kind: "storage.hdfs", + Properties: map[string]string{ + "address": dat.address, + "user": dat.user, + }, + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + c := New() + + err = c.Init(ctx, cfg) + require.NoError(t, err) + tests := []struct { + name string + request *types.Request + wantErr bool + }{ + { + name: "valid stat", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/foo.txt"). + SetMetadataKeyValue("method", "stat"), + wantErr: false, + },{ + name: "invalid stat - file does not exists", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/fake.txt"). + SetMetadataKeyValue("method", "stat"), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := c.Do(ctx, tt.request) + if tt.wantErr { + require.Error(t, err) + t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) + return + } + require.NoError(t, err) + require.NotNil(t, got) + }) + } +} + + +func TestClient_Rename(t *testing.T) { + dat, err := getTestStructure() + require.NoError(t, err) + cfg := config.Spec{ + Name: "storage-hdfs", + Kind: "storage.hdfs", + Properties: map[string]string{ + "address": dat.address, + "user": dat.user, + }, + } + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + c := New() + + err = c.Init(ctx, cfg) + require.NoError(t, err) + tests := []struct { + name string + request *types.Request + wantErr bool + }{ + { + name: "valid rename", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/foo3.txt"). + SetMetadataKeyValue("old_file_path", "/test/foo2.txt"). + SetMetadataKeyValue("method", "rename_file"), + wantErr: false, + },{ + name: "invalid rename - file does not exists", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/foo3.txt"). + SetMetadataKeyValue("old_file_path", "/test/foo2.txt"). + SetMetadataKeyValue("method", "rename_file"), + wantErr: true, + },{ + name: "invalid rename - missing old file path", + request: types.NewRequest(). + SetMetadataKeyValue("file_path", "/test/foo3.txt"). + SetMetadataKeyValue("method", "rename_file"), + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := c.Do(ctx, tt.request) + if tt.wantErr { + require.Error(t, err) + t.Logf("init() error = %v, wantSetErr %v", err, tt.wantErr) + return + } + require.NoError(t, err) + require.NotNil(t, got) + }) + } +} + + // //func TestClient_Get_Item(t *testing.T) { // dat, err := getTestStructure() diff --git a/targets/storage/hdfs/connector.go b/targets/storage/hdfs/connector.go index f59da92f..8e4d3ef5 100644 --- a/targets/storage/hdfs/connector.go +++ b/targets/storage/hdfs/connector.go @@ -1,4 +1,4 @@ -package s3 +package hdfs import ( "github.com/kubemq-hub/builder/connector/common" @@ -6,96 +6,56 @@ import ( func Connector() *common.Connector { return common.NewConnector(). - SetKind("aws.s3"). - SetDescription("AWS S3 Target"). + SetKind("storage.hdfs"). + SetDescription("Hadoop Target"). AddProperty( common.NewProperty(). SetKind("string"). - SetName("aws_key"). - SetDescription("Set S3 aws key"). + SetName("address"). + SetDescription("Set Hadoop address"). SetMust(true). SetDefault(""), ). AddProperty( common.NewProperty(). SetKind("string"). - SetName("aws_secret_key"). - SetDescription("Set S3 aws secret key"). - SetMust(true). - SetDefault(""), - ). - AddProperty( - common.NewProperty(). - SetKind("string"). - SetName("region"). - SetDescription("Set S3 aws region"). - SetMust(true). - SetDefault(""), - ). - AddProperty( - common.NewProperty(). - SetKind("string"). - SetName("token"). - SetDescription("Set S3 token"). + SetName("user"). + SetDescription("Set Hadoop user"). SetMust(false). SetDefault(""), ). - AddProperty( - common.NewProperty(). - SetKind("bool"). - SetName("downloader"). - SetDescription("Create S3 downloader instance"). - SetMust(false). - SetDefault("false"), - ). - AddProperty( - common.NewProperty(). - SetKind("bool"). - SetName("uploader"). - SetDescription("Create S3 uploader instance"). - SetMust(false). - SetDefault("false"), - ). AddMetadata( common.NewMetadata(). SetName("method"). SetKind("string"). - SetDescription("Set S3 execution method"). - SetOptions([]string{"list_buckets", "list_bucket_items", "create_bucket", "delete_bucket", "delete_item_from_bucket", "delete_all_items_from_bucket", "upload_item", "copy_item", "get_item"}). - SetDefault("upload_item"). + SetDescription("Set Hadoop execution method"). + SetOptions([]string{"write_file", "remove_file", "read_file", "rename_file", "mkdir", "stat"}). + SetDefault("read_file"). SetMust(true), ). AddMetadata( common.NewMetadata(). - SetName("bucket_name"). + SetName("file_path"). SetKind("string"). - SetDescription("Set S3 bucket name"). + SetDescription("Set Hadoop file path"). SetDefault(""). SetMust(false), ). AddMetadata( common.NewMetadata(). - SetName("copy_source"). + SetName("old_file_path"). SetKind("string"). - SetDescription("Set S3 copy source"). + SetDescription("Set Hadoop old file path"). SetDefault(""). SetMust(false), ). - AddMetadata( - common.NewMetadata(). - SetKind("bool"). - SetName("wait_for_completion"). - SetDescription("Set S3 wait for completion until retuning response"). - SetMust(false). - SetDefault("false"), - ). AddMetadata( common.NewMetadata(). SetKind("string"). - SetName("item_name"). - SetDescription("Set S3 item name"). + SetName("file_mode"). + SetDescription("Set os file mode"). SetMust(false). - SetDefault(""), + SetDefault("0777"), ) } diff --git a/targets/storage/hdfs/docker-compose.yml b/targets/storage/hdfs/docker-compose.yml new file mode 100644 index 00000000..4bec30bb --- /dev/null +++ b/targets/storage/hdfs/docker-compose.yml @@ -0,0 +1,75 @@ +version: "2.1" + +services: + namenode: + build: ./namenode + container_name: namenode + environment: + - CLUSTER_NAME=test + env_file: + - ./hadoop.env + ports: + - "9870:9870" + - "9000:9000" + + resourcemanager: + build: ./resourcemanager + container_name: resourcemanager + restart: on-failure + depends_on: + - namenode + - datanode1 + - datanode2 + - datanode3 + env_file: + - ./hadoop.env + ports: + - "8089:8088" + + historyserver: + build: ./historyserver + container_name: historyserver + depends_on: + - namenode + - datanode1 + - datanode2 + env_file: + - ./hadoop.env + ports: + - "8188:8188" + + nodemanager1: + build: ./nodemanager + container_name: nodemanager1 + depends_on: + - namenode + - datanode1 + - datanode2 + env_file: + - ./hadoop.env + ports: + - "8042:8042" + + datanode1: + build: ./datanode + container_name: datanode1 + depends_on: + - namenode + env_file: + - ./hadoop.env + + datanode2: + build: ./datanode + container_name: datanode2 + depends_on: + - namenode + env_file: + - ./hadoop.env + + datanode3: + build: ./datanode + container_name: datanode3 + depends_on: + - namenode + env_file: + - ./hadoop.env diff --git a/targets/storage/hdfs/metadata.go b/targets/storage/hdfs/metadata.go index d8425e14..5ad00d9c 100644 --- a/targets/storage/hdfs/metadata.go +++ b/targets/storage/hdfs/metadata.go @@ -1,4 +1,4 @@ -package s3 +package hdfs import ( "fmt" diff --git a/targets/storage/hdfs/options.go b/targets/storage/hdfs/options.go index ba048cba..ca4ed330 100644 --- a/targets/storage/hdfs/options.go +++ b/targets/storage/hdfs/options.go @@ -1,4 +1,4 @@ -package s3 +package hdfs import ( "fmt" diff --git a/targets/storage/hdfs/stat.go b/targets/storage/hdfs/stat.go index 9bb837ac..8f7cf0b8 100644 --- a/targets/storage/hdfs/stat.go +++ b/targets/storage/hdfs/stat.go @@ -1,4 +1,4 @@ -package s3 +package hdfs import ( "encoding/json" @@ -8,19 +8,12 @@ import ( type Stat struct { Name string `json:"name"` - Size int64 `json:"name"` - ModTime time.Time `json:"name"` - IsDir bool `json:"name"` + Size int64 `json:"size"` + ModTime time.Time `json:"mod_time"` + IsDir bool `json:"is_dir"` } -func createStat(o os.FileInfo) Stat { - return Stat{ - Name: o.Name(), - Size: o.Size(), - ModTime: o.ModTime(), - IsDir: o.IsDir(), - } -} + func createStatAsByteArray(o os.FileInfo) ([]byte, error) { s := Stat{ diff --git a/targets/target.go b/targets/target.go index 9901a27f..e1b7db90 100644 --- a/targets/target.go +++ b/targets/target.go @@ -29,6 +29,7 @@ import ( "github.com/kubemq-hub/kubemq-targets/targets/gcp/firebase" "github.com/kubemq-hub/kubemq-targets/targets/messaging/ibmmq" "github.com/kubemq-hub/kubemq-targets/targets/messaging/nats" + "github.com/kubemq-hub/kubemq-targets/targets/storage/hdfs" "github.com/kubemq-hub/kubemq-targets/targets/stores/aerospike" "github.com/kubemq-hub/kubemq-targets/targets/stores/cockroachdb" "github.com/kubemq-hub/kubemq-targets/targets/stores/consulkv" @@ -426,6 +427,12 @@ func Init(ctx context.Context, cfg config.Spec) (Target, error) { return nil, err } return target, nil + case "storage.hdfs": + target := hdfs.New() + if err := target.Init(ctx, cfg); err != nil { + return nil, err + } + return target, nil case "azure.storage.blob": target := blob.New() if err := target.Init(ctx, cfg); err != nil { @@ -516,6 +523,7 @@ func Connectors() common.Connectors { //storage minio.Connector(), + hdfs.Connector(), // serverless openfaas.Connector(),