diff --git a/ddl/db_test.go b/ddl/db_test.go index c002c942fd7fc..9adeba750916d 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -890,6 +890,90 @@ func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) { c.Assert(handles, HasLen, 0, Commentf("take time %v", time.Since(startTime))) } +// TestCancelDropColumn tests cancel ddl job which type is drop column. +func (s *testDBSuite) TestCancelDropColumn(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + s.tk.MustExec("use " + s.schemaName) + s.mustExec(c, "drop table if exists test_drop_column") + s.mustExec(c, "create table test_drop_column(c1 int, c2 int)") + defer s.mustExec(c, "drop table test_drop_column;") + testCases := []struct { + needAddColumn bool + jobState model.JobState + JobSchemaState model.SchemaState + cancelSucc bool + }{ + {true, model.JobStateNone, model.StateNone, true}, + {false, model.JobStateRunning, model.StateWriteOnly, false}, + {true, model.JobStateRunning, model.StateDeleteOnly, false}, + {true, model.JobStateRunning, model.StateDeleteReorganization, false}, + } + var checkErr error + hook := &ddl.TestDDLCallback{} + var jobID int64 + testCase := &testCases[0] + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionDropColumn && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState { + jobIDs := []int64{job.ID} + jobID = job.ID + hookCtx := mock.NewContext() + hookCtx.Store = s.store + err := hookCtx.NewTxn() + if err != nil { + checkErr = errors.Trace(err) + return + } + txn, err := hookCtx.Txn(true) + if err != nil { + checkErr = errors.Trace(err) + return + } + errs, err := admin.CancelJobs(txn, jobIDs) + if err != nil { + checkErr = errors.Trace(err) + return + } + if errs[0] != nil { + checkErr = errors.Trace(errs[0]) + return + } + checkErr = txn.Commit(context.Background()) + } + } + + s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + var err1 error + for i := range testCases { + testCase = &testCases[i] + if testCase.needAddColumn { + s.mustExec(c, "alter table test_drop_column add column c3 int") + } + _, err1 = s.tk.Exec("alter table test_drop_column drop column c3") + var col1 *table.Column + t := s.testGetTable(c, "test_drop_column") + for _, col := range t.Cols() { + if strings.EqualFold(col.Name.L, "c3") { + col1 = col + break + } + } + if testCase.cancelSucc { + c.Assert(checkErr, IsNil) + c.Assert(col1, NotNil) + c.Assert(col1.Name.L, Equals, "c3") + c.Assert(err1.Error(), Equals, "[ddl:12]cancelled DDL job") + } else { + c.Assert(col1, IsNil) + c.Assert(err1, IsNil) + c.Assert(checkErr, NotNil) + c.Assert(checkErr.Error(), Equals, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error()) + } + } + s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{}) + s.mustExec(c, "alter table test_drop_column add column c3 int") + s.mustExec(c, "alter table test_drop_column drop column c3") +} + func (s *testDBSuite) TestAddIndexWithDupCols(c *C) { s.tk = testkit.NewTestKit(c, s.store) s.tk.MustExec("use " + s.schemaName) diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index c83c7c842a0e3..f536f1dece7d4 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -351,6 +351,10 @@ func buildCancelJobTests(firstID int64) []testCancelJob { {act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err}, {act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err}, {act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err}, + + {act: model.ActionDropColumn, jobIDs: []int64{firstID + 13}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 13)}, cancelState: model.StateDeleteOnly, ddlRetErr: err}, + {act: model.ActionDropColumn, jobIDs: []int64{firstID + 14}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 14)}, cancelState: model.StateWriteOnly, ddlRetErr: err}, + {act: model.ActionDropColumn, jobIDs: []int64{firstID + 15}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 15)}, cancelState: model.StateWriteReorganization, ddlRetErr: err}, } return tests @@ -380,6 +384,18 @@ func (s *testDDLSuite) checkAddColumn(c *C, d *ddl, schemaID int64, tableID int6 c.Assert(found, Equals, success) } +func (s *testDDLSuite) checkCancelDropColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) { + changedTable := testGetTable(c, d, schemaID, tableID) + notFound := true + for _, colInfo := range changedTable.Meta().Columns { + if colInfo.Name.O == colName { + notFound = false + break + } + } + c.Assert(notFound, Equals, success) +} + func (s *testDDLSuite) TestCancelJob(c *C) { store := testCreateStore(c, "test_cancel_job") defer store.Close() @@ -388,15 +404,15 @@ func (s *testDDLSuite) TestCancelJob(c *C) { dbInfo := testSchemaInfo(c, d, "test_cancel_job") testCreateSchema(c, testNewContext(d), d, dbInfo) - // create table t (c1 int, c2 int); - tblInfo := testTableInfo(c, d, "t", 2) + // create table t (c1 int, c2 int, c3 int, c4 int, c5 int); + tblInfo := testTableInfo(c, d, "t", 5) ctx := testNewContext(d) err := ctx.NewTxn() c.Assert(err, IsNil) job := testCreateTable(c, ctx, d, dbInfo, tblInfo) - // insert t values (1, 2); + // insert t values (1, 2, 3, 4, 5); originTable := testGetTable(c, d, dbInfo.ID, tblInfo.ID) - row := types.MakeDatums(1, 2) + row := types.MakeDatums(1, 2, 3, 4, 5) _, err = originTable.AddRecord(ctx, row) c.Assert(err, IsNil) txn, err := ctx.Txn(true) @@ -518,6 +534,28 @@ func (s *testDDLSuite) TestCancelJob(c *C) { testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs) c.Check(errors.ErrorStack(checkErr), Equals, "") s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true) + + // for drop column. + test = &tests[12] + dropColName := "c3" + s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false) + testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false) + c.Check(errors.ErrorStack(checkErr), Equals, "") + s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true) + + test = &tests[13] + dropColName = "c4" + s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false) + testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false) + c.Check(errors.ErrorStack(checkErr), Equals, "") + s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true) + + test = &tests[14] + dropColName = "c5" + s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false) + testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false) + c.Check(errors.ErrorStack(checkErr), Equals, "") + s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true) } func (s *testDDLSuite) TestIgnorableSpec(c *C) { diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 74f9513a6b0c9..4492ea2d11654 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -131,12 +131,46 @@ func rollingbackAddindex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve return } +func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) { + tblInfo, err := getTableInfo(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + + var colName model.CIStr + err = job.DecodeArgs(&colName) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L) + if colInfo == nil { + job.State = model.JobStateCancelled + return ver, ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName) + } + + // StatePublic means when the job is not running yet. + if colInfo.State == model.StatePublic { + job.State = model.JobStateCancelled + } else { + // In the state of drop column `write only -> delete only -> reorganization`, + // We can not rollback now, so just continue to drop column. + job.State = model.JobStateRunning + return ver, errors.Trace(nil) + } + job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo) + return ver, errors.Trace(errCancelledDDLJob) +} + func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { switch job.Type { case model.ActionAddColumn: ver, err = rollingbackAddColumn(t, job) case model.ActionAddIndex: ver, err = rollingbackAddindex(w, d, t, job) + case model.ActionDropColumn: + ver, err = rollingbackDropColumn(t, job) default: job.State = model.JobStateCancelled err = errCancelledDDLJob diff --git a/util/admin/admin.go b/util/admin/admin.go index ae35073e1f8dd..eed8a58d7c88c 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -83,6 +83,16 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) { return info, nil } +func isJobRollbackable(job *model.Job, id int64) error { + switch job.Type { + case model.ActionDropColumn: + if job.SchemaState != model.StateNone { + return ErrCannotCancelDDLJob.GenWithStackByArgs(id) + } + } + return nil +} + // CancelJobs cancels the DDL jobs. func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) { if len(ids) == 0 { @@ -113,6 +123,11 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) { if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() { continue } + errs[i] = isJobRollbackable(job, id) + if errs[i] != nil { + continue + } + job.State = model.JobStateCancelling // Make sure RawArgs isn't overwritten. err := job.DecodeArgs(job.RawArgs) @@ -701,6 +716,7 @@ const ( codeInvalidColumnState = 3 codeDDLJobNotFound = 4 codeCancelFinishedJob = 5 + codeCannotCancelDDLJob = 6 ) var ( @@ -712,4 +728,6 @@ var ( ErrDDLJobNotFound = terror.ClassAdmin.New(codeDDLJobNotFound, "DDL Job:%v not found") // ErrCancelFinishedDDLJob returns when cancel a finished ddl job. ErrCancelFinishedDDLJob = terror.ClassAdmin.New(codeCancelFinishedJob, "This job:%v is finished, so can't be cancelled") + // ErrCannotCancelDDLJob returns when cancel a almost finished ddl job, because cancel in now may cause data inconsistency. + ErrCannotCancelDDLJob = terror.ClassAdmin.New(codeCannotCancelDDLJob, "This job:%v is almost finished, can't be cancelled now") )