From a81f53beae6a8c4e53f78826356a15632fe238cf Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Thu, 16 Jun 2022 14:16:34 +0800 Subject: [PATCH] br: Ignore ddl jobs with empty query or blacklist type when exec restore (#33384) (#33516) close pingcap/tidb#33322 --- br/pkg/restore/db.go | 46 +++++++++++++++++++++++++ br/pkg/restore/db_test.go | 71 +++++++++++++++++++++++++++++++++++++++ br/pkg/task/restore.go | 1 + 3 files changed, 118 insertions(+) diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index 3e9d35a124dfd..7510a1fb4dd5d 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -28,6 +28,15 @@ type UniqueTableName struct { Table string } +type DDLJobFilterRule func(ddlJob *model.Job) bool + +var incrementalRestoreActionBlockList = map[model.ActionType]struct{}{ + model.ActionSetTiFlashReplica: {}, + model.ActionUpdateTiFlashReplicaStatus: {}, + model.ActionLockTable: {}, + model.ActionUnlockTable: {}, +} + // NewDB returns a new DB. func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { se, err := g.CreateSession(store) @@ -71,6 +80,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { return errors.Trace(err) } + if ddlJob.Query == "" { + log.Warn("query of ddl job is empty, ignore it", + zap.Stringer("type", ddlJob.Type), + zap.String("db", ddlJob.SchemaName)) + return nil + } + if tableInfo != nil { switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName)) err = db.se.Execute(ctx, switchDBSQL) @@ -280,6 +296,31 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [ return ddlJobs } +// FilterDDLJobByRules if one of rules returns true, the job in srcDDLJobs will be filtered. +func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job) { + dstDDLJobs = make([]*model.Job, 0, len(srcDDLJobs)) + for _, ddlJob := range srcDDLJobs { + passed := true + for _, rule := range rules { + if rule(ddlJob) { + passed = false + break + } + } + + if passed { + dstDDLJobs = append(dstDDLJobs, ddlJob) + } + } + + return +} + +// DDLJobBlockListRule rule for filter ddl job with type in block list. +func DDLJobBlockListRule(ddlJob *model.Job) bool { + return checkIsInActions(ddlJob.Type, incrementalRestoreActionBlockList) +} + func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) { dbIDs := make(map[int64]bool) for _, table := range tables { @@ -290,3 +331,8 @@ func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) { } return } + +func checkIsInActions(action model.ActionType, actions map[model.ActionType]struct{}) bool { + _, ok := actions[action] + return ok +} diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index b9a1c8948f8dd..35e9f5ee7ea6d 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -251,3 +251,74 @@ func (s *testRestoreSchemaSuite) TestFilterDDLJobsV2(c *C) { } c.Assert(len(ddlJobs), Equals, 7) } + +func (s *testRestoreSchemaSuite) TestDB_ExecDDL(c *C) { + ctx := context.Background() + ddlJobs := []*model.Job{ + { + Type: model.ActionAddIndex, + Query: "CREATE DATABASE IF NOT EXISTS test_db;", + BinlogInfo: &model.HistoryInfo{}, + }, + { + Type: model.ActionAddIndex, + Query: "", + BinlogInfo: &model.HistoryInfo{}, + }, + } + + db, err := restore.NewDB(gluetidb.New(), s.mock.Storage) + c.Assert(err, IsNil) + + for _, ddlJob := range ddlJobs { + err = db.ExecDDL(ctx, ddlJob) + c.Assert(err, IsNil) + } +} + +func (s *testRestoreSchemaSuite) TestFilterDDLJobByRules(c *C) { + ddlJobs := []*model.Job{ + { + Type: model.ActionSetTiFlashReplica, + }, + { + Type: model.ActionAddPrimaryKey, + }, + { + Type: model.ActionUpdateTiFlashReplicaStatus, + }, + { + Type: model.ActionCreateTable, + }, + { + Type: model.ActionLockTable, + }, + { + Type: model.ActionAddIndex, + }, + { + Type: model.ActionUnlockTable, + }, + { + Type: model.ActionCreateSchema, + }, + { + Type: model.ActionModifyColumn, + }, + } + + expectedDDLTypes := []model.ActionType{ + model.ActionAddPrimaryKey, + model.ActionCreateTable, + model.ActionAddIndex, + model.ActionCreateSchema, + model.ActionModifyColumn, + } + + ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule) + + c.Assert(len(ddlJobs), Equals, len(expectedDDLTypes)) + for i, ddlJob := range ddlJobs { + c.Assert(ddlJob.Type, Equals, expectedDDLTypes[i]) + } +} diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index ae46f15b1f6ce..27df82804288a 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -322,6 +322,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf newTS = restoreTS } ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables) + ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule) err = client.PreCheckTableTiFlashReplica(ctx, tables) if err != nil {