diff --git a/storage/dataflux/fast_list.go b/storage/dataflux/fast_list.go index 08aacd959ae7..306d59b5d8c7 100644 --- a/storage/dataflux/fast_list.go +++ b/storage/dataflux/fast_list.go @@ -18,6 +18,8 @@ import ( "context" "errors" "fmt" + "runtime" + "strings" "cloud.google.com/go/storage" "golang.org/x/sync/errgroup" @@ -41,39 +43,51 @@ type ListerInput struct { // BucketName is the name of the bucket to list objects from. Required. BucketName string - // Parallelism is number of parallel workers to use for listing. Default value is 10x number of available CPU. Optional. + // Parallelism is number of parallel workers to use for listing. + // Default value is 10x number of available CPU. Optional. Parallelism int - // BatchSize is the number of objects to list. Default value returns all objects at once. Optional. - // The number of objects returned will be rounded up to a multiple of gcs page size. + // BatchSize is the number of objects to list. Default value returns + // all objects at once. The number of objects returned will be + // rounded up to a multiple of gcs page size. Optional. BatchSize int - // Query is the query to filter objects for listing. Default value is nil. Optional. - // Use ProjectionNoACL for faster listing. Including ACLs increases latency while fetching objects. + // Query is the query to filter objects for listing. Default value is nil. + // Use ProjectionNoACL for faster listing. Including ACLs increases + // latency while fetching objects. Optional. Query storage.Query - // SkipDirectoryObjects is to indicate whether to list directory objects. Default value is false. Optional. + // SkipDirectoryObjects is to indicate whether to list directory objects. + // Default value is false. Optional. SkipDirectoryObjects bool } -// Lister is used for interacting with Dataflux fast-listing. -// The caller should initialize it with NewLister() instead of creating it directly. +// Lister is used for interacting with Dataflux fast-listing. The caller should +// initialize it with NewLister() instead of creating it directly. type Lister struct { - // method indicates the listing method(open, sequential, worksteal) to be used for listing. + // method indicates the listing method(open, sequential, worksteal) to + // be used for listing. method listingMethod - // pageToken is the token to use for sequential listing. - pageToken string - // bucket is the bucket handle to list objects from. bucket *storage.BucketHandle // batchSize is the number of objects to list. batchSize int + // parallelism is number of parallel workers to use for listing. + parallelism int + // query is the query to filter objects for listing. query storage.Query + // pageToken is the token to use for sequential listing. + pageToken string + + // ranges is the channel to store the start and end ranges to be listed + // by the workers in worksteal listing. + ranges chan *listRange + // skipDirectoryObjects is to indicate whether to list directory objects. skipDirectoryObjects bool } @@ -81,13 +95,28 @@ type Lister struct { // NewLister creates a new dataflux Lister to list objects in the give bucket. func NewLister(c *storage.Client, in *ListerInput) *Lister { bucket := c.Bucket(in.BucketName) + + // If parallelism is not given, set default value to 10x the number of + // available CPU. + if in.Parallelism == 0 { + in.Parallelism = runtime.NumCPU() * 10 + } + // Initialize range channel with entire namespace of object for given + // prefix, startoffset and endoffset. For the default range to list is + // entire namespace, start and end will be empty. + rangeChannel := make(chan *listRange, in.Parallelism*2) + start, end := updateStartEndOffset(in.Query.StartOffset, in.Query.EndOffset, in.Query.Prefix) + rangeChannel <- &listRange{startRange: start, endRange: end} + lister := &Lister{ method: open, + parallelism: in.Parallelism, pageToken: "", bucket: bucket, batchSize: in.BatchSize, query: in.Query, skipDirectoryObjects: in.SkipDirectoryObjects, + ranges: rangeChannel, } return lister } @@ -102,13 +131,15 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) var results []*storage.ObjectAttrs ctx, cancel := context.WithCancel(ctx) defer cancel() - // Errgroup takes care of running both methods in parallel. As soon as one of the method - // is complete, the running method also stops. + // Errgroup takes care of running both methods in parallel. As soon as one of + // the method is complete, the running method also stops. g, childCtx := errgroup.WithContext(ctx) - // To start listing method is Open and runs both worksteal and sequential listing in parallel. - // The method which completes first is used for all subsequent runs. + // To start listing method is Open and runs both worksteal and sequential listing + // in parallel. The method which completes first is used for all subsequent runs. + // TODO: Run worksteal listing when method is Open or WorkSteal. + // Run sequential listing when method is Open or Sequential. if c.method != worksteal { @@ -118,8 +149,8 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) countError++ return fmt.Errorf("error in running sequential listing: %w", err) } - // If sequential listing completes first, set method to sequential listing and ranges to nil. - // The nextToken will be used to continue sequential listing. + // If sequential listing completes first, set method to sequential listing + // and ranges to nil. The nextToken will be used to continue sequential listing. results = objects c.pageToken = nextToken c.method = sequential @@ -135,13 +166,15 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) // If the error is not context.Canceled, then return error instead of falling back // to the other method. This is so that the error can be fixed and user can take // advantage of fast-listing. - // As one of the listing method completes, it is expected to cancel context for the other method. - // If both sequential and worksteal listing fail due to context canceled, only then return error. + // As one of the listing method completes, it is expected to cancel context for the + // only then return error. other method. If both sequential and worksteal listing + // fail due to context canceled, return error. if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) { return nil, fmt.Errorf("failed waiting for sequntial and work steal lister : %w", err) } - // If ranges for worksteal and pageToken for sequential listing is empty, then listing is complete. + // If ranges for worksteal and pageToken for sequential listing is empty, then + // listing is complete. if c.pageToken == "" { return results, iterator.Done } @@ -150,6 +183,49 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) // Close closes the range channel of the Lister. func (c *Lister) Close() { + if c.ranges != nil { + close(c.ranges) + } +} - // TODO: Close range channel for worksteal lister. +// updateStartEndOffset updates start and end offset based on prefix. +// If a prefix is given, adjust start and end value such that it lists +// objects with the given prefix. updateStartEndOffset assumes prefix will +// be added to the object name while listing objects in worksteal algorithm. +// +// For example: +// start = "abc", end = "prefix_a", prefix = "prefix", +// +// end will change to "_a", prefix will be added in worksteal algorithm. +// "abc" is lexicographically smaller than "prefix". So start will be the first +// object with the given prefix. +// +// Therefore start will change to ""(empty string) and end to "_a" . +func updateStartEndOffset(start, end, prefix string) (string, string) { + if prefix == "" { + return start, end + } + if start != "" && end != "" && start >= end { + return start, start + } + if start != "" { + if start <= prefix { + start = "" + } else if strings.HasPrefix(start, prefix) { + start = start[len(prefix):] + } else { + return start, start + } + } + + if end != "" { + if len(end) > len(prefix) && strings.HasPrefix(end, prefix) { + end = end[len(prefix):] + } else if end > prefix { + end = "" + } else { + return end, end + } + } + return start, end } diff --git a/storage/dataflux/fast_list_test.go b/storage/dataflux/fast_list_test.go new file mode 100644 index 000000000000..2bbbcb57119e --- /dev/null +++ b/storage/dataflux/fast_list_test.go @@ -0,0 +1,194 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataflux + +import ( + "runtime" + "testing" + + "cloud.google.com/go/storage" +) + +func TestUpdateStartEndOffset(t *testing.T) { + testcase := []struct { + desc string + start string + end string + prefix string + wantStart string + wantEnd string + }{ + // List all objects with the given prefix. + { + desc: "start and end are empty", + start: "", + end: "", + prefix: "pre", + wantStart: "", + wantEnd: "", + }, + { + desc: "start is longer and lexicographically before prefix", + start: "abcqre", + end: "", + prefix: "pre", + wantStart: "", + wantEnd: "", + }, + { + desc: "start value same as prefix", + start: "pre", + end: "", + prefix: "pre", + wantStart: "", + wantEnd: "", + }, + { + desc: "lexicographically start comes before prefix and end after prefix", + start: "abc", + end: "xyz", + prefix: "pre", + wantStart: "", + wantEnd: "", + }, + // List bounded objects within the given prefix. + { + desc: "start value contains prefix", + start: "pre_a", + end: "", + prefix: "pre", + wantStart: "_a", + wantEnd: "", + }, + { + desc: "end value contains prefix", + start: "", + end: "pre_x", + prefix: "pre", + wantStart: "", + wantEnd: "_x", + }, + // With empty prefix, start and end will not be affected. + { + desc: "prefix is empty", + start: "abc", + end: "xyz", + prefix: "", + wantStart: "abc", + wantEnd: "xyz", + }, + { + desc: "start is lexicographically higher than end", + start: "xyz", + end: "abc", + prefix: "", + wantStart: "xyz", + wantEnd: "abc", + }, + // Cases where no objects will be listed when prefix is given. + { + desc: "end is same as prefix", + start: "", + end: "pre", + prefix: "pre", + wantStart: "pre", + wantEnd: "pre", + }, + { + desc: "start is lexicographically higher than end with prefix", + start: "xyz", + end: "abc", + prefix: "pre", + wantStart: "xyz", + wantEnd: "xyz", + }, + { + desc: "start is lexicographically higher than prefix", + start: "xyz", + end: "", + prefix: "pre", + wantStart: "xyz", + wantEnd: "xyz", + }, + } + + for _, tc := range testcase { + t.Run(tc.desc, func(t *testing.T) { + gotStart, gotEnd := updateStartEndOffset(tc.start, tc.end, tc.prefix) + if gotStart != tc.wantStart || gotEnd != tc.wantEnd { + t.Errorf("updateStartEndOffset(%q, %q, %q) got = (%q, %q), want = (%q, %q)", tc.start, tc.end, tc.prefix, gotStart, gotEnd, tc.wantStart, tc.wantEnd) + } + }) + } +} + +func TestNewLister(t *testing.T) { + gcs := &storage.Client{} + bucketName := "test-bucket" + testcase := []struct { + desc string + query storage.Query + parallelism int + wantStart string + wantEnd string + wantParallelism int + }{ + { + desc: "start and end are empty", + query: storage.Query{Prefix: "pre"}, + parallelism: 1, + wantStart: "", + wantEnd: "", + wantParallelism: 1, + }, + { + desc: "start is longer than prefix", + query: storage.Query{Prefix: "pre", StartOffset: "pre_a"}, + parallelism: 1, + wantStart: "_a", + wantEnd: "", + wantParallelism: 1, + }, + { + desc: "start and end are empty", + query: storage.Query{Prefix: "pre"}, + parallelism: 0, + wantStart: "", + wantEnd: "", + wantParallelism: 10 * runtime.NumCPU(), + }, + } + + for _, tc := range testcase { + t.Run(tc.desc, func(t *testing.T) { + in := ListerInput{ + BucketName: bucketName, + BatchSize: 0, + Query: tc.query, + Parallelism: tc.parallelism, + } + df := NewLister(gcs, &in) + defer df.Close() + if len(df.ranges) != 1 { + t.Errorf("NewLister(%v, %v %v, %v) got len of ranges = %v, want = %v", bucketName, 1, 0, tc.query, len(df.ranges), 1) + } + ranges := <-df.ranges + if df.method != open || df.pageToken != "" || ranges.startRange != tc.wantStart || ranges.endRange != tc.wantEnd || df.parallelism != tc.wantParallelism { + t.Errorf("NewLister(%q, %d, %d, %v) got = (method: %v, token: %q, start: %q, end: %q, parallelism: %d), want = (method: %v, token: %q, start: %q, end: %q, parallelism: %d)", bucketName, 1, 0, tc.query, df.method, df.pageToken, ranges.startRange, ranges.endRange, df.parallelism, open, "", tc.wantStart, tc.wantEnd, tc.wantParallelism) + } + + }) + } +} diff --git a/storage/dataflux/range_splitter.go b/storage/dataflux/range_splitter.go index 4451e00aa48d..7d5d2646a6ec 100644 --- a/storage/dataflux/range_splitter.go +++ b/storage/dataflux/range_splitter.go @@ -261,8 +261,8 @@ func (rs *rangeSplitter) isRangeEqualWithPadding(startRange, endRange []rune) bo return true } -// charAtOrDefault returns the character at the specified position, or the default character if -// the position is out of bounds. +// charAtOrDefault returns the character at the specified position, or the default +// character if the position is out of bounds. func charAtOrDefault(charArray []rune, position int, defaultChar rune) rune { if position < 0 || position >= len(charArray) { return defaultChar diff --git a/storage/dataflux/sequential.go b/storage/dataflux/sequential.go index e68d733df879..89deee8f72bf 100644 --- a/storage/dataflux/sequential.go +++ b/storage/dataflux/sequential.go @@ -40,7 +40,7 @@ func (c *Lister) sequentialListing(ctx context.Context) ([]*storage.ObjectAttrs, objectIterator.PageInfo().MaxSize = defaultPageSize for { - objects, nextToken, numObjects, err := doListing(objectIterator, c.skipDirectoryObjects) + objects, nextToken, numObjects, err := doSeqListing(objectIterator, c.skipDirectoryObjects) if err != nil { return nil, "", fmt.Errorf("failed while listing objects: %w", err) } @@ -55,7 +55,7 @@ func (c *Lister) sequentialListing(ctx context.Context) ([]*storage.ObjectAttrs, return result, lastToken, nil } -func doListing(objectIterator *storage.ObjectIterator, skipDirectoryObjects bool) (result []*storage.ObjectAttrs, token string, objectsListed int, err error) { +func doSeqListing(objectIterator *storage.ObjectIterator, skipDirectoryObjects bool) (result []*storage.ObjectAttrs, token string, objectsListed int, err error) { for { attrs, errObjectIterator := objectIterator.Next() diff --git a/storage/dataflux/worksteal.go b/storage/dataflux/worksteal.go index bd73bcab5043..2703500b353a 100644 --- a/storage/dataflux/worksteal.go +++ b/storage/dataflux/worksteal.go @@ -16,9 +16,20 @@ package dataflux import ( "context" + "fmt" "sync" + "time" "cloud.google.com/go/storage" + "golang.org/x/sync/errgroup" +) + +const ( + // defaultAlphabet used to initiliaze rangesplitter. It must contain at least two unique characters. + defaultAlphabet = "ab" + // sleepDurationWhenIdle is the milliseconds we want each worker to sleep before checking + // the next update if it is idle. + sleepDurationWhenIdle = time.Millisecond * time.Duration(200) ) // workerStatus indicates the status of a worker. @@ -45,16 +56,179 @@ type worker struct { idleChannel chan int result *listerResult generation int64 + lister *Lister } // workstealListing is the main entry point of the worksteal algorithm. // It performs worksteal to achieve highly dynamic object listing. -func (c *Lister) workstealListing(ctx context.Context) []*storage.ObjectAttrs { +// workstealListing creates multiple (parallelism) workers that simultaneosly lists +// objects from the buckets. +func (c *Lister) workstealListing(ctx context.Context) ([]*storage.ObjectAttrs, error) { + var workerErr []error + // Idle channel is used to track number of idle workers. + idleChannel := make(chan int, c.parallelism) + // Result is used to store results from each worker. + result := &listerResult{ + objects: []*storage.ObjectAttrs{}, + } + + rs, err := newRangeSplitter(defaultAlphabet) + if err != nil { + return nil, fmt.Errorf("creating new range splitter: %w", err) + } + + g, ctx := errgroup.WithContext(ctx) + // Initialize all workers as idle. + for i := 0; i < c.parallelism; i++ { + idleWorker := &worker{ + goroutineID: i, + startRange: "", + endRange: "", + status: idle, + rangesplitter: rs, + result: result, + idleChannel: idleChannel, + generation: int64(0), + lister: c, + } + idleChannel <- 1 + g.Go(func() error { + if err := idleWorker.doWorkstealListing(ctx); err != nil { + workerErr = append(workerErr, err) + return fmt.Errorf("listing worker ID %q: %w", i, err) + } + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, fmt.Errorf("failed waiting for multiple workers : %w", err) + } + if len(workerErr) > 0 { + return nil, fmt.Errorf("failure in workers : %v", workerErr) + } + + close(idleChannel) + + return result.objects, nil +} + +// doWorkstealListing implements the listing logic for each worker. +// An active worker lists next page of objects to be listed within the given range +// and then splits range into two half if there are idle workers. Worker keeps +// the first of splitted range and passes second half of the work in range channel +// for idle workers. It continues to do this until shutdown signal is true. +// An idle worker waits till it finds work in rangeChannel. Once it finds work, +// it acts like an active worker. +func (w *worker) doWorkstealListing(ctx context.Context) error { + for !w.shutDownSignal() { + if ctx.Err() != nil { + return ctx.Err() + } + + // If a worker is idle, sleep for a while before checking the next update. + // Worker is active when it finds work in range channel. + if w.status == idle { + if len(w.lister.ranges) == 0 { + time.Sleep(sleepDurationWhenIdle) + continue + } else { + newRange := <-w.lister.ranges + <-w.idleChannel + w.updateWorker(newRange.startRange, newRange.endRange, active) + } + } + // Active worker to list next page of objects within the range. + doneListing, err := w.objectLister(ctx) + if err != nil { + return fmt.Errorf("objectLister failed: %w", err) + } + + // If listing is complete for the range, make worker idle and continue. + if doneListing { + w.status = idle + w.idleChannel <- 1 + w.generation = int64(0) + continue + } + + // If listing not complete and idle workers are available, split the range + // and give half of work to idle worker. + if len(w.idleChannel)-len(w.lister.ranges) > 0 && ctx.Err() == nil { + // Split range and upload half of work for idle worker. + splitPoint, err := w.rangesplitter.splitRange(w.startRange, w.endRange, 1) + if err != nil { + return fmt.Errorf("splitting range for worker ID:%v, err: %w", w.goroutineID, err) + } + // If split point is empty, skip splitting the work. + if len(splitPoint) < 1 { + continue + } + w.lister.ranges <- &listRange{startRange: splitPoint[0], endRange: w.endRange} + + // Update current worker range. + w.endRange = splitPoint[0] + } + } + // If the worker is active, update range channel to store the remaining work. + if w.status == active { + w.lister.ranges <- &listRange{startRange: w.startRange, endRange: w.endRange} + // Worker is now idle. + w.status = idle + } return nil } -// newObjectListerOpts specifies options for instantiating the NewObjectLister. -type newObjectListerOpts struct { +// shutDownSignal returns true if all the workers are idle and the +// or number of objects listed is equal to page size. +func (w *worker) shutDownSignal() bool { + // If all the workers are idle and range channel is empty, no more objects to list. + noMoreObjects := len(w.idleChannel) == w.lister.parallelism && len(w.lister.ranges) == 0 + + // If number of objects listed is equal to the given batchSize, then shutdown. + // If batch size is not given i.e. 0, then list until all objects have been listed. + alreadyListedBatchSizeObjects := len(w.idleChannel) == w.lister.parallelism && len(w.lister.ranges) == 0 + + return noMoreObjects || alreadyListedBatchSizeObjects +} + +// updateWorker updates the worker's start range, end range and status. +func (w *worker) updateWorker(startRange, endRange string, status workerStatus) { + w.startRange = startRange + w.endRange = endRange + w.status = status + w.generation = int64(0) +} + +func (w *worker) objectLister(ctx context.Context) (bool, error) { + // Active worker to list next page of objects within the range. + nextPageResult, err := nextPage(ctx, nextPageOpts{ + startRange: w.startRange, + endRange: w.endRange, + bucketHandle: w.lister.bucket, + query: w.lister.query, + skipDirectoryObjects: w.lister.skipDirectoryObjects, + generation: w.generation, + }) + if err != nil { + return false, fmt.Errorf("listing next page for worker ID %v, err: %w", w.goroutineID, err) + } + + // Append objects listed by objectLister to result. + w.result.mu.Lock() + w.result.objects = append(w.result.objects, nextPageResult.items...) + w.result.mu.Unlock() + + // Listing completed for default page size for the given range. + // Update current worker start range to new range and generation + // of the last objects listed if versions is true. + w.startRange = nextPageResult.nextStartRange + w.generation = nextPageResult.generation + return nextPageResult.doneListing, nil +} + +// nextPageOpts specifies options for next page of listing result . +type nextPageOpts struct { // startRange is the start offset of the objects to be listed. startRange string // endRange is the end offset of the objects to be listed. @@ -69,8 +243,8 @@ type newObjectListerOpts struct { generation int64 } -// nextPageResult holds the next page of object names and indicates whether the -// lister has completed listing (no more objects to retrieve). +// nextPageResult holds the next page of object names, start of the next page +// and indicates whether the lister has completed listing (no more objects to retrieve). type nextPageResult struct { // items is the list of objects listed. items []*storage.ObjectAttrs @@ -82,6 +256,14 @@ type nextPageResult struct { generation int64 } +// nextPage lists objects using the given lister options. +func nextPage(ctx context.Context, opts nextPageOpts) (*nextPageResult, error) { + + // TODO: Implement objectLister. + + return nil, nil +} + func addPrefix(name, prefix string) string { if name != "" { return prefix + name