
lightning: split and scatter regions in batches #33625

Merged · 11 commits · Apr 26, 2022
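A minimal sketch of the batching idea introduced by this PR, written outside the lightning codebase: the range list is walked in fixed-size chunks so that no single split-and-scatter call covers too many ranges at once. The Range type and the processBatch callback below are placeholders, not the real lightning APIs.

package main

import "fmt"

type Range struct {
	start, end string
}

// splitIntoBatches walks ranges in chunks of at most batchCnt and hands each
// chunk to processBatch, stopping at the first error.
func splitIntoBatches(ranges []Range, batchCnt int, processBatch func([]Range) error) error {
	for i := 0; i < len(ranges); i += batchCnt {
		end := i + batchCnt
		if end > len(ranges) {
			end = len(ranges)
		}
		if err := processBatch(ranges[i:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var ranges []Range
	for i := 0; i < 10; i++ {
		ranges = append(ranges, Range{start: fmt.Sprintf("a%02d", i), end: fmt.Sprintf("a%02d", i+1)})
	}
	// With batchCnt = 4, the 10 ranges are processed as chunks of 4, 4, and 2.
	_ = splitIntoBatches(ranges, 4, func(batch []Range) error {
		fmt.Printf("split and scatter %d ranges\n", len(batch))
		return nil
	})
}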
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -91,6 +91,8 @@ const (
// lower the max-key-count to avoid tikv trigger region auto split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB
// The maximum number of ranges in one batch to split and scatter.
maxBatchSplitRanges = 4096

propRangeIndex = "tikv.range_index"

@@ -1357,7 +1359,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
// split region by given ranges
for i := 0; i < maxRetryTimes; i++ {
err = local.SplitAndScatterRegionByRanges(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize)
err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
if err == nil || common.IsContextCanceledError(err) {
break
}
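The caller in ImportEngine keeps its existing retry loop around the new batched call: retry up to maxRetryTimes and stop early on success or context cancellation. The sketch below illustrates that pattern in isolation; maxRetryTimes and isContextCanceled here are simplified stand-ins for the lightning/common helpers, not the real implementations.

package main

import (
	"context"
	"errors"
	"fmt"
)

const maxRetryTimes = 3

// isContextCanceled is a simplified stand-in for common.IsContextCanceledError.
func isContextCanceled(err error) bool {
	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

// splitWithRetry retries doSplit up to maxRetryTimes, breaking early when the
// call succeeds or the context has been canceled.
func splitWithRetry(ctx context.Context, doSplit func(context.Context) error) error {
	var err error
	for i := 0; i < maxRetryTimes; i++ {
		err = doSplit(ctx)
		if err == nil || isContextCanceled(err) {
			break
		}
		fmt.Printf("split failed, retrying (attempt %d): %v\n", i+1, err)
	}
	return err
}

func main() {
	attempts := 0
	err := splitWithRetry(context.Background(), func(ctx context.Context) error {
		attempts++
		if attempts < 2 {
			return errors.New("transient split error")
		}
		return nil
	})
	fmt.Println("final error:", err)
}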
53 changes: 45 additions & 8 deletions br/pkg/lightning/backend/local/localhelper.go
@@ -59,10 +59,32 @@ var (
splitRetryTimes = 8
)

// TODO remove this file and use br internal functions
// This File include region split & scatter operation just like br.
// SplitAndScatterRegionInBatches splits and scatters regions in batches.
// Sending too many split and scatter requests at once may put a lot of pressure on TiKV and PD.
func (local *local) SplitAndScatterRegionInBatches(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
regionSplitSize int64,
batchCnt int,
) error {
for i := 0; i < len(ranges); i += batchCnt {
batch := ranges[i:]
if len(batch) > batchCnt {
batch = batch[:batchCnt]
}
if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil {
return errors.Trace(err)
}
}
return nil
}

// SplitAndScatterRegionByRanges performs the region split & scatter operation, just like br.
// We could simply call the br function, but that would require changing some br function signatures.
// When the total size of the ranges is small, we can skip the split to avoid generating empty regions.
// TODO: remove this file and use br internal functions
func (local *local) SplitAndScatterRegionByRanges(
ctx context.Context,
ranges []Range,
@@ -428,9 +450,15 @@ func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
ok, err := local.isScatterRegionFinished(ctx, regionID)
if err != nil {
log.L().Warn("scatter region failed: do not have the region",
logutil.Region(regionInfo.Region))
return
log.L().Warn(
"wait for scatter region failed, will scatter again",
logutil.Region(regionInfo.Region),
zap.Error(err),
)
if err := local.splitCli.ScatterRegion(ctx, regionInfo); err != nil {
log.L().Warn("scatter region failed", zap.Error(err))
}
continue
}
if ok {
break
@@ -462,9 +490,18 @@ func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64
return false, errors.Errorf("get operator error: %s", respErr.GetType())
}
// If the current operator of the region is not 'scatter-region', we could assume
// that 'scatter-operator' has finished or timeout
ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING
return ok, nil
// that 'scatter-operator' has finished.
if string(resp.GetDesc()) != "scatter-region" {
return true, nil
}
switch resp.GetStatus() {
case pdpb.OperatorStatus_RUNNING:
return false, nil
case pdpb.OperatorStatus_SUCCESS:
return true, nil
default:
return false, errors.Errorf("operator status is %v", resp.GetStatus())
}
}

func getSplitKeysByRanges(ranges []Range, regions []*split.RegionInfo) map[uint64][][]byte {
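The reworked isScatterRegionFinished reads the PD GetOperator response as follows: an operator other than "scatter-region" means scattering already finished, RUNNING means keep waiting, SUCCESS means finished, and any other status is surfaced as an error. A self-contained sketch of that decision is shown below; the response is constructed by hand rather than fetched from a real PD, and classifyScatterStatus is a hypothetical helper, not part of lightning.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/pdpb"
)

// classifyScatterStatus mirrors the decision made by the new
// isScatterRegionFinished: it returns (finished, error) based on the
// operator description and status in the PD response.
func classifyScatterStatus(resp *pdpb.GetOperatorResponse) (bool, error) {
	// A different operator means the scatter-region operator is no longer running.
	if string(resp.GetDesc()) != "scatter-region" {
		return true, nil
	}
	switch resp.GetStatus() {
	case pdpb.OperatorStatus_RUNNING:
		return false, nil
	case pdpb.OperatorStatus_SUCCESS:
		return true, nil
	default:
		return false, fmt.Errorf("operator status is %v", resp.GetStatus())
	}
}

func main() {
	resp := &pdpb.GetOperatorResponse{Desc: []byte("scatter-region"), Status: pdpb.OperatorStatus_RUNNING}
	finished, err := classifyScatterStatus(resp)
	fmt.Println(finished, err) // false <nil>: the operator is still running, keep polling.
}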
42 changes: 41 additions & 1 deletion br/pkg/lightning/backend/local/localhelper_test.go
@@ -17,6 +17,7 @@ package local
import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
"sort"
@@ -302,7 +303,8 @@ func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo {
return &restore.RegionInfo{Region: r, Leader: l}
}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
// For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of
// regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ).
func initTestClient(keys [][]byte, hook clientHook) *testClient {
peers := make([]*metapb.Peer, 1)
peers[0] = &metapb.Peer{
@@ -574,6 +576,44 @@ func TestBatchSplitByRangesNoValidKeys(t *testing.T) {
doTestBatchSplitRegionByRanges(context.Background(), t, &splitRegionNoValidKeyHook{returnErrTimes: math.MaxInt32}, "no valid key", defaultHook{})
}

func TestSplitAndScatterRegionInBatches(t *testing.T) {
splitHook := defaultHook{}
deferFunc := splitHook.setup(t)
defer deferFunc()

keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")}
client := initTestClient(keys, nil)
local := &local{
splitCli: client,
g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone),
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var ranges []Range
for i := 0; i < 20; i++ {
ranges = append(ranges, Range{
start: []byte(fmt.Sprintf("a%02d", i)),
end: []byte(fmt.Sprintf("a%02d", i+1)),
})
}

err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4)
require.NoError(t, err)

rangeStart := codec.EncodeBytes([]byte{}, []byte("a"))
rangeEnd := codec.EncodeBytes([]byte{}, []byte("b"))
regions, err := restore.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
require.NoError(t, err)
result := [][]byte{[]byte("a"), []byte("a00"), []byte("a01"), []byte("a02"), []byte("a03"), []byte("a04"),
[]byte("a05"), []byte("a06"), []byte("a07"), []byte("a08"), []byte("a09"), []byte("a10"), []byte("a11"),
[]byte("a12"), []byte("a13"), []byte("a14"), []byte("a15"), []byte("a16"), []byte("a17"), []byte("a18"),
[]byte("a19"), []byte("a20"), []byte("b"),
}
checkRegionRanges(t, regions, result)
}

type reportAfterSplitHook struct {
noopHook
ch chan<- struct{}