Skip to content

Commit

Permalink
ddl: fix cancel drop column ddl error (#8862)
Browse files Browse the repository at this point in the history
  • Loading branch information
ciscoxll authored and zz-jason committed Jan 2, 2019
1 parent 2fcfb88 commit 94af2bc
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 4 deletions.
86 changes: 86 additions & 0 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2006,6 +2006,92 @@ func (s *testDBSuite) TestUpdateHandleFailed(c *C) {
tk.MustExec("admin check index t idx_b")
}

// TestCancelDropColumn tests cancel ddl job which type is drop column.
func (s *testDBSuite) TestCancelDropColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
s.mustExec(c, "drop table if exists test_drop_column")
s.mustExec(c, "create table test_drop_column(c1 int, c2 int)")
defer s.mustExec(c, "drop table test_drop_column;")
testCases := []struct {
needAddColumn bool
jobState model.JobState
JobSchemaState model.SchemaState
cancelSucc bool
}{
{true, model.JobStateNone, model.StateNone, true},
{false, model.JobStateRunning, model.StateWriteOnly, false},
{true, model.JobStateRunning, model.StateDeleteOnly, false},
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
}
var checkErr error
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 50 * time.Millisecond
defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()
hook := &ddl.TestDDLCallback{}
var jobID int64
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionDropColumn && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobIDs := []int64{job.ID}
jobID = job.ID
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
txn, err := hookCtx.Txn(true)
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(txn, jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
checkErr = txn.Commit(context.Background())
}
}
s.dom.DDL().SetHook(hook)
var err1 error
for i := range testCases {
testCase = &testCases[i]
if testCase.needAddColumn {
s.mustExec(c, "alter table test_drop_column add column c3 int")
}
_, err1 = s.tk.Exec("alter table test_drop_column drop column c3")
var col1 *table.Column
t := s.testGetTable(c, "test_drop_column")
for _, col := range t.Cols() {
if strings.EqualFold(col.Name.L, "c3") {
col1 = col
break
}
}
if testCase.cancelSucc {
c.Assert(checkErr, IsNil)
c.Assert(col1, NotNil)
c.Assert(col1.Name.L, Equals, "c3")
c.Assert(err1.Error(), Equals, "[ddl:12]cancelled DDL job")
} else {
c.Assert(col1, IsNil)
c.Assert(err1, IsNil)
c.Assert(checkErr, NotNil)
c.Assert(checkErr.Error(), Equals, admin.ErrCannotCancelDDLJob.GenByArgs(jobID).Error())
}
}
s.dom.DDL().SetHook(&ddl.TestDDLCallback{})
s.mustExec(c, "alter table test_drop_column add column c3 int")
s.mustExec(c, "alter table test_drop_column drop column c3")
}

func (s *testDBSuite) TestAddIndexFailed(c *C) {
gofail.Enable("github.com/pingcap/tidb/ddl/mockAddIndexErr", `return(true)`)
defer gofail.Disable("github.com/pingcap/tidb/ddl/mockAddIndexErr")
Expand Down
15 changes: 15 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,18 @@ func testAddColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, t
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func testCancelDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, args []interface{}) *model.Job {
job := &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionDropColumn,
Args: args,
BinlogInfo: &model.HistoryInfo{},
}
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}
51 changes: 47 additions & 4 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,10 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err},

{act: model.ActionDropColumn, jobIDs: []int64{firstID + 13}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenByArgs(firstID + 13)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 14}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenByArgs(firstID + 14)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 15}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenByArgs(firstID + 15)}, cancelState: model.StateDeleteReorganization, ddlRetErr: err},
}

return tests
Expand All @@ -505,6 +509,19 @@ func (s *testDDLSuite) checkAddIdx(c *C, d *ddl, schemaID int64, tableID int64,
}
c.Assert(found, Equals, success)
}

func (s *testDDLSuite) checkCancelDropColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
notFound := true
for _, colInfo := range changedTable.Meta().Columns {
if colInfo.Name.O == colName {
notFound = false
break
}
}
c.Assert(notFound, Equals, success)
}

func (s *testDDLSuite) checkAddColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
var found bool
Expand All @@ -525,15 +542,15 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
dbInfo := testSchemaInfo(c, d, "test_cancel_job")
testCreateSchema(c, testNewContext(d), d, dbInfo)

