From 1acb8f7e437b1a5463a02a9da45a1cadd48ba9d5 Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Thu, 25 Jul 2024 14:31:35 +0800 Subject: [PATCH] br: make br compatible with infoschemaV2 (#52718) close pingcap/tidb#52717 --- br/pkg/backup/client.go | 4 +- br/pkg/restore/BUILD.bazel | 6 +- br/pkg/restore/ingestrec/BUILD.bazel | 7 + br/pkg/restore/ingestrec/ingest_recorder.go | 94 +++--- .../restore/ingestrec/ingest_recorder_test.go | 317 +++++++++++------- br/pkg/restore/log_client/client.go | 29 +- br/pkg/restore/misc.go | 48 ++- br/pkg/restore/misc_test.go | 89 ++--- br/pkg/restore/snap_client/client.go | 29 +- .../restore/snap_client/systable_restore.go | 22 +- br/pkg/stream/stream_mgr.go | 21 +- br/pkg/task/BUILD.bazel | 1 + br/pkg/task/restore_data.go | 11 +- br/pkg/task/stream.go | 13 +- br/tests/br_pitr/run.sh | 7 +- br/tests/run_group_br_tests.sh | 5 +- 16 files changed, 410 insertions(+), 293 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 7790dc0ee3858..2040edaf7ab25 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -809,9 +809,7 @@ func BuildBackupRangeAndInitSchema( return nil, nil, nil, nil } return ranges, NewBackupSchemas(func(storage kv.Storage, fn func(*model.DBInfo, *model.TableInfo)) error { - return BuildBackupSchemas(storage, tableFilter, backupTS, isFullBackup, func(dbInfo *model.DBInfo, tableInfo *model.TableInfo) { - fn(dbInfo, tableInfo) - }) + return BuildBackupSchemas(storage, tableFilter, backupTS, isFullBackup, fn) }, schemasNum), policies, nil } diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index 41bf3989a7079..3d52de515d143 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -11,10 +11,13 @@ go_library( deps = [ "//br/pkg/conn", "//br/pkg/conn/util", + "//br/pkg/errors", "//br/pkg/logutil", "//br/pkg/pdutil", "//br/pkg/utils", "//pkg/domain", + "//pkg/kv", + "//pkg/meta", "//pkg/parser/model", "//pkg/util", "@com_github_go_sql_driver_mysql//:mysql", @@ -49,8 +52,9 @@ go_test( "//br/pkg/mock", "//br/pkg/pdutil", "//br/pkg/utiltest", - "//pkg/infoschema", + "//pkg/kv", "//pkg/parser/model", + "//pkg/session", "@com_github_coreos_go_semver//semver", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/import_sstpb", diff --git a/br/pkg/restore/ingestrec/BUILD.bazel b/br/pkg/restore/ingestrec/BUILD.bazel index a8fd65359d18b..a69cde11f8290 100644 --- a/br/pkg/restore/ingestrec/BUILD.bazel +++ b/br/pkg/restore/ingestrec/BUILD.bazel @@ -6,9 +6,12 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/restore/ingestrec", visibility = ["//visibility:public"], deps = [ + "//pkg/infoschema", "//pkg/parser/model", "//pkg/types", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_log//:log", + "@org_uber_go_zap//:zap", ], ) @@ -20,7 +23,11 @@ go_test( shard_count = 3, deps = [ ":ingestrec", + "//pkg/kv", + "//pkg/meta", "//pkg/parser/model", + "//pkg/session", + "//pkg/store/mockstore", "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", ], diff --git a/br/pkg/restore/ingestrec/ingest_recorder.go b/br/pkg/restore/ingestrec/ingest_recorder.go index 0167fe8b492c9..54a0dccd31d26 100644 --- a/br/pkg/restore/ingestrec/ingest_recorder.go +++ b/br/pkg/restore/ingestrec/ingest_recorder.go @@ -17,10 +17,14 @@ package ingestrec import ( "fmt" "strings" + "time" "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/types" + "go.uber.org/zap" ) // IngestIndexInfo records the information used to generate index drop/re-add SQL. @@ -136,53 +140,65 @@ func (i *IngestRecorder) RewriteTableID(rewriteFunc func(tableID int64) (int64, } // UpdateIndexInfo uses the newest schemas to update the ingest index's information -func (i *IngestRecorder) UpdateIndexInfo(dbInfos []*model.DBInfo) { - for _, dbInfo := range dbInfos { - for _, tblInfo := range dbInfo.Tables { - tableindexes, tblexists := i.items[tblInfo.ID] - if !tblexists { +func (i *IngestRecorder) UpdateIndexInfo(infoSchema infoschema.InfoSchema) error { + log.Info("start to update index information for ingest index") + start := time.Now() + defer func() { + log.Info("finish updating index information for ingest index", zap.Duration("takes", time.Since(start))) + }() + + for tableID, tableIndexes := range i.items { + tblInfo, tblexists := infoSchema.TableInfoByID(tableID) + if !tblexists || tblInfo == nil { + log.Info("skip repair ingest index, table is dropped", zap.Int64("table id", tableID)) + continue + } + // TODO: here only need an interface function like `SchemaNameByID` + dbInfo, dbexists := infoSchema.SchemaByID(tblInfo.DBID) + if !dbexists || dbInfo == nil { + return errors.Errorf("failed to repair ingest index because table exists but cannot find database."+ + "[table-id:%d][db-id:%d]", tableID, tblInfo.DBID) + } + for _, indexInfo := range tblInfo.Indices { + index, idxexists := tableIndexes[indexInfo.ID] + if !idxexists { continue } - for _, indexInfo := range tblInfo.Indices { - index, idxexists := tableindexes[indexInfo.ID] - if !idxexists { - continue + var columnListBuilder strings.Builder + var columnListArgs []any = make([]any, 0, len(indexInfo.Columns)) + var isFirst bool = true + for _, column := range indexInfo.Columns { + if !isFirst { + columnListBuilder.WriteByte(',') } - var columnListBuilder strings.Builder - var columnListArgs []any = make([]any, 0, len(indexInfo.Columns)) - var isFirst bool = true - for _, column := range indexInfo.Columns { - if !isFirst { - columnListBuilder.WriteByte(',') - } - isFirst = false - - // expression / column - col := tblInfo.Columns[column.Offset] - if col.Hidden { - // (expression) - // the generated expression string can be directly add into sql - columnListBuilder.WriteByte('(') - columnListBuilder.WriteString(col.GeneratedExprString) - columnListBuilder.WriteByte(')') - } else { - // columnName - columnListBuilder.WriteString("%n") - columnListArgs = append(columnListArgs, column.Name.O) - if column.Length != types.UnspecifiedLength { - columnListBuilder.WriteString(fmt.Sprintf("(%d)", column.Length)) - } + isFirst = false + + // expression / column + col := tblInfo.Columns[column.Offset] + if col.Hidden { + // (expression) + // the generated expression string can be directly add into sql + columnListBuilder.WriteByte('(') + columnListBuilder.WriteString(col.GeneratedExprString) + columnListBuilder.WriteByte(')') + } else { + // columnName + columnListBuilder.WriteString("%n") + columnListArgs = append(columnListArgs, column.Name.O) + if column.Length != types.UnspecifiedLength { + columnListBuilder.WriteString(fmt.Sprintf("(%d)", column.Length)) } } - index.ColumnList = columnListBuilder.String() - index.ColumnArgs = columnListArgs - index.IndexInfo = indexInfo - index.SchemaName = dbInfo.Name - index.TableName = tblInfo.Name - index.Updated = true } + index.ColumnList = columnListBuilder.String() + index.ColumnArgs = columnListArgs + index.IndexInfo = indexInfo + index.SchemaName = dbInfo.Name + index.TableName = tblInfo.Name + index.Updated = true } } + return nil } // Iterate iterates all the ingest index. diff --git a/br/pkg/restore/ingestrec/ingest_recorder_test.go b/br/pkg/restore/ingestrec/ingest_recorder_test.go index 9772aa4645dc5..56d655a01416a 100644 --- a/br/pkg/restore/ingestrec/ingest_recorder_test.go +++ b/br/pkg/restore/ingestrec/ingest_recorder_test.go @@ -15,12 +15,17 @@ package ingestrec_test import ( + "context" "encoding/json" "testing" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/restore/ingestrec" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/session" + "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/stretchr/testify/require" ) @@ -93,52 +98,80 @@ func hasOneItem(idxID int64, columnList string, columnArgs []any) (iterateFunc, }, &count } +func createMeta(t *testing.T, store kv.Storage, fn func(m *meta.Meta)) { + txn, err := store.Begin() + require.NoError(t, err) + + fn(meta.NewMeta(txn)) + + err = txn.Commit(context.Background()) + require.NoError(t, err) +} + func TestAddIngestRecorder(t *testing.T) { - allSchemas := []*model.DBInfo{ - { - Name: model.NewCIStr(SchemaName), - Tables: []*model.TableInfo{ + store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore)) + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + + createMeta(t, store, func(m *meta.Meta) { + dbInfo := &model.DBInfo{ + ID: 1, + Name: model.NewCIStr(SchemaName), + State: model.StatePublic, + } + err := m.CreateDatabase(dbInfo) + require.NoError(t, err) + tblInfo := &model.TableInfo{ + ID: TableID, + Name: model.NewCIStr(TableName), + Columns: []*model.ColumnInfo{ { - ID: TableID, - Name: model.NewCIStr(TableName), - Columns: []*model.ColumnInfo{ + Name: model.NewCIStr("x"), + Hidden: false, + State: model.StatePublic, + }, + { + Name: model.NewCIStr("y"), + Hidden: false, + State: model.StatePublic, + }, + }, + Indices: []*model.IndexInfo{ + { + ID: 1, + Name: model.NewCIStr("x"), + Table: model.NewCIStr(TableName), + Columns: []*model.IndexColumn{ { Name: model.NewCIStr("x"), - Hidden: false, + Offset: 0, + Length: -1, }, { Name: model.NewCIStr("y"), - Hidden: false, - }, - }, - Indices: []*model.IndexInfo{ - { - ID: 1, - Name: model.NewCIStr("x"), - Table: model.NewCIStr(TableName), - Columns: []*model.IndexColumn{ - { - Name: model.NewCIStr("x"), - Offset: 0, - Length: -1, - }, - { - Name: model.NewCIStr("y"), - Offset: 1, - Length: -1, - }, - }, - Comment: "123", - Tp: model.IndexTypeBtree, + Offset: 1, + Length: -1, }, }, + Comment: "123", + Tp: model.IndexTypeBtree, + State: model.StatePublic, }, }, - }, - } + State: model.StatePublic, + } + err = m.CreateTableOrView(1, dbInfo.Name.L, tblInfo) + require.NoError(t, err) + }) + dom, err := session.GetDomain(store) + require.NoError(t, err) + infoSchema := dom.InfoSchema() + recorder := ingestrec.New() // no ingest job, should ignore it - err := recorder.TryAddJob(fakeJob( + err = recorder.TryAddJob(fakeJob( model.ReorgTypeTxn, model.ActionAddIndex, model.JobStateSynced, @@ -149,7 +182,8 @@ func TestAddIngestRecorder(t *testing.T) { nil, ), false) require.NoError(t, err) - recorder.UpdateIndexInfo(allSchemas) + err = recorder.UpdateIndexInfo(infoSchema) + require.NoError(t, err) err = recorder.Iterate(noItem) require.NoError(t, err) @@ -165,7 +199,8 @@ func TestAddIngestRecorder(t *testing.T) { nil, ), false) require.NoError(t, err) - recorder.UpdateIndexInfo(allSchemas) + err = recorder.UpdateIndexInfo(infoSchema) + require.NoError(t, err) err = recorder.Iterate(noItem) require.NoError(t, err) @@ -181,7 +216,8 @@ func TestAddIngestRecorder(t *testing.T) { nil, ), false) require.NoError(t, err) - recorder.UpdateIndexInfo(allSchemas) + err = recorder.UpdateIndexInfo(infoSchema) + require.NoError(t, err) err = recorder.Iterate(noItem) require.NoError(t, err) @@ -200,7 +236,8 @@ func TestAddIngestRecorder(t *testing.T) { ), false) require.NoError(t, err) f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"}) - recorder.UpdateIndexInfo(allSchemas) + err = recorder.UpdateIndexInfo(infoSchema) + require.NoError(t, err) err = recorder.Iterate(f) require.NoError(t, err) require.Equal(t, *cnt, 1) @@ -221,7 +258,8 @@ func TestAddIngestRecorder(t *testing.T) { ), false) require.NoError(t, err) f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"}) - recorder.UpdateIndexInfo(allSchemas) + err = recorder.UpdateIndexInfo(infoSchema) + require.NoError(t, err) err = recorder.Iterate(f) require.NoError(t, err) require.Equal(t, *cnt, 1) @@ -241,7 +279,8 @@ func TestAddIngestRecorder(t *testing.T) { ), true) require.NoError(t, err) f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"}) - recorder.UpdateIndexInfo(allSchemas) + err = recorder.UpdateIndexInfo(infoSchema) + require.NoError(t, err) err = recorder.Iterate(f) require.NoError(t, err) require.Equal(t, *cnt, 1) @@ -249,62 +288,84 @@ func TestAddIngestRecorder(t *testing.T) { } func TestIndexesKind(t *testing.T) { - allSchemas := []*model.DBInfo{ - { - Name: model.NewCIStr(SchemaName), - Tables: []*model.TableInfo{ + //ctx := context.Background() + store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore)) + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + /*se, err := session.CreateSession(store) + require.NoError(t, err) + _, err := se.ExecuteInternal(ctx) + */ + createMeta(t, store, func(m *meta.Meta) { + dbInfo := &model.DBInfo{ + ID: 1, + Name: model.NewCIStr(SchemaName), + State: model.StatePublic, + } + err := m.CreateDatabase(dbInfo) + require.NoError(t, err) + tblInfo := &model.TableInfo{ + ID: TableID, + Name: model.NewCIStr(TableName), + Columns: []*model.ColumnInfo{ + { + Name: model.NewCIStr("x"), + Hidden: false, + State: model.StatePublic, + }, + { + Name: model.NewCIStr("_V$_x_0"), + Hidden: true, + GeneratedExprString: "`x` * 2", + State: model.StatePublic, + }, { - ID: TableID, - Name: model.NewCIStr(TableName), - Columns: []*model.ColumnInfo{ + Name: model.NewCIStr("z"), + Hidden: false, + State: model.StatePublic, + }, + }, + Indices: []*model.IndexInfo{ + { + ID: 1, + Name: model.NewCIStr("x"), + Table: model.NewCIStr(TableName), + Columns: []*model.IndexColumn{ { Name: model.NewCIStr("x"), - Hidden: false, + Offset: 0, + Length: -1, }, { - Name: model.NewCIStr("_V$_x_0"), - Hidden: true, - GeneratedExprString: "`x` * 2", + Name: model.NewCIStr("_V$_x_0"), + Offset: 1, + Length: -1, }, { Name: model.NewCIStr("z"), - Hidden: false, - }, - }, - Indices: []*model.IndexInfo{ - { - ID: 1, - Name: model.NewCIStr("x"), - Table: model.NewCIStr(TableName), - Columns: []*model.IndexColumn{ - { - Name: model.NewCIStr("x"), - Offset: 0, - Length: -1, - }, - { - Name: model.NewCIStr("_V$_x_0"), - Offset: 1, - Length: -1, - }, - { - Name: model.NewCIStr("z"), - Offset: 2, - Length: 4, - }, - }, - Comment: "123", - Tp: model.IndexTypeHash, - Invisible: true, + Offset: 2, + Length: 4, }, }, + Comment: "123", + Tp: model.IndexTypeHash, + Invisible: true, + State: model.StatePublic, }, }, - }, - } + State: model.StatePublic, + } + err = m.CreateTableOrView(1, dbInfo.Name.L, tblInfo) + require.NoError(t, err) + }) + dom, err := session.GetDomain(store) + require.NoError(t, err) + infoSchema := dom.InfoSchema() recorder := ingestrec.New() - err := recorder.TryAddJob(fakeJob( + err = recorder.TryAddJob(fakeJob( model.ReorgTypeLitMerge, model.ActionAddIndex, model.JobStateSynced, @@ -315,7 +376,8 @@ func TestIndexesKind(t *testing.T) { json.RawMessage(`[1, "a"]`), ), false) require.NoError(t, err) - recorder.UpdateIndexInfo(allSchemas) + err = recorder.UpdateIndexInfo(infoSchema) + require.NoError(t, err) var ( tableID int64 indexID int64 @@ -339,50 +401,68 @@ func TestIndexesKind(t *testing.T) { } func TestRewriteTableID(t *testing.T) { - allSchemas := []*model.DBInfo{ - { - Name: model.NewCIStr(SchemaName), - Tables: []*model.TableInfo{ + store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore)) + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + + createMeta(t, store, func(m *meta.Meta) { + dbInfo := &model.DBInfo{ + ID: 1, + Name: model.NewCIStr(SchemaName), + State: model.StatePublic, + } + err := m.CreateDatabase(dbInfo) + require.NoError(t, err) + tblInfo := &model.TableInfo{ + ID: TableID, + Name: model.NewCIStr(TableName), + Columns: []*model.ColumnInfo{ { - ID: TableID, - Name: model.NewCIStr(TableName), - Columns: []*model.ColumnInfo{ + Name: model.NewCIStr("x"), + Hidden: false, + State: model.StatePublic, + }, + { + Name: model.NewCIStr("y"), + Hidden: false, + State: model.StatePublic, + }, + }, + Indices: []*model.IndexInfo{ + { + ID: 1, + Name: model.NewCIStr("x"), + Table: model.NewCIStr(TableName), + Columns: []*model.IndexColumn{ { Name: model.NewCIStr("x"), - Hidden: false, + Offset: 0, + Length: -1, }, { Name: model.NewCIStr("y"), - Hidden: false, - }, - }, - Indices: []*model.IndexInfo{ - { - ID: 1, - Name: model.NewCIStr("x"), - Table: model.NewCIStr(TableName), - Columns: []*model.IndexColumn{ - { - Name: model.NewCIStr("x"), - Offset: 0, - Length: -1, - }, - { - Name: model.NewCIStr("y"), - Offset: 1, - Length: -1, - }, - }, - Comment: "123", - Tp: model.IndexTypeBtree, + Offset: 1, + Length: -1, }, }, + Comment: "123", + Tp: model.IndexTypeBtree, + State: model.StatePublic, }, }, - }, - } + State: model.StatePublic, + } + err = m.CreateTableOrView(1, dbInfo.Name.L, tblInfo) + require.NoError(t, err) + }) + dom, err := session.GetDomain(store) + require.NoError(t, err) + infoSchema := dom.InfoSchema() + recorder := ingestrec.New() - err := recorder.TryAddJob(fakeJob( + err = recorder.TryAddJob(fakeJob( model.ReorgTypeLitMerge, model.ActionAddIndex, model.JobStateSynced, @@ -393,18 +473,23 @@ func TestRewriteTableID(t *testing.T) { json.RawMessage(`[1, "a"]`), ), false) require.NoError(t, err) - recorder.UpdateIndexInfo(allSchemas) + err = recorder.UpdateIndexInfo(infoSchema) + require.NoError(t, err) recorder.RewriteTableID(func(tableID int64) (int64, bool, error) { return tableID + 1, false, nil }) + count := 0 err = recorder.Iterate(func(tableID, indexID int64, info *ingestrec.IngestIndexInfo) error { + count += 1 require.Equal(t, TableID+1, tableID) return nil }) require.NoError(t, err) recorder.RewriteTableID(func(tableID int64) (int64, bool, error) { + count += 1 return tableID + 1, true, nil }) err = recorder.Iterate(noItem) require.NoError(t, err) + require.Equal(t, 2, count) } diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index 656028dfb6b82..9e11baf74bec2 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -680,18 +680,15 @@ func (rc *LogClient) InitSchemasReplaceForDDL( if err != nil { return nil, errors.Trace(err) } - info := rc.dom.InfoSchema() - shcemas := info.AllSchemaNames() - for _, schema := range shcemas { - tblInfos, err := info.SchemaTableInfos(ctx, schema) - if err != nil { - return nil, errors.Trace(err) - } - for _, tableInfo := range tblInfos { - if tableInfo.TiFlashReplica != nil && tableInfo.TiFlashReplica.Count > 0 { - return nil, errors.Errorf("exist table(s) have tiflash replica, please remove it before restore") - } + existTiFlashTable := false + rc.dom.InfoSchema().ListTablesWithSpecialAttribute(func(tableInfo *model.TableInfo) bool { + if tableInfo.TiFlashReplica != nil && tableInfo.TiFlashReplica.Count > 0 { + existTiFlashTable = true } + return false + }) + if existTiFlashTable { + return nil, errors.Errorf("exist table(s) have tiflash replica, please remove it before restore") } } @@ -1238,7 +1235,6 @@ const ( func (rc *LogClient) generateRepairIngestIndexSQLs( ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, - allSchema []*model.DBInfo, taskName string, ) ([]checkpoint.CheckpointIngestIndexRepairSQL, bool, error) { var sqls []checkpoint.CheckpointIngestIndexRepairSQL @@ -1258,7 +1254,9 @@ func (rc *LogClient) generateRepairIngestIndexSQLs( } } - ingestRecorder.UpdateIndexInfo(allSchema) + if err := ingestRecorder.UpdateIndexInfo(rc.dom.InfoSchema()); err != nil { + return sqls, false, errors.Trace(err) + } if err := ingestRecorder.Iterate(func(_, indexID int64, info *ingestrec.IngestIndexInfo) error { var ( addSQL strings.Builder @@ -1322,13 +1320,12 @@ func (rc *LogClient) generateRepairIngestIndexSQLs( // RepairIngestIndex drops the indexes from IngestRecorder and re-add them. func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, taskName string) error { - info := rc.dom.InfoSchema() - - sqls, fromCheckpoint, err := rc.generateRepairIngestIndexSQLs(ctx, ingestRecorder, info.AllSchemas(), taskName) + sqls, fromCheckpoint, err := rc.generateRepairIngestIndexSQLs(ctx, ingestRecorder, taskName) if err != nil { return errors.Trace(err) } + info := rc.dom.InfoSchema() console := glue.GetConsole(g) NEXTSQL: for _, sql := range sqls { diff --git a/br/pkg/restore/misc.go b/br/pkg/restore/misc.go index fb1b2d335aeb1..62d7fbc32fdb4 100644 --- a/br/pkg/restore/misc.go +++ b/br/pkg/restore/misc.go @@ -16,13 +16,18 @@ package restore import ( "context" + "fmt" + "strings" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/parser/model" tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/tikv/client-go/v2/oracle" @@ -64,23 +69,52 @@ func GetTableSchema( return table.Meta(), nil } -// GetExistedUserDBs get dbs created or modified by users -func GetExistedUserDBs(dom *domain.Domain) []*model.DBInfo { +const maxUserTablesNum = 10 + +// AssertUserDBsEmpty check whether user dbs exist in the cluster +func AssertUserDBsEmpty(dom *domain.Domain) error { databases := dom.InfoSchema().AllSchemas() - existedDatabases := make([]*model.DBInfo, 0, 16) + m := meta.NewSnapshotMeta(dom.Store().GetSnapshot(kv.MaxVersion)) + userTables := make([]string, 0, maxUserTablesNum+1) + appendTables := func(dbName, tableName string) bool { + if len(userTables) >= maxUserTablesNum { + userTables = append(userTables, "...") + return true + } + userTables = append(userTables, fmt.Sprintf("%s.%s", dbName, tableName)) + return false + } +LISTDBS: for _, db := range databases { dbName := db.Name.L if tidbutil.IsMemOrSysDB(dbName) { continue - } else if dbName == "test" && len(db.Tables) == 0 { + } + tables, err := m.ListSimpleTables(db.ID) + if err != nil { + return errors.Annotatef(err, "failed to iterator tables of database[id=%d]", db.ID) + } + if len(tables) == 0 { // tidb create test db on fresh cluster // if it's empty we don't take it as user db + if dbName != "test" { + if appendTables(db.Name.O, "") { + break LISTDBS + } + } continue } - existedDatabases = append(existedDatabases, db) + for _, table := range tables { + if appendTables(db.Name.O, table.Name.O) { + break LISTDBS + } + } } - - return existedDatabases + if len(userTables) > 0 { + return errors.Annotate(berrors.ErrRestoreNotFreshCluster, + "user db/tables: "+strings.Join(userTables, ", ")) + } + return nil } // GetTS gets a new timestamp from PD. diff --git a/br/pkg/restore/misc_test.go b/br/pkg/restore/misc_test.go index 7d9fe6699505e..b461b3e395ebd 100644 --- a/br/pkg/restore/misc_test.go +++ b/br/pkg/restore/misc_test.go @@ -16,15 +16,17 @@ package restore_test import ( "context" - "math" + "fmt" + "strings" "testing" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/utiltest" - "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/session" "github.com/stretchr/testify/require" ) @@ -46,53 +48,60 @@ func TestGetTableSchema(t *testing.T) { require.Equal(t, model.NewCIStr("tidb"), tableInfo.Name) } -func TestGetExistedUserDBs(t *testing.T) { +func TestAssertUserDBsEmpty(t *testing.T) { m, err := mock.NewCluster() require.Nil(t, err) defer m.Stop() dom := m.Domain - dbs := restore.GetExistedUserDBs(dom) - require.Equal(t, 0, len(dbs)) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR) + se, err := session.CreateSession(dom.Store()) + require.Nil(t, err) + + err = restore.AssertUserDBsEmpty(dom) + require.Nil(t, err) - builder, err := infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos( - []*model.DBInfo{ - {Name: model.NewCIStr("mysql")}, - {Name: model.NewCIStr("test")}, - }, - nil, nil, 1) + _, err = se.ExecuteInternal(ctx, "CREATE DATABASE d1;") require.Nil(t, err) - dom.MockInfoCacheAndLoadInfoSchema(builder.Build(math.MaxUint64)) - dbs = restore.GetExistedUserDBs(dom) - require.Equal(t, 0, len(dbs)) - - builder, err = infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos( - []*model.DBInfo{ - {Name: model.NewCIStr("mysql")}, - {Name: model.NewCIStr("test")}, - {Name: model.NewCIStr("d1")}, - }, - nil, nil, 1) + err = restore.AssertUserDBsEmpty(dom) + require.Error(t, err) + require.Contains(t, err.Error(), "d1.") + + _, err = se.ExecuteInternal(ctx, "CREATE TABLE d1.test(id int);") require.Nil(t, err) - dom.MockInfoCacheAndLoadInfoSchema(builder.Build(math.MaxUint64)) - dbs = restore.GetExistedUserDBs(dom) - require.Equal(t, 1, len(dbs)) - - builder, err = infoschema.NewBuilder(dom, nil, nil).InitWithDBInfos( - []*model.DBInfo{ - {Name: model.NewCIStr("mysql")}, - {Name: model.NewCIStr("d1")}, - { - Name: model.NewCIStr("test"), - Tables: []*model.TableInfo{{ID: 1, Name: model.NewCIStr("t1"), State: model.StatePublic}}, - State: model.StatePublic, - }, - }, - nil, nil, 1) + err = restore.AssertUserDBsEmpty(dom) + require.Error(t, err) + require.Contains(t, err.Error(), "d1.test") + + _, err = se.ExecuteInternal(ctx, "DROP DATABASE d1;") require.Nil(t, err) - dom.MockInfoCacheAndLoadInfoSchema(builder.Build(math.MaxUint64)) - dbs = restore.GetExistedUserDBs(dom) - require.Equal(t, 2, len(dbs)) + for i := 0; i < 15; i += 1 { + _, err = se.ExecuteInternal(ctx, fmt.Sprintf("CREATE DATABASE d%d;", i)) + require.Nil(t, err) + } + err = restore.AssertUserDBsEmpty(dom) + require.Error(t, err) + containCount := 0 + for i := 0; i < 15; i += 1 { + if strings.Contains(err.Error(), fmt.Sprintf("d%d.", i)) { + containCount += 1 + } + } + require.Equal(t, 10, containCount) + + for i := 0; i < 15; i += 1 { + _, err = se.ExecuteInternal(ctx, fmt.Sprintf("CREATE TABLE d%d.t1(id int);", i)) + require.Nil(t, err) + } + err = restore.AssertUserDBsEmpty(dom) + require.Error(t, err) + containCount = 0 + for i := 0; i < 15; i += 1 { + if strings.Contains(err.Error(), fmt.Sprintf("d%d.t1", i)) { + containCount += 1 + } + } + require.Equal(t, 10, containCount) } func TestGetTSWithRetry(t *testing.T) { diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index ac43584eba951..061fad6388016 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -857,34 +857,7 @@ func (rc *SnapClient) IsSkipCreateSQL() bool { // user may have created some users or made other changes. func (rc *SnapClient) CheckTargetClusterFresh(ctx context.Context) error { log.Info("checking whether target cluster is fresh") - userDBs := restore.GetExistedUserDBs(rc.dom) - if len(userDBs) == 0 { - return nil - } - - const maxPrintCount = 10 - userTableOrDBNames := make([]string, 0, maxPrintCount+1) - addName := func(name string) bool { - if len(userTableOrDBNames) == maxPrintCount { - userTableOrDBNames = append(userTableOrDBNames, "...") - return false - } - userTableOrDBNames = append(userTableOrDBNames, name) - return true - } -outer: - for _, db := range userDBs { - if !addName(db.Name.L) { - break outer - } - for _, tbl := range db.Tables { - if !addName(tbl.Name.L) { - break outer - } - } - } - log.Error("not fresh cluster", zap.Strings("user tables", userTableOrDBNames)) - return errors.Annotate(berrors.ErrRestoreNotFreshCluster, "user db/tables: "+strings.Join(userTableOrDBNames, ", ")) + return restore.AssertUserDBsEmpty(rc.dom) } // ExecDDLs executes the queries of the ddl jobs. diff --git a/br/pkg/restore/snap_client/systable_restore.go b/br/pkg/restore/snap_client/systable_restore.go index c939640849f37..a99d9925b1ab2 100644 --- a/br/pkg/restore/snap_client/systable_restore.go +++ b/br/pkg/restore/snap_client/systable_restore.go @@ -125,7 +125,10 @@ func (rc *SnapClient) restoreSystemSchema(ctx context.Context, f filter.Filter, log.Info("system database not backed up, skipping", zap.String("database", sysDB)) return nil } - db, ok := rc.getDatabaseByName(sysDB) + db, ok, err := rc.getSystemDatabaseByName(ctx, sysDB) + if err != nil { + return errors.Trace(err) + } if !ok { // Or should we create the database here? log.Warn("target database not exist, aborting", zap.String("database", sysDB)) @@ -160,22 +163,27 @@ type database struct { TemporaryName model.CIStr } -// getDatabaseByName make a record of a database from info schema by its name. -func (rc *SnapClient) getDatabaseByName(name string) (*database, bool) { +// getSystemDatabaseByName make a record of a system database, such as mysql and sys, from info schema by its name. +func (rc *SnapClient) getSystemDatabaseByName(ctx context.Context, name string) (*database, bool, error) { infoSchema := rc.dom.InfoSchema() schema, ok := infoSchema.SchemaByName(model.NewCIStr(name)) if !ok { - return nil, false + return nil, false, nil } db := &database{ ExistingTables: map[string]*model.TableInfo{}, Name: model.NewCIStr(name), TemporaryName: utils.TemporaryDBName(name), } - for _, t := range schema.Tables { - db.ExistingTables[t.Name.L] = t + // It's OK to get all the tables from system tables. + tableInfos, err := infoSchema.SchemaTableInfos(ctx, schema.Name) + if err != nil { + return nil, false, errors.Trace(err) + } + for _, tbl := range tableInfos { + db.ExistingTables[tbl.Name.L] = tbl } - return db, true + return db, true, nil } // afterSystemTablesReplaced do some extra work for special system tables. diff --git a/br/pkg/stream/stream_mgr.go b/br/pkg/stream/stream_mgr.go index d61d4dfca64e8..4acdaa6faf650 100644 --- a/br/pkg/stream/stream_mgr.go +++ b/br/pkg/stream/stream_mgr.go @@ -94,25 +94,18 @@ func buildObserveTableRanges( continue } - tables, err := m.ListTables(dbInfo.ID) - if err != nil { - return nil, errors.Trace(err) - } - if len(tables) == 0 { - log.Warn("It's not necessary to observe empty database", - zap.Stringer("db", dbInfo.Name)) - continue - } - - for _, tableInfo := range tables { + if err := m.IterTables(dbInfo.ID, func(tableInfo *model.TableInfo) error { if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) { // Skip tables other than the given table. - continue + return nil } + log.Info("start to observe the table", zap.Stringer("db", dbInfo.Name), zap.Stringer("table", tableInfo.Name)) - log.Info("observer table schema", zap.String("table", dbInfo.Name.O+"."+tableInfo.Name.O)) tableRanges := buildObserveTableRange(tableInfo) ranges = append(ranges, tableRanges...) + return nil + }); err != nil { + return nil, errors.Trace(err) } } @@ -141,6 +134,8 @@ func BuildObserveDataRanges( if len(filterStr) == 1 && filterStr[0] == string("*.*") { return buildObserverAllRange(), nil } + // TODO: currently it's a dead code, the iterator metakvs can be optimized + // to marshal only necessary fields. return buildObserveTableRanges(storage, tableFilter, backupTS) } diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index b8bb4177bd6af..60f80c77a5f86 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "//br/pkg/version", "//pkg/config", "//pkg/domain", + "//pkg/infoschema", "//pkg/kv", "//pkg/parser/model", "//pkg/parser/mysql", diff --git a/br/pkg/task/restore_data.go b/br/pkg/task/restore_data.go index 57dc184fa182a..1b4f9718d58e1 100644 --- a/br/pkg/task/restore_data.go +++ b/br/pkg/task/restore_data.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" tidbconfig "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -176,17 +177,13 @@ func resetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage, return errors.Trace(err) } info := dom.InfoSchema() - allSchemaName := info.AllSchemaNames() recorder := tiflashrec.New() expectTiFlashStoreCount := uint64(0) needTiFlash := false - for _, s := range allSchemaName { - tblInfos, err := info.SchemaTableInfos(ctx, s) - if err != nil { - return errors.Trace(err) - } - for _, t := range tblInfos { + tableInfoRes := info.ListTablesWithSpecialAttribute(infoschema.TiFlashAttribute) + for _, s := range tableInfoRes { + for _, t := range s.TableInfos { if t.TiFlashReplica != nil { expectTiFlashStoreCount = max(expectTiFlashStoreCount, t.TiFlashReplica.Count) recorder.AddTable(t.ID, *t.TiFlashReplica) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 41adf1bf5da34..5ec178a4ceeff 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1755,18 +1755,7 @@ func buildPauseSafePointName(taskName string) string { } func checkPiTRRequirements(mgr *conn.Mgr) error { - userDBs := restore.GetExistedUserDBs(mgr.GetDomain()) - if len(userDBs) > 0 { - userDBNames := make([]string, 0, len(userDBs)) - for _, db := range userDBs { - userDBNames = append(userDBNames, db.Name.O) - } - return errors.Annotatef(berrors.ErrDatabasesAlreadyExisted, - "databases %s existed in restored cluster, please drop them before execute PiTR", - strings.Join(userDBNames, ",")) - } - - return nil + return restore.AssertUserDBsEmpty(mgr.GetDomain()) } type PiTRTaskInfo struct { diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh index 77ff8008df323..24c084af4191c 100644 --- a/br/tests/br_pitr/run.sh +++ b/br/tests/br_pitr/run.sh @@ -151,4 +151,9 @@ echo "run snapshot restore#3" run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$PREFIX/full" echo "run incremental restore but failed" -run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$PREFIX/inc_fail" || { echo 'pitr failed' ; exit 1; } +restore_fail=0 +run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$PREFIX/inc_fail" || restore_fail=1 +if [ $restore_fail -ne 1 ]; then + echo 'pitr success' + exit 1 +fi diff --git a/br/tests/run_group_br_tests.sh b/br/tests/run_group_br_tests.sh index 5e0a5f4317227..9fe7fd643a58b 100755 --- a/br/tests/run_group_br_tests.sh +++ b/br/tests/run_group_br_tests.sh @@ -26,10 +26,9 @@ groups=( ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_index br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index br_tidb_placement_policy br_tiflash br_tiflash_conflict' ["G04"]='br_range br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' ["G05"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter' - ["G06"]='br_tikv_outage' - ["G07"]='br_tikv_outage3' + ["G06"]='br_tikv_outage br_tikv_outage3' + ["G07"]='br_pitr' ["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom' - ["G09"]='br_pitr' ) # Get other cases not in groups, to avoid missing any case