Commit 75a3f03
Merge branch 'release-5.3' into release-5.3-64b057dea2e9
lcwangchao authored Jun 23, 2022
2 parents f51ca9f + d2a264a commit 75a3f03
Showing 155 changed files with 3,259 additions and 551 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/br_compatible_test.yml
@@ -4,7 +4,6 @@ on:
   push:
     branches:
       - master
-      - 'release-[0-9].[0-9]*'
     paths:
       - 'br/**'
       - '!**.html'
@@ -17,7 +16,6 @@ on:
   pull_request:
     branches:
       - master
-      - 'release-[0-9].[0-9]*'
     paths:
       - 'br/**'
       - '!**.html'
2 changes: 0 additions & 2 deletions .github/workflows/compile_br.yaml
@@ -3,7 +3,6 @@ on:
   push:
     branches:
       - master
-      - 'release-[0-9].[0-9]*'
     paths:
       - 'br/**'
       - '!**.html'
@@ -16,7 +15,6 @@ on:
   pull_request:
     branches:
       - master
-      - 'release-[0-9].[0-9]*'
     paths:
       - 'br/**'
       - '!**.html'
4 changes: 4 additions & 0 deletions .gitignore
@@ -24,3 +24,7 @@ coverage.txt
 var
 fix.sql
 export-20*/
+*-coverage.xml
+*-junit-report.xml
+# Files generated when testing
+out
45 changes: 34 additions & 11 deletions br/pkg/backup/client.go
@@ -506,7 +506,7 @@ func (bc *Client) BackupRange(
     // TODO: test fine grained backup.
     err = bc.fineGrainedBackup(
         ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel,
-        req.RateLimit, req.Concurrency, results, progressCallBack)
+        req.RateLimit, req.Concurrency, req.IsRawKv, req.CipherInfo, results, progressCallBack)
     if err != nil {
         return errors.Trace(err)
     }
@@ -550,10 +550,12 @@ func (bc *Client) BackupRange(
     return nil
 }

-func (bc *Client) findRegionLeader(ctx context.Context, key []byte) (*metapb.Peer, error) {
+func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
     // Keys are saved in encoded format in TiKV, so the key must be encoded
     // in order to find the correct region.
-    key = codec.EncodeBytes([]byte{}, key)
+    if !isRawKv {
+        key = codec.EncodeBytes([]byte{}, key)
+    }
     for i := 0; i < 5; i++ {
         // better backoff.
         region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
@@ -584,6 +586,8 @@ func (bc *Client) fineGrainedBackup(
     compressLevel int32,
     rateLimit uint64,
     concurrency uint32,
+    isRawKv bool,
+    cipherInfo *backuppb.CipherInfo,
     rangeTree rtree.RangeTree,
     progressCallBack func(ProgressUnit),
 ) error {
@@ -634,7 +638,7 @@ func (bc *Client) fineGrainedBackup(
             for rg := range retry {
                 backoffMs, err :=
                     bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS,
-                        compressType, compressLevel, rateLimit, concurrency, respCh)
+                        compressType, compressLevel, rateLimit, concurrency, isRawKv, cipherInfo, respCh)
                 if err != nil {
                     errCh <- err
                     return
@@ -779,9 +783,11 @@ func (bc *Client) handleFineGrained(
     compressionLevel int32,
     rateLimit uint64,
     concurrency uint32,
+    isRawKv bool,
+    cipherInfo *backuppb.CipherInfo,
     respCh chan<- *backuppb.BackupResponse,
 ) (int, error) {
-    leader, pderr := bc.findRegionLeader(ctx, rg.StartKey)
+    leader, pderr := bc.findRegionLeader(ctx, rg.StartKey, isRawKv)
     if pderr != nil {
         return 0, errors.Trace(pderr)
     }
@@ -796,8 +802,10 @@ func (bc *Client) handleFineGrained(
         StorageBackend:   bc.backend,
         RateLimit:        rateLimit,
         Concurrency:      concurrency,
+        IsRawKv:          isRawKv,
         CompressionType:  compressType,
         CompressionLevel: compressionLevel,
+        CipherInfo:       cipherInfo,
     }
     lockResolver := bc.mgr.GetLockResolver()
     client, err := bc.mgr.GetBackupClient(ctx, storeID)
@@ -902,9 +910,17 @@ backupLoop:
         })
         bcli, err := client.Backup(ctx, &req)
         failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
-            if val.(bool) {
-                logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.")
-                err = status.Error(codes.Unavailable, "Unavailable error")
+            switch val.(string) {
+            case "Unavaiable":
+                {
+                    logutil.CL(ctx).Debug("failpoint reset-retryable-error unavailable injected.")
+                    err = status.Error(codes.Unavailable, "Unavailable error")
+                }
+            case "Internal":
+                {
+                    logutil.CL(ctx).Debug("failpoint reset-retryable-error internal injected.")
+                    err = status.Error(codes.Internal, "Internal error")
+                }
             }
         })
         failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) {
@@ -970,16 +986,23 @@ const (

 // isRetryableError checks whether we should retry resetting the gRPC connection.
 func isRetryableError(err error) bool {
-
-    if status.Code(err) == codes.Unavailable {
-        return true
+    // some errors can be retried
+    // https://github.com/pingcap/tidb/issues/34350
+    switch status.Code(err) {
+    case codes.Unavailable, codes.DeadlineExceeded,
+        codes.ResourceExhausted, codes.Aborted, codes.Internal:
+        {
+            log.Warn("backup met some errors, these errors can be retried 5 times", zap.Error(err))
+            return true
+        }
     }

     // There are at least two possible cancel() calls: one from the backup range,
     // another from gRPC. Here we retry when gRPC cancels with the connection closing.
     if status.Code(err) == codes.Canceled {
         if s, ok := status.FromError(err); ok {
             if strings.Contains(s.Message(), gRPC_Cancel) {
+                log.Warn("backup met grpc cancel error, this error can be retried 5 times", zap.Error(err))
                 return true
             }
         }
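
The new isRetryableError above keys the retry decision off gRPC status codes. As a rough standalone sketch of the same classification — the helper name retryable is illustrative and not part of this commit:

package main

import (
    "fmt"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// retryable mirrors the commit's classification: a handful of gRPC codes
// are treated as transient, warranting a connection reset and retry.
func retryable(err error) bool {
    switch status.Code(err) {
    case codes.Unavailable, codes.DeadlineExceeded,
        codes.ResourceExhausted, codes.Aborted, codes.Internal:
        return true
    }
    return false
}

func main() {
    fmt.Println(retryable(status.Error(codes.Internal, "boom")))       // true: retried (up to 5 times in BR)
    fmt.Println(retryable(status.Error(codes.InvalidArgument, "bad"))) // false: surfaced immediately
}
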
17 changes: 17 additions & 0 deletions br/pkg/backup/push.go
@@ -11,6 +11,7 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
     backuppb "github.com/pingcap/kvproto/pkg/brpb"
+    "github.com/pingcap/kvproto/pkg/errorpb"
     "github.com/pingcap/kvproto/pkg/metapb"
     berrors "github.com/pingcap/tidb/br/pkg/errors"
     "github.com/pingcap/tidb/br/pkg/logutil"
@@ -116,6 +117,7 @@ func (push *pushDown) pushBackup(
         close(push.respCh)
     }()

+    regionErrorIngestedOnce := false
     for {
         select {
         case respAndStore, ok := <-push.respCh:
@@ -139,6 +141,21 @@ func (push *pushDown) pushBackup(
                     Msg: msg,
                 }
             })
+            failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
+                if !regionErrorIngestedOnce {
+                    msg := val.(string)
+                    logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg))
+                    resp.Error = &backuppb.Error{
+                        // Msg: msg,
+                        Detail: &backuppb.Error_RegionError{
+                            RegionError: &errorpb.Error{
+                                Message: msg,
+                            },
+                        },
+                    }
+                }
+                regionErrorIngestedOnce = true
+            })
             if resp.GetError() == nil {
                 // A nil error means the range has been backed up successfully.
                 res.Put(
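
Failpoints like tikv-region-error are normally switched on from a test via failpoint.Enable before the code under test runs. A minimal sketch, assuming the conventional <import path>/<failpoint name> path (the exact path and injected message are assumptions, not taken from this commit):

package main

import (
    "fmt"

    "github.com/pingcap/failpoint"
)

func main() {
    // The injected string becomes val.(string) inside failpoint.Inject above
    // and is used as the fabricated region error message.
    fp := "github.com/pingcap/tidb/br/pkg/backup/tikv-region-error" // assumed path
    if err := failpoint.Enable(fp, `return("region error")`); err != nil {
        fmt.Println("enable failpoint:", err)
        return
    }
    defer failpoint.Disable(fp)
    // ... run the backup code under test here ...
}
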
1 change: 1 addition & 0 deletions br/pkg/glue/glue.go
@@ -37,6 +37,7 @@ type Session interface {
     CreateDatabase(ctx context.Context, schema *model.DBInfo) error
     CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
     Close()
+    GetGlobalVariable(name string) (string, error)
 }

 // Progress is an interface recording the current execution progress.
5 changes: 5 additions & 0 deletions br/pkg/gluetidb/glue.go
@@ -151,6 +151,11 @@ func (gs *tidbSession) Close() {
     gs.se.Close()
 }

+// GetGlobalVariable implements glue.Session.
+func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
+    return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
+}
+
 // showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo.
 func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) {
     table := tbl.Clone()
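
A minimal sketch of how a caller might consume the new method; the local interface and fake below mirror only the added signature so the sketch compiles on its own, and the variable name is illustrative:

package main

import "fmt"

// sessionLike mirrors only the method added to glue.Session in this commit.
type sessionLike interface {
    GetGlobalVariable(name string) (string, error)
}

// fakeSession is a stand-in for the tidbSession implementation above.
type fakeSession map[string]string

func (f fakeSession) GetGlobalVariable(name string) (string, error) {
    if v, ok := f[name]; ok {
        return v, nil
    }
    return "", fmt.Errorf("global variable %q not found", name)
}

func main() {
    var se sessionLike = fakeSession{"tidb_some_variable": "on"} // illustrative name and value
    v, err := se.GetGlobalVariable("tidb_some_variable")
    fmt.Println(v, err) // on <nil>
}
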
6 changes: 4 additions & 2 deletions br/pkg/lightning/backend/local/local.go
@@ -95,6 +95,8 @@ const (
     // lower the max-key-count to avoid tikv triggering region auto split
     regionMaxKeyCount      = 1_280_000
     defaultRegionSplitSize = 96 * units.MiB
+    // The maximum number of ranges in a batch to split and scatter.
+    maxBatchSplitRanges = 4096

     propRangeIndex = "tikv.range_index"
@@ -916,7 +918,7 @@ func NewLocalBackend(
     if err != nil {
         return backend.MakeBackend(nil), errors.Annotate(err, "construct pd client failed")
     }
-    splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig())
+    splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)

     shouldCreate := true
     if cfg.Checkpoint.Enable {
@@ -2066,7 +2068,7 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
     needSplit := len(unfinishedRanges) > 1 || lfTotalSize > regionSplitSize || lfLength > regionSplitKeys
     // split region by given ranges
     for i := 0; i < maxRetryTimes; i++ {
-        err = local.SplitAndScatterRegionByRanges(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize)
+        err = local.SplitAndScatterRegionInBatches(ctx, unfinishedRanges, lf.tableInfo, needSplit, regionSplitSize, maxBatchSplitRanges)
         if err == nil || common.IsContextCanceledError(err) {
             break
         }
62 changes: 48 additions & 14 deletions br/pkg/lightning/backend/local/localhelper.go
@@ -58,10 +58,32 @@ var (
     splitRegionBaseBackOffTime = time.Second
 )

-// TODO remove this file and use br internal functions
-// This File include region split & scatter operation just like br.
+// SplitAndScatterRegionInBatches splits and scatters regions in batches.
+// Too many split & scatter requests may put a lot of pressure on TiKV and PD.
+func (local *local) SplitAndScatterRegionInBatches(
+    ctx context.Context,
+    ranges []Range,
+    tableInfo *checkpoints.TidbTableInfo,
+    needSplit bool,
+    regionSplitSize int64,
+    batchCnt int,
+) error {
+    for i := 0; i < len(ranges); i += batchCnt {
+        batch := ranges[i:]
+        if len(batch) > batchCnt {
+            batch = batch[:batchCnt]
+        }
+        if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil {
+            return errors.Trace(err)
+        }
+    }
+    return nil
+}
+
+// SplitAndScatterRegionByRanges includes the region split & scatter operations just like BR.
 // we can simply call the br functions, but we would need to change some br function signatures
 // When the total size of the ranges is small, we can skip the split to avoid generating empty regions.
+// TODO: remove this file and use br internal functions
 func (local *local) SplitAndScatterRegionByRanges(
     ctx context.Context,
     ranges []Range,
@@ -423,16 +445,17 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
 }

 func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
-    regionID := regionInfo.Region.GetId()
     for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
-        ok, err := local.isScatterRegionFinished(ctx, regionID)
-        if err != nil {
-            log.L().Warn("scatter region failed: do not have the region",
-                logutil.Region(regionInfo.Region))
+        ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
+        if ok {
+            return
         }
-        if ok {
-            break
+        if err != nil {
+            if !utils.IsRetryableError(err) {
+                log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
+                return
+            }
+            log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
         }
         select {
         case <-time.After(time.Second):
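
The rewritten wait loop above follows a common polling shape: check, then sleep for a second or bail out on context cancellation, up to a fixed retry budget. A self-contained sketch of that shape, with illustrative names:

package main

import (
    "context"
    "fmt"
    "time"
)

// waitUntil polls check once per second until it succeeds, the retry budget
// is exhausted, or ctx is cancelled — the same shape as waitForScatterRegion.
func waitUntil(ctx context.Context, maxRetries int, check func() bool) {
    for i := 0; i < maxRetries; i++ {
        if check() {
            return
        }
        select {
        case <-time.After(time.Second):
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    n := 0
    waitUntil(context.Background(), 5, func() bool { n++; return n >= 3 })
    fmt.Println("polled", n, "times") // polled 3 times
}
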
@@ -442,8 +465,8 @@ func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
     }
 }

-func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) {
-    resp, err := local.splitCli.GetOperator(ctx, regionID)
+func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
+    resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
     if err != nil {
         return false, err
     }
@@ -461,9 +484,20 @@ func (local *local) isScatterRegionFinished(ctx context.Context, regionID uint64) (bool, error) {
     return false, errors.Errorf("get operator error: %s", respErr.GetType())
     }
     // If the current operator of the region is not 'scatter-region', we could assume
-    // that 'scatter-operator' has finished or timeout
-    ok := string(resp.GetDesc()) != "scatter-region" || resp.GetStatus() != pdpb.OperatorStatus_RUNNING
-    return ok, nil
+    // that 'scatter-operator' has finished.
+    if string(resp.GetDesc()) != "scatter-region" {
+        return true, nil
+    }
+    switch resp.GetStatus() {
+    case pdpb.OperatorStatus_RUNNING:
+        return false, nil
+    case pdpb.OperatorStatus_SUCCESS:
+        return true, nil
+    default:
+        log.L().Warn("scatter-region operator status is abnormal, will scatter region again",
+            logutil.Region(regionInfo.Region), zap.Stringer("status", resp.GetStatus()))
+        return false, local.splitCli.ScatterRegion(ctx, regionInfo)
+    }
 }

 func getSplitKeysByRanges(ranges []Range, regions []*split.RegionInfo) map[uint64][][]byte {
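
The new SplitAndScatterRegionInBatches wrapper above uses the standard Go idiom for walking a slice in fixed-size chunks, so that each call to SplitAndScatterRegionByRanges sees at most batchCnt ranges. The same idiom in isolation, with illustrative names:

package main

import "fmt"

// processInBatches applies fn to items in chunks of at most batchCnt,
// mirroring the loop in SplitAndScatterRegionInBatches.
func processInBatches(items []int, batchCnt int, fn func([]int) error) error {
    for i := 0; i < len(items); i += batchCnt {
        batch := items[i:]
        if len(batch) > batchCnt {
            batch = batch[:batchCnt]
        }
        if err := fn(batch); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    _ = processInBatches([]int{1, 2, 3, 4, 5}, 2, func(b []int) error {
        fmt.Println(b) // [1 2], then [3 4], then [5]
        return nil
    })
}
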
42 changes: 41 additions & 1 deletion br/pkg/lightning/backend/local/localhelper_test.go
@@ -17,6 +17,7 @@ package local
 import (
     "bytes"
     "context"
+    "fmt"
     "math"
     "math/rand"
     "sort"
@@ -291,7 +292,8 @@ func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo {
     return &restore.RegionInfo{Region: r, Leader: l}
 }

-// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
+// For keys ["", "aay", "bba", "bbh", "cca", ""], the key ranges of
+// regions are [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ).
 func initTestClient(keys [][]byte, hook clientHook) *testClient {
     peers := make([]*metapb.Peer, 1)
     peers[0] = &metapb.Peer{
@@ -562,6 +564,44 @@ func (s *localSuite) TestBatchSplitByRangesNoValidKeys(c *C) {
     s.doTestBatchSplitRegionByRanges(context.Background(), c, &splitRegionNoValidKeyHook{returnErrTimes: math.MaxInt32}, ".*no valid key.*", defaultHook{})
 }

+func (s *localSuite) TestSplitAndScatterRegionInBatches(c *C) {
+    splitHook := defaultHook{}
+    deferFunc := splitHook.setup(c)
+    defer deferFunc()
+
+    keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")}
+    client := initTestClient(keys, nil)
+    local := &local{
+        splitCli: client,
+        g:        glue.NewExternalTiDBGlue(nil, mysql.ModeNone),
+    }
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    var ranges []Range
+    for i := 0; i < 20; i++ {
+        ranges = append(ranges, Range{
+            start: []byte(fmt.Sprintf("a%02d", i)),
+            end:   []byte(fmt.Sprintf("a%02d", i+1)),
+        })
+    }
+
+    err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4)
+    c.Check(err, IsNil)
+
+    rangeStart := codec.EncodeBytes([]byte{}, []byte("a"))
+    rangeEnd := codec.EncodeBytes([]byte{}, []byte("b"))
+    regions, err := restore.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
+    c.Check(err, IsNil)
+    result := [][]byte{[]byte("a"), []byte("a00"), []byte("a01"), []byte("a02"), []byte("a03"), []byte("a04"),
+        []byte("a05"), []byte("a06"), []byte("a07"), []byte("a08"), []byte("a09"), []byte("a10"), []byte("a11"),
+        []byte("a12"), []byte("a13"), []byte("a14"), []byte("a15"), []byte("a16"), []byte("a17"), []byte("a18"),
+        []byte("a19"), []byte("a20"), []byte("b"),
+    }
+    checkRegionRanges(c, regions, result)
+}
+
 type reportAfterSplitHook struct {
     noopHook
     ch chan<- struct{}
(Diffs for the remaining changed files are not shown here.)