Skip to content

Commit

Permalink
br: merge range in lightweight when snapshot restore (#50648)
Browse files Browse the repository at this point in the history
close #50613
  • Loading branch information
Leavrth authored Feb 2, 2024
1 parent 372b807 commit 7cdab19
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 65 deletions.
1 change: 0 additions & 1 deletion br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ go_test(
"import_retry_test.go",
"log_client_test.go",
"main_test.go",
"merge_fuzz_test.go",
"merge_test.go",
"range_test.go",
"rawkv_client_test.go",
Expand Down
59 changes: 1 addition & 58 deletions br/pkg/restore/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@ import (

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
)

const (
Expand All @@ -33,47 +29,6 @@ type MergeRangesStat struct {
MergedRegionBytesAvg int
}

// NeedsMerge reports whether the `right` range should be merged into the
// `left` range.
//
// The checks run in this order:
//  1. If `right` reports zero bytes, the answer is true regardless of keys.
//  2. If the combined bytes exceed `splitSizeBytes`, or the combined keys
//     exceed `splitKeyCount`, the answer is false.
//  3. If the head of either start key cannot be decoded, a warning is logged
//     and the answer is false.
//  4. Otherwise the ranges are merged only when both start keys are record
//     keys of the same table, or both are index keys of the same index of the
//     same table.
func NeedsMerge(left, right *rtree.Range, splitSizeBytes, splitKeyCount uint64) bool {
leftBytes, leftKeys := left.BytesAndKeys()
rightBytes, rightKeys := right.BytesAndKeys()
// A right range that reports zero bytes is always merged; this check comes
// before the size limits and before any key decoding.
if rightBytes == 0 {
return true
}
// Refuse the merge when the result would exceed the size limit...
if leftBytes+rightBytes > splitSizeBytes {
return false
}
// ...or the key-count limit.
if leftKeys+rightKeys > splitKeyCount {
return false
}
tableID1, indexID1, isRecord1, err1 := tablecodec.DecodeKeyHead(kv.Key(left.StartKey))
tableID2, indexID2, isRecord2, err2 := tablecodec.DecodeKeyHead(kv.Key(right.StartKey))

// Failed to decode the head of a start key (it is unclear whether this can
// happen in practice): log both keys and errors, and do not merge.
if err1 != nil || err2 != nil {
log.Warn("Failed to parse the key head for merging files, skipping",
logutil.Key("left-start-key", left.StartKey),
logutil.Key("right-start-key", right.StartKey),
logutil.AShortError("left-err", err1),
logutil.AShortError("right-err", err2),
)
return false
}
// Both start keys are record keys.
if isRecord1 && isRecord2 {
// Do not merge ranges in different tables.
return tableID1 == tableID2
}
// Both start keys are index keys.
if !isRecord1 && !isRecord2 {
// Do not merge ranges in different indexes even if they are in the same
// table, as a rewrite rule only supports rewriting one pattern.
// Merge left and right only if they are in the same index.
return tableID1 == tableID2 && indexID1 == indexID2
}
// One record key and one index key: never merge.
return false
}

// MergeFileRanges returns ranges of the files are merged based on
// splitSizeBytes and splitKeyCount.
//
Expand Down Expand Up @@ -134,19 +89,7 @@ func MergeFileRanges(
}
}

sortedRanges := rangeTree.GetSortedRanges()
for i := 1; i < len(sortedRanges); {
if !NeedsMerge(&sortedRanges[i-1], &sortedRanges[i], splitSizeBytes, splitKeyCount) {
i++
continue
}
sortedRanges[i-1].EndKey = sortedRanges[i].EndKey
sortedRanges[i-1].Size += sortedRanges[i].Size
sortedRanges[i-1].Files = append(sortedRanges[i-1].Files, sortedRanges[i].Files...)
// TODO: this is slow when there are lots of ranges need to merge.
sortedRanges = append(sortedRanges[:i], sortedRanges[i+1:]...)
}

sortedRanges := rangeTree.MergedRanges(splitSizeBytes, splitKeyCount)
regionBytesAvg := totalBytes / uint64(totalRegions)
regionKeysAvg := totalKvs / uint64(totalRegions)
mergedRegionBytesAvg := totalBytes / uint64(len(sortedRanges))
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/rtree/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ go_library(
deps = [
"//br/pkg/logutil",
"//br/pkg/redact",
"//pkg/kv",
"//pkg/tablecodec",
"@com_github_google_btree//:btree",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
Expand All @@ -25,13 +27,16 @@ go_test(
srcs = [
"logging_test.go",
"main_test.go",
"merge_fuzz_test.go",
"rtree_test.go",
],
flaky = True,
race = "on",
shard_count = 3,
shard_count = 4,
deps = [
":rtree",
"//pkg/kv",
"//pkg/tablecodec",
"//pkg/testkit/testsetup",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
//go:build go1.18
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.

package restore_test
package rtree_test

import (
"testing"

backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/pkg/tablecodec"
)
Expand All @@ -19,6 +17,6 @@ func FuzzMerge(f *testing.F) {
f.Fuzz(func(t *testing.T, a, b []byte) {
left := rtree.Range{StartKey: a, Files: []*backup.File{{TotalKvs: 1, TotalBytes: 1}}}
right := rtree.Range{StartKey: b, Files: []*backup.File{{TotalKvs: 1, TotalBytes: 1}}}
restore.NeedsMerge(&left, &right, 42, 42)
rtree.NeedsMerge(&left, &right, 42, 42)
})
}
65 changes: 65 additions & 0 deletions br/pkg/rtree/rtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
)

// Range represents a backup response.
Expand Down Expand Up @@ -74,6 +76,47 @@ func (rg *Range) Less(than btree.Item) bool {
return bytes.Compare(rg.StartKey, ta.StartKey) < 0
}

// NeedsMerge reports whether the `right` range should be folded into the
// `left` range.
//
// A `right` range that reports zero bytes is always merged. Otherwise the
// merge is refused when the combined bytes would exceed `splitSizeBytes`, when
// the combined keys would exceed `splitKeyCount`, when the head of either
// start key cannot be decoded, or when the two start keys are not of the same
// kind (record vs. index) within the same table. Index keys must additionally
// share the same index ID.
func NeedsMerge(left, right *Range, splitSizeBytes, splitKeyCount uint64) bool {
	lBytes, lKeys := left.BytesAndKeys()
	rBytes, rKeys := right.BytesAndKeys()

	switch {
	case rBytes == 0:
		// Nothing on the right side: merging is always accepted.
		return true
	case lBytes+rBytes > splitSizeBytes:
		return false
	case lKeys+rKeys > splitKeyCount:
		return false
	}

	lTable, lIndex, lIsRecord, lErr := tablecodec.DecodeKeyHead(kv.Key(left.StartKey))
	rTable, rIndex, rIsRecord, rErr := tablecodec.DecodeKeyHead(kv.Key(right.StartKey))

	// An undecodable key head means the key kinds cannot be compared, so the
	// pair is reported and left unmerged.
	if lErr != nil || rErr != nil {
		log.Warn("Failed to parse the key head for merging files, skipping",
			logutil.Key("left-start-key", left.StartKey),
			logutil.Key("right-start-key", right.StartKey),
			logutil.AShortError("left-err", lErr),
			logutil.AShortError("right-err", rErr),
		)
		return false
	}

	// A record range is never merged with an index range, and ranges of
	// different tables are never merged.
	if lIsRecord != rIsRecord || lTable != rTable {
		return false
	}
	// Same table, same kind. Record ranges merge unconditionally; index
	// ranges merge only within one index, because a rewrite rule only
	// supports rewriting one pattern.
	return lIsRecord || lIndex == rIndex
}

var _ btree.Item = &Range{}

// RangeTree is sorted tree for Ranges.
Expand Down Expand Up @@ -165,6 +208,28 @@ func (rangeTree *RangeTree) InsertRange(rg Range) *Range {
return out.(*Range)
}

// MergedRanges walks the tree in ascending order and returns the ranges with
// each range folded into its predecessor whenever NeedsMerge accepts the pair
// under the given `splitSizeBytes` and `splitKeyCount` limits.
//
// A merged range keeps the StartKey of the first range of the run, takes the
// EndKey of the last one, sums the Size fields and concatenates the Files.
// The returned ranges are copies: their Files slices never share spare
// capacity with the ranges stored in the tree, so merging cannot write into
// tree-owned backing arrays and repeated calls cannot clobber each other.
func (rangeTree *RangeTree) MergedRanges(splitSizeBytes, splitKeyCount uint64) []Range {
	sortedRanges := make([]Range, 0, rangeTree.Len())
	rangeTree.Ascend(func(item btree.Item) bool {
		rg := item.(*Range)

		// The merge target is always the most recently emitted range.
		if n := len(sortedRanges); n > 0 {
			target := &sortedRanges[n-1]
			if NeedsMerge(target, rg, splitSizeBytes, splitKeyCount) {
				// Fold rg into the merge target.
				target.EndKey = rg.EndKey
				target.Size += rg.Size
				target.Files = append(target.Files, rg.Files...)
				return true
			}
		}

		// Either nothing has been emitted yet or the previous range must not
		// absorb rg: start a new output range from a copy of rg.
		merged := *rg
		// Cap the copied slice at its length so that a later append has to
		// allocate a fresh backing array instead of writing into the spare
		// capacity of the slice still owned by the tree's range.
		merged.Files = merged.Files[:len(merged.Files):len(merged.Files)]
		sortedRanges = append(sortedRanges, merged)
		return true
	})
	return sortedRanges
}

// GetSortedRanges collects and returns sorted ranges.
func (rangeTree *RangeTree) GetSortedRanges() []Range {
sortedRanges := make([]Range, 0, rangeTree.Len())
Expand Down
40 changes: 40 additions & 0 deletions br/pkg/rtree/rtree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"fmt"
"testing"

backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -180,3 +183,40 @@ func BenchmarkRangeTreeUpdate(b *testing.B) {
rangeTree.Update(item)
}
}

// encodeTableRecord builds the record key for `rowID` under the given table
// record `prefix`, using an integer handle.
func encodeTableRecord(prefix kv.Key, rowID uint64) []byte {
	handle := kv.IntHandle(rowID)
	return tablecodec.EncodeRecordKey(prefix, handle)
}

// TestRangeTreeMerge fills a tree with 10000 adjacent single-file ranges of
// one table and checks that MergedRanges, limited to 10 bytes and 10 keys,
// groups them ten at a time while preserving order, keys, sizes and files.
func TestRangeTreeMerge(t *testing.T) {
	const (
		totalRanges = 10000
		groupSize   = 10
	)

	tree := rtree.NewRangeTree()
	recordPrefix := tablecodec.GenTableRecordPrefix(1)
	for rowID := uint64(0); rowID < totalRanges; rowID++ {
		file := &backuppb.File{
			Name:       fmt.Sprintf("%20d", rowID),
			TotalKvs:   1,
			TotalBytes: 1,
		}
		tree.Update(rtree.Range{
			StartKey: encodeTableRecord(recordPrefix, rowID),
			EndKey:   encodeTableRecord(recordPrefix, rowID+1),
			Files:    []*backuppb.File{file},
			Size:     rowID,
		})
	}

	merged := tree.MergedRanges(groupSize, groupSize)
	require.Equal(t, totalRanges/groupSize, len(merged))
	for group, rg := range merged {
		firstRow := group * groupSize
		require.Equal(t, encodeTableRecord(recordPrefix, uint64(group)*groupSize), rg.StartKey)
		require.Equal(t, encodeTableRecord(recordPrefix, uint64(group+1)*groupSize), rg.EndKey)
		// Sizes firstRow .. firstRow+9 add up to 10*firstRow + 45.
		require.Equal(t, uint64(firstRow*groupSize+45), rg.Size)
		require.Equal(t, groupSize, len(rg.Files))
		for offset, file := range rg.Files {
			require.Equal(t, fmt.Sprintf("%20d", firstRow+offset), file.Name)
			require.Equal(t, uint64(1), file.TotalKvs)
			require.Equal(t, uint64(1), file.TotalBytes)
		}
	}
}

0 comments on commit 7cdab19

Please sign in to comment.