// create table t (c1 int, c2 int);
tblInfo := testTableInfo(c, d, "t", 2)
// create table t (c1 int, c2 int, c3 int, c4 int);
tblInfo := testTableInfo(c, d, "t", 4)
ctx := testNewContext(d)
err := ctx.NewTxn()
c.Assert(err, IsNil)
job := testCreateTable(c, ctx, d, dbInfo, tblInfo)
// insert t values (1, 2);
// insert t values (1, 2, 3, 4);
originTable := testGetTable(c, d, dbInfo.ID, tblInfo.ID)
row := types.MakeDatums(1, 2)
row := types.MakeDatums(1, 2, 3, 4)
_, err = originTable.AddRecord(ctx, row, false)
c.Assert(err, IsNil)
txn, err := ctx.Txn(true)
Expand Down Expand Up @@ -650,6 +667,32 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)

// for drop column.
test = &tests[12]
dropColName := "c1"
dropColumnArgs := []interface{}{model.NewCIStr(dropColName)}
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testCancelDropColumn(c, ctx, d, dbInfo, tblInfo, dropColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)

test = &tests[13]

dropColName = "c2"
dropColumnArgs = []interface{}{model.NewCIStr(dropColName)}
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testCancelDropColumn(c, ctx, d, dbInfo, tblInfo, dropColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)

test = &tests[14]
dropColName = "c3"
dropColumnArgs = []interface{}{model.NewCIStr(dropColName)}
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testCancelDropColumn(c, ctx, d, dbInfo, tblInfo, dropColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
35 changes: 35 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,47 @@ func rollingbackAddindex(d *ddl, t *meta.Meta, job *model.Job) (ver int64, err e
}
return
}

func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

var colName model.CIStr
err = job.DecodeArgs(&colName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.Gen("column %s doesn't exist", colName)
}

// StatePublic means when the job is not running yet.
if colInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
} else {
// In the state of drop column `write only -> delete only -> reorganization`,
// We can not rollback now, so just continue to drop column.
job.State = model.JobStateRunning
return ver, errors.Trace(nil)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errors.Trace(errCancelledDDLJob)
}

func convertJob2RollbackJob(d *ddl, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
ver, err = rollingbackAddColumn(t, job)
case model.ActionAddIndex:
ver, err = rollingbackAddindex(d, t, job)
case model.ActionDropColumn:
ver, err = rollingbackDropColumn(t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down
18 changes: 18 additions & 0 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
return info, nil
}

func isJobRollbackable(job *model.Job, id int64) error {
switch job.Type {
case model.ActionDropColumn:
if job.SchemaState != model.StateNone {
return ErrCannotCancelDDLJob.GenByArgs(id)
}
}
return nil
}

// CancelJobs cancels the DDL jobs.
func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if len(ids) == 0 {
Expand Down Expand Up @@ -96,6 +106,11 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
continue
}
errs[i] = isJobRollbackable(job, id)
if errs[i] != nil {
continue
}

job.State = model.JobStateCancelling
// Make sure RawArgs isn't overwritten.
err := job.DecodeArgs(job.RawArgs)
Expand Down Expand Up @@ -603,6 +618,7 @@ const (
codeInvalidColumnState = 3
codeDDLJobNotFound = 4
codeCancelFinishedJob = 5
codeCannotCancelDDLJob = 6
)

var (
Expand All @@ -613,4 +629,6 @@ var (
ErrDDLJobNotFound = terror.ClassAdmin.New(codeDDLJobNotFound, "DDL Job:%v not found")
// ErrCancelFinishedDDLJob returns when cancel a finished ddl job.
ErrCancelFinishedDDLJob = terror.ClassAdmin.New(codeCancelFinishedJob, "This job:%v is finished, so can't be cancelled")
// ErrCannotCancelDDLJob returns when cancel a almost finished ddl job, because cancel in now may cause data inconsistency.
ErrCannotCancelDDLJob = terror.ClassAdmin.New(codeCannotCancelDDLJob, "This job:%v is almost finished, can't be cancelled now")
)

0 comments on commit 94af2bc

Please sign in to comment.