Skip to content

Commit

Permalink
cherry pick #23903 to release-5.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
AilinKid authored and ti-srebot committed Apr 16, 2021
1 parent 112c972 commit efa9d9c
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 27 deletions.
67 changes: 40 additions & 27 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand All @@ -40,8 +42,11 @@ func updateColsNull2NotNull(tblInfo *model.TableInfo, indexInfo *model.IndexInfo
}

func convertAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) {
job.State = model.JobStateRollingback

failpoint.Inject("mockConvertAddIdxJob2RollbackJobError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(0, errors.New("mock convert add index job to rollback job error"))
}
})
if indexInfo.Primary {
nullCols, err := getNullColInfos(tblInfo, indexInfo)
if err != nil {
Expand All @@ -68,7 +73,7 @@ func convertAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.T
if err1 != nil {
return ver, errors.Trace(err1)
}

job.State = model.JobStateRollingback
return ver, errors.Trace(err)
}

Expand Down Expand Up @@ -139,7 +144,6 @@ func rollingbackModifyColumn(t *meta.Meta, job *model.Job) (ver int64, err error
}

func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
job.State = model.JobStateRollingback
tblInfo, columnInfo, col, _, _, err := checkAddColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -158,11 +162,12 @@ func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
if err != nil {
return ver, errors.Trace(err)
}

job.State = model.JobStateRollingback
return ver, errCancelledDDLJob
}

func rollingbackAddColumns(t *meta.Meta, job *model.Job) (ver int64, err error) {
job.State = model.JobStateRollingback
tblInfo, columnInfos, _, _, _, _, err := checkAddColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -186,11 +191,12 @@ func rollingbackAddColumns(t *meta.Meta, job *model.Job) (ver int64, err error)
if err != nil {
return ver, errors.Trace(err)
}
job.State = model.JobStateRollingback
return ver, errCancelledDDLJob
}

func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, colInfo, idxInfos, err := checkDropColumn(t, job)
_, colInfo, idxInfos, err := checkDropColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -213,7 +219,6 @@ func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error)
// StatePublic means when the job is not running yet.
if colInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
}
// In the state of drop column `write only -> delete only -> reorganization`,
Expand All @@ -223,7 +228,7 @@ func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error)
}

func rollingbackDropColumns(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, colInfos, _, idxInfos, err := checkDropColumns(t, job)
_, colInfos, _, idxInfos, err := checkDropColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -246,7 +251,6 @@ func rollingbackDropColumns(t *meta.Meta, job *model.Job) (ver int64, err error)
// StatePublic means when the job is not running yet.
if colInfos[0].State == model.StatePublic {
job.State = model.JobStateCancelled
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
}
// In the state of drop columns `write only -> delete only -> reorganization`,
Expand All @@ -256,33 +260,23 @@ func rollingbackDropColumns(t *meta.Meta, job *model.Job) (ver int64, err error)
}

func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
tblInfo, indexInfo, err := checkDropIndex(t, job)
_, indexInfo, err := checkDropIndex(t, job)
if err != nil {
return ver, errors.Trace(err)
}

originalState := indexInfo.State
switch indexInfo.State {
case model.StateWriteOnly, model.StateDeleteOnly, model.StateDeleteReorganization, model.StateNone:
// We can not rollback now, so just continue to drop index.
// Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable.
job.State = model.JobStateRunning
return ver, nil
case model.StatePublic:
job.State = model.JobStateRollbackDone
indexInfo.State = model.StatePublic
job.State = model.JobStateCancelled
return ver, errCancelledDDLJob
default:
return ver, ErrInvalidDDLState.GenWithStackByArgs("index", indexInfo.State)
}

job.SchemaState = indexInfo.State
job.Args = []interface{}{indexInfo.Name}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
}

func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool) (ver int64, err error) {
Expand All @@ -300,7 +294,6 @@ func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isP
}

