Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage/dataflux): add worksteal algorithm to fast-listing #10913

Merged
merged 12 commits into from
Sep 29, 2024
120 changes: 98 additions & 22 deletions storage/dataflux/fast_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"errors"
"fmt"
"runtime"
"strings"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
Expand All @@ -41,53 +43,80 @@ type ListerInput struct {
// BucketName is the name of the bucket to list objects from. Required.
BucketName string

// Parallelism is number of parallel workers to use for listing. Default value is 10x number of available CPU. Optional.
// Parallelism is number of parallel workers to use for listing.
// Default value is 10x number of available CPU. Optional.
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
Parallelism int

// BatchSize is the number of objects to list. Default value returns all objects at once. Optional.
// The number of objects returned will be rounded up to a multiple of gcs page size.
// BatchSize is the number of objects to list. Default value returns
// all objects at once. The number of objects returned will be
// rounded up to a multiple of gcs page size. Optional.
BatchSize int

// Query is the query to filter objects for listing. Default value is nil. Optional.
// Use ProjectionNoACL for faster listing. Including ACLs increases latency while fetching objects.
// Query is the query to filter objects for listing. Default value is nil.
// Use ProjectionNoACL for faster listing. Including ACLs increases
// latency while fetching objects. Optional.
Query storage.Query

// SkipDirectoryObjects is to indicate whether to list directory objects. Default value is false. Optional.
// SkipDirectoryObjects is to indicate whether to list directory objects.
// Default value is false. Optional.
SkipDirectoryObjects bool
}

// Lister is used for interacting with Dataflux fast-listing. The caller should
// initialize it with NewLister() instead of creating it directly.
type Lister struct {
	// method indicates the listing method (open, sequential, worksteal) to
	// be used for listing.
	method listingMethod

	// bucket is the bucket handle to list objects from.
	bucket *storage.BucketHandle

	// batchSize is the number of objects to list.
	batchSize int

	// parallelism is the number of parallel workers to use for listing.
	parallelism int

	// query is the query to filter objects for listing.
	query storage.Query

	// pageToken is the token to use for sequential listing.
	pageToken string

	// ranges is the channel to store the start and end ranges to be listed
	// by the workers in worksteal listing.
	ranges chan *listRange

	// skipDirectoryObjects is to indicate whether to list directory objects.
	skipDirectoryObjects bool
}

// NewLister creates a new dataflux Lister to list objects in the give bucket.
func NewLister(c *storage.Client, in *ListerInput) *Lister {
bucket := c.Bucket(in.BucketName)

// If parallelism is not given, set default value to 10x the number of
// available CPU.
if in.Parallelism == 0 {
in.Parallelism = runtime.NumCPU() * 10
}
// Initialize range channel with entire namespace of object for given
// prefix, startoffset and endoffset. For the default range to list is
// entire namespace, start and end will be empty.
rangeChannel := make(chan *listRange, in.Parallelism*2)
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
start, end := updateStartEndOffset(in.Query.StartOffset, in.Query.EndOffset, in.Query.Prefix)
rangeChannel <- &listRange{startRange: start, endRange: end}

lister := &Lister{
method: open,
parallelism: in.Parallelism,
pageToken: "",
bucket: bucket,
batchSize: in.BatchSize,
query: in.Query,
skipDirectoryObjects: in.SkipDirectoryObjects,
ranges: rangeChannel,
}
return lister
}
Expand All @@ -102,13 +131,15 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
var results []*storage.ObjectAttrs
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Errgroup takes care of running both methods in parallel. As soon as one of the method
// is complete, the running method also stops.
// Errgroup takes care of running both methods in parallel. As soon as one of
// the method is complete, the running method also stops.
g, childCtx := errgroup.WithContext(ctx)

// To start listing method is Open and runs both worksteal and sequential listing in parallel.
// The method which completes first is used for all subsequent runs.
// Listing starts with method Open, which runs both worksteal and sequential
// listing in parallel. The method which completes first is used for all
// subsequent runs.

// TODO: Run worksteal listing when method is Open or WorkSteal.

// Run sequential listing when method is Open or Sequential.
if c.method != worksteal {

Expand All @@ -118,8 +149,8 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
countError++
return fmt.Errorf("error in running sequential listing: %w", err)
}
// If sequential listing completes first, set method to sequential listing and ranges to nil.
// The nextToken will be used to continue sequential listing.
// If sequential listing completes first, set method to sequential listing
// and ranges to nil. The nextToken will be used to continue sequential listing.
results = objects
c.pageToken = nextToken
c.method = sequential
Expand All @@ -135,13 +166,15 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
// If the error is not context.Canceled, then return error instead of falling back
// to the other method. This is so that the error can be fixed and user can take
// advantage of fast-listing.
// As one of the listing method completes, it is expected to cancel context for the other method.
// If both sequential and worksteal listing fail due to context canceled, only then return error.
// As soon as one of the listing methods completes, it is expected to cancel
// the context for the other method. Only if both sequential and worksteal
// listing fail due to context canceled, return error.
if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
return nil, fmt.Errorf("failed waiting for sequntial and work steal lister : %w", err)
}

// If ranges for worksteal and pageToken for sequential listing is empty, then listing is complete.
// If ranges for worksteal and pageToken for sequential listing is empty, then
// listing is complete.
if c.pageToken == "" {
return results, iterator.Done
}
Expand All @@ -150,6 +183,49 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)

// Close closes the Lister's range channel. It does nothing when the channel
// is nil.
func (c *Lister) Close() {
	if c.ranges == nil {
		return
	}
	close(c.ranges)
}

// updateStartEndOffset updates start and end offset based on prefix.
// If a prefix is given, it adjusts start and end so that they are relative
// to the prefix and only cover objects with that prefix. It assumes the
// prefix will be added back to the object name while listing objects in the
// worksteal algorithm.
//
// An empty start or end means "unbounded" on that side. When the requested
// range cannot contain any object with the given prefix, the same non-empty
// value is returned for both start and end.
//
// For example:
//
//	start = "abc", end = "prefix_a", prefix = "prefix"
//
// "abc" is lexicographically smaller than "prefix", so listing begins at the
// first object with the given prefix and start becomes "" (empty string).
// end carries the prefix, which is stripped, so end becomes "_a".
func updateStartEndOffset(start, end, prefix string) (string, string) {
	// Without a prefix the offsets are already absolute; leave them alone.
	if prefix == "" {
		return start, end
	}
	// Both bounds set but start is not before end: nothing to list.
	if start != "" && end != "" && start >= end {
		return start, start
	}

	if start != "" {
		switch {
		case start <= prefix:
			// start sorts at or before the prefix, so every object with
			// the prefix is already at or after it.
			start = ""
		case strings.HasPrefix(start, prefix):
			// start lies inside the prefix namespace: make it relative.
			start = start[len(prefix):]
		default:
			// start sorts after the prefix without sharing it.
			return start, start
		}
	}

	if end != "" {
		switch {
		case len(end) > len(prefix) && strings.HasPrefix(end, prefix):
			// end lies strictly inside the prefix namespace: make it relative.
			end = end[len(prefix):]
		case end > prefix:
			// end sorts after the prefix without sharing it, so it does
			// not bound the prefixed objects.
			end = ""
		default:
			// end sorts at or before the prefix itself.
			return end, end
		}
	}
	return start, end
}
194 changes: 194 additions & 0 deletions storage/dataflux/fast_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux

import (
"runtime"
"testing"

"cloud.google.com/go/storage"
)

// TestUpdateStartEndOffset checks how updateStartEndOffset rewrites the
// start and end offsets for a table of start/end/prefix combinations.
func TestUpdateStartEndOffset(t *testing.T) {
	tests := []struct {
		desc      string
		start     string
		end       string
		prefix    string
		wantStart string
		wantEnd   string
	}{
		// List all objects with the given prefix.
		{desc: "start and end are empty", start: "", end: "", prefix: "pre", wantStart: "", wantEnd: ""},
		{desc: "start is longer and lexicographically before prefix", start: "abcqre", end: "", prefix: "pre", wantStart: "", wantEnd: ""},
		{desc: "start value same as prefix", start: "pre", end: "", prefix: "pre", wantStart: "", wantEnd: ""},
		{desc: "lexicographically start comes before prefix and end after prefix", start: "abc", end: "xyz", prefix: "pre", wantStart: "", wantEnd: ""},
		// List bounded objects within the given prefix.
		{desc: "start value contains prefix", start: "pre_a", end: "", prefix: "pre", wantStart: "_a", wantEnd: ""},
		{desc: "end value contains prefix", start: "", end: "pre_x", prefix: "pre", wantStart: "", wantEnd: "_x"},
		// With empty prefix, start and end will not be affected.
		{desc: "prefix is empty", start: "abc", end: "xyz", prefix: "", wantStart: "abc", wantEnd: "xyz"},
		{desc: "start is lexicographically higher than end", start: "xyz", end: "abc", prefix: "", wantStart: "xyz", wantEnd: "abc"},
		// Cases where no objects will be listed when prefix is given.
		{desc: "end is same as prefix", start: "", end: "pre", prefix: "pre", wantStart: "pre", wantEnd: "pre"},
		{desc: "start is lexicographically higher than end with prefix", start: "xyz", end: "abc", prefix: "pre", wantStart: "xyz", wantEnd: "xyz"},
		{desc: "start is lexicographically higher than prefix", start: "xyz", end: "", prefix: "pre", wantStart: "xyz", wantEnd: "xyz"},
	}

	for _, tt := range tests {
		t.Run(tt.desc, func(t *testing.T) {
			s, e := updateStartEndOffset(tt.start, tt.end, tt.prefix)
			if s == tt.wantStart && e == tt.wantEnd {
				return
			}
			t.Errorf("updateStartEndOffset(%q, %q, %q) got = (%q, %q), want = (%q, %q)", tt.start, tt.end, tt.prefix, s, e, tt.wantStart, tt.wantEnd)
		})
	}
}

// TestNewLister checks that NewLister seeds exactly one range built from the
// query's prefix and offsets, starts with the open method and an empty page
// token, and applies the default parallelism when none is given.
func TestNewLister(t *testing.T) {
	gcs := &storage.Client{}
	bucketName := "test-bucket"
	testcase := []struct {
		desc            string
		query           storage.Query
		parallelism     int
		wantStart       string
		wantEnd         string
		wantParallelism int
	}{
		{
			desc:            "start and end are empty",
			query:           storage.Query{Prefix: "pre"},
			parallelism:     1,
			wantStart:       "",
			wantEnd:         "",
			wantParallelism: 1,
		},
		{
			desc:            "start is longer than prefix",
			query:           storage.Query{Prefix: "pre", StartOffset: "pre_a"},
			parallelism:     1,
			wantStart:       "_a",
			wantEnd:         "",
			wantParallelism: 1,
		},
		{
			// Parallelism 0 means "not given".
			desc:            "parallelism defaults to 10x number of CPU",
			query:           storage.Query{Prefix: "pre"},
			parallelism:     0,
			wantStart:       "",
			wantEnd:         "",
			wantParallelism: 10 * runtime.NumCPU(),
		},
	}

	for _, tc := range testcase {
		t.Run(tc.desc, func(t *testing.T) {
			in := ListerInput{
				BucketName:  bucketName,
				BatchSize:   0,
				Query:       tc.query,
				Parallelism: tc.parallelism,
			}
			df := NewLister(gcs, &in)
			defer df.Close()
			// Stop here on a wrong length: receiving from an empty channel
			// below would block the test forever.
			if len(df.ranges) != 1 {
				t.Fatalf("NewLister(%q, %d, %d, %v) got len of ranges = %d, want = %d", bucketName, tc.parallelism, 0, tc.query, len(df.ranges), 1)
			}
			ranges := <-df.ranges
			if df.method != open || df.pageToken != "" || ranges.startRange != tc.wantStart || ranges.endRange != tc.wantEnd || df.parallelism != tc.wantParallelism {
				t.Errorf("NewLister(%q, %d, %d, %v) got = (method: %v, token: %q, start: %q, end: %q, parallelism: %d), want = (method: %v, token: %q, start: %q, end: %q, parallelism: %d)", bucketName, tc.parallelism, 0, tc.query, df.method, df.pageToken, ranges.startRange, ranges.endRange, df.parallelism, open, "", tc.wantStart, tc.wantEnd, tc.wantParallelism)
			}
		})
	}
}
Loading
Loading