br: cherry pick tidb#32612 to fix rawkv backup failure
Issue Number: tikv#67

Signed-off-by: pingyu <[email protected]>
pingyu committed Mar 28, 2022
1 parent 57ff45e commit 7bd1b08
Showing 16 changed files with 215 additions and 239 deletions.
19 changes: 19 additions & 0 deletions br/Makefile
@@ -118,3 +118,22 @@ clean:
rm -rf *.out
rm -rf bin
rm -rf tools/bin

br_integration_test: br_bins build_br build_for_br_integration_test
@cd br && tests/run.sh

# TODO: adjust bins when br integration tests are reformatted.
br_bins:
@which bin/tikv-server
@which bin/pd-server
@which bin/pd-ctl

build_for_br_integration_test:
@make failpoint-enable
($(GOTEST) -c -cover -covermode=count \
-coverpkg=github.com/tikv/migration/br/... \
-o $(BR_BIN).test \
github.com/tikv/migration/br/cmd/br && \
$(GOBUILD) $(RACE_FLAG) -o bin/rawkv tests/br_rawkv/*.go \
) || (make failpoint-disable && exit 1)
@make failpoint-disable
18 changes: 13 additions & 5 deletions br/pkg/backup/client.go
@@ -293,7 +293,7 @@ func (bc *Client) BackupRange(
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, req.StartVersion, req.EndVersion, req.CompressionType, req.CompressionLevel,
req.RateLimit, req.Concurrency, results, progressCallBack)
req.RateLimit, req.Concurrency, req.IsRawKv, req.CipherInfo, results, progressCallBack)
if err != nil {
return errors.Trace(err)
}
@@ -337,10 +337,12 @@ func (bc *Client) BackupRange(
return nil
}

func (bc *Client) findRegionLeader(ctx context.Context, key []byte) (*metapb.Peer, error) {
func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool) (*metapb.Peer, error) {
// Keys are saved in encoded format in TiKV except in RawKV mode, so a
// non-RawKV key must be encoded in order to find the correct region.
key = codec.EncodeBytes([]byte{}, key)
if !isRawKv {
key = codec.EncodeBytes([]byte{}, key)
}
for i := 0; i < 5; i++ {
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
@@ -371,6 +373,8 @@ func (bc *Client) fineGrainedBackup(
compressLevel int32,
rateLimit uint64,
concurrency uint32,
isRawKv bool,
cipherInfo *backuppb.CipherInfo,
rangeTree rtree.RangeTree,
progressCallBack func(ProgressUnit),
) error {
@@ -421,7 +425,7 @@
for rg := range retry {
backoffMs, err :=
bc.handleFineGrained(ctx, boFork, rg, lastBackupTS, backupTS,
compressType, compressLevel, rateLimit, concurrency, respCh)
compressType, compressLevel, rateLimit, concurrency, isRawKv, cipherInfo, respCh)
if err != nil {
errCh <- err
return
@@ -566,9 +570,11 @@ func (bc *Client) handleFineGrained(
compressionLevel int32,
rateLimit uint64,
concurrency uint32,
isRawKv bool,
cipherInfo *backuppb.CipherInfo,
respCh chan<- *backuppb.BackupResponse,
) (int, error) {
leader, pderr := bc.findRegionLeader(ctx, rg.StartKey)
leader, pderr := bc.findRegionLeader(ctx, rg.StartKey, isRawKv)
if pderr != nil {
return 0, errors.Trace(pderr)
}
@@ -583,8 +589,10 @@
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
IsRawKv: isRawKv,
CompressionType: compressType,
CompressionLevel: compressionLevel,
CipherInfo: cipherInfo,
}
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
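The backup-side fix is easiest to see in `findRegionLeader`: RawKV keys are stored in TiKV as-is, so encoding them before the PD region lookup can resolve to the wrong region, which is part of what broke RawKV backup. Below is a minimal, self-contained sketch of that rule; `encodeBytes` and `regionLookupKey` are illustrative stand-ins, not BR code, and the placeholder encoding is not the real memcomparable format produced by `codec.EncodeBytes`.

```go
// Minimal sketch of the rule applied in findRegionLeader above: only
// transactional (non-RawKV) keys are codec-encoded before asking PD for the
// owning region.
package main

import "fmt"

// encodeBytes stands in for codec.EncodeBytes([]byte{}, key); it only marks
// that an extra encoding step happens on the non-RawKV path.
func encodeBytes(key []byte) []byte {
	out := append([]byte("enc("), key...)
	return append(out, ')')
}

// regionLookupKey mirrors the branch added to findRegionLeader: RawKV keys are
// stored raw in region boundaries, so they must be looked up unencoded.
func regionLookupKey(key []byte, isRawKv bool) []byte {
	if !isRawKv {
		return encodeBytes(key)
	}
	return key
}

func main() {
	fmt.Printf("rawkv lookup key: %q\n", regionLookupKey([]byte("r_user_1"), true))
	fmt.Printf("txn   lookup key: %q\n", regionLookupKey([]byte("t_user_1"), false))
}
```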
17 changes: 17 additions & 0 deletions br/pkg/backup/push.go
@@ -11,6 +11,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/tikv/migration/br/pkg/errors"
"github.com/tikv/migration/br/pkg/logutil"
@@ -116,6 +117,7 @@ func (push *pushDown) pushBackup(
close(push.respCh)
}()

regionErrorIngestedOnce := false
for {
select {
case respAndStore, ok := <-push.respCh:
@@ -139,6 +141,21 @@
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
if !regionErrorIngestedOnce {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
// Msg: msg,
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
}
regionErrorIngestedOnce = true
})
if resp.GetError() == nil {
// A nil error means the range has been backed up successfully.
res.Put(
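The new `tikv-region-error` failpoint attaches a region error to exactly one response, so integration tests exercise BR's retry path once without looping forever. Below is a standalone sketch of that one-shot guard in plain Go; the `response` and `pushResponses` names are illustrative, and in the real code the injected error is built inside `failpoint.Inject`, which only takes effect after the `make failpoint-enable` rewrite added in the Makefile above.

```go
// Standalone sketch of the one-shot injection guard added to pushBackup: the
// simulated region error is attached only to the first response, later
// responses succeed, so the caller retries that single range once.
package main

import "fmt"

type response struct {
	Range string
	Err   string // empty means the range was backed up successfully
}

func pushResponses(ranges []string, injectRegionError bool) []response {
	injectedOnce := false // mirrors regionErrorIngestedOnce in the diff
	out := make([]response, 0, len(ranges))
	for _, r := range ranges {
		resp := response{Range: r}
		if injectRegionError && !injectedOnce {
			// In the real code this is a backuppb.Error_RegionError built
			// inside failpoint.Inject("tikv-region-error", ...).
			resp.Err = "injected region error"
			injectedOnce = true
		}
		out = append(out, resp)
	}
	return out
}

func main() {
	for _, r := range pushResponses([]string{"[a,b)", "[b,c)", "[c,d)"}, true) {
		fmt.Printf("range %s -> err=%q\n", r.Range, r.Err)
	}
}
```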
5 changes: 3 additions & 2 deletions br/pkg/restore/client.go
@@ -94,6 +94,7 @@ func NewRestoreClient(
store kv.Storage,
tlsConf *tls.Config,
keepaliveConf keepalive.ClientParameters,
isRawKv bool,
) (*Client, error) {
db, err := NewDB(g, store)
if err != nil {
@@ -112,7 +113,7 @@

return &Client{
pdClient: pdClient,
toolClient: NewSplitClient(pdClient, tlsConf),
toolClient: NewSplitClient(pdClient, tlsConf, isRawKv),
db: db,
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
@@ -197,7 +198,7 @@ func (rc *Client) InitBackupMeta(
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf, rc.backupMeta.IsRawKv)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv, rc.rateLimit)
return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
2 changes: 1 addition & 1 deletion br/pkg/restore/client_test.go
@@ -22,7 +22,7 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{

func TestIsOnline(t *testing.T) {
m := mc
client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg)
client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg, false) // TODO: add test case for "isRawKv=true"
require.NoError(t, err)

require.False(t, client.IsOnline())
6 changes: 4 additions & 2 deletions br/pkg/restore/pipeline_items.go
@@ -212,6 +212,7 @@ func NewTiKVSender(
cli *Client,
updateCh glue.Progress,
splitConcurrency uint,
isRawKv bool,
) (BatchSender, error) {
inCh := make(chan DrainResult, defaultChannelSize)
midCh := make(chan drainResultAndDone, defaultChannelSize)
@@ -225,7 +226,7 @@
}

sender.wg.Add(2)
go sender.splitWorker(ctx, inCh, midCh, splitConcurrency)
go sender.splitWorker(ctx, inCh, midCh, splitConcurrency, isRawKv)
go sender.restoreWorker(ctx, midCh)
return sender, nil
}
@@ -245,6 +246,7 @@ func (b *tikvSender) splitWorker(ctx context.Context,
ranges <-chan DrainResult,
next chan<- drainResultAndDone,
concurrency uint,
isRawKv bool,
) {
defer log.Debug("split worker closed")
eg, ectx := errgroup.WithContext(ctx)
@@ -289,7 +291,7 @@ func (b *tikvSender) splitWorker(ctx context.Context,
// hence the checksum would fail.
done := b.registerTableIsRestoring(result.TablesToSend)
pool.ApplyOnErrorGroup(eg, func() error {
err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh)
err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh, isRawKv)
if err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
return err
13 changes: 8 additions & 5 deletions br/pkg/restore/split.go
@@ -77,6 +77,7 @@ func (rs *RegionSplitter) Split(
ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
isRawKv bool,
onSplit OnSplitFunc,
) error {
if len(ranges) == 0 {
@@ -111,7 +112,7 @@ SplitRegions:
}
return errors.Trace(errScan)
}
splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions)
splitKeyMap := getSplitKeys(rewriteRules, sortedRanges, regions, isRawKv)
regionMap := make(map[uint64]*RegionInfo)
for _, region := range regions {
regionMap[region.Region.GetId()] = region
@@ -486,14 +487,14 @@ func (b *scanRegionBackoffer) Attempt() int {

// getSplitKeys checks if the regions should be split by the end key of
// the ranges, groups the split keys by region id.
func getSplitKeys(_ *RewriteRules, ranges []rtree.Range, regions []*RegionInfo) map[uint64][][]byte {
func getSplitKeys(_ *RewriteRules, ranges []rtree.Range, regions []*RegionInfo, isRawKv bool) map[uint64][][]byte {
splitKeyMap := make(map[uint64][][]byte)
checkKeys := make([][]byte, 0)
for _, rg := range ranges {
checkKeys = append(checkKeys, rg.EndKey)
}
for _, key := range checkKeys {
if region := NeedSplit(key, regions); region != nil {
if region := NeedSplit(key, regions, isRawKv); region != nil {
splitKeys, ok := splitKeyMap[region.Region.GetId()]
if !ok {
splitKeys = make([][]byte, 0, 1)
@@ -509,12 +510,14 @@ func getSplitKeys(_ *RewriteRules, ranges []rtree.Range, regions []*RegionInfo)
}

// NeedSplit checks whether a key is necessary to split, if true returns the split region.
func NeedSplit(splitKey []byte, regions []*RegionInfo) *RegionInfo {
func NeedSplit(splitKey []byte, regions []*RegionInfo, isRawKv bool) *RegionInfo {
// If splitKey is the max key.
if len(splitKey) == 0 {
return nil
}
splitKey = codec.EncodeBytes(splitKey)
if !isRawKv {
splitKey = codec.EncodeBytes(splitKey)
}
for _, region := range regions {
// If splitKey is the boundary of the region
if bytes.Equal(splitKey, region.Region.GetStartKey()) {
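On the restore side the same encoding rule decides whether a split key falls inside a region: RawKV region boundaries hold raw keys while transactional boundaries hold codec-encoded keys, so encoding a RawKV split key (the pre-fix behaviour) makes it miss its region and the split is skipped. A self-contained sketch follows; `region`, `encode`, and `needSplit` are simplified stand-ins for the real types, and `encode` is not the actual memcomparable encoding.

```go
// Self-contained sketch of the NeedSplit change: the split key is only encoded
// when the restore is not RawKV, so it is compared in the same keyspace as the
// region boundaries.
package main

import (
	"bytes"
	"fmt"
)

type region struct{ start, end []byte }

// encode is a placeholder for codec.EncodeBytes.
func encode(key []byte) []byte {
	out := append([]byte("enc("), key...)
	return append(out, ')')
}

// needSplit is a simplified stand-in for restore.NeedSplit: it returns the
// region that would be split at splitKey, or nil if no split is needed.
func needSplit(splitKey []byte, regions []region, isRawKv bool) *region {
	if len(splitKey) == 0 {
		return nil
	}
	if !isRawKv {
		splitKey = encode(splitKey)
	}
	for i := range regions {
		r := &regions[i]
		if bytes.Equal(splitKey, r.start) {
			return nil // already a region boundary, no split needed
		}
		if bytes.Compare(splitKey, r.start) > 0 &&
			(len(r.end) == 0 || bytes.Compare(splitKey, r.end) < 0) {
			return r
		}
	}
	return nil
}

func main() {
	rawRegions := []region{{start: []byte("b"), end: []byte("d")}}
	txnRegions := []region{{start: encode([]byte("b")), end: encode([]byte("d"))}}

	fmt.Println(needSplit([]byte("c"), rawRegions, true) != nil)  // true: raw "c" lies inside [b, d)
	fmt.Println(needSplit([]byte("c"), txnRegions, false) != nil) // true: encoded "c" lies inside encoded [b, d)
	fmt.Println(needSplit([]byte("c"), rawRegions, false) != nil) // false: pre-fix behaviour, encoded key misses raw boundaries
}
```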
9 changes: 7 additions & 2 deletions br/pkg/restore/split_client.go
@@ -89,14 +89,17 @@ type pdClient struct {
// this may mislead the scatter.
needScatterVal bool
needScatterInit sync.Once

isRawKv bool
}

// NewSplitClient returns a client used by RegionSplitter.
func NewSplitClient(client pd.Client, tlsConf *tls.Config) SplitClient {
func NewSplitClient(client pd.Client, tlsConf *tls.Config, isRawKv bool) SplitClient {
cli := &pdClient{
client: client,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
isRawKv: isRawKv,
}
return cli
}
@@ -255,6 +258,7 @@ func splitRegionWithFailpoint(
peer *metapb.Peer,
client tikvpb.TikvClient,
keys [][]byte,
isRawKv bool,
) (*kvrpcpb.SplitRegionResponse, error) {
failpoint.Inject("not-leader-error", func(injectNewLeader failpoint.Value) {
log.Debug("failpoint not-leader-error injected.")
@@ -285,6 +289,7 @@
Peer: peer,
},
SplitKeys: keys,
IsRawKv: isRawKv,
})
}

@@ -320,7 +325,7 @@ func (c *pdClient) sendSplitRegionRequest(
}
defer conn.Close()
client := tikvpb.NewTikvClient(conn)
resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys)
resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys, c.isRawKv)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
49 changes: 29 additions & 20 deletions br/pkg/restore/split_test.go
@@ -274,7 +274,7 @@ func TestScatterFinishInTime(t *testing.T) {
regionSplitter := restore.NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
err := regionSplitter.Split(ctx, ranges, rewriteRules, false, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"
require.NoError(t, err)
regions := client.GetAllRegions()
if !validateRegions(regions) {
@@ -330,7 +330,7 @@ func runTestSplitAndScatterWith(t *testing.T, client *TestClient) {
regionSplitter := restore.NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
err := regionSplitter.Split(ctx, ranges, rewriteRules, false, func(key [][]byte) {}) // TODO: add test case for "isRawKV=true"
require.NoError(t, err)
regions := client.GetAllRegions()
if !validateRegions(regions) {
@@ -465,26 +465,35 @@ FindRegion:
}

func TestNeedSplit(t *testing.T) {
regions := []*restore.RegionInfo{
{
Region: &metapb.Region{
StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
for _, isRawKv := range []bool{false, true} {
encode := func(in []byte) []byte {
if isRawKv {
return in
}
return codec.EncodeBytes([]byte{}, in)
}

regions := []*restore.RegionInfo{
{
Region: &metapb.Region{
StartKey: encode([]byte("b")),
EndKey: encode([]byte("d")),
},
},
},
}
// Out of region
require.Nil(t, restore.NeedSplit([]byte("a"), regions, isRawKv))
// Region start key
require.Nil(t, restore.NeedSplit([]byte("b"), regions, isRawKv))
// In region
region := restore.NeedSplit([]byte("c"), regions, isRawKv)
require.Equal(t, 0, bytes.Compare(region.Region.GetStartKey(), encode([]byte("b"))))
require.Equal(t, 0, bytes.Compare(region.Region.GetEndKey(), encode([]byte("d"))))
// Region end key
require.Nil(t, restore.NeedSplit([]byte("d"), regions, isRawKv))
// Out of region
require.Nil(t, restore.NeedSplit([]byte("e"), regions, isRawKv))
}
// Out of region
require.Nil(t, restore.NeedSplit([]byte("a"), regions))
// Region start key
require.Nil(t, restore.NeedSplit([]byte("b"), regions))
// In region
region := restore.NeedSplit([]byte("c"), regions)
require.Equal(t, 0, bytes.Compare(region.Region.GetStartKey(), codec.EncodeBytes([]byte{}, []byte("b"))))
require.Equal(t, 0, bytes.Compare(region.Region.GetEndKey(), codec.EncodeBytes([]byte{}, []byte("d"))))
// Region end key
require.Nil(t, restore.NeedSplit([]byte("d"), regions))
// Out of region
require.Nil(t, restore.NeedSplit([]byte("e"), regions))
}

func TestRegionConsistency(t *testing.T) {
5 changes: 3 additions & 2 deletions br/pkg/restore/util.go
@@ -345,10 +345,11 @@ func SplitRanges(
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh glue.Progress,
isRawKv bool,
) error {
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig()))
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig(), isRawKv))

return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) {
return splitter.Split(ctx, ranges, rewriteRules, isRawKv, func(keys [][]byte) {
for range keys {
updateCh.Inc()
}
4 changes: 2 additions & 2 deletions br/pkg/task/restore_raw.go
@@ -75,7 +75,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
// sometimes we have pooled the connections.
// sending heartbeats in idle times is useful.
keepaliveCfg.PermitWithoutStream = true
client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg)
client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg, true)
if err != nil {
return errors.Trace(err)
}
@@ -131,7 +131,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR

// RawKV restore does not need to rewrite keys.
rewrite := &restore.RewriteRules{}
err = restore.SplitRanges(ctx, client, ranges, rewrite, updateCh)
err = restore.SplitRanges(ctx, client, ranges, rewrite, updateCh, true)
if err != nil {
return errors.Trace(err)
}
(The remaining changed files are not rendered here.)