Skip to content

Commit

Permalink
CR Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
N-o-Z committed Aug 8, 2023
1 parent 7c2d231 commit 87dafeb
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 86 deletions.
33 changes: 20 additions & 13 deletions cmd/lakectl/cmd/local_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-openapi/swag"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api"
"github.com/treeverse/lakefs/pkg/fileutil"
"github.com/treeverse/lakefs/pkg/local"
"github.com/treeverse/lakefs/pkg/uri"
)
Expand All @@ -21,22 +22,33 @@ const (

var localCloneCmd = &cobra.Command{
Use: "clone <path uri> [directory]",
Short: "init local directory and download objects from lakeFS path",
Short: "Clone a path from a lakeFS repository into a new directory.",
Args: cobra.RangeArgs(localCloneMinArgs, localCloneMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
remote := MustParsePathURI("path", args[0])
dir := "."
if len(args) == localCloneMaxArgs {
dir = args[1]
}
force := Must(cmd.Flags().GetBool("force"))
syncFlags := getLocalSyncFlags(cmd)
localPath, err := filepath.Abs(dir)
if err != nil {
DieErr(err)
}
idx := localInit(cmd.Context(), localPath, remote, force)
stableRemote := uri.WithRef(remote, idx.AtHead)

empty, err := fileutil.IsDirEmpty(localPath)
if err != nil {
DieErr(err)
}
if !empty {
DieFmt("directory '%s' exists and is not empty", localPath)
}

idx, err := localInit(cmd.Context(), localPath, remote, false)
if err != nil {
DieErr(err)
}
stableRemote := remote.WithRef(idx.AtHead)
client := getClient()
// Dynamically construct changes
c := make(chan *local.Change, filesChanSize)
Expand All @@ -45,17 +57,13 @@ var localCloneCmd = &cobra.Command{
hasMore := true
var after string
for hasMore {
listResp, err := client.ListObjectsWithResponse(cmd.Context(), remote.Repository, remote.Ref, &api.ListObjectsParams{
listResp, err := client.ListObjectsWithResponse(cmd.Context(), remote.Repository, stableRemote.Ref, &api.ListObjectsParams{
After: (*api.PaginationAfter)(swag.String(after)),
Prefix: (*api.PaginationPrefix)(remote.Path),
UserMetadata: swag.Bool(true),
})
if err != nil {
DieErr(err)
}
if listResp.HTTPResponse.StatusCode != http.StatusOK {
DieErr(fmt.Errorf("%w: HTTP %d", err, listResp.StatusCode()))
}
DieOnErrorOrUnexpectedStatusCode(listResp, err, http.StatusOK)

for _, o := range listResp.JSON200.Results {
path := strings.TrimPrefix(o.Path, remote.GetPath())
// skip directory markers
Expand All @@ -81,13 +89,12 @@ var localCloneCmd = &cobra.Command{
if err != nil {
DieErr(err)
}
fmt.Printf("Successfully cloned %s to %s.\nTotal objects downloaded: %d", remote, localPath, s.Summary().Download)
fmt.Printf("Successfully cloned %s to %s.\nTotal objects downloaded: %d", remote, localPath, s.Summary().Downloaded)
},
}

//nolint:gochecknoinits
func init() {
localCloneCmd.Flags().Bool("force", false, "Overwrites if directory already linked to a lakeFS path")
withLocalSyncFlags(localCloneCmd)
localCmd.AddCommand(localCloneCmd)
}
23 changes: 15 additions & 8 deletions cmd/lakectl/cmd/local_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"

Expand All @@ -18,38 +19,38 @@ const (
localInitMaxArgs = 2
)

func localInit(ctx context.Context, dir string, remote *uri.URI, force bool) *local.Index {
func localInit(ctx context.Context, dir string, remote *uri.URI, force bool) (*local.Index, error) {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
DieErr(err)
}
exists, err := local.IndexExists(dir)
if err != nil {
DieErr(err)
return nil, err
}
if exists && !force {
DieFmt("directory '%s' already linked to a lakefs path, run command with --force to overwrite", dir)
return nil, fs.ErrExist
}

// dereference
head := resolveCommitOrDie(ctx, getClient(), remote.Repository, remote.Ref)
idx, err := local.WriteIndex(dir, remote, head)
if err != nil {
DieErr(err)
return nil, err
}

ignoreFile, err := git.Ignore(dir, []string{dir}, []string{filepath.Join(dir, local.IndexFileName)}, local.IgnoreMarker)
if err == nil {
fmt.Println("location added to", ignoreFile)
} else if !errors.Is(err, git.ErrNotARepository) {
DieErr(err)
return nil, err
}

return idx
return idx, nil
}

var localInitCmd = &cobra.Command{
Use: "init <path uri> [directory]",
Short: "set a local directory to sync with a lakeFS path",
Short: "set a local directory to sync with a lakeFS path.",
Args: cobra.RangeArgs(localInitMinArgs, localInitMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
remote := MustParsePathURI("path", args[0])
Expand All @@ -63,7 +64,13 @@ var localInitCmd = &cobra.Command{
}
force := Must(cmd.Flags().GetBool("force"))

localInit(cmd.Context(), localPath, remote, force)
_, err = localInit(cmd.Context(), localPath, remote, force)
if err != nil {
if errors.Is(err, fs.ErrExist) {
DieFmt("directory '%s' already linked to a lakeFS path, run command with --force to overwrite", dir)
}
DieErr(err)
}

fmt.Printf("Successfully linked local directory '%s' with remote '%s'\n", localPath, remote)
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (

var localListCmd = &cobra.Command{
Use: "list [directory]",
Short: "find and list directories that are synced with lakeFS",
Short: "find and list directories that are synced with lakeFS.",
Args: cobra.RangeArgs(localListMinArgs, localListMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
dir := "."
Expand Down
7 changes: 3 additions & 4 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -2576,7 +2576,7 @@ BETA: sync local directories with lakeFS paths

### lakectl local clone

init local directory and download objects from lakeFS path
Clone a path from a lakeFS repository into a new directory.

```
lakectl local clone <path uri> [directory] [flags]
Expand All @@ -2586,7 +2586,6 @@ lakectl local clone <path uri> [directory] [flags]
{:.no_toc}

```
--force Overwrites if directory already linked to a lakeFS path
-h, --help help for clone
-p, --parallelism int Max concurrent operations to perform (default 25)
--presign Use pre-signed URLs when downloading/uploading data (recommended) (default true)
Expand Down Expand Up @@ -2619,7 +2618,7 @@ lakectl local help [command] [flags]

### lakectl local init

set a local directory to sync with a lakeFS path
set a local directory to sync with a lakeFS path.

```
lakectl local init <path uri> [directory] [flags]
Expand All @@ -2637,7 +2636,7 @@ lakectl local init <path uri> [directory] [flags]

### lakectl local list

find and list directories that are synced with lakeFS
find and list directories that are synced with lakeFS.

```
lakectl local list [directory] [flags]
Expand Down
26 changes: 19 additions & 7 deletions pkg/fileutil/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@ func FindInParents(dir, filename string) (string, error) {
return "", nil
}

func IsDirEmpty(dir string) (bool, error) {
s, err := godirwalk.NewScanner(dir)
if err != nil {
return false, err
}
// Attempt to read only the first directory entry. Note that Scan skips both "." and ".." entries.
hasAtLeastOneChild := s.Scan()
if err = s.Err(); err != nil {
return false, err
}

if hasAtLeastOneChild {
return false, nil
}
return true, nil
}

// PruneEmptyDirectories iterates through the directory tree, removing empty directories, and directories that only
// contain empty directories.
func PruneEmptyDirectories(dir string) ([]string, error) {
Expand All @@ -67,17 +84,12 @@ func PruneEmptyDirectories(dir string) ([]string, error) {
return nil
},
PostChildrenCallback: func(d string, _ *godirwalk.Dirent) error {
s, err := godirwalk.NewScanner(d)
empty, err := IsDirEmpty(d)
if err != nil {
return err
}
// Attempt to read only the first directory entry. Note that Scan skips both "." and ".." entries.
hasAtLeastOneChild := s.Scan()
if err := s.Err(); err != nil {
return err
}

if d == dir || hasAtLeastOneChild { // do not remove top level directory or a directory with at least one child
if d == dir || !empty { // do not remove top level directory or a directory with at least one child
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/local/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ func (p *ProgressUpdater) Done() {
p.t.MarkAsDone()
}

func (p *ProgressUpdater) Error() {
p.t.MarkAsErrored()
}

type ProgressSpinner struct {
t *progress.Tracker
}

func (p *ProgressSpinner) Done() {
p.t.Increment(progressTrackerDone)
p.t.MarkAsDone()
}

type ProgressPool struct {
Expand Down
Loading

0 comments on commit 87dafeb

Please sign in to comment.