Skip to content

Commit

Permalink
br: clean codes -- restore tables (#55519)
Browse files Browse the repository at this point in the history
ref #52877
  • Loading branch information
Leavrth authored Aug 26, 2024
1 parent d041cd0 commit 4eeeef8
Show file tree
Hide file tree
Showing 30 changed files with 687 additions and 2,099 deletions.
5 changes: 2 additions & 3 deletions br/pkg/restore/internal/log_split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ go_library(
visibility = ["//br/pkg/restore:__subpackages__"],
deps = [
"//br/pkg/logutil",
"//br/pkg/restore/internal/utils",
"//br/pkg/restore/internal/snap_split",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/rtree",
"//br/pkg/utils",
"//pkg/kv",
"//pkg/tablecodec",
Expand All @@ -40,7 +39,7 @@ go_test(
flaky = True,
shard_count = 4,
deps = [
"//br/pkg/restore/internal/utils",
"//br/pkg/restore/internal/snap_split",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/utiltest",
Expand Down
23 changes: 9 additions & 14 deletions br/pkg/restore/internal/log_split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/restore/internal/utils"
snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split"
"github.com/pingcap/tidb/br/pkg/restore/split"
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -139,11 +138,11 @@ func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) {
})
}

type splitFunc = func(context.Context, *utils.RegionSplitter, uint64, int64, *split.RegionInfo, []Valued) error
type splitFunc = func(context.Context, *snapsplit.RegionSplitter, uint64, int64, *split.RegionInfo, []Valued) error

func (helper *LogSplitHelper) splitRegionByPoints(
ctx context.Context,
regionSplitter *utils.RegionSplitter,
regionSplitter *snapsplit.RegionSplitter,
initialLength uint64,
initialNumber int64,
region *split.RegionInfo,
Expand Down Expand Up @@ -176,14 +175,10 @@ func (helper *LogSplitHelper) splitRegionByPoints(
newRegions, errSplit := regionSplitter.SplitWaitAndScatter(ctx, region, splitPoints)
if errSplit != nil {
log.Warn("failed to split the scaned region", zap.Error(errSplit))
_, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil)
ranges := make([]rtree.Range, 0, len(splitPoints))
for _, point := range splitPoints {
ranges = append(ranges, rtree.Range{StartKey: startKey, EndKey: point})
startKey = point
}

return regionSplitter.ExecuteSplit(ctx, ranges)
sort.Slice(splitPoints, func(i, j int) bool {
return bytes.Compare(splitPoints[i], splitPoints[j]) < 0
})
return regionSplitter.ExecuteSplit(ctx, splitPoints)
}
select {
case <-ctx.Done():
Expand All @@ -205,7 +200,7 @@ func SplitPoint(
) (err error) {
// common status
var (
regionSplitter *utils.RegionSplitter = utils.NewRegionSplitter(client)
regionSplitter *snapsplit.RegionSplitter = snapsplit.NewRegionSplitter(client)
)
// region traverse status
var (
Expand Down Expand Up @@ -357,7 +352,7 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error {
}
}

regionSplitter := utils.NewRegionSplitter(helper.client)
regionSplitter := snapsplit.NewRegionSplitter(helper.client)
// It is too expensive to stop recovery and wait for a small number of regions
// to complete scatter, so the maximum waiting time is reduced to 1 minute.
_ = regionSplitter.WaitForScatterRegionsTimeout(ctx, scatterRegions, time.Minute)
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/restore/internal/log_split/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
logsplit "github.com/pingcap/tidb/br/pkg/restore/internal/log_split"
"github.com/pingcap/tidb/br/pkg/restore/internal/utils"
snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split"
"github.com/pingcap/tidb/br/pkg/restore/split"
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/utiltest"
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestSplitPoint(t *testing.T) {
client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a"))

iter := logsplit.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules)
err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *utils.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error {
err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *snapsplit.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error {
require.Equal(t, u, uint64(0))
require.Equal(t, o, int64(0))
require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a"))
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestSplitPoint2(t *testing.T) {

firstSplit := true
iter := logsplit.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules)
err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *utils.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error {
err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *snapsplit.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error {
if firstSplit {
require.Equal(t, u, uint64(0))
require.Equal(t, o, int64(0))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "utils",
name = "snap_split",
srcs = ["split.go"],
importpath = "github.com/pingcap/tidb/br/pkg/restore/internal/utils",
importpath = "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split",
visibility = ["//br/pkg/restore:__subpackages__"],
deps = [
"//br/pkg/errors",
"//br/pkg/logutil",
"//br/pkg/restore/split",
"//br/pkg/rtree",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "utils_test",
name = "snap_split_test",
timeout = "short",
srcs = ["split_test.go"],
flaky = True,
shard_count = 5,
shard_count = 4,
deps = [
":utils",
":snap_split",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/rtree",
"//pkg/tablecodec",
"//pkg/util/codec",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_stretchr_testify//require",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package utils
package snapsplit

import (
"context"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/rtree"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -41,37 +37,15 @@ func (rs *RegionSplitter) SplitWaitAndScatter(ctx context.Context, region *split
// note: all ranges and rewrite rules must have raw key.
func (rs *RegionSplitter) ExecuteSplit(
ctx context.Context,
ranges []rtree.Range,
sortedSplitKeys [][]byte,
) error {
if len(ranges) == 0 {
log.Info("skip split regions, no range")
if len(sortedSplitKeys) == 0 {
log.Info("skip split regions, no split keys")
return nil
}

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("RegionSplitter.Split", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

// Sort the range for getting the min and max key of the ranges
// TODO: this sort may not needed if we sort tables after creatation outside.
sortedRanges, errSplit := SortRanges(ranges)
if errSplit != nil {
return errors.Trace(errSplit)
}
if len(sortedRanges) == 0 {
log.Info("skip split regions after sorted, no range")
return nil
}
sortedKeys := make([][]byte, 0, len(sortedRanges))
totalRangeSize := uint64(0)
for _, r := range sortedRanges {
sortedKeys = append(sortedKeys, r.EndKey)
totalRangeSize += r.Size
}
// the range size must be greater than 0 here
return rs.executeSplitByRanges(ctx, sortedKeys)
log.Info("execute split sorted keys", zap.Int("keys count", len(sortedSplitKeys)))
return rs.executeSplitByRanges(ctx, sortedSplitKeys)
}

func (rs *RegionSplitter) executeSplitByRanges(
Expand Down Expand Up @@ -151,20 +125,3 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi
leftRegions, _ := rs.client.WaitRegionsScattered(ctx2, regionInfos)
return leftRegions
}

// SortRanges checks if the range overlapped and sort them.
func SortRanges(ranges []rtree.Range) ([]rtree.Range, error) {
rangeTree := rtree.NewRangeTree()
for _, rg := range ranges {
if out := rangeTree.InsertRange(rg); out != nil {
log.Error("insert ranges overlapped",
logutil.Key("startKeyOut", out.StartKey),
logutil.Key("endKeyOut", out.EndKey),
logutil.Key("startKeyIn", rg.StartKey),
logutil.Key("endKeyIn", rg.EndKey))
return nil, errors.Annotatef(berrors.ErrInvalidRange, "ranges overlapped")
}
}
sortedRanges := rangeTree.GetSortedRanges()
return sortedRanges, nil
}
Loading

0 comments on commit 4eeeef8

Please sign in to comment.