Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support drop columns for multi-schema change #35737

Merged
merged 10 commits into from
Jun 30, 2022
26 changes: 12 additions & 14 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,21 +176,19 @@ var allTestCase = []testCancelJob{
{"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateWriteReorganization, model.StateNone}, true, true, nil},
{"alter table t add column c41 bigint, add column c42 bigint", false, subStates{model.StatePublic, model.StatePublic}, false, true, nil},
// Drop columns.
// TODO: fix schema state.
{"alter table t drop column c41, drop column c42", true, model.StateNone, true, false, nil},
{"alter table t drop column c41, drop column c42", false, model.StateDeleteOnly, true, false, nil},
{"alter table t drop column c41, drop column c42", false, model.StateDeleteOnly, false, true, []string{"alter table t add column c41 bigint, add column c42 bigint"}},
{"alter table t drop column c41, drop column c42", false, model.StateWriteOnly, true, true, []string{"alter table t add column c41 bigint, add column c42 bigint"}},
{"alter table t drop column c41, drop column c42", false, model.StateDeleteReorganization, true, true, []string{"alter table t add column c41 bigint, add column c42 bigint"}},
{"alter table t drop column c41, drop column c42", false, model.StatePublic, false, true, []string{"alter table t add column c41 bigint, add column c42 bigint"}},
{"alter table t drop column c41, drop column c42", true, subStates{model.StatePublic, model.StatePublic}, true, false, nil},
{"alter table t drop column c41, drop column c42", false, subStates{model.StateDeleteOnly, model.StateDeleteOnly}, true, false, nil},
{"alter table t drop column c41, drop column c42", false, subStates{model.StateDeleteOnly, model.StateDeleteOnly}, false, true, []string{"alter table t add column c41 bigint, add column c42 bigint"}},
{"alter table t drop column c41, drop column c42", false, subStates{model.StateWriteOnly, model.StateDeleteOnly}, true, true, []string{"alter table t add column c41 bigint, add column c42 bigint"}},
{"alter table t drop column c41, drop column c42", false, subStates{model.StateDeleteReorganization, model.StateDeleteOnly}, true, true, []string{"alter table t add column c41 bigint, add column c42 bigint"}},
{"alter table t drop column c41, drop column c42", false, subStates{model.StateNone, model.StateDeleteOnly}, false, true, []string{"alter table t add column c41 bigint, add column c42 bigint"}},
// Drop columns with index.
// TODO: fix schema state.
{"alter table t drop column c41, drop column c42", true, model.StateNone, true, false, []string{"alter table t add column c41 bigint, add column c42 bigint", "alter table t add index drop_columns_idx(c41)"}},
{"alter table t drop column c41, drop column c42", false, model.StateDeleteOnly, true, false, nil},
{"alter table t drop column c41, drop column c42", false, model.StateDeleteOnly, false, true, []string{"alter table t add column c41 bigint, add column c42 bigint", "alter table t add index drop_columns_idx(c41)"}},
{"alter table t drop column c41, drop column c42", false, model.StateWriteOnly, true, true, []string{"alter table t add column c41 bigint, add column c42 bigint", "alter table t add index drop_columns_idx(c41)"}},
{"alter table t drop column c41, drop column c42", false, model.StateDeleteReorganization, true, true, []string{"alter table t add column c41 bigint, add column c42 bigint", "alter table t add index drop_columns_idx(c41)"}},
{"alter table t drop column c41, drop column c42", false, model.StatePublic, false, true, []string{"alter table t add column c41 bigint, add column c42 bigint", "alter table t add index drop_columns_idx(c41)"}},
{"alter table t drop column c41, drop column c42", true, subStates{model.StatePublic, model.StatePublic}, true, false, []string{"alter table t add column c41 bigint, add column c42 bigint", "alter table t add index drop_columns_idx(c41)"}},
{"alter table t drop column c41, drop column c42", false, subStates{model.StateDeleteOnly, model.StateDeleteOnly}, true, false, nil},
{"alter table t drop column c41, drop column c42", false, subStates{model.StateDeleteOnly, model.StateDeleteOnly}, false, true, []string{"alter table t add column c41 bigint, add column c42 bigint", "alter table t add index drop_columns_idx(c41)"}},
{"alter table t drop column c41, drop column c42", false, subStates{model.StateWriteOnly, model.StateDeleteOnly}, true, true, []string{"alter table t add column c41 bigint, add column c42 bigint", "alter table t add index drop_columns_idx(c41)"}},
{"alter table t drop column c41, drop column c42", false, subStates{model.StateDeleteReorganization, model.StateDeleteOnly}, true, true, []string{"alter table t add column c41 bigint, add column c42 bigint", "alter table t add index drop_columns_idx(c41)"}},
{"alter table t drop column c41, drop column c42", false, subStates{model.StateNone, model.StateDeleteOnly}, false, true, []string{"alter table t add column c41 bigint, add column c42 bigint", "alter table t add index drop_columns_idx(c41)"}},
// Alter index visibility.
{"alter table t alter index idx_v invisible", true, model.StateNone, true, false, []string{"alter table t add index idx_v(c1)"}},
{"alter table t alter index idx_v invisible", false, model.StatePublic, false, true, nil},
Expand Down
29 changes: 21 additions & 8 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,21 @@ func checkDropColumnForStatePublic(tblInfo *model.TableInfo, colInfo *model.Colu
}

func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, colInfo, idxInfos, err := checkDropColumn(t, job)
tblInfo, colInfo, idxInfos, ifExists, err := checkDropColumn(t, job)
if err != nil {
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
wjhuang2016 marked this conversation as resolved.
Show resolved Hide resolved
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
return ver, nil
}
return ver, errors.Trace(err)
}
if job.MultiSchemaInfo != nil && !job.IsRollingback() && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
job.SchemaState = colInfo.State
// Store the mark and enter the next DDL handling loop.
return updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, false)
}

originalState := colInfo.State
switch colInfo.State {
Expand Down Expand Up @@ -619,6 +630,7 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
colInfo.State = model.StateNone
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != colInfo.State)
Expand All @@ -641,33 +653,34 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}

func checkDropColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, error) {
func checkDropColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, bool /* ifExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, errors.Trace(err)
return nil, nil, nil, false, errors.Trace(err)
}

var colName model.CIStr
var ifExists bool
// indexIds is used to make sure we don't truncate args when decoding the rawArgs.
var indexIds []int64
err = job.DecodeArgs(&colName, &indexIds)
err = job.DecodeArgs(&colName, &ifExists, &indexIds)
tangenta marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, errors.Trace(err)
return nil, nil, nil, false, errors.Trace(err)
}

colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil || colInfo.Hidden {
job.State = model.JobStateCancelled
return nil, nil, nil, dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
return nil, nil, nil, ifExists, dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}
if err = isDroppableColumn(job.MultiSchemaInfo != nil, tblInfo, colName); err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, errors.Trace(err)
return nil, nil, nil, false, errors.Trace(err)
}
idxInfos := listIndicesWithColumn(colName.L, tblInfo.Indices)
return tblInfo, colInfo, idxInfos, nil
return tblInfo, colInfo, idxInfos, false, nil
}

func onSetDefaultValue(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
Expand Down
8 changes: 4 additions & 4 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ func TestIssue5092(t *testing.T) {
tk.MustExec("create table t_issue_5092 (a int)")
tk.MustExec("alter table t_issue_5092 add column (b int, c int)")
tk.MustGetErrCode("alter table t_issue_5092 drop column if exists a, drop column b, drop column c", errno.ErrCantRemoveAllFields)
tk.MustGetErrCode("alter table t_issue_5092 drop column if exists c, drop column c", errno.ErrCantDropFieldOrKey)
tk.MustExec("alter table t_issue_5092 drop column c, drop column if exists c")
tk.MustGetErrCode("alter table t_issue_5092 drop column if exists c, drop column c", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t_issue_5092 drop column c, drop column if exists c", errno.ErrUnsupportedDDLOperation)
tk.MustExec("drop table t_issue_5092")
}

Expand Down Expand Up @@ -593,9 +593,9 @@ func TestErrnoErrorCode(t *testing.T) {
sql = "alter table test_drop_columns drop column c1, drop column c2, drop column c3;"
tk.MustGetErrCode(sql, errno.ErrCantRemoveAllFields)
sql = "alter table test_drop_columns drop column c1, add column c2 int;"
tk.MustGetErrCode(sql, errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode(sql, errno.ErrDupFieldName)
sql = "alter table test_drop_columns drop column c1, drop column c1;"
tk.MustGetErrCode(sql, errno.ErrCantDropFieldOrKey)
tk.MustGetErrCode(sql, errno.ErrUnsupportedDDLOperation)
// add index
sql = "alter table test_error_code_succ add index idx (c_not_exist)"
tk.MustGetErrCode(sql, errno.ErrKeyColumnDoesNotExits)
Expand Down
12 changes: 12 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,18 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
setDDLJobQuery(ctx, job)
task := &limitJobTask{job, make(chan error)}
d.limitJobCh <- task

failpoint.Inject("mockParallelSameDDLJobTwice", func(val failpoint.Value) {
if val.(bool) {
// The same job will be put to the DDL queue twice.
task1 := &limitJobTask{job, make(chan error)}
d.limitJobCh <- task1
<-task.err
// The second job result is used for test.
task = task1
}
})

// worker should restart to continue handling tasks in limitJobCh, and send back through task.err
err := <-task.err
if err != nil {
Expand Down
24 changes: 15 additions & 9 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2978,13 +2978,24 @@ func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error
return dbterror.ErrRunMultiSchemaChanges
}
} else {
if len(specs) > 1 && !isSameTypeMultiSpecs(specs) {
if len(specs) > 1 && !isSameTypeMultiSpecs(specs) && !allSupported(specs) {
return dbterror.ErrRunMultiSchemaChanges
}
}
return nil
}

func allSupported(specs []*ast.AlterTableSpec) bool {
for _, s := range specs {
switch s.Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn:
tangenta marked this conversation as resolved.
Show resolved Hide resolved
default:
return false
}
}
return true
}

func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) {
validSpecs, err := resolveAlterTableSpec(sctx, specs)
if err != nil {
Expand Down Expand Up @@ -3019,7 +3030,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast
case ast.AlterTableAddColumns:
useMultiSchemaChange = true
case ast.AlterTableDropColumn:
err = d.DropColumns(sctx, ident, validSpecs)
useMultiSchemaChange = true
tangenta marked this conversation as resolved.
Show resolved Hide resolved
case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
err = d.DropIndexes(sctx, ident, validSpecs)
default:
Expand Down Expand Up @@ -4016,20 +4027,15 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTa
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: model.StatePublic,
TableName: t.Meta().Name.L,
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: multiSchemaInfo,
SchemaState: model.StatePublic,
Args: []interface{}{colName},
Args: []interface{}{colName, spec.IfExists},
}

err = d.DoDDLJob(ctx, job)
// column not exists, but if_exists flags is true, so we ignore this error.
if dbterror.ErrCantDropFieldOrKey.Equal(err) && spec.IfExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
}
err = d.callHookOnChanged(job, err)
return errors.Trace(err)
}
Expand Down
9 changes: 8 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,11 @@ func (w *worker) deleteRange(ctx context.Context, job *model.Job) error {

func jobNeedGC(job *model.Job) bool {
if !job.IsCancelled() {
if job.Warning != nil && dbterror.ErrCantDropFieldOrKey.Equal(job.Warning) {
wjhuang2016 marked this conversation as resolved.
Show resolved Hide resolved
// For the field/key not exists warnings, there is no need to
// delete the ranges.
return false
}
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
if job.State != model.JobStateRollbackDone {
Expand All @@ -444,6 +449,8 @@ func jobNeedGC(job *model.Job) bool {
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
return true
case model.ActionMultiSchemaChange:
tangenta marked this conversation as resolved.
Show resolved Hide resolved
return true
}
}
return false
Expand All @@ -460,7 +467,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
if jobNeedGC(job) {
err = w.deleteRange(w.ctx, job)
if err != nil {
return err
return errors.Trace(err)
}
}

Expand Down
Loading