Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: restore merge split ranges with small data #55104

Closed
wants to merge 13 commits into from
16 changes: 15 additions & 1 deletion br/pkg/config/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/docker/go-units"
)

type ConfigTerm[T uint | uint64] struct {
type ConfigTerm[T any] struct {
Value T
Modified bool
}
Expand All @@ -16,6 +16,7 @@ type KVConfig struct {
ImportGoroutines ConfigTerm[uint]
MergeRegionSize ConfigTerm[uint64]
MergeRegionKeyCount ConfigTerm[uint64]
SplitRegionOnTable ConfigTerm[bool]
}

func ParseImportThreadsFromConfig(resp []byte) (uint, error) {
Expand All @@ -35,6 +36,19 @@ func ParseImportThreadsFromConfig(resp []byte) (uint, error) {
return c.Import.Threads, nil
}

func ParseSplitRegionOnTable(resp []byte) (bool, error) {
type coprocessor struct {
SplitRegionOnTable bool `json:"split-region-on-table"`
}

type config struct {
Cop coprocessor `json:"coprocessor"`
}
var c config
e := json.Unmarshal(resp, &c)
return c.Cop.SplitRegionOnTable, e
}

func ParseMergeRegionSizeFromConfig(resp []byte) (uint64, uint64, error) {
type coprocessor struct {
RegionSplitSize string `json:"region-split-size"`
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig,
mergeRegionSize := cfg.MergeRegionSize
mergeRegionKeyCount := cfg.MergeRegionKeyCount
importGoroutines := cfg.ImportGoroutines
splitRegionOnTable := cfg.SplitRegionOnTable

if mergeRegionSize.Modified && mergeRegionKeyCount.Modified && importGoroutines.Modified {
log.Info("no need to retrieve the config from tikv if user has set the config")
Expand All @@ -324,6 +325,14 @@ func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig,
if err != nil {
return err
}
if !splitRegionOnTable.Modified {
splitTable, e := kvconfig.ParseSplitRegionOnTable(respBytes)
if e != nil {
log.Warn("Failed to parse split region on table from config", logutil.ShortError(e))
return e
}
splitRegionOnTable.Value = splitRegionOnTable.Value || splitTable
}
if !mergeRegionSize.Modified || !mergeRegionKeyCount.Modified {
size, keys, e := kvconfig.ParseMergeRegionSizeFromConfig(respBytes)
if e != nil {
Expand All @@ -347,6 +356,7 @@ func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig,
}
}
// replace the value
cfg.SplitRegionOnTable = splitRegionOnTable
cfg.MergeRegionSize = mergeRegionSize
cfg.MergeRegionKeyCount = mergeRegionKeyCount
cfg.ImportGoroutines = importGoroutines
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/restore/internal/log_split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ go_library(
visibility = ["//br/pkg/restore:__subpackages__"],
deps = [
"//br/pkg/logutil",
"//br/pkg/restore/internal/utils",
"//br/pkg/restore/internal/snap_split",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/rtree",
"//br/pkg/utils",
"//pkg/kv",
"//pkg/tablecodec",
Expand All @@ -40,7 +39,7 @@ go_test(
flaky = True,
shard_count = 4,
deps = [
"//br/pkg/restore/internal/utils",
"//br/pkg/restore/internal/snap_split",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/utiltest",
Expand Down
23 changes: 9 additions & 14 deletions br/pkg/restore/internal/log_split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/restore/internal/utils"
snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split"
"github.com/pingcap/tidb/br/pkg/restore/split"
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -139,11 +138,11 @@ func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo) {
})
}

type splitFunc = func(context.Context, *utils.RegionSplitter, uint64, int64, *split.RegionInfo, []Valued) error
type splitFunc = func(context.Context, *snapsplit.RegionSplitter, uint64, int64, *split.RegionInfo, []Valued) error

func (helper *LogSplitHelper) splitRegionByPoints(
ctx context.Context,
regionSplitter *utils.RegionSplitter,
regionSplitter *snapsplit.RegionSplitter,
initialLength uint64,
initialNumber int64,
region *split.RegionInfo,
Expand Down Expand Up @@ -176,14 +175,10 @@ func (helper *LogSplitHelper) splitRegionByPoints(
newRegions, errSplit := regionSplitter.SplitWaitAndScatter(ctx, region, splitPoints)
if errSplit != nil {
log.Warn("failed to split the scaned region", zap.Error(errSplit))
_, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil)
ranges := make([]rtree.Range, 0, len(splitPoints))
for _, point := range splitPoints {
ranges = append(ranges, rtree.Range{StartKey: startKey, EndKey: point})
startKey = point
}

return regionSplitter.ExecuteSplit(ctx, ranges)
sort.Slice(splitPoints, func(i, j int) bool {
return bytes.Compare(splitPoints[i], splitPoints[j]) < 0
})
return regionSplitter.ExecuteSplit(ctx, splitPoints)
}
select {
case <-ctx.Done():
Expand All @@ -205,7 +200,7 @@ func SplitPoint(
) (err error) {
// common status
var (
regionSplitter *utils.RegionSplitter = utils.NewRegionSplitter(client)
regionSplitter *snapsplit.RegionSplitter = snapsplit.NewRegionSplitter(client)
)
// region traverse status
var (
Expand Down Expand Up @@ -357,7 +352,7 @@ func (helper *LogSplitHelper) Split(ctx context.Context) error {
}
}

regionSplitter := utils.NewRegionSplitter(helper.client)
regionSplitter := snapsplit.NewRegionSplitter(helper.client)
// It is too expensive to stop recovery and wait for a small number of regions
// to complete scatter, so the maximum waiting time is reduced to 1 minute.
_ = regionSplitter.WaitForScatterRegionsTimeout(ctx, scatterRegions, time.Minute)
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/restore/internal/log_split/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
logsplit "github.com/pingcap/tidb/br/pkg/restore/internal/log_split"
"github.com/pingcap/tidb/br/pkg/restore/internal/utils"
snapsplit "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split"
"github.com/pingcap/tidb/br/pkg/restore/split"
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/utiltest"
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestSplitPoint(t *testing.T) {
client.AppendRegion(keyWithTablePrefix(tableID, "j"), keyWithTablePrefix(tableID+1, "a"))

iter := logsplit.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules)
err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *utils.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error {
err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *snapsplit.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error {
require.Equal(t, u, uint64(0))
require.Equal(t, o, int64(0))
require.Equal(t, ri.Region.StartKey, keyWithTablePrefix(tableID, "a"))
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestSplitPoint2(t *testing.T) {

firstSplit := true
iter := logsplit.NewSplitHelperIteratorForTest(splitHelper, tableID, rewriteRules)
err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *utils.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error {
err := logsplit.SplitPoint(ctx, iter, client, func(ctx context.Context, rs *snapsplit.RegionSplitter, u uint64, o int64, ri *split.RegionInfo, v []logsplit.Valued) error {
if firstSplit {
require.Equal(t, u, uint64(0))
require.Equal(t, o, int64(0))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "utils",
name = "snap_split",
srcs = ["split.go"],
importpath = "github.com/pingcap/tidb/br/pkg/restore/internal/utils",
importpath = "github.com/pingcap/tidb/br/pkg/restore/internal/snap_split",
visibility = ["//br/pkg/restore:__subpackages__"],
deps = [
"//br/pkg/errors",
"//br/pkg/logutil",
"//br/pkg/restore/split",
"//br/pkg/rtree",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "utils_test",
name = "snap_split_test",
timeout = "short",
srcs = ["split_test.go"],
flaky = True,
shard_count = 5,
shard_count = 4,
deps = [
":utils",
":snap_split",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/rtree",
"//pkg/tablecodec",
"//pkg/util/codec",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_stretchr_testify//require",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package utils
package snapsplit

import (
"context"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/rtree"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -41,37 +37,15 @@ func (rs *RegionSplitter) SplitWaitAndScatter(ctx context.Context, region *split
// note: all ranges and rewrite rules must have raw key.
func (rs *RegionSplitter) ExecuteSplit(
ctx context.Context,
ranges []rtree.Range,
sortedSplitKeys [][]byte,
) error {
if len(ranges) == 0 {
log.Info("skip split regions, no range")
if len(sortedSplitKeys) == 0 {
log.Info("skip split regions, no split keys")
return nil
}

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("RegionSplitter.Split", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

// Sort the range for getting the min and max key of the ranges
// TODO: this sort may not needed if we sort tables after creatation outside.
sortedRanges, errSplit := SortRanges(ranges)
if errSplit != nil {
return errors.Trace(errSplit)
}
if len(sortedRanges) == 0 {
log.Info("skip split regions after sorted, no range")
return nil
}
sortedKeys := make([][]byte, 0, len(sortedRanges))
totalRangeSize := uint64(0)
for _, r := range sortedRanges {
sortedKeys = append(sortedKeys, r.EndKey)
totalRangeSize += r.Size
}
// the range size must be greater than 0 here
return rs.executeSplitByRanges(ctx, sortedKeys)
log.Info("execute split sorted keys", zap.Int("keys count", len(sortedSplitKeys)))
return rs.executeSplitByRanges(ctx, sortedSplitKeys)
}

func (rs *RegionSplitter) executeSplitByRanges(
Expand Down Expand Up @@ -151,20 +125,3 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi
leftRegions, _ := rs.client.WaitRegionsScattered(ctx2, regionInfos)
return leftRegions
}

// SortRanges checks if the range overlapped and sort them.
func SortRanges(ranges []rtree.Range) ([]rtree.Range, error) {
rangeTree := rtree.NewRangeTree()
for _, rg := range ranges {
if out := rangeTree.InsertRange(rg); out != nil {
log.Error("insert ranges overlapped",
logutil.Key("startKeyOut", out.StartKey),
logutil.Key("endKeyOut", out.EndKey),
logutil.Key("startKeyIn", rg.StartKey),
logutil.Key("endKeyIn", rg.EndKey))
return nil, errors.Annotatef(berrors.ErrInvalidRange, "ranges overlapped")
}
}
sortedRanges := rangeTree.GetSortedRanges()
return sortedRanges, nil
}
Loading