Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' into release-3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus committed Jan 7, 2020
2 parents bbe089d + 660ea64 commit 5abe20f
Show file tree
Hide file tree
Showing 15 changed files with 332 additions and 196 deletions.
1 change: 1 addition & 0 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func defineBackupFlags(flagSet *pflag.FlagSet) {
flagSet.BoolP(flagBackupChecksum, "", true,
"Run checksum after backup")
flagSet.Uint64P(flagLastBackupTS, "", 0, "the last time backup ts")
_ = flagSet.MarkHidden(flagLastBackupTS)
}

func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
Expand Down
4 changes: 3 additions & 1 deletion cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func NewRestoreCommand() *cobra.Command {
"Run checksum after restore")
command.PersistentFlags().BoolP("online", "", false,
"Whether online when restore")
// TODO remove hidden flag if it's stable
_ = command.PersistentFlags().MarkHidden("online")

return command
}
Expand Down Expand Up @@ -126,7 +128,7 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error
}
table := db.GetTable(tableName)
files = table.Files
tables = utils.Tables{table}
tables = append(tables, table)
default:
return errors.New("must set db when table was set")
}
Expand Down
13 changes: 1 addition & 12 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"sort"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
Expand All @@ -17,7 +16,6 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/pd/pkg/mock/mockid"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"github.com/pingcap/tidb/tablecodec"
"github.com/spf13/cobra"
"go.uber.org/zap"

Expand All @@ -44,6 +42,7 @@ func NewValidateCommand() *cobra.Command {
meta.AddCommand(newBackupMetaCommand())
meta.AddCommand(decodeBackupMetaCommand())
meta.AddCommand(encodeBackupMetaCommand())
meta.Hidden = true

return meta
}
Expand Down Expand Up @@ -202,7 +201,6 @@ func newBackupMetaCommand() *cobra.Command {
}
}

sort.Sort(utils.Tables(tables))
tableIDAllocator := mockid.NewIDAllocator()
// Advance table ID allocator to the offset.
for offset := uint64(0); offset < tableIDOffset; offset++ {
Expand All @@ -228,20 +226,11 @@ func newBackupMetaCommand() *cobra.Command {
Name: indexInfo.Name,
}
}
// TODO: support table partition
rules := restore.GetRewriteRules(newTable, table.Schema, 0)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
tableIDMap[table.Schema.ID] = int64(tableID)
}
for oldID, newID := range tableIDMap {
if _, ok := tableIDMap[oldID+1]; !ok {
rewriteRules.Table = append(rewriteRules.Table, &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.EncodeTablePrefix(oldID + 1),
NewKeyPrefix: tablecodec.EncodeTablePrefix(newID + 1),
})
}
}
// Validate rewrite rules
for _, file := range files {
err = restore.ValidateFileRewriteRule(file, rewriteRules)
Expand Down
24 changes: 18 additions & 6 deletions pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,26 @@ func buildChecksumRequest(
}

reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1))
rs, err := buildRequest(newTable, newTable.ID, oldTable, startTS)
var oldTableID int64
if oldTable != nil {
oldTableID = oldTable.Schema.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS)
if err != nil {
return nil, err
}
reqs = append(reqs, rs...)

for _, partDef := range partDefs {
rs, err := buildRequest(newTable, partDef.ID, oldTable, startTS)
var oldPartID int64
if oldTable != nil {
for _, oldPartDef := range oldTable.Schema.Partition.Definitions {
if oldPartDef.Name == partDef.Name {
oldPartID = oldPartDef.ID
}
}
}
rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -80,10 +92,11 @@ func buildRequest(
tableInfo *model.TableInfo,
tableID int64,
oldTable *utils.Table,
oldTableID int64,
startTS uint64,
) ([]*kv.Request, error) {
reqs := make([]*kv.Request, 0)
req, err := buildTableRequest(tableID, oldTable, startTS)
req, err := buildTableRequest(tableID, oldTable, oldTableID, startTS)
if err != nil {
return nil, err
}
Expand All @@ -93,13 +106,11 @@ func buildRequest(
if indexInfo.State != model.StatePublic {
continue
}
var oldTableID int64
var oldIndexInfo *model.IndexInfo
if oldTable != nil {
for _, oldIndex := range oldTable.Schema.Indices {
if oldIndex.Name == indexInfo.Name {
oldIndexInfo = oldIndex
oldTableID = oldTable.Schema.ID
break
}
}
Expand All @@ -124,12 +135,13 @@ func buildRequest(
func buildTableRequest(
tableID int64,
oldTable *utils.Table,
oldTableID int64,
startTS uint64,
) (*kv.Request, error) {
var rule *tipb.ChecksumRewriteRule
if oldTable != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: tablecodec.GenTableRecordPrefix(oldTable.Schema.ID),
OldPrefix: tablecodec.GenTableRecordPrefix(oldTableID),
NewPrefix: tablecodec.GenTableRecordPrefix(tableID),
}
}
Expand Down
23 changes: 1 addition & 22 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math"
"sort"
"sync"
"time"

Expand All @@ -17,7 +16,6 @@ import (
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -199,11 +197,6 @@ func (rc *Client) CreateTables(
Data: make([]*import_sstpb.RewriteRule, 0),
}
newTables := make([]*model.TableInfo, 0, len(tables))
// Sort the tables by id for ensuring the new tables has same id ordering as the old tables.
// We require this constrain since newTableID of tableID+1 must be not bigger
// than newTableID of tableID.
sort.Sort(utils.Tables(tables))
tableIDMap := make(map[int64]int64)
for _, table := range tables {
err := rc.db.CreateTable(rc.ctx, table)
if err != nil {
Expand All @@ -214,21 +207,10 @@ func (rc *Client) CreateTables(
return nil, nil, err
}
rules := GetRewriteRules(newTableInfo, table.Schema, newTS)
tableIDMap[table.Schema.ID] = newTableInfo.ID
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
newTables = append(newTables, newTableInfo)
}
// If tableID + 1 has already exist, then we don't need to add a new rewrite rule for it.
for oldID, newID := range tableIDMap {
if _, ok := tableIDMap[oldID+1]; !ok {
rewriteRules.Table = append(rewriteRules.Table, &import_sstpb.RewriteRule{
OldKeyPrefix: tablecodec.EncodeTablePrefix(oldID + 1),
NewKeyPrefix: tablecodec.EncodeTablePrefix(newID + 1),
NewTimestamp: newTS,
})
}
}
return rewriteRules, newTables, nil
}

Expand Down Expand Up @@ -276,9 +258,6 @@ func (rc *Client) RestoreTable(
errCh := make(chan error, len(table.Files))
wg := new(sync.WaitGroup)
defer close(errCh)
// We should encode the rewrite rewriteRules before using it to import files
encodedRules := encodeRewriteRules(rewriteRules)

err = rc.setSpeedLimit()
if err != nil {
return err
Expand All @@ -293,7 +272,7 @@ func (rc *Client) RestoreTable(
select {
case <-rc.ctx.Done():
errCh <- nil
case errCh <- rc.fileImporter.Import(fileReplica, encodedRules):
case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules):
updateCh <- struct{}{}
}
})
Expand Down
1 change: 0 additions & 1 deletion pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) {

for i := 0; i < len(tables); i++ {
c.Assert(oldTableIDExist[int64(i)], IsTrue, Commentf("table rule does not exist"))
c.Assert(oldTableIDExist[int64(i+1)], IsTrue, Commentf("table rule does not exist"))
}
}

Expand Down
58 changes: 40 additions & 18 deletions pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/codec"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -163,25 +164,26 @@ func NewFileImporter(
// Import tries to import a file.
// All rules must contain encoded keys.
func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_util.RewriteRules) error {
log.Debug("import file", zap.Stringer("file", file))
// Rewrite the start key and end key of file to scan regions
scanStartKey, startRule := rewriteRawKeyWithEncodedRules(file.GetStartKey(), rewriteRules)
if startRule == nil {
log.Error("cannot find a rewrite rule for file start key", zap.Stringer("file", file))
return errRewriteRuleNotFound
}
scanEndKey, endRule := rewriteRawKeyWithEncodedRules(file.GetEndKey(), rewriteRules)
if endRule == nil {
log.Error("cannot find a rewrite rule for file end key", zap.Stringer("file", file))
return errRewriteRuleNotFound
startKey, endKey, err := rewriteFileKeys(file, rewriteRules)
if err != nil {
return err
}
err := withRetry(func() error {
log.Debug("rewrite file keys",
zap.Stringer("file", file),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
)
err = withRetry(func() error {
ctx, cancel := context.WithTimeout(importer.ctx, importScanResgionTime)
defer cancel()
// Scan regions covered by the file range
regionInfos, err := importer.metaClient.ScanRegions(ctx, scanStartKey, scanEndKey, 0)
regionInfos, err := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0)
if err != nil {
return errors.Trace(err)
}
log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos)))
// Try to download and ingest the file in every region
for _, regionInfo := range regionInfos {
var downloadMeta *import_sstpb.SSTMeta
Expand All @@ -196,8 +198,8 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_ut
log.Warn("download file failed",
zap.Stringer("file", file),
zap.Stringer("region", info.Region),
zap.ByteString("scanStartKey", scanStartKey),
zap.ByteString("scanEndKey", scanEndKey),
zap.Binary("startKey", startKey),
zap.Binary("endKey", endKey),
zap.Error(err),
)
}
Expand Down Expand Up @@ -261,19 +263,38 @@ func (importer *FileImporter) downloadSST(
if err != nil {
return nil, true, errors.Trace(err)
}
regionRule := findRegionRewriteRule(regionInfo.Region, rewriteRules)
// Assume one region reflects to one rewrite rule
_, key, err := codec.DecodeBytes(regionInfo.Region.GetStartKey())
if err != nil {
return nil, true, err
}
regionRule := matchNewPrefix(key, rewriteRules)
if regionRule == nil {
log.Debug("cannot find rewrite rule, skip region",
zap.Stringer("region", regionInfo.Region),
zap.Array("tableRule", rules(rewriteRules.Table)),
zap.Array("dataRule", rules(rewriteRules.Data)),
zap.Binary("key", key),
)
return nil, true, errRewriteRuleNotFound
}
sstMeta := getSSTMetaFromFile(id, file, regionInfo.Region, regionRule)
rule := import_sstpb.RewriteRule{
OldKeyPrefix: encodeKeyPrefix(regionRule.GetOldKeyPrefix()),
NewKeyPrefix: encodeKeyPrefix(regionRule.GetNewKeyPrefix()),
}
sstMeta := getSSTMetaFromFile(id, file, regionInfo.Region, &rule)
sstMeta.RegionId = regionInfo.Region.GetId()
sstMeta.RegionEpoch = regionInfo.Region.GetRegionEpoch()
req := &import_sstpb.DownloadRequest{
Sst: sstMeta,
StorageBackend: importer.backend,
Name: file.GetName(),
RewriteRule: *regionRule,
RewriteRule: rule,
}
log.Debug("download SST",
zap.Stringer("sstMeta", &sstMeta),
zap.Stringer("region", regionInfo.Region),
)
var resp *import_sstpb.DownloadResponse
for _, peer := range regionInfo.Region.GetPeers() {
resp, err = importer.importClient.DownloadSST(importer.ctx, peer.GetStoreId(), req)
Expand All @@ -290,7 +311,7 @@ func (importer *FileImporter) downloadSST(
}

func (importer *FileImporter) ingestSST(
fileMeta *import_sstpb.SSTMeta,
sstMeta *import_sstpb.SSTMeta,
regionInfo *restore_util.RegionInfo,
) error {
leader := regionInfo.Leader
Expand All @@ -304,8 +325,9 @@ func (importer *FileImporter) ingestSST(
}
req := &import_sstpb.IngestRequest{
Context: reqCtx,
Sst: fileMeta,
Sst: sstMeta,
}
log.Debug("download SST", zap.Stringer("sstMeta", sstMeta))
resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 5abe20f

Please sign in to comment.