diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index 65a5fdb4d1ed6..6f1d7b36323ba 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -30,6 +30,15 @@ type UniqueTableName struct { Table string } +type DDLJobFilterRule func(ddlJob *model.Job) bool + +var incrementalRestoreActionBlockList = map[model.ActionType]struct{}{ + model.ActionSetTiFlashReplica: {}, + model.ActionUpdateTiFlashReplicaStatus: {}, + model.ActionLockTable: {}, + model.ActionUnlockTable: {}, +} + // NewDB returns a new DB. func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error) { se, err := g.CreateSession(store) @@ -90,6 +99,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { return errors.Trace(err) } + if ddlJob.Query == "" { + log.Warn("query of ddl job is empty, ignore it", + zap.Stringer("type", ddlJob.Type), + zap.String("db", ddlJob.SchemaName)) + return nil + } + if tableInfo != nil { switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName)) err = db.se.Execute(ctx, switchDBSQL) @@ -409,6 +425,31 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [ return ddlJobs } +// FilterDDLJobByRules if one of rules returns true, the job in srcDDLJobs will be filtered. +func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job) { + dstDDLJobs = make([]*model.Job, 0, len(srcDDLJobs)) + for _, ddlJob := range srcDDLJobs { + passed := true + for _, rule := range rules { + if rule(ddlJob) { + passed = false + break + } + } + + if passed { + dstDDLJobs = append(dstDDLJobs, ddlJob) + } + } + + return +} + +// DDLJobBlockListRule rule for filter ddl job with type in block list. +func DDLJobBlockListRule(ddlJob *model.Job) bool { + return checkIsInActions(ddlJob.Type, incrementalRestoreActionBlockList) +} + func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) { dbIDs := make(map[int64]bool) for _, table := range tables { @@ -419,3 +460,8 @@ func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) { } return } + +func checkIsInActions(action model.ActionType, actions map[model.ActionType]struct{}) bool { + _, ok := actions[action] + return ok +} diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 2d774355c6c85..a7fd2eb82ed78 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" ) @@ -293,3 +294,77 @@ func TestFilterDDLJobsV2(t *testing.T) { } require.Equal(t, 7, len(ddlJobs)) } + +func TestDB_ExecDDL(t *testing.T) { + s, clean := createRestoreSchemaSuite(t) + defer clean() + + ctx := context.Background() + ddlJobs := []*model.Job{ + { + Type: model.ActionAddIndex, + Query: "CREATE DATABASE IF NOT EXISTS test_db;", + BinlogInfo: &model.HistoryInfo{}, + }, + { + Type: model.ActionAddIndex, + Query: "", + BinlogInfo: &model.HistoryInfo{}, + }, + } + + db, _, err := restore.NewDB(gluetidb.New(), s.mock.Storage, "STRICT") + require.NoError(t, err) + + for _, ddlJob := range ddlJobs { + err = db.ExecDDL(ctx, ddlJob) + assert.NoError(t, err) + } +} + +func TestFilterDDLJobByRules(t *testing.T) { + ddlJobs := []*model.Job{ + { + Type: model.ActionSetTiFlashReplica, + }, + { + Type: model.ActionAddPrimaryKey, + }, + { + Type: model.ActionUpdateTiFlashReplicaStatus, + }, + { + Type: model.ActionCreateTable, + }, + { + Type: model.ActionLockTable, + }, + { + Type: model.ActionAddIndex, + }, + { + Type: model.ActionUnlockTable, + }, + { + Type: model.ActionCreateSchema, + }, + { + Type: model.ActionModifyColumn, + }, + } + + expectedDDLTypes := []model.ActionType{ + model.ActionAddPrimaryKey, + model.ActionCreateTable, + model.ActionAddIndex, + model.ActionCreateSchema, + model.ActionModifyColumn, + } + + ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule) + + require.Equal(t, len(expectedDDLTypes), len(ddlJobs)) + for i, ddlJob := range ddlJobs { + assert.Equal(t, expectedDDLTypes[i], ddlJob.Type) + } +} diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 1895a901d7bb2..89bb71e49ae84 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -359,6 +359,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf newTS = restoreTS } ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables) + ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule) err = client.PreCheckTableTiFlashReplica(ctx, tables) if err != nil {