diff --git a/br/cmd/br/backup.go b/br/cmd/br/backup.go index 0f7b3fc2..e70d00f1 100644 --- a/br/cmd/br/backup.go +++ b/br/cmd/br/backup.go @@ -6,7 +6,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/ddl" - "github.com/pingcap/tidb/session" "github.com/spf13/cobra" "github.com/tikv/migration/br/pkg/gluetikv" "github.com/tikv/migration/br/pkg/summary" @@ -18,31 +17,6 @@ import ( "sourcegraph.com/sourcegraph/appdash" ) -func runBackupCommand(command *cobra.Command, cmdName string) error { - cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}} - if err := cfg.ParseFromFlags(command.Flags()); err != nil { - command.SilenceUsage = false - return errors.Trace(err) - } - - ctx := GetDefaultContext() - if cfg.EnableOpenTracing { - var store *appdash.MemoryStore - ctx, store = trace.TracerStartSpan(ctx) - defer trace.TracerFinishSpan(ctx, store) - } - if cfg.IgnoreStats { - // Do not run stat worker in BR. - session.DisableStats4Test() - } - - if err := task.RunBackup(ctx, tidbGlue, cmdName, &cfg); err != nil { - log.Error("failed to backup", zap.Error(err)) - return errors.Trace(err) - } - return nil -} - func runBackupRawCommand(command *cobra.Command, cmdName string) error { cfg := task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}} if err := cfg.ParseBackupConfigFromFlags(command.Flags()); err != nil { @@ -67,7 +41,7 @@ func runBackupRawCommand(command *cobra.Command, cmdName string) error { func NewBackupCommand() *cobra.Command { command := &cobra.Command{ Use: "backup", - Short: "backup a TiDB/TiKV cluster", + Short: "backup a TiKV cluster", SilenceUsage: true, PersistentPreRunE: func(c *cobra.Command, args []string) error { if err := Init(c); err != nil { @@ -85,9 +59,6 @@ func NewBackupCommand() *cobra.Command { }, } command.AddCommand( - newFullBackupCommand(), - newDBBackupCommand(), - newTableBackupCommand(), newRawBackupCommand(), ) @@ -95,51 +66,6 @@ func NewBackupCommand() *cobra.Command { return command } -// newFullBackupCommand return a full backup subcommand. -func newFullBackupCommand() *cobra.Command { - command := &cobra.Command{ - Use: "full", - Short: "backup all database", - // prevents incorrect usage like `--checksum false` instead of `--checksum=false`. - // the former, according to pflag parsing rules, means `--checksum=true false`. - Args: cobra.NoArgs, - RunE: func(command *cobra.Command, _ []string) error { - // empty db/table means full backup. - return runBackupCommand(command, "Full backup") - }, - } - task.DefineFilterFlags(command, acceptAllTables) - return command -} - -// newDBBackupCommand return a db backup subcommand. -func newDBBackupCommand() *cobra.Command { - command := &cobra.Command{ - Use: "db", - Short: "backup a database", - Args: cobra.NoArgs, - RunE: func(command *cobra.Command, _ []string) error { - return runBackupCommand(command, "Database backup") - }, - } - task.DefineDatabaseFlags(command) - return command -} - -// newTableBackupCommand return a table backup subcommand. -func newTableBackupCommand() *cobra.Command { - command := &cobra.Command{ - Use: "table", - Short: "backup a table", - Args: cobra.NoArgs, - RunE: func(command *cobra.Command, _ []string) error { - return runBackupCommand(command, "Table backup") - }, - } - task.DefineTableFlags(command) - return command -} - // newRawBackupCommand return a raw kv range backup subcommand. 
func newRawBackupCommand() *cobra.Command { // TODO: remove experimental tag if it's stable diff --git a/br/cmd/br/cmd.go b/br/cmd/br/cmd.go index 12e414c4..e0831978 100644 --- a/br/cmd/br/cmd.go +++ b/br/cmd/br/cmd.go @@ -30,20 +30,6 @@ var ( hasLogFile uint64 tidbGlue = gluetidb.New() envLogToTermKey = "BR_LOG_TO_TERM" - - filterOutSysAndMemTables = []string{ - "*.*", - fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")), - "!mysql.*", - "!sys.*", - "!INFORMATION_SCHEMA.*", - "!PERFORMANCE_SCHEMA.*", - "!METRICS_SCHEMA.*", - "!INSPECTION_SCHEMA.*", - } - acceptAllTables = []string{ - "*.*", - } ) const ( diff --git a/br/cmd/br/restore.go b/br/cmd/br/restore.go index ef127c93..8b7fb0e0 100644 --- a/br/cmd/br/restore.go +++ b/br/cmd/br/restore.go @@ -17,26 +17,6 @@ import ( "sourcegraph.com/sourcegraph/appdash" ) -func runRestoreCommand(command *cobra.Command, cmdName string) error { - cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}} - if err := cfg.ParseFromFlags(command.Flags()); err != nil { - command.SilenceUsage = false - return errors.Trace(err) - } - - ctx := GetDefaultContext() - if cfg.EnableOpenTracing { - var store *appdash.MemoryStore - ctx, store = trace.TracerStartSpan(ctx) - defer trace.TracerFinishSpan(ctx, store) - } - if err := task.RunRestore(GetDefaultContext(), tidbGlue, cmdName, &cfg); err != nil { - log.Error("failed to restore", zap.Error(err)) - return errors.Trace(err) - } - return nil -} - func runRestoreRawCommand(command *cobra.Command, cmdName string) error { cfg := task.RestoreRawConfig{ RawKvConfig: task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}}, @@ -79,9 +59,6 @@ func NewRestoreCommand() *cobra.Command { }, } command.AddCommand( - newFullRestoreCommand(), - newDBRestoreCommand(), - newTableRestoreCommand(), newRawRestoreCommand(), ) task.DefineRestoreFlags(command.PersistentFlags()) @@ -89,45 +66,6 @@ func NewRestoreCommand() *cobra.Command { return command } -func newFullRestoreCommand() *cobra.Command { - command := &cobra.Command{ - Use: "full", - Short: "restore all tables", - Args: cobra.NoArgs, - RunE: func(cmd *cobra.Command, _ []string) error { - return runRestoreCommand(cmd, "Full restore") - }, - } - task.DefineFilterFlags(command, filterOutSysAndMemTables) - return command -} - -func newDBRestoreCommand() *cobra.Command { - command := &cobra.Command{ - Use: "db", - Short: "restore tables in a database from the backup data", - Args: cobra.NoArgs, - RunE: func(cmd *cobra.Command, _ []string) error { - return runRestoreCommand(cmd, "Database restore") - }, - } - task.DefineDatabaseFlags(command) - return command -} - -func newTableRestoreCommand() *cobra.Command { - command := &cobra.Command{ - Use: "table", - Short: "restore a table from the backup data", - Args: cobra.NoArgs, - RunE: func(cmd *cobra.Command, _ []string) error { - return runRestoreCommand(cmd, "Table restore") - }, - } - task.DefineTableFlags(command) - return command -} - func newRawRestoreCommand() *cobra.Command { command := &cobra.Command{ Use: "raw", diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 5883cfa3..c89fca9a 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -5,7 +5,6 @@ package backup import ( "context" "encoding/hex" - "encoding/json" "fmt" "io" "os" @@ -20,15 +19,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" - filter "github.com/pingcap/tidb-tools/pkg/table-filter" - "github.com/pingcap/tidb/distsql" - 
"github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/ranger" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/txnlock" @@ -216,235 +207,6 @@ func CheckBackupStorageIsLocked(ctx context.Context, s storage.ExternalStorage) return nil } -// BuildTableRanges returns the key ranges encompassing the entire table, -// and its partitions if exists. -func BuildTableRanges(tbl *model.TableInfo) ([]kv.KeyRange, error) { - pis := tbl.GetPartitionInfo() - if pis == nil { - // Short path, no partition. - return appendRanges(tbl, tbl.ID) - } - - ranges := make([]kv.KeyRange, 0, len(pis.Definitions)*(len(tbl.Indices)+1)+1) - for _, def := range pis.Definitions { - rgs, err := appendRanges(tbl, def.ID) - if err != nil { - return nil, errors.Trace(err) - } - ranges = append(ranges, rgs...) - } - return ranges, nil -} - -func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) { - var ranges []*ranger.Range - if tbl.IsCommonHandle { - ranges = ranger.FullNotNullRange() - } else { - ranges = ranger.FullIntRange(false) - } - - kvRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, ranges, nil) - if err != nil { - return nil, errors.Trace(err) - } - - for _, index := range tbl.Indices { - if index.State != model.StatePublic { - continue - } - ranges = ranger.FullRange() - idxRanges, err := distsql.IndexRangesToKVRanges(nil, tblID, index.ID, ranges, nil) - if err != nil { - return nil, errors.Trace(err) - } - kvRanges = append(kvRanges, idxRanges...) - } - return kvRanges, nil -} - -// BuildBackupRangeAndSchema gets KV range and schema of tables. -// KV ranges are separated by Table IDs. -// Also, KV ranges are separated by Index IDs in the same table. -func BuildBackupRangeAndSchema( - storage kv.Storage, - tableFilter filter.Filter, - backupTS uint64, -) ([]rtree.Range, *Schemas, error) { - snapshot := storage.GetSnapshot(kv.NewVersion(backupTS)) - m := meta.NewSnapshotMeta(snapshot) - - ranges := make([]rtree.Range, 0) - backupSchemas := newBackupSchemas() - dbs, err := m.ListDatabases() - if err != nil { - return nil, nil, errors.Trace(err) - } - - for _, dbInfo := range dbs { - // skip system databases - if !tableFilter.MatchSchema(dbInfo.Name.O) || util.IsMemDB(dbInfo.Name.L) { - continue - } - - tables, err := m.ListTables(dbInfo.ID) - if err != nil { - return nil, nil, errors.Trace(err) - } - - if len(tables) == 0 { - log.Warn("It's not necessary for backing up empty database", - zap.Stringer("db", dbInfo.Name)) - continue - } - - for _, tableInfo := range tables { - if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) { - // Skip tables other than the given table. 
- continue - } - - logger := log.With( - zap.String("db", dbInfo.Name.O), - zap.String("table", tableInfo.Name.O), - ) - - tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version) - idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer) - seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer) - randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer) - - var globalAutoID int64 - switch { - case tableInfo.IsSequence(): - globalAutoID, err = seqAlloc.NextGlobalAutoID() - case tableInfo.IsView() || !utils.NeedAutoID(tableInfo): - // no auto ID for views or table without either rowID nor auto_increment ID. - default: - globalAutoID, err = idAlloc.NextGlobalAutoID() - } - if err != nil { - return nil, nil, errors.Trace(err) - } - tableInfo.AutoIncID = globalAutoID - - if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() { - // this table has auto_random id, we need backup and rebase in restoration - var globalAutoRandID int64 - globalAutoRandID, err = randAlloc.NextGlobalAutoID() - if err != nil { - return nil, nil, errors.Trace(err) - } - tableInfo.AutoRandID = globalAutoRandID - logger.Debug("change table AutoRandID", - zap.Int64("AutoRandID", globalAutoRandID)) - } - logger.Debug("change table AutoIncID", - zap.Int64("AutoIncID", globalAutoID)) - - // remove all non-public indices - n := 0 - for _, index := range tableInfo.Indices { - if index.State == model.StatePublic { - tableInfo.Indices[n] = index - n++ - } - } - tableInfo.Indices = tableInfo.Indices[:n] - - backupSchemas.addSchema(dbInfo, tableInfo) - - tableRanges, err := BuildTableRanges(tableInfo) - if err != nil { - return nil, nil, errors.Trace(err) - } - for _, r := range tableRanges { - ranges = append(ranges, rtree.Range{ - StartKey: r.StartKey, - EndKey: r.EndKey, - }) - } - } - } - - if backupSchemas.Len() == 0 { - log.Info("nothing to backup") - return nil, nil, nil - } - return ranges, backupSchemas, nil -} - -func skipUnsupportedDDLJob(job *model.Job) bool { - switch job.Type { - // TiDB V5.3.0 supports TableAttributes and TablePartitionAttributes. - // Backup guarantees data integrity but region placement, which is out of scope of backup - case model.ActionCreatePlacementPolicy, - model.ActionAlterPlacementPolicy, - model.ActionDropPlacementPolicy, - model.ActionAlterTablePartitionPlacement, - model.ActionModifySchemaDefaultPlacement, - model.ActionAlterTablePlacement, - model.ActionAlterTableAttributes, - model.ActionAlterTablePartitionAttributes: - return true - default: - return false - } -} - -// WriteBackupDDLJobs sends the ddl jobs are done in (lastBackupTS, backupTS] to metaWriter. -func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastBackupTS, backupTS uint64) error { - snapshot := store.GetSnapshot(kv.NewVersion(backupTS)) - snapMeta := meta.NewSnapshotMeta(snapshot) - lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS)) - lastSnapMeta := meta.NewSnapshotMeta(lastSnapshot) - lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion() - if err != nil { - return errors.Trace(err) - } - allJobs := make([]*model.Job, 0) - defaultJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey) - if err != nil { - return errors.Trace(err) - } - log.Debug("get default jobs", zap.Int("jobs", len(defaultJobs))) - allJobs = append(allJobs, defaultJobs...) 
- addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey) - if err != nil { - return errors.Trace(err) - } - log.Debug("get add index jobs", zap.Int("jobs", len(addIndexJobs))) - allJobs = append(allJobs, addIndexJobs...) - historyJobs, err := snapMeta.GetAllHistoryDDLJobs() - if err != nil { - return errors.Trace(err) - } - log.Debug("get history jobs", zap.Int("jobs", len(historyJobs))) - allJobs = append(allJobs, historyJobs...) - - count := 0 - for _, job := range allJobs { - if skipUnsupportedDDLJob(job) { - continue - } - - if (job.State == model.JobStateDone || job.State == model.JobStateSynced) && - (job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) { - jobBytes, err := json.Marshal(job) - if err != nil { - return errors.Trace(err) - } - err = metaWriter.Send(jobBytes, metautil.AppendDDL) - if err != nil { - return errors.Trace(err) - } - count++ - } - } - log.Debug("get completed jobs", zap.Int("jobs", count)) - return nil -} - // BackupRanges make a backup of the given key ranges. func (bc *Client) BackupRanges( ctx context.Context, diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index 1a9a1fd9..20e0b297 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -4,22 +4,12 @@ package backup_test import ( "context" - "encoding/json" - "math" "testing" "time" - "github.com/golang/protobuf/proto" . "github.com/pingcap/check" backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/kvproto/pkg/encryptionpb" "github.com/pingcap/kvproto/pkg/errorpb" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/testkit" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" @@ -129,91 +119,6 @@ func (r *testBackup) TestGetTS(c *C) { c.Assert(ts, Equals, backupts) } -func (r *testBackup) TestBuildTableRangeIntHandle(c *C) { - type Case struct { - ids []int64 - trs []kv.KeyRange - } - low := codec.EncodeInt(nil, math.MinInt64) - high := kv.Key(codec.EncodeInt(nil, math.MaxInt64)).PrefixNext() - cases := []Case{ - {ids: []int64{1}, trs: []kv.KeyRange{ - {StartKey: tablecodec.EncodeRowKey(1, low), EndKey: tablecodec.EncodeRowKey(1, high)}, - }}, - {ids: []int64{1, 2, 3}, trs: []kv.KeyRange{ - {StartKey: tablecodec.EncodeRowKey(1, low), EndKey: tablecodec.EncodeRowKey(1, high)}, - {StartKey: tablecodec.EncodeRowKey(2, low), EndKey: tablecodec.EncodeRowKey(2, high)}, - {StartKey: tablecodec.EncodeRowKey(3, low), EndKey: tablecodec.EncodeRowKey(3, high)}, - }}, - {ids: []int64{1, 3}, trs: []kv.KeyRange{ - {StartKey: tablecodec.EncodeRowKey(1, low), EndKey: tablecodec.EncodeRowKey(1, high)}, - {StartKey: tablecodec.EncodeRowKey(3, low), EndKey: tablecodec.EncodeRowKey(3, high)}, - }}, - } - for _, cs := range cases { - c.Log(cs) - tbl := &model.TableInfo{Partition: &model.PartitionInfo{Enable: true}} - for _, id := range cs.ids { - tbl.Partition.Definitions = append(tbl.Partition.Definitions, - model.PartitionDefinition{ID: id}) - } - ranges, err := backup.BuildTableRanges(tbl) - c.Assert(err, IsNil) - c.Assert(ranges, DeepEquals, cs.trs) - } - - tbl := &model.TableInfo{ID: 7} - ranges, err := backup.BuildTableRanges(tbl) - c.Assert(err, IsNil) - c.Assert(ranges, DeepEquals, []kv.KeyRange{ - {StartKey: tablecodec.EncodeRowKey(7, low), EndKey: tablecodec.EncodeRowKey(7, high)}, - }) 
-} - -func (r *testBackup) TestBuildTableRangeCommonHandle(c *C) { - type Case struct { - ids []int64 - trs []kv.KeyRange - } - low, errL := codec.EncodeKey(nil, nil, []types.Datum{types.MinNotNullDatum()}...) - c.Assert(errL, IsNil) - high, errH := codec.EncodeKey(nil, nil, []types.Datum{types.MaxValueDatum()}...) - c.Assert(errH, IsNil) - high = kv.Key(high).PrefixNext() - cases := []Case{ - {ids: []int64{1}, trs: []kv.KeyRange{ - {StartKey: tablecodec.EncodeRowKey(1, low), EndKey: tablecodec.EncodeRowKey(1, high)}, - }}, - {ids: []int64{1, 2, 3}, trs: []kv.KeyRange{ - {StartKey: tablecodec.EncodeRowKey(1, low), EndKey: tablecodec.EncodeRowKey(1, high)}, - {StartKey: tablecodec.EncodeRowKey(2, low), EndKey: tablecodec.EncodeRowKey(2, high)}, - {StartKey: tablecodec.EncodeRowKey(3, low), EndKey: tablecodec.EncodeRowKey(3, high)}, - }}, - {ids: []int64{1, 3}, trs: []kv.KeyRange{ - {StartKey: tablecodec.EncodeRowKey(1, low), EndKey: tablecodec.EncodeRowKey(1, high)}, - {StartKey: tablecodec.EncodeRowKey(3, low), EndKey: tablecodec.EncodeRowKey(3, high)}, - }}, - } - for _, cs := range cases { - c.Log(cs) - tbl := &model.TableInfo{Partition: &model.PartitionInfo{Enable: true}, IsCommonHandle: true} - for _, id := range cs.ids { - tbl.Partition.Definitions = append(tbl.Partition.Definitions, - model.PartitionDefinition{ID: id}) - } - ranges, err := backup.BuildTableRanges(tbl) - c.Assert(err, IsNil) - c.Assert(ranges, DeepEquals, cs.trs) - } - - tbl := &model.TableInfo{ID: 7, IsCommonHandle: true} - ranges, err := backup.BuildTableRanges(tbl) - c.Assert(err, IsNil) - c.Assert(ranges, DeepEquals, []kv.KeyRange{ - {StartKey: tablecodec.EncodeRowKey(7, low), EndKey: tablecodec.EncodeRowKey(7, high)}, - }) -} - func (r *testBackup) TestOnBackupRegionErrorResponse(c *C) { type Case struct { storeID uint64 @@ -292,55 +197,6 @@ func (r *testBackup) TestSendCreds(c *C) { c.Assert(secretAccessKey, Equals, "") } -func (r *testBackup) TestskipUnsupportedDDLJob(c *C) { - tk := testkit.NewTestKit(c, r.cluster.Storage) - tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;") - tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);") - lastTS, err := r.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - c.Assert(err, IsNil, Commentf("Error get last ts: %s", err)) - tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;") - tk.MustExec("DROP TABLE test_db.test_table1;") - tk.MustExec("DROP DATABASE test_db;") - tk.MustExec("CREATE DATABASE test_db;") - tk.MustExec("USE test_db;") - tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));") - tk.MustExec("RENAME TABLE test_table1 to test_table;") - tk.MustExec("TRUNCATE TABLE test_table;") - - tk.MustExec("CREATE TABLE tb(id INT NOT NULL, stu_id INT NOT NULL) " + - "PARTITION BY RANGE (stu_id) (PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11))") - tk.MustExec("ALTER TABLE tb attributes \"merge_option=allow\"") - tk.MustExec("ALTER TABLE tb PARTITION p0 attributes \"merge_option=deny\"") - - ts, err := r.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - c.Assert(err, IsNil, Commentf("Error get ts: %s", err)) - - cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT} - metaWriter := metautil.NewMetaWriter(r.storage, metautil.MetaFileSize, false, &cipher) - ctx := context.Background() - metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = 
backup.WriteBackupDDLJobs(metaWriter, r.cluster.Storage, lastTS, ts) - c.Assert(err, IsNil, Commentf("Error get ddl jobs: %s", err)) - err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) - c.Assert(err, IsNil, Commentf("Flush failed", err)) - err = metaWriter.FlushBackupMeta(ctx) - c.Assert(err, IsNil, Commentf("Finially flush backupmeta failed", err)) - - metaBytes, err := r.storage.ReadFile(ctx, metautil.MetaFile) - c.Assert(err, IsNil) - mockMeta := &backuppb.BackupMeta{} - err = proto.Unmarshal(metaBytes, mockMeta) - c.Assert(err, IsNil) - // check the schema version - metaReader := metautil.NewMetaReader(mockMeta, r.storage, &cipher) - allDDLJobsBytes, err := metaReader.ReadDDLs(ctx) - c.Assert(err, IsNil) - var allDDLJobs []*model.Job - err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs) - c.Assert(err, IsNil) - c.Assert(len(allDDLJobs), Equals, 8) -} - func (r *testBackup) TestCheckBackupIsLocked(c *C) { ctx := context.Background() diff --git a/br/pkg/backup/schema.go b/br/pkg/backup/schema.go deleted file mode 100644 index 79216a9a..00000000 --- a/br/pkg/backup/schema.go +++ /dev/null @@ -1,211 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package backup - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "github.com/opentracing/opentracing-go" - "github.com/pingcap/errors" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/log" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/statistics/handle" - "github.com/tikv/migration/br/pkg/checksum" - "github.com/tikv/migration/br/pkg/glue" - "github.com/tikv/migration/br/pkg/logutil" - "github.com/tikv/migration/br/pkg/metautil" - "github.com/tikv/migration/br/pkg/summary" - "github.com/tikv/migration/br/pkg/utils" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -const ( - // DefaultSchemaConcurrency is the default number of the concurrent - // backup schema tasks. - DefaultSchemaConcurrency = 64 -) - -type schemaInfo struct { - tableInfo *model.TableInfo - dbInfo *model.DBInfo - crc64xor uint64 - totalKvs uint64 - totalBytes uint64 - stats *handle.JSONTable -} - -// Schemas is task for backuping schemas. -type Schemas struct { - // name -> schema - schemas map[string]*schemaInfo -} - -func newBackupSchemas() *Schemas { - return &Schemas{ - schemas: make(map[string]*schemaInfo), - } -} - -func (ss *Schemas) addSchema( - dbInfo *model.DBInfo, tableInfo *model.TableInfo, -) { - name := fmt.Sprintf("%s.%s", - utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L)) - ss.schemas[name] = &schemaInfo{ - tableInfo: tableInfo, - dbInfo: dbInfo, - } -} - -// BackupSchemas backups table info, including checksum and stats. 
-func (ss *Schemas) BackupSchemas( - ctx context.Context, - metaWriter *metautil.MetaWriter, - store kv.Storage, - statsHandle *handle.Handle, - backupTS uint64, - concurrency uint, - copConcurrency uint, - skipChecksum bool, - updateCh glue.Progress, -) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("Schemas.BackupSchemas", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - workerPool := utils.NewWorkerPool(concurrency, "Schemas") - errg, ectx := errgroup.WithContext(ctx) - startAll := time.Now() - op := metautil.AppendSchema - metaWriter.StartWriteMetasAsync(ctx, op) - for _, s := range ss.schemas { - schema := s - // Because schema.dbInfo is a pointer that many tables point to. - // Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations. - if utils.IsSysDB(schema.dbInfo.Name.L) { - schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O) - } - - workerPool.ApplyOnErrorGroup(errg, func() error { - logger := log.With( - zap.String("db", schema.dbInfo.Name.O), - zap.String("table", schema.tableInfo.Name.O), - ) - - if !skipChecksum { - logger.Info("table checksum start") - start := time.Now() - err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency) - if err != nil { - return errors.Trace(err) - } - logger.Info("table checksum finished", - zap.Uint64("Crc64Xor", schema.crc64xor), - zap.Uint64("TotalKvs", schema.totalKvs), - zap.Uint64("TotalBytes", schema.totalBytes), - zap.Duration("take", time.Since(start))) - } - if statsHandle != nil { - if err := schema.dumpStatsToJSON(statsHandle); err != nil { - logger.Error("dump table stats failed", logutil.ShortError(err)) - } - } - - // Send schema to metawriter - s, err := schema.encodeToSchema() - if err != nil { - return errors.Trace(err) - } - if err := metaWriter.Send(s, op); err != nil { - return errors.Trace(err) - } - updateCh.Inc() - return nil - }) - } - if err := errg.Wait(); err != nil { - return errors.Trace(err) - } - log.Info("backup checksum", zap.Duration("take", time.Since(startAll))) - summary.CollectDuration("backup checksum", time.Since(startAll)) - return metaWriter.FinishWriteMetas(ctx, op) -} - -// Len returns the number of schemas. -func (ss *Schemas) Len() int { - return len(ss.schemas) -} - -func (s *schemaInfo) calculateChecksum( - ctx context.Context, - client kv.Client, - backupTS uint64, - concurrency uint, -) error { - exe, err := checksum.NewExecutorBuilder(s.tableInfo, backupTS). - SetConcurrency(concurrency). - Build() - if err != nil { - return errors.Trace(err) - } - - checksumResp, err := exe.Execute(ctx, client, func() { - // TODO: update progress here. 
- }) - if err != nil { - return errors.Trace(err) - } - - s.crc64xor = checksumResp.Checksum - s.totalKvs = checksumResp.TotalKvs - s.totalBytes = checksumResp.TotalBytes - return nil -} - -func (s *schemaInfo) dumpStatsToJSON(statsHandle *handle.Handle) error { - jsonTable, err := statsHandle.DumpStatsToJSON( - s.dbInfo.Name.String(), s.tableInfo, nil) - if err != nil { - return errors.Trace(err) - } - - s.stats = jsonTable - return nil -} - -func (s *schemaInfo) encodeToSchema() (*backuppb.Schema, error) { - dbBytes, err := json.Marshal(s.dbInfo) - if err != nil { - return nil, errors.Trace(err) - } - - tableBytes, err := json.Marshal(s.tableInfo) - if err != nil { - return nil, errors.Trace(err) - } - - var statsBytes []byte - if s.stats != nil { - statsBytes, err = json.Marshal(s.stats) - if err != nil { - return nil, errors.Trace(err) - } - } - - return &backuppb.Schema{ - Db: dbBytes, - Table: tableBytes, - Crc64Xor: s.crc64xor, - TotalKvs: s.totalKvs, - TotalBytes: s.totalBytes, - Stats: statsBytes, - }, nil -} diff --git a/br/pkg/backup/schema_test.go b/br/pkg/backup/schema_test.go deleted file mode 100644 index d7aac104..00000000 --- a/br/pkg/backup/schema_test.go +++ /dev/null @@ -1,304 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package backup_test - -import ( - "context" - "fmt" - "math" - "strings" - "sync/atomic" - - "github.com/golang/protobuf/proto" - . "github.com/pingcap/check" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/kvproto/pkg/encryptionpb" - filter "github.com/pingcap/tidb-tools/pkg/table-filter" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util/testkit" - "github.com/pingcap/tidb/util/testleak" - "github.com/tikv/migration/br/pkg/backup" - "github.com/tikv/migration/br/pkg/metautil" - "github.com/tikv/migration/br/pkg/mock" - "github.com/tikv/migration/br/pkg/storage" - "github.com/tikv/migration/br/pkg/utils" -) - -var _ = Suite(&testBackupSchemaSuite{}) - -type testBackupSchemaSuite struct { - mock *mock.Cluster -} - -func (s *testBackupSchemaSuite) SetUpSuite(c *C) { - var err error - s.mock, err = mock.NewCluster() - c.Assert(err, IsNil) - c.Assert(s.mock.Start(), IsNil) -} - -func (s *testBackupSchemaSuite) TearDownSuite(c *C) { - s.mock.Stop() - testleak.AfterTest(c)() -} - -func (s *testBackupSchemaSuite) GetRandomStorage(c *C) storage.ExternalStorage { - base := c.MkDir() - es, err := storage.NewLocalStorage(base) - c.Assert(err, IsNil) - return es -} - -func (s *testBackupSchemaSuite) GetSchemasFromMeta(c *C, es storage.ExternalStorage) []*metautil.Table { - ctx := context.Background() - metaBytes, err := es.ReadFile(ctx, metautil.MetaFile) - c.Assert(err, IsNil) - mockMeta := &backuppb.BackupMeta{} - err = proto.Unmarshal(metaBytes, mockMeta) - c.Assert(err, IsNil) - metaReader := metautil.NewMetaReader(mockMeta, - es, - &backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - }, - ) - - output := make(chan *metautil.Table, 4) - go func() { - err = metaReader.ReadSchemasFiles(ctx, output) - c.Assert(err, IsNil) - close(output) - }() - - schemas := make([]*metautil.Table, 0, 4) - for s := range output { - schemas = append(schemas, s) - } - return schemas -} - -type simpleProgress struct { - counter int64 -} - -func (sp *simpleProgress) Inc() { - atomic.AddInt64(&sp.counter, 1) -} - -func (sp *simpleProgress) Close() {} - -func (sp *simpleProgress) reset() { - atomic.StoreInt64(&sp.counter, 0) -} - -func (sp *simpleProgress) get() int64 { - return 
atomic.LoadInt64(&sp.counter) -} - -func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { - tk := testkit.NewTestKit(c, s.mock.Storage) - - // Table t1 is not exist. - testFilter, err := filter.Parse([]string{"test.t1"}) - c.Assert(err, IsNil) - _, backupSchemas, err := backup.BuildBackupRangeAndSchema( - s.mock.Storage, testFilter, math.MaxUint64) - c.Assert(err, IsNil) - c.Assert(backupSchemas, IsNil) - - // Database is not exist. - fooFilter, err := filter.Parse([]string{"foo.t1"}) - c.Assert(err, IsNil) - _, backupSchemas, err = backup.BuildBackupRangeAndSchema( - s.mock.Storage, fooFilter, math.MaxUint64) - c.Assert(err, IsNil) - c.Assert(backupSchemas, IsNil) - - // Empty database. - // Filter out system tables manually. - noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"}) - c.Assert(err, IsNil) - _, backupSchemas, err = backup.BuildBackupRangeAndSchema( - s.mock.Storage, noFilter, math.MaxUint64) - c.Assert(err, IsNil) - c.Assert(backupSchemas, IsNil) - - tk.MustExec("use test") - tk.MustExec("drop table if exists t1;") - tk.MustExec("create table t1 (a int);") - tk.MustExec("insert into t1 values (10);") - - _, backupSchemas, err = backup.BuildBackupRangeAndSchema( - s.mock.Storage, testFilter, math.MaxUint64) - c.Assert(err, IsNil) - c.Assert(backupSchemas.Len(), Equals, 1) - updateCh := new(simpleProgress) - skipChecksum := false - es := s.GetRandomStorage(c) - cipher := backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - } - metaWriter := metautil.NewMetaWriter(es, metautil.MetaFileSize, false, &cipher) - ctx := context.Background() - err = backupSchemas.BackupSchemas( - ctx, metaWriter, s.mock.Storage, nil, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, skipChecksum, updateCh) - c.Assert(updateCh.get(), Equals, int64(1)) - c.Assert(err, IsNil) - err = metaWriter.FlushBackupMeta(ctx) - c.Assert(err, IsNil) - - schemas := s.GetSchemasFromMeta(c, es) - c.Assert(len(schemas), Equals, 1) - // Cluster returns a dummy checksum (all fields are 1). - c.Assert(schemas[0].Crc64Xor, Not(Equals), 0, Commentf("%v", schemas[0])) - c.Assert(schemas[0].TotalKvs, Not(Equals), 0, Commentf("%v", schemas[0])) - c.Assert(schemas[0].TotalBytes, Not(Equals), 0, Commentf("%v", schemas[0])) - - tk.MustExec("drop table if exists t2;") - tk.MustExec("create table t2 (a int);") - tk.MustExec("insert into t2 values (10);") - tk.MustExec("insert into t2 values (11);") - - _, backupSchemas, err = backup.BuildBackupRangeAndSchema( - s.mock.Storage, noFilter, math.MaxUint64) - c.Assert(err, IsNil) - c.Assert(backupSchemas.Len(), Equals, 2) - updateCh.reset() - - es2 := s.GetRandomStorage(c) - metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, &cipher) - err = backupSchemas.BackupSchemas( - ctx, metaWriter2, s.mock.Storage, nil, math.MaxUint64, 2, variable.DefChecksumTableConcurrency, skipChecksum, updateCh) - c.Assert(updateCh.get(), Equals, int64(2)) - c.Assert(err, IsNil) - err = metaWriter2.FlushBackupMeta(ctx) - c.Assert(err, IsNil) - - schemas = s.GetSchemasFromMeta(c, es2) - - c.Assert(len(schemas), Equals, 2) - // Cluster returns a dummy checksum (all fields are 1). 
- c.Assert(schemas[0].Crc64Xor, Not(Equals), 0, Commentf("%v", schemas[0])) - c.Assert(schemas[0].TotalKvs, Not(Equals), 0, Commentf("%v", schemas[0])) - c.Assert(schemas[0].TotalBytes, Not(Equals), 0, Commentf("%v", schemas[0])) - c.Assert(schemas[1].Crc64Xor, Not(Equals), 0, Commentf("%v", schemas[1])) - c.Assert(schemas[1].TotalKvs, Not(Equals), 0, Commentf("%v", schemas[1])) - c.Assert(schemas[1].TotalBytes, Not(Equals), 0, Commentf("%v", schemas[1])) -} - -func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchemaWithBrokenStats(c *C) { - tk := testkit.NewTestKit(c, s.mock.Storage) - tk.MustExec("use test") - tk.MustExec("drop table if exists t3;") - tk.MustExec("create table t3 (a char(1));") - tk.MustExec("insert into t3 values ('1');") - tk.MustExec("analyze table t3;") - // corrupt the statistics like pingcap/br#679. - tk.MustExec(` - update mysql.stats_buckets set upper_bound = 0xffffffff - where table_id = ( - select tidb_table_id from information_schema.tables - where (table_schema, table_name) = ('test', 't3') - ); - `) - - f, err := filter.Parse([]string{"test.t3"}) - c.Assert(err, IsNil) - - _, backupSchemas, err := backup.BuildBackupRangeAndSchema(s.mock.Storage, f, math.MaxUint64) - c.Assert(err, IsNil) - c.Assert(backupSchemas.Len(), Equals, 1) - - skipChecksum := false - updateCh := new(simpleProgress) - - cipher := backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - } - - es := s.GetRandomStorage(c) - metaWriter := metautil.NewMetaWriter(es, metautil.MetaFileSize, false, &cipher) - ctx := context.Background() - err = backupSchemas.BackupSchemas( - ctx, metaWriter, s.mock.Storage, nil, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, skipChecksum, updateCh) - c.Assert(err, IsNil) - err = metaWriter.FlushBackupMeta(ctx) - c.Assert(err, IsNil) - - schemas := s.GetSchemasFromMeta(c, es) - c.Assert(err, IsNil) - c.Assert(schemas, HasLen, 1) - // the stats should be empty, but other than that everything should be backed up. - c.Assert(schemas[0].Stats, IsNil) - c.Assert(schemas[0].Crc64Xor, Not(Equals), 0) - c.Assert(schemas[0].TotalKvs, Not(Equals), 0) - c.Assert(schemas[0].TotalBytes, Not(Equals), 0) - c.Assert(schemas[0].Info, NotNil) - c.Assert(schemas[0].DB, NotNil) - - // recover the statistics. - tk.MustExec("analyze table t3;") - - _, backupSchemas, err = backup.BuildBackupRangeAndSchema(s.mock.Storage, f, math.MaxUint64) - c.Assert(err, IsNil) - c.Assert(backupSchemas.Len(), Equals, 1) - - updateCh.reset() - statsHandle := s.mock.Domain.StatsHandle() - es2 := s.GetRandomStorage(c) - metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, &cipher) - err = backupSchemas.BackupSchemas( - ctx, metaWriter2, s.mock.Storage, statsHandle, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, skipChecksum, updateCh) - c.Assert(err, IsNil) - err = metaWriter2.FlushBackupMeta(ctx) - c.Assert(err, IsNil) - - schemas2 := s.GetSchemasFromMeta(c, es2) - c.Assert(schemas2, HasLen, 1) - // the stats should now be filled, and other than that the result should be equivalent to the first backup. 
- c.Assert(schemas2[0].Stats, NotNil) - c.Assert(schemas2[0].Crc64Xor, Equals, schemas[0].Crc64Xor) - c.Assert(schemas2[0].TotalKvs, Equals, schemas[0].TotalKvs) - c.Assert(schemas2[0].TotalBytes, Equals, schemas[0].TotalBytes) - c.Assert(schemas2[0].Info, DeepEquals, schemas[0].Info) - c.Assert(schemas2[0].DB, DeepEquals, schemas[0].DB) -} - -func (s *testBackupSchemaSuite) TestBackupSchemasForSystemTable(c *C) { - tk := testkit.NewTestKit(c, s.mock.Storage) - es2 := s.GetRandomStorage(c) - - systemTablesCount := 32 - tablePrefix := "systable" - tk.MustExec("use mysql") - for i := 1; i <= systemTablesCount; i++ { - query := fmt.Sprintf("create table %s%d (a char(1));", tablePrefix, i) - tk.MustExec(query) - } - - f, err := filter.Parse([]string{"mysql.systable*"}) - c.Assert(err, IsNil) - _, backupSchemas, err := backup.BuildBackupRangeAndSchema(s.mock.Storage, f, math.MaxUint64) - c.Assert(err, IsNil) - c.Assert(backupSchemas.Len(), Equals, systemTablesCount) - - ctx := context.Background() - cipher := backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - } - updateCh := new(simpleProgress) - - metaWriter2 := metautil.NewMetaWriter(es2, metautil.MetaFileSize, false, &cipher) - err = backupSchemas.BackupSchemas(ctx, metaWriter2, s.mock.Storage, nil, - math.MaxUint64, 1, variable.DefChecksumTableConcurrency, true, updateCh) - c.Assert(err, IsNil) - err = metaWriter2.FlushBackupMeta(ctx) - c.Assert(err, IsNil) - - schemas2 := s.GetSchemasFromMeta(c, es2) - c.Assert(schemas2, HasLen, systemTablesCount) - for _, schema := range schemas2 { - c.Assert(schema.DB.Name, Equals, utils.TemporaryDBName("mysql")) - c.Assert(strings.HasPrefix(schema.Info.Name.O, tablePrefix), Equals, true) - } -} diff --git a/br/pkg/checksum/executor_test.go b/br/pkg/checksum/executor_test.go deleted file mode 100644 index 9e5cd4bd..00000000 --- a/br/pkg/checksum/executor_test.go +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package checksum_test - -import ( - "context" - "math" - "testing" - - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/testkit" - "github.com/stretchr/testify/require" - "github.com/tikv/migration/br/pkg/backup" - "github.com/tikv/migration/br/pkg/checksum" - "github.com/tikv/migration/br/pkg/metautil" - "github.com/tikv/migration/br/pkg/mock" -) - -func getTestTableInfo(t *testing.T, mock *mock.Cluster, table string) *model.TableInfo { - db := "test" - info, err := mock.Domain.GetSnapshotInfoSchema(math.MaxUint64) - require.NoError(t, err) - cDBName := model.NewCIStr(db) - cTableName := model.NewCIStr(table) - tableInfo, err := info.TableByName(cDBName, cTableName) - require.NoError(t, err) - return tableInfo.Meta() -} - -func TestChecksum(t *testing.T) { - mock, err := mock.NewCluster() - require.NoError(t, err) - require.NoError(t, mock.Start()) - defer mock.Stop() - - tk := testkit.NewTestKit(t, mock.Storage) - tk.MustExec("use test") - - tk.MustExec("drop table if exists t1;") - tk.MustExec("create table t1 (a int);") - tk.MustExec("insert into t1 values (10);") - tableInfo1 := getTestTableInfo(t, mock, "t1") - exe1, err := checksum.NewExecutorBuilder(tableInfo1, math.MaxUint64). - SetConcurrency(variable.DefChecksumTableConcurrency). 
- Build() - require.NoError(t, err) - require.NoError(t, exe1.Each(func(r *kv.Request) error { - require.True(t, r.NotFillCache) - require.Equal(t, variable.DefChecksumTableConcurrency, r.Concurrency) - return nil - })) - require.Equal(t, 1, exe1.Len()) - resp, err := exe1.Execute(context.TODO(), mock.Storage.GetClient(), func() {}) - require.NoError(t, err) - // Cluster returns a dummy checksum (all fields are 1). - require.Equalf(t, uint64(1), resp.Checksum, "%v", resp) - require.Equalf(t, uint64(1), resp.TotalKvs, "%v", resp) - require.Equalf(t, uint64(1), resp.TotalBytes, "%v", resp) - - tk.MustExec("drop table if exists t2;") - tk.MustExec("create table t2 (a int);") - tk.MustExec("alter table t2 add index i2(a);") - tk.MustExec("insert into t2 values (10);") - tableInfo2 := getTestTableInfo(t, mock, "t2") - exe2, err := checksum.NewExecutorBuilder(tableInfo2, math.MaxUint64).Build() - require.NoError(t, err) - require.Equalf(t, 2, exe2.Len(), "%v", tableInfo2) - resp2, err := exe2.Execute(context.TODO(), mock.Storage.GetClient(), func() {}) - require.NoError(t, err) - require.Equalf(t, uint64(0), resp2.Checksum, "%v", resp2) - require.Equalf(t, uint64(2), resp2.TotalKvs, "%v", resp2) - require.Equalf(t, uint64(2), resp2.TotalBytes, "%v", resp2) - - // Test rewrite rules - tk.MustExec("alter table t1 add index i2(a);") - tableInfo1 = getTestTableInfo(t, mock, "t1") - oldTable := metautil.Table{Info: tableInfo1} - exe2, err = checksum.NewExecutorBuilder(tableInfo2, math.MaxUint64). - SetOldTable(&oldTable).Build() - require.NoError(t, err) - require.Equal(t, 2, exe2.Len()) - rawReqs, err := exe2.RawRequests() - require.NoError(t, err) - require.Len(t, rawReqs, 2) - for _, rawReq := range rawReqs { - require.NotNil(t, rawReq.Rule) - } - resp2, err = exe2.Execute(context.TODO(), mock.Storage.GetClient(), func() {}) - require.NoError(t, err) - require.NotNil(t, resp2) - - // Test commonHandle ranges - - tk.MustExec("drop table if exists t3;") - tk.MustExec("create table t3 (a char(255), b int, primary key(a) CLUSTERED);") - tk.MustExec("insert into t3 values ('fffffffff', 1), ('010101010', 2), ('394393fj39efefe', 3);") - tableInfo3 := getTestTableInfo(t, mock, "t3") - exe3, err := checksum.NewExecutorBuilder(tableInfo3, math.MaxUint64).Build() - require.NoError(t, err) - first := true - require.NoError(t, exe3.Each(func(req *kv.Request) error { - if first { - first = false - ranges, err := backup.BuildTableRanges(tableInfo3) - require.NoError(t, err) - require.Equalf(t, ranges[:1], req.KeyRanges, "%v", req.KeyRanges) - } - return nil - })) -} diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 757af37b..580dffec 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -9,10 +9,8 @@ import ( "encoding/hex" "encoding/json" "fmt" - "sort" "strconv" "strings" - "sync" "time" "github.com/opentracing/opentracing-go" @@ -28,7 +26,6 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "github.com/tikv/client-go/v2/oracle" - "github.com/tikv/migration/br/pkg/checksum" "github.com/tikv/migration/br/pkg/conn" berrors "github.com/tikv/migration/br/pkg/errors" "github.com/tikv/migration/br/pkg/glue" @@ -49,10 +46,6 @@ import ( "google.golang.org/grpc/keepalive" ) -// defaultChecksumConcurrency is the default number of the concurrent -// checksum tasks. -const defaultChecksumConcurrency = 64 - // Client sends requests to restore files. 
type Client struct { pdClient pd.Client @@ -77,7 +70,6 @@ type Client struct { db *DB rateLimit uint64 isOnline bool - noSchema bool hasSpeedLimited bool restoreStores []uint64 @@ -323,229 +315,6 @@ func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]pl return placementRules, errors.Trace(errRetry) } -// GetDatabases returns all databases. -func (rc *Client) GetDatabases() []*utils.Database { - dbs := make([]*utils.Database, 0, len(rc.databases)) - for _, db := range rc.databases { - dbs = append(dbs, db) - } - return dbs -} - -// GetDatabase returns a database by name. -func (rc *Client) GetDatabase(name string) *utils.Database { - return rc.databases[name] -} - -// GetDDLJobs returns ddl jobs. -func (rc *Client) GetDDLJobs() []*model.Job { - return rc.ddlJobs -} - -// GetTableSchema returns the schema of a table from TiDB. -func (rc *Client) GetTableSchema( - dom *domain.Domain, - dbName model.CIStr, - tableName model.CIStr, -) (*model.TableInfo, error) { - info := dom.InfoSchema() - table, err := info.TableByName(dbName, tableName) - if err != nil { - return nil, errors.Trace(err) - } - return table.Meta(), nil -} - -// CreateDatabase creates a database. -func (rc *Client) CreateDatabase(ctx context.Context, db *model.DBInfo) error { - if rc.IsSkipCreateSQL() { - log.Info("skip create database", zap.Stringer("database", db.Name)) - return nil - } - return rc.db.CreateDatabase(ctx, db) -} - -// CreateTables creates multiple tables, and returns their rewrite rules. -func (rc *Client) CreateTables( - dom *domain.Domain, - tables []*metautil.Table, - newTS uint64, -) (*RewriteRules, []*model.TableInfo, error) { - rewriteRules := &RewriteRules{ - Data: make([]*import_sstpb.RewriteRule, 0), - } - newTables := make([]*model.TableInfo, 0, len(tables)) - errCh := make(chan error, 1) - tbMapping := map[string]int{} - for i, t := range tables { - tbMapping[t.Info.Name.String()] = i - } - dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, nil, errCh) - for et := range dataCh { - rules := et.RewriteRule - rewriteRules.Data = append(rewriteRules.Data, rules.Data...) - newTables = append(newTables, et.Table) - } - // Let's ensure that it won't break the original order. - sort.Slice(newTables, func(i, j int) bool { - return tbMapping[newTables[i].Name.String()] < tbMapping[newTables[j].Name.String()] - }) - - select { - case err, ok := <-errCh: - if ok { - return nil, nil, errors.Trace(err) - } - default: - } - return rewriteRules, newTables, nil -} - -func (rc *Client) createTable( - ctx context.Context, - db *DB, - dom *domain.Domain, - table *metautil.Table, - newTS uint64, - ddlTables map[UniqueTableName]bool, -) (CreatedTable, error) { - if rc.IsSkipCreateSQL() { - log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) - } else { - err := db.CreateTable(ctx, table, ddlTables) - if err != nil { - return CreatedTable{}, errors.Trace(err) - } - } - newTableInfo, err := rc.GetTableSchema(dom, table.DB.Name, table.Info.Name) - if err != nil { - return CreatedTable{}, errors.Trace(err) - } - if newTableInfo.IsCommonHandle != table.Info.IsCommonHandle { - return CreatedTable{}, errors.Annotatef(berrors.ErrRestoreModeMismatch, - "Clustered index option mismatch. 
Restored cluster's @@tidb_enable_clustered_index should be %v (backup table = %v, created table = %v).", - transferBoolToValue(table.Info.IsCommonHandle), - table.Info.IsCommonHandle, - newTableInfo.IsCommonHandle) - } - rules := GetRewriteRules(newTableInfo, table.Info, newTS) - et := CreatedTable{ - RewriteRule: rules, - Table: newTableInfo, - OldTable: table, - } - return et, nil -} - -// GoCreateTables create tables, and generate their information. -// this function will use workers as the same number of sessionPool, -// leave sessionPool nil to send DDLs sequential. -func (rc *Client) GoCreateTables( - ctx context.Context, - dom *domain.Domain, - tables []*metautil.Table, - newTS uint64, - dbPool []*DB, - errCh chan<- error, -) <-chan CreatedTable { - // Could we have a smaller size of tables? - log.Info("start create tables") - - ddlTables := rc.DDLJobsMap() - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("Client.GoCreateTables", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - outCh := make(chan CreatedTable, len(tables)) - rater := logutil.TraceRateOver(logutil.MetricTableCreatedCounter) - createOneTable := func(c context.Context, db *DB, t *metautil.Table) error { - select { - case <-c.Done(): - return c.Err() - default: - } - rt, err := rc.createTable(c, db, dom, t, newTS, ddlTables) - if err != nil { - log.Error("create table failed", - zap.Error(err), - zap.Stringer("db", t.DB.Name), - zap.Stringer("table", t.Info.Name)) - return errors.Trace(err) - } - log.Debug("table created and send to next", - zap.Int("output chan size", len(outCh)), - zap.Stringer("table", t.Info.Name), - zap.Stringer("database", t.DB.Name)) - outCh <- rt - rater.Inc() - rater.L().Info("table created", - zap.Stringer("table", t.Info.Name), - zap.Stringer("database", t.DB.Name)) - return nil - } - go func() { - defer close(outCh) - defer log.Debug("all tables are created") - var err error - if len(dbPool) > 0 { - err = rc.createTablesWithDBPool(ctx, createOneTable, tables, dbPool) - } else { - err = rc.createTablesWithSoleDB(ctx, createOneTable, tables) - } - if err != nil { - errCh <- err - } - }() - return outCh -} - -func (rc *Client) createTablesWithSoleDB(ctx context.Context, - createOneTable func(ctx context.Context, db *DB, t *metautil.Table) error, - tables []*metautil.Table) error { - for _, t := range tables { - if err := createOneTable(ctx, rc.db, t); err != nil { - return errors.Trace(err) - } - } - return nil -} - -func (rc *Client) createTablesWithDBPool(ctx context.Context, - createOneTable func(ctx context.Context, db *DB, t *metautil.Table) error, - tables []*metautil.Table, dbPool []*DB) error { - eg, ectx := errgroup.WithContext(ctx) - workers := utils.NewWorkerPool(uint(len(dbPool)), "DDL workers") - for _, t := range tables { - table := t - workers.ApplyWithIDInErrorGroup(eg, func(id uint64) error { - db := dbPool[id%uint64(len(dbPool))] - return createOneTable(ectx, db, table) - }) - } - return eg.Wait() -} - -// ExecDDLs executes the queries of the ddl jobs. -func (rc *Client) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error { - // Sort the ddl jobs by schema version in ascending order. 
- sort.Slice(ddlJobs, func(i, j int) bool { - return ddlJobs[i].BinlogInfo.SchemaVersion < ddlJobs[j].BinlogInfo.SchemaVersion - }) - - for _, job := range ddlJobs { - err := rc.db.ExecDDL(ctx, job) - if err != nil { - return errors.Trace(err) - } - log.Info("execute ddl query", - zap.String("db", job.SchemaName), - zap.String("query", job.Query), - zap.Int64("historySchemaVersion", job.BinlogInfo.SchemaVersion)) - } - return nil -} - func (rc *Client) setSpeedLimit(ctx context.Context) error { if !rc.hasSpeedLimited && rc.rateLimit != 0 { stores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.SkipTiFlash) @@ -780,171 +549,6 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo return nil } -// GoValidateChecksum forks a goroutine to validate checksum after restore. -// it returns a channel fires a struct{} when all things get done. -func (rc *Client) GoValidateChecksum( - ctx context.Context, - tableStream <-chan CreatedTable, - kvClient kv.Client, - errCh chan<- error, - updateCh glue.Progress, - concurrency uint, -) <-chan struct{} { - log.Info("Start to validate checksum") - outCh := make(chan struct{}, 1) - wg := new(sync.WaitGroup) - wg.Add(2) - loadStatCh := make(chan *CreatedTable, 1024) - // run the stat loader - go func() { - defer wg.Done() - rc.updateMetaAndLoadStats(ctx, loadStatCh) - }() - workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum") - go func() { - eg, ectx := errgroup.WithContext(ctx) - defer func() { - if err := eg.Wait(); err != nil { - errCh <- err - } - close(loadStatCh) - wg.Done() - }() - - for { - select { - // if we use ectx here, maybe canceled will mask real error. - case <-ctx.Done(): - errCh <- ctx.Err() - case tbl, ok := <-tableStream: - if !ok { - return - } - - workers.ApplyOnErrorGroup(eg, func() error { - start := time.Now() - defer func() { - elapsed := time.Since(start) - summary.CollectSuccessUnit("table checksum", 1, elapsed) - }() - err := rc.execChecksum(ectx, tbl, kvClient, concurrency, loadStatCh) - if err != nil { - return errors.Trace(err) - } - updateCh.Inc() - return nil - }) - } - } - }() - go func() { - wg.Wait() - log.Info("all checksum ended") - close(outCh) - }() - return outCh -} - -func (rc *Client) execChecksum( - ctx context.Context, - tbl CreatedTable, - kvClient kv.Client, - concurrency uint, - loadStatCh chan<- *CreatedTable, -) error { - logger := log.With( - zap.String("db", tbl.OldTable.DB.Name.O), - zap.String("table", tbl.OldTable.Info.Name.O), - ) - - if tbl.OldTable.NoChecksum() { - logger.Warn("table has no checksum, skipping checksum") - return nil - } - - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("Client.execChecksum", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - startTS, err := rc.GetTS(ctx) - if err != nil { - return errors.Trace(err) - } - exe, err := checksum.NewExecutorBuilder(tbl.Table, startTS). - SetOldTable(tbl.OldTable). - SetConcurrency(concurrency). - Build() - if err != nil { - return errors.Trace(err) - } - checksumResp, err := exe.Execute(ctx, kvClient, func() { - // TODO: update progress here. 
- }) - if err != nil { - return errors.Trace(err) - } - - table := tbl.OldTable - if checksumResp.Checksum != table.Crc64Xor || - checksumResp.TotalKvs != table.TotalKvs || - checksumResp.TotalBytes != table.TotalBytes { - logger.Error("failed in validate checksum", - zap.Uint64("origin tidb crc64", table.Crc64Xor), - zap.Uint64("calculated crc64", checksumResp.Checksum), - zap.Uint64("origin tidb total kvs", table.TotalKvs), - zap.Uint64("calculated total kvs", checksumResp.TotalKvs), - zap.Uint64("origin tidb total bytes", table.TotalBytes), - zap.Uint64("calculated total bytes", checksumResp.TotalBytes), - ) - return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum") - } - - loadStatCh <- &tbl - return nil -} - -func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *CreatedTable) { - for { - select { - case <-ctx.Done(): - return - case tbl, ok := <-input: - if !ok { - return - } - - // Not need to return err when failed because of update analysis-meta - restoreTS, err := rc.GetTS(ctx) - if err != nil { - log.Error("getTS failed", zap.Error(err)) - } else { - err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, tbl.OldTable.TotalKvs) - if err != nil { - log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err)) - } - } - - table := tbl.OldTable - if table.Stats != nil { - log.Info("start loads analyze after validate checksum", - zap.Int64("old id", tbl.OldTable.Info.ID), - zap.Int64("new id", tbl.Table.ID), - ) - start := time.Now() - if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil { - log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err)) - } - log.Info("restore stat done", - zap.String("table", table.Info.Name.L), - zap.String("db", table.DB.Name.L), - zap.Duration("cost", time.Since(start))) - } - } - } -} - const ( restoreLabelKey = "exclusive" restoreLabelValue = "restore" @@ -1102,105 +706,3 @@ func (rc *Client) ResetPlacementRules(ctx context.Context, tables []*model.Table func (rc *Client) getRuleID(tableID int64) string { return "restore-t" + strconv.FormatInt(tableID, 10) } - -// IsIncremental returns whether this backup is incremental. -func (rc *Client) IsIncremental() bool { - return !(rc.backupMeta.StartVersion == rc.backupMeta.EndVersion || - rc.backupMeta.StartVersion == 0) -} - -// EnableSkipCreateSQL sets switch of skip create schema and tables. -func (rc *Client) EnableSkipCreateSQL() { - rc.noSchema = true -} - -// IsSkipCreateSQL returns whether we need skip create schema and tables in restore. -func (rc *Client) IsSkipCreateSQL() bool { - return rc.noSchema -} - -// DDLJobsMap returns a map[UniqueTableName]bool about < db table, hasCreate/hasTruncate DDL >. -// if we execute some DDLs before create table. -// we may get two situation that need to rebase auto increment/random id. -// 1. truncate table: truncate will generate new id cache. -// 2. create table/create and rename table: the first create table will lock down the id cache. -// because we cannot create onExistReplace table. -// so the final create DDL with the correct auto increment/random id won't be executed. 
-func (rc *Client) DDLJobsMap() map[UniqueTableName]bool { - m := make(map[UniqueTableName]bool) - for _, job := range rc.ddlJobs { - switch job.Type { - case model.ActionTruncateTable, model.ActionCreateTable, model.ActionRenameTable: - m[UniqueTableName{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()}] = true - } - } - return m -} - -// PreCheckTableTiFlashReplica checks whether TiFlash replica is less than TiFlash node. -func (rc *Client) PreCheckTableTiFlashReplica( - ctx context.Context, - tables []*metautil.Table, -) error { - tiFlashStores, err := conn.GetAllTiKVStores(ctx, rc.pdClient, conn.TiFlashOnly) - if err != nil { - return errors.Trace(err) - } - tiFlashStoreCount := len(tiFlashStores) - for _, table := range tables { - if table.Info.TiFlashReplica != nil && table.Info.TiFlashReplica.Count > uint64(tiFlashStoreCount) { - // we cannot satisfy TiFlash replica in restore cluster. so we should - // set TiFlashReplica to unavailable in tableInfo, to avoid TiDB cannot sense TiFlash and make plan to TiFlash - // see details at https://github.com/pingcap/br/issues/931 - table.Info.TiFlashReplica = nil - } - } - return nil -} - -// PreCheckTableClusterIndex checks whether backup tables and existed tables have different cluster index options。 -func (rc *Client) PreCheckTableClusterIndex( - tables []*metautil.Table, - ddlJobs []*model.Job, - dom *domain.Domain, -) error { - for _, table := range tables { - oldTableInfo, err := rc.GetTableSchema(dom, table.DB.Name, table.Info.Name) - // table exists in database - if err == nil { - if table.Info.IsCommonHandle != oldTableInfo.IsCommonHandle { - return errors.Annotatef(berrors.ErrRestoreModeMismatch, - "Clustered index option mismatch. Restored cluster's @@tidb_enable_clustered_index should be %v (backup table = %v, created table = %v).", - transferBoolToValue(table.Info.IsCommonHandle), - table.Info.IsCommonHandle, - oldTableInfo.IsCommonHandle) - } - } - } - for _, job := range ddlJobs { - if job.Type == model.ActionCreateTable { - tableInfo := job.BinlogInfo.TableInfo - if tableInfo != nil { - oldTableInfo, err := rc.GetTableSchema(dom, model.NewCIStr(job.SchemaName), tableInfo.Name) - // table exists in database - if err == nil { - if tableInfo.IsCommonHandle != oldTableInfo.IsCommonHandle { - return errors.Annotatef(berrors.ErrRestoreModeMismatch, - "Clustered index option mismatch. 
Restored cluster's @@tidb_enable_clustered_index should be %v (backup table = %v, created table = %v).", - transferBoolToValue(tableInfo.IsCommonHandle), - tableInfo.IsCommonHandle, - oldTableInfo.IsCommonHandle) - } - } - } - } - } - return nil -} - -func transferBoolToValue(enable bool) string { - if enable { - return "ON" - } - return "OFF" -} diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index 3fcdf918..1e5f1891 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -3,23 +3,13 @@ package restore_test import ( - "context" - "math" - "strconv" "testing" "time" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/parser/types" - "github.com/pingcap/tidb/tablecodec" "github.com/stretchr/testify/require" "github.com/tikv/migration/br/pkg/gluetidb" - "github.com/tikv/migration/br/pkg/metautil" "github.com/tikv/migration/br/pkg/mock" "github.com/tikv/migration/br/pkg/restore" - pd "github.com/tikv/pd/client" "google.golang.org/grpc/keepalive" ) @@ -30,62 +20,6 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{ Timeout: 10 * time.Second, } -func TestCreateTables(t *testing.T) { - m := mc - client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg) - require.NoError(t, err) - - info, err := m.Domain.GetSnapshotInfoSchema(math.MaxUint64) - require.NoError(t, err) - dbSchema, isExist := info.SchemaByName(model.NewCIStr("test")) - require.True(t, isExist) - - tables := make([]*metautil.Table, 4) - intField := types.NewFieldType(mysql.TypeLong) - intField.Charset = "binary" - for i := len(tables) - 1; i >= 0; i-- { - tables[i] = &metautil.Table{ - DB: dbSchema, - Info: &model.TableInfo{ - ID: int64(i), - Name: model.NewCIStr("test" + strconv.Itoa(i)), - Columns: []*model.ColumnInfo{{ - ID: 1, - Name: model.NewCIStr("id"), - FieldType: *intField, - State: model.StatePublic, - }}, - Charset: "utf8mb4", - Collate: "utf8mb4_bin", - }, - } - } - rules, newTables, err := client.CreateTables(m.Domain, tables, 0) - require.NoError(t, err) - // make sure tables and newTables have same order - for i, tbl := range tables { - require.Equal(t, tbl.Info.Name, newTables[i].Name) - } - for _, nt := range newTables { - require.Regexp(t, "test[0-3]", nt.Name.String()) - } - oldTableIDExist := make(map[int64]bool) - newTableIDExist := make(map[int64]bool) - for _, tr := range rules.Data { - oldTableID := tablecodec.DecodeTableID(tr.GetOldKeyPrefix()) - require.False(t, oldTableIDExist[oldTableID], "table rule duplicate old table id") - oldTableIDExist[oldTableID] = true - - newTableID := tablecodec.DecodeTableID(tr.GetNewKeyPrefix()) - require.False(t, newTableIDExist[newTableID], "table rule duplicate new table id") - newTableIDExist[newTableID] = true - } - - for i := 0; i < len(tables); i++ { - require.True(t, oldTableIDExist[int64(i)], "table rule does not exist") - } -} - func TestIsOnline(t *testing.T) { m := mc client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg) @@ -95,134 +29,3 @@ func TestIsOnline(t *testing.T) { client.EnableOnline() require.True(t, client.IsOnline()) } - -func TestPreCheckTableClusterIndex(t *testing.T) { - m := mc - client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg) - require.NoError(t, err) - - info, err := m.Domain.GetSnapshotInfoSchema(math.MaxUint64) - require.NoError(t, err) 
- dbSchema, isExist := info.SchemaByName(model.NewCIStr("test")) - require.True(t, isExist) - - tables := make([]*metautil.Table, 4) - intField := types.NewFieldType(mysql.TypeLong) - intField.Charset = "binary" - for i := len(tables) - 1; i >= 0; i-- { - tables[i] = &metautil.Table{ - DB: dbSchema, - Info: &model.TableInfo{ - ID: int64(i), - Name: model.NewCIStr("test" + strconv.Itoa(i)), - Columns: []*model.ColumnInfo{{ - ID: 1, - Name: model.NewCIStr("id"), - FieldType: *intField, - State: model.StatePublic, - }}, - Charset: "utf8mb4", - Collate: "utf8mb4_bin", - }, - } - } - _, _, err = client.CreateTables(m.Domain, tables, 0) - require.NoError(t, err) - - // exist different tables - tables[1].Info.IsCommonHandle = true - err = client.PreCheckTableClusterIndex(tables, nil, m.Domain) - require.Error(t, err) - require.Regexp(t, `.*@@tidb_enable_clustered_index should be ON \(backup table = true, created table = false\).*`, err.Error()) - - // exist different DDLs - jobs := []*model.Job{{ - ID: 5, - Type: model.ActionCreateTable, - SchemaName: "test", - Query: "", - BinlogInfo: &model.HistoryInfo{ - TableInfo: &model.TableInfo{ - Name: model.NewCIStr("test1"), - IsCommonHandle: true, - }, - }, - }} - err = client.PreCheckTableClusterIndex(nil, jobs, m.Domain) - require.Error(t, err) - require.Regexp(t, `.*@@tidb_enable_clustered_index should be ON \(backup table = true, created table = false\).*`, err.Error()) - - // should pass pre-check cluster index - tables[1].Info.IsCommonHandle = false - jobs[0].BinlogInfo.TableInfo.IsCommonHandle = false - require.Nil(t, client.PreCheckTableClusterIndex(tables, jobs, m.Domain)) -} - -type fakePDClient struct { - pd.Client - stores []*metapb.Store -} - -func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) { - return append([]*metapb.Store{}, fpdc.stores...), nil -} - -func TestPreCheckTableTiFlashReplicas(t *testing.T) { - m := mc - mockStores := []*metapb.Store{ - { - Id: 1, - Labels: []*metapb.StoreLabel{ - { - Key: "engine", - Value: "tiflash", - }, - }, - }, - { - Id: 2, - Labels: []*metapb.StoreLabel{ - { - Key: "engine", - Value: "tiflash", - }, - }, - }, - } - - client, err := restore.NewRestoreClient(gluetidb.New(), fakePDClient{ - stores: mockStores, - }, m.Storage, nil, defaultKeepaliveCfg) - require.NoError(t, err) - - tables := make([]*metautil.Table, 4) - for i := 0; i < len(tables); i++ { - tiflashReplica := &model.TiFlashReplicaInfo{ - Count: uint64(i), - } - if i == 0 { - tiflashReplica = nil - } - - tables[i] = &metautil.Table{ - DB: nil, - Info: &model.TableInfo{ - ID: int64(i), - Name: model.NewCIStr("test" + strconv.Itoa(i)), - TiFlashReplica: tiflashReplica, - }, - } - } - ctx := context.Background() - require.Nil(t, client.PreCheckTableTiFlashReplica(ctx, tables)) - - for i := 0; i < len(tables); i++ { - if i == 0 || i > 2 { - require.Nil(t, tables[i].Info.TiFlashReplica) - } else { - require.NotNil(t, tables[i].Info.TiFlashReplica) - obtainCount := int(tables[i].Info.TiFlashReplica.Count) - require.Equal(t, i, obtainCount) - } - } -} diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 6705b95d..babfc16c 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -4,20 +4,14 @@ package restore_test import ( "context" - "encoding/json" "math" "strconv" "testing" - "github.com/golang/protobuf/proto" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/kvproto/pkg/encryptionpb" "github.com/pingcap/tidb/meta/autoid" 
"github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/oracle" - "github.com/tikv/migration/br/pkg/backup" "github.com/tikv/migration/br/pkg/gluetidb" "github.com/tikv/migration/br/pkg/metautil" "github.com/tikv/migration/br/pkg/mock" @@ -123,132 +117,3 @@ func TestRestoreAutoIncID(t *testing.T) { require.Equal(t, uint64(globalAutoID+300), autoIncID) } - -func TestFilterDDLJobs(t *testing.T) { - s, clean := createRestoreSchemaSuite(t) - defer clean() - tk := testkit.NewTestKit(t, s.mock.Storage) - tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;") - tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);") - lastTS, err := s.mock.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - require.NoErrorf(t, err, "Error get last ts: %s", err) - tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;") - tk.MustExec("DROP TABLE test_db.test_table1;") - tk.MustExec("DROP DATABASE test_db;") - tk.MustExec("CREATE DATABASE test_db;") - tk.MustExec("USE test_db;") - tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));") - tk.MustExec("RENAME TABLE test_table1 to test_table;") - tk.MustExec("TRUNCATE TABLE test_table;") - - ts, err := s.mock.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - require.NoErrorf(t, err, "Error get ts: %s", err) - - cipher := backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - } - - metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, &cipher) - ctx := context.Background() - metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metaWriter, s.mock.Storage, lastTS, ts) - require.NoErrorf(t, err, "Error get ddl jobs: %s", err) - err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) - require.NoErrorf(t, err, "Flush failed", err) - err = metaWriter.FlushBackupMeta(ctx) - require.NoErrorf(t, err, "Finially flush backupmeta failed", err) - infoSchema, err := s.mock.Domain.GetSnapshotInfoSchema(ts) - require.NoErrorf(t, err, "Error get snapshot info schema: %s", err) - dbInfo, ok := infoSchema.SchemaByName(model.NewCIStr("test_db")) - require.Truef(t, ok, "DB info not exist") - tableInfo, err := infoSchema.TableByName(model.NewCIStr("test_db"), model.NewCIStr("test_table")) - require.NoErrorf(t, err, "Error get table info: %s", err) - tables := []*metautil.Table{{ - DB: dbInfo, - Info: tableInfo.Meta(), - }} - metaBytes, err := s.storage.ReadFile(ctx, metautil.MetaFile) - require.NoError(t, err) - mockMeta := &backuppb.BackupMeta{} - err = proto.Unmarshal(metaBytes, mockMeta) - require.NoError(t, err) - // check the schema version - require.Equal(t, int32(metautil.MetaV1), mockMeta.Version) - metaReader := metautil.NewMetaReader(mockMeta, s.storage, &cipher) - allDDLJobsBytes, err := metaReader.ReadDDLs(ctx) - require.NoError(t, err) - var allDDLJobs []*model.Job - err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs) - require.NoError(t, err) - - ddlJobs := restore.FilterDDLJobs(allDDLJobs, tables) - for _, job := range ddlJobs { - t.Logf("get ddl job: %s", job.Query) - } - require.Equal(t, 7, len(ddlJobs)) -} - -func TestFilterDDLJobsV2(t *testing.T) { - s, clean := createRestoreSchemaSuite(t) - defer clean() - tk := testkit.NewTestKit(t, s.mock.Storage) - tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;") - tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);") - 
lastTS, err := s.mock.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - require.NoErrorf(t, err, "Error get last ts: %s", err) - tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;") - tk.MustExec("DROP TABLE test_db.test_table1;") - tk.MustExec("DROP DATABASE test_db;") - tk.MustExec("CREATE DATABASE test_db;") - tk.MustExec("USE test_db;") - tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));") - tk.MustExec("RENAME TABLE test_table1 to test_table;") - tk.MustExec("TRUNCATE TABLE test_table;") - - ts, err := s.mock.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - require.NoErrorf(t, err, "Error get ts: %s", err) - - cipher := backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - } - - metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, true, &cipher) - ctx := context.Background() - metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metaWriter, s.mock.Storage, lastTS, ts) - require.NoErrorf(t, err, "Error get ddl jobs: %s", err) - err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) - require.NoErrorf(t, err, "Flush failed", err) - err = metaWriter.FlushBackupMeta(ctx) - require.NoErrorf(t, err, "Flush BackupMeta failed", err) - - infoSchema, err := s.mock.Domain.GetSnapshotInfoSchema(ts) - require.NoErrorf(t, err, "Error get snapshot info schema: %s", err) - dbInfo, ok := infoSchema.SchemaByName(model.NewCIStr("test_db")) - require.Truef(t, ok, "DB info not exist") - tableInfo, err := infoSchema.TableByName(model.NewCIStr("test_db"), model.NewCIStr("test_table")) - require.NoErrorf(t, err, "Error get table info: %s", err) - tables := []*metautil.Table{{ - DB: dbInfo, - Info: tableInfo.Meta(), - }} - metaBytes, err := s.storage.ReadFile(ctx, metautil.MetaFile) - require.NoError(t, err) - mockMeta := &backuppb.BackupMeta{} - err = proto.Unmarshal(metaBytes, mockMeta) - require.NoError(t, err) - // check the schema version - require.Equal(t, int32(metautil.MetaV2), mockMeta.Version) - metaReader := metautil.NewMetaReader(mockMeta, s.storage, &cipher) - allDDLJobsBytes, err := metaReader.ReadDDLs(ctx) - require.NoError(t, err) - var allDDLJobs []*model.Job - err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs) - require.NoError(t, err) - - ddlJobs := restore.FilterDDLJobs(allDDLJobs, tables) - for _, job := range ddlJobs { - t.Logf("get ddl job: %s", job.Query) - } - require.Equal(t, 7, len(ddlJobs)) -} diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 5df3020a..7741fa75 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -3,35 +3,18 @@ package task import ( - "context" - "fmt" - "os" "strconv" - "strings" "time" - "github.com/docker/go-units" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/log" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/types" "github.com/spf13/pflag" "github.com/tikv/client-go/v2/oracle" - "github.com/tikv/migration/br/pkg/backup" - "github.com/tikv/migration/br/pkg/checksum" berrors "github.com/tikv/migration/br/pkg/errors" - "github.com/tikv/migration/br/pkg/glue" - "github.com/tikv/migration/br/pkg/logutil" - "github.com/tikv/migration/br/pkg/metautil" - "github.com/tikv/migration/br/pkg/storage" - 
"github.com/tikv/migration/br/pkg/summary" "github.com/tikv/migration/br/pkg/utils" - "go.uber.org/zap" ) const ( @@ -45,9 +28,6 @@ const ( flagUseBackupMetaV2 = "use-backupmeta-v2" flagGCTTL = "gcttl" - - defaultBackupConcurrency = 4 - maxBackupConcurrency = 256 ) // CompressionConfig is the configuration for sst file compression. @@ -180,321 +160,6 @@ func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) { }, nil } -// adjustBackupConfig is use for BR(binary) and BR in TiDB. -// When new config was add and not included in parser. -// we should set proper value in this function. -// so that both binary and TiDB will use same default value. -func (cfg *BackupConfig) adjustBackupConfig() { - cfg.adjust() - usingDefaultConcurrency := false - if cfg.Config.Concurrency == 0 { - cfg.Config.Concurrency = defaultBackupConcurrency - usingDefaultConcurrency = true - } - if cfg.Config.Concurrency > maxBackupConcurrency { - cfg.Config.Concurrency = maxBackupConcurrency - } - if cfg.RateLimit != unlimited { - // TiKV limits the upload rate by each backup request. - // When the backup requests are sent concurrently, - // the ratelimit couldn't work as intended. - // Degenerating to sequentially sending backup requests to avoid this. - if !usingDefaultConcurrency { - logutil.WarnTerm("setting `--ratelimit` and `--concurrency` at the same time, "+ - "ignoring `--concurrency`: `--ratelimit` forces sequential (i.e. concurrency = 1) backup", - zap.String("ratelimit", units.HumanSize(float64(cfg.RateLimit))+"/s"), - zap.Uint32("concurrency-specified", cfg.Config.Concurrency)) - } - cfg.Config.Concurrency = 1 - } - - if cfg.GCTTL == 0 { - cfg.GCTTL = utils.DefaultBRGCSafePointTTL - } - // Use zstd as default - if cfg.CompressionType == backuppb.CompressionType_UNKNOWN { - cfg.CompressionType = backuppb.CompressionType_ZSTD - } -} - -// RunBackup starts a backup task inside the current goroutine. -func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig) error { - cfg.adjustBackupConfig() - - defer summary.Summary(cmdName) - ctx, cancel := context.WithCancel(c) - defer cancel() - - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("task.RunBackup", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions) - if err != nil { - return errors.Trace(err) - } - skipStats := cfg.IgnoreStats - // For backup, Domain is not needed if user ignores stats. - // Domain loads all table info into memory. By skipping Domain, we save - // lots of memory (about 500MB for 40K 40 fields YCSB tables). 
- needDomain := !skipStats - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain) - if err != nil { - return errors.Trace(err) - } - defer mgr.Close() - var statsHandle *handle.Handle - if !skipStats { - statsHandle = mgr.GetDomain().StatsHandle() - } - - client, err := backup.NewBackupClient(ctx, mgr) - if err != nil { - return errors.Trace(err) - } - opts := storage.ExternalStorageOptions{ - NoCredentials: cfg.NoCreds, - SendCredentials: cfg.SendCreds, - } - if err = client.SetStorage(ctx, u, &opts); err != nil { - return errors.Trace(err) - } - err = client.SetLockFile(ctx) - if err != nil { - return errors.Trace(err) - } - client.SetGCTTL(cfg.GCTTL) - - backupTS, err := client.GetTS(ctx, cfg.TimeAgo, cfg.BackupTS) - if err != nil { - return errors.Trace(err) - } - g.Record("BackupTS", backupTS) - sp := utils.BRServiceSafePoint{ - BackupTS: backupTS, - TTL: client.GetGCTTL(), - ID: utils.MakeSafePointID(), - } - // use lastBackupTS as safePoint if exists - if cfg.LastBackupTS > 0 { - sp.BackupTS = cfg.LastBackupTS - } - - log.Info("current backup safePoint job", zap.Object("safePoint", sp)) - err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) - if err != nil { - return errors.Trace(err) - } - - isIncrementalBackup := cfg.LastBackupTS > 0 - - if cfg.RemoveSchedulers { - log.Debug("removing some PD schedulers") - restore, e := mgr.RemoveSchedulers(ctx) - defer func() { - if ctx.Err() != nil { - log.Warn("context canceled, doing clean work with background context") - ctx = context.Background() - } - if restoreE := restore(ctx); restoreE != nil { - log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) - } - }() - if e != nil { - return errors.Trace(err) - } - } - - req := backuppb.BackupRequest{ - ClusterId: client.GetClusterID(), - StartVersion: cfg.LastBackupTS, - EndVersion: backupTS, - RateLimit: cfg.RateLimit, - Concurrency: defaultBackupConcurrency, - CompressionType: cfg.CompressionType, - CompressionLevel: cfg.CompressionLevel, - CipherInfo: &cfg.CipherInfo, - } - brVersion := g.GetVersion() - clusterVersion, err := mgr.GetClusterVersion(ctx) - if err != nil { - return errors.Trace(err) - } - - ranges, schemas, err := backup.BuildBackupRangeAndSchema(mgr.GetStorage(), cfg.TableFilter, backupTS) - if err != nil { - return errors.Trace(err) - } - - // Metafile size should be less than 64MB. - metawriter := metautil.NewMetaWriter(client.GetStorage(), - metautil.MetaFileSize, cfg.UseBackupMetaV2, &cfg.CipherInfo) - // Hack way to update backupmeta. 
- metawriter.Update(func(m *backuppb.BackupMeta) { - m.StartVersion = req.StartVersion - m.EndVersion = req.EndVersion - m.IsRawKv = req.IsRawKv - m.ClusterId = req.ClusterId - m.ClusterVersion = clusterVersion - m.BrVersion = brVersion - }) - - // nothing to backup - if ranges == nil { - pdAddress := strings.Join(cfg.PD, ",") - log.Warn("Nothing to backup, maybe connected to cluster for restoring", - zap.String("PD address", pdAddress)) - - err = metawriter.FlushBackupMeta(ctx) - if err == nil { - summary.SetSuccessStatus(true) - } - return err - } - - if isIncrementalBackup { - if backupTS <= cfg.LastBackupTS { - log.Error("LastBackupTS is larger or equal to current TS") - return errors.Annotate(berrors.ErrInvalidArgument, "LastBackupTS is larger or equal to current TS") - } - err = utils.CheckGCSafePoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS) - if err != nil { - log.Error("Check gc safepoint for last backup ts failed", zap.Error(err)) - return errors.Trace(err) - } - - metawriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metawriter, mgr.GetStorage(), cfg.LastBackupTS, backupTS) - if err != nil { - return errors.Trace(err) - } - if err = metawriter.FinishWriteMetas(ctx, metautil.AppendDDL); err != nil { - return errors.Trace(err) - } - } - - summary.CollectInt("backup total ranges", len(ranges)) - - var updateCh glue.Progress - var unit backup.ProgressUnit - if len(ranges) < 100 { - unit = backup.RegionUnit - // The number of regions need to backup - approximateRegions := 0 - for _, r := range ranges { - var regionCount int - regionCount, err = mgr.GetRegionCount(ctx, r.StartKey, r.EndKey) - if err != nil { - return errors.Trace(err) - } - approximateRegions += regionCount - } - // Redirect to log if there is no log file to avoid unreadable output. - updateCh = g.StartProgress( - ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) - summary.CollectInt("backup total regions", approximateRegions) - } else { - unit = backup.RangeUnit - // To reduce the costs, we can use the range as unit of progress. - updateCh = g.StartProgress( - ctx, cmdName, int64(len(ranges)), !cfg.LogProgress) - } - - progressCount := 0 - progressCallBack := func(callBackUnit backup.ProgressUnit) { - if unit == callBackUnit { - updateCh.Inc() - progressCount++ - failpoint.Inject("progress-call-back", func(v failpoint.Value) { - log.Info("failpoint progress-call-back injected") - if fileName, ok := v.(string); ok { - f, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm) - if osErr != nil { - log.Warn("failed to create file", zap.Error(osErr)) - } - msg := []byte(fmt.Sprintf("%s:%d\n", unit, progressCount)) - _, err = f.Write(msg) - if err != nil { - log.Warn("failed to write data to file", zap.Error(err)) - } - } - }) - } - } - metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile) - err = client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), metawriter, progressCallBack) - if err != nil { - return errors.Trace(err) - } - // Backup has finished - updateCh.Close() - - err = metawriter.FinishWriteMetas(ctx, metautil.AppendDataFile) - if err != nil { - return errors.Trace(err) - } - - skipChecksum := !cfg.Checksum || isIncrementalBackup - checksumProgress := int64(schemas.Len()) - if skipChecksum { - checksumProgress = 1 - if isIncrementalBackup { - // Since we don't support checksum for incremental data, fast checksum should be skipped. 
- log.Info("Skip fast checksum in incremental backup") - } else { - // When user specified not to calculate checksum, don't calculate checksum. - log.Info("Skip fast checksum") - } - } - updateCh = g.StartProgress(ctx, "Checksum", checksumProgress, !cfg.LogProgress) - schemasConcurrency := uint(utils.MinInt(backup.DefaultSchemaConcurrency, schemas.Len())) - - err = schemas.BackupSchemas( - ctx, metawriter, mgr.GetStorage(), statsHandle, backupTS, schemasConcurrency, cfg.ChecksumConcurrency, skipChecksum, updateCh) - if err != nil { - return errors.Trace(err) - } - - err = metawriter.FlushBackupMeta(ctx) - if err != nil { - return errors.Trace(err) - } - - // Checksum has finished, close checksum progress. - updateCh.Close() - - if !skipChecksum { - // Check if checksum from files matches checksum from coprocessor. - err = checksum.FastChecksum(ctx, metawriter.Backupmeta(), client.GetStorage(), &cfg.CipherInfo) - if err != nil { - return errors.Trace(err) - } - } - archiveSize := metawriter.ArchiveSize() - g.Record(summary.BackupDataSize, archiveSize) - //backup from tidb will fetch a general Size issue https://github.com/pingcap/tidb/issues/27247 - g.Record("Size", archiveSize) - failpoint.Inject("s3-outage-during-writing-file", func(v failpoint.Value) { - log.Info("failpoint s3-outage-during-writing-file injected, " + - "process will sleep for 3s and notify the shell to kill s3 service.") - if sigFile, ok := v.(string); ok { - file, err := os.Create(sigFile) - if err != nil { - log.Warn("failed to create file for notifying, skipping notify", zap.Error(err)) - } - if file != nil { - file.Close() - } - } - time.Sleep(3 * time.Second) - }) - // Set task summary to success status. - summary.SetSuccessStatus(true) - return nil -} - // parseTSString port from tidb setSnapshotTS. 
func parseTSString(ts string) (uint64, error) { if len(ts) == 0 { diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index b124f697..e980f5a4 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -9,7 +9,6 @@ import ( backup "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" - "github.com/pingcap/tidb/config" "github.com/spf13/pflag" "github.com/stretchr/testify/require" ) @@ -38,14 +37,6 @@ func TestUrlNoQuery(t *testing.T) { require.Equal(t, "s3://some/what", field.Interface.(fmt.Stringer).String()) } -func TestTiDBConfigUnchanged(t *testing.T) { - cfg := config.GetGlobalConfig() - restoreConfig := enableTiDBConfig() - require.NotEqual(t, config.GetGlobalConfig(), cfg) - restoreConfig() - require.Equal(t, config.GetGlobalConfig(), cfg) -} - func TestStripingPDURL(t *testing.T) { nor1, err := normalizePDURL("https://pd:5432", true) require.NoError(t, err) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 4bf3bbba..05835bcf 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -6,24 +6,12 @@ import ( "context" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" - "github.com/pingcap/tidb/config" "github.com/spf13/pflag" "github.com/tikv/migration/br/pkg/conn" - berrors "github.com/tikv/migration/br/pkg/errors" - "github.com/tikv/migration/br/pkg/glue" - "github.com/tikv/migration/br/pkg/metautil" "github.com/tikv/migration/br/pkg/pdutil" "github.com/tikv/migration/br/pkg/restore" - "github.com/tikv/migration/br/pkg/storage" - "github.com/tikv/migration/br/pkg/summary" - "github.com/tikv/migration/br/pkg/utils" - "github.com/tikv/migration/br/pkg/version" - "go.uber.org/multierr" "go.uber.org/zap" ) @@ -41,10 +29,8 @@ const ( FlagBatchFlushInterval = "batch-flush-interval" defaultRestoreConcurrency = 128 - maxRestoreBatchSizeLimit = 10240 defaultPDConcurrency = 1 defaultBatchFlushInterval = 16 * time.Second - defaultDDLConcurrency = 16 ) // RestoreCommonConfig is the common configuration for all BR restore tasks. 
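The two constants removed in the hunk above were consumed only by the transactional restore path that the next hunk deletes: RunRestore clamped its SST batch size into the range [defaultRestoreConcurrency, maxRestoreBatchSizeLimit] via utils.ClampInt, and defaultDDLConcurrency sized the session pool used to replay DDLs. A minimal, self-contained sketch of that clamping behaviour follows; clampInt is an illustrative stand-in for utils.ClampInt, not the real helper.

package main

import "fmt"

const (
	defaultRestoreConcurrency = 128   // lower bound; this constant stays in the file
	maxRestoreBatchSizeLimit  = 10240 // upper bound; removed together with RunRestore
)

// clampInt bounds n to the closed interval [lo, hi].
func clampInt(n, lo, hi int) int {
	if n < lo {
		return lo
	}
	if n > hi {
		return hi
	}
	return n
}

func main() {
	// The deleted RunRestore derived its batch size from the user-supplied
	// --concurrency flag, bounded by the two constants above.
	for _, concurrency := range []int{8, 512, 20000} {
		batchSize := clampInt(concurrency, defaultRestoreConcurrency, maxRestoreBatchSizeLimit)
		fmt.Printf("concurrency=%d -> batchSize=%d\n", concurrency, batchSize)
	}
}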
@@ -177,351 +163,6 @@ func (cfg *RestoreConfig) adjustRestoreConfig() { } } -// CheckRestoreDBAndTable is used to check whether the restore dbs or tables have been backup -func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error { - if len(cfg.Schemas) == 0 && len(cfg.Tables) == 0 { - return nil - } - schemas := client.GetDatabases() - schemasMap := make(map[string]struct{}) - tablesMap := make(map[string]struct{}) - for _, db := range schemas { - dbName := db.Info.Name.O - if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok { - dbName = name - } - schemasMap[utils.EncloseName(dbName)] = struct{}{} - for _, table := range db.Tables { - tablesMap[utils.EncloseDBAndTable(dbName, table.Info.Name.O)] = struct{}{} - } - } - restoreSchemas := cfg.Schemas - restoreTables := cfg.Tables - for schema := range restoreSchemas { - if _, ok := schemasMap[schema]; !ok { - return errors.Annotatef(berrors.ErrUndefinedRestoreDbOrTable, - "[database: %v] has not been backup, please ensure you has input a correct database name", schema) - } - } - for table := range restoreTables { - if _, ok := tablesMap[table]; !ok { - return errors.Annotatef(berrors.ErrUndefinedRestoreDbOrTable, - "[table: %v] has not been backup, please ensure you has input a correct table name", table) - } - } - return nil -} - -// RunRestore starts a restore task inside the current goroutine. -func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { - cfg.adjustRestoreConfig() - - defer summary.Summary(cmdName) - ctx, cancel := context.WithCancel(c) - defer cancel() - - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("task.RunRestore", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - - // Restore needs domain to do DDL. 
- needDomain := true - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain) - if err != nil { - return errors.Trace(err) - } - defer mgr.Close() - - keepaliveCfg := GetKeepalive(&cfg.Config) - keepaliveCfg.PermitWithoutStream = true - client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg) - if err != nil { - return errors.Trace(err) - } - defer client.Close() - - u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions) - if err != nil { - return errors.Trace(err) - } - opts := storage.ExternalStorageOptions{ - NoCredentials: cfg.NoCreds, - SendCredentials: cfg.SendCreds, - } - if err = client.SetStorage(ctx, u, &opts); err != nil { - return errors.Trace(err) - } - client.SetRateLimit(cfg.RateLimit) - client.SetCrypter(&cfg.CipherInfo) - client.SetConcurrency(uint(cfg.Concurrency)) - if cfg.Online { - client.EnableOnline() - } - if cfg.NoSchema { - client.EnableSkipCreateSQL() - } - client.SetSwitchModeInterval(cfg.SwitchModeInterval) - err = client.LoadRestoreStores(ctx) - if err != nil { - return errors.Trace(err) - } - - u, s, backupMeta, err := ReadBackupMeta(ctx, metautil.MetaFile, &cfg.Config) - if err != nil { - return errors.Trace(err) - } - backupVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion) - if cfg.CheckRequirements && backupVersion != nil { - if versionErr := version.CheckClusterVersion(ctx, mgr.GetPDClient(), version.CheckVersionForBackup(backupVersion)); versionErr != nil { - return errors.Trace(versionErr) - } - } - reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - if err = client.InitBackupMeta(c, backupMeta, u, s, reader); err != nil { - return errors.Trace(err) - } - - if client.IsRawKvMode() { - return errors.Annotate(berrors.ErrRestoreModeMismatch, "cannot do transactional restore from raw kv data") - } - if err = CheckRestoreDBAndTable(client, cfg); err != nil { - return err - } - files, tables, dbs := filterRestoreFiles(client, cfg) - if len(dbs) == 0 && len(tables) != 0 { - return errors.Annotate(berrors.ErrRestoreInvalidBackup, "contain tables but no databases") - } - archiveSize := reader.ArchiveSize(ctx, files) - g.Record(summary.RestoreDataSize, archiveSize) - //restore from tidb will fetch a general Size issue https://github.com/pingcap/tidb/issues/27247 - g.Record("Size", archiveSize) - restoreTS, err := client.GetTS(ctx) - if err != nil { - return errors.Trace(err) - } - - sp := utils.BRServiceSafePoint{ - BackupTS: restoreTS, - TTL: utils.DefaultBRGCSafePointTTL, - ID: utils.MakeSafePointID(), - } - g.Record("BackupTS", restoreTS) - - // restore checksum will check safe point with its start ts, see details at - // https://github.com/pingcap/tidb/blob/180c02127105bed73712050594da6ead4d70a85f/store/tikv/kv.go#L186-L190 - // so, we should keep the safe point unchangeable. to avoid GC life time is shorter than transaction duration. 
- err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) - if err != nil { - return errors.Trace(err) - } - - var newTS uint64 - if client.IsIncremental() { - newTS = restoreTS - } - ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables) - - err = client.PreCheckTableTiFlashReplica(ctx, tables) - if err != nil { - return errors.Trace(err) - } - - err = client.PreCheckTableClusterIndex(tables, ddlJobs, mgr.GetDomain()) - if err != nil { - return errors.Trace(err) - } - - // pre-set TiDB config for restore - restoreDBConfig := enableTiDBConfig() - defer restoreDBConfig() - - // execute DDL first - err = client.ExecDDLs(ctx, ddlJobs) - if err != nil { - return errors.Trace(err) - } - - // nothing to restore, maybe only ddl changes in incremental restore - if len(dbs) == 0 && len(tables) == 0 { - log.Info("nothing to restore, all databases and tables are filtered out") - // even nothing to restore, we show a success message since there is no failure. - summary.SetSuccessStatus(true) - return nil - } - - for _, db := range dbs { - err = client.CreateDatabase(ctx, db.Info) - if err != nil { - return errors.Trace(err) - } - } - - // We make bigger errCh so we won't block on multi-part failed. - errCh := make(chan error, 32) - // Maybe allow user modify the DDL concurrency isn't necessary, - // because executing DDL is really I/O bound (or, algorithm bound?), - // and we cost most of time at waiting DDL jobs be enqueued. - // So these jobs won't be faster or slower when machine become faster or slower, - // hence make it a fixed value would be fine. - var dbPool []*restore.DB - if g.OwnsStorage() { - // Only in binary we can use multi-thread sessions to create tables. - // so use OwnStorage() to tell whether we are use binary or SQL. - dbPool, err = restore.MakeDBPool(defaultDDLConcurrency, func() (*restore.DB, error) { - return restore.NewDB(g, mgr.GetStorage()) - }) - } - if err != nil { - log.Warn("create session pool failed, we will send DDLs only by created sessions", - zap.Error(err), - zap.Int("sessionCount", len(dbPool)), - ) - } - tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, dbPool, errCh) - if len(files) == 0 { - log.Info("no files, empty databases and tables are restored") - summary.SetSuccessStatus(true) - // don't return immediately, wait all pipeline done. - } - - tableFileMap := restore.MapTableToFiles(files) - log.Debug("mapped table to files", zap.Any("result map", tableFileMap)) - - rangeStream := restore.GoValidateFileRanges( - ctx, tableStream, tableFileMap, cfg.MergeSmallRegionSizeBytes, cfg.MergeSmallRegionKeyCount, errCh) - - rangeSize := restore.EstimateRangeSize(files) - summary.CollectInt("restore ranges", rangeSize) - log.Info("range and file prepared", zap.Int("file count", len(files)), zap.Int("range count", rangeSize)) - - restoreSchedulers, err := restorePreWork(ctx, client, mgr) - if err != nil { - return errors.Trace(err) - } - // Always run the post-work even on error, so we don't stuck in the import - // mode or emptied schedulers - defer restorePostWork(ctx, client, restoreSchedulers) - - // Do not reset timestamp if we are doing incremental restore, because - // we are not allowed to decrease timestamp. - if !client.IsIncremental() { - if err = client.ResetTS(ctx, cfg.PD); err != nil { - log.Error("reset pd TS failed", zap.Error(err)) - return errors.Trace(err) - } - } - - // Restore sst files in batch. 
- batchSize := utils.ClampInt(int(cfg.Concurrency), defaultRestoreConcurrency, maxRestoreBatchSizeLimit) - failpoint.Inject("small-batch-size", func(v failpoint.Value) { - log.Info("failpoint small batch size is on", zap.Int("size", v.(int))) - batchSize = v.(int) - }) - - // Redirect to log if there is no log file to avoid unreadable output. - updateCh := g.StartProgress( - ctx, - cmdName, - // Split/Scatter + Download/Ingest + Checksum - int64(rangeSize+len(files)+len(tables)), - !cfg.LogProgress) - defer updateCh.Close() - sender, err := restore.NewTiKVSender(ctx, client, updateCh, cfg.PDConcurrency) - if err != nil { - return errors.Trace(err) - } - manager := restore.NewBRContextManager(client) - batcher, afterRestoreStream := restore.NewBatcher(ctx, sender, manager, errCh) - batcher.SetThreshold(batchSize) - batcher.EnableAutoCommit(ctx, cfg.BatchFlushInterval) - go restoreTableStream(ctx, rangeStream, batcher, errCh) - - var finish <-chan struct{} - // Checksum - if cfg.Checksum { - finish = client.GoValidateChecksum( - ctx, afterRestoreStream, mgr.GetStorage().GetClient(), errCh, updateCh, cfg.ChecksumConcurrency) - } else { - // when user skip checksum, just collect tables, and drop them. - finish = dropToBlackhole(ctx, afterRestoreStream, errCh, updateCh) - } - - select { - case err = <-errCh: - err = multierr.Append(err, multierr.Combine(restore.Exhaust(errCh)...)) - case <-finish: - } - - // If any error happened, return now. - if err != nil { - return errors.Trace(err) - } - - // The cost of rename user table / replace into system table wouldn't be so high. - // So leave it out of the pipeline for easier implementation. - client.RestoreSystemSchemas(ctx, cfg.TableFilter) - - // Set task summary to success status. - summary.SetSuccessStatus(true) - return nil -} - -// dropToBlackhole drop all incoming tables into black hole, -// i.e. don't execute checksum, just increase the process anyhow. -func dropToBlackhole( - ctx context.Context, - tableStream <-chan restore.CreatedTable, - errCh chan<- error, - updateCh glue.Progress, -) <-chan struct{} { - outCh := make(chan struct{}, 1) - go func() { - defer func() { - close(outCh) - }() - for { - select { - case <-ctx.Done(): - errCh <- ctx.Err() - return - case _, ok := <-tableStream: - if !ok { - return - } - updateCh.Inc() - } - } - }() - return outCh -} - -func filterRestoreFiles( - client *restore.Client, - cfg *RestoreConfig, -) (files []*backuppb.File, tables []*metautil.Table, dbs []*utils.Database) { - for _, db := range client.GetDatabases() { - createdDatabase := false - dbName := db.Info.Name.O - if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok { - dbName = name - } - for _, table := range db.Tables { - if !cfg.TableFilter.MatchTable(dbName, table.Info.Name.O) { - continue - } - if !createdDatabase { - dbs = append(dbs, db) - createdDatabase = true - } - files = append(files, table.Files...) - tables = append(tables, table) - } - } - return -} - // restorePreWork executes some prepare work before restore. // TODO make this function returns a restore post work. func restorePreWork(ctx context.Context, client *restore.Client, mgr *conn.Mgr) (pdutil.UndoFunc, error) { @@ -554,51 +195,3 @@ func restorePostWork( log.Warn("failed to restore PD schedulers", zap.Error(err)) } } - -// enableTiDBConfig tweaks some of configs of TiDB to make the restore progress go well. -// return a function that could restore the config to origin. 
-func enableTiDBConfig() func() { - restoreConfig := config.RestoreFunc() - config.UpdateGlobal(func(conf *config.Config) { - // set max-index-length before execute DDLs and create tables - // we set this value to max(3072*4), otherwise we might not restore table - // when upstream and downstream both set this value greater than default(3072) - conf.MaxIndexLength = config.DefMaxOfMaxIndexLength - log.Warn("set max-index-length to max(3072*4) to skip check index length in DDL") - }) - return restoreConfig -} - -// restoreTableStream blocks current goroutine and restore a stream of tables, -// by send tables to batcher. -func restoreTableStream( - ctx context.Context, - inputCh <-chan restore.TableWithRange, - batcher *restore.Batcher, - errCh chan<- error, -) { - // We cache old tables so that we can 'batch' recover TiFlash and tables. - oldTables := []*metautil.Table{} - defer func() { - // when things done, we must clean pending requests. - batcher.Close() - log.Info("doing postwork", - zap.Int("table count", len(oldTables)), - ) - }() - - for { - select { - case <-ctx.Done(): - errCh <- ctx.Err() - return - case t, ok := <-inputCh: - if !ok { - return - } - oldTables = append(oldTables, t.OldTable) - - batcher.Add(t) - } - } -}
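The restoreTableStream function deleted at the end of this diff is a standard context-aware pipeline stage: it drains a channel of created tables into a batcher, forwards cancellation through errCh, and always closes the batcher on exit. A condensed sketch of the same pattern is shown below; Table and Batcher are simplified placeholders for the restore package types, not the removed implementations.

package main

import (
	"context"
	"fmt"
)

// Table and Batcher stand in for restore.TableWithRange and restore.Batcher.
type Table struct{ Name string }

type Batcher struct{ pending []Table }

func (b *Batcher) Add(t Table) { b.pending = append(b.pending, t) }

// Close flushes whatever is still queued; the real batcher sends it downstream.
func (b *Batcher) Close() { fmt.Println("batcher closed, pending:", len(b.pending)) }

// restoreTableStream mirrors the removed stage: drain the input channel into
// the batcher, report cancellation on errCh, and always clean up on return.
func restoreTableStream(ctx context.Context, in <-chan Table, b *Batcher, errCh chan<- error) {
	defer b.Close() // when things are done, pending requests must be cleaned up
	for {
		select {
		case <-ctx.Done():
			errCh <- ctx.Err()
			return
		case t, ok := <-in:
			if !ok {
				return // upstream closed the channel, nothing more to batch
			}
			b.Add(t)
		}
	}
}

func main() {
	in := make(chan Table, 2)
	in <- Table{Name: "t1"}
	in <- Table{Name: "t2"}
	close(in)

	errCh := make(chan error, 1)
	restoreTableStream(context.Background(), in, &Batcher{}, errCh)
}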