Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

storage: fix s3 walk directory #750

Merged
merged 6 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"net/url"
"path"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -279,8 +280,10 @@ func newS3Storage(backend *backup.S3, opts *ExternalStorageOptions) (*S3Storage,
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "Bucket %s is not accessible: %v", qs.Bucket, err)
}
}
if len(qs.Prefix) > 0 {
qs.Prefix += "/"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there is a '/' already?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if we just path.Clean it 🤷

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Prefix is parsed from the URL, so it can't contain a "/" suffix. See:

br/pkg/storage/parse.go

Lines 69 to 70 in 578be7f

prefix := strings.Trim(u.Path, "/")
s3 := &backup.S3{Bucket: u.Host, Prefix: prefix}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just don't want a bare "/" prefix. For other non-empty prefixes, the "/" suffix is needed.
BTW, maybe we shouldn't depend on the Prefix field containing the "/" suffix, but that way we would need to change more code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the user wants to load all data from one bucket?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original PR tried to resolve exactly this issue. If the input path is just the bucket name, we should leave the prefix parameter empty in the ListObjects request; otherwise, S3 will return an empty result.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

	if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {

What about checking suffix too? In case the API is called from other projects.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

qs.Prefix += "/"
return &S3Storage{
session: ses,
svc: c,
Expand Down Expand Up @@ -378,28 +381,20 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
if opt == nil {
opt = &WalkOption{}
}

// NOTE: leave prefix empty if subDir is not set. Else, s3 will return empty result!
prefix := ""
if len(opt.SubDir) > 0 {
prefix = rs.options.Prefix + opt.SubDir
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
prefix := path.Join(rs.options.Prefix, opt.SubDir)
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

maxKeys := int64(1000)
if opt.ListCount > 0 {
maxKeys = opt.ListCount
}

req := &s3.ListObjectsInput{
Bucket: aws.String(rs.options.Bucket),
Prefix: aws.String(prefix),
MaxKeys: aws.Int64(maxKeys),
}
if len(prefix) > 0 {
req.Prefix = aws.String(prefix)
}

for {
// FIXME: We can't use ListObjectsV2, it is not universally supported.
// (Ceph RGW supported ListObjectsV2 since v15.1.0, released 2020 Jan 30th)
Expand Down
75 changes: 73 additions & 2 deletions pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ func (s *s3Suite) TestWalkDir(c *C) {
}, nil
}).
After(firstCall)
s.s3.EXPECT().
thirdCall := s.s3.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Marker), Equals, aws.StringValue(contents[3].Key))
Expand All @@ -881,6 +881,31 @@ func (s *s3Suite) TestWalkDir(c *C) {
}, nil
}).
After(secondCall)
forthCall := s.s3.EXPECT().
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo?

Suggested change
forthCall := s.s3.EXPECT().
fourthCall := s.s3.EXPECT().

ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Bucket), Equals, "bucket")
c.Assert(aws.StringValue(input.Prefix), Equals, "prefix/")
c.Assert(aws.StringValue(input.Marker), Equals, "")
c.Assert(aws.Int64Value(input.MaxKeys), Equals, int64(4))
c.Assert(aws.StringValue(input.Delimiter), Equals, "")
return &s3.ListObjectsOutput{
IsTruncated: aws.Bool(true),
Contents: contents[:4],
}, nil
}).
After(thirdCall)
s.s3.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Marker), Equals, aws.StringValue(contents[3].Key))
c.Assert(aws.Int64Value(input.MaxKeys), Equals, int64(4))
return &s3.ListObjectsOutput{
IsTruncated: aws.Bool(false),
Contents: contents[4:],
}, nil
}).
After(forthCall)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
After(forthCall)
After(fourthCall)


// Ensure we receive the items in order.
i := 0
Expand All @@ -897,6 +922,22 @@ func (s *s3Suite) TestWalkDir(c *C) {
)
c.Assert(err, IsNil)
c.Assert(i, Equals, len(contents))

// test with empty subDir
i = 0
err = s.storage.WalkDir(
ctx,
&WalkOption{ListCount: 4},
func(path string, size int64) error {
comment := Commentf("index = %d", i)
c.Assert("prefix/"+path, Equals, *contents[i].Key, comment)
c.Assert(size, Equals, *contents[i].Size, comment)
i++
return nil
},
)
c.Assert(err, IsNil)
c.Assert(i, Equals, len(contents))
}

// TestWalkDirBucket checks WalkDir retrieves all directory content under a bucket.
Expand Down Expand Up @@ -927,7 +968,7 @@ func (s *s3SuiteCustom) TestWalkDirWithEmptyPrefix(c *C) {
Size: aws.Int64(27499),
},
}
s3API.EXPECT().
firstCall := s3API.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Bucket), Equals, "bucket")
Expand All @@ -940,6 +981,20 @@ func (s *s3SuiteCustom) TestWalkDirWithEmptyPrefix(c *C) {
Contents: contents,
}, nil
})
s3API.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Bucket), Equals, "bucket")
c.Assert(aws.StringValue(input.Prefix), Equals, "sp/")
c.Assert(aws.StringValue(input.Marker), Equals, "")
c.Assert(aws.Int64Value(input.MaxKeys), Equals, int64(2))
c.Assert(aws.StringValue(input.Delimiter), Equals, "")
return &s3.ListObjectsOutput{
IsTruncated: aws.Bool(false),
Contents: contents[:1],
}, nil
}).
After(firstCall)

// Ensure we receive the items in order.
i := 0
Expand All @@ -956,4 +1011,20 @@ func (s *s3SuiteCustom) TestWalkDirWithEmptyPrefix(c *C) {
)
c.Assert(err, IsNil)
c.Assert(i, Equals, len(contents))

// test with non-empty sub-dir
i = 0
err = storage.WalkDir(
ctx,
&WalkOption{SubDir: "sp", ListCount: 2},
func(path string, size int64) error {
comment := Commentf("index = %d", i)
c.Assert(path, Equals, *contents[i].Key, comment)
c.Assert(size, Equals, *contents[i].Size, comment)
i++
return nil
},
)
c.Assert(err, IsNil)
c.Assert(i, Equals, 1)
}