Skip to content

Commit

Permalink
restore: skip un-restored tables validating checksum (pingcap#48)
Browse files Browse the repository at this point in the history
* restore: skip un-restored tables validating checksum

Signed-off-by: 5kbpers <[email protected]>

* address lint

Signed-off-by: 5kbpers <[email protected]>
  • Loading branch information
5kbpers authored and 3pointer committed Nov 11, 2019
1 parent c20df98 commit f050d86
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 39 deletions.
18 changes: 12 additions & 6 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/parser/model"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
Expand Down Expand Up @@ -59,21 +60,26 @@ func newFullRestoreCommand() *cobra.Command {
tableRules := make([]*import_sstpb.RewriteRule, 0)
dataRules := make([]*import_sstpb.RewriteRule, 0)
files := make([]*backup.File, 0)
tables := make([]*utils.Table, 0)
newTables := make([]*model.TableInfo, 0)
for _, db := range client.GetDatabases() {
err = restore.CreateDatabase(db.Schema, client.GetDbDSN())
if err != nil {
return errors.Trace(err)
}
var rules *restore_util.RewriteRules
rules, err = client.CreateTables(db.Tables)
var nt []*model.TableInfo
rules, nt, err = client.CreateTables(db.Tables)
if err != nil {
return errors.Trace(err)
}
newTables = append(newTables, nt...)
tableRules = append(tableRules, rules.Table...)
dataRules = append(dataRules, rules.Data...)
for _, table := range db.Tables {
files = append(files, table.Files...)
}
tables = append(tables, db.Tables...)
}
ranges := restore.GetRanges(files)

Expand Down Expand Up @@ -113,7 +119,7 @@ func newFullRestoreCommand() *cobra.Command {
if err != nil {
return errors.Trace(err)
}
err = client.ValidateChecksum(rewriteRules.Table)
err = client.ValidateChecksum(tables, newTables)
return errors.Trace(err)
},
}
Expand Down Expand Up @@ -162,7 +168,7 @@ func newDbRestoreCommand() *cobra.Command {
return errors.Trace(err)
}

rewriteRules, err := client.CreateTables(db.Tables)
rewriteRules, newTables, err := client.CreateTables(db.Tables)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -204,7 +210,7 @@ func newDbRestoreCommand() *cobra.Command {
if err != nil {
return errors.Trace(err)
}
err = client.ValidateChecksum(rewriteRules.Table)
err = client.ValidateChecksum(db.Tables, newTables)
return errors.Trace(err)
},
}
Expand Down Expand Up @@ -267,7 +273,7 @@ func newTableRestoreCommand() *cobra.Command {
return errors.New("not exists table")
}
// The rules here is raw key.
rewriteRules, err := client.CreateTables([]*utils.Table{table})
rewriteRules, newTables, err := client.CreateTables([]*utils.Table{table})
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -302,7 +308,7 @@ func newTableRestoreCommand() *cobra.Command {
if err != nil {
return errors.Trace(err)
}
err = client.ValidateChecksum(rewriteRules.Table)
err = client.ValidateChecksum([]*utils.Table{table}, newTables)
return errors.Trace(err)
},
}
Expand Down
50 changes: 17 additions & 33 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,12 @@ func (rc *Client) GetTableSchema(dbName model.CIStr, tableName model.CIStr) (*mo
}

// CreateTables creates multiple tables, and returns their rewrite rules.
func (rc *Client) CreateTables(tables []*utils.Table) (*restore_util.RewriteRules, error) {
func (rc *Client) CreateTables(tables []*utils.Table) (*restore_util.RewriteRules, []*model.TableInfo, error) {
rewriteRules := &restore_util.RewriteRules{
Table: make([]*import_sstpb.RewriteRule, 0),
Data: make([]*import_sstpb.RewriteRule, 0),
}
newTables := make([]*model.TableInfo, 0, len(tables))
openDBs := make(map[string]*sql.DB)
defer func() {
for _, db := range openDBs {
Expand All @@ -227,27 +228,28 @@ func (rc *Client) CreateTables(tables []*utils.Table) (*restore_util.RewriteRule
if !ok {
db, err = OpenDatabase(table.Db.Name.String(), rc.dbDSN)
if err != nil {
return nil, err
return nil, nil, err
}
openDBs[table.Db.Name.String()] = db
}
err = CreateTable(db, table)
if err != nil {
return nil, err
return nil, nil, err
}
err = AlterAutoIncID(db, table)
if err != nil {
return nil, err
return nil, nil, err
}
newTableInfo, err := rc.GetTableSchema(table.Db.Name, table.Schema.Name)
if err != nil {
return nil, err
return nil, nil, err
}
rules := GetRewriteRules(newTableInfo, table.Schema)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
newTables = append(newTables, newTableInfo)
}
return rewriteRules, nil
return rewriteRules, newTables, nil
}

// RestoreTable tries to restore the data of a table.
Expand Down Expand Up @@ -434,25 +436,21 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo
}

//ValidateChecksum validate checksum after restore
func (rc *Client) ValidateChecksum(rewriteRules []*import_sstpb.RewriteRule) error {
func (rc *Client) ValidateChecksum(tables []*utils.Table, newTables []*model.TableInfo) error {
start := time.Now()
defer func() {
elapsed := time.Since(start)
log.Info("Restore Checksum", zap.Duration("take", elapsed))
}()

// Assume one database one table.
tables := make([]*utils.Table, 0, len(rc.databases))
for _, db := range rc.databases {
tables = append(tables, db.Tables...)
}
wg := sync.WaitGroup{}
errCh := make(chan error, len(tables))
for _, table := range tables {
rule := getTableRewriteRule(table.Schema.ID, rewriteRules)
newTableID := tablecodec.DecodeTableID(rule.GetNewPrefix())
if newTableID == 0 || rule == nil {
return errors.Errorf("failed to get rewrite rule for %v", table.Schema.ID)
for i, t := range tables {
table := t
newTable := newTables[i]
rule := &tipb.ChecksumRewriteRule{
OldPrefix: tablecodec.EncodeTablePrefix(table.Schema.ID),
NewPrefix: tablecodec.EncodeTablePrefix(newTable.ID),
}
checksumReq := tipb.ChecksumRequest{
StartTs: rc.backupMeta.GetEndVersion(),
Expand All @@ -466,11 +464,10 @@ func (rc *Client) ValidateChecksum(rewriteRules []*import_sstpb.RewriteRule) err
}

wg.Add(1)
table := table
rc.workerPool.Apply(func() {
defer wg.Done()
tableStart := tablecodec.EncodeTablePrefix(newTableID)
tableEnd := tablecodec.EncodeTablePrefix(newTableID + 1)
tableStart := tablecodec.EncodeTablePrefix(newTable.ID)
tableEnd := tablecodec.EncodeTablePrefix(newTable.ID + 1)
resp, err := rc.checksumRange(0, tableStart, tableEnd, data)
if err != nil {
errCh <- err
Expand Down Expand Up @@ -646,19 +643,6 @@ func (rc *Client) checksumRegion(
return checksum, nil
}

func getTableRewriteRule(tid int64, rules []*import_sstpb.RewriteRule) *tipb.ChecksumRewriteRule {
for _, r := range rules {
tableID := tablecodec.DecodeTableID(r.GetOldKeyPrefix())
if tableID == tid {
return &tipb.ChecksumRewriteRule{
OldPrefix: r.GetOldKeyPrefix(),
NewPrefix: r.GetNewKeyPrefix(),
}
}
}
return nil
}

// get intersect key range of [start, end] and [region.StartKey, region.EndKey]
func getIntersectRange(
start *[]byte,
Expand Down

0 comments on commit f050d86

Please sign in to comment.