Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
fix s3 walk directory
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed Feb 24, 2021
1 parent 578be7f commit e069dd5
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 16 deletions.
22 changes: 8 additions & 14 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,10 @@ func newS3Storage(backend *backup.S3, opts *ExternalStorageOptions) (*S3Storage,
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "Bucket %s is not accessible: %v", qs.Bucket, err)
}
}
if len(qs.Prefix) > 0 {
qs.Prefix += "/"
}

qs.Prefix += "/"
return &S3Storage{
session: ses,
svc: c,
Expand Down Expand Up @@ -378,28 +380,20 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
if opt == nil {
opt = &WalkOption{}
}

// NOTE: leave prefix empty if subDir is not set. Else, s3 will return empty result!
prefix := ""
if len(opt.SubDir) > 0 {
prefix = rs.options.Prefix + opt.SubDir
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
prefix := rs.options.Prefix + opt.SubDir
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}

maxKeys := int64(1000)
if opt.ListCount > 0 {
maxKeys = opt.ListCount
}

req := &s3.ListObjectsInput{
Bucket: aws.String(rs.options.Bucket),
Prefix: aws.String(prefix),
MaxKeys: aws.Int64(maxKeys),
}
if len(prefix) > 0 {
req.Prefix = aws.String(prefix)
}

for {
// FIXME: We can't use ListObjectsV2, it is not universally supported.
// (Ceph RGW supported ListObjectsV2 since v15.1.0, released 2020 Jan 30th)
Expand Down
76 changes: 74 additions & 2 deletions pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ func (s *s3Suite) TestWalkDir(c *C) {
}, nil
}).
After(firstCall)
s.s3.EXPECT().
thirdCall := s.s3.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Marker), Equals, aws.StringValue(contents[3].Key))
Expand All @@ -881,6 +881,31 @@ func (s *s3Suite) TestWalkDir(c *C) {
}, nil
}).
After(secondCall)
forthCall := s.s3.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Bucket), Equals, "bucket")
c.Assert(aws.StringValue(input.Prefix), Equals, "prefix/")
c.Assert(aws.StringValue(input.Marker), Equals, "")
c.Assert(aws.Int64Value(input.MaxKeys), Equals, int64(4))
c.Assert(aws.StringValue(input.Delimiter), Equals, "")
return &s3.ListObjectsOutput{
IsTruncated: aws.Bool(true),
Contents: contents[:4],
}, nil
}).
After(thirdCall)
s.s3.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Marker), Equals, aws.StringValue(contents[3].Key))
c.Assert(aws.Int64Value(input.MaxKeys), Equals, int64(4))
return &s3.ListObjectsOutput{
IsTruncated: aws.Bool(false),
Contents: contents[4:],
}, nil
}).
After(forthCall)

// Ensure we receive the items in order.
i := 0
Expand All @@ -897,6 +922,23 @@ func (s *s3Suite) TestWalkDir(c *C) {
)
c.Assert(err, IsNil)
c.Assert(i, Equals, len(contents))

// test with empty subDir
i = 0
err = s.storage.WalkDir(
ctx,
&WalkOption{ListCount: 4},
func(path string, size int64) error {
comment := Commentf("index = %d", i)
c.Assert("prefix/"+path, Equals, *contents[i].Key, comment)
c.Assert(size, Equals, *contents[i].Size, comment)
i++
return nil
},
)
c.Assert(err, IsNil)
c.Assert(i, Equals, len(contents))

}

// TestWalkDirBucket checks WalkDir retrieves all directory content under a bucket.
Expand Down Expand Up @@ -927,7 +969,7 @@ func (s *s3SuiteCustom) TestWalkDirWithEmptyPrefix(c *C) {
Size: aws.Int64(27499),
},
}
s3API.EXPECT().
firstCall := s3API.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Bucket), Equals, "bucket")
Expand All @@ -940,6 +982,20 @@ func (s *s3SuiteCustom) TestWalkDirWithEmptyPrefix(c *C) {
Contents: contents,
}, nil
})
s3API.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Bucket), Equals, "bucket")
c.Assert(aws.StringValue(input.Prefix), Equals, "sp/")
c.Assert(aws.StringValue(input.Marker), Equals, "")
c.Assert(aws.Int64Value(input.MaxKeys), Equals, int64(2))
c.Assert(aws.StringValue(input.Delimiter), Equals, "")
return &s3.ListObjectsOutput{
IsTruncated: aws.Bool(false),
Contents: contents[:1],
}, nil
}).
After(firstCall)

// Ensure we receive the items in order.
i := 0
Expand All @@ -956,4 +1012,20 @@ func (s *s3SuiteCustom) TestWalkDirWithEmptyPrefix(c *C) {
)
c.Assert(err, IsNil)
c.Assert(i, Equals, len(contents))

// test with non-empty sub-dir
i = 0
err = storage.WalkDir(
ctx,
&WalkOption{SubDir: "sp", ListCount: 2},
func(path string, size int64) error {
comment := Commentf("index = %d", i)
c.Assert(path, Equals, *contents[i].Key, comment)
c.Assert(size, Equals, *contents[i].Size, comment)
i++
return nil
},
)
c.Assert(err, IsNil)
c.Assert(i, Equals, 1)
}

0 comments on commit e069dd5

Please sign in to comment.