From 1e6c8e2aa31b88a6ad0197e072900a826d24649b Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Thu, 24 Jun 2021 13:54:12 +0200 Subject: [PATCH 01/21] New methods PullTransfer,RetryTransfer,ListTransfers. --- internal/grpc/services/datatx/datatx.go | 18 +++++++-- internal/grpc/services/gateway/datatx.go | 48 ++++++++++++++++++++---- 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index 33e9f73d56..3845469b08 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -81,9 +81,9 @@ func (s *service) UnprotectedEndpoints() []string { return []string{} } -func (s *service) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequest) (*datatx.CreateTransferResponse, error) { - return &datatx.CreateTransferResponse{ - Status: status.NewUnimplemented(ctx, errtypes.NotSupported("CreateTransfer not implemented"), "CreateTransfer not implemented"), +func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { + return &datatx.PullTransferResponse{ + Status: status.NewUnimplemented(ctx, errtypes.NotSupported("PullTransfer not implemented"), "PullTransfer not implemented"), }, nil } @@ -98,3 +98,15 @@ func (s *service) CancelTransfer(ctx context.Context, in *datatx.CancelTransferR Status: status.NewUnimplemented(ctx, errtypes.NotSupported("CancelTransfer not implemented"), "CancelTransfer not implemented"), }, nil } + +func (s *service) ListTransfers(ctx context.Context, in *datatx.ListTransfersRequest) (*datatx.ListTransfersResponse, error) { + return &datatx.ListTransfersResponse{ + Status: status.NewUnimplemented(ctx, errtypes.NotSupported("ListTransfers not implemented"), "ListTransfers not implemented"), + }, nil +} + +func (s *service) RetryTransfer(ctx context.Context, in *datatx.RetryTransferRequest) (*datatx.RetryTransferResponse, error) { + return &datatx.RetryTransferResponse{ + Status: status.NewUnimplemented(ctx, errtypes.NotSupported("RetryTransfer not implemented"), "RetryTransfer not implemented"), + }, nil +} diff --git a/internal/grpc/services/gateway/datatx.go b/internal/grpc/services/gateway/datatx.go index c4d857b910..ac2fed8a3e 100644 --- a/internal/grpc/services/gateway/datatx.go +++ b/internal/grpc/services/gateway/datatx.go @@ -27,18 +27,18 @@ import ( "github.com/pkg/errors" ) -func (s *svc) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequest) (*datatx.CreateTransferResponse, error) { +func (s *svc) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { c, err := pool.GetDataTxClient(s.c.DataTxEndpoint) if err != nil { - err = errors.Wrap(err, "gateway: error calling GetOCMShareProviderClient") - return &datatx.CreateTransferResponse{ + err = errors.Wrap(err, "gateway: error calling GetDataTxClient") + return &datatx.PullTransferResponse{ Status: status.NewInternal(ctx, err, "error getting data transfer client"), }, nil } - res, err := c.CreateTransfer(ctx, req) + res, err := c.PullTransfer(ctx, req) if err != nil { - return nil, errors.Wrap(err, "gateway: error calling CreateTransfer") + return nil, errors.Wrap(err, "gateway: error calling PullTransfer") } return res, nil @@ -47,7 +47,7 @@ func (s *svc) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequ func (s *svc) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) { c, err := pool.GetDataTxClient(s.c.DataTxEndpoint) if err != nil { - err = errors.Wrap(err, "gateway: error calling GetOCMShareProviderClient") + err = errors.Wrap(err, "gateway: error calling GetDataTxClient") return &datatx.GetTransferStatusResponse{ Status: status.NewInternal(ctx, err, "error getting data transfer client"), }, nil @@ -64,7 +64,7 @@ func (s *svc) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStat func (s *svc) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) { c, err := pool.GetDataTxClient(s.c.DataTxEndpoint) if err != nil { - err = errors.Wrap(err, "gateway: error calling GetOCMShareProviderClient") + err = errors.Wrap(err, "gateway: error calling GetDataTxClient") return &datatx.CancelTransferResponse{ Status: status.NewInternal(ctx, err, "error getting data transfer client"), }, nil @@ -77,3 +77,37 @@ func (s *svc) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequ return res, nil } + +func (s *svc) ListTransfers(ctx context.Context, req *datatx.ListTransfersRequest) (*datatx.ListTransfersResponse, error) { + c, err := pool.GetDataTxClient(s.c.DataTxEndpoint) + if err != nil { + err = errors.Wrap(err, "gateway: error calling GetDataTxClient") + return &datatx.ListTransfersResponse{ + Status: status.NewInternal(ctx, err, "error getting data transfer client"), + }, nil + } + + res, err := c.ListTransfers(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "gateway: error calling ListTransfers") + } + + return res, nil +} + +func (s *svc) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRequest) (*datatx.RetryTransferResponse, error) { + c, err := pool.GetDataTxClient(s.c.DataTxEndpoint) + if err != nil { + err = errors.Wrap(err, "gateway: error calling GetDataTxClient") + return &datatx.RetryTransferResponse{ + Status: status.NewInternal(ctx, err, "error getting data transfer client"), + }, nil + } + + res, err := c.RetryTransfer(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "gateway: error calling RetryTransfer") + } + + return res, nil +} From 6cf498ada7d247dfe5891399502e2cb90c64547f Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Thu, 24 Jun 2021 14:13:51 +0200 Subject: [PATCH 02/21] Add changelog --- changelog/unreleased/pull-transfer.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changelog/unreleased/pull-transfer.md diff --git a/changelog/unreleased/pull-transfer.md b/changelog/unreleased/pull-transfer.md new file mode 100644 index 0000000000..de908f781b --- /dev/null +++ b/changelog/unreleased/pull-transfer.md @@ -0,0 +1,4 @@ +Enhancement: New CS3API datatx methods + +CS3 datatx pull model methods: PullTransfer, RetryTransfer, ListTransfers +Method CreateTransfer removed. From a91aadff71a0058b76ecfdbe93401b6d42fa2f37 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Thu, 24 Jun 2021 14:27:02 +0200 Subject: [PATCH 03/21] PR link added --- changelog/unreleased/pull-transfer.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/changelog/unreleased/pull-transfer.md b/changelog/unreleased/pull-transfer.md index de908f781b..b02fd62ac2 100644 --- a/changelog/unreleased/pull-transfer.md +++ b/changelog/unreleased/pull-transfer.md @@ -2,3 +2,5 @@ Enhancement: New CS3API datatx methods CS3 datatx pull model methods: PullTransfer, RetryTransfer, ListTransfers Method CreateTransfer removed. + +https://github.com/cs3org/reva/pull/1824 \ No newline at end of file From 2fae89ee4770e4d99072700d9f5e475f82fa95b7 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Wed, 25 Aug 2021 11:38:28 +0200 Subject: [PATCH 04/21] new commands transfer-list, transfer-retry --- cmd/reva/main.go | 2 + cmd/reva/transfer-list.go | 91 ++++++++++++++++++++++++++++++++++++++ cmd/reva/transfer-retry.go | 84 +++++++++++++++++++++++++++++++++++ 3 files changed, 177 insertions(+) create mode 100644 cmd/reva/transfer-list.go create mode 100644 cmd/reva/transfer-retry.go diff --git a/cmd/reva/main.go b/cmd/reva/main.go index a77df4b34f..f98551d339 100644 --- a/cmd/reva/main.go +++ b/cmd/reva/main.go @@ -82,6 +82,8 @@ var ( transferCreateCommand(), transferGetStatusCommand(), transferCancelCommand(), + transferListCommand(), + transferRetryCommand(), appTokensListCommand(), appTokensRemoveCommand(), appTokensCreateCommand(), diff --git a/cmd/reva/transfer-list.go b/cmd/reva/transfer-list.go new file mode 100644 index 0000000000..f11721ee10 --- /dev/null +++ b/cmd/reva/transfer-list.go @@ -0,0 +1,91 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package main + +import ( + "encoding/gob" + "io" + "os" + + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + "github.com/jedib0t/go-pretty/table" +) + +func transferListCommand() *command { + cmd := newCommand("transfer-list") + cmd.Description = func() string { return "get a list of transfers" } + cmd.Usage = func() string { return "Usage: transfer-list [-flags]" } + filterShareId := cmd.String("shareId", "", "share ID filter (optional)") + + cmd.Action = func(w ...io.Writer) error { + ctx := getAuthContext() + client, err := getClient() + if err != nil { + return err + } + + // validate flags + var filters []*datatx.ListTransfersRequest_Filter + if *filterShareId != "" { + filters = append(filters, &datatx.ListTransfersRequest_Filter{ + Type: datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID, + Term: &datatx.ListTransfersRequest_Filter_ShareId{ + ShareId: &ocm.ShareId{ + OpaqueId: *filterShareId, + }, + }, + }) + } + + transferslistRequest := &datatx.ListTransfersRequest{ + Filters: filters, + } + + listTransfersResponse, err := client.ListTransfers(ctx, transferslistRequest) + if err != nil { + return err + } + if listTransfersResponse.Status.Code != rpc.Code_CODE_OK { + return formatError(listTransfersResponse.Status) + } + + if len(w) == 0 { + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId"}) + + for _, s := range listTransfersResponse.Transfers { + t.AppendRows([]table.Row{ + {s.ShareId.OpaqueId, s.Id.OpaqueId}, + }) + } + t.Render() + } else { + enc := gob.NewEncoder(w[0]) + if err := enc.Encode(listTransfersResponse.Transfers); err != nil { + return err + } + } + + return nil + } + return cmd +} diff --git a/cmd/reva/transfer-retry.go b/cmd/reva/transfer-retry.go new file mode 100644 index 0000000000..cc3e1643a1 --- /dev/null +++ b/cmd/reva/transfer-retry.go @@ -0,0 +1,84 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package main + +import ( + "encoding/gob" + "errors" + "io" + "os" + "time" + + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + "github.com/jedib0t/go-pretty/table" +) + +func transferRetryCommand() *command { + cmd := newCommand("transfer-retry") + cmd.Description = func() string { return "retry a transfer" } + cmd.Usage = func() string { return "Usage: transfer-retry [-flags]" } + txId := cmd.String("txId", "", "the transfer identifier") + + cmd.Action = func(w ...io.Writer) error { + // validate flags + if *txId == "" { + return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage()) + } + + ctx := getAuthContext() + client, err := getClient() + if err != nil { + return err + } + + retryRequest := &datatx.RetryTransferRequest{ + TxId: &datatx.TxId{ + OpaqueId: *txId, + }, + } + + retryResponse, err := client.RetryTransfer(ctx, retryRequest) + if err != nil { + return err + } + if retryResponse.Status.Code != rpc.Code_CODE_OK { + return formatError(retryResponse.Status) + } + + if len(w) == 0 { + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"}) + cTime := time.Unix(int64(retryResponse.TxInfo.Ctime.Seconds), int64(retryResponse.TxInfo.Ctime.Nanos)) + t.AppendRows([]table.Row{ + {retryResponse.TxInfo.ShareId.OpaqueId, retryResponse.TxInfo.Id.OpaqueId, retryResponse.TxInfo.Status, cTime}, + }) + t.Render() + } else { + enc := gob.NewEncoder(w[0]) + if err := enc.Encode(retryResponse.TxInfo); err != nil { + return err + } + } + + return nil + } + return cmd +} From e80ca961952e3b647cfcdde014e4c9a67dfdf66e Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Wed, 25 Aug 2021 11:42:05 +0200 Subject: [PATCH 05/21] Transfer id is parameter. Pretty output. --- cmd/reva/transfer-cancel.go | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/cmd/reva/transfer-cancel.go b/cmd/reva/transfer-cancel.go index aafcce950c..2eede16952 100644 --- a/cmd/reva/transfer-cancel.go +++ b/cmd/reva/transfer-cancel.go @@ -19,23 +19,27 @@ package main import ( + "encoding/gob" "errors" "io" + "os" + "time" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + "github.com/jedib0t/go-pretty/table" ) func transferCancelCommand() *command { cmd := newCommand("transfer-cancel") cmd.Description = func() string { return "cancel a running transfer" } cmd.Usage = func() string { return "Usage: transfer-cancel [-flags]" } - txID := cmd.String("txID", "", "the transfer identifier") + txId := cmd.String("txId", "", "the transfer identifier") cmd.Action = func(w ...io.Writer) error { // validate flags - if *txID == "" { - return errors.New("txID must be specified: use -txID flag\n" + cmd.Usage()) + if *txId == "" { + return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage()) } ctx := getAuthContext() @@ -44,7 +48,9 @@ func transferCancelCommand() *command { return err } - cancelRequest := &datatx.CancelTransferRequest{} + cancelRequest := &datatx.CancelTransferRequest{ + TxId: &datatx.TxId{OpaqueId: *txId}, + } cancelResponse, err := client.CancelTransfer(ctx, cancelRequest) if err != nil { @@ -54,6 +60,22 @@ func transferCancelCommand() *command { return formatError(cancelResponse.Status) } + if len(w) == 0 { + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"}) + cTime := time.Unix(int64(cancelResponse.TxInfo.Ctime.Seconds), int64(cancelResponse.TxInfo.Ctime.Nanos)) + t.AppendRows([]table.Row{ + {cancelResponse.TxInfo.ShareId.OpaqueId, cancelResponse.TxInfo.Id.OpaqueId, cancelResponse.TxInfo.Status, cTime.Format("Mon Jan 2 15:04:05 -0700 MST 2006")}, + }) + t.Render() + } else { + enc := gob.NewEncoder(w[0]) + if err := enc.Encode(cancelResponse.TxInfo); err != nil { + return err + } + } + return nil } return cmd From ff69a4092481273e244180a0abc11d5234b853e3 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Wed, 25 Aug 2021 11:43:05 +0200 Subject: [PATCH 06/21] Transfer id is parameter. Pretty output. --- cmd/reva/transfer-get-status.go | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/cmd/reva/transfer-get-status.go b/cmd/reva/transfer-get-status.go index bef584c913..0fd9f74a7d 100644 --- a/cmd/reva/transfer-get-status.go +++ b/cmd/reva/transfer-get-status.go @@ -19,23 +19,27 @@ package main import ( + "encoding/gob" "errors" "io" + "os" + "time" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + "github.com/jedib0t/go-pretty/table" ) func transferGetStatusCommand() *command { cmd := newCommand("transfer-get-status") cmd.Description = func() string { return "get the status of a transfer" } cmd.Usage = func() string { return "Usage: transfer-get-status [-flags]" } - txID := cmd.String("txID", "", "the transfer identifier") + txId := cmd.String("txId", "", "the transfer identifier") cmd.Action = func(w ...io.Writer) error { // validate flags - if *txID == "" { - return errors.New("txID must be specified: use -txID flag\n" + cmd.Usage()) + if *txId == "" { + return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage()) } ctx := getAuthContext() @@ -44,7 +48,9 @@ func transferGetStatusCommand() *command { return err } - getStatusRequest := &datatx.GetTransferStatusRequest{} + getStatusRequest := &datatx.GetTransferStatusRequest{ + TxId: &datatx.TxId{OpaqueId: *txId}, + } getStatusResponse, err := client.GetTransferStatus(ctx, getStatusRequest) if err != nil { @@ -54,6 +60,22 @@ func transferGetStatusCommand() *command { return formatError(getStatusResponse.Status) } + if len(w) == 0 { + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"}) + cTime := time.Unix(int64(getStatusResponse.TxInfo.Ctime.Seconds), int64(getStatusResponse.TxInfo.Ctime.Nanos)) + t.AppendRows([]table.Row{ + {getStatusResponse.TxInfo.ShareId.OpaqueId, getStatusResponse.TxInfo.Id.OpaqueId, getStatusResponse.TxInfo.Status, cTime.Format("Mon Jan 2 15:04:05 -0700 MST 2006")}, + }) + t.Render() + } else { + enc := gob.NewEncoder(w[0]) + if err := enc.Encode(getStatusResponse.TxInfo); err != nil { + return err + } + } + return nil } return cmd From 50e31f9a250f6b4a4bf4b96227268fb9566df811 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Wed, 25 Aug 2021 11:45:02 +0200 Subject: [PATCH 07/21] Add pkg datatx loader. --- cmd/revad/runtime/loader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/revad/runtime/loader.go b/cmd/revad/runtime/loader.go index b7a3f3ce10..2dabdd5029 100644 --- a/cmd/revad/runtime/loader.go +++ b/cmd/revad/runtime/loader.go @@ -33,6 +33,7 @@ import ( _ "github.com/cs3org/reva/pkg/auth/manager/loader" _ "github.com/cs3org/reva/pkg/auth/registry/loader" _ "github.com/cs3org/reva/pkg/cbox/loader" + _ "github.com/cs3org/reva/pkg/datatx/manager/loader" _ "github.com/cs3org/reva/pkg/group/manager/loader" _ "github.com/cs3org/reva/pkg/metrics/driver/loader" _ "github.com/cs3org/reva/pkg/ocm/invite/manager/loader" From f7a0ae410136a1a5534429539b27132741157627 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Wed, 25 Aug 2021 11:46:45 +0200 Subject: [PATCH 08/21] Data transfers folder renamed. --- pkg/storage/utils/localfs/localfs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/utils/localfs/localfs.go b/pkg/storage/utils/localfs/localfs.go index 4c1a68f225..dce53038d8 100644 --- a/pkg/storage/utils/localfs/localfs.go +++ b/pkg/storage/utils/localfs/localfs.go @@ -76,7 +76,7 @@ func (c *Config) init() { } if c.DataTransfersFolder == "" { - c.DataTransfersFolder = "/Data-Transfers" + c.DataTransfersFolder = "/DataTransfers" } // ensure share folder always starts with slash From 67a1d20afef8404648d9f90429c1b140c7658959 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Wed, 25 Aug 2021 11:49:10 +0200 Subject: [PATCH 09/21] Implement datatx pull model and methods. --- internal/grpc/services/datatx/datatx.go | 322 +++++++++++++++++- internal/grpc/services/gateway/gateway.go | 2 +- .../grpc/services/gateway/ocmshareprovider.go | 108 ++++++ 3 files changed, 418 insertions(+), 14 deletions(-) diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index 3845469b08..0a25f51241 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -20,11 +20,23 @@ package datatx import ( "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/url" + "os" + "path" + "sync" + ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + txdriver "github.com/cs3org/reva/pkg/datatx" + txregistry "github.com/cs3org/reva/pkg/datatx/manager/registry" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rgrpc" "github.com/cs3org/reva/pkg/rgrpc/status" + "github.com/cs3org/reva/pkg/token" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "google.golang.org/grpc" @@ -35,23 +47,68 @@ func init() { } type config struct { + // transfer driver + TxDriver string `mapstructure:"txdriver"` + TxDrivers map[string]map[string]interface{} `mapstructure:"txdrivers"` + // storage driver to persist share/transfer relation + StorageDriver string `mapstructure:"storage_driver"` + StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"` + TxSharesFile string `mapstructure:"tx_shares_file"` + DataTransfersFolder string `mapstructure:"data_transfers_folder"` } type service struct { - conf *config + conf *config + txManager txdriver.Manager + txShareDriver *txShareDriver +} + +type txShareDriver struct { + sync.Mutex // concurrent access to the file + model *txShareModel +} +type txShareModel struct { + File string + TxShares map[string]*txShare `json:"shares"` +} + +type txShare struct { + TxID string + TargetUri string + Opaque *types.Opaque `json:"opaque"` +} + +type webdavEndpoint struct { + filePath string + endpoint string + endpointScheme string + token string } func (c *config) init() { + if c.TxDriver == "" { + c.TxDriver = "rclone" + } + if c.DataTransfersFolder == "" { + c.DataTransfersFolder = "DataTransfers" + } } func (s *service) Register(ss *grpc.Server) { datatx.RegisterTxAPIServer(ss, s) } +func getDatatxManager(c *config) (txdriver.Manager, error) { + if f, ok := txregistry.NewFuncs[c.TxDriver]; ok { + return f(c.TxDrivers[c.TxDriver]) + } + return nil, errtypes.NotFound("datatx service: driver not found: " + c.TxDriver) +} + func parseConfig(m map[string]interface{}) (*config, error) { c := &config{} if err := mapstructure.Decode(m, c); err != nil { - err = errors.Wrap(err, "error decoding conf") + err = errors.Wrap(err, "datatx service: error decoding conf") return nil, err } return c, nil @@ -66,8 +123,24 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) { } c.init() + txManager, err := getDatatxManager(c) + if err != nil { + return nil, err + } + + model, err := loadOrCreate(c.TxSharesFile) + if err != nil { + err = errors.Wrap(err, "datatx service: error loading the file containing the transfer shares") + return nil, err + } + txShareDriver := &txShareDriver{ + model: model, + } + service := &service{ - conf: c, + conf: c, + txManager: txManager, + txShareDriver: txShareDriver, } return service, nil @@ -82,31 +155,254 @@ func (s *service) UnprotectedEndpoints() []string { } func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { + fmt.Printf("PullTransfer reached: req: %v\n", req) + var srcRemote string + var srcPath string + var srcToken string + var dstRemote string + var dstPath string + var dstToken string + srcRemote = "" + srcPath = "" + srcToken = "" + dstRemote = "" + dstPath = "" + dstToken = "" + + ep, err := s.extractEndpointInfo(ctx, req.GetTargetUri()) + if err != nil { + return nil, err + } + srcRemote = fmt.Sprintf("%s://%s", ep.endpointScheme, ep.endpoint) + srcPath = ep.filePath + srcToken = ep.token + // destination(grantee) webdav endpoint + // user := user.ContextMustGetUser(ctx) -> user.Id.Idp + endpoint, ok := req.Opaque.Map["endpoint"] + if !ok { + return nil, errtypes.NotSupported("endpoint not defined") + } + dstRemote = string(endpoint.Value) + // // home dir prefix must be removed from the path + // dstPath = path.Join(s.conf.DataTransfersFolder, strings.TrimPrefix(ep.filePath, "/home")) + dstPath = path.Join(s.conf.DataTransfersFolder, ep.filePath) + dstToken = token.ContextMustGetToken(ctx) + + txInfo, err := s.txManager.StartTransfer(ctx, srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken) + if err != nil { + err = errors.Wrap(err, "datatx service: error starting transfer job") + return &datatx.PullTransferResponse{ + Status: status.NewInvalid(ctx, "datatx service: error pulling transfer"), + TxInfo: txInfo, + }, err + } + fmt.Printf("err: %v\n", err) + fmt.Printf("txInfo Status: %v\n", txInfo.Status) + fmt.Printf("txInfo TxID: %v\n", txInfo.GetId().OpaqueId) + fmt.Printf("txInfo Ctime: %v\n", txInfo.GetCtime()) + + txShare := &txShare{ + TxID: txInfo.GetId().OpaqueId, + TargetUri: req.TargetUri, + Opaque: req.Opaque, + } + + s.txShareDriver.Lock() + defer s.txShareDriver.Unlock() + + s.txShareDriver.model.TxShares[txInfo.GetId().OpaqueId] = txShare + + if err := s.txShareDriver.model.saveTxShare(); err != nil { + err = errors.Wrap(err, "datatx service: error saving transfer share: "+datatx.Status_STATUS_INVALID.String()) + return &datatx.PullTransferResponse{ + Status: status.NewInvalid(ctx, "error pulling transfer"), + }, err + } + return &datatx.PullTransferResponse{ - Status: status.NewUnimplemented(ctx, errtypes.NotSupported("PullTransfer not implemented"), "PullTransfer not implemented"), - }, nil + Status: status.NewOK(ctx), + TxInfo: txInfo, + }, err } -func (s *service) GetTransferStatus(ctx context.Context, in *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) { +func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) { + txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()] + if !ok { + return nil, errtypes.InternalError("datatx service: transfer not found") + } + + txInfo, err := s.txManager.GetTransferStatus(ctx, req.GetTxId().OpaqueId) + if err != nil { + err = errors.Wrap(err, "datatx service: error retrieving transfer status") + return &datatx.GetTransferStatusResponse{ + Status: status.NewInternal(ctx, err, "datatx service: error getting transfer status"), + TxInfo: txInfo, + }, err + } + + txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + return &datatx.GetTransferStatusResponse{ - Status: status.NewUnimplemented(ctx, errtypes.NotSupported("GetTransferStatus not implemented"), "GetTransferStatus not implemented"), + Status: status.NewOK(ctx), + TxInfo: txInfo, }, nil } -func (s *service) CancelTransfer(ctx context.Context, in *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) { +func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) { + txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()] + if !ok { + return nil, errtypes.InternalError("datatx service: transfer not found") + } + + txInfo, err := s.txManager.CancelTransfer(ctx, req.GetTxId().OpaqueId) + if err != nil { + txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + txInfo.Status = datatx.Status_STATUS_TRANSFER_CANCELLED + return &datatx.CancelTransferResponse{ + Status: status.NewOK(ctx), + TxInfo: txInfo, + }, nil + + // err = errors.Wrap(err, "datatx service: error cancelling transfer") + // return &datatx.CancelTransferResponse{ + // Status: status.NewInternal(ctx, err, "error cancelling transfer"), + // TxInfo: txInfo, + // }, err + } + + txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + return &datatx.CancelTransferResponse{ - Status: status.NewUnimplemented(ctx, errtypes.NotSupported("CancelTransfer not implemented"), "CancelTransfer not implemented"), + Status: status.NewOK(ctx), + TxInfo: txInfo, }, nil } -func (s *service) ListTransfers(ctx context.Context, in *datatx.ListTransfersRequest) (*datatx.ListTransfersResponse, error) { +func (s *service) ListTransfers(ctx context.Context, req *datatx.ListTransfersRequest) (*datatx.ListTransfersResponse, error) { + filters := req.Filters + var txInfos []*datatx.TxInfo + for _, txShare := range s.txShareDriver.model.TxShares { + if len(filters) == 0 { + txInfos = append(txInfos, &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txShare.TxID}, + ShareId: &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}, + }) + } else { + for _, f := range filters { + if f.Type == datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID { + if f.GetShareId().GetOpaqueId() == string(txShare.Opaque.Map["shareId"].Value) { + txInfos = append(txInfos, &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txShare.TxID}, + ShareId: &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}, + }) + } + } + } + } + } + return &datatx.ListTransfersResponse{ - Status: status.NewUnimplemented(ctx, errtypes.NotSupported("ListTransfers not implemented"), "ListTransfers not implemented"), + Status: status.NewOK(ctx), + Transfers: txInfos, }, nil } -func (s *service) RetryTransfer(ctx context.Context, in *datatx.RetryTransferRequest) (*datatx.RetryTransferResponse, error) { +func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRequest) (*datatx.RetryTransferResponse, error) { + txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()] + if !ok { + return nil, errtypes.InternalError("datatx service: transfer not found") + } + + txInfo, err := s.txManager.RetryTransfer(ctx, req.GetTxId().OpaqueId) + if err != nil { + err = errors.Wrap(err, "datatx service: error retrying transfer") + return &datatx.RetryTransferResponse{ + Status: status.NewInternal(ctx, err, "error retrying transfer"), + TxInfo: txInfo, + }, err + } + + txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + return &datatx.RetryTransferResponse{ - Status: status.NewUnimplemented(ctx, errtypes.NotSupported("RetryTransfer not implemented"), "RetryTransfer not implemented"), + Status: status.NewOK(ctx), + TxInfo: txInfo, + }, nil +} + +func (s *service) extractEndpointInfo(ctx context.Context, targetURL string) (*webdavEndpoint, error) { + if targetURL == "" { + return nil, errtypes.BadRequest("datatx service: ref target is an empty uri") + } + + uri, err := url.Parse(targetURL) + if err != nil { + return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL) + } + if uri.Scheme != "datatx" { + return nil, errtypes.NotSupported("datatx service: ref target does not have the datatx scheme") + } + + m, err := url.ParseQuery(uri.RawQuery) + if err != nil { + return nil, errors.Wrap(err, "datatx service: error parsing target resource name") + } + + return &webdavEndpoint{ + filePath: m["name"][0], + endpoint: uri.Host + uri.Path, + endpointScheme: m["endpointscheme"][0], + token: uri.User.String(), }, nil } + +func loadOrCreate(file string) (*txShareModel, error) { + _, err := os.Stat(file) + if os.IsNotExist(err) { + if err := ioutil.WriteFile(file, []byte("{}"), 0700); err != nil { + err = errors.Wrap(err, "datatx service: error creating the transfer shares storage file: "+file) + return nil, err + } + } + + fd, err := os.OpenFile(file, os.O_CREATE, 0644) + if err != nil { + err = errors.Wrap(err, "datatx service: error opening the transfer shares storage file: "+file) + return nil, err + } + defer fd.Close() + + data, err := ioutil.ReadAll(fd) + if err != nil { + err = errors.Wrap(err, "datatx service: error reading the data") + return nil, err + } + + model := &txShareModel{} + if err := json.Unmarshal(data, model); err != nil { + err = errors.Wrap(err, "datatx service: error decoding transfer shares data to json") + return nil, err + } + + if model.TxShares == nil { + model.TxShares = make(map[string]*txShare) + } + + model.File = file + return model, nil +} + +func (m *txShareModel) saveTxShare() error { + data, err := json.Marshal(m) + if err != nil { + err = errors.Wrap(err, "datatx service: error encoding transfer share data to json") + return err + } + + if err := ioutil.WriteFile(m.File, data, 0644); err != nil { + err = errors.Wrap(err, "datatx service: error writing transfer share data to file: "+m.File) + return err + } + + return nil +} diff --git a/internal/grpc/services/gateway/gateway.go b/internal/grpc/services/gateway/gateway.go index 25819dfb22..5108abf501 100644 --- a/internal/grpc/services/gateway/gateway.go +++ b/internal/grpc/services/gateway/gateway.go @@ -83,7 +83,7 @@ func (c *config) init() { c.ShareFolder = strings.Trim(c.ShareFolder, "/") if c.DataTransfersFolder == "" { - c.DataTransfersFolder = "Data-Transfers" + c.DataTransfersFolder = "DataTransfers" } if c.TokenManager == "" { diff --git a/internal/grpc/services/gateway/ocmshareprovider.go b/internal/grpc/services/gateway/ocmshareprovider.go index bd7ebb76bb..fa6f5f1b1c 100644 --- a/internal/grpc/services/gateway/ocmshareprovider.go +++ b/internal/grpc/services/gateway/ocmshareprovider.go @@ -21,11 +21,16 @@ package gateway import ( "context" "fmt" + "net/url" "path" + "strings" + ocmprovider "github.com/cs3org/go-cs3apis/cs3/ocm/provider/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rgrpc/status" @@ -262,6 +267,109 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive panic("gateway: error updating a received share: the share is nil") } + if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER { + idp := share.GetShare().GetOwner().GetIdp() + meshProvider, err := s.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{ + Domain: idp, + }) + if err != nil { + log.Err(err).Msg("gateway: error calling GetInfoByDomain") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, + }, nil + } + var endpoint string + var endpointScheme string + for _, s := range meshProvider.ProviderInfo.Services { + fmt.Printf("provider info service: %v\n", s) + fmt.Printf(" endpoint type name: %v\n", s.Endpoint.Type.Name) + fmt.Printf(" endpoint Path: %v\n", s.Endpoint.Path) + fmt.Printf(" service host: %v\n", s.GetHost()) + if strings.ToLower(s.Endpoint.Type.Name) == "webdav" { + url, _ := url.Parse(s.Endpoint.Path) + endpoint = url.Host + url.Path + endpointScheme = url.Scheme + break + } + } + + var token string + tokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"] + if !ok { + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewNotFound(ctx, "token not found"), + }, nil + } + switch tokenOpaque.Decoder { + case "plain": + token = string(tokenOpaque.Value) + default: + err := errtypes.NotSupported("opaque entry decoder not recognized: " + tokenOpaque.Decoder) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewInternal(ctx, err, "error updating received share"), + }, nil + } + + // TODO we provide all necessary info with the targetURI + // either provide this in a 'proper' way or, + // inject the necessary services into datatx service and resolve everything there + targetURI := fmt.Sprintf("://%s@%s?name=%s&endpointscheme=%s", token, endpoint, share.GetShare().Name, endpointScheme) + // src: https(from src webdav endoint)://token@srcwebdavendpoint?name=path + // target idem taken from the grantee + // /home/DataTransfers/home/mytransfer/innerfolder/ + fmt.Printf("idp: %v\n", idp) + fmt.Printf("endpoint: %v\n", endpoint) + fmt.Printf("token: %v\n", token) + fmt.Printf("targetURI: %v\n", targetURI) + + // get the webdav endpoint of the grantee's idp + // assume grantee is of type user + granteeIdpEndpoint, err := s.getWebdavEndpoint(ctx, share.GetShare().GetGrantee().GetUserId().Idp) + if err != nil { + log.Err(err).Msg("gateway: error calling PullTransfer") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, + }, nil + } + + opaqueObj := &types.Opaque{ + Map: map[string]*types.OpaqueEntry{ + "shareId": &types.OpaqueEntry{ + Decoder: "plain", + Value: []byte(share.GetShare().GetId().OpaqueId), + }, + "endpoint": &types.OpaqueEntry{ + Decoder: "plain", + Value: []byte(granteeIdpEndpoint), + }, + }, + } + req := &datatx.PullTransferRequest{ + TargetUri: targetURI, + Opaque: opaqueObj, + } + res, err := s.PullTransfer(ctx, req) + if err != nil { + log.Err(err).Msg("gateway: error calling PullTransfer") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, + }, err + } + + log.Info().Msgf("gateway: PullTransfer: %v", res.TxInfo) + + // do not create an OCM reference, just return + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewOK(ctx), + }, nil + } + createRefStatus, err := s.createOCMReference(ctx, share.Share) return &ocm.UpdateReceivedOCMShareResponse{ Status: createRefStatus, From eb62539c2997883a45d21da44c23e3a5ff8f52f2 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Wed, 25 Aug 2021 11:51:22 +0200 Subject: [PATCH 10/21] pkg datatx (rclone) implemented. --- pkg/datatx/datatx.go | 38 ++ pkg/datatx/manager/loader/loader.go | 25 + pkg/datatx/manager/rclone/rclone.go | 775 ++++++++++++++++++++++++ pkg/datatx/manager/registry/registry.go | 36 ++ 4 files changed, 874 insertions(+) create mode 100644 pkg/datatx/datatx.go create mode 100644 pkg/datatx/manager/loader/loader.go create mode 100644 pkg/datatx/manager/rclone/rclone.go create mode 100644 pkg/datatx/manager/registry/registry.go diff --git a/pkg/datatx/datatx.go b/pkg/datatx/datatx.go new file mode 100644 index 0000000000..cc4a65584e --- /dev/null +++ b/pkg/datatx/datatx.go @@ -0,0 +1,38 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package datatx + +import ( + "context" + + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" +) + +// Manager the interface any transfer driver should implement +type Manager interface { + // StartTransfer initiates a transfer job and returns a TxInfo object that includes a unique transfer id. + StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) + // GetTransferStatus returns a TxInfo object that includes the current status. + GetTransferStatus(ctx context.Context, transferId string) (*datatx.TxInfo, error) + // CancelTransfer cancels the transfer and returns a TxInfo object that includes the current status. + CancelTransfer(ctx context.Context, transferId string) (*datatx.TxInfo, error) + // RetryTransfer retries the transfer with the specified transfer ID. + // Note that tokens must still be valid. + RetryTransfer(ctx context.Context, transferId string) (*datatx.TxInfo, error) +} diff --git a/pkg/datatx/manager/loader/loader.go b/pkg/datatx/manager/loader/loader.go new file mode 100644 index 0000000000..28df90ed12 --- /dev/null +++ b/pkg/datatx/manager/loader/loader.go @@ -0,0 +1,25 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package loader + +import ( + // Load datatx drivers. + _ "github.com/cs3org/reva/pkg/datatx/manager/rclone" + // Add your own here +) diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go new file mode 100644 index 0000000000..ea0dd9ded9 --- /dev/null +++ b/pkg/datatx/manager/rclone/rclone.go @@ -0,0 +1,775 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package rclone + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + "path" + "strconv" + "sync" + "time" + + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/pkg/appctx" + txdriver "github.com/cs3org/reva/pkg/datatx" + registry "github.com/cs3org/reva/pkg/datatx/manager/registry" + "github.com/cs3org/reva/pkg/rhttp" + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" +) + +func init() { + registry.Register("rclone", New) +} + +func (c *config) init(m map[string]interface{}) { + // set sane defaults + if c.File == "" { + c.File = "/var/tmp/reva/datatx-transfers.json" + } + if c.JobStatusCheckInterval == 0 { + c.JobStatusCheckInterval = 2000 + } + if c.JobTimeout == 0 { + c.JobTimeout = 50000 + } +} + +type config struct { + Endpoint string `mapstructure:"endpoint"` + AuthUser string `mapstructure:"auth_user"` // rclone basicauth user + AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass + File string `mapstructure:"file"` + JobStatusCheckInterval int `mapstructure:"job_status_check_interval"` + JobTimeout int `mapstructure:"job_timeout"` +} + +type rclone struct { + config *config + client *http.Client + pDriver *pDriver +} + +type transferModel struct { + File string + Transfers map[string]*transfer `json:"transfers"` +} + +// persistency driver +type pDriver struct { + sync.Mutex // concurrent access to the file + model *transferModel +} + +type transfer struct { + TransferID string + JobID int64 + TransferStatus datatx.Status + SrcToken string + SrcRemote string + SrcPath string + DestToken string + DestRemote string + DestPath string + Ctime string +} + +// txEndStatuses final statuses that cannot be changed anymore +var txEndStatuses = map[string]int32{ + "STATUS_INVALID": 0, + "STATUS_DESTINATION_NOT_FOUND": 1, + "STATUS_TRANSFER_COMPLETE": 6, + "STATUS_TRANSFER_FAILED": 7, + "STATUS_TRANSFER_CANCELLED": 8, + "STATUS_TRANSFER_CANCEL_FAILED": 9, + "STATUS_TRANSFER_EXPIRED": 10, +} + +// New returns a new rclone driver +func New(m map[string]interface{}) (txdriver.Manager, error) { + c, err := parseConfig(m) + if err != nil { + return nil, err + } + c.init(m) + + // TODO insecure should be configurable + client := rhttp.GetHTTPClient(rhttp.Insecure(true)) + + // The persistency driver + // Load or create 'db' + model, err := loadOrCreate(c.File) + if err != nil { + err = errors.Wrap(err, "error loading the file containing the transfers") + return nil, err + } + pDriver := &pDriver{ + model: model, + } + + return &rclone{ + config: c, + client: client, + pDriver: pDriver, + }, nil +} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + return c, nil +} + +func loadOrCreate(file string) (*transferModel, error) { + _, err := os.Stat(file) + if os.IsNotExist(err) { + if err := ioutil.WriteFile(file, []byte("{}"), 0700); err != nil { + err = errors.Wrap(err, "error creating the transfers storage file: "+file) + return nil, err + } + } + + fd, err := os.OpenFile(file, os.O_CREATE, 0644) + if err != nil { + err = errors.Wrap(err, "error opening the transfers storage file: "+file) + return nil, err + } + defer fd.Close() + + data, err := ioutil.ReadAll(fd) + if err != nil { + err = errors.Wrap(err, "error reading the data") + return nil, err + } + + model := &transferModel{} + if err := json.Unmarshal(data, model); err != nil { + err = errors.Wrap(err, "error decoding transfers data to json") + return nil, err + } + + if model.Transfers == nil { + model.Transfers = make(map[string]*transfer) + } + + model.File = file + return model, nil +} + +// saveTransfer saves the transfer. If an error is specified than that error will be returned, possibly wrapped with additional errors. +func (m *transferModel) saveTransfer(e error) error { + data, err := json.Marshal(m) + if err != nil { + e = errors.Wrap(err, "error encoding transfer data to json") + return err + } + + if err := ioutil.WriteFile(m.File, data, 0644); err != nil { + e = errors.Wrap(err, "error writing transfer data to file: "+m.File) + return err + } + + return e +} + +// StartTransfer initiates a transfer job and returns a TxInfo object that includes a unique transfer id. +func (driver *rclone) StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) { + return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, destRemote, destPath, destToken) +} + +// startJob starts a transfer job. Retries a previous job if transferId is specified. +func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) { + fmt.Printf("rclone StartTransfer:\n srcRemote: %v \n srcPath: %v \n srcToken: %v \n destRemote: %v \n destPath: %v \n destToken: %v \n", srcRemote, srcPath, srcToken, destRemote, destPath, destToken) + + logger := appctx.GetLogger(ctx) + + driver.pDriver.Lock() + defer driver.pDriver.Unlock() + + var txId string + var cTime *typespb.Timestamp + + if transferId == "" { + txId = uuid.New().String() + cTime = &typespb.Timestamp{Seconds: uint64(time.Now().Unix())} + } else { // restart existing transfer if transferId is specified + fmt.Printf("Restarting transfer (txId: %s)\n", transferId) + txId = transferId + transfer, err := driver.pDriver.model.getTransfer(txId) + if err != nil { + err = errors.Wrap(err, "rclone: error retrying transfer (transferId: "+txId+")") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: nil, + }, err + } + seconds, _ := strconv.ParseInt(transfer.Ctime, 10, 64) + cTime = &typespb.Timestamp{Seconds: uint64(seconds)} + _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] + fmt.Printf("retrying transfer with status (%s)\n", transfer.TransferStatus.String()) + if !endStatusFound { + fmt.Println("transfer not in end status, unable to restart") + err := errors.New("rclone: transfer still running, unable to restart") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, err + } + srcToken = transfer.SrcToken + srcRemote = transfer.SrcRemote + srcPath = transfer.SrcPath + destToken = transfer.DestToken + destRemote = transfer.DestRemote + destPath = transfer.DestPath + delete(driver.pDriver.model.Transfers, txId) + } + + transferStatus := datatx.Status_STATUS_TRANSFER_NEW + + transfer := &transfer{ + TransferID: txId, + JobID: int64(-1), + TransferStatus: transferStatus, + SrcToken: srcToken, + SrcRemote: srcRemote, + SrcPath: srcPath, + DestToken: destToken, + DestRemote: destRemote, + DestPath: destPath, + Ctime: fmt.Sprint(cTime.Seconds), // TODO do we need nanos here? + } + + driver.pDriver.model.Transfers[txId] = transfer + // if err := driver.pDriver.model.saveTransfer(nil); err != nil { + // err = errors.Wrap(err, "rclone: error pulling transfer") + // return &datatx.TxInfo{ + // Id: &datatx.TxId{OpaqueId: txId}, + // Status: datatx.Status_STATUS_INVALID, + // Ctime: cTime, + // }, err + // } + + type rcloneAsyncReqJSON struct { + SrcFs string `json:"srcFs"` + SrcToken string `json:"srcToken"` + DstFs string `json:"dstFs"` + DstToken string `json:"destToken"` + Async bool `json:"_async"` + } + // TODO what about the url schema? + srcFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", srcToken, srcRemote, srcPath) + dstFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", destToken, destRemote, destPath) + fmt.Printf("srcFs: %v\n", srcFs) + fmt.Printf("dstFs: %v\n", dstFs) + + rcloneReq := &rcloneAsyncReqJSON{ + SrcFs: srcFs, + DstFs: dstFs, + Async: true, + } + data, err := json.Marshal(rcloneReq) + if err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer: error marshalling rclone req data") + transfer.TransferStatus = datatx.Status_STATUS_INVALID + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + + pathIsFolder, err := driver.srcPathIsFolder() + if err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer: error stating src path") + transfer.TransferStatus = datatx.Status_STATUS_INVALID + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + transferFileMethod := "/operations/copyfile" + if pathIsFolder { + // TODO sync/copy will overwrite existing data; use a configurable check for this? + // But not necessary if unique folder per transfer + transferFileMethod = "/sync/copy" + } + + u, err := url.Parse(driver.config.Endpoint) + if err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer: error parsing driver endpoint") + transfer.TransferStatus = datatx.Status_STATUS_INVALID + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + u.Path = path.Join(u.Path, transferFileMethod) + requestURL := u.String() + req, err := http.NewRequest("POST", requestURL, bytes.NewReader(data)) + if err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer: error framing post request") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + req.Header.Set("Content-Type", "application/json") + + req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) + + res, err := driver.client.Do(req) + if err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer: error sending post request") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + resBody, err := ioutil.ReadAll(res.Body) + if err != nil { + err = errors.Wrap(err, "rclone: error reading response body") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + e := errors.New("rclone: rclone request responded with error: " + fmt.Sprintf("%s: %s", res.Status, string(resBody))) + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(e) + } + + type rcloneAsyncResJSON struct { + JobID int64 `json:"jobid"` + } + var resData rcloneAsyncResJSON + if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { + err = errors.Wrap(err, "rclone: error decoding response data") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + + transfer.JobID = resData.JobID + + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, err + } + + // start separate dedicated process to periodically check this transfer progress + go func() { + // runs for as long as no end state or time out has been reached + startTimeMs := time.Now().Nanosecond() / 1000 + timeout := driver.config.JobTimeout + + driver.pDriver.Lock() + defer driver.pDriver.Unlock() + + for { + transfer, err := driver.pDriver.model.getTransfer(txId) + fmt.Printf("found transfer to check: %v\n", transfer) + if err != nil { + transfer.TransferStatus = datatx.Status_STATUS_INVALID + err = driver.pDriver.model.saveTransfer(err) + logger.Error().Err(err).Msgf("rclone driver: unable to retreive transfer with id: %v", txId) + break + } + + // check for end status first + endStatus, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] + if endStatusFound { + logger.Info().Msgf("rclone driver: endstatus reached: %v", endStatus) + break + } + + // check for possible timeout and if true were done + currentTimeMs := time.Now().Nanosecond() / 1000 + timePastMs := currentTimeMs - startTimeMs + + if timePastMs > timeout { + logger.Info().Msgf("rclone driver: transfer timed out: %vms (timeout = %v)", timePastMs, timeout) + // set status to EXPIRED and save + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_EXPIRED + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + // log this? + fmt.Printf("Save transfer failed: %v \n", err) + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } + break + } + + // request rclone for current job status + // + // TODO: what do we do in case calling rclone results in errors ? + // simply break, or log|save(status invalid)|break ? + // or don't break with any error and try until successful or expiration ? + // for now break on any error + + jobID := transfer.JobID + + type rcloneStatusReqJSON struct { + JobID int64 `json:"jobid"` + } + rcloneStatusReq := &rcloneStatusReqJSON{ + JobID: jobID, + } + + data, err := json.Marshal(rcloneStatusReq) + if err != nil { + logger.Error().Err(err).Msgf("rclone driver: marshalling request failed: %v", err) + transfer.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + break + } + break + } + + transferFileMethod := "/job/status" + + u, err := url.Parse(driver.config.Endpoint) + if err != nil { + logger.Error().Err(err).Msgf("rclone driver: could not parse driver endpoint: %v", err) + break + } + u.Path = path.Join(u.Path, transferFileMethod) + requestURL := u.String() + + req, err := http.NewRequest("POST", requestURL, bytes.NewReader(data)) + if err != nil { + logger.Error().Err(err).Msgf("rclone driver: error framing post request: %v", err) + break + } + req.Header.Set("Content-Type", "application/json") + + req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) + + res, err := driver.client.Do(req) + if err != nil { + logger.Error().Err(err).Msgf("rclone driver: error sending post request: %v", err) + break + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + // TODO "job not found" also gives a 500 + // Should that return STATUS_INVALID ?? + // at the minimum the returned error message should be the rclone error message + resBody, e := ioutil.ReadAll(res.Body) + if e != nil { + logger.Error().Err(err).Msgf("rclone driver: error reading response body: %v", err) + } + logger.Error().Err(err).Msgf("rclone driver: rclone request responded with error: %s: %s", res.Status, string(resBody)) + break + } + + type rcloneStatusResJSON struct { + Finished bool `json:"finished"` + Success bool `json:"success"` + ID int64 `json:"id"` + Error string `json:"error"` + Group string `json:"group"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + Duration float64 `json:"duration"` + // think we don't need this + // "output": {} // output of the job as would have been returned if called synchronously + } + var resData rcloneStatusResJSON + if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error decoding response data: %v", err) + break + } + + fmt.Printf("rclone resData: %v\n", resData) + + if resData.Error != "" { + logger.Error().Err(err).Msgf("rclone driver: rclone responded with error: %v", resData.Error) + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + break + } + break + } + + // transfer complete + if resData.Finished && resData.Success { + logger.Info().Msg("rclone driver: transfer job finished") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_COMPLETE + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + break + } + break + } + + // transfer completed unsuccessfully without error + if resData.Finished && !resData.Success { + logger.Info().Msgf("rclone driver: transfer job failed") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + break + } + break + } + + // transfer not yet finished: continue + if !resData.Finished { + logger.Info().Msgf("rclone driver: transfer job in progress") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_IN_PROGRESS + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + break + } + } + + <-time.After(time.Millisecond * time.Duration(driver.config.JobStatusCheckInterval)) + } + }() + + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: transferStatus, + Ctime: cTime, + }, nil +} + +// GetTransferStatus returns the status of the transfer with the specified job id +func (driver *rclone) GetTransferStatus(ctx context.Context, transferId string) (*datatx.TxInfo, error) { + transfer, err := driver.pDriver.model.getTransfer(transferId) + if err != nil { + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: nil, + }, err + } + cTime, _ := strconv.ParseInt(transfer.Ctime, 10, 64) + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: transfer.TransferStatus, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, nil +} + +// CancelTransfer cancels the transfer with the specified transfer id +func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*datatx.TxInfo, error) { + transfer, err := driver.pDriver.model.getTransfer(transferId) + if err != nil { + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: nil, + }, err + } + cTime, _ := strconv.ParseInt(transfer.Ctime, 10, 64) + _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] + if endStatusFound { + err := errors.New("rclone driver: transfer already in end state") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + // rcloneStop the rclone job/stop method json request + type rcloneStopRequest struct { + JobID int64 `json:"jobid"` + } + rcloneCancelTransferReq := &rcloneStopRequest{ + JobID: transfer.JobID, + } + + data, err := json.Marshal(rcloneCancelTransferReq) + if err != nil { + err = errors.Wrap(err, "rclone driver: error marshalling rclone req data") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + transferFileMethod := "/job/stop" + + u, err := url.Parse(driver.config.Endpoint) + if err != nil { + err = errors.Wrap(err, "rclone driver: error parsing driver endpoint") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + u.Path = path.Join(u.Path, transferFileMethod) + requestURL := u.String() + + req, err := http.NewRequest("POST", requestURL, bytes.NewReader(data)) + if err != nil { + err = errors.Wrap(err, "rclone driver: error framing post request") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + req.Header.Set("Content-Type", "application/json") + + req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) + + res, err := driver.client.Do(req) + if err != nil { + err = errors.Wrap(err, "rclone driver: error sending post request") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + // TODO "job not found" also gives a 500 + // Should that return STATUS_INVALID ?? + // at the minimum the returned error message should be the rclone error message + resBody, err := ioutil.ReadAll(res.Body) + if err != nil { + err = errors.Wrap(err, "rclone driver: error reading response body") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + err = errors.Wrap(errors.New(fmt.Sprintf("%s: %s", res.Status, string(resBody))), "rclone driver: rclone request responded with error") + + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + type rcloneCancelTransferResJSON struct { + Finished bool `json:"finished"` + Success bool `json:"success"` + ID int64 `json:"id"` + Error string `json:"error"` + Group string `json:"group"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + Duration float64 `json:"duration"` + // think we don't need this + // "output": {} // output of the job as would have been returned if called synchronously + } + var resData rcloneCancelTransferResJSON + if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { + err = errors.Wrap(err, "rclone driver: error decoding response data") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + if resData.Error != "" { + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_TRANSFER_CANCEL_FAILED, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, errors.New(resData.Error) + } + + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_CANCELLED + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferId}, + Status: datatx.Status_STATUS_TRANSFER_CANCELLED, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, nil +} + +// RetryTransfer retries the transfer with the specified transfer ID. +// Note that tokens must still be valid. +func (driver *rclone) RetryTransfer(ctx context.Context, transferId string) (*datatx.TxInfo, error) { + return driver.startJob(ctx, transferId, "", "", "", "", "", "") +} + +// getTransfer returns the transfer with the specified transfer ID +func (m *transferModel) getTransfer(transferId string) (*transfer, error) { + transfer, ok := m.Transfers[transferId] + if !ok { + return nil, errors.New("rclone driver: invalid transfer ID") + } + return transfer, nil +} + +func (driver *rclone) srcPathIsFolder() (bool, error) { + // TODO rclone stat src to determine resource type + return true, nil +} diff --git a/pkg/datatx/manager/registry/registry.go b/pkg/datatx/manager/registry/registry.go new file mode 100644 index 0000000000..95a4830b3c --- /dev/null +++ b/pkg/datatx/manager/registry/registry.go @@ -0,0 +1,36 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package registry + +import ( + "github.com/cs3org/reva/pkg/datatx" +) + +// NewFunc is the function that datatx implementations +// should register at init time. +type NewFunc func(map[string]interface{}) (datatx.Manager, error) + +// NewFuncs is a map containing all the registered datatx backends. +var NewFuncs = map[string]NewFunc{} + +// Register registers a new datatx backend new function. +// Not safe for concurrent use. Safe for use from package init. +func Register(name string, f NewFunc) { + NewFuncs[name] = f +} From 27b77a53d35820427beec018072e9c2f05f448d6 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Mon, 6 Sep 2021 10:35:43 +0200 Subject: [PATCH 11/21] stat to analyse path proper logging properly decode rclone http error response --- pkg/datatx/datatx.go | 8 +- pkg/datatx/manager/rclone/rclone.go | 184 +++++++++++++++++----------- 2 files changed, 117 insertions(+), 75 deletions(-) diff --git a/pkg/datatx/datatx.go b/pkg/datatx/datatx.go index cc4a65584e..945479d23b 100644 --- a/pkg/datatx/datatx.go +++ b/pkg/datatx/datatx.go @@ -26,13 +26,13 @@ import ( // Manager the interface any transfer driver should implement type Manager interface { - // StartTransfer initiates a transfer job and returns a TxInfo object that includes a unique transfer id. + // StartTransfer initiates a transfer job and returns a TxInfo object including a unique transfer id, and error if any. StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) - // GetTransferStatus returns a TxInfo object that includes the current status. + // GetTransferStatus returns a TxInfo object including the current status, and error if any. GetTransferStatus(ctx context.Context, transferId string) (*datatx.TxInfo, error) - // CancelTransfer cancels the transfer and returns a TxInfo object that includes the current status. + // CancelTransfer cancels the transfer and returns a TxInfo object and error if any. CancelTransfer(ctx context.Context, transferId string) (*datatx.TxInfo, error) - // RetryTransfer retries the transfer with the specified transfer ID. + // RetryTransfer retries the transfer and returns a TxInfo object and error if any. // Note that tokens must still be valid. RetryTransfer(ctx context.Context, transferId string) (*datatx.TxInfo, error) } diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index ea0dd9ded9..f977c00217 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -75,6 +75,13 @@ type rclone struct { pDriver *pDriver } +type rcloneHttpErrorRes struct { + Error string `json:"error"` + Input map[string]interface{} `json:"input"` + Path string `json:"path"` + Status int `json:"status"` +} + type transferModel struct { File string Transfers map[string]*transfer `json:"transfers"` @@ -189,12 +196,12 @@ func (m *transferModel) saveTransfer(e error) error { data, err := json.Marshal(m) if err != nil { e = errors.Wrap(err, "error encoding transfer data to json") - return err + return e } if err := ioutil.WriteFile(m.File, data, 0644); err != nil { e = errors.Wrap(err, "error writing transfer data to file: "+m.File) - return err + return e } return e @@ -207,8 +214,6 @@ func (driver *rclone) StartTransfer(ctx context.Context, srcRemote string, srcPa // startJob starts a transfer job. Retries a previous job if transferId is specified. func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) { - fmt.Printf("rclone StartTransfer:\n srcRemote: %v \n srcPath: %v \n srcToken: %v \n destRemote: %v \n destPath: %v \n destToken: %v \n", srcRemote, srcPath, srcToken, destRemote, destPath, destToken) - logger := appctx.GetLogger(ctx) driver.pDriver.Lock() @@ -221,7 +226,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote txId = uuid.New().String() cTime = &typespb.Timestamp{Seconds: uint64(time.Now().Unix())} } else { // restart existing transfer if transferId is specified - fmt.Printf("Restarting transfer (txId: %s)\n", transferId) + logger.Debug().Msgf("Restarting transfer (txId: %s)", transferId) txId = transferId transfer, err := driver.pDriver.model.getTransfer(txId) if err != nil { @@ -235,9 +240,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote seconds, _ := strconv.ParseInt(transfer.Ctime, 10, 64) cTime = &typespb.Timestamp{Seconds: uint64(seconds)} _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] - fmt.Printf("retrying transfer with status (%s)\n", transfer.TransferStatus.String()) if !endStatusFound { - fmt.Println("transfer not in end status, unable to restart") err := errors.New("rclone: transfer still running, unable to restart") return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txId}, @@ -270,28 +273,16 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote } driver.pDriver.model.Transfers[txId] = transfer - // if err := driver.pDriver.model.saveTransfer(nil); err != nil { - // err = errors.Wrap(err, "rclone: error pulling transfer") - // return &datatx.TxInfo{ - // Id: &datatx.TxId{OpaqueId: txId}, - // Status: datatx.Status_STATUS_INVALID, - // Ctime: cTime, - // }, err - // } type rcloneAsyncReqJSON struct { - SrcFs string `json:"srcFs"` - SrcToken string `json:"srcToken"` - DstFs string `json:"dstFs"` - DstToken string `json:"destToken"` - Async bool `json:"_async"` + SrcFs string `json:"srcFs"` + // SrcToken string `json:"srcToken"` + DstFs string `json:"dstFs"` + // DstToken string `json:"destToken"` + Async bool `json:"_async"` } - // TODO what about the url schema? srcFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", srcToken, srcRemote, srcPath) dstFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", destToken, destRemote, destPath) - fmt.Printf("srcFs: %v\n", srcFs) - fmt.Printf("dstFs: %v\n", dstFs) - rcloneReq := &rcloneAsyncReqJSON{ SrcFs: srcFs, DstFs: dstFs, @@ -308,7 +299,8 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote }, driver.pDriver.model.saveTransfer(err) } - pathIsFolder, err := driver.srcPathIsFolder() + transferFileMethod := "/sync/copy" + remotePathIsFolder, err := driver.remotePathIsFolder(srcRemote, srcPath, srcToken) if err != nil { err = errors.Wrap(err, "rclone: error pulling transfer: error stating src path") transfer.TransferStatus = datatx.Status_STATUS_INVALID @@ -318,11 +310,15 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) } - transferFileMethod := "/operations/copyfile" - if pathIsFolder { - // TODO sync/copy will overwrite existing data; use a configurable check for this? - // But not necessary if unique folder per transfer - transferFileMethod = "/sync/copy" + if !remotePathIsFolder { + transferFileMethod = "/operations/copyfile" + err = errors.Wrap(err, "rclone: error pulling transfer: path is a file, only folder transfer is implemented") + transfer.TransferStatus = datatx.Status_STATUS_INVALID + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txId}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) } u, err := url.Parse(driver.config.Endpoint) @@ -347,10 +343,9 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) } - req.Header.Set("Content-Type", "application/json") + req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) - res, err := driver.client.Do(req) if err != nil { err = errors.Wrap(err, "rclone: error pulling transfer: error sending post request") @@ -365,9 +360,9 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote defer res.Body.Close() if res.StatusCode != http.StatusOK { - resBody, err := ioutil.ReadAll(res.Body) - if err != nil { - err = errors.Wrap(err, "rclone: error reading response body") + var errorResData rcloneHttpErrorRes + if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { + err = errors.Wrap(err, "rclone driver: error decoding response data") transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txId}, @@ -375,7 +370,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) } - e := errors.New("rclone: rclone request responded with error: " + fmt.Sprintf("%s: %s", res.Status, string(resBody))) + e := errors.New("rclone: rclone request responded with error, " + fmt.Sprintf(" status: %v, error: %v", errorResData.Status, errorResData.Error)) transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: txId}, @@ -409,7 +404,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote }, err } - // start separate dedicated process to periodically check this transfer progress + // start separate dedicated process to periodically check the transfer progress go func() { // runs for as long as no end state or time out has been reached startTimeMs := time.Now().Nanosecond() / 1000 @@ -420,18 +415,17 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote for { transfer, err := driver.pDriver.model.getTransfer(txId) - fmt.Printf("found transfer to check: %v\n", transfer) if err != nil { transfer.TransferStatus = datatx.Status_STATUS_INVALID err = driver.pDriver.model.saveTransfer(err) - logger.Error().Err(err).Msgf("rclone driver: unable to retreive transfer with id: %v", txId) + logger.Error().Err(err).Msgf("rclone driver: unable to retrieve transfer with id: %v", txId) break } // check for end status first - endStatus, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] + _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] if endStatusFound { - logger.Info().Msgf("rclone driver: endstatus reached: %v", endStatus) + logger.Info().Msgf("rclone driver: transfer endstatus reached: %v", transfer.TransferStatus) break } @@ -444,22 +438,12 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote // set status to EXPIRED and save transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_EXPIRED if err := driver.pDriver.model.saveTransfer(nil); err != nil { - // log this? - fmt.Printf("Save transfer failed: %v \n", err) logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) } break } - // request rclone for current job status - // - // TODO: what do we do in case calling rclone results in errors ? - // simply break, or log|save(status invalid)|break ? - // or don't break with any error and try until successful or expiration ? - // for now break on any error - jobID := transfer.JobID - type rcloneStatusReqJSON struct { JobID int64 `json:"jobid"` } @@ -494,9 +478,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote break } req.Header.Set("Content-Type", "application/json") - req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) - res, err := driver.client.Do(req) if err != nil { logger.Error().Err(err).Msgf("rclone driver: error sending post request: %v", err) @@ -506,14 +488,12 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote defer res.Body.Close() if res.StatusCode != http.StatusOK { - // TODO "job not found" also gives a 500 - // Should that return STATUS_INVALID ?? - // at the minimum the returned error message should be the rclone error message - resBody, e := ioutil.ReadAll(res.Body) - if e != nil { + var errorResData rcloneHttpErrorRes + if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { + err = errors.Wrap(err, "rclone driver: error decoding response data") logger.Error().Err(err).Msgf("rclone driver: error reading response body: %v", err) } - logger.Error().Err(err).Msgf("rclone driver: rclone request responded with error: %s: %s", res.Status, string(resBody)) + logger.Error().Err(err).Msgf("rclone driver: rclone request responded with error, status: %v, error: %v", errorResData.Status, errorResData.Error) break } @@ -535,8 +515,6 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote break } - fmt.Printf("rclone resData: %v\n", resData) - if resData.Error != "" { logger.Error().Err(err).Msgf("rclone driver: rclone responded with error: %v", resData.Error) transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED @@ -687,20 +665,16 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d defer res.Body.Close() if res.StatusCode != http.StatusOK { - // TODO "job not found" also gives a 500 - // Should that return STATUS_INVALID ?? - // at the minimum the returned error message should be the rclone error message - resBody, err := ioutil.ReadAll(res.Body) - if err != nil { - err = errors.Wrap(err, "rclone driver: error reading response body") + var errorResData rcloneHttpErrorRes + if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { + err = errors.Wrap(err, "rclone driver: error decoding response data") return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferId}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err } - err = errors.Wrap(errors.New(fmt.Sprintf("%s: %s", res.Status, string(resBody))), "rclone driver: rclone request responded with error") - + err = errors.Wrap(errors.New(fmt.Sprintf("status: %v, error: %v", errorResData.Status, errorResData.Error)), "rclone driver: rclone request responded with error") return &datatx.TxInfo{ Id: &datatx.TxId{OpaqueId: transferId}, Status: datatx.Status_STATUS_INVALID, @@ -769,7 +743,75 @@ func (m *transferModel) getTransfer(transferId string) (*transfer, error) { return transfer, nil } -func (driver *rclone) srcPathIsFolder() (bool, error) { - // TODO rclone stat src to determine resource type +func (driver *rclone) remotePathIsFolder(remote string, remotePath string, remoteToken string) (bool, error) { + type rcloneListReqJSON struct { + Fs string `json:"fs"` + Remote string `json:"remote"` + } + fs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":", remoteToken, remote) + rcloneReq := &rcloneListReqJSON{ + Fs: fs, + Remote: remotePath, + } + data, err := json.Marshal(rcloneReq) + if err != nil { + return false, errors.Wrap(err, "rclone: error marshalling rclone req data") + } + + listMethod := "/operations/list" + + u, err := url.Parse(driver.config.Endpoint) + if err != nil { + return false, errors.Wrap(err, "rclone driver: error parsing driver endpoint") + } + u.Path = path.Join(u.Path, listMethod) + requestURL := u.String() + + req, err := http.NewRequest("POST", requestURL, bytes.NewReader(data)) + if err != nil { + return false, errors.Wrap(err, "rclone driver: error framing post request") + } + req.Header.Set("Content-Type", "application/json") + + req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) + + res, err := driver.client.Do(req) + if err != nil { + return false, errors.Wrap(err, "rclone driver: error sending post request") + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + var errorResData rcloneHttpErrorRes + if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { + return false, errors.Wrap(err, "rclone driver: error decoding response data") + } + return false, errors.Wrap(errors.New(fmt.Sprintf("status: %v, error: %v", errorResData.Status, errorResData.Error)), "rclone driver: rclone request responded with error") + } + + type item struct { + Path string `json:"Path"` + Name string `json:"Name"` + Size int64 `json:"Size"` + MimeType string `json:"MimeType"` + ModTime string `json:"ModTime"` + IsDir bool `json:"IsDir"` + } + type rcloneListResJSON struct { + List []*item `json:"list"` + } + + var resData rcloneListResJSON + if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { + return false, errors.Wrap(err, "rclone driver: error decoding response data") + } + + // a file will return one single item, the file, with path being the remote path and IsDir will be false + if len(resData.List) == 1 && resData.List[0].Path == remotePath && !resData.List[0].IsDir { + return false, nil + } + + // in all other cases the remote path is a directory return true, nil } From 1b438658ead3722055137fc992301f3bc52c6af9 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Mon, 6 Sep 2021 10:43:35 +0200 Subject: [PATCH 12/21] implement handling of both src and dest targetURI of pull model --- internal/grpc/services/datatx/datatx.go | 76 +++------- .../grpc/services/gateway/ocmshareprovider.go | 138 +++++++++++++++++- 2 files changed, 155 insertions(+), 59 deletions(-) diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index 0a25f51241..b1c21c0c01 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -25,7 +25,6 @@ import ( "io/ioutil" "net/url" "os" - "path" "sync" ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" @@ -36,7 +35,6 @@ import ( "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rgrpc" "github.com/cs3org/reva/pkg/rgrpc/status" - "github.com/cs3org/reva/pkg/token" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "google.golang.org/grpc" @@ -73,9 +71,10 @@ type txShareModel struct { } type txShare struct { - TxID string - TargetUri string - Opaque *types.Opaque `json:"opaque"` + TxID string + SrcTargetURI string + DestTargetURI string + Opaque *types.Opaque `json:"opaque"` } type webdavEndpoint struct { @@ -155,38 +154,21 @@ func (s *service) UnprotectedEndpoints() []string { } func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { - fmt.Printf("PullTransfer reached: req: %v\n", req) - var srcRemote string - var srcPath string - var srcToken string - var dstRemote string - var dstPath string - var dstToken string - srcRemote = "" - srcPath = "" - srcToken = "" - dstRemote = "" - dstPath = "" - dstToken = "" - - ep, err := s.extractEndpointInfo(ctx, req.GetTargetUri()) + srcEp, err := s.extractEndpointInfo(ctx, req.SrcTargetUri) if err != nil { return nil, err } - srcRemote = fmt.Sprintf("%s://%s", ep.endpointScheme, ep.endpoint) - srcPath = ep.filePath - srcToken = ep.token - // destination(grantee) webdav endpoint - // user := user.ContextMustGetUser(ctx) -> user.Id.Idp - endpoint, ok := req.Opaque.Map["endpoint"] - if !ok { - return nil, errtypes.NotSupported("endpoint not defined") + srcRemote := fmt.Sprintf("%s://%s", srcEp.endpointScheme, srcEp.endpoint) + srcPath := srcEp.filePath + srcToken := srcEp.token + + destEp, err := s.extractEndpointInfo(ctx, req.DestTargetUri) + if err != nil { + return nil, err } - dstRemote = string(endpoint.Value) - // // home dir prefix must be removed from the path - // dstPath = path.Join(s.conf.DataTransfersFolder, strings.TrimPrefix(ep.filePath, "/home")) - dstPath = path.Join(s.conf.DataTransfersFolder, ep.filePath) - dstToken = token.ContextMustGetToken(ctx) + dstRemote := fmt.Sprintf("%s://%s", destEp.endpointScheme, destEp.endpoint) + dstPath := destEp.filePath + dstToken := destEp.token txInfo, err := s.txManager.StartTransfer(ctx, srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken) if err != nil { @@ -196,15 +178,12 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ TxInfo: txInfo, }, err } - fmt.Printf("err: %v\n", err) - fmt.Printf("txInfo Status: %v\n", txInfo.Status) - fmt.Printf("txInfo TxID: %v\n", txInfo.GetId().OpaqueId) - fmt.Printf("txInfo Ctime: %v\n", txInfo.GetCtime()) txShare := &txShare{ - TxID: txInfo.GetId().OpaqueId, - TargetUri: req.TargetUri, - Opaque: req.Opaque, + TxID: txInfo.GetId().OpaqueId, + SrcTargetURI: req.SrcTargetUri, + DestTargetURI: req.DestTargetUri, + Opaque: req.Opaque, } s.txShareDriver.Lock() @@ -257,17 +236,11 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer txInfo, err := s.txManager.CancelTransfer(ctx, req.GetTxId().OpaqueId) if err != nil { txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} - txInfo.Status = datatx.Status_STATUS_TRANSFER_CANCELLED + err = errors.Wrap(err, "datatx service: error cancelling transfer") return &datatx.CancelTransferResponse{ - Status: status.NewOK(ctx), + Status: status.NewInternal(ctx, err, "error cancelling transfer"), TxInfo: txInfo, - }, nil - - // err = errors.Wrap(err, "datatx service: error cancelling transfer") - // return &datatx.CancelTransferResponse{ - // Status: status.NewInternal(ctx, err, "error cancelling transfer"), - // TxInfo: txInfo, - // }, err + }, err } txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} @@ -339,9 +312,6 @@ func (s *service) extractEndpointInfo(ctx context.Context, targetURL string) (*w if err != nil { return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL) } - if uri.Scheme != "datatx" { - return nil, errtypes.NotSupported("datatx service: ref target does not have the datatx scheme") - } m, err := url.ParseQuery(uri.RawQuery) if err != nil { @@ -351,7 +321,7 @@ func (s *service) extractEndpointInfo(ctx context.Context, targetURL string) (*w return &webdavEndpoint{ filePath: m["name"][0], endpoint: uri.Host + uri.Path, - endpointScheme: m["endpointscheme"][0], + endpointScheme: uri.Scheme, token: uri.User.String(), }, nil } diff --git a/internal/grpc/services/gateway/ocmshareprovider.go b/internal/grpc/services/gateway/ocmshareprovider.go index fa6f5f1b1c..09e595534d 100644 --- a/internal/grpc/services/gateway/ocmshareprovider.go +++ b/internal/grpc/services/gateway/ocmshareprovider.go @@ -32,6 +32,7 @@ import ( datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/pkg/appctx" + ctxpkg "github.com/cs3org/reva/pkg/ctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rgrpc/status" "github.com/cs3org/reva/pkg/rgrpc/todo/pool" @@ -206,6 +207,36 @@ func (s *svc) ListReceivedOCMShares(ctx context.Context, req *ocm.ListReceivedOC func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceivedOCMShareRequest) (*ocm.UpdateReceivedOCMShareResponse, error) { log := appctx.GetLogger(ctx) + + getShareReq := &ocm.GetReceivedOCMShareRequest{Ref: req.Ref} + getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq) + if err != nil { + log.Err(err).Msg("gateway: error calling GetReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + + if getShareRes.Status.Code != rpc.Code_CODE_OK { + log.Error().Msg("gateway: error calling GetReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + + share := getShareRes.Share + if share == nil { + panic("gateway: error updating a received share: the share is nil") + } + + // return early if transfer type share has already been accepted + if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER && share.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED { + log.Err(err).Msg("gateway: transfer type share already accepted") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewOK(ctx), + }, nil + } + c, err := pool.GetOCMShareProviderClient(s.c.OCMShareProviderEndpoint) if err != nil { err = errors.Wrap(err, "gateway: error calling GetOCMShareProviderClient") @@ -244,27 +275,122 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive }, } getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq) + // we don't commit to storage invalid update fields or empty display names. + if req.Field.GetState() == ocm.ShareState_SHARE_STATE_INVALID && req.Field.GetDisplayName() == "" { + log.Error().Msg("the update field is invalid, aborting reference manipulation") + return res, nil + + } + + // TODO(labkode): if update field is displayName we need to do a rename on the storage to align + // share display name and storage filename. + if req.Field.GetState() != ocm.ShareState_SHARE_STATE_INVALID { + if req.Field.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED { + if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER { + srcIdp := share.GetShare().GetOwner().GetIdp() + meshProvider, err := s.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{ + Domain: srcIdp, + }) if err != nil { log.Err(err).Msg("gateway: error calling GetReceivedShare") return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil } if getShareRes.Status.Code != rpc.Code_CODE_OK { log.Error().Msg("gateway: error calling GetReceivedShare") + var srcEndpoint string + var srcEndpointBaseURI string + // target URI scheme will be the webdav endpoint scheme + var srcEndpointScheme string + for _, s := range meshProvider.ProviderInfo.Services { + if strings.ToLower(s.Endpoint.Type.Name) == "webdav" { + url, err := url.Parse(s.Endpoint.Path) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + srcEndpoint = url.Host + srcEndpointBaseURI = url.Path + srcEndpointScheme = url.Scheme + break + } + } + + var srcToken string + srcTokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"] + if !ok { return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, + Status: status.NewNotFound(ctx, "token not found"), + }, nil + } + switch srcTokenOpaque.Decoder { + case "plain": + srcToken = string(srcTokenOpaque.Value) + default: + err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewInternal(ctx, err, "error updating received share"), + }, nil + } + + srcPath := path.Join(srcEndpointBaseURI, share.GetShare().Name) + srcTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", srcEndpointScheme, srcToken, srcEndpoint, srcPath) + + // get the webdav endpoint of the grantee's idp + var granteeIdp string + if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER { + granteeIdp = share.GetShare().GetGrantee().GetUserId().Idp + } + if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP { + granteeIdp = share.GetShare().GetGrantee().GetGroupId().Idp + } + destWebdavEndpoint, err := s.getWebdavEndpoint(ctx, granteeIdp) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + url, err := url.Parse(destWebdavEndpoint) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + destEndpoint := url.Host + destEndpointBaseURI := url.Path + destEndpointScheme := url.Scheme + destToken := ctxpkg.ContextMustGetToken(ctx) + homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{}) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil } + destPath := path.Join(destEndpointBaseURI, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name)) + destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destEndpoint, destPath) share := getShareRes.Share if share == nil { panic("gateway: error updating a received share: the share is nil") + opaqueObj := &types.Opaque{ + Map: map[string]*types.OpaqueEntry{ + "shareId": { + Decoder: "plain", + Value: []byte(share.GetShare().GetId().OpaqueId), + }, + }, + } + req := &datatx.PullTransferRequest{ + SrcTargetUri: srcTargetURI, + DestTargetUri: destTargetURI, + Opaque: opaqueObj, } if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER { From e84d0b6f4c196a52ea3b0719a0a52035b5e2c95c Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Mon, 6 Sep 2021 10:45:39 +0200 Subject: [PATCH 13/21] command comments fixed --- cmd/reva/transfer-create.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/reva/transfer-create.go b/cmd/reva/transfer-create.go index 44654f9104..9f359eb439 100644 --- a/cmd/reva/transfer-create.go +++ b/cmd/reva/transfer-create.go @@ -42,8 +42,8 @@ func transferCreateCommand() *command { cmd.Description = func() string { return "create transfer between 2 sites" } cmd.Usage = func() string { return "Usage: transfer-create [-flags] " } grantee := cmd.String("grantee", "", "the grantee, receiver of the transfer") - granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group") - idp := cmd.String("idp", "", "the idp of the grantee, default to same idp as the user triggering the action") + granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group (defaults to user)") + idp := cmd.String("idp", "", "the idp of the grantee") userType := cmd.String("user-type", "primary", "the type of user account, defaults to primary") cmd.Action = func(w ...io.Writer) error { From bdaf0fce8e67012ac80d8341816f4fc3c5d04b5e Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Mon, 6 Sep 2021 12:58:32 +0200 Subject: [PATCH 14/21] datatx service example configuration --- examples/datatx/datatx.toml | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 examples/datatx/datatx.toml diff --git a/examples/datatx/datatx.toml b/examples/datatx/datatx.toml new file mode 100644 index 0000000000..cb50385005 --- /dev/null +++ b/examples/datatx/datatx.toml @@ -0,0 +1,22 @@ +# example data transfer service configuration +[grpc.services.datatx] +# rclone is the default data transfer driver +txdriver = "rclone" +# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json) +tx_shares_file = "" +# base folder of the data transfers (default: /home/DataTransfers) +data_transfers_folder = "" + +# rclone data transfer driver +[grpc.services.datatx.txdrivers.rclone] +# rclone endpoint +endpoint = "http://..." +# basic auth is used +auth_user = "...rcloneuser" +auth_pass = "...rcloneusersecret" +# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json) +file = "" +# check status job interval in milliseconds +job_status_check_interval = 2000 +# the job timeout in milliseconds (must be long enough for big transfers!) +job_timeout = 120000 \ No newline at end of file From 1d1544667d7e1adec7f527eca8b1d9f904d96913 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Mon, 6 Sep 2021 13:02:19 +0200 Subject: [PATCH 15/21] Always save the transfer regardless of start transfer outcome --- internal/grpc/services/datatx/datatx.go | 27 +++++++++++++++---------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index b1c21c0c01..8c2c2b87e2 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -88,8 +88,11 @@ func (c *config) init() { if c.TxDriver == "" { c.TxDriver = "rclone" } + if c.TxSharesFile == "" { + c.TxSharesFile = "/var/tmp/reva/datatx-shares.json" + } if c.DataTransfersFolder == "" { - c.DataTransfersFolder = "DataTransfers" + c.DataTransfersFolder = "/home/DataTransfers" } } @@ -170,27 +173,20 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ dstPath := destEp.filePath dstToken := destEp.token - txInfo, err := s.txManager.StartTransfer(ctx, srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken) - if err != nil { - err = errors.Wrap(err, "datatx service: error starting transfer job") - return &datatx.PullTransferResponse{ - Status: status.NewInvalid(ctx, "datatx service: error pulling transfer"), - TxInfo: txInfo, - }, err - } + txInfo, startTransferErr := s.txManager.StartTransfer(ctx, srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken) + // we always save the transfer regardless of start transfer outcome + // only then, if starting fails, can we try to restart it txShare := &txShare{ TxID: txInfo.GetId().OpaqueId, SrcTargetURI: req.SrcTargetUri, DestTargetURI: req.DestTargetUri, Opaque: req.Opaque, } - s.txShareDriver.Lock() defer s.txShareDriver.Unlock() s.txShareDriver.model.TxShares[txInfo.GetId().OpaqueId] = txShare - if err := s.txShareDriver.model.saveTxShare(); err != nil { err = errors.Wrap(err, "datatx service: error saving transfer share: "+datatx.Status_STATUS_INVALID.String()) return &datatx.PullTransferResponse{ @@ -198,6 +194,15 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ }, err } + // now check start transfer outcome + if startTransferErr != nil { + startTransferErr = errors.Wrap(startTransferErr, "datatx service: error starting transfer job") + return &datatx.PullTransferResponse{ + Status: status.NewInvalid(ctx, "datatx service: error pulling transfer"), + TxInfo: txInfo, + }, startTransferErr + } + return &datatx.PullTransferResponse{ Status: status.NewOK(ctx), TxInfo: txInfo, From 8c00dab2b3910028ff5aa5a88f57d477d2198386 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Wed, 8 Sep 2021 10:32:54 +0200 Subject: [PATCH 16/21] New PR. --- changelog/unreleased/pull-transfer.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/unreleased/pull-transfer.md b/changelog/unreleased/pull-transfer.md index b02fd62ac2..276609dd63 100644 --- a/changelog/unreleased/pull-transfer.md +++ b/changelog/unreleased/pull-transfer.md @@ -3,4 +3,4 @@ Enhancement: New CS3API datatx methods CS3 datatx pull model methods: PullTransfer, RetryTransfer, ListTransfers Method CreateTransfer removed. -https://github.com/cs3org/reva/pull/1824 \ No newline at end of file +https://github.com/cs3org/reva/pull/2052 \ No newline at end of file From 84a1c5f2f3a6b65e0b7d481bf790cf9a1ab8f03d Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Wed, 8 Sep 2021 14:14:54 +0200 Subject: [PATCH 17/21] Lint fixes. --- cmd/reva/transfer-cancel.go | 6 +- cmd/reva/transfer-get-status.go | 6 +- cmd/reva/transfer-list.go | 6 +- cmd/reva/transfer-retry.go | 6 +- pkg/datatx/datatx.go | 6 +- pkg/datatx/manager/rclone/rclone.go | 115 ++++++++++++++-------------- 6 files changed, 72 insertions(+), 73 deletions(-) diff --git a/cmd/reva/transfer-cancel.go b/cmd/reva/transfer-cancel.go index 2eede16952..dfa086467b 100644 --- a/cmd/reva/transfer-cancel.go +++ b/cmd/reva/transfer-cancel.go @@ -34,11 +34,11 @@ func transferCancelCommand() *command { cmd := newCommand("transfer-cancel") cmd.Description = func() string { return "cancel a running transfer" } cmd.Usage = func() string { return "Usage: transfer-cancel [-flags]" } - txId := cmd.String("txId", "", "the transfer identifier") + txID := cmd.String("txId", "", "the transfer identifier") cmd.Action = func(w ...io.Writer) error { // validate flags - if *txId == "" { + if *txID == "" { return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage()) } @@ -49,7 +49,7 @@ func transferCancelCommand() *command { } cancelRequest := &datatx.CancelTransferRequest{ - TxId: &datatx.TxId{OpaqueId: *txId}, + TxId: &datatx.TxId{OpaqueId: *txID}, } cancelResponse, err := client.CancelTransfer(ctx, cancelRequest) diff --git a/cmd/reva/transfer-get-status.go b/cmd/reva/transfer-get-status.go index 0fd9f74a7d..ab73a52e69 100644 --- a/cmd/reva/transfer-get-status.go +++ b/cmd/reva/transfer-get-status.go @@ -34,11 +34,11 @@ func transferGetStatusCommand() *command { cmd := newCommand("transfer-get-status") cmd.Description = func() string { return "get the status of a transfer" } cmd.Usage = func() string { return "Usage: transfer-get-status [-flags]" } - txId := cmd.String("txId", "", "the transfer identifier") + txID := cmd.String("txId", "", "the transfer identifier") cmd.Action = func(w ...io.Writer) error { // validate flags - if *txId == "" { + if *txID == "" { return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage()) } @@ -49,7 +49,7 @@ func transferGetStatusCommand() *command { } getStatusRequest := &datatx.GetTransferStatusRequest{ - TxId: &datatx.TxId{OpaqueId: *txId}, + TxId: &datatx.TxId{OpaqueId: *txID}, } getStatusResponse, err := client.GetTransferStatus(ctx, getStatusRequest) diff --git a/cmd/reva/transfer-list.go b/cmd/reva/transfer-list.go index f11721ee10..b0ea83ae9d 100644 --- a/cmd/reva/transfer-list.go +++ b/cmd/reva/transfer-list.go @@ -33,7 +33,7 @@ func transferListCommand() *command { cmd := newCommand("transfer-list") cmd.Description = func() string { return "get a list of transfers" } cmd.Usage = func() string { return "Usage: transfer-list [-flags]" } - filterShareId := cmd.String("shareId", "", "share ID filter (optional)") + filterShareID := cmd.String("shareId", "", "share ID filter (optional)") cmd.Action = func(w ...io.Writer) error { ctx := getAuthContext() @@ -44,12 +44,12 @@ func transferListCommand() *command { // validate flags var filters []*datatx.ListTransfersRequest_Filter - if *filterShareId != "" { + if *filterShareID != "" { filters = append(filters, &datatx.ListTransfersRequest_Filter{ Type: datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID, Term: &datatx.ListTransfersRequest_Filter_ShareId{ ShareId: &ocm.ShareId{ - OpaqueId: *filterShareId, + OpaqueId: *filterShareID, }, }, }) diff --git a/cmd/reva/transfer-retry.go b/cmd/reva/transfer-retry.go index cc3e1643a1..2f2ada08d9 100644 --- a/cmd/reva/transfer-retry.go +++ b/cmd/reva/transfer-retry.go @@ -34,11 +34,11 @@ func transferRetryCommand() *command { cmd := newCommand("transfer-retry") cmd.Description = func() string { return "retry a transfer" } cmd.Usage = func() string { return "Usage: transfer-retry [-flags]" } - txId := cmd.String("txId", "", "the transfer identifier") + txID := cmd.String("txId", "", "the transfer identifier") cmd.Action = func(w ...io.Writer) error { // validate flags - if *txId == "" { + if *txID == "" { return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage()) } @@ -50,7 +50,7 @@ func transferRetryCommand() *command { retryRequest := &datatx.RetryTransferRequest{ TxId: &datatx.TxId{ - OpaqueId: *txId, + OpaqueId: *txID, }, } diff --git a/pkg/datatx/datatx.go b/pkg/datatx/datatx.go index 945479d23b..3b1d98d6b2 100644 --- a/pkg/datatx/datatx.go +++ b/pkg/datatx/datatx.go @@ -29,10 +29,10 @@ type Manager interface { // StartTransfer initiates a transfer job and returns a TxInfo object including a unique transfer id, and error if any. StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) // GetTransferStatus returns a TxInfo object including the current status, and error if any. - GetTransferStatus(ctx context.Context, transferId string) (*datatx.TxInfo, error) + GetTransferStatus(ctx context.Context, transferID string) (*datatx.TxInfo, error) // CancelTransfer cancels the transfer and returns a TxInfo object and error if any. - CancelTransfer(ctx context.Context, transferId string) (*datatx.TxInfo, error) + CancelTransfer(ctx context.Context, transferID string) (*datatx.TxInfo, error) // RetryTransfer retries the transfer and returns a TxInfo object and error if any. // Note that tokens must still be valid. - RetryTransfer(ctx context.Context, transferId string) (*datatx.TxInfo, error) + RetryTransfer(ctx context.Context, transferID string) (*datatx.TxInfo, error) } diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index f977c00217..f25624af46 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -75,7 +75,7 @@ type rclone struct { pDriver *pDriver } -type rcloneHttpErrorRes struct { +type rcloneHTTPErrorRes struct { Error string `json:"error"` Input map[string]interface{} `json:"input"` Path string `json:"path"` @@ -212,27 +212,27 @@ func (driver *rclone) StartTransfer(ctx context.Context, srcRemote string, srcPa return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, destRemote, destPath, destToken) } -// startJob starts a transfer job. Retries a previous job if transferId is specified. -func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) { +// startJob starts a transfer job. Retries a previous job if transferID is specified. +func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) { logger := appctx.GetLogger(ctx) driver.pDriver.Lock() defer driver.pDriver.Unlock() - var txId string + var txID string var cTime *typespb.Timestamp - if transferId == "" { - txId = uuid.New().String() + if transferID == "" { + txID = uuid.New().String() cTime = &typespb.Timestamp{Seconds: uint64(time.Now().Unix())} - } else { // restart existing transfer if transferId is specified - logger.Debug().Msgf("Restarting transfer (txId: %s)", transferId) - txId = transferId - transfer, err := driver.pDriver.model.getTransfer(txId) + } else { // restart existing transfer if transferID is specified + logger.Debug().Msgf("Restarting transfer (txID: %s)", transferID) + txID = transferID + transfer, err := driver.pDriver.model.getTransfer(txID) if err != nil { - err = errors.Wrap(err, "rclone: error retrying transfer (transferId: "+txId+")") + err = errors.Wrap(err, "rclone: error retrying transfer (transferID: "+txID+")") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: datatx.Status_STATUS_INVALID, Ctime: nil, }, err @@ -243,7 +243,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote if !endStatusFound { err := errors.New("rclone: transfer still running, unable to restart") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: transfer.TransferStatus, Ctime: cTime, }, err @@ -254,13 +254,13 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote destToken = transfer.DestToken destRemote = transfer.DestRemote destPath = transfer.DestPath - delete(driver.pDriver.model.Transfers, txId) + delete(driver.pDriver.model.Transfers, txID) } transferStatus := datatx.Status_STATUS_TRANSFER_NEW transfer := &transfer{ - TransferID: txId, + TransferID: txID, JobID: int64(-1), TransferStatus: transferStatus, SrcToken: srcToken, @@ -272,7 +272,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote Ctime: fmt.Sprint(cTime.Seconds), // TODO do we need nanos here? } - driver.pDriver.model.Transfers[txId] = transfer + driver.pDriver.model.Transfers[txID] = transfer type rcloneAsyncReqJSON struct { SrcFs string `json:"srcFs"` @@ -293,7 +293,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote err = errors.Wrap(err, "rclone: error pulling transfer: error marshalling rclone req data") transfer.TransferStatus = datatx.Status_STATUS_INVALID return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: datatx.Status_STATUS_INVALID, Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) @@ -305,17 +305,16 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote err = errors.Wrap(err, "rclone: error pulling transfer: error stating src path") transfer.TransferStatus = datatx.Status_STATUS_INVALID return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: datatx.Status_STATUS_INVALID, Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) } if !remotePathIsFolder { - transferFileMethod = "/operations/copyfile" err = errors.Wrap(err, "rclone: error pulling transfer: path is a file, only folder transfer is implemented") transfer.TransferStatus = datatx.Status_STATUS_INVALID return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: datatx.Status_STATUS_INVALID, Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) @@ -326,7 +325,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote err = errors.Wrap(err, "rclone: error pulling transfer: error parsing driver endpoint") transfer.TransferStatus = datatx.Status_STATUS_INVALID return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: datatx.Status_STATUS_INVALID, Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) @@ -338,7 +337,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote err = errors.Wrap(err, "rclone: error pulling transfer: error framing post request") transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: transfer.TransferStatus, Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) @@ -351,7 +350,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote err = errors.Wrap(err, "rclone: error pulling transfer: error sending post request") transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: transfer.TransferStatus, Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) @@ -360,12 +359,12 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote defer res.Body.Close() if res.StatusCode != http.StatusOK { - var errorResData rcloneHttpErrorRes + var errorResData rcloneHTTPErrorRes if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { err = errors.Wrap(err, "rclone driver: error decoding response data") transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: transfer.TransferStatus, Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) @@ -373,7 +372,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote e := errors.New("rclone: rclone request responded with error, " + fmt.Sprintf(" status: %v, error: %v", errorResData.Status, errorResData.Error)) transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: transfer.TransferStatus, Ctime: cTime, }, driver.pDriver.model.saveTransfer(e) @@ -387,7 +386,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote err = errors.Wrap(err, "rclone: error decoding response data") transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: transfer.TransferStatus, Ctime: cTime, }, driver.pDriver.model.saveTransfer(err) @@ -398,7 +397,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote if err := driver.pDriver.model.saveTransfer(nil); err != nil { err = errors.Wrap(err, "rclone: error pulling transfer") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: datatx.Status_STATUS_INVALID, Ctime: cTime, }, err @@ -414,11 +413,11 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote defer driver.pDriver.Unlock() for { - transfer, err := driver.pDriver.model.getTransfer(txId) + transfer, err := driver.pDriver.model.getTransfer(txID) if err != nil { transfer.TransferStatus = datatx.Status_STATUS_INVALID err = driver.pDriver.model.saveTransfer(err) - logger.Error().Err(err).Msgf("rclone driver: unable to retrieve transfer with id: %v", txId) + logger.Error().Err(err).Msgf("rclone driver: unable to retrieve transfer with id: %v", txID) break } @@ -488,7 +487,7 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote defer res.Body.Close() if res.StatusCode != http.StatusOK { - var errorResData rcloneHttpErrorRes + var errorResData rcloneHTTPErrorRes if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { err = errors.Wrap(err, "rclone driver: error decoding response data") logger.Error().Err(err).Msgf("rclone driver: error reading response body: %v", err) @@ -562,36 +561,36 @@ func (driver *rclone) startJob(ctx context.Context, transferId string, srcRemote }() return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: txId}, + Id: &datatx.TxId{OpaqueId: txID}, Status: transferStatus, Ctime: cTime, }, nil } // GetTransferStatus returns the status of the transfer with the specified job id -func (driver *rclone) GetTransferStatus(ctx context.Context, transferId string) (*datatx.TxInfo, error) { - transfer, err := driver.pDriver.model.getTransfer(transferId) +func (driver *rclone) GetTransferStatus(ctx context.Context, transferID string) (*datatx.TxInfo, error) { + transfer, err := driver.pDriver.model.getTransfer(transferID) if err != nil { return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: nil, }, err } cTime, _ := strconv.ParseInt(transfer.Ctime, 10, 64) return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: transfer.TransferStatus, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, nil } // CancelTransfer cancels the transfer with the specified transfer id -func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*datatx.TxInfo, error) { - transfer, err := driver.pDriver.model.getTransfer(transferId) +func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*datatx.TxInfo, error) { + transfer, err := driver.pDriver.model.getTransfer(transferID) if err != nil { return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: nil, }, err @@ -601,7 +600,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d if endStatusFound { err := errors.New("rclone driver: transfer already in end state") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err @@ -619,7 +618,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d if err != nil { err = errors.Wrap(err, "rclone driver: error marshalling rclone req data") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err @@ -631,7 +630,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d if err != nil { err = errors.Wrap(err, "rclone driver: error parsing driver endpoint") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err @@ -643,7 +642,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d if err != nil { err = errors.Wrap(err, "rclone driver: error framing post request") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err @@ -656,7 +655,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d if err != nil { err = errors.Wrap(err, "rclone driver: error sending post request") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err @@ -665,18 +664,18 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d defer res.Body.Close() if res.StatusCode != http.StatusOK { - var errorResData rcloneHttpErrorRes + var errorResData rcloneHTTPErrorRes if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { err = errors.Wrap(err, "rclone driver: error decoding response data") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err } - err = errors.Wrap(errors.New(fmt.Sprintf("status: %v, error: %v", errorResData.Status, errorResData.Error)), "rclone driver: rclone request responded with error") + err = errors.Wrap(errors.Errorf("status: %v, error: %v", errorResData.Status, errorResData.Error), "rclone driver: rclone request responded with error") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err @@ -698,7 +697,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { err = errors.Wrap(err, "rclone driver: error decoding response data") return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err @@ -706,7 +705,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d if resData.Error != "" { return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_TRANSFER_CANCEL_FAILED, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, errors.New(resData.Error) @@ -715,14 +714,14 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_CANCELLED if err := driver.pDriver.model.saveTransfer(nil); err != nil { return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_INVALID, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, err } return &datatx.TxInfo{ - Id: &datatx.TxId{OpaqueId: transferId}, + Id: &datatx.TxId{OpaqueId: transferID}, Status: datatx.Status_STATUS_TRANSFER_CANCELLED, Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, }, nil @@ -730,13 +729,13 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferId string) (*d // RetryTransfer retries the transfer with the specified transfer ID. // Note that tokens must still be valid. -func (driver *rclone) RetryTransfer(ctx context.Context, transferId string) (*datatx.TxInfo, error) { - return driver.startJob(ctx, transferId, "", "", "", "", "", "") +func (driver *rclone) RetryTransfer(ctx context.Context, transferID string) (*datatx.TxInfo, error) { + return driver.startJob(ctx, transferID, "", "", "", "", "", "") } // getTransfer returns the transfer with the specified transfer ID -func (m *transferModel) getTransfer(transferId string) (*transfer, error) { - transfer, ok := m.Transfers[transferId] +func (m *transferModel) getTransfer(transferID string) (*transfer, error) { + transfer, ok := m.Transfers[transferID] if !ok { return nil, errors.New("rclone driver: invalid transfer ID") } @@ -783,11 +782,11 @@ func (driver *rclone) remotePathIsFolder(remote string, remotePath string, remot defer res.Body.Close() if res.StatusCode != http.StatusOK { - var errorResData rcloneHttpErrorRes + var errorResData rcloneHTTPErrorRes if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { return false, errors.Wrap(err, "rclone driver: error decoding response data") } - return false, errors.Wrap(errors.New(fmt.Sprintf("status: %v, error: %v", errorResData.Status, errorResData.Error)), "rclone driver: rclone request responded with error") + return false, errors.Wrap(errors.Errorf("status: %v, error: %v", errorResData.Status, errorResData.Error), "rclone driver: rclone request responded with error") } type item struct { From e9268291bde82b90346d1bb9a8e9fec978c575d0 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Thu, 16 Sep 2021 16:37:48 +0200 Subject: [PATCH 18/21] Make sure to be in end state on any transfer failure. --- pkg/datatx/manager/rclone/rclone.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index f25624af46..5e4008cbbf 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -456,7 +456,6 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote transfer.TransferStatus = datatx.Status_STATUS_INVALID if err := driver.pDriver.model.saveTransfer(nil); err != nil { logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) - break } break } @@ -466,6 +465,10 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote u, err := url.Parse(driver.config.Endpoint) if err != nil { logger.Error().Err(err).Msgf("rclone driver: could not parse driver endpoint: %v", err) + transfer.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } break } u.Path = path.Join(u.Path, transferFileMethod) @@ -474,6 +477,10 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote req, err := http.NewRequest("POST", requestURL, bytes.NewReader(data)) if err != nil { logger.Error().Err(err).Msgf("rclone driver: error framing post request: %v", err) + transfer.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } break } req.Header.Set("Content-Type", "application/json") @@ -481,6 +488,10 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote res, err := driver.client.Do(req) if err != nil { logger.Error().Err(err).Msgf("rclone driver: error sending post request: %v", err) + transfer.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } break } @@ -493,6 +504,10 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote logger.Error().Err(err).Msgf("rclone driver: error reading response body: %v", err) } logger.Error().Err(err).Msgf("rclone driver: rclone request responded with error, status: %v, error: %v", errorResData.Status, errorResData.Error) + transfer.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } break } From c73e8d8da5e4a455376fa7b7a111d2fb46317614 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Thu, 4 Nov 2021 15:09:54 +0100 Subject: [PATCH 19/21] Merge branch 'main' into datatx-pull-model --- .../grpc/services/gateway/ocmshareprovider.go | 238 +++++------------- 1 file changed, 64 insertions(+), 174 deletions(-) diff --git a/internal/grpc/services/gateway/ocmshareprovider.go b/internal/grpc/services/gateway/ocmshareprovider.go index 09e595534d..d0675b7c6e 100644 --- a/internal/grpc/services/gateway/ocmshareprovider.go +++ b/internal/grpc/services/gateway/ocmshareprovider.go @@ -207,36 +207,6 @@ func (s *svc) ListReceivedOCMShares(ctx context.Context, req *ocm.ListReceivedOC func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceivedOCMShareRequest) (*ocm.UpdateReceivedOCMShareResponse, error) { log := appctx.GetLogger(ctx) - - getShareReq := &ocm.GetReceivedOCMShareRequest{Ref: req.Ref} - getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq) - if err != nil { - log.Err(err).Msg("gateway: error calling GetReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - - if getShareRes.Status.Code != rpc.Code_CODE_OK { - log.Error().Msg("gateway: error calling GetReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - - share := getShareRes.Share - if share == nil { - panic("gateway: error updating a received share: the share is nil") - } - - // return early if transfer type share has already been accepted - if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER && share.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED { - log.Err(err).Msg("gateway: transfer type share already accepted") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewOK(ctx), - }, nil - } - c, err := pool.GetOCMShareProviderClient(s.c.OCMShareProviderEndpoint) if err != nil { err = errors.Wrap(err, "gateway: error calling GetOCMShareProviderClient") @@ -275,208 +245,128 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive }, } getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq) - // we don't commit to storage invalid update fields or empty display names. - if req.Field.GetState() == ocm.ShareState_SHARE_STATE_INVALID && req.Field.GetDisplayName() == "" { - log.Error().Msg("the update field is invalid, aborting reference manipulation") - return res, nil - - } - - // TODO(labkode): if update field is displayName we need to do a rename on the storage to align - // share display name and storage filename. - if req.Field.GetState() != ocm.ShareState_SHARE_STATE_INVALID { - if req.Field.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED { - if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER { - srcIdp := share.GetShare().GetOwner().GetIdp() - meshProvider, err := s.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{ - Domain: srcIdp, - }) if err != nil { log.Err(err).Msg("gateway: error calling GetReceivedShare") return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, }, nil } if getShareRes.Status.Code != rpc.Code_CODE_OK { log.Error().Msg("gateway: error calling GetReceivedShare") - var srcEndpoint string - var srcEndpointBaseURI string - // target URI scheme will be the webdav endpoint scheme - var srcEndpointScheme string - for _, s := range meshProvider.ProviderInfo.Services { - if strings.ToLower(s.Endpoint.Type.Name) == "webdav" { - url, err := url.Parse(s.Endpoint.Path) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path) - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - srcEndpoint = url.Host - srcEndpointBaseURI = url.Path - srcEndpointScheme = url.Scheme - break - } - } - - var srcToken string - srcTokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"] - if !ok { return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewNotFound(ctx, "token not found"), - }, nil - } - switch srcTokenOpaque.Decoder { - case "plain": - srcToken = string(srcTokenOpaque.Value) - default: - err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder) - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewInternal(ctx, err, "error updating received share"), - }, nil - } - - srcPath := path.Join(srcEndpointBaseURI, share.GetShare().Name) - srcTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", srcEndpointScheme, srcToken, srcEndpoint, srcPath) - - // get the webdav endpoint of the grantee's idp - var granteeIdp string - if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER { - granteeIdp = share.GetShare().GetGrantee().GetUserId().Idp - } - if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP { - granteeIdp = share.GetShare().GetGrantee().GetGroupId().Idp - } - destWebdavEndpoint, err := s.getWebdavEndpoint(ctx, granteeIdp) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - url, err := url.Parse(destWebdavEndpoint) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint) - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - destEndpoint := url.Host - destEndpointBaseURI := url.Path - destEndpointScheme := url.Scheme - destToken := ctxpkg.ContextMustGetToken(ctx) - homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{}) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, }, nil } - destPath := path.Join(destEndpointBaseURI, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name)) - destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destEndpoint, destPath) share := getShareRes.Share if share == nil { panic("gateway: error updating a received share: the share is nil") - opaqueObj := &types.Opaque{ - Map: map[string]*types.OpaqueEntry{ - "shareId": { - Decoder: "plain", - Value: []byte(share.GetShare().GetId().OpaqueId), - }, - }, - } - req := &datatx.PullTransferRequest{ - SrcTargetUri: srcTargetURI, - DestTargetUri: destTargetURI, - Opaque: opaqueObj, } if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER { - idp := share.GetShare().GetOwner().GetIdp() + srcIdp := share.GetShare().GetOwner().GetIdp() meshProvider, err := s.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{ - Domain: idp, + Domain: srcIdp, }) if err != nil { log.Err(err).Msg("gateway: error calling GetInfoByDomain") return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil } - var endpoint string - var endpointScheme string + var srcEndpoint string + var srcEndpointBaseURI string + // target URI scheme will be the webdav endpoint scheme + var srcEndpointScheme string for _, s := range meshProvider.ProviderInfo.Services { - fmt.Printf("provider info service: %v\n", s) - fmt.Printf(" endpoint type name: %v\n", s.Endpoint.Type.Name) - fmt.Printf(" endpoint Path: %v\n", s.Endpoint.Path) - fmt.Printf(" service host: %v\n", s.GetHost()) if strings.ToLower(s.Endpoint.Type.Name) == "webdav" { - url, _ := url.Parse(s.Endpoint.Path) - endpoint = url.Host + url.Path - endpointScheme = url.Scheme + url, err := url.Parse(s.Endpoint.Path) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + srcEndpoint = url.Host + srcEndpointBaseURI = url.Path + srcEndpointScheme = url.Scheme break } } - var token string - tokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"] + var srcToken string + srcTokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"] if !ok { return &ocm.UpdateReceivedOCMShareResponse{ Status: status.NewNotFound(ctx, "token not found"), }, nil } - switch tokenOpaque.Decoder { + switch srcTokenOpaque.Decoder { case "plain": - token = string(tokenOpaque.Value) + srcToken = string(srcTokenOpaque.Value) default: - err := errtypes.NotSupported("opaque entry decoder not recognized: " + tokenOpaque.Decoder) + err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder) return &ocm.UpdateReceivedOCMShareResponse{ Status: status.NewInternal(ctx, err, "error updating received share"), }, nil } - // TODO we provide all necessary info with the targetURI - // either provide this in a 'proper' way or, - // inject the necessary services into datatx service and resolve everything there - targetURI := fmt.Sprintf("://%s@%s?name=%s&endpointscheme=%s", token, endpoint, share.GetShare().Name, endpointScheme) - // src: https(from src webdav endoint)://token@srcwebdavendpoint?name=path - // target idem taken from the grantee - // /home/DataTransfers/home/mytransfer/innerfolder/ - fmt.Printf("idp: %v\n", idp) - fmt.Printf("endpoint: %v\n", endpoint) - fmt.Printf("token: %v\n", token) - fmt.Printf("targetURI: %v\n", targetURI) + srcPath := path.Join(srcEndpointBaseURI, share.GetShare().Name) + srcTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", srcEndpointScheme, srcToken, srcEndpoint, srcPath) // get the webdav endpoint of the grantee's idp - // assume grantee is of type user - granteeIdpEndpoint, err := s.getWebdavEndpoint(ctx, share.GetShare().GetGrantee().GetUserId().Idp) + var granteeIdp string + if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER { + granteeIdp = share.GetShare().GetGrantee().GetUserId().Idp + } + if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP { + granteeIdp = share.GetShare().GetGrantee().GetGroupId().Idp + } + destWebdavEndpoint, err := s.getWebdavEndpoint(ctx, granteeIdp) if err != nil { - log.Err(err).Msg("gateway: error calling PullTransfer") + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + url, err := url.Parse(destWebdavEndpoint) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + destEndpoint := url.Host + destEndpointBaseURI := url.Path + destEndpointScheme := url.Scheme + destToken := ctxpkg.ContextMustGetToken(ctx) + homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{}) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil } + destPath := path.Join(destEndpointBaseURI, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name)) + destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destEndpoint, destPath) opaqueObj := &types.Opaque{ Map: map[string]*types.OpaqueEntry{ - "shareId": &types.OpaqueEntry{ + "shareId": { Decoder: "plain", Value: []byte(share.GetShare().GetId().OpaqueId), }, - "endpoint": &types.OpaqueEntry{ - Decoder: "plain", - Value: []byte(granteeIdpEndpoint), - }, }, } req := &datatx.PullTransferRequest{ - TargetUri: targetURI, - Opaque: opaqueObj, + SrcTargetUri: srcTargetURI, + DestTargetUri: destTargetURI, + Opaque: opaqueObj, } res, err := s.PullTransfer(ctx, req) if err != nil { From f412f5dcfb5c30b9d3bb6ecf791c4d595084945d Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Mon, 14 Feb 2022 11:51:41 +0100 Subject: [PATCH 20/21] New cs3apis version: data transfers --- go.mod | 3 ++- go.sum | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 11452b6a6b..4d15ab1c62 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/cheggaaa/pb v1.0.29 github.com/coreos/go-oidc v2.2.1+incompatible github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e - github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654 + github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65 github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 github.com/dgraph-io/ristretto v0.1.0 github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59 @@ -89,6 +89,7 @@ require ( go 1.16 replace ( + // github.com/cs3org/go-cs3apis => ../../go-cs3apis github.com/eventials/go-tus => github.com/andrewmostello/go-tus v0.0.0-20200314041820-904a9904af9a github.com/oleiade/reflections => github.com/oleiade/reflections v1.0.1 google.golang.org/grpc => google.golang.org/grpc v1.26.0 // temporary downgrade diff --git a/go.sum b/go.sum index bc81b2f76a..24ab813bc3 100644 --- a/go.sum +++ b/go.sum @@ -210,6 +210,8 @@ github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJff github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4= github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654 h1:ha5tiuuFyDrwKUrVEc3TrRDFgTKVQ9NGDRmEP0PRPno= github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= +github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65 h1:cee0dhBsF8KofV2TM52T41eOo1QLSgtgEZsjYmC5dhU= +github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= From 02754ff928d00632466ffab7cbb17614dbcbb4ab Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Wed, 16 Mar 2022 10:47:35 +0100 Subject: [PATCH 21/21] Remove obsolete line. --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index 4d15ab1c62..c077f351f2 100644 --- a/go.mod +++ b/go.mod @@ -89,7 +89,6 @@ require ( go 1.16 replace ( - // github.com/cs3org/go-cs3apis => ../../go-cs3apis github.com/eventials/go-tus => github.com/andrewmostello/go-tus v0.0.0-20200314041820-904a9904af9a github.com/oleiade/reflections => github.com/oleiade/reflections v1.0.1 google.golang.org/grpc => google.golang.org/grpc v1.26.0 // temporary downgrade