func convertAddTablePartitionJob2RollbackJob(t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) {
job.State = model.JobStateRollingback
addingDefinitions := tblInfo.Partition.AddingDefinitions
partNames := make([]string, 0, len(addingDefinitions))
for _, pd := range addingDefinitions {
Expand All @@ -311,6 +304,7 @@ func convertAddTablePartitionJob2RollbackJob(t *meta.Meta, job *model.Job, other
if err != nil {
return ver, errors.Trace(err)
}
job.State = model.JobStateRollingback
return ver, errors.Trace(otherwiseErr)
}

Expand Down Expand Up @@ -447,12 +441,31 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
if job.Error == nil {
job.Error = toTError(err)
}
if !job.Error.Equal(errCancelledDDLJob) {
job.Error = terror.GetErrClass(job.Error).Synthesize(terror.ErrCode(job.Error.Code()),
fmt.Sprintf("DDL job rollback, error msg: %s", terror.ToSQLError(job.Error).Message))
}
job.ErrorCount++

if errCancelledDDLJob.Equal(err) {
// The job is normally cancelled.
if !job.Error.Equal(errCancelledDDLJob) {
job.Error = terror.GetErrClass(job.Error).Synthesize(terror.ErrCode(job.Error.Code()),
fmt.Sprintf("DDL job rollback, error msg: %s", terror.ToSQLError(job.Error).Message))
}
} else {
// A job canceling meet other error.
//
// Once `convertJob2RollbackJob` meets an error, the job state can't be set as `JobStateRollingback` since
// job state and args may not be correctly overwritten. The job will be fetched to run with the cancelling
// state again. So we should check the error count here.
if err1 := loadDDLVars(w); err1 != nil {
logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1))
}
errorCount := variable.GetDDLErrorCountLimit()
if job.ErrorCount > errorCount {
logutil.Logger(w.logCtx).Warn("[ddl] rollback DDL job error count exceed the limit, cancelled it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", errorCount))
job.Error = toTError(errors.Errorf("rollback DDL job error count exceed the limit %d, cancelled it now", errorCount))
job.State = model.JobStateCancelled
}
}

if job.State != model.JobStateRollingback && job.State != model.JobStateCancelled {
logutil.Logger(w.logCtx).Error("[ddl] run DDL job failed", zap.String("job", job.String()), zap.Error(err))
} else {
Expand Down
101 changes: 101 additions & 0 deletions ddl/rollingback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (
"context"
"strconv"

. "github.com/pingcap/check"
errors2 "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/testkit"
)

var _ = SerialSuites(&testRollingBackSuite{&testDBSuite{}})

type testRollingBackSuite struct{ *testDBSuite }

// TestCancelJobMeetError is used to test canceling ddl job failure when convert ddl job to a rollingback job.
func (s *testRollingBackSuite) TestCancelAddIndexJobError(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk1 := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk1.MustExec("use test")

tk.MustExec("create table t_cancel_add_index (a int)")
tk.MustExec("insert into t_cancel_add_index values(1),(2),(3)")
tk.MustExec("set @@global.tidb_ddl_error_count_limit=3")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockConvertAddIdxJob2RollbackJobError", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockConvertAddIdxJob2RollbackJobError"), IsNil)
}()

tbl := testGetTableByName(c, tk.Se, "test", "t_cancel_add_index")
c.Assert(tbl, NotNil)

d := s.dom.DDL()
hook := &ddl.TestDDLCallback{Do: s.dom}
var (
checkErr error
jobID int64
res sqlexec.RecordSet
)
hook.OnJobUpdatedExported = func(job *model.Job) {
if job.TableID != tbl.Meta().ID {
return
}
if job.Type != model.ActionAddIndex {
return
}
if job.SchemaState == model.StateDeleteOnly {
jobID = job.ID
res, checkErr = tk1.Exec("admin cancel ddl jobs " + strconv.Itoa(int(job.ID)))
// drain the result set here, otherwise the cancel action won't take effect immediately.
chk := res.NewChunk()
if err := res.Next(context.Background(), chk); err != nil {
checkErr = err
return
}
if err := res.Close(); err != nil {
checkErr = err
}
}
}
d.(ddl.DDLForTest).SetHook(hook)

// This will hang on stateDeleteOnly, and the job will be canceled.
_, err := tk.Exec("alter table t_cancel_add_index add index idx(a)")
c.Assert(err, NotNil)
c.Assert(checkErr, IsNil)
c.Assert(err.Error(), Equals, "[ddl:-1]rollback DDL job error count exceed the limit 3, cancelled it now")

// Verification of the history job state.
var job *model.Job
err = kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetHistoryDDLJob(jobID)
return errors2.Trace(err1)
})
c.Assert(err, IsNil)
c.Assert(job.ErrorCount, Equals, int64(4))
c.Assert(job.Error.Error(), Equals, "[ddl:-1]rollback DDL job error count exceed the limit 3, cancelled it now")
}

0 comments on commit efa9d9c

Please sign in to comment.