Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lakectl local checkout #6360

Merged
merged 3 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/lakectl/cmd/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api"
"github.com/treeverse/lakefs/pkg/diff"
"github.com/treeverse/lakefs/pkg/uri"
)

const (
Expand Down Expand Up @@ -65,7 +66,7 @@ var diffCmd = &cobra.Command{
if leftRefURI.Repository != rightRefURI.Repository {
Die("both references must belong to the same repository", 1)
}
printDiffRefs(cmd.Context(), client, leftRefURI.Repository, leftRefURI.Ref, rightRefURI.Ref, twoWay)
printDiffRefs(cmd.Context(), client, leftRefURI, rightRefURI, twoWay)
},
}

Expand Down Expand Up @@ -106,9 +107,9 @@ func printDiffBranch(ctx context.Context, client api.ClientWithResponsesInterfac
}
}

func printDiffRefs(ctx context.Context, client api.ClientWithResponsesInterface, repository string, leftRef string, rightRef string, twoDot bool) {
func printDiffRefs(ctx context.Context, client api.ClientWithResponsesInterface, left, right *uri.URI, twoDot bool) {
diffs := make(chan api.Diff, maxDiffPageSize)
err := diff.StreamRepositoryDiffs(ctx, client, repository, leftRef, rightRef, "", diffs, twoDot)
err := diff.StreamRepositoryDiffs(ctx, client, left, right, "", diffs, twoDot)
if err != nil {
DieErr(err)
}
Expand Down
22 changes: 22 additions & 0 deletions cmd/lakectl/cmd/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cmd

import (
"context"
"fmt"
"path/filepath"

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api"
Expand All @@ -13,8 +15,12 @@ import (
const (
localDefaultSyncParallelism = 25
localDefaultSyncPresign = true
localDefaultMinArgs = 0
localDefaultMaxArgs = 1
)

var localDefaultArgsRange = cobra.RangeArgs(localDefaultMinArgs, localDefaultMaxArgs)

func withParallelismFlag(cmd *cobra.Command) {
cmd.Flags().IntP("parallelism", "p", localDefaultSyncParallelism,
"Max concurrent operations to perform")
Expand All @@ -41,7 +47,23 @@ func getLocalSyncFlags(cmd *cobra.Command) syncFlags {
return syncFlags{parallelism: parallelism, presign: presign}
}

func getLocalArgs(args []string, requireRemote bool) (remote *uri.URI, localPath string) {
idx := 0
if requireRemote {
remote = MustParseRefURI("path", args[0])
idx += 1
}

dir := "."
if len(args) > idx {
dir = args[idx]
}
localPath = Must(filepath.Abs(dir))
return
}

func localDiff(ctx context.Context, client api.ClientWithResponsesInterface, remote *uri.URI, path string) local.Changes {
fmt.Printf("diff 'local://%s' <--> '%s'...\n", path, remote)
currentRemoteState := make(chan api.ObjectStats, maxDiffPageSize)
var wg errgroup.Group
wg.Go(func() error {
Expand Down
88 changes: 88 additions & 0 deletions cmd/lakectl/cmd/local_checkout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cmd

import (
"errors"
"fmt"
"io/fs"
"strings"

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/local"
)

var localCheckoutCmd = &cobra.Command{
Use: "checkout [directory]",
Short: "Sync local directory with the remote state.",
Args: localDefaultArgsRange,
Run: func(cmd *cobra.Command, args []string) {
_, localPath := getLocalArgs(args, false)
syncFlags := getLocalSyncFlags(cmd)
specifiedRef := Must(cmd.Flags().GetString("ref"))
idx, err := local.ReadIndex(localPath)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
DieFmt("directory %s is not linked to a lakeFS path", localPath)
}
DieErr(err)
}

remote, err := idx.GetCurrentURI()
if err != nil {
DieErr(err)
}

currentBase := remote.WithRef(idx.AtHead)
client := getClient()
diffs := local.Undo(localDiff(cmd.Context(), client, currentBase, idx.LocalPath()))
syncMgr := local.NewSyncManager(cmd.Context(), client, syncFlags.parallelism, syncFlags.presign)
// confirm on local changes
if len(diffs) > 0 {
fmt.Println("Uncommitted changes exist, the operation will revert all changes on local directory.")
confirmation, err := Confirm(cmd.Flags(), "Proceed")
if err != nil || !confirmation {
Die("command aborted", 1)
}
}

if specifiedRef != "" && specifiedRef != idx.AtHead {
newRemote := remote.WithRef(specifiedRef)
newHead := resolveCommitOrDie(cmd.Context(), client, newRemote.Repository, newRemote.Ref)
newBase := newRemote.WithRef(newHead)
// write new index
_, err = local.WriteIndex(idx.LocalPath(), newRemote, newHead)
if err != nil {
DieErr(err)
}

newDiffs := local.Undo(localDiff(cmd.Context(), client, newBase, idx.LocalPath()))
diffs = diffs.MergeWith(newDiffs, local.MergeStrategyOther)
currentBase = newBase
}
c := make(chan *local.Change, filesChanSize)
go func() {
defer close(c)
for _, dif := range diffs {
c <- &local.Change{
Source: local.ChangeSourceRemote,
Path: strings.TrimPrefix(dif.Path, currentBase.GetPath()),
Type: dif.Type,
}
}
}()
err = syncMgr.Sync(idx.LocalPath(), currentBase, c)
if err != nil {
DieErr(err)
}

summary := syncMgr.Summary()
fmt.Printf("Checkout Summary:\nDownloaded:\t%d\nRemoved:\t%d\n", summary.Downloaded, summary.Removed)
},
}

//nolint:gochecknoinits
func init() {
localCheckoutCmd.Flags().StringP("ref", "r", "", "Checkout the given source branch or reference")
AssignAutoConfirmFlag(localCheckoutCmd.Flags())
withLocalSyncFlags(localCheckoutCmd)
localCmd.AddCommand(localCheckoutCmd)
}
11 changes: 1 addition & 10 deletions cmd/lakectl/cmd/local_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io/fs"
"net/http"
"path/filepath"
"strings"

"github.com/go-openapi/swag"
Expand All @@ -27,16 +26,8 @@ var localCloneCmd = &cobra.Command{
Short: "Clone a path from a lakeFS repository into a new directory.",
Args: cobra.RangeArgs(localCloneMinArgs, localCloneMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
remote := MustParsePathURI("path", args[0])
dir := "."
if len(args) == localCloneMaxArgs {
dir = args[1]
}
remote, localPath := getLocalArgs(args, true)
syncFlags := getLocalSyncFlags(cmd)
localPath, err := filepath.Abs(dir)
if err != nil {
DieErr(err)
}

empty, err := fileutil.IsDirEmpty(localPath)
if err != nil {
Expand Down
14 changes: 3 additions & 11 deletions cmd/lakectl/cmd/local_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,13 @@ var localInitCmd = &cobra.Command{
Short: "set a local directory to sync with a lakeFS path.",
Args: cobra.RangeArgs(localInitMinArgs, localInitMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
remote := MustParsePathURI("path", args[0])
dir := "."
if len(args) == localInitMaxArgs {
dir = args[1]
}
localPath, err := filepath.Abs(dir)
if err != nil {
DieErr(err)
}
remote, localPath := getLocalArgs(args, true)
force := Must(cmd.Flags().GetBool("force"))

_, err = localInit(cmd.Context(), localPath, remote, force)
_, err := localInit(cmd.Context(), localPath, remote, force)
if err != nil {
if errors.Is(err, fs.ErrExist) {
DieFmt("directory '%s' already linked to a lakeFS path, run command with --force to overwrite", dir)
DieFmt("directory '%s' already linked to a lakeFS path, run command with --force to overwrite", localPath)
}
DieErr(err)
}
Expand Down
22 changes: 6 additions & 16 deletions cmd/lakectl/cmd/local_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,30 @@ import (
)

const (
localListMinArgs = 0
localListMaxArgs = 1

indicesListTemplate = `{{.IndicesListTable | table -}}`
)

var localListCmd = &cobra.Command{
Use: "list [directory]",
Short: "find and list directories that are synced with lakeFS.",
Args: cobra.RangeArgs(localListMinArgs, localListMaxArgs),
Args: localDefaultArgsRange,
Run: func(cmd *cobra.Command, args []string) {
dir := "."
if len(args) > 0 {
dir = args[0]
}
abs, err := filepath.Abs(dir)
if err != nil {
DieErr(err)
}
gitRoot, err := git.GetRepositoryPath(abs)
_, localPath := getLocalArgs(args, false)
gitRoot, err := git.GetRepositoryPath(localPath)
if err == nil {
abs = gitRoot
localPath = gitRoot
} else if !(errors.Is(err, git.ErrNotARepository) || errors.Is(err, git.ErrNoGit)) { // allow support in environments with no git
DieErr(err)
}

dirs, err := local.FindIndices(abs)
dirs, err := local.FindIndices(localPath)
if err != nil {
DieErr(err)
}

var rows [][]interface{}
for _, d := range dirs {
idx, err := local.ReadIndex(d)
idx, err := local.ReadIndex(filepath.Join(localPath, d))
if err != nil {
DieErr(err)
}
Expand Down
28 changes: 7 additions & 21 deletions cmd/lakectl/cmd/local_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"path/filepath"
"strings"

"github.com/go-openapi/swag"
Expand All @@ -13,27 +12,14 @@ import (
"golang.org/x/sync/errgroup"
)

const (
localPullMinArgs = 0
localPullMaxArgs = 1
)

var localPullCmd = &cobra.Command{
Use: "pull [directory]",
Short: "Fetch latest changes from lakeFS.",
Args: cobra.RangeArgs(localPullMinArgs, localPullMaxArgs),
Args: localDefaultArgsRange,
Run: func(cmd *cobra.Command, args []string) {
dir := "."
if len(args) == localPullMaxArgs {
dir = args[1]
}
_, localPath := getLocalArgs(args, false)
force := Must(cmd.Flags().GetBool("force"))
syncFlags := getLocalSyncFlags(cmd)
localPath, err := filepath.Abs(dir)
if err != nil {
DieErr(err)
}

idx, err := local.ReadIndex(localPath)
if err != nil {
DieErr(err)
Expand Down Expand Up @@ -63,10 +49,11 @@ var localPullCmd = &cobra.Command{
d := make(chan api.Diff, maxDiffPageSize)
var wg errgroup.Group
wg.Go(func() error {
return diff.StreamRepositoryDiffs(cmd.Context(), client, currentBase.Repository, currentBase.Ref, newBase.Ref, swag.StringValue(currentBase.Path), d, false)
return diff.StreamRepositoryDiffs(cmd.Context(), client, currentBase, newBase, swag.StringValue(currentBase.Path), d, false)
})
c := make(chan *local.Change, filesChanSize)
wg.Go(func() error {
defer close(c)
for dif := range d {
c <- &local.Change{
Source: local.ChangeSourceRemote,
Expand All @@ -76,13 +63,12 @@ var localPullCmd = &cobra.Command{
}
return nil
})
err = wg.Wait()
s := local.NewSyncManager(cmd.Context(), client, syncFlags.parallelism, syncFlags.presign)
err = s.Sync(idx.LocalPath(), newBase, c)
if err != nil {
DieErr(err)
}

s := local.NewSyncManager(cmd.Context(), client, syncFlags.parallelism, syncFlags.presign)
err = s.Sync(idx.LocalPath(), newBase, c)
err = wg.Wait()
if err != nil {
DieErr(err)
}
Expand Down
20 changes: 5 additions & 15 deletions cmd/lakectl/cmd/local_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,13 @@ import (
"golang.org/x/sync/errgroup"
)

const (
localStatusMinArgs = 0
localStatusMaxArgs = 1
)

var localStatusCmd = &cobra.Command{
Use: "status [directory]",
Short: "show modifications (both remote and local) to the directory and the remote location it tracks",
Args: cobra.RangeArgs(localStatusMinArgs, localStatusMaxArgs),
Args: localDefaultArgsRange,
Run: func(cmd *cobra.Command, args []string) {
dir := "."
if len(args) > 0 {
dir = args[0]
}
abs, err := filepath.Abs(dir)
_, localPath := getLocalArgs(args, false)
abs, err := filepath.Abs(localPath)
if err != nil {
DieErr(err)
}
Expand All @@ -45,8 +37,6 @@ var localStatusCmd = &cobra.Command{
DieErr(err)
}
remoteBase := remote.WithRef(idx.AtHead)
fmt.Printf("diff 'local://%s' <--> '%s'...\n", idx.LocalPath(), remoteBase)

client := getClient()
c := localDiff(cmd.Context(), client, remoteBase, idx.LocalPath())

Expand All @@ -56,7 +46,7 @@ var localStatusCmd = &cobra.Command{
d := make(chan api.Diff, maxDiffPageSize)
var wg errgroup.Group
wg.Go(func() error {
return diff.StreamRepositoryDiffs(cmd.Context(), client, remoteBase.Repository, remoteBase.Ref, remote.Ref, swag.StringValue(remoteBase.Path), d, false)
return diff.StreamRepositoryDiffs(cmd.Context(), client, remoteBase, remote, swag.StringValue(remoteBase.Path), d, false)
})

var changes local.Changes
Expand All @@ -75,7 +65,7 @@ var localStatusCmd = &cobra.Command{
DieErr(err)
}

c = c.MergeWith(changes)
c = c.MergeWith(changes, local.MergeStrategyNone)
}

if len(c) == 0 {
Expand Down
Loading
Loading