Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage/dataflux): run worksteal listing parallel to sequential listing #10966

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3ab3831
test case for fast-list
akansha1812 Sep 25, 2024
559ae40
resolve comments
akansha1812 Sep 27, 2024
1cdc956
add object_lister and worksteal to fast_list
akansha1812 Oct 7, 2024
7eb0471
Merge branch 'googleapis:main' into main
akansha1812 Oct 7, 2024
1f05177
add unit tests with emulator
akansha1812 Oct 8, 2024
05b1515
Merge branch 'googleapis:main' into main
akansha1812 Oct 8, 2024
5bed09c
resolve PR errors
akansha1812 Oct 8, 2024
80054bf
reduce numobjects to resolve timeout error
akansha1812 Oct 8, 2024
e33adf5
reduce objects created for timeout error
akansha1812 Oct 8, 2024
b1b69b9
remove env variables for grpc and http
akansha1812 Oct 9, 2024
c2e56f8
run dataflux emulator tests
akansha1812 Oct 9, 2024
c78aaba
Merge branch 'main' into main
akansha1812 Oct 14, 2024
b714222
Merge branch 'main' into main
akansha1812 Oct 21, 2024
4028cb6
resolve comments
akansha1812 Oct 21, 2024
4851cc8
update ranges to nil when sequential listing is faster
akansha1812 Oct 21, 2024
0d494e0
default page size for seq listing is 5000
akansha1812 Oct 21, 2024
722e961
remove version enabled from TestDoSeqListingEmulated
akansha1812 Oct 21, 2024
f3c1571
increase emulator time
akansha1812 Oct 21, 2024
db1a757
make next_page more readable
akansha1812 Oct 22, 2024
56ab509
Merge branch 'main' into main
akansha1812 Oct 22, 2024
2b72b0e
to resolve race conditions
akansha1812 Oct 22, 2024
692e74e
rename goroutineID to id
akansha1812 Oct 22, 2024
a6cb0f4
Merge branch 'main' into main
akansha1812 Oct 23, 2024
1998873
move counter from beginning of the loop
akansha1812 Oct 25, 2024
7583e16
add mutex to error counter
akansha1812 Oct 25, 2024
4b1dcf0
emulator test for error counter and remove worker to track error from…
akansha1812 Oct 25, 2024
34f0840
Merge branch 'main' into main
akansha1812 Oct 25, 2024
564bbd3
test emulator error
akansha1812 Oct 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions storage/dataflux/fast_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,24 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)
// To start listing method is Open and runs both worksteal and sequential listing
// in parallel. The method which completes first is used for all subsequent runs.

// TODO: Run worksteal listing when method is Open or WorkSteal.
// Run worksteal listing when method is Open or WorkSteal.
if c.method != sequential {

g.Go(func() error {
objects, err := c.workstealListing(childCtx)
if err != nil {
countError++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this be a race condition when c.method is open and both sequential and worksteal are running at the same time? Can we hit this case in a emulator test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added unit test to catch this in the race conditions.

return fmt.Errorf("error in running worksteal_lister: %w", err)
}
// If worksteal listing completes first, set method to worksteal listing and nextToken to "".
// The c.ranges channel will be used to continue worksteal listing.
results = objects
c.pageToken = ""
c.method = worksteal
cancel()
return nil
})
}

// Run sequential listing when method is Open or Sequential.
if c.method != worksteal {
Expand Down Expand Up @@ -175,7 +192,7 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)

akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
// If ranges for worksteal and pageToken for sequential listing is empty, then
// listing is complete.
if c.pageToken == "" {
if c.pageToken == "" && len(c.ranges) == 0 {
return results, iterator.Done
}
return results, nil
Expand Down
93 changes: 93 additions & 0 deletions storage/dataflux/fast_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
package dataflux

import (
"context"
"fmt"
"log"
"os"
"runtime"
"testing"
"time"

"cloud.google.com/go/storage"
)
Expand Down Expand Up @@ -192,3 +197,91 @@ func TestNewLister(t *testing.T) {
})
}
}

var emulatorClients map[string]*storage.Client

type skipTransportTestKey string

func initEmulatorClients() func() error {
noopCloser := func() error { return nil }

if !isEmulatorEnvironmentSet() {
return noopCloser
}
ctx := context.Background()

grpcClient, err := storage.NewGRPCClient(ctx)
if err != nil {
log.Fatalf("Error setting up gRPC client for emulator tests: %v", err)
return noopCloser
}
httpClient, err := storage.NewClient(ctx)
if err != nil {
log.Fatalf("Error setting up HTTP client for emulator tests: %v", err)
return noopCloser
}

emulatorClients = map[string]*storage.Client{
"http": httpClient,
"grpc": grpcClient,
}

return func() error {
gerr := grpcClient.Close()
herr := httpClient.Close()

if gerr != nil {
return gerr
}
return herr
}
}

