Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lakectl local commit #6369

Merged
merged 2 commits into from
Aug 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var commitCmd = &cobra.Command{
},
}

func getKV(cmd *cobra.Command, name string) (map[string]string, error) {
func getKV(cmd *cobra.Command, name string) (map[string]string, error) { //nolint:unparam
kvList, err := cmd.Flags().GetStringSlice(name)
if err != nil {
return nil, err
Expand Down
13 changes: 13 additions & 0 deletions cmd/lakectl/cmd/common_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,19 @@ func MustParseRefURI(name, s string) *uri.URI {
return u
}

func MustParseRefWithPathURI(name, s string) *uri.URI {
u, err := uri.ParseWithBaseURI(s, baseURI)
if err != nil {
DieFmt("Invalid '%s': %s", name, err)
}
refURI := u
refURI.Path = nil
if !refURI.IsRef() {
DieFmt("Invalid %s: %s", name, uri.ErrInvalidRefURI)
}
return u
}

func MustParseBranchURI(name, s string) *uri.URI {
u, err := uri.ParseWithBaseURI(s, baseURI)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/lakectl/cmd/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func getLocalSyncFlags(cmd *cobra.Command) syncFlags {
func getLocalArgs(args []string, requireRemote bool, considerGitRoot bool) (remote *uri.URI, localPath string) {
idx := 0
if requireRemote {
remote = MustParseRefURI("path", args[0])
remote = MustParseRefWithPathURI("path", args[0])
idx += 1
}

Expand All @@ -76,7 +76,7 @@ func getLocalArgs(args []string, requireRemote bool, considerGitRoot bool) (remo
}

func localDiff(ctx context.Context, client api.ClientWithResponsesInterface, remote *uri.URI, path string) local.Changes {
fmt.Printf("diff 'local://%s' <--> '%s'...\n", path, remote)
fmt.Printf("\ndiff 'local://%s' <--> '%s'...\n", path, remote)
currentRemoteState := make(chan api.ObjectStats, maxDiffPageSize)
var wg errgroup.Group
wg.Go(func() error {
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_checkout.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func checkout(ctx context.Context, localPath string, syncFlags syncFlags, specif
newHead := resolveCommitOrDie(ctx, client, newRemote.Repository, newRemote.Ref)
newBase := newRemote.WithRef(newHead)
// write new index
_, err = local.WriteIndex(idx.LocalPath(), newRemote, newHead)
_, err = local.WriteIndex(idx.LocalPath(), remote, newHead)
if err != nil {
DieErr(err)
}
Expand Down
171 changes: 171 additions & 0 deletions cmd/lakectl/cmd/local_commit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package cmd

import (
"fmt"
"net/http"
"strings"

"github.com/go-openapi/swag"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api"
"github.com/treeverse/lakefs/pkg/diff"
"github.com/treeverse/lakefs/pkg/git"
"github.com/treeverse/lakefs/pkg/local"
"github.com/treeverse/lakefs/pkg/uri"
"golang.org/x/sync/errgroup"
)

const (
localCommitAllowEmptyMessage = "allow-empty-message"
localCommitMessageFlag = "message"
)

func findConflicts(changes local.Changes) (conflicts []string) {
for _, c := range changes {
if c.Type == local.ChangeTypeConflict {
conflicts = append(conflicts, c.Path)
}
}
return
}

var localCommitCmd = &cobra.Command{
Use: "commit [directory]",
Short: "Commit changes from local directory to the lakeFS branch it tracks.",
Args: localDefaultArgsRange,
Run: func(cmd *cobra.Command, args []string) {
_, localPath := getLocalArgs(args, false, false)
syncFlags := getLocalSyncFlags(cmd)
message := Must(cmd.Flags().GetString(localCommitMessageFlag))
allowEmptyMessage := Must(cmd.Flags().GetBool(localCommitAllowEmptyMessage))
if message == "" && !allowEmptyMessage {
DieFmt("Commit message empty! To commit with empty message pass --%s flag", localCommitAllowEmptyMessage)
}
idx, err := local.ReadIndex(localPath)
if err != nil {
DieErr(err)
}

remote, err := idx.GetCurrentURI()
if err != nil {
DieErr(err)
}

fmt.Printf("\nGetting branch: %s\n", remote.Ref)
client := getClient()
resp, err := client.GetBranchWithResponse(cmd.Context(), remote.Repository, remote.Ref)
DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)

// Diff local with current head
baseRemote := remote.WithRef(idx.AtHead)
changes := localDiff(cmd.Context(), client, baseRemote, idx.LocalPath())

branchCommit := resp.JSON200.CommitId
if branchCommit != idx.AtHead { // check for changes and conflicts with new head
newRemote := remote.WithRef(branchCommit)
fmt.Printf("\ndiff '%s' <--> '%s'...\n", newRemote, remote)
d := make(chan api.Diff, maxDiffPageSize)
var wg errgroup.Group
wg.Go(func() error {
return diff.StreamRepositoryDiffs(cmd.Context(), client, baseRemote, newRemote, swag.StringValue(remote.Path), d, false)
})

var remoteChanges local.Changes
wg.Go(func() error {
for dif := range d {
remoteChanges = append(remoteChanges, &local.Change{
Source: local.ChangeSourceRemote,
Path: strings.TrimPrefix(dif.Path, remote.GetPath()),
Type: local.ChangeTypeFromString(dif.Type),
})
}
return nil
})
err = wg.Wait()
if err != nil {
DieErr(err)
}

changes = changes.MergeWith(remoteChanges, local.MergeStrategyNone)
conflicts := findConflicts(changes)
switch {
case len(changes) == 0:
fmt.Println("Local directory and remote branch are synced")
return
case len(conflicts) > 0:
DieFmt("Conflicts found between local directory and remote in the following files:\n%s", strings.Join(conflicts, "\n"))
}
}

// sync changes
c := make(chan *local.Change, filesChanSize)
go func() {
defer close(c)
for _, change := range changes {
c <- change
}
}()
s := local.NewSyncManager(cmd.Context(), client, syncFlags.parallelism, syncFlags.presign)
err = s.Sync(idx.LocalPath(), remote, c)
if err != nil {
DieErr(err)
}

fmt.Printf("\nFinished syncing changes. Perform commit on branch...\n")
// add kv pairs if any
kvPairs, err := getKV(cmd, metaFlagName)
if err != nil {
DieErr(err)
}
// add git context to kv pairs, if any
if git.IsRepository(idx.LocalPath()) {
gitRef, err := git.CurrentCommit(idx.LocalPath())
if err == nil {
md, err := git.MetadataFor(idx.LocalPath(), gitRef)
if err == nil {
for k, v := range md {
kvPairs[k] = v
}
}
}
}

// commit!
response, err := client.CommitWithResponse(cmd.Context(), remote.Repository, remote.Ref, &api.CommitParams{}, api.CommitJSONRequestBody{
Message: message,
Metadata: &api.CommitCreation_Metadata{
AdditionalProperties: kvPairs,
},
})
commit := response.JSON201
if commit == nil {
Die("Bad response from server", 1)
}

branchURI := &uri.URI{
Repository: remote.Repository,
Ref: remote.Ref,
}
DieOnErrorOrUnexpectedStatusCode(response, err, http.StatusCreated)
Write(commitCreateTemplate, struct {
Branch *uri.URI
Commit *api.Commit
}{Branch: branchURI, Commit: commit})

newHead := response.JSON201.Id
_, err = local.WriteIndex(idx.LocalPath(), remote, newHead)
if err != nil {
DieErr(err)
}
},
}

//nolint:gochecknoinits
func init() {
localCommitCmd.Flags().StringP(localCommitMessageFlag, "m", "", "Commit message")
localCommitCmd.Flags().Bool(localCommitAllowEmptyMessage, false, "Allow commit with empty message")
localCommitCmd.MarkFlagsMutuallyExclusive(localCommitMessageFlag, localCommitAllowEmptyMessage)
localCommitCmd.Flags().StringSlice(metaFlagName, []string{}, "key value pair in the form of key=value")
withLocalSyncFlags(localCommitCmd)
localCmd.AddCommand(localCommitCmd)
}
22 changes: 22 additions & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -2615,6 +2615,28 @@ lakectl local clone <path uri> [directory] [flags]



### lakectl local commit

Commit changes from local directory to the lakeFS branch it tracks.

```
lakectl local commit [directory] [flags]
```

#### Options
{:.no_toc}

```
--allow-empty-message Allow commit with empty message
-h, --help help for commit
-m, --message string Commit message
--meta strings key value pair in the form of key=value
-p, --parallelism int Max concurrent operations to perform (default 25)
--presign Use pre-signed URLs when downloading/uploading data (recommended) (default true)
```



### lakectl local help

Help about any command
Expand Down
1 change: 1 addition & 0 deletions pkg/git/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
)

var (
ErrRemoteNotFound = errors.New("remote not found")
ErrNotARepository = errors.New("not a git repository")
ErrNoGit = errors.New("no git support")
)
Loading
Loading