From d49da26be7dc52fad37c392c2876f62b1a5625a2 Mon Sep 17 00:00:00 2001 From: Akansha Maloo Date: Tue, 24 Sep 2024 12:56:35 -0700 Subject: [PATCH] feat(storage/dataflux): add range_splitter #10748 (#10899) --- storage/dataflux/fast_list.go | 3 +- storage/dataflux/integration_test.go | 1 - storage/dataflux/range_splitter.go | 335 ++++++++++++++++++++++- storage/dataflux/range_splitter_test.go | 346 ++++++++++++++++++++++++ 4 files changed, 677 insertions(+), 8 deletions(-) create mode 100644 storage/dataflux/range_splitter_test.go diff --git a/storage/dataflux/fast_list.go b/storage/dataflux/fast_list.go index 45a59758ee0c..08aacd959ae7 100644 --- a/storage/dataflux/fast_list.go +++ b/storage/dataflux/fast_list.go @@ -49,8 +49,7 @@ type ListerInput struct { BatchSize int // Query is the query to filter objects for listing. Default value is nil. Optional. - //Use ProjectionNoACL For faster listing. ACL is expensive and this results in fewer objects - // to be returned from GCS in each API call. + // Use ProjectionNoACL for faster listing. Including ACLs increases latency while fetching objects. Query storage.Query // SkipDirectoryObjects is to indicate whether to list directory objects. Default value is false. Optional. diff --git a/storage/dataflux/integration_test.go b/storage/dataflux/integration_test.go index 193356dc6c83..099f2c33c2a2 100644 --- a/storage/dataflux/integration_test.go +++ b/storage/dataflux/integration_test.go @@ -49,7 +49,6 @@ var ( func TestMain(m *testing.M) { flag.Parse() - fmt.Println("creating bucket") if err := httpTestBucket.Create(testPrefix); err != nil { log.Fatalf("test bucket creation failed: %v", err) } diff --git a/storage/dataflux/range_splitter.go b/storage/dataflux/range_splitter.go index 4c3564ecc54b..4451e00aa48d 100644 --- a/storage/dataflux/range_splitter.go +++ b/storage/dataflux/range_splitter.go @@ -15,13 +15,16 @@ package dataflux import ( + "fmt" + "math/big" + "sort" "sync" ) // rangeSplitter specifies the a list and a map of sorted alphabets. type rangeSplitter struct { mu sync.Mutex - sortedAlphabet *[]rune + sortedAlphabet []rune alphabetMap map[rune]int } @@ -31,12 +34,334 @@ type listRange struct { endRange string } +// minimalIntRange specifies start and end range in base-10 form, along with the +// minimal string length for the split range strings. +type minimalIntRange struct { + startInteger *big.Int + endInteger *big.Int + minimalLength int +} + +// generateSplitsOpts specifies the parameters needed to generate the split +// range strings. +type generateSplitsOpts struct { + minimalIntRange *minimalIntRange + numSplits int + startRange string + endRange string +} + // newRangeSplitter creates a new RangeSplitter with the given alphabets. -func newRangeSplitter(alphabet string) *rangeSplitter { - return &rangeSplitter{} +// RangeSplitter determines split points within a given range based on the given +// alphabets. +func newRangeSplitter(alphabet string) (*rangeSplitter, error) { + + // Validate that we do not have empty alphabet passed in. + if len(alphabet) == 0 { + return nil, fmt.Errorf("no alphabet specified for the range splitter") + } + // Sort the alphabet lexicographically and store a mapping of each alphabet + // to its index. We need a mapping for efficient index lookup in later operations. + sortedAlphabet := []rune(alphabet) + sortAlphabet(sortedAlphabet) + alphabetMap := constructAlphabetMap(sortedAlphabet) + + return &rangeSplitter{ + alphabetMap: alphabetMap, + sortedAlphabet: sortedAlphabet, + }, nil } -// splitRange creates a given number of splits based on a provided start and end range. +// splitRange divides the provided start and end range into approximately equal +// subranges, returning the split points. An empty slice is returned if suitable +// splits cannot be determined. Please note that this method provides a rough +// estimate of split points, without ensuring precise even partitioning of the range. +// Additionally, the number of found splits might be fewer than requested if the +// algorithm struggles to find sufficient split points. If the start range is empty +// the algorithm assumes it to be sequence of smallest possible character and empty +// end range as sequence of highest possible characters. +// +// For example, sorted alphabet = {"a","b","c","d"} +// Input: startRange= "d", endRange= "", numSplits=2 +// +// This will be converted from base-N to base-10 integers. +// While calculating base-10 integer, "a" will be appended to startRange +// and "d" will be appended to endRange until the difference between integers is +// more than number of splits. +// startInteger for "da" = 12, endInteger for "dd" = 15 +// +// The splits points will be 13 and 14 in base-10. This will be converted back to +// base-N value and returned as split points: +// {"db","dc"} + func (rs *rangeSplitter) splitRange(startRange, endRange string, numSplits int) ([]string, error) { - return nil, nil + // Number of splits has to be at least one, otherwise it is not splittable. + if numSplits < 1 { + return nil, fmt.Errorf("number of splits should be at least 1, got %d", numSplits) + } + + // End range (if specified) has to be lexicographically greater than the start range + // for the range to be valid. + if len(endRange) != 0 && startRange >= endRange { + return nil, fmt.Errorf("start range %q cannot be lexicographically greater than end range %q", startRange, endRange) + } + + rs.addCharsToAlphabet([]rune(startRange)) + rs.addCharsToAlphabet([]rune(endRange)) + + // Validate start range characters and convert into character array form. + startRangeCharArray, err := rs.convertRangeStringToArray(startRange) + if err != nil { + return nil, fmt.Errorf("unable to convert start range %q to array: %v", startRange, err) + } + + // Validate end range characters and convert into character array form. + endRangeCharArray, err := rs.convertRangeStringToArray(endRange) + if err != nil { + return nil, fmt.Errorf("unable to convert end range %q to array: %v", endRange, err) + } + + // Construct the final split ranges to be returned. + var splitPoints []string + + // If the start and end string ranges are equal with padding, no splitting is + // necessary. In such cases, an empty array of split ranges is returned. + if rs.isRangeEqualWithPadding(startRangeCharArray, endRangeCharArray) { + return splitPoints, nil + } + // Convert the range strings from base-N to base-10 and employ a greedy approach + // to determine the smallest splittable integer range difference. + minimalIntRange, err := rs.convertStringRangeToMinimalIntRange( + startRangeCharArray, endRangeCharArray, numSplits) + if err != nil { + return nil, fmt.Errorf("range splitting with start range %q and end range %q: %v", + startRange, endRange, err) + } + + // Generate the split points and return them. + splitPoints = rs.generateSplits(generateSplitsOpts{ + startRange: startRange, + endRange: endRange, + numSplits: numSplits, + minimalIntRange: minimalIntRange, + }) + + return splitPoints, nil +} + +// generateSplits generates the split points by translating the start and end +// range strings into base-10 integers, performing a split within the integer +// domain, and then converting the splits back into strings. In essence, this +// operation resembles a base-N to base-10 conversion, followed by a split in +// base 10, and finally another base-10 to base-N conversion. In this scenario, +// N represents the size of the alphabet, with the character's position in the +// alphabet indicating the digit's value. +func (rs *rangeSplitter) generateSplits(opts generateSplitsOpts) []string { + + startInteger := opts.minimalIntRange.startInteger + endInteger := opts.minimalIntRange.endInteger + minimalLength := opts.minimalIntRange.minimalLength + + rangeDifference := new(big.Int).Sub(endInteger, startInteger) + + var splitPoints []string + + // The number of intervals is one more than the number of split points. + rangeInterval := new(big.Int).SetInt64(int64(opts.numSplits + 1)) + + for i := 1; i <= opts.numSplits; i++ { + // Combine the range interval and index to determine the split point in base-10 form. + rangeDiffWithIdx := new(big.Int).Mul(rangeDifference, big.NewInt(int64(i))) + rangeInterval := new(big.Int).Div(rangeDiffWithIdx, rangeInterval) + splitPoint := new(big.Int).Add(rangeInterval, startInteger) + + // Convert the split point back from base-10 to base-N. + splitString := rs.convertIntToString(splitPoint, minimalLength) + + // Due to the approximate nature on how the minimal int range is derived, we need to perform + // another validation to check to ensure each split point falls in valid range. + isGreaterThanStart := len(splitString) > 0 && splitString > opts.startRange + isLessThanEnd := len(opts.endRange) == 0 || (len(splitString) > 0 && splitString < opts.endRange) + if isGreaterThanStart && isLessThanEnd { + splitPoints = append(splitPoints, splitString) + } + } + return splitPoints +} + +// sortAlphabet sorts the alphabets string lexicographically. +func sortAlphabet(unsortedAlphabet []rune) { + sort.Slice(unsortedAlphabet, func(i, j int) bool { + return unsortedAlphabet[i] < unsortedAlphabet[j] + }) +} + +// constructAlphabetMap constructs a mapping from each character in the +// alphabets to its index in the alphabet array. +func constructAlphabetMap(alphabet []rune) map[rune]int { + alphabetMap := make(map[rune]int) + for i, char := range alphabet { + alphabetMap[char] = i + } + return alphabetMap +} + +// addCharsToAlphabet adds a character to the known alphabet. +func (rs *rangeSplitter) addCharsToAlphabet(characters []rune) { + rs.mu.Lock() // Acquire the lock + defer rs.mu.Unlock() // Release the lock when the function exits + allAlphabet := rs.sortedAlphabet + newChars := false + for _, char := range characters { + if _, exists := rs.alphabetMap[char]; !exists { + allAlphabet = append(allAlphabet, char) + newChars = true + } + } + if newChars { + sortAlphabet(allAlphabet) + rs.sortedAlphabet = allAlphabet + rs.alphabetMap = constructAlphabetMap(rs.sortedAlphabet) + } +} + +// isRangeEqualWithPadding checks if two range strings are identical. Equality +// encompasses any padding using the smallest alphabet character from the set. +func (rs *rangeSplitter) isRangeEqualWithPadding(startRange, endRange []rune) bool { + + sortedAlphabet := rs.sortedAlphabet + + // When the end range is unspecified, it's interpreted as a sequence of the + // highest possible characters. Consequently, they are not deemed equal. + // If start range has highest possible characters, then smaller characters + // are appended to start range to find split points. + if len(endRange) == 0 { + return false + } + + // Get the longer length of the two range strings. + maxLength := max(len(startRange), len(endRange)) + + smallestChar := sortedAlphabet[0] + + // Loop through the string range. + for i := 0; i < maxLength; i++ { + + // In cases where a character is absent at a specific position (due to a length + // difference), the position is padded with the smallest character in the alphabet. + charStart := charAtOrDefault(startRange, i, smallestChar) + charEnd := charAtOrDefault(endRange, i, smallestChar) + + // As soon as we find a difference, we conclude the two strings are different. + if charStart != charEnd { + return false + } + } + // Otherwise, we conclude the two strings are equal. + return true +} + +// charAtOrDefault returns the character at the specified position, or the default character if +// the position is out of bounds. +func charAtOrDefault(charArray []rune, position int, defaultChar rune) rune { + if position < 0 || position >= len(charArray) { + return defaultChar + } + return (charArray)[position] +} + +// convertStringRangeToMinimalIntRange gradually extends the start and end string +// range in base-10 representation, until the difference reaches a threshold +// suitable for splitting. +func (rs *rangeSplitter) convertStringRangeToMinimalIntRange( + startRange, endRange []rune, numSplits int) (*minimalIntRange, error) { + + startInteger := big.NewInt(0) + endInteger := big.NewInt(0) + + alphabetLength := len(rs.sortedAlphabet) + startChar := (rs.sortedAlphabet)[0] + endChar := (rs.sortedAlphabet)[alphabetLength-1] + + endDefaultChar := startChar + if len(endRange) == 0 { + endDefaultChar = endChar + } + + for i := 0; ; i++ { + + // Convert each character of the start range string into a big integer + // based on the alphabet system. + startPosition, err := rs.charPosition(charAtOrDefault(startRange, i, startChar)) + if err != nil { + return nil, err + } + startInteger.Mul(startInteger, big.NewInt(int64(alphabetLength))) + startInteger.Add(startInteger, big.NewInt(int64(startPosition))) + + // Convert each character of the end range string into a big integer + // based on the alphabet system. + endPosition, err := rs.charPosition(charAtOrDefault(endRange, i, endDefaultChar)) + if err != nil { + return nil, err + } + endInteger.Mul(endInteger, big.NewInt(int64(alphabetLength))) + endInteger.Add(endInteger, big.NewInt(int64(endPosition))) + + // Calculate the difference between the start and end range in big integer representation. + difference := new(big.Int).Sub(endInteger, startInteger) + + // If the difference is bigger than the number of split points, we are done. + // In particular, the minimal length is one greater than the index (due to zero indexing). + if difference.Cmp(big.NewInt(int64(numSplits))) > 0 { + return &minimalIntRange{ + startInteger: startInteger, + endInteger: endInteger, + minimalLength: i + 1, + }, nil + } + } +} + +// charPosition returns the index of the character in the alphabet set. +func (rs *rangeSplitter) charPosition(ch rune) (int, error) { + if idx, ok := rs.alphabetMap[ch]; ok { + return idx, nil + } + return -1, fmt.Errorf("character %c is not found in the alphabet map %v", ch, rs.alphabetMap) +} + +// convertRangeStringToArray transforms the range string into a rune slice while +// verifying the presence of each character in the alphabets. +func (rs *rangeSplitter) convertRangeStringToArray(rangeString string) ([]rune, error) { + for _, char := range rangeString { + if _, exists := rs.alphabetMap[char]; !exists { + return nil, fmt.Errorf("character %c in range string %q is not found in the alphabet array", char, rangeString) + } + } + characterArray := []rune(rangeString) + return characterArray, nil +} + +// convertIntToString converts the split point from base-10 to base-N. +func (rs *rangeSplitter) convertIntToString(splitPoint *big.Int, stringLength int) string { + + remainder := new(big.Int) + + var splitChar []rune + alphabetSize := big.NewInt(int64(len(rs.sortedAlphabet))) + + // Iterate through the split point and convert alphabet by alphabet. + for i := 0; i < stringLength; i++ { + remainder.Mod(splitPoint, alphabetSize) + splitPoint.Div(splitPoint, alphabetSize) + splitChar = append(splitChar, (rs.sortedAlphabet)[(int)(remainder.Int64())]) + } + + // Reverse the converted alphabet order because we originally processed from right to left. + for i, j := 0, len(splitChar)-1; i < j; i, j = i+1, j-1 { + splitChar[i], splitChar[j] = splitChar[j], splitChar[i] + } + + return string(splitChar) } diff --git a/storage/dataflux/range_splitter_test.go b/storage/dataflux/range_splitter_test.go new file mode 100644 index 000000000000..934ef0748074 --- /dev/null +++ b/storage/dataflux/range_splitter_test.go @@ -0,0 +1,346 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataflux + +import ( + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestNewRangeSplitter(t *testing.T) { + testCases := []struct { + desc string + alphabet string + wantErr bool + }{ + { + desc: "valid alphabet", + alphabet: "0123456789", + wantErr: false, + }, + { + desc: "empty alphabet", + alphabet: "", + wantErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + _, gotErr := newRangeSplitter(tc.alphabet) + if (gotErr != nil) != tc.wantErr { + t.Errorf("NewRangeSplitter(%q) got error = %v, want error = %v", tc.alphabet, gotErr, tc.wantErr) + } + }) + } +} + +func TestSplitRange(t *testing.T) { + + testAlphabet := "0123456789" + + // We use the numbers as the base alphabet for testing purposes. + rangeSplitter, err := newRangeSplitter(testAlphabet) + if err != nil { + t.Fatalf("NewRangeSplitter(%q) got error = %v, want error = nil", testAlphabet, err) + } + + testCases := []struct { + desc string + startRange string + endRange string + numSplits int + wantErr bool + wantSplitPoints []string + }{ + { + desc: "empty start", + startRange: "", + endRange: "9", + numSplits: 2, + wantErr: false, + wantSplitPoints: []string{"3", "6"}, + }, + { + desc: "empty end", + startRange: "0", + endRange: "", + numSplits: 2, + wantErr: false, + wantSplitPoints: []string{"3", "6"}, + }, + // Tests for checking invalid arguments are properly handled. + { + desc: "splits less than one", + startRange: "123", + endRange: "456", + numSplits: 0, + wantErr: true, + wantSplitPoints: nil, + }, + { + desc: "end range lexicographically smaller than start range", + startRange: "456", + endRange: "123", + numSplits: 2, + wantErr: true, + wantSplitPoints: nil, + }, + // Test for unsplittable cases. + { + desc: "unsplittable with empty start range", + startRange: "", + endRange: "0", + numSplits: 100, + wantErr: false, + wantSplitPoints: nil, + }, + { + desc: "unsplittable with non empty ranges", + startRange: "9", + endRange: "90", + numSplits: 100, + wantErr: false, + wantSplitPoints: nil, + }, + // Test for splittable cases. + { + desc: "Split Entire Bucket Namespace", + startRange: "", + endRange: "", + numSplits: 24, + wantErr: false, + wantSplitPoints: []string{"03", "07", "11", "15", "19", "23", "27", "31", "35", "39", "43", "47", "51", "55", "59", "63", "67", "71", "75", "79", "83", "87", "91", "95"}, + }, + { + desc: "split with only start range", + startRange: "5555", + endRange: "", + numSplits: 4, + wantErr: false, + wantSplitPoints: []string{"63", "72", "81", "90"}, + }, + { + desc: "split large distance with few split points", + startRange: "0", + endRange: "9", + numSplits: 3, + wantErr: false, + wantSplitPoints: []string{"2", "4", "6"}, + }, + { + desc: "split with prefix, distance at index 5 > 1", + startRange: "0123455111", + endRange: "012347", + numSplits: 1, + wantErr: false, + wantSplitPoints: []string{"012346"}, + }, + { + desc: "split with prefix, distance at index 6 > 1", + startRange: "00005699", + endRange: "00006", + numSplits: 3, + wantErr: false, + wantSplitPoints: []string{"000057", "000058", "000059"}, + }, + { + desc: "split into half with small range", + startRange: "199999", + endRange: "2", + numSplits: 1, + wantErr: false, + wantSplitPoints: []string{"1999995"}, + }, + { + desc: "split into multuple pieces with small range", + startRange: "011", + endRange: "022", + numSplits: 5, + wantErr: false, + wantSplitPoints: []string{"012", "014", "016", "018", "020"}, + }, + { + desc: "split towards end range", + startRange: "8999", + endRange: "", + numSplits: 4, + wantErr: false, + wantSplitPoints: []string{"91", "93", "95", "97"}, + }, + { + desc: "split with sequence of adjacent characters", + startRange: "12345", + endRange: "23456", + numSplits: 4, + wantErr: false, + wantSplitPoints: []string{"14", "16", "18", "20"}, + }, + { + desc: "split into adjenct split points", + startRange: "0999998", + endRange: "1000002", + numSplits: 3, + wantErr: false, + wantSplitPoints: []string{"0999999", "1000000", "1000001"}, + }, + { + desc: "end range contains new character", + startRange: "123", + endRange: "xyz", + numSplits: 2, + wantErr: false, + wantSplitPoints: []string{"4", "7"}, + }, + { + desc: "start range contains new character", + startRange: "abc", + endRange: "xyz", + numSplits: 2, + wantErr: false, + wantSplitPoints: []string{"b", "c"}, + }, + { + desc: "start range is sequence of highest characters", + startRange: "zzz", + endRange: "", + numSplits: 2, + wantErr: false, + wantSplitPoints: []string{"zzz5", "zzza"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + gotSplitPoints, gotErr := rangeSplitter.splitRange(tc.startRange, tc.endRange, tc.numSplits) + if (gotErr != nil) != tc.wantErr { + t.Errorf("SplitRange(%q, %q, %d) got error = %v, want error = %v", + tc.startRange, tc.endRange, tc.numSplits, gotErr, tc.wantErr) + } + + if diff := cmp.Diff(tc.wantSplitPoints, gotSplitPoints); diff != "" { + t.Errorf("SplitRange(%q, %q, %d) returned unexpected diff (-want +got):\n%s", + tc.startRange, tc.endRange, tc.numSplits, diff) + } + }) + } +} + +func TestSortAlphabet(t *testing.T) { + testCases := []struct { + desc string + unsortedAlphabet []rune + wantAphabet []rune + }{ + { + desc: "unsorted array", + unsortedAlphabet: []rune{'8', '9', '7'}, + wantAphabet: []rune{'7', '8', '9'}, + }, + { + desc: "one alphabet", + unsortedAlphabet: []rune{'7'}, + wantAphabet: []rune{'7'}, + }, + { + desc: "empty array", + unsortedAlphabet: []rune{}, + wantAphabet: []rune{}, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + sortAlphabet(tc.unsortedAlphabet) + if diff := cmp.Diff(tc.wantAphabet, tc.unsortedAlphabet); diff != "" { + t.Errorf("sortAlphabet(%q) returned unexpected diff (-want +got):\n%s", tc.unsortedAlphabet, diff) + } + }) + } +} + +func TestConstructAlphabetMap(t *testing.T) { + testCases := []struct { + desc string + sortedAlphabet []rune + wantMap map[rune]int + }{ + { + desc: "sorted array", + sortedAlphabet: []rune{'7', '8', '9'}, + wantMap: map[rune]int{'7': 0, '8': 1, '9': 2}, + }, + { + desc: "unsorted array", + sortedAlphabet: []rune{'7', '9', '8'}, + wantMap: map[rune]int{'7': 0, '9': 1, '8': 2}, + }, + { + desc: "one alphabet", + sortedAlphabet: []rune{'7'}, + wantMap: map[rune]int{'7': 0}, + }, + { + desc: "empty array", + sortedAlphabet: []rune{}, + wantMap: map[rune]int{}, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + got := constructAlphabetMap(tc.sortedAlphabet) + if diff := cmp.Diff(tc.wantMap, got); diff != "" { + t.Errorf("constructAlphabetMap(%q) returned unexpected diff (-want +got):\n%s", tc.sortedAlphabet, diff) + } + }) + } +} + +func TestCharPosition(t *testing.T) { + testCases := []struct { + desc string + character rune + wantErr bool + wantPos int + }{ + { + desc: "no error", + character: '7', + wantErr: false, + wantPos: 0, + }, + { + desc: "character not present", + character: '6', + wantErr: true, + wantPos: -1, + }, + } + rs, err := newRangeSplitter("78898") + if err != nil { + t.Fatalf("Failed to initialize range splitter, err: %v", err) + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + got, err := rs.charPosition(tc.character) + if (err != nil) != tc.wantErr { + t.Errorf("charPosition(%q) got error = %v, want error = %v", tc.character, err, tc.wantErr) + } + if got != tc.wantPos { + t.Errorf("charPosition(%q) got = %v, want = %v", tc.character, got, tc.wantPos) + } + }) + } +}