Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: remove onDropColumns and onAddColumns #35862

Merged
merged 4 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 0 additions & 276 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,50 +250,6 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}

func checkAddColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.ColumnInfo, []*model.ColumnInfo, []*ast.ColumnPosition, []int, []bool, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}
columns := []*model.ColumnInfo{}
positions := []*ast.ColumnPosition{}
offsets := []int{}
ifNotExists := []bool{}
err = job.DecodeArgs(&columns, &positions, &offsets, &ifNotExists)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}

columnInfos := make([]*model.ColumnInfo, 0, len(columns))
newColumns := make([]*model.ColumnInfo, 0, len(columns))
newPositions := make([]*ast.ColumnPosition, 0, len(columns))
newOffsets := make([]int, 0, len(columns))
newIfNotExists := make([]bool, 0, len(columns))
for i, col := range columns {
columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
if ifNotExists[i] {
// TODO: Should return a warning.
logutil.BgLogger().Warn("[ddl] check add columns, duplicate column", zap.Stringer("col", col.Name))
continue
}
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}
columnInfos = append(columnInfos, columnInfo)
}
newColumns = append(newColumns, columns[i])
newPositions = append(newPositions, positions[i])
newOffsets = append(newOffsets, offsets[i])
newIfNotExists = append(newIfNotExists, ifNotExists[i])
}
return tblInfo, columnInfos, newColumns, newPositions, newOffsets, newIfNotExists, nil
}

func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) {
for i := range columnInfos {
columnInfos[i].State = state
Expand All @@ -318,238 +274,6 @@ func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) {
}
}

func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumns(d, t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ver, errors.New("occur an error before decode args"))
}
})

tblInfo, columnInfos, columns, positions, offsets, ifNotExists, err := checkAddColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if len(columnInfos) == 0 {
if len(columns) == 0 {
job.State = model.JobStateCancelled
return ver, nil
}
for i := range columns {
columnInfo, pos, offset, err := createColumnInfoWithPosCheck(tblInfo, columns[i], positions[i])
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] run add columns job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo), zap.Int("offset", offset))
positions[i] = pos
offsets[i] = offset
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
columnInfos = append(columnInfos, columnInfo)
}
// Set arg to job.
job.Args = []interface{}{columnInfos, positions, offsets, ifNotExists}
}

originalState := columnInfos[0].State
switch columnInfos[0].State {
case model.StateNone:
// none -> delete only
setColumnsState(columnInfos, model.StateDeleteOnly)
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> write only
setColumnsState(columnInfos, model.StateWriteOnly)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// write only -> reorganization
setColumnsState(columnInfos, model.StateWriteReorganization)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offsets.
oldCols := tblInfo.Columns[:len(tblInfo.Columns)-len(offsets)]
newCols := tblInfo.Columns[len(tblInfo.Columns)-len(offsets):]
tblInfo.Columns = oldCols
for i := range offsets {
// For multiple columns with after position, should adjust offsets.
// e.g. create table t(a int);
// alter table t add column b int after a, add column c int after a;
// alter table t add column a1 int after a, add column b1 int after b, add column c1 int after c;
// alter table t add column a1 int after a, add column b1 int first;
if positions[i].Tp == ast.ColumnPositionAfter {
for j := 0; j < i; j++ {
if (positions[j].Tp == ast.ColumnPositionAfter && offsets[j] < offsets[i]) || positions[j].Tp == ast.ColumnPositionFirst {
offsets[i]++
}
}
}
tblInfo.Columns = append(tblInfo.Columns, newCols[i])
tblInfo.MoveColumnInfo(len(tblInfo.Columns)-1, offsets[i])
}
setColumnsState(columnInfos, model.StatePublic)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
asyncNotifyEvent(d, &ddlutil.Event{Tp: model.ActionAddColumns, TableInfo: tblInfo, ColumnInfos: columnInfos})
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfos[0].State)
}

return ver, errors.Trace(err)
}

func onDropColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, colInfos, delCount, idxInfos, err := checkDropColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if len(colInfos) == 0 {
job.State = model.JobStateCancelled
return ver, nil
}

originalState := colInfos[0].State
switch colInfos[0].State {
case model.StatePublic:
// public -> write only
setColumnsState(colInfos, model.StateWriteOnly)
setIndicesState(idxInfos, model.StateWriteOnly)
for _, colInfo := range colInfos {
err = checkDropColumnForStatePublic(tblInfo, colInfo)
if err != nil {
return ver, errors.Trace(err)
}
}
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// write only -> delete only
setColumnsState(colInfos, model.StateDeleteOnly)
if len(idxInfos) > 0 {
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if !indexInfoContains(idx.ID, idxInfos) {
newIndices = append(newIndices, idx)
}
}
tblInfo.Indices = newIndices
}
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.Args = append(job.Args, indexInfosToIDList(idxInfos))
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> reorganization
setColumnsState(colInfos, model.StateDeleteReorganization)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-delCount]
setColumnsState(colInfos, model.StateNone)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}

// Finish this job.
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.Args = append(job.Args, getPartitionIDs(tblInfo))
}
default:
err = dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State)
}
return ver, errors.Trace(err)
}

func checkDropColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.ColumnInfo, int, []*model.IndexInfo, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, 0, nil, errors.Trace(err)
}

var colNames []model.CIStr
var ifExists []bool
// indexIds is used to make sure we don't truncate args when decoding the rawArgs.
var indexIds []int64
err = job.DecodeArgs(&colNames, &ifExists, &indexIds)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, 0, nil, errors.Trace(err)
}

newColNames := make([]model.CIStr, 0, len(colNames))
colInfos := make([]*model.ColumnInfo, 0, len(colNames))
newIfExists := make([]bool, 0, len(colNames))
indexInfos := make([]*model.IndexInfo, 0)
for i, colName := range colNames {
colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil || colInfo.Hidden {
if ifExists[i] {
// TODO: Should return a warning.
logutil.BgLogger().Warn(fmt.Sprintf("column %s doesn't exist", colName))
continue
}
job.State = model.JobStateCancelled
return nil, nil, 0, nil, dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}
if err = isDroppableColumn(job.MultiSchemaInfo != nil, tblInfo, colName); err != nil {
job.State = model.JobStateCancelled
return nil, nil, 0, nil, errors.Trace(err)
}
newColNames = append(newColNames, colName)
newIfExists = append(newIfExists, ifExists[i])
colInfos = append(colInfos, colInfo)
idxInfos := listIndicesWithColumn(colName.L, tblInfo.Indices)
indexInfos = append(indexInfos, idxInfos...)
}
job.Args = []interface{}{newColNames, newIfExists}
if len(indexIds) > 0 {
job.Args = append(job.Args, indexIds)
}
return tblInfo, colInfos, len(colInfos), indexInfos, nil
}

func checkDropColumnForStatePublic(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) (err error) {
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, colInfo.Offset)
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
return errors.Trace(historyJob.Error)
}
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns || historyJob.Type == model.ActionDropIndexes) {
if historyJob.IsCancelled() && (historyJob.Type == model.ActionDropIndexes) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
Expand Down
Loading