Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: Ignore ddl jobs with empty query or blacklist type when exec restore #33384

Merged
merged 9 commits into from
Mar 28, 2022
47 changes: 47 additions & 0 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ type UniqueTableName struct {
Table string
}

type DDLJobFilterRule func(ddlJob *model.Job) bool

var incrementalRestoreActionBlacklist = []model.ActionType{
model.ActionSetTiFlashReplica,
model.ActionUpdateTiFlashReplicaStatus,
model.ActionLockTable,
model.ActionUnlockTable,
}

// NewDB returns a new DB.
func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error) {
se, err := g.CreateSession(store)
Expand Down Expand Up @@ -90,6 +99,10 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
return errors.Trace(err)
}

if ddlJob.Query == "" {
return nil
WangLe1321 marked this conversation as resolved.
Show resolved Hide resolved
}

if tableInfo != nil {
switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName))
err = db.se.Execute(ctx, switchDBSQL)
Expand Down Expand Up @@ -409,6 +422,31 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs [
return ddlJobs
}

// FilterDDLJobByRules if one of rules returns true, the job in srcDDLJobs will be filtered.
func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job) {
dstDDLJobs = make([]*model.Job, 0, len(srcDDLJobs))
for _, ddlJob := range srcDDLJobs {
passed := true
for _, rule := range rules {
if rule(ddlJob) {
passed = false
break
}
}

if passed {
dstDDLJobs = append(dstDDLJobs, ddlJob)
}
}

return
}

// DDLJobBlacklistRule rule for filter ddl job with type in blacklist.
func DDLJobBlacklistRule(ddlJob *model.Job) bool {
WangLe1321 marked this conversation as resolved.
Show resolved Hide resolved
return checkIsInActionList(ddlJob.Type, incrementalRestoreActionBlacklist)
}

func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
dbIDs := make(map[int64]bool)
for _, table := range tables {
Expand All @@ -419,3 +457,12 @@ func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
}
return
}

func checkIsInActionList(action model.ActionType, actionList []model.ActionType) bool {
for _, a := range actionList {
if action == a {
return true
}
}
return false
}
75 changes: 75 additions & 0 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
Expand Down Expand Up @@ -293,3 +294,77 @@ func TestFilterDDLJobsV2(t *testing.T) {
}
require.Equal(t, 7, len(ddlJobs))
}

func TestDB_ExecDDL(t *testing.T) {
s, clean := createRestoreSchemaSuite(t)
defer clean()

ctx := context.Background()
ddlJobs := []*model.Job{
{
Type: model.ActionAddIndex,
Query: "CREATE DATABASE IF NOT EXISTS test_db;",
BinlogInfo: &model.HistoryInfo{},
},
{
Type: model.ActionAddIndex,
Query: "",
BinlogInfo: &model.HistoryInfo{},
},
}

db, _, err := restore.NewDB(gluetidb.New(), s.mock.Storage, "STRICT")
require.NoError(t, err)

for _, ddlJob := range ddlJobs {
err = db.ExecDDL(ctx, ddlJob)
assert.NoError(t, err)
}
}

func TestFilterDDLJobByRules(t *testing.T) {
ddlJobs := []*model.Job{
{
Type: model.ActionSetTiFlashReplica,
},
{
Type: model.ActionAddPrimaryKey,
},
{
Type: model.ActionUpdateTiFlashReplicaStatus,
},
{
Type: model.ActionCreateTable,
},
{
Type: model.ActionLockTable,
},
{
Type: model.ActionAddIndex,
},
{
Type: model.ActionUnlockTable,
},
{
Type: model.ActionCreateSchema,
},
{
Type: model.ActionModifyColumn,
},
}

expectedDDLTypes := []model.ActionType{
model.ActionAddPrimaryKey,
model.ActionCreateTable,
model.ActionAddIndex,
model.ActionCreateSchema,
model.ActionModifyColumn,
}

ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlacklistRule)

require.Equal(t, len(expectedDDLTypes), len(ddlJobs))
for i, ddlJob := range ddlJobs {
assert.Equal(t, expectedDDLTypes[i], ddlJob.Type)
}
}
1 change: 1 addition & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
newTS = restoreTS
}
ddlJobs := restore.FilterDDLJobs(client.GetDDLJobs(), tables)
ddlJobs = restore.FilterDDLJobByRules(ddlJobs, restore.DDLJobBlacklistRule)

err = client.PreCheckTableTiFlashReplica(ctx, tables)
if err != nil {
Expand Down