Skip to content

Commit

Permalink
lightning: split and scatter regions in batches (#33625)
Browse files Browse the repository at this point in the history
close #33618
  • Loading branch information
sleepymole authored Apr 26, 2022
1 parent 28ea194 commit 2f2fa06
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 16 deletions.
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ const (
// lower the max-key-count to avoid tikv trigger region auto split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB
// The max ranges count in a batch to split and scatter.
maxBatchSplitRanges = 4096

propRangeIndex = "tikv.range_index"

Expand Down Expand Up @@ -1364,7 +1366,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
// split region by given ranges
for i := 0; i < maxRetryTimes; i++ {
err = local.SplitAndScatterRegionByRanges(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize)
err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
if err == nil || common.IsContextCanceledError(err) {
break
}
Expand Down
62 changes: 48 additions & 14 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,32 @@ var (
splitRetryTimes = 8
)

// TODO remove this file and use br internal functions
// This File include region split & scatter operation just like br.
// SplitAndScatterRegionInBatches splits&scatter regions in batches.
// Too many split&scatter requests may put a lot of pressure on TiKV and PD.
func (local *local) SplitAndScatterRegionInBatches(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
regionSplitSize int64,
batchCnt int,
) error {
for i := 0; i < len(ranges); i += batchCnt {
batch := ranges[i:]
if len(batch) > batchCnt {
batch = batch[:batchCnt]
}
if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil {
return errors.Trace(err)
}
}
return nil
}

// SplitAndScatterRegionByRanges include region split & scatter operation just like br.
// we can simply call br function, but we need to change some function signature of br
// When the ranges total size is small, we can skip the split to avoid generate empty regions.
// TODO: remove this file and use br internal functions
func (local *local) SplitAndScatterRegionByRanges(
ctx context.Context,
ranges []Range,
Expand Down Expand Up @@ -424,16 +446,17 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
}

func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
regionID := regionInfo.Region.GetId()
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
ok, err := local.isScatterRegionFinished(ctx, regionID)
if err != nil {
log.L().Warn("scatter region failed: do not have the region",
logutil.Region(regionInfo.Region))
ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
if ok {
return
}
if ok {
break
if err != nil {
if !utils.IsRetryableError(err) {
log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
return
}
log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
}
select {
case <-time.After(time.Second):
Expand All @@ -443,8 +466,8 @@ func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.
}
}

func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) {
resp, err := local.splitCli.GetOperator(ctx, regionID)
func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
if err != nil {
return false, err
}
Expand All @@ -462,9 +485,20 @@ func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64
return false, errors.Errorf("get operator error: %s", respErr.GetType())
}
// If the current operator of the region is not 'scatter-region', we could assume
// that 'scatter-operator' has finished or timeout
ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING
return ok, nil
// that 'scatter-operator' has finished.
if string(resp.GetDesc()) != "scatter-region" {
return true, nil
}
switch resp.GetStatus() {
case pdpb.OperatorStatus_RUNNING:
return false, nil
case pdpb.OperatorStatus_SUCCESS:
return true, nil
default:
log.L().Warn("scatter-region operator status is abnormal, will scatter region again",
logutil.Region(regionInfo.Region), zap.Stringer("status", resp.GetStatus()))
return false, local.splitCli.ScatterRegion(ctx, regionInfo)
}
}

func getSplitKeysByRanges(ranges []Range, regions []*split.RegionInfo) map[uint64][][]byte {
Expand Down
42 changes: 41 additions & 1 deletion br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package local
import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -302,7 +303,8 @@ func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo {
return &restore.RegionInfo{Region: r, Leader: l}
}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
// For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of
// regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ).
func initTestClient(keys [][]byte, hook clientHook) *testClient {
peers := make([]*metapb.Peer, 1)
peers[0] = &metapb.Peer{
Expand Down Expand Up @@ -574,6 +576,44 @@ func TestBatchSplitByRangesNoValidKeys(t *testing.T) {
doTestBatchSplitRegionByRanges(context.Background(), t, &splitRegionNoValidKeyHook{returnErrTimes: math.MaxInt32}, "no valid key", defaultHook{})
}

func TestSplitAndScatterRegionInBatches(t *testing.T) {
splitHook := defaultHook{}
deferFunc := splitHook.setup(t)
defer deferFunc()

keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")}
client := initTestClient(keys, nil)
local := &local{
splitCli: client,
g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone),
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var ranges []Range
for i := 0; i < 20; i++ {
ranges = append(ranges, Range{
start: []byte(fmt.Sprintf("a%02d", i)),
end: []byte(fmt.Sprintf("a%02d", i+1)),
})
}

err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4)
require.NoError(t, err)

rangeStart := codec.EncodeBytes([]byte{}, []byte("a"))
rangeEnd := codec.EncodeBytes([]byte{}, []byte("b"))
regions, err := restore.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
require.NoError(t, err)
result := [][]byte{[]byte("a"), []byte("a00"), []byte("a01"), []byte("a02"), []byte("a03"), []byte("a04"),
[]byte("a05"), []byte("a06"), []byte("a07"), []byte("a08"), []byte("a09"), []byte("a10"), []byte("a11"),
[]byte("a12"), []byte("a13"), []byte("a14"), []byte("a15"), []byte("a16"), []byte("a17"), []byte("a18"),
[]byte("a19"), []byte("a20"), []byte("b"),
}
checkRegionRanges(t, regions, result)
}

type reportAfterSplitHook struct {
noopHook
ch chan<- struct{}
Expand Down

0 comments on commit 2f2fa06

Please sign in to comment.