// transportClienttest executes the given function with a sub-test, a project name
// based on the transport, a unique bucket name also based on the transport, and
// the transport-specific client to run the test with. It also checks the environment
// to ensure it is suitable for emulator-based tests, or skips.
func transportClientTest(ctx context.Context, t *testing.T, test func(*testing.T, context.Context, string, string, *storage.Client)) {
checkEmulatorEnvironment(t)
for transport, client := range emulatorClients {
if reason := ctx.Value(skipTransportTestKey(transport)); reason != nil {
t.Skip("transport", fmt.Sprintf("%q", transport), "explicitly skipped:", reason)
}
t.Run(transport, func(t *testing.T) {
project := fmt.Sprintf("%s-project", transport)
bucket := fmt.Sprintf("%s-bucket-%d", transport, time.Now().Nanosecond())
test(t, ctx, project, bucket, client)
})
}
}

// checkEmulatorEnvironment skips the test if the emulator environment variables
// are not set.
func checkEmulatorEnvironment(t *testing.T) {
if !isEmulatorEnvironmentSet() {
t.Skip("Emulator tests skipped without emulator environment variables set")
}
}

// isEmulatorEnvironmentSet checks if the emulator environment variables are set.
func isEmulatorEnvironmentSet() bool {
return os.Getenv("STORAGE_EMULATOR_HOST_GRPC") != "" && os.Getenv("STORAGE_EMULATOR_HOST") != ""
}

// createObject creates an object in the emulator and returns its name, generation, and
// metageneration.
func createObject(ctx context.Context, bucket *storage.BucketHandle, numObjects int) error {
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved

for i := 0; i < numObjects; i++ {
// Generate a unique object name using UUIDs
objectName := fmt.Sprintf("object%d", i)
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
// Create a writer for the object
wc := bucket.Object(objectName).NewWriter(ctx)
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved

// Close the writer to finalize the upload
if err := wc.Close(); err != nil {
return fmt.Errorf("failed to close writer for object %q: %v", objectName, err)
}
}
return nil
}
6 changes: 5 additions & 1 deletion storage/dataflux/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestMain(m *testing.M) {
if err := httpTestBucket.Create(testPrefix); err != nil {
log.Fatalf("test bucket creation failed: %v", err)
}

cleanupEmulatorClients := initEmulatorClients()
m.Run()

if err := httpTestBucket.Cleanup(); err != nil {
Expand All @@ -62,6 +62,10 @@ func TestMain(m *testing.M) {
if err := deleteExpiredBuckets(testPrefix); err != nil {
log.Printf("expired http bucket cleanup failed: %v", err)
}
if err := cleanupEmulatorClients(); err != nil {
// Don't fail the test if cleanup fails.
log.Printf("Post-test cleanup failed for emulator clients: %v", err)
}
}

// Lists the all the objects in the bucket.
Expand Down
170 changes: 170 additions & 0 deletions storage/dataflux/next_page.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataflux

import (
"context"
"fmt"
"strings"

"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
)

// nextPageOpts specifies options for next page of listing result .
type nextPageOpts struct {
// startRange is the start offset of the objects to be listed.
startRange string
// endRange is the end offset of the objects to be listed.
endRange string
// bucketHandle is the bucket handle of the bucket to be listed.
bucketHandle *storage.BucketHandle
// query is the storage.Query to filter objects for listing.
query storage.Query
// skipDirectoryObjects is to indicate whether to list directory objects.
skipDirectoryObjects bool
// generation is the generation number of the last object in the page.
generation int64
}

// nextPageResult holds the next page of object names, start of the next page
// and indicates whether the lister has completed listing (no more objects to retrieve).
type nextPageResult struct {
// items is the list of objects listed.
items []*storage.ObjectAttrs
// doneListing indicates whether the lister has completed listing.
doneListing bool
// nextStartRange is the start offset of the next page of objects to be listed.
nextStartRange string
// generation is the generation number of the last object in the page.
generation int64
}

// nextPage lists objects using the given lister options.
func nextPage(ctx context.Context, opts nextPageOpts) (*nextPageResult, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method seems a little convoluted; do you think there is a way to simplify it or make it a little easier to read?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update nextPage. Let me know if its more readable now.


opts.query.StartOffset = addPrefix(opts.startRange, opts.query.Prefix)
opts.query.EndOffset = addPrefix(opts.endRange, opts.query.Prefix)

// objectLexLast is the lexicographically last item in the page.
objectLexLast := ""
// indexLexLast is the index of lexicographically last item in the page.
indexLexLast := 0

objectIterator := opts.bucketHandle.Objects(ctx, &opts.query)
var items []*storage.ObjectAttrs
// itemIndex is the index of the last item in the items list.
itemIndex := -1
// The Go Listing API does not expose a convenient interface to list multiple objects together,
// thus we need to manually loop to construct a page of results using the iterator.
for i := 0; i < defaultPageSize; i++ {
attrs, err := objectIterator.Next()

// If the lister has listed the last item for the assigned range,
// then set doneListing to true and return.
if err == iterator.Done {
return &nextPageResult{
items: items,
doneListing: true,
nextStartRange: "",
generation: int64(0),
}, nil
}

if err != nil {
return nil, fmt.Errorf("iterating through objects: %w", err)
}

// Skip object versions already processed in the previous page to prevent duplicates.
if opts.query.Versions && opts.query.StartOffset == attrs.Name && attrs.Generation < opts.generation {
continue
}

if !(opts.skipDirectoryObjects && strings.HasSuffix(attrs.Name, "/")) {
items = append(items, attrs)
// Track index of the current item added to the items list.
itemIndex++
}

// If name/prefix is greater than objectLexLast, update objectLexLast and indexLexLast.
if objectLexLast <= attrs.Name || objectLexLast <= attrs.Prefix {
objectLexLast = attrs.Prefix
if objectLexLast <= attrs.Name {
objectLexLast = attrs.Name
}
// If object is added to the items list, then update indexLexLast to current item index, else set indexLexLast to -1.
// Setting indexLexLast to -1, indicates that the lexicographically last item is not added to items list.
if !(opts.skipDirectoryObjects && strings.HasSuffix(attrs.Name, "/")) {
indexLexLast = itemIndex
} else {
indexLexLast = -1
}
}

// If the "startoffset" value matches the name of the last object,
// list another page to ensure the next NextStartRange is distinct from the current one.
if opts.query.Versions && i == defaultPageSize-1 && attrs.Generation != int64(0) && opts.query.StartOffset == attrs.Name {
i = -1
}

// When generation value is not set, list next page if the last item is a version of previous item to prevent duplicate listing.
if opts.query.Versions && i == defaultPageSize-1 && attrs.Generation == int64(0) && indexLexLast > 0 && items[indexLexLast-1].Name == items[indexLexLast].Name {
i = -1
}
}

// Make last item as next start range.
nextStartRange := strings.TrimPrefix(objectLexLast, opts.query.Prefix)
// When the lexicographically last item is not added to items list due to skipDirectoryObjects,
// then set doneListing return objectLexLast as next start range.
if len(items) < 1 || indexLexLast == -1 {
return &nextPageResult{
items: items,
doneListing: false,
nextStartRange: nextStartRange,
}, nil
}

generation := int64(0)

// Remove lexicographically last item from the item list to avoid duplicate listing.
// Store generation of the item to be removed from the list.
if indexLexLast >= itemIndex {
generation = items[itemIndex].Generation
items = items[:len(items)-1]
} else if indexLexLast >= 0 {
generation = items[indexLexLast].Generation
items = append(items[:indexLexLast], items[indexLexLast+1:]...)
}

// Check if is versions is false, generation is not required.
if !opts.query.Versions {
generation = int64(0)
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
}

return &nextPageResult{
items: items,
doneListing: false,
nextStartRange: nextStartRange,
generation: generation,
}, nil
}

func addPrefix(name, prefix string) string {
if name != "" {
return prefix + name
}
return name
}
12 changes: 6 additions & 6 deletions storage/dataflux/sequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,37 @@ const (
// If the next token is empty, then listing is complete.
func (c *Lister) sequentialListing(ctx context.Context) ([]*storage.ObjectAttrs, string, error) {
var result []*storage.ObjectAttrs
var objectsListed int
var objectsIterated int
var lastToken string
objectIterator := c.bucket.Objects(ctx, &c.query)
objectIterator.PageInfo().Token = c.pageToken
objectIterator.PageInfo().MaxSize = defaultPageSize

for {
objects, nextToken, numObjects, err := doSeqListing(objectIterator, c.skipDirectoryObjects)
objects, nextToken, pageSize, err := doSeqListing(objectIterator, c.skipDirectoryObjects)
if err != nil {
return nil, "", fmt.Errorf("failed while listing objects: %w", err)
}
result = append(result, objects...)
lastToken = nextToken
objectsListed += numObjects
if nextToken == "" || (c.batchSize > 0 && objectsListed >= c.batchSize) {
objectsIterated += pageSize
if nextToken == "" || (c.batchSize > 0 && objectsIterated >= c.batchSize) {
break
}
c.pageToken = nextToken
}
return result, lastToken, nil
}

func doSeqListing(objectIterator *storage.ObjectIterator, skipDirectoryObjects bool) (result []*storage.ObjectAttrs, token string, objectsListed int, err error) {
func doSeqListing(objectIterator *storage.ObjectIterator, skipDirectoryObjects bool) (result []*storage.ObjectAttrs, token string, pageSize int, err error) {

for {
attrs, errObjectIterator := objectIterator.Next()
objectsListed++
// Stop listing when all the requested objects have been listed.
if errObjectIterator == iterator.Done {
break
}
pageSize++
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
if errObjectIterator != nil {
err = fmt.Errorf("iterating through objects %w", errObjectIterator)
return
Expand Down
Loading
Loading