Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix cancel drop column ddl error #9352

Merged
merged 3 commits into from
Feb 19, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,90 @@ func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) {
c.Assert(handles, HasLen, 0, Commentf("take time %v", time.Since(startTime)))
}

// TestCancelDropColumn tests cancel ddl job which type is drop column.
func (s *testDBSuite) TestCancelDropColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
s.mustExec(c, "drop table if exists test_drop_column")
s.mustExec(c, "create table test_drop_column(c1 int, c2 int)")
defer s.mustExec(c, "drop table test_drop_column;")
testCases := []struct {
needAddColumn bool
jobState model.JobState
JobSchemaState model.SchemaState
cancelSucc bool
}{
{true, model.JobStateNone, model.StateNone, true},
{false, model.JobStateRunning, model.StateWriteOnly, false},
{true, model.JobStateRunning, model.StateDeleteOnly, false},
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
}
var checkErr error
hook := &ddl.TestDDLCallback{}
var jobID int64
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionDropColumn && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobIDs := []int64{job.ID}
jobID = job.ID
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
txn, err := hookCtx.Txn(true)
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(txn, jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
checkErr = txn.Commit(context.Background())
}
}

s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
var err1 error
for i := range testCases {
testCase = &testCases[i]
if testCase.needAddColumn {
s.mustExec(c, "alter table test_drop_column add column c3 int")
}
_, err1 = s.tk.Exec("alter table test_drop_column drop column c3")
var col1 *table.Column
t := s.testGetTable(c, "test_drop_column")
for _, col := range t.Cols() {
if strings.EqualFold(col.Name.L, "c3") {
col1 = col
break
}
}
if testCase.cancelSucc {
c.Assert(checkErr, IsNil)
c.Assert(col1, NotNil)
c.Assert(col1.Name.L, Equals, "c3")
c.Assert(err1.Error(), Equals, "[ddl:12]cancelled DDL job")
} else {
c.Assert(col1, IsNil)
c.Assert(err1, IsNil)
c.Assert(checkErr, NotNil)
c.Assert(checkErr.Error(), Equals, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error())
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{})
s.mustExec(c, "alter table test_drop_column add column c3 int")
s.mustExec(c, "alter table test_drop_column drop column c3")
}

func (s *testDBSuite) TestAddIndexWithDupCols(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down
46 changes: 42 additions & 4 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err},

{act: model.ActionDropColumn, jobIDs: []int64{firstID + 13}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 13)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 14}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 14)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 15}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 15)}, cancelState: model.StateWriteReorganization, ddlRetErr: err},
}

return tests
Expand Down Expand Up @@ -380,6 +384,18 @@ func (s *testDDLSuite) checkAddColumn(c *C, d *ddl, schemaID int64, tableID int6
c.Assert(found, Equals, success)
}

func (s *testDDLSuite) checkCancelDropColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
notFound := true
for _, colInfo := range changedTable.Meta().Columns {
if colInfo.Name.O == colName {
notFound = false
break
}
}
c.Assert(notFound, Equals, success)
}

func (s *testDDLSuite) TestCancelJob(c *C) {
store := testCreateStore(c, "test_cancel_job")
defer store.Close()
Expand All @@ -388,15 +404,15 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
dbInfo := testSchemaInfo(c, d, "test_cancel_job")
testCreateSchema(c, testNewContext(d), d, dbInfo)

// create table t (c1 int, c2 int);
tblInfo := testTableInfo(c, d, "t", 2)
// create table t (c1 int, c2 int, c3 int, c4 int, c5 int);
tblInfo := testTableInfo(c, d, "t", 5)
ctx := testNewContext(d)
err := ctx.NewTxn()
c.Assert(err, IsNil)
job := testCreateTable(c, ctx, d, dbInfo, tblInfo)
// insert t values (1, 2);
// insert t values (1, 2, 3, 4, 5);
originTable := testGetTable(c, d, dbInfo.ID, tblInfo.ID)
row := types.MakeDatums(1, 2)
row := types.MakeDatums(1, 2, 3, 4, 5)
_, err = originTable.AddRecord(ctx, row)
c.Assert(err, IsNil)
txn, err := ctx.Txn(true)
Expand Down Expand Up @@ -518,6 +534,28 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)

// for drop column.
test = &tests[12]
dropColName := "c3"
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)

test = &tests[13]
dropColName = "c4"
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)

test = &tests[14]
dropColName = "c5"
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)
testDropColumn(c, ctx, d, dbInfo, tblInfo, dropColName, false)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkCancelDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, true)
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
34 changes: 34 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,46 @@ func rollingbackAddindex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return
}

func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

var colName model.CIStr
err = job.DecodeArgs(&colName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil {
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}

// StatePublic means when the job is not running yet.
if colInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
} else {
// In the state of drop column `write only -> delete only -> reorganization`,
// We can not rollback now, so just continue to drop column.
job.State = model.JobStateRunning
return ver, errors.Trace(nil)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errors.Trace(errCancelledDDLJob)
}

func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
ver, err = rollingbackAddColumn(t, job)
case model.ActionAddIndex:
ver, err = rollingbackAddindex(w, d, t, job)
case model.ActionDropColumn:
ver, err = rollingbackDropColumn(t, job)
default:
job.State = model.JobStateCancelled
err = errCancelledDDLJob
Expand Down
18 changes: 18 additions & 0 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
return info, nil
}

func isJobRollbackable(job *model.Job, id int64) error {
switch job.Type {
case model.ActionDropColumn:
if job.SchemaState != model.StateNone {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
}
}
return nil
}

// CancelJobs cancels the DDL jobs.
func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if len(ids) == 0 {
Expand Down Expand Up @@ -113,6 +123,11 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
continue
}
errs[i] = isJobRollbackable(job, id)
if errs[i] != nil {
continue
}

job.State = model.JobStateCancelling
// Make sure RawArgs isn't overwritten.
err := job.DecodeArgs(job.RawArgs)
Expand Down Expand Up @@ -701,6 +716,7 @@ const (
codeInvalidColumnState = 3
codeDDLJobNotFound = 4
codeCancelFinishedJob = 5
codeCannotCancelDDLJob = 6
)

var (
Expand All @@ -712,4 +728,6 @@ var (
ErrDDLJobNotFound = terror.ClassAdmin.New(codeDDLJobNotFound, "DDL Job:%v not found")
// ErrCancelFinishedDDLJob returns when cancel a finished ddl job.
ErrCancelFinishedDDLJob = terror.ClassAdmin.New(codeCancelFinishedJob, "This job:%v is finished, so can't be cancelled")
// ErrCannotCancelDDLJob returns when cancel a almost finished ddl job, because cancel in now may cause data inconsistency.
ErrCannotCancelDDLJob = terror.ClassAdmin.New(codeCannotCancelDDLJob, "This job:%v is almost finished, can't be cancelled now")
)