Skip to content

Commit

Permalink
cherry pick pingcap#33384 to release-5.4
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
WangLe1321 authored and ti-srebot committed Mar 28, 2022
1 parent 8effbbc commit 8f7f248
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 0 deletions.
46 changes: 46 additions & 0 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ type UniqueTableName struct {
Table string
}

type DDLJobFilterRule func(ddlJob *model.Job) bool

var incrementalRestoreActionBlockList = map[model.ActionType]struct{}{
model.ActionSetTiFlashReplica: {},
model.ActionUpdateTiFlashReplicaStatus: {},
model.ActionLockTable: {},
model.ActionUnlockTable: {},
}

// NewDB returns a new DB.
func NewDB(g glue.Glue, store kv.Storage) (*DB, error) {
se, err := g.CreateSession(store)
Expand Down Expand Up @@ -71,6 +80,13 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
return errors.Trace(err)
}

if ddlJob.Query == "" {
log.Warn("query of ddl job is empty, ignore it",
zap.Stringer("type", ddlJob.Type),
zap.String("db", ddlJob.SchemaName))
return nil
}

if tableInfo != nil {
switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName))
err = db.se.Execute(ctx, switchDBSQL)
Expand Down Expand Up @@ -280,6 +296,31 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [
return ddlJobs
}

// FilterDDLJobByRules if one of rules returns true, the job in srcDDLJobs will be filtered.
func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job) {
dstDDLJobs = make([]*model.Job, 0, len(srcDDLJobs))
for _, ddlJob := range srcDDLJobs {
passed := true
for _, rule := range rules {
if rule(ddlJob) {
passed = false
break
}
}

if passed {
dstDDLJobs = append(dstDDLJobs, ddlJob)
}
}

return
}

// DDLJobBlockListRule rule for filter ddl job with type in block list.
func DDLJobBlockListRule(ddlJob *model.Job) bool {
return checkIsInActions(ddlJob.Type, incrementalRestoreActionBlockList)
}

func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
dbIDs := make(map[int64]bool)
for _, table := range tables {
Expand All @@ -290,3 +331,8 @@ func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
}
return
}

func checkIsInActions(action model.ActionType, actions map[model.ActionType]struct{}) bool {
_, ok := actions[action]
return ok
}
82 changes: 82 additions & 0 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,16 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
<<<<<<< HEAD
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
=======
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
>>>>>>> 11db01105... br: Ignore ddl jobs with empty query or blacklist type when exec restore (#33384)
"github.com/tikv/client-go/v2/oracle"
)

Expand Down Expand Up @@ -251,3 +259,77 @@ func (s *testRestoreSchemaSuite) TestFilterDDLJobsV2(c *C) {
}
c.Assert(len(ddlJobs), Equals, 7)
}

func TestDB_ExecDDL(t *testing.T) {
s, clean := createRestoreSchemaSuite(t)
defer clean()

ctx := context.Background()
ddlJobs := []*model.Job{
{
Type: model.ActionAddIndex,
Query: "CREATE DATABASE IF NOT EXISTS test_db;",
BinlogInfo: &model.HistoryInfo{},
},
{
Type: model.ActionAddIndex,
Query: "",
BinlogInfo: &model.HistoryInfo{},
},
}

db, _, err := restore.NewDB(gluetidb.New(), s.mock.Storage, "STRICT")
require.NoError(t, err)

for _, ddlJob := range ddlJobs {
err = db.ExecDDL(ctx, ddlJob)
assert.NoError(t, err)
}
}

func TestFilterDDLJobByRules(t *testing.T) {
ddlJobs := []*model.Job{
{
Type: model.ActionSetTiFlashReplica,
},
{
Type: model.ActionAddPrimaryKey,
},
{
Type: model.ActionUpdateTiFlashReplicaStatus,
},
{
Type: model.ActionCreateTable,
},
{
Type: model.ActionLockTable,
},
{
Type: model.ActionAddIndex,
},
{
Type: model.ActionUnlockTable,
},
{
Type: model.ActionCreateSchema,
},
{
Type: model.ActionModifyColumn,
},
}

expectedDDLTypes := []model.ActionType{
model.ActionAddPrimaryKey,
model.ActionCreateTable,
model.ActionAddIndex,
model.ActionCreateSchema,
model.ActionModifyColumn,
}

ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

require.Equal(t, len(expectedDDLTypes), len(ddlJobs))
for i, ddlJob := range ddlJobs {
assert.Equal(t, expectedDDLTypes[i], ddlJob.Type)
}
}
1 change: 1 addition & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
newTS = restoreTS
}
ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables)
ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlockListRule)

err = client.PreCheckTableTiFlashReplica(ctx, tables)
if err != nil {
Expand Down

0 comments on commit 8f7f248

Please sign in to comment.