Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

storage: fix s3 storage walk-dir with empty sub-dir #713

Merged
merged 5 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,20 +378,28 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
if opt == nil {
opt = &WalkOption{}
}
prefix := rs.options.Prefix + opt.SubDir
if !strings.HasSuffix(prefix, "/") {
prefix += "/"

// NOTE: leave prefix empty if subDir is not set. Else, s3 will return empty result!
prefix := ""
if len(opt.SubDir) > 0 {
prefix = rs.options.Prefix + opt.SubDir
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
}

maxKeys := int64(1000)
if opt.ListCount > 0 {
maxKeys = opt.ListCount
}

req := &s3.ListObjectsInput{
Bucket: aws.String(rs.options.Bucket),
Prefix: aws.String(prefix),
MaxKeys: aws.Int64(maxKeys),
}
if len(prefix) > 0 {
req.Prefix = aws.String(prefix)
}
for {
// FIXME: We can't use ListObjectsV2, it is not universally supported.
// (Ceph RGW supported ListObjectsV2 since v15.1.0, released 2020 Jan 30th)
Expand Down
66 changes: 65 additions & 1 deletion pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ type s3Suite struct {
storage *S3Storage
}

var _ = Suite(&s3Suite{})
type s3SuiteCustom struct{}

var (
_ = Suite(&s3Suite{})
_ = Suite(&s3SuiteCustom{})
)

// FIXME: Cannot use the real SetUpTest/TearDownTest to set up the mock
// otherwise the mock error will be ignored.
Expand Down Expand Up @@ -893,3 +898,62 @@ func (s *s3Suite) TestWalkDir(c *C) {
c.Assert(err, IsNil)
c.Assert(i, Equals, len(contents))
}

// TestWalkDirBucket checks WalkDir retrieves all directory content under a bucket.
func (s *s3SuiteCustom) TestWalkDirWithEmptyPrefix(c *C) {
controller := gomock.NewController(c)
s3API := mock.NewMockS3API(controller)
storage := NewS3StorageForTest(
s3API,
&backup.S3{
Region: "us-west-2",
Bucket: "bucket",
Prefix: "",
Acl: "acl",
Sse: "sse",
StorageClass: "sc",
},
)
defer controller.Finish()
ctx := aws.BackgroundContext()

contents := []*s3.Object{
{
Key: aws.String("sp/.gitignore"),
Size: aws.Int64(437),
},
{
Key: aws.String("prefix/sp/01.jpg"),
Size: aws.Int64(27499),
},
}
s3API.EXPECT().
ListObjectsWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
c.Assert(aws.StringValue(input.Bucket), Equals, "bucket")
c.Assert(aws.StringValue(input.Prefix), Equals, "")
c.Assert(aws.StringValue(input.Marker), Equals, "")
c.Assert(aws.Int64Value(input.MaxKeys), Equals, int64(2))
c.Assert(aws.StringValue(input.Delimiter), Equals, "")
return &s3.ListObjectsOutput{
IsTruncated: aws.Bool(false),
Contents: contents,
}, nil
})

// Ensure we receive the items in order.
i := 0
err := storage.WalkDir(
ctx,
&WalkOption{SubDir: "", ListCount: 2},
func(path string, size int64) error {
comment := Commentf("index = %d", i)
c.Assert(path, Equals, *contents[i].Key, comment)
c.Assert(size, Equals, *contents[i].Size, comment)
i++
return nil
},
)
c.Assert(err, IsNil)
c.Assert(i, Equals, len(contents))
}