diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index 3e9d35a124dfd..7510a1fb4dd5d 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -28,6 +28,15 @@ type UniqueTableName struct { Table string } +type DDLJobFilterRule func(ddlJob *model.Job) bool + +var incrementalRestoreActionBlockList = map[model.ActionType]struct{}{ + model.ActionSetTiFlashReplica: {}, + model.ActionUpdateTiFlashReplicaStatus: {}, + model.ActionLockTable: {}, + model.ActionUnlockTable: {}, +} + // NewDB returns a new DB. func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { se, err := g.CreateSession(store) @@ -71,6 +80,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { return errors.Trace(err) } + if ddlJob.Query == "" { + log.Warn("query of ddl job is empty, ignore it", + zap.Stringer("type", ddlJob.Type), + zap.String("db", ddlJob.SchemaName)) + return nil + } + if tableInfo != nil { switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName)) err = db.se.Execute(ctx, switchDBSQL) @@ -280,6 +296,31 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [ return ddlJobs } +// FilterDDLJobByRules if one of rules returns true, the job in srcDDLJobs will be filtered. +func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job) { + dstDDLJobs = make([]*model.Job, 0, len(srcDDLJobs)) + for _, ddlJob := range srcDDLJobs { + passed := true + for _, rule := range rules { + if rule(ddlJob) { + passed = false + break + } + } + + if passed { + dstDDLJobs = append(dstDDLJobs, ddlJob) + } + } + + return +} + +// DDLJobBlockListRule rule for filter ddl job with type in block list. +func DDLJobBlockListRule(ddlJob *model.Job) bool { + return checkIsInActions(ddlJob.Type, incrementalRestoreActionBlockList) +} + func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) { dbIDs := make(map[int64]bool) for _, table := range tables { @@ -290,3 +331,8 @@ func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) { } return } + +func checkIsInActions(action model.ActionType, actions map[model.ActionType]struct{}) bool { + _, ok := actions[action] + return ok +} diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index b9a1c8948f8dd..3c9837ee21a21 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -21,8 +21,16 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" +<<<<<<< HEAD "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" +======= + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/types" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +>>>>>>> 11db01105... br: Ignore ddl jobs with empty query or blacklist type when exec restore (#33384) "github.com/tikv/client-go/v2/oracle" ) @@ -251,3 +259,77 @@ func (s *testRestoreSchemaSuite) TestFilterDDLJobsV2(c *C) { } c.Assert(len(ddlJobs), Equals, 7) } + +func TestDB_ExecDDL(t *testing.T) { + s, clean := createRestoreSchemaSuite(t) + defer clean() + + ctx := context.Background() + ddlJobs := []*model.Job{ + { + Type: model.ActionAddIndex, + Query: "CREATE DATABASE IF NOT EXISTS test_db;", + BinlogInfo: &model.HistoryInfo{}, + }, + { + Type: model.ActionAddIndex, + Query: "", + BinlogInfo: &model.HistoryInfo{}, + }, + } + + db, _, err := restore.NewDB(gluetidb.New(), s.mock.Storage, "STRICT") + require.NoError(t, err) + + for _, ddlJob := range ddlJobs { + err = db.ExecDDL(ctx, ddlJob) + assert.NoError(t, err) + } +} + +func TestFilterDDLJobByRules(t *testing.T) { + ddlJobs := []*model.Job{ + { + Type: model.ActionSetTiFlashReplica, + }, + { + Type: model.ActionAddPrimaryKey, + }, + { + Type: model.ActionUpdateTiFlashReplicaStatus, + }, + { + Type: model.ActionCreateTable, + }, + { + Type: model.ActionLockTable, + }, + { + Type: model.ActionAddIndex, + }, + { + Type: model.ActionUnlockTable, + }, + { + Type: model.ActionCreateSchema, + }, + { + Type: model.ActionModifyColumn, + }, + } + + expectedDDLTypes := []model.ActionType{ + model.ActionAddPrimaryKey, + model.ActionCreateTable, + model.ActionAddIndex, + model.ActionCreateSchema, + model.ActionModifyColumn, + } + + ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule) + + require.Equal(t, len(expectedDDLTypes), len(ddlJobs)) + for i, ddlJob := range ddlJobs { + assert.Equal(t, expectedDDLTypes[i], ddlJob.Type) + } +} diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index ae46f15b1f6ce..27df82804288a 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -322,6 +322,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf newTS = restoreTS } ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables) + ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule) err = client.PreCheckTableTiFlashReplica(ctx, tables) if err != nil {