From b5f6062a75d1f6776859c987d97084838757bfea Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Mon, 28 Mar 2022 17:48:47 +0800 Subject: [PATCH] [to #67] remove unused code related to restore Signed-off-by: Jian Zhang --- br/pkg/glue/glue.go | 3 - br/pkg/gluetidb/glue.go | 70 ------- br/pkg/restore/client.go | 20 +- br/pkg/restore/db.go | 289 ----------------------------- br/pkg/restore/db_test.go | 119 ------------ br/pkg/restore/systable_restore.go | 217 ---------------------- br/pkg/restore/util.go | 125 ------------- br/pkg/restore/util_test.go | 37 ---- 8 files changed, 1 insertion(+), 879 deletions(-) delete mode 100644 br/pkg/restore/db_test.go delete mode 100644 br/pkg/restore/systable_restore.go diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 7f2be30a..13aa39f4 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -7,7 +7,6 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" pd "github.com/tikv/pd/client" ) @@ -34,8 +33,6 @@ type Glue interface { type Session interface { Execute(ctx context.Context, sql string) error ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error - CreateDatabase(ctx context.Context, schema *model.DBInfo) error - CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error Close() } diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index 952de0d3..982749cc 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -3,32 +3,19 @@ package gluetidb import ( - "bytes" "context" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/sessionctx" "github.com/tikv/migration/br/pkg/glue" "github.com/tikv/migration/br/pkg/gluetikv" pd "github.com/tikv/pd/client" ) -const ( - defaultCapOfCreateTable = 512 - defaultCapOfCreateDatabase = 64 - brComment = `/*from(br)*/` -) - // New makes a new tidb glue. func New() Glue { log.Debug("enabling no register config") @@ -113,64 +100,7 @@ func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ... return errors.Trace(err) } -// CreateDatabase implements glue.Session. -func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { - d := domain.GetDomain(gs.se).DDL() - query, err := gs.showCreateDatabase(schema) - if err != nil { - return errors.Trace(err) - } - gs.se.SetValue(sessionctx.QueryString, query) - schema = schema.Clone() - if len(schema.Charset) == 0 { - schema.Charset = mysql.DefaultCharset - } - return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore, true) -} - -// CreateTable implements glue.Session. -func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error { - d := domain.GetDomain(gs.se).DDL() - query, err := gs.showCreateTable(table) - if err != nil { - return errors.Trace(err) - } - gs.se.SetValue(sessionctx.QueryString, query) - // Clone() does not clone partitions yet :( - table = table.Clone() - if table.Partition != nil { - newPartition := *table.Partition - newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...) - table.Partition = &newPartition - } - return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore, true) -} - // Close implements glue.Session. func (gs *tidbSession) Close() { gs.se.Close() } - -// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo. -func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) { - table := tbl.Clone() - table.AutoIncID = 0 - result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable)) - // this can never fail. - _, _ = result.WriteString(brComment) - if err := executor.ConstructResultOfShowCreateTable(gs.se, tbl, autoid.Allocators{}, result); err != nil { - return "", errors.Trace(err) - } - return result.String(), nil -} - -// showCreateDatabase shows the result of SHOW CREATE DATABASE from a dbInfo. -func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) { - result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase)) - // this can never fail. - _, _ = result.WriteString(brComment) - if err := executor.ConstructResultOfShowCreateDatabase(gs.se, db, true, result); err != nil { - return "", errors.Trace(err) - } - return result.String(), nil -} diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 580dffec..478fb2f2 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -58,16 +58,7 @@ type Client struct { databases map[string]*utils.Database ddlJobs []*model.Job backupMeta *backuppb.BackupMeta - // TODO Remove this field or replace it with a []*DB, - // since https://github.com/pingcap/br/pull/377 needs more DBs to speed up DDL execution. - // And for now, we must inject a pool of DBs to `Client.GoCreateTables`, otherwise there would be a race condition. - // This is dirty: why we need DBs from different sources? - // By replace it with a []*DB, we can remove the dirty parameter of `Client.GoCreateTable`, - // along with them in some private functions. - // Before you do it, you can firstly read discussions at - // https://github.com/pingcap/br/pull/377#discussion_r446594501, - // this probably isn't as easy as it seems like (however, not hard, too :D) - db *DB + rateLimit uint64 isOnline bool hasSpeedLimited bool @@ -95,10 +86,6 @@ func NewRestoreClient( tlsConf *tls.Config, keepaliveConf keepalive.ClientParameters, ) (*Client, error) { - db, err := NewDB(g, store) - if err != nil { - return nil, errors.Trace(err) - } dom, err := g.GetDomain(store) if err != nil { return nil, errors.Trace(err) @@ -113,7 +100,6 @@ func NewRestoreClient( return &Client{ pdClient: pdClient, toolClient: NewSplitClient(pdClient, tlsConf), - db: db, tlsConf: tlsConf, keepaliveConf: keepaliveConf, switchCh: make(chan struct{}), @@ -159,10 +145,6 @@ func (rc *Client) SetSwitchModeInterval(interval time.Duration) { // Close a client. func (rc *Client) Close() { - // rc.db can be nil in raw kv mode. - if rc.db != nil { - rc.db.Close() - } log.Info("Restore client closed") } diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index 594cc75b..97f487f7 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -1,292 +1,3 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. package restore - -import ( - "context" - "fmt" - "sort" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/tikv/migration/br/pkg/glue" - "github.com/tikv/migration/br/pkg/metautil" - "github.com/tikv/migration/br/pkg/utils" - "go.uber.org/zap" -) - -// DB is a TiDB instance, not thread-safe. -type DB struct { - se glue.Session -} - -type UniqueTableName struct { - DB string - Table string -} - -// NewDB returns a new DB. -func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { - se, err := g.CreateSession(store) - if err != nil { - return nil, errors.Trace(err) - } - // The session may be nil in raw kv mode - if se == nil { - return nil, nil - } - // Set SQL mode to None for avoiding SQL compatibility problem - err = se.Execute(context.Background(), "set @@sql_mode=''") - if err != nil { - return nil, errors.Trace(err) - } - return &DB{ - se: se, - }, nil -} - -// ExecDDL executes the query of a ddl job. -func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { - var err error - tableInfo := ddlJob.BinlogInfo.TableInfo - dbInfo := ddlJob.BinlogInfo.DBInfo - switch ddlJob.Type { - case model.ActionCreateSchema: - err = db.se.CreateDatabase(ctx, dbInfo) - if err != nil { - log.Error("create database failed", zap.Stringer("db", dbInfo.Name), zap.Error(err)) - } - return errors.Trace(err) - case model.ActionCreateTable: - err = db.se.CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo) - if err != nil { - log.Error("create table failed", - zap.Stringer("db", dbInfo.Name), - zap.Stringer("table", tableInfo.Name), - zap.Error(err)) - } - return errors.Trace(err) - } - - if tableInfo != nil { - switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName)) - err = db.se.Execute(ctx, switchDBSQL) - if err != nil { - log.Error("switch db failed", - zap.String("query", switchDBSQL), - zap.String("db", ddlJob.SchemaName), - zap.Error(err)) - return errors.Trace(err) - } - } - err = db.se.Execute(ctx, ddlJob.Query) - if err != nil { - log.Error("execute ddl query failed", - zap.String("query", ddlJob.Query), - zap.String("db", ddlJob.SchemaName), - zap.Int64("historySchemaVersion", ddlJob.BinlogInfo.SchemaVersion), - zap.Error(err)) - } - return errors.Trace(err) -} - -// UpdateStatsMeta update count and snapshot ts in mysql.stats_meta -func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint64, count uint64) error { - sysDB := mysql.SystemDB - statsMetaTbl := "stats_meta" - - // set restoreTS to snapshot and version which is used to update stats_meta - err := db.se.ExecuteInternal( - ctx, - "update %n.%n set snapshot = %?, version = %?, count = %? where table_id = %?", - sysDB, - statsMetaTbl, - restoreTS, - restoreTS, - count, - tableID, - ) - if err != nil { - log.Error("execute update sql failed", zap.Error(err)) - } - return nil -} - -// CreateDatabase executes a CREATE DATABASE SQL. -func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { - err := db.se.CreateDatabase(ctx, schema) - if err != nil { - log.Error("create database failed", zap.Stringer("db", schema.Name), zap.Error(err)) - } - return errors.Trace(err) -} - -// CreateTable executes a CREATE TABLE SQL. -func (db *DB) CreateTable(ctx context.Context, table *metautil.Table, ddlTables map[UniqueTableName]bool) error { - err := db.se.CreateTable(ctx, table.DB.Name, table.Info) - if err != nil { - log.Error("create table failed", - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - - var restoreMetaSQL string - switch { - case table.Info.IsView(): - return nil - case table.Info.IsSequence(): - setValFormat := fmt.Sprintf("do setval(%s.%s, %%d);", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O)) - if table.Info.Sequence.Cycle { - increment := table.Info.Sequence.Increment - // TiDB sequence's behaviour is designed to keep the same pace - // among all nodes within the same cluster. so we need restore round. - // Here is a hack way to trigger sequence cycle round > 0 according to - // https://github.com/pingcap/br/pull/242#issuecomment-631307978 - // TODO use sql to set cycle round - nextSeqSQL := fmt.Sprintf("do nextval(%s.%s);", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O)) - var setValSQL string - if increment < 0 { - setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue) - } else { - setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue) - } - err = db.se.Execute(ctx, setValSQL) - if err != nil { - log.Error("restore meta sql failed", - zap.String("query", setValSQL), - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - - // trigger cycle round > 0 - err = db.se.Execute(ctx, nextSeqSQL) - if err != nil { - log.Error("restore meta sql failed", - zap.String("query", nextSeqSQL), - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - } - restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID) - err = db.se.Execute(ctx, restoreMetaSQL) - if err != nil { - log.Error("restore meta sql failed", - zap.String("query", restoreMetaSQL), - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - // only table exists in ddlJobs during incremental restoration should do alter after creation. - case ddlTables[UniqueTableName{table.DB.Name.String(), table.Info.Name.String()}]: - if utils.NeedAutoID(table.Info) { - restoreMetaSQL = fmt.Sprintf( - "alter table %s.%s auto_increment = %d;", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O), - table.Info.AutoIncID) - } else if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() { - restoreMetaSQL = fmt.Sprintf( - "alter table %s.%s auto_random_base = %d", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O), - table.Info.AutoRandID) - } else { - log.Info("table exists in incremental ddl jobs, but don't need to be altered", - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name)) - return nil - } - err = db.se.Execute(ctx, restoreMetaSQL) - if err != nil { - log.Error("restore meta sql failed", - zap.String("query", restoreMetaSQL), - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - } - return errors.Trace(err) -} - -// Close closes the connection. -func (db *DB) Close() { - db.se.Close() -} - -// FilterDDLJobs filters ddl jobs. -func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs []*model.Job) { - // Sort the ddl jobs by schema version in descending order. - sort.Slice(allDDLJobs, func(i, j int) bool { - return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion - }) - dbs := getDatabases(tables) - for _, db := range dbs { - // These maps is for solving some corner case. - // e.g. let "t=2" indicates that the id of database "t" is 2, if the ddl execution sequence is: - // rename "a" to "b"(a=1) -> drop "b"(b=1) -> create "b"(b=2) -> rename "b" to "a"(a=2) - // Which we cannot find the "create" DDL by name and id directly. - // To cover †his case, we must find all names and ids the database/table ever had. - dbIDs := make(map[int64]bool) - dbIDs[db.ID] = true - dbNames := make(map[string]bool) - dbNames[db.Name.String()] = true - for _, job := range allDDLJobs { - if job.BinlogInfo.DBInfo != nil { - if dbIDs[job.SchemaID] || dbNames[job.BinlogInfo.DBInfo.Name.String()] { - ddlJobs = append(ddlJobs, job) - // The the jobs executed with the old id, like the step 2 in the example above. - dbIDs[job.SchemaID] = true - // For the jobs executed after rename, like the step 3 in the example above. - dbNames[job.BinlogInfo.DBInfo.Name.String()] = true - } - } - } - } - - for _, table := range tables { - tableIDs := make(map[int64]bool) - tableIDs[table.Info.ID] = true - tableNames := make(map[UniqueTableName]bool) - name := UniqueTableName{table.DB.Name.String(), table.Info.Name.String()} - tableNames[name] = true - for _, job := range allDDLJobs { - if job.BinlogInfo.TableInfo != nil { - name = UniqueTableName{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()} - if tableIDs[job.TableID] || tableNames[name] { - ddlJobs = append(ddlJobs, job) - tableIDs[job.TableID] = true - // For truncate table, the id may be changed - tableIDs[job.BinlogInfo.TableInfo.ID] = true - tableNames[name] = true - } - } - } - } - return ddlJobs -} - -func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) { - dbIDs := make(map[int64]bool) - for _, table := range tables { - if !dbIDs[table.DB.ID] { - dbs = append(dbs, table.DB) - dbIDs[table.DB.ID] = true - } - } - return -} diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go deleted file mode 100644 index babfc16c..00000000 --- a/br/pkg/restore/db_test.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package restore_test - -import ( - "context" - "math" - "strconv" - "testing" - - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/testkit" - "github.com/stretchr/testify/require" - "github.com/tikv/migration/br/pkg/gluetidb" - "github.com/tikv/migration/br/pkg/metautil" - "github.com/tikv/migration/br/pkg/mock" - "github.com/tikv/migration/br/pkg/restore" - "github.com/tikv/migration/br/pkg/storage" -) - -type testRestoreSchemaSuite struct { - mock *mock.Cluster - storage storage.ExternalStorage -} - -func createRestoreSchemaSuite(t *testing.T) (s *testRestoreSchemaSuite, clean func()) { - var err error - s = new(testRestoreSchemaSuite) - s.mock, err = mock.NewCluster() - require.NoError(t, err) - base := t.TempDir() - s.storage, err = storage.NewLocalStorage(base) - require.NoError(t, err) - require.NoError(t, s.mock.Start()) - clean = func() { - s.mock.Stop() - } - return -} - -func TestRestoreAutoIncID(t *testing.T) { - s, clean := createRestoreSchemaSuite(t) - defer clean() - tk := testkit.NewTestKit(t, s.mock.Storage) - tk.MustExec("use test") - tk.MustExec("set @@sql_mode=''") - tk.MustExec("drop table if exists `\"t\"`;") - // Test SQL Mode - tk.MustExec("create table `\"t\"` (" + - "a int not null," + - "time timestamp not null default '0000-00-00 00:00:00');", - ) - tk.MustExec("insert into `\"t\"` values (10, '0000-00-00 00:00:00');") - // Query the current AutoIncID - autoIncID, err := strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) - require.NoErrorf(t, err, "Error query auto inc id: %s", err) - // Get schemas of db and table - info, err := s.mock.Domain.GetSnapshotInfoSchema(math.MaxUint64) - require.NoErrorf(t, err, "Error get snapshot info schema: %s", err) - dbInfo, exists := info.SchemaByName(model.NewCIStr("test")) - require.Truef(t, exists, "Error get db info") - tableInfo, err := info.TableByName(model.NewCIStr("test"), model.NewCIStr("\"t\"")) - require.NoErrorf(t, err, "Error get table info: %s", err) - table := metautil.Table{ - Info: tableInfo.Meta(), - DB: dbInfo, - } - // Get the next AutoIncID - idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType) - globalAutoID, err := idAlloc.NextGlobalAutoID() - require.NoErrorf(t, err, "Error allocate next auto id") - require.Equal(t, uint64(globalAutoID), autoIncID) - // Alter AutoIncID to the next AutoIncID + 100 - table.Info.AutoIncID = globalAutoID + 100 - db, err := restore.NewDB(gluetidb.New(), s.mock.Storage) - require.NoErrorf(t, err, "Error create DB") - tk.MustExec("drop database if exists test;") - // Test empty collate value - table.DB.Charset = "utf8mb4" - table.DB.Collate = "" - err = db.CreateDatabase(context.Background(), table.DB) - require.NoErrorf(t, err, "Error create empty collate db: %s %s", err, s.mock.DSN) - tk.MustExec("drop database if exists test;") - // Test empty charset value - table.DB.Charset = "" - table.DB.Collate = "utf8mb4_bin" - err = db.CreateDatabase(context.Background(), table.DB) - require.NoErrorf(t, err, "Error create empty charset db: %s %s", err, s.mock.DSN) - uniqueMap := make(map[restore.UniqueTableName]bool) - err = db.CreateTable(context.Background(), &table, uniqueMap) - require.NoErrorf(t, err, "Error create table: %s %s", err, s.mock.DSN) - - tk.MustExec("use test") - autoIncID, err = strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) - require.NoErrorf(t, err, "Error query auto inc id: %s", err) - // Check if AutoIncID is altered successfully. - require.Equal(t, uint64(globalAutoID+100), autoIncID) - - // try again, failed due to table exists. - table.Info.AutoIncID = globalAutoID + 200 - err = db.CreateTable(context.Background(), &table, uniqueMap) - require.NoErrorf(t, err, "Got unexpected error when create table: %v", err) - // Check if AutoIncID is not altered. - autoIncID, err = strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) - require.NoErrorf(t, err, "Error query auto inc id: %s", err) - require.Equal(t, uint64(globalAutoID+100), autoIncID) - - // try again, success because we use alter sql in unique map. - table.Info.AutoIncID = globalAutoID + 300 - uniqueMap[restore.UniqueTableName{"test", "\"t\""}] = true - err = db.CreateTable(context.Background(), &table, uniqueMap) - require.NoErrorf(t, err, "Error create table: %s", err) - // Check if AutoIncID is altered to globalAutoID + 300. - autoIncID, err = strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) - require.NoErrorf(t, err, "Error query auto inc id: %s", err) - require.Equal(t, uint64(globalAutoID+300), autoIncID) - -} diff --git a/br/pkg/restore/systable_restore.go b/br/pkg/restore/systable_restore.go deleted file mode 100644 index 9205be76..00000000 --- a/br/pkg/restore/systable_restore.go +++ /dev/null @@ -1,217 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package restore - -import ( - "context" - "fmt" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - filter "github.com/pingcap/tidb-tools/pkg/table-filter" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - berrors "github.com/tikv/migration/br/pkg/errors" - "github.com/tikv/migration/br/pkg/logutil" - "github.com/tikv/migration/br/pkg/utils" - "go.uber.org/multierr" - "go.uber.org/zap" -) - -var statsTables = map[string]struct{}{ - "stats_buckets": {}, - "stats_extended": {}, - "stats_feedback": {}, - "stats_fm_sketch": {}, - "stats_histograms": {}, - "stats_meta": {}, - "stats_top_n": {}, -} - -var unRecoverableTable = map[string]struct{}{ - // some variables in tidb (e.g. gc_safe_point) cannot be recovered. - "tidb": {}, - "global_variables": {}, - - // all user related tables cannot be recovered for now. - "column_stats_usage": {}, - "columns_priv": {}, - "db": {}, - "default_roles": {}, - "global_grants": {}, - "global_priv": {}, - "role_edges": {}, - "tables_priv": {}, - "user": {}, - "capture_plan_baselines_blacklist": {}, - // gc info don't need to recover. - "gc_delete_range": {}, - "gc_delete_range_done": {}, - - // schema_index_usage has table id need to be rewrite. - "schema_index_usage": {}, -} - -func isUnrecoverableTable(tableName string) bool { - _, ok := unRecoverableTable[tableName] - return ok -} - -func isStatsTable(tableName string) bool { - _, ok := statsTables[tableName] - return ok -} - -// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema). -// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254. -func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) { - sysDB := mysql.SystemDB - - temporaryDB := utils.TemporaryDBName(sysDB) - defer rc.cleanTemporaryDatabase(ctx, sysDB) - - if !f.MatchSchema(sysDB) { - log.Debug("system database filtered out", zap.String("database", sysDB)) - return - } - originDatabase, ok := rc.databases[temporaryDB.O] - if !ok { - log.Info("system database not backed up, skipping", zap.String("database", sysDB)) - return - } - db, ok := rc.getDatabaseByName(sysDB) - if !ok { - // Or should we create the database here? - log.Warn("target database not exist, aborting", zap.String("database", sysDB)) - return - } - - tablesRestored := make([]string, 0, len(originDatabase.Tables)) - for _, table := range originDatabase.Tables { - tableName := table.Info.Name - if f.MatchTable(sysDB, tableName.O) { - if err := rc.replaceTemporaryTableToSystable(ctx, tableName.L, db); err != nil { - log.Warn("error during merging temporary tables into system tables", - logutil.ShortError(err), - zap.Stringer("table", tableName), - ) - } - tablesRestored = append(tablesRestored, tableName.L) - } - } - if err := rc.afterSystemTablesReplaced(tablesRestored); err != nil { - for _, e := range multierr.Errors(err) { - log.Warn("error during reconfigurating the system tables", zap.String("database", sysDB), logutil.ShortError(e)) - } - } -} - -// database is a record of a database. -// For fast querying whether a table exists and the temporary database of it. -type database struct { - ExistingTables map[string]*model.TableInfo - Name model.CIStr - TemporaryName model.CIStr -} - -// getDatabaseByName make a record of a database from info schema by its name. -func (rc *Client) getDatabaseByName(name string) (*database, bool) { - infoSchema := rc.dom.InfoSchema() - schema, ok := infoSchema.SchemaByName(model.NewCIStr(name)) - if !ok { - return nil, false - } - db := &database{ - ExistingTables: map[string]*model.TableInfo{}, - Name: model.NewCIStr(name), - TemporaryName: utils.TemporaryDBName(name), - } - for _, t := range schema.Tables { - db.ExistingTables[t.Name.L] = t - } - return db, true -} - -// afterSystemTablesReplaced do some extra work for special system tables. -// e.g. after inserting to the table mysql.user, we must execute `FLUSH PRIVILEGES` to allow it take effect. -func (rc *Client) afterSystemTablesReplaced(tables []string) error { - var err error - for _, table := range tables { - switch { - case table == "user": - // We cannot execute `rc.dom.NotifyUpdatePrivilege` here, because there isn't - // sessionctx.Context provided by the glue. - // TODO: update the glue type and allow we retrieve a session context from it. - err = multierr.Append(err, errors.Annotatef(berrors.ErrUnsupportedSystemTable, - "restored user info may not take effect, until you should execute `FLUSH PRIVILEGES` manually")) - } - } - return err -} - -// replaceTemporaryTableToSystable replaces the temporary table to real system table. -func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, tableName string, db *database) error { - execSQL := func(sql string) error { - // SQLs here only contain table name and database name, seems it is no need to redact them. - if err := rc.db.se.Execute(ctx, sql); err != nil { - log.Warn("failed to execute SQL restore system database", - zap.String("table", tableName), - zap.Stringer("database", db.Name), - zap.String("sql", sql), - zap.Error(err), - ) - return berrors.ErrUnknown.Wrap(err).GenWithStack("failed to execute %s", sql) - } - log.Info("successfully restore system database", - zap.String("table", tableName), - zap.Stringer("database", db.Name), - zap.String("sql", sql), - ) - return nil - } - - // The newly created tables have different table IDs with original tables, - // hence the old statistics are invalid. - // - // TODO: - // 1 ) Rewrite the table IDs via `UPDATE _temporary_mysql.stats_xxx SET table_id = new_table_id WHERE table_id = old_table_id` - // BEFORE replacing into and then execute `rc.statsHandler.Update(rc.dom.InfoSchema())`. - // 1.5 ) (Optional) The UPDATE statement sometimes costs, the whole system tables restore step can be place into the restore pipeline. - // 2 ) Deprecate the origin interface for backing up statistics. - if isStatsTable(tableName) { - return berrors.ErrUnsupportedSystemTable.GenWithStack("restoring stats via `mysql` schema isn't support yet: " + - "the table ID is out-of-date and may corrupt existing statistics") - } - - if isUnrecoverableTable(tableName) { - return berrors.ErrUnsupportedSystemTable.GenWithStack("restoring unsupported `mysql` schema table") - } - - if db.ExistingTables[tableName] != nil { - log.Info("table existing, using replace into for restore", - zap.String("table", tableName), - zap.Stringer("schema", db.Name)) - replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s SELECT * FROM %s;", - utils.EncloseDBAndTable(db.Name.L, tableName), - utils.EncloseDBAndTable(db.TemporaryName.L, tableName)) - return execSQL(replaceIntoSQL) - } - - renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;", - utils.EncloseDBAndTable(db.TemporaryName.L, tableName), - utils.EncloseDBAndTable(db.Name.L, tableName), - ) - return execSQL(renameSQL) -} - -func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) { - database := utils.TemporaryDBName(originDB) - log.Debug("dropping temporary database", zap.Stringer("database", database)) - sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L)) - if err := rc.db.se.Execute(ctx, sql); err != nil { - logutil.WarnTerm("failed to drop temporary database, it should be dropped manually", - zap.Stringer("database", database), - logutil.ShortError(err), - ) - } -} diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index 9bccb3a8..5380de92 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -137,131 +137,6 @@ func GetSSTMetaFromFile( } } -// MakeDBPool makes a session pool with specficated size by sessionFactory. -func MakeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) { - dbPool := make([]*DB, 0, size) - for i := uint(0); i < size; i++ { - db, e := dbFactory() - if e != nil { - return dbPool, e - } - dbPool = append(dbPool, db) - } - return dbPool, nil -} - -// EstimateRangeSize estimates the total range count by file. -func EstimateRangeSize(files []*backuppb.File) int { - result := 0 - for _, f := range files { - if strings.HasSuffix(f.GetName(), "_write.sst") { - result++ - } - } - return result -} - -// MapTableToFiles makes a map that mapping table ID to its backup files. -// aware that one file can and only can hold one table. -func MapTableToFiles(files []*backuppb.File) map[int64][]*backuppb.File { - result := map[int64][]*backuppb.File{} - for _, file := range files { - tableID := tablecodec.DecodeTableID(file.GetStartKey()) - tableEndID := tablecodec.DecodeTableID(file.GetEndKey()) - if tableID != tableEndID { - log.Panic("key range spread between many files.", - zap.String("file name", file.Name), - logutil.Key("startKey", file.StartKey), - logutil.Key("endKey", file.EndKey)) - } - if tableID == 0 { - log.Panic("invalid table key of file", - zap.String("file name", file.Name), - logutil.Key("startKey", file.StartKey), - logutil.Key("endKey", file.EndKey)) - } - result[tableID] = append(result[tableID], file) - } - return result -} - -// GoValidateFileRanges validate files by a stream of tables and yields -// tables with range. -func GoValidateFileRanges( - ctx context.Context, - tableStream <-chan CreatedTable, - fileOfTable map[int64][]*backuppb.File, - splitSizeBytes, splitKeyCount uint64, - errCh chan<- error, -) <-chan TableWithRange { - // Could we have a smaller outCh size? - outCh := make(chan TableWithRange, len(fileOfTable)) - go func() { - defer close(outCh) - defer log.Info("all range generated") - for { - select { - case <-ctx.Done(): - errCh <- ctx.Err() - return - case t, ok := <-tableStream: - if !ok { - return - } - files := fileOfTable[t.OldTable.Info.ID] - if partitions := t.OldTable.Info.Partition; partitions != nil { - log.Debug("table partition", - zap.Stringer("database", t.OldTable.DB.Name), - zap.Stringer("table", t.Table.Name), - zap.Any("partition info", partitions), - ) - for _, partition := range partitions.Definitions { - files = append(files, fileOfTable[partition.ID]...) - } - } - for _, file := range files { - err := ValidateFileRewriteRule(file, t.RewriteRule) - if err != nil { - errCh <- err - return - } - } - // Merge small ranges to reduce split and scatter regions. - ranges, stat, err := MergeFileRanges( - files, splitSizeBytes, splitKeyCount) - if err != nil { - errCh <- err - return - } - log.Info("merge and validate file", - zap.Stringer("database", t.OldTable.DB.Name), - zap.Stringer("table", t.Table.Name), - zap.Int("Files(total)", stat.TotalFiles), - zap.Int("File(write)", stat.TotalWriteCFFile), - zap.Int("File(default)", stat.TotalDefaultCFFile), - zap.Int("Region(total)", stat.TotalRegions), - zap.Int("Regoin(keys avg)", stat.RegionKeysAvg), - zap.Int("Region(bytes avg)", stat.RegionBytesAvg), - zap.Int("Merged(regions)", stat.MergedRegions), - zap.Int("Merged(keys avg)", stat.MergedRegionKeysAvg), - zap.Int("Merged(bytes avg)", stat.MergedRegionBytesAvg)) - - tableWithRange := TableWithRange{ - CreatedTable: t, - Range: ranges, - } - log.Debug("sending range info", - zap.Stringer("table", t.Table.Name), - zap.Int("files", len(files)), - zap.Int("range size", len(ranges)), - zap.Int("output channel size", len(outCh))) - outCh <- tableWithRange - } - } - }() - return outCh -} - // ValidateFileRewriteRule uses rewrite rules to validate the ranges of a file. func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) error { // Check if the start key has a matched rewrite key diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go index 7edda201..7a2e2e83 100644 --- a/br/pkg/restore/util_test.go +++ b/br/pkg/restore/util_test.go @@ -53,43 +53,6 @@ func TestGetSSTMetaFromFile(t *testing.T) { require.Equal(t, "t2\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff", string(sstMeta.GetRange().GetEnd())) } -func TestMapTableToFiles(t *testing.T) { - filesOfTable1 := []*backuppb.File{ - { - Name: "table1-1.sst", - StartKey: tablecodec.EncodeTablePrefix(1), - EndKey: tablecodec.EncodeTablePrefix(1), - }, - { - Name: "table1-2.sst", - StartKey: tablecodec.EncodeTablePrefix(1), - EndKey: tablecodec.EncodeTablePrefix(1), - }, - { - Name: "table1-3.sst", - StartKey: tablecodec.EncodeTablePrefix(1), - EndKey: tablecodec.EncodeTablePrefix(1), - }, - } - filesOfTable2 := []*backuppb.File{ - { - Name: "table2-1.sst", - StartKey: tablecodec.EncodeTablePrefix(2), - EndKey: tablecodec.EncodeTablePrefix(2), - }, - { - Name: "table2-2.sst", - StartKey: tablecodec.EncodeTablePrefix(2), - EndKey: tablecodec.EncodeTablePrefix(2), - }, - } - - result := restore.MapTableToFiles(append(filesOfTable2, filesOfTable1...)) - - require.Equal(t, filesOfTable1, result[1]) - require.Equal(t, filesOfTable2, result[2]) -} - func TestValidateFileRewriteRule(t *testing.T) { rules := &restore.RewriteRules{ Data: []*import_sstpb.RewriteRule{{