Skip to content

Commit

Permalink
ddl: reimplement the error handling in multi-schema change (pingcap#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Mar 16, 2022
1 parent 44acf28 commit ec0104c
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 374 deletions.
26 changes: 17 additions & 9 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,12 +537,18 @@ func checkDropColumnForStatePublic(tblInfo *model.TableInfo, colInfo *model.Colu
}

func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, colInfo, idxInfos, err := checkDropColumn(t, job)
tblInfo, colInfo, idxInfos, ifExists, err := checkDropColumn(t, job)
if err != nil {
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
return ver, nil
}
return ver, errors.Trace(err)
}
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
job.SchemaState = colInfo.State
// Store the mark and enter the next DDL handling loop.
return updateVersionAndTableInfoWithCheck(t, job, tblInfo, false)
}
Expand Down Expand Up @@ -613,42 +619,44 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

func checkDropColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, error) {
func checkDropColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, /* ifExists */ bool, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, errors.Trace(err)
return nil, nil, nil, false, errors.Trace(err)
}

var colName model.CIStr
// indexIds is used to make sure we don't truncate args when decoding the rawArgs.
var indexIds []int64
err = job.DecodeArgs(&colName, &indexIds)
var partitionIDs []int64 // nolint: unused
var ifExists bool
err = job.DecodeArgs(&colName, &indexIds, &partitionIDs, &ifExists)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, errors.Trace(err)
return nil, nil, nil, false, errors.Trace(err)
}

colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil || colInfo.Hidden {
job.State = model.JobStateCancelled
return nil, nil, nil, dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
return nil, nil, nil, ifExists, dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}
if err = isDroppableColumn(job.MultiSchemaInfo != nil, tblInfo, colName); err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, errors.Trace(err)
return nil, nil, nil, false, errors.Trace(err)
}
idxInfos := listIndicesWithColumn(colName.L, tblInfo.Indices)
if len(idxInfos) > 0 {
for _, idxInfo := range idxInfos {
err = checkDropIndexOnAutoIncrementColumn(tblInfo, idxInfo)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, err
return nil, nil, nil, false, err
}
}
}
return tblInfo, colInfo, idxInfos, nil
return tblInfo, colInfo, idxInfos, false, nil
}

func onSetDefaultValue(t *meta.Meta, job *model.Job) (ver int64, _ error) {
Expand Down
17 changes: 1 addition & 16 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
}
}

if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
appendMultiChangeWarningsToOwnerCtx(ctx, historyJob)

logutil.BgLogger().Info("[ddl] DDL job is finished", zap.Int64("jobID", jobID))
return nil
Expand All @@ -711,17 +707,6 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
logutil.BgLogger().Info("[ddl] DDL job is failed", zap.Int64("jobID", jobID))
return errors.Trace(historyJob.Error)
}
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns ||
historyJob.Type == model.ActionDropIndexes) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
logutil.BgLogger().Info("[ddl] DDL job is cancelled", zap.Int64("jobID", jobID))
return nil
}
panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil")
}
}
Expand Down
63 changes: 2 additions & 61 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3966,15 +3966,9 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTa
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: multiSchemaInfo,
Args: []interface{}{colName},
Args: []interface{}{colName, nil /* index IDs */, nil /* partition IDs */, spec.IfExists},
}

err = d.doDDLJob(ctx, job)
// column not exists, but if_exists flags is true, so we ignore this error.
if dbterror.ErrCantDropFieldOrKey.Equal(err) && spec.IfExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
Expand Down Expand Up @@ -5922,61 +5916,8 @@ func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
SchemaName: schema.Name.L,
Type: jobTp,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{indexName},
}
err = d.doDDLJob(ctx, job)
// index not exists, but if_exists flags is true, so we ignore this error.
if dbterror.ErrCantDropFieldOrKey.Equal(err) && ifExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) DropIndexes(ctx sessionctx.Context, ti ast.Ident, specs []*ast.AlterTableSpec) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ti)
if err != nil {
return err
}

if t.Meta().TableCacheStatusType != model.TableCacheStatusDisable {
return errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Indexes"))
}
indexNames := make([]model.CIStr, 0, len(specs))
ifExists := make([]bool, 0, len(specs))
for _, spec := range specs {
var indexName model.CIStr
if spec.Tp == ast.AlterTableDropPrimaryKey {
indexName = model.NewCIStr(mysql.PrimaryKeyName)
} else {
indexName = model.NewCIStr(spec.Name)
}

indexInfo := t.Meta().FindIndexByName(indexName.L)
if indexInfo != nil {
_, err := checkIsDropPrimaryKey(indexName, indexInfo, t)
if err != nil {
return err
}
if err := checkDropIndexOnAutoIncrementColumn(t.Meta(), indexInfo); err != nil {
return errors.Trace(err)
}
}

indexNames = append(indexNames, indexName)
ifExists = append(ifExists, spec.IfExists)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionDropIndexes,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{indexNames, ifExists},
Args: []interface{}{indexName, indexInfo.ID, nil /* partition IDs */, ifExists},
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
Expand Down
4 changes: 1 addition & 3 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func deleteRangeForDropSchemaObjectJob(w *worker, job *model.Job) error {
// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
return w.deleteRange(w.ddlJobCtx, job)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn:
return w.deleteRange(w.ddlJobCtx, job)
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
Expand Down Expand Up @@ -840,8 +840,6 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = w.onCreateIndex(d, t, job, true)
case model.ActionDropIndex, model.ActionDropPrimaryKey:
ver, err = onDropIndex(t, job)
case model.ActionDropIndexes:
ver, err = onDropIndexes(t, job)
case model.ActionRenameIndex:
ver, err = onRenameIndex(t, job)
case model.ActionAddForeignKey:
Expand Down
26 changes: 0 additions & 26 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,11 +666,6 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
{act: model.ActionModifyColumn, jobIDs: []int64{firstID + 68}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization},
{act: model.ActionModifyColumn, jobIDs: []int64{firstID + 69}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob}, cancelState: model.StatePublic},

// for drop indexes
{act: model.ActionDropIndexes, jobIDs: []int64{firstID + 72}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 72)}, cancelState: model.StateWriteOnly},
{act: model.ActionDropIndexes, jobIDs: []int64{firstID + 73}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 73)}, cancelState: model.StateDeleteOnly},
{act: model.ActionDropIndexes, jobIDs: []int64{firstID + 74}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 74)}, cancelState: model.StateWriteReorganization},

// for alter db placement
{act: model.ActionModifySchemaDefaultPlacement, jobIDs: []int64{firstID + 75}, cancelRetErrs: noErrs, cancelState: model.StateNone},
{act: model.ActionModifySchemaDefaultPlacement, jobIDs: []int64{firstID + 76}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 76)}, cancelState: model.StatePublic},
Expand Down Expand Up @@ -1307,27 +1302,6 @@ func (s *testDDLSerialSuiteToVerify) TestCancelJob() {
require.Equal(s.T(), baseTable.Meta().Columns[0].FieldType.Tp, mysql.TypeTiny)
require.Equal(s.T(), baseTable.Meta().Columns[0].FieldType.Flag&mysql.NotNullFlag, uint(1))
require.Nil(s.T(), failpoint.Disable("github.com/pingcap/tidb/ddl/skipMockContextDoExec"))

// for drop indexes
updateTest(&tests[54])
ifExists := make([]bool, 2)
idxNames := []model.CIStr{model.NewCIStr("i1"), model.NewCIStr("i2")}
dropIndexesArgs := []interface{}{idxNames, ifExists}
tableInfo := createTestTableForDropIndexes(s.T(), ctx, d, dbInfo, "test-drop-indexes", 6)
doDDLJobSuccess(ctx, d, s.T(), dbInfo.ID, tableInfo.ID, test.act, dropIndexesArgs)
s.checkDropIndexes(d, dbInfo.ID, tableInfo.ID, idxNames, true)

updateTest(&tests[55])
idxNames = []model.CIStr{model.NewCIStr("i3"), model.NewCIStr("i4")}
dropIndexesArgs = []interface{}{idxNames, ifExists}
doDDLJobSuccess(ctx, d, s.T(), dbInfo.ID, tableInfo.ID, test.act, dropIndexesArgs)
s.checkDropIndexes(d, dbInfo.ID, tableInfo.ID, idxNames, true)

updateTest(&tests[56])
idxNames = []model.CIStr{model.NewCIStr("i5"), model.NewCIStr("i6")}
dropIndexesArgs = []interface{}{idxNames, ifExists}
doDDLJobSuccess(ctx, d, s.T(), dbInfo.ID, tableInfo.ID, test.act, dropIndexesArgs)
s.checkDropIndexes(d, dbInfo.ID, tableInfo.ID, idxNames, true)
}

func TestIgnorableSpec(t *testing.T) {
Expand Down
18 changes: 0 additions & 18 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,24 +351,6 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
return doInsert(ctx, s, job.ID, indexID, startKey, endKey, now)
}
case model.ActionDropIndexes:
var indexIDs []int64
var partitionIDs []int64
if err := job.DecodeArgs(&[]model.CIStr{}, &[]bool{}, &indexIDs, &partitionIDs); err != nil {
return errors.Trace(err)
}
// Remove data in TiKV.
if len(indexIDs) == 0 {
return nil
}
if len(partitionIDs) == 0 {
return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
}
for _, pID := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now); err != nil {
return errors.Trace(err)
}
}
case model.ActionDropColumn:
var colName model.CIStr
var indexIDs []int64
Expand Down
Loading

0 comments on commit ec0104c

Please sign in to comment.