From 3c4b0ab3bf2a622229990ba1cff3b00dcaa67ee3 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Wed, 3 Mar 2021 15:40:22 +0800 Subject: [PATCH] storage: fix s3 walk directory (#750) (#756) Signed-off-by: ti-srebot --- pkg/storage/s3.go | 23 +++++-------- pkg/storage/s3_test.go | 75 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 82 insertions(+), 16 deletions(-) diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 69df47120..700887352 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -9,6 +9,7 @@ import ( "io" "io/ioutil" "net/url" + "path" "regexp" "strconv" "strings" @@ -273,8 +274,10 @@ func newS3Storage(backend *backup.S3, opts *ExternalStorageOptions) (*S3Storage, return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "Bucket %s is not accessible: %v", qs.Bucket, err) } } + if len(qs.Prefix) > 0 && !strings.HasSuffix(qs.Prefix, "/") { + qs.Prefix += "/" + } - qs.Prefix += "/" return &S3Storage{ session: ses, svc: c, @@ -372,28 +375,20 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin if opt == nil { opt = &WalkOption{} } - - // NOTE: leave prefix empty if subDir is not set. Else, s3 will return empty result! - prefix := "" - if len(opt.SubDir) > 0 { - prefix = rs.options.Prefix + opt.SubDir - if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") { - prefix += "/" - } + prefix := path.Join(rs.options.Prefix, opt.SubDir) + if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") { + prefix += "/" } - maxKeys := int64(1000) if opt.ListCount > 0 { maxKeys = opt.ListCount } - req := &s3.ListObjectsInput{ Bucket: aws.String(rs.options.Bucket), + Prefix: aws.String(prefix), MaxKeys: aws.Int64(maxKeys), } - if len(prefix) > 0 { - req.Prefix = aws.String(prefix) - } + for { // FIXME: We can't use ListObjectsV2, it is not universally supported. // (Ceph RGW supported ListObjectsV2 since v15.1.0, released 2020 Jan 30th) diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index b9e9a04e8..36eeed407 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -779,7 +779,7 @@ func (s *s3Suite) TestWalkDir(c *C) { }, nil }). After(firstCall) - s.s3.EXPECT(). + thirdCall := s.s3.EXPECT(). ListObjectsWithContext(ctx, gomock.Any()). DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) { c.Assert(aws.StringValue(input.Marker), Equals, aws.StringValue(contents[3].Key)) @@ -790,6 +790,31 @@ func (s *s3Suite) TestWalkDir(c *C) { }, nil }). After(secondCall) + fourthCall := s.s3.EXPECT(). + ListObjectsWithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) { + c.Assert(aws.StringValue(input.Bucket), Equals, "bucket") + c.Assert(aws.StringValue(input.Prefix), Equals, "prefix/") + c.Assert(aws.StringValue(input.Marker), Equals, "") + c.Assert(aws.Int64Value(input.MaxKeys), Equals, int64(4)) + c.Assert(aws.StringValue(input.Delimiter), Equals, "") + return &s3.ListObjectsOutput{ + IsTruncated: aws.Bool(true), + Contents: contents[:4], + }, nil + }). + After(thirdCall) + s.s3.EXPECT(). + ListObjectsWithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) { + c.Assert(aws.StringValue(input.Marker), Equals, aws.StringValue(contents[3].Key)) + c.Assert(aws.Int64Value(input.MaxKeys), Equals, int64(4)) + return &s3.ListObjectsOutput{ + IsTruncated: aws.Bool(false), + Contents: contents[4:], + }, nil + }). + After(fourthCall) // Ensure we receive the items in order. i := 0 @@ -806,6 +831,22 @@ func (s *s3Suite) TestWalkDir(c *C) { ) c.Assert(err, IsNil) c.Assert(i, Equals, len(contents)) + + // test with empty subDir + i = 0 + err = s.storage.WalkDir( + ctx, + &WalkOption{ListCount: 4}, + func(path string, size int64) error { + comment := Commentf("index = %d", i) + c.Assert("prefix/"+path, Equals, *contents[i].Key, comment) + c.Assert(size, Equals, *contents[i].Size, comment) + i++ + return nil + }, + ) + c.Assert(err, IsNil) + c.Assert(i, Equals, len(contents)) } // TestWalkDirBucket checks WalkDir retrieves all directory content under a bucket. @@ -836,7 +877,7 @@ func (s *s3SuiteCustom) TestWalkDirWithEmptyPrefix(c *C) { Size: aws.Int64(27499), }, } - s3API.EXPECT(). + firstCall := s3API.EXPECT(). ListObjectsWithContext(ctx, gomock.Any()). DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) { c.Assert(aws.StringValue(input.Bucket), Equals, "bucket") @@ -849,6 +890,20 @@ func (s *s3SuiteCustom) TestWalkDirWithEmptyPrefix(c *C) { Contents: contents, }, nil }) + s3API.EXPECT(). + ListObjectsWithContext(ctx, gomock.Any()). + DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) { + c.Assert(aws.StringValue(input.Bucket), Equals, "bucket") + c.Assert(aws.StringValue(input.Prefix), Equals, "sp/") + c.Assert(aws.StringValue(input.Marker), Equals, "") + c.Assert(aws.Int64Value(input.MaxKeys), Equals, int64(2)) + c.Assert(aws.StringValue(input.Delimiter), Equals, "") + return &s3.ListObjectsOutput{ + IsTruncated: aws.Bool(false), + Contents: contents[:1], + }, nil + }). + After(firstCall) // Ensure we receive the items in order. i := 0 @@ -865,4 +920,20 @@ func (s *s3SuiteCustom) TestWalkDirWithEmptyPrefix(c *C) { ) c.Assert(err, IsNil) c.Assert(i, Equals, len(contents)) + + // test with non-empty sub-dir + i = 0 + err = storage.WalkDir( + ctx, + &WalkOption{SubDir: "sp", ListCount: 2}, + func(path string, size int64) error { + comment := Commentf("index = %d", i) + c.Assert(path, Equals, *contents[i].Key, comment) + c.Assert(size, Equals, *contents[i].Size, comment) + i++ + return nil + }, + ) + c.Assert(err, IsNil) + c.Assert(i, Equals, 1) }