Skip to content

Commit

Permalink
restore: merge tidb-tools/pkg/restore-util (pingcap#146)
Browse files Browse the repository at this point in the history
* restore-util: Implement split/scatter (pingcap#274)

* implement split/scatter

Signed-off-by: 5kbpers <[email protected]>

* init test

Signed-off-by: 5kbpers <[email protected]>

* redesign output/input of the lib

Signed-off-by: 5kbpers <[email protected]>

* update dependency

Signed-off-by: 5kbpers <[email protected]>

* add commments and more tests

Signed-off-by: 5kbpers <[email protected]>

* add ScanRegions interface to Client

Signed-off-by: 5kbpers <[email protected]>

* fix potential data race

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* Apply suggestions from code review

Co-Authored-By: kennytm <[email protected]>

* Update pkg/restore-util/client.go

Co-Authored-By: kennytm <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* update dependency

Signed-off-by: 5kbpers <[email protected]>

* resolve conflicts

Signed-off-by: 5kbpers <[email protected]>

* fix prefix rewrite

Signed-off-by: 5kbpers <[email protected]>

* add RewriteRule/skip failed scatter region/retry the SplitRegion

Signed-off-by: 5kbpers <[email protected]>

* fix test

Signed-off-by: 5kbpers <[email protected]>

* check if region has peer

Signed-off-by: 5kbpers <[email protected]>

* more logs

Signed-off-by: 5kbpers <[email protected]>

* restore-util: add split retry interval (pingcap#277)

* reset dependencies to release-3.1

* add split retry interval

Signed-off-by: 5kbpers <[email protected]>

* fix go.sum

Signed-off-by: 5kbpers <[email protected]>

* restore-util: wait for scatter region sequentially  (pingcap#279)

* wait for scatter region sequentially

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* restore-util: add on split hook (pingcap#281)

* restore-util: add on split hook

Signed-off-by: Neil Shen <[email protected]>

* Nil check onSplit

Co-Authored-By: kennytm <[email protected]>

* restore-util: fix returned new region is nil (pingcap#283)

* restore-util: fix returned new region is nil

Signed-off-by: 5kbpers <[email protected]>

* more logs

Signed-off-by: 5kbpers <[email protected]>

* *: gofmt

Signed-off-by: 5kbpers <[email protected]>

* Apply suggestions from code review

Co-Authored-By: kennytm <[email protected]>

* fix log

Signed-off-by: 5kbpers <[email protected]>

* restore-util: call onSplit on splitByRewriteRules (pingcap#285)

Signed-off-by: Neil Shen <[email protected]>

* restore-util: fix overlapped error message (pingcap#293)

* restore-util: fix overlapped error message

Signed-off-by: 5kbpers <[email protected]>

* fix log message

Signed-off-by: 5kbpers <[email protected]>

* reduce error trace

Signed-off-by: 5kbpers <[email protected]>

* fix test

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* restore-util: log warning when cannot find matched rewrite rule (pingcap#299)

* restore-util: add method to set placement rules and store labels (pingcap#301)

* restore-util: add method to set placement rules and store labels

Signed-off-by: disksing <[email protected]>

* minor fix

Signed-off-by: disksing <[email protected]>

* address comment

Signed-off-by: disksing <[email protected]>

* add GetPlacementRules

Signed-off-by: disksing <[email protected]>

* fix test

Signed-off-by: disksing <[email protected]>

* restore-util: support batch split (pingcap#300)

* restore-util: support batch split

Signed-off-by: 5kbpers <[email protected]>

* go fmt

Signed-off-by: 5kbpers <[email protected]>

* Apply suggestions from code review

Co-Authored-By: kennytm <[email protected]>

* address commits

Signed-off-by: 5kbpers <[email protected]>

* Update pkg/restore-util/split.go

Co-Authored-By: kennytm <[email protected]>

* add onSplit callback

Signed-off-by: 5kbpers <[email protected]>

* fix test

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* restore-util: add upper bound time for waiting for scatter (pingcap#305)

* restore: fix scatter regions failed

Signed-off-by: 5kbpers <[email protected]>

* add log

Signed-off-by: 5kbpers <[email protected]>

* stop waiting for scatter after 3min

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* restore-util: fix wrong url (pingcap#306)

Signed-off-by: disksing <[email protected]>

* restore-util: add warning about unmatched table id (pingcap#313)

* restore-util: support table partition

Signed-off-by: 5kbpers <[email protected]>

* fix log

Signed-off-by: 5kbpers <[email protected]>

* warn table id does not match

Signed-off-by: 5kbpers <[email protected]>

* add unit tests

Signed-off-by: 5kbpers <[email protected]>

* Apply suggestions from code review

Co-Authored-By: Neil Shen <[email protected]>

* fix compile error

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* address comments

Signed-off-by: 5kbpers <[email protected]>

* fix test

Signed-off-by: 5kbpers <[email protected]>

Co-authored-by: Ian <[email protected]>
Co-authored-by: Neil Shen <[email protected]>

* *: prune tidb-tools

Signed-off-by: Neil Shen <[email protected]>

* restore: address linters suggestions

Signed-off-by: Neil Shen <[email protected]>

* restore: merge restoreutil into restore

Signed-off-by: Neil Shen <[email protected]>

* address comment

Signed-off-by: Neil Shen <[email protected]>

Co-authored-by: 5kbpers <[email protected]>
Co-authored-by: kennytm <[email protected]>
Co-authored-by: disksing <[email protected]>
Co-authored-by: Ian <[email protected]>
  • Loading branch information
5 people committed Jan 22, 2020
1 parent 0c1dea4 commit 0a43d2a
Show file tree
Hide file tree
Showing 13 changed files with 1,228 additions and 52 deletions.
7 changes: 4 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ issues:
text: "Potential HTTP request made with variable url"
linters:
- gosec
- path: .go
text: "Use of weak random number generator"
# TODO Remove it.
- path: split_client.go
text: "SA1019:"
linters:
- gosec
- staticcheck
9 changes: 4 additions & 5 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/pd/pkg/mock/mockid"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"github.com/spf13/cobra"
"go.uber.org/zap"

Expand Down Expand Up @@ -187,15 +186,15 @@ func newBackupMetaCommand() *cobra.Command {
tables = append(tables, db.Tables...)
}
// Check if the ranges of files overlapped
rangeTree := restore_util.NewRangeTree()
rangeTree := restore.NewRangeTree()
for _, file := range files {
if out := rangeTree.InsertRange(restore_util.Range{
if out := rangeTree.InsertRange(restore.Range{
StartKey: file.GetStartKey(),
EndKey: file.GetEndKey(),
}); out != nil {
log.Error(
"file ranges overlapped",
zap.Stringer("out", out.(*restore_util.Range)),
zap.Stringer("out", out.(*restore.Range)),
zap.Stringer("file", file),
)
}
Expand All @@ -206,7 +205,7 @@ func newBackupMetaCommand() *cobra.Command {
for offset := uint64(0); offset < tableIDOffset; offset++ {
_, _ = tableIDAllocator.Alloc() // Ignore error
}
rewriteRules := &restore_util.RewriteRules{
rewriteRules := &restore.RewriteRules{
Table: make([]*import_sstpb.RewriteRule, 0),
Data: make([]*import_sstpb.RewriteRule, 0),
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ require (
github.com/pingcap/parser v0.0.0-20191210060830-bdf23a7ade01
github.com/pingcap/pd v1.1.0-beta.0.20191212045800-234784c7a9c5
github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834
github.com/pingcap/tidb-tools v3.1.0-beta.0.20191223064326-e9c7a23a8dcb+incompatible
github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33
github.com/prometheus/client_golang v1.0.0
github.com/sirupsen/logrus v1.4.2
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,8 @@ github.com/pingcap/sysutil v0.0.0-20191126040022-986c5b3ed9a3 h1:HCNif3lukL83gNC
github.com/pingcap/sysutil v0.0.0-20191126040022-986c5b3ed9a3/go.mod h1:Futrrmuw98pEsbEmoPsjw8aKLCmixwHEmT2rF+AsXGw=
github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834 h1:eNf7bDY39moIzzcs5+PhLLW0BM2D2yrzFbjW/X42y0s=
github.com/pingcap/tidb v1.1.0-beta.0.20191213040028-9009da737834/go.mod h1:VWx47QOXISBHHtZeWrDQlBOdbvth9TE9gei6QpoqJ4g=
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible h1:H1jg0aDWz2SLRh3hNBo2HFtnuHtudIUvBumU7syRkic=
github.com/pingcap/tidb-tools v3.0.6-0.20191106033616-90632dda3863+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v3.1.0-beta.0.20191223064326-e9c7a23a8dcb+incompatible h1:GxWxXVqA2aAZIgS+bEpasJkkspu9Jom1/oB2NmP7t/o=
github.com/pingcap/tidb-tools v3.1.0-beta.0.20191223064326-e9c7a23a8dcb+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33 h1:cTSaVv1hue17BCPqt+sURADTFSMpSD26ZuvKRyYIjJs=
github.com/pingcap/tipb v0.0.0-20191209145133-44f75c9bef33/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
13 changes: 6 additions & 7 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/client"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -108,7 +107,7 @@ func (rc *Client) InitBackupMeta(backupMeta *backup.BackupMeta, backend *backup.
rc.databases = databases
rc.backupMeta = backupMeta

metaClient := restore_util.NewClient(rc.pdClient)
metaClient := NewSplitClient(rc.pdClient)
importClient := NewImportClient(metaClient)
rc.fileImporter = NewFileImporter(rc.ctx, metaClient, importClient, backend, rc.rateLimit)
return nil
Expand Down Expand Up @@ -189,8 +188,8 @@ func (rc *Client) CreateTables(
dom *domain.Domain,
tables []*utils.Table,
newTS uint64,
) (*restore_util.RewriteRules, []*model.TableInfo, error) {
rewriteRules := &restore_util.RewriteRules{
) (*RewriteRules, []*model.TableInfo, error) {
rewriteRules := &RewriteRules{
Table: make([]*import_sstpb.RewriteRule, 0),
Data: make([]*import_sstpb.RewriteRule, 0),
}
Expand Down Expand Up @@ -232,7 +231,7 @@ func (rc *Client) setSpeedLimit() error {
// RestoreTable tries to restore the data of a table.
func (rc *Client) RestoreTable(
table *utils.Table,
rewriteRules *restore_util.RewriteRules,
rewriteRules *RewriteRules,
updateCh chan<- struct{},
) (err error) {
start := time.Now()
Expand Down Expand Up @@ -300,7 +299,7 @@ func (rc *Client) RestoreTable(
// RestoreDatabase tries to restore the data of a database
func (rc *Client) RestoreDatabase(
db *utils.Database,
rewriteRules *restore_util.RewriteRules,
rewriteRules *RewriteRules,
updateCh chan<- struct{},
) (err error) {
start := time.Now()
Expand Down Expand Up @@ -336,7 +335,7 @@ func (rc *Client) RestoreDatabase(

// RestoreAll tries to restore all the data of backup files.
func (rc *Client) RestoreAll(
rewriteRules *restore_util.RewriteRules,
rewriteRules *RewriteRules,
updateCh chan<- struct{},
) (err error) {
start := time.Now()
Expand Down
33 changes: 16 additions & 17 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/codec"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"go.uber.org/zap"
"google.golang.org/grpc"

Expand Down Expand Up @@ -60,12 +59,12 @@ type ImporterClient interface {

type importClient struct {
mu sync.Mutex
metaClient restore_util.Client
metaClient SplitClient
clients map[uint64]import_sstpb.ImportSSTClient
}

// NewImportClient returns a new ImporterClient
func NewImportClient(metaClient restore_util.Client) ImporterClient {
func NewImportClient(metaClient SplitClient) ImporterClient {
return &importClient{
metaClient: metaClient,
clients: make(map[uint64]import_sstpb.ImportSSTClient),
Expand Down Expand Up @@ -133,7 +132,7 @@ func (ic *importClient) getImportClient(

// FileImporter used to import a file to TiKV.
type FileImporter struct {
metaClient restore_util.Client
metaClient SplitClient
importClient ImporterClient
backend *backup.StorageBackend
rateLimit uint64
Expand All @@ -145,7 +144,7 @@ type FileImporter struct {
// NewFileImporter returns a new file importClient.
func NewFileImporter(
ctx context.Context,
metaClient restore_util.Client,
metaClient SplitClient,
importClient ImporterClient,
backend *backup.StorageBackend,
rateLimit uint64,
Expand All @@ -163,7 +162,7 @@ func NewFileImporter(

// Import tries to import a file.
// All rules must contain encoded keys.
func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_util.RewriteRules) error {
func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRules) error {
log.Debug("import file", zap.Stringer("file", file))
// Rewrite the start key and end key of file to scan regions
startKey, endKey, err := rewriteFileKeys(file, rewriteRules)
Expand All @@ -179,9 +178,9 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_ut
ctx, cancel := context.WithTimeout(importer.ctx, importScanResgionTime)
defer cancel()
// Scan regions covered by the file range
regionInfos, err := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0)
if err != nil {
return errors.Trace(err)
regionInfos, err1 := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0)
if err1 != nil {
return errors.Trace(err1)
}
log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
Expand All @@ -190,20 +189,20 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_ut
info := regionInfo
// Try to download file.
err = withRetry(func() error {
var err error
var err2 error
var isEmpty bool
downloadMeta, isEmpty, err = importer.downloadSST(info, file, rewriteRules)
if err != nil {
downloadMeta, isEmpty, err2 = importer.downloadSST(info, file, rewriteRules)
if err2 != nil {
if err != errRewriteRuleNotFound {
log.Warn("download file failed",
zap.Stringer("file", file),
zap.Stringer("region", info.Region),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
zap.Error(err),
zap.Error(err2),
)
}
return err
return err2
}
if isEmpty {
log.Info(
Expand Down Expand Up @@ -255,9 +254,9 @@ func (importer *FileImporter) setDownloadSpeedLimit(storeID uint64) error {
}

func (importer *FileImporter) downloadSST(
regionInfo *restore_util.RegionInfo,
regionInfo *RegionInfo,
file *backup.File,
rewriteRules *restore_util.RewriteRules,
rewriteRules *RewriteRules,
) (*import_sstpb.SSTMeta, bool, error) {
id, err := uuid.New().MarshalBinary()
if err != nil {
Expand Down Expand Up @@ -312,7 +311,7 @@ func (importer *FileImporter) downloadSST(

func (importer *FileImporter) ingestSST(
sstMeta *import_sstpb.SSTMeta,
regionInfo *restore_util.RegionInfo,
regionInfo *RegionInfo,
) error {
leader := regionInfo.Leader
if leader == nil {
Expand Down
148 changes: 148 additions & 0 deletions pkg/restore/range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package restore

import (
"bytes"
"fmt"

"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/tablecodec"
"go.uber.org/zap"
)

// Range represents a range of keys.
type Range struct {
StartKey []byte
EndKey []byte
}

// String formats a range to a string
func (r *Range) String() string {
return fmt.Sprintf("[%x %x]", r.StartKey, r.EndKey)
}

// Less compares a range with a btree.Item
func (r *Range) Less(than btree.Item) bool {
t := than.(*Range)
return len(r.EndKey) != 0 && bytes.Compare(r.EndKey, t.StartKey) <= 0
}

// contains returns if a key is included in the range.
func (r *Range) contains(key []byte) bool {
start, end := r.StartKey, r.EndKey
return bytes.Compare(key, start) >= 0 &&
(len(end) == 0 || bytes.Compare(key, end) < 0)
}

// sortRanges checks if the range overlapped and sort them
func sortRanges(ranges []Range, rewriteRules *RewriteRules) ([]Range, error) {
rangeTree := NewRangeTree()
for _, rg := range ranges {
if rewriteRules != nil {
startID := tablecodec.DecodeTableID(rg.StartKey)
endID := tablecodec.DecodeTableID(rg.EndKey)
var rule *import_sstpb.RewriteRule
if startID == endID {
rg.StartKey, rule = replacePrefix(rg.StartKey, rewriteRules)
if rule == nil {
log.Warn("cannot find rewrite rule", zap.Binary("key", rg.StartKey))
} else {
log.Debug(
"rewrite start key",
zap.Binary("key", rg.StartKey),
zap.Stringer("rule", rule))
}
rg.EndKey, rule = replacePrefix(rg.EndKey, rewriteRules)
if rule == nil {
log.Warn("cannot find rewrite rule", zap.Binary("key", rg.EndKey))
} else {
log.Debug(
"rewrite end key",
zap.Binary("key", rg.EndKey),
zap.Stringer("rule", rule))
}
} else {
log.Warn("table id does not match",
zap.Binary("startKey", rg.StartKey),
zap.Binary("endKey", rg.EndKey),
zap.Int64("startID", startID),
zap.Int64("endID", endID))
return nil, errors.New("table id does not match")
}
}
if out := rangeTree.InsertRange(rg); out != nil {
return nil, errors.Errorf("ranges overlapped: %s, %s", out, rg)
}
}
sortedRanges := make([]Range, 0, len(ranges))
rangeTree.Ascend(func(rg *Range) bool {
if rg == nil {
return false
}
sortedRanges = append(sortedRanges, *rg)
return true
})
return sortedRanges, nil
}

// RangeTree stores the ranges in an orderly manner.
// All the ranges it stored do not overlap.
type RangeTree struct {
tree *btree.BTree
}

// NewRangeTree returns a new RangeTree.
func NewRangeTree() *RangeTree {
return &RangeTree{tree: btree.New(32)}
}

// Find returns nil or a range in the range tree
func (rt *RangeTree) Find(key []byte) *Range {
var ret *Range
r := &Range{
StartKey: key,
}
rt.tree.DescendLessOrEqual(r, func(i btree.Item) bool {
ret = i.(*Range)
return false
})
if ret == nil || !ret.contains(key) {
return nil
}
return ret
}

// InsertRange inserts ranges into the range tree.
// it returns true if all ranges inserted successfully.
// it returns false if there are some overlapped ranges.
func (rt *RangeTree) InsertRange(rg Range) btree.Item {
return rt.tree.ReplaceOrInsert(&rg)
}

// RangeIterator allows callers of Ascend to iterate in-order over portions of
// the tree. When this function returns false, iteration will stop and the
// associated Ascend function will immediately return.
type RangeIterator func(rg *Range) bool

// Ascend calls the iterator for every value in the tree within [first, last],
// until the iterator returns false.
func (rt *RangeTree) Ascend(iterator RangeIterator) {
rt.tree.Ascend(func(i btree.Item) bool {
return iterator(i.(*Range))
})
}

// RegionInfo includes a region and the leader of the region.
type RegionInfo struct {
Region *metapb.Region
Leader *metapb.Peer
}

// RewriteRules contains rules for rewriting keys of tables.
type RewriteRules struct {
Table []*import_sstpb.RewriteRule
Data []*import_sstpb.RewriteRule
}
Loading

0 comments on commit 0a43d2a

Please sign in to comment.