Skip to content

Commit

Permalink
br: less split during restoring (pingcap#27240)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Sep 1, 2021
1 parent 6dc08e0 commit 95393fe
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 53 deletions.
18 changes: 7 additions & 11 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -451,18 +450,14 @@ func (importer *FileImporter) downloadSST(
) (*import_sstpb.SSTMeta, error) {
uid := uuid.New()
id := uid[:]
// Assume one region reflects to one rewrite rule
_, key, err := codec.DecodeBytes(regionInfo.Region.GetStartKey())
if err != nil {
return nil, errors.Trace(err)
}
regionRule := matchNewPrefix(key, rewriteRules)
if regionRule == nil {
return nil, errors.Annotate(berrors.ErrKVRewriteRuleNotFound, "failed to find rewrite rule.")
// Get the rewrite rule for the file.
fileRule := findMatchedRewriteRule(file, rewriteRules)
if fileRule == nil {
return nil, errors.Trace(berrors.ErrKVRewriteRuleNotFound)
}
rule := import_sstpb.RewriteRule{
OldKeyPrefix: encodeKeyPrefix(regionRule.GetOldKeyPrefix()),
NewKeyPrefix: encodeKeyPrefix(regionRule.GetNewKeyPrefix()),
OldKeyPrefix: encodeKeyPrefix(fileRule.GetOldKeyPrefix()),
NewKeyPrefix: encodeKeyPrefix(fileRule.GetNewKeyPrefix()),
}
sstMeta := GetSSTMetaFromFile(id, file, regionInfo.Region, &rule)

Expand All @@ -479,6 +474,7 @@ func (importer *FileImporter) downloadSST(
)
var resp *import_sstpb.DownloadResponse
for _, peer := range regionInfo.Region.GetPeers() {
var err error
resp, err = importer.importClient.DownloadSST(ctx, peer.GetStoreId(), req)
if err != nil {
return nil, errors.Trace(err)
Expand Down
30 changes: 18 additions & 12 deletions pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ func (rs *RegionSplitter) Split(
}
minKey := codec.EncodeBytes(sortedRanges[0].StartKey)
maxKey := codec.EncodeBytes(sortedRanges[len(sortedRanges)-1].EndKey)
for _, rule := range rewriteRules.Data {
if bytes.Compare(minKey, rule.GetNewKeyPrefix()) > 0 {
minKey = rule.GetNewKeyPrefix()
}
if bytes.Compare(maxKey, rule.GetNewKeyPrefix()) < 0 {
maxKey = rule.GetNewKeyPrefix()
}
}
interval := SplitRetryInterval
scatterRegions := make([]*RegionInfo, 0)
SplitRegions:
Expand Down Expand Up @@ -262,10 +254,27 @@ func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *
func (rs *RegionSplitter) splitAndScatterRegions(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) ([]*RegionInfo, error) {
if len(keys) == 0 {
return []*RegionInfo{regionInfo}, nil
}

newRegions, err := rs.client.BatchSplitRegions(ctx, regionInfo, keys)
if err != nil {
return nil, errors.Trace(err)
}
// There would be some regions be scattered twice, e.g.:
// |--1-|--2-+----|-3--|
// | +(t1)|
// +(t1_r4) |
// +(t2_r42)
// When spliting at `t1_r4`, we would scatter region 1, 2.
// When spliting at `t2_r42`, we would scatter region 2, 3.
// Because we don't split at t1 anymore.
// The trick here is a pinky promise: never scatter regions you haven't imported any data.
// In this scenario, it is the last region after spliting (applying to >= 5.0).
if bytes.Equal(newRegions[len(newRegions)-1].Region.StartKey, keys[len(keys)-1]) {
newRegions = newRegions[:len(newRegions)-1]
}
rs.ScatterRegions(ctx, newRegions)
return newRegions, nil
}
Expand Down Expand Up @@ -383,14 +392,11 @@ func (b *scanRegionBackoffer) Attempt() int {
return b.attempt
}

// getSplitKeys checks if the regions should be split by the new prefix of the rewrites rule and the end key of
// getSplitKeys checks if the regions should be split by the end key of
// the ranges, groups the split keys by region id.
func getSplitKeys(rewriteRules *RewriteRules, ranges []rtree.Range, regions []*RegionInfo) map[uint64][][]byte {
splitKeyMap := make(map[uint64][][]byte)
checkKeys := make([][]byte, 0)
for _, rule := range rewriteRules.Data {
checkKeys = append(checkKeys, rule.GetNewKeyPrefix())
}
for _, rg := range ranges {
checkKeys = append(checkKeys, rg.EndKey)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ func (c *TestClient) checkScatter(check *C) {
// range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj)
// rewrite rules: aa -> xx, cc -> bb
// expected regions after split:
// [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, )
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func (s *testRangeSuite) TestSplitAndScatter(c *C) {
client := initTestClient()
ranges := initRanges()
Expand Down Expand Up @@ -320,11 +320,11 @@ func initRewriteRules() *restore.RewriteRules {
}

// expected regions after split:
// [, aay), [aay, bb), [bb, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xx), [xx, xxe), [xxe, xxz), [xxz, )
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func validateRegions(regions map[uint64]*restore.RegionInfo) bool {
keys := [12]string{"", "aay", "bb", "bba", "bbf", "bbh", "bbj", "cca", "xx", "xxe", "xxz", ""}
if len(regions) != 11 {
keys := [...]string{"", "aay", "bba", "bbf", "bbh", "bbj", "cca", "xxe", "xxz", ""}
if len(regions) != len(keys)-1 {
return false
}
FindRegion:
Expand Down
39 changes: 17 additions & 22 deletions pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,9 @@ func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) er
)
return errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule")
}
// the new prefix of the start rule must equal or less than the new prefix of the end rule
if bytes.Compare(startRule.GetNewKeyPrefix(), endRule.GetNewKeyPrefix()) > 0 {
// the rewrite rule of the start key and the end key should be equaled.
// i.e. there should only one rewrite rule for one file, a file should only be imported into one region.
if !bytes.Equal(startRule.GetNewKeyPrefix(), endRule.GetNewKeyPrefix()) {
startTableID := tablecodec.DecodeTableID(file.GetStartKey())
endTableID := tablecodec.DecodeTableID(file.GetEndKey())
log.Error(
Expand All @@ -299,17 +300,10 @@ func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) er
zap.Stringer("endRule", endRule),
logutil.File(file),
)
return errors.Annotate(berrors.ErrRestoreInvalidRewrite, "unexpected rewrite rules")
}

startID := tablecodec.DecodeTableID(file.GetStartKey())
endID := tablecodec.DecodeTableID(file.GetEndKey())
if startID != endID {
log.Error("table ids mismatch",
zap.Int64("startID", startID),
zap.Int64("endID", endID),
logutil.File(file))
return errors.Annotate(berrors.ErrRestoreTableIDMismatch, "file start_key end_key table ids mismatch")
return errors.Annotatef(berrors.ErrRestoreInvalidRewrite,
"rewrite rule mismatch, the backup data may be dirty or from incompatible versions of BR, startKey rule: %X => %X, endKey rule: %X => %X",
startRule.OldKeyPrefix, startRule.NewKeyPrefix, endRule.OldKeyPrefix, endRule.NewKeyPrefix,
)
}
return nil
}
Expand All @@ -336,15 +330,6 @@ func matchOldPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit
return nil
}

func matchNewPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.RewriteRule {
for _, rule := range rewriteRules.Data {
if bytes.HasPrefix(key, rule.GetNewKeyPrefix()) {
return rule
}
}
return nil
}

func truncateTS(key []byte) []byte {
if len(key) == 0 {
return nil
Expand Down Expand Up @@ -376,6 +361,16 @@ func SplitRanges(
})
}

func findMatchedRewriteRule(file *backuppb.File, rules *RewriteRules) *import_sstpb.RewriteRule {
startID := tablecodec.DecodeTableID(file.GetStartKey())
endID := tablecodec.DecodeTableID(file.GetEndKey())
if startID != endID {
return nil
}
_, rule := rewriteRawKey(file.StartKey, rules)
return rule
}

func rewriteFileKeys(file *backuppb.File, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) {
startID := tablecodec.DecodeTableID(file.GetStartKey())
endID := tablecodec.DecodeTableID(file.GetEndKey())
Expand Down
4 changes: 2 additions & 2 deletions pkg/restore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (s *testRestoreUtilSuite) TestValidateFileRewriteRule(c *C) {
},
rules,
)
c.Assert(err, ErrorMatches, ".*restore table ID mismatch")
c.Assert(err, ErrorMatches, ".*rewrite rule mismatch.*")

// Add a bad rule for end key, after rewrite start key > end key.
rules.Data = append(rules.Data[:1], &import_sstpb.RewriteRule{
Expand All @@ -163,7 +163,7 @@ func (s *testRestoreUtilSuite) TestValidateFileRewriteRule(c *C) {
},
rules,
)
c.Assert(err, ErrorMatches, ".*unexpected rewrite rules.*")
c.Assert(err, ErrorMatches, ".*rewrite rule mismatch.*")
}

func (s *testRestoreUtilSuite) TestPaginateScanRegion(c *C) {
Expand Down

0 comments on commit 95393fe

Please sign in to comment.