From 0aa52df6496d1899031ca8a3b785a211481ffabb Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 16 Jun 2022 12:19:33 +0800 Subject: [PATCH 01/12] ddl: add framework for multi-schema change --- ddl/column.go | 74 ++++++++--- ddl/ddl.go | 13 +- ddl/ddl_api.go | 45 ++++--- ddl/ddl_worker.go | 13 +- ddl/index.go | 2 +- ddl/multi_schema_change.go | 218 +++++++++++++++++++++++++++++++- ddl/multi_schema_change_test.go | 100 +++++++++++++++ parser/model/ddl.go | 22 ++++ 8 files changed, 437 insertions(+), 50 deletions(-) create mode 100644 ddl/multi_schema_change_test.go diff --git a/ddl/column.go b/ddl/column.go index e271d6a540226..bec9f884620c4 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -83,7 +83,7 @@ func adjustColumnInfoInDropColumn(tblInfo *model.TableInfo, offset int) { tblInfo.Columns = newCols } -func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, *ast.ColumnPosition, int, error) { +func createColumnInfoWithPosCheck(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, *ast.ColumnPosition, int, error) { // Check column name duplicate. cols := tblInfo.Columns offset := len(cols) @@ -115,19 +115,34 @@ func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos * return colInfo, pos, offset, nil } -func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo, *ast.ColumnPosition, int, error) { +func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) *model.ColumnInfo { + cols := tblInfo.Columns + colInfo.ID = allocateColumnID(tblInfo) + colInfo.State = model.StateNone + // To support add column asynchronous, we should mark its offset as the last column. + // So that we can use origin column offset to get value from row. + colInfo.Offset = len(cols) + // Append the column info to the end of the tblInfo.Columns. + // It will reorder to the right offset in "Columns" when it state change to public. + tblInfo.Columns = append(cols, colInfo) + return colInfo +} + +func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo, + *ast.ColumnPosition, bool /* ifNotExists */, error) { schemaID := job.SchemaID tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID) if err != nil { - return nil, nil, nil, nil, 0, errors.Trace(err) + return nil, nil, nil, nil, false, errors.Trace(err) } col := &model.ColumnInfo{} pos := &ast.ColumnPosition{} offset := 0 - err = job.DecodeArgs(col, pos, &offset) + ifNotExists := false + err = job.DecodeArgs(col, pos, &offset, &ifNotExists) if err != nil { job.State = model.JobStateCancelled - return nil, nil, nil, nil, 0, errors.Trace(err) + return nil, nil, nil, nil, false, errors.Trace(err) } columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L) @@ -135,10 +150,17 @@ func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.Colu if columnInfo.State == model.StatePublic { // We already have a column with the same column name. job.State = model.JobStateCancelled - return nil, nil, nil, nil, 0, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name) + return nil, nil, nil, nil, ifNotExists, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name) } } - return tblInfo, columnInfo, col, pos, offset, nil + + err = checkPosition(tblInfo, pos) + if err != nil { + job.State = model.JobStateCancelled + return nil, nil, nil, nil, false, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name) + } + + return tblInfo, columnInfo, col, pos, false, nil } func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { @@ -157,21 +179,18 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) } }) - tblInfo, columnInfo, col, pos, offset, err := checkAddColumn(t, job) + tblInfo, columnInfo, col, pos, ifNotExists, err := checkAddColumn(t, job) if err != nil { + if ifNotExists && infoschema.ErrColumnExists.Equal(err) { + job.Warning = toTError(err) + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + return ver, nil + } return ver, errors.Trace(err) } if columnInfo == nil { - columnInfo, _, offset, err = createColumnInfo(tblInfo, col, pos) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - logutil.BgLogger().Info("[ddl] run add column job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo), zap.Int("offset", offset)) - // Set offset arg to job. - if offset != 0 { - job.Args = []interface{}{columnInfo, pos, offset} - } + columnInfo = createColumnInfo(tblInfo, col) + logutil.BgLogger().Info("[ddl] run add column job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo)) if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -206,9 +225,14 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) } // Update the job state when all affairs done. job.SchemaState = model.StateWriteReorganization + job.MarkNonRevertible() case model.StateWriteReorganization: // reorganization -> public // Adjust table column offset. + offset, err := locateOffsetToMove(columnInfo.Offset, pos, tblInfo) + if err != nil { + return ver, errors.Trace(err) + } tblInfo.MoveColumnInfo(columnInfo.Offset, offset) columnInfo.State = model.StatePublic ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfo.State) @@ -276,6 +300,16 @@ func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) { } } +func checkPosition(tblInfo *model.TableInfo, pos *ast.ColumnPosition) error { + if pos != nil && pos.Tp == ast.ColumnPositionAfter { + c := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L) + if c == nil { + return infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name) + } + } + return nil +} + func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) { for _, indexInfo := range indexInfos { indexInfo.State = state @@ -308,7 +342,7 @@ func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error return ver, nil } for i := range columns { - columnInfo, pos, offset, err := createColumnInfo(tblInfo, columns[i], positions[i]) + columnInfo, pos, offset, err := createColumnInfoWithPosCheck(tblInfo, columns[i], positions[i]) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -856,7 +890,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in return ver, errors.Trace(err) } - _, _, _, err = createColumnInfo(tblInfo, modifyInfo.changingCol, changingColPos) + _, _, _, err = createColumnInfoWithPosCheck(tblInfo, modifyInfo.changingCol, changingColPos) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) diff --git a/ddl/ddl.go b/ddl/ddl.go index 0893dc4126881..00623056f65cb 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -709,6 +709,12 @@ func setDDLJobQuery(ctx sessionctx.Context, job *model.Job) { // - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel // - other: found in history DDL job and return that job error func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { + if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil { + // In multiple schema change, we don't run the job. + // Instead, we merge all the jobs into one pending job. + return appendToSubJobs(mci, job) + } + // Get a global job ID and put the DDL job in the queue. setDDLJobQuery(ctx, job) task := &limitJobTask{job, make(chan error)} @@ -783,12 +789,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { } } } - - if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 { - for _, warning := range historyJob.MultiSchemaInfo.Warnings { - ctx.GetSessionVars().StmtCtx.AppendWarning(warning) - } - } + appendMultiChangeWarningsToOwnerCtx(ctx, historyJob) logutil.BgLogger().Info("[ddl] DDL job is finished", zap.Int64("jobID", jobID)) return nil diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index e3d14ccae2f49..65d073fd0db83 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2951,8 +2951,21 @@ func needToOverwriteColCharset(options []*ast.TableOption) bool { return false } +// resolveAlterTableAddColumns splits "add columns" to multiple spec. For example, +// `ALTER TABLE ADD COLUMN (c1 INT, c2 INT)` is split into +// `ALTER TABLE ADD COLUMN c1 INT, ADD COLUMN c2 INT`. +func resolveAlterTableAddColumns(spec *ast.AlterTableSpec) []*ast.AlterTableSpec { + specs := make([]*ast.AlterTableSpec, len(spec.NewColumns)) + for i, col := range spec.NewColumns { + t := *spec + t.NewColumns = []*ast.ColumnDef{col} + specs[i] = &t + } + return specs +} + // resolveAlterTableSpec resolves alter table algorithm and removes ignore table spec in specs. -// returns valied specs, and the occurred error. +// returns valid specs, and the occurred error. func resolveAlterTableSpec(ctx sessionctx.Context, specs []*ast.AlterTableSpec) ([]*ast.AlterTableSpec, error) { validSpecs := make([]*ast.AlterTableSpec, 0, len(specs)) algorithm := ast.AlgorithmTypeDefault @@ -2964,7 +2977,11 @@ func resolveAlterTableSpec(ctx sessionctx.Context, specs []*ast.AlterTableSpec) if isIgnorableSpec(spec.Tp) { continue } - validSpecs = append(validSpecs, spec) + if spec.Tp == ast.AlterTableAddColumns && len(spec.NewColumns) > 1 { + validSpecs = append(validSpecs, resolveAlterTableAddColumns(spec)...) + } else { + validSpecs = append(validSpecs, spec) + } } // Verify whether the algorithm is supported. @@ -3045,9 +3062,10 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast } if len(validSpecs) > 1 { + continuE := false switch validSpecs[0].Tp { case ast.AlterTableAddColumns: - err = d.AddColumns(sctx, ident, validSpecs) + continuE = true case ast.AlterTableDropColumn: err = d.DropColumns(sctx, ident, validSpecs) case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex: @@ -3058,7 +3076,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast if err != nil { return errors.Trace(err) } - return nil + if !continuE { + return nil + } } if len(validSpecs) > 1 { @@ -3069,11 +3089,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast var handledCharsetOrCollate bool switch spec.Tp { case ast.AlterTableAddColumns: - if len(spec.NewColumns) != 1 { - err = d.AddColumns(sctx, ident, []*ast.AlterTableSpec{spec}) - } else { - err = d.AddColumn(sctx, ident, spec) - } + err = d.AddColumn(sctx, ident, spec) case ast.AlterTableAddPartitions: err = d.AddTablePartitions(sctx, ident, spec) case ast.AlterTableCoalescePartitions: @@ -3524,6 +3540,10 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab if col == nil { return nil } + err = checkPosition(t.Meta(), spec.Position) + if err != nil { + return errors.Trace(err) + } job := &model.Job{ SchemaID: schema.ID, @@ -3532,15 +3552,10 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab TableName: t.Meta().Name.L, Type: model.ActionAddColumn, BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{col, spec.Position, 0}, + Args: []interface{}{col, spec.Position, 0, spec.IfNotExists}, } err = d.DoDDLJob(ctx, job) - // column exists, but if_not_exists flags is true, so we ignore this error. - if infoschema.ErrColumnExists.Equal(err) && spec.IfNotExists { - ctx.GetSessionVars().StmtCtx.AppendNote(err) - return nil - } err = d.callHookOnChanged(job, err) return errors.Trace(err) } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index b7d5341fac9f1..883b0418ec63f 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -295,7 +295,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { job.Version = currentVersion job.StartTS = txn.StartTS() job.ID = ids[i] - job.State = model.JobStateQueueing + setJobStateToQueueing(job) if err = buildJobDependence(t, job); err != nil { return errors.Trace(err) } @@ -335,6 +335,15 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { } } +func setJobStateToQueueing(job *model.Job) { + if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { + for _, sub := range job.MultiSchemaInfo.SubJobs { + sub.State = model.JobStateQueueing + } + } + job.State = model.JobStateQueueing +} + // getHistoryDDLJob gets a DDL job with job's ID from history queue. func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) { se, err := d.sessPool.get() @@ -932,6 +941,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onAlterCacheTable(d, t, job) case model.ActionAlterNoCacheTable: ver, err = onAlterNoCacheTable(d, t, job) + case model.ActionMultiSchemaChange: + ver, err = onMultiSchemaChange(w, d, t, job) default: // Invalid job, cancel it. job.State = model.JobStateCancelled diff --git a/ddl/index.go b/ddl/index.go index 467f84190eb34..9296d2507275c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -470,7 +470,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo if len(hiddenCols) > 0 { pos := &ast.ColumnPosition{Tp: ast.ColumnPositionNone} for _, hiddenCol := range hiddenCols { - _, _, _, err = createColumnInfo(tblInfo, hiddenCol, pos) + _, _, _, err = createColumnInfoWithPosCheck(tblInfo, hiddenCol, pos) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 414d2f1484909..02582bcae9d91 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -15,8 +15,11 @@ package ddl import ( + "sync" + "github.com/pingcap/errors" ddlutil "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/terror" @@ -56,18 +59,191 @@ func (d *ddl) MultiSchemaChange(ctx sessionctx.Context, ti ast.Ident) error { return d.callHookOnChanged(job, err) } -func checkMultiSchemaInfo(info *model.MultiSchemaInfo, t table.Table) error { - err := checkOperateSameColAndIdx(info) - if err != nil { - return err +func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { + if job.MultiSchemaInfo.Revertible { + // Handle the rolling back job. + if job.IsRollingback() { + // Rollback/cancel the sub-jobs in reverse order. + for i := len(job.MultiSchemaInfo.SubJobs) - 1; i >= 0; i-- { + sub := job.MultiSchemaInfo.SubJobs[i] + if isFinished(sub) { + continue + } + proxyJob := cloneFromSubJob(job, sub) + ver, err = w.runDDLJob(d, t, proxyJob) + mergeBackToSubJob(proxyJob, sub) + if i == 0 && isFinished(sub) { + job.State = model.JobStateRollbackDone + } + return ver, err + } + // The last rollback/cancelling sub-job is done. + job.State = model.JobStateRollbackDone + return ver, nil + } + + // The sub-jobs are normally running. + // Run the first executable sub-job. + for _, sub := range job.MultiSchemaInfo.SubJobs { + if !sub.Revertible || isFinished(sub) { + // Skip the sub-jobs which related schema states + // are in the last revertible point. + // If a sub job is finished here, it should be a noop job. + continue + } + proxyJob := cloneFromSubJob(job, sub) + ver, err = w.runDDLJob(d, t, proxyJob) + mergeBackToSubJob(proxyJob, sub) + handleRevertibleException(job, sub, proxyJob.Error) + return ver, err + } + + // Save tblInfo and subJobs for rollback + tblInfo, _ := t.GetTable(job.SchemaID, job.TableID) + subJobs := make([]model.SubJob, len(job.MultiSchemaInfo.SubJobs)) + // Step the sub-jobs to the non-revertible states all at once. + for i, sub := range job.MultiSchemaInfo.SubJobs { + if isFinished(sub) { + continue + } + subJobs[i] = *sub + proxyJob := cloneFromSubJob(job, sub) + ver, err = w.runDDLJob(d, t, proxyJob) + mergeBackToSubJob(proxyJob, sub) + if err != nil || proxyJob.Error != nil { + for j := i - 1; j >= 0; j-- { + job.MultiSchemaInfo.SubJobs[j] = &subJobs[j] + } + handleRevertibleException(job, sub, proxyJob.Error) + return updateVersionAndTableInfo(d, t, job, tblInfo, true) + } + } + // All the sub-jobs are non-revertible. + job.MultiSchemaInfo.Revertible = false + return ver, err + } + // Run the rest non-revertible sub-jobs one by one. + for _, sub := range job.MultiSchemaInfo.SubJobs { + if isFinished(sub) { + continue + } + proxyJob := cloneFromSubJob(job, sub) + ver, err = w.runDDLJob(d, t, proxyJob) + mergeBackToSubJob(proxyJob, sub) + return ver, err } + job.State = model.JobStateDone + return ver, err +} - err = checkVisibleColumnCnt(t, len(info.AddColumns), len(info.DropColumns)) +func isFinished(job *model.SubJob) bool { + return job.State == model.JobStateDone || + job.State == model.JobStateRollbackDone || + job.State == model.JobStateCancelled +} + +func cloneFromSubJob(job *model.Job, sub *model.SubJob) *model.Job { + return &model.Job{ + ID: job.ID, + Type: sub.Type, + SchemaID: job.SchemaID, + TableID: job.TableID, + SchemaName: job.SchemaName, + State: sub.State, + Warning: sub.Warning, + Error: nil, + ErrorCount: 0, + RowCount: sub.RowCount, + Mu: sync.Mutex{}, + CtxVars: sub.CtxVars, + Args: sub.Args, + RawArgs: sub.RawArgs, + SchemaState: sub.SchemaState, + SnapshotVer: sub.SnapshotVer, + RealStartTS: job.RealStartTS, + StartTS: job.StartTS, + DependencyID: job.DependencyID, + Query: job.Query, + BinlogInfo: job.BinlogInfo, + Version: job.Version, + ReorgMeta: job.ReorgMeta, + MultiSchemaInfo: &model.MultiSchemaInfo{Revertible: sub.Revertible}, + Priority: job.Priority, + SeqNum: job.SeqNum, + } +} + +func mergeBackToSubJob(job *model.Job, sub *model.SubJob) { + sub.Revertible = job.MultiSchemaInfo.Revertible + sub.SchemaState = job.SchemaState + sub.SnapshotVer = job.SnapshotVer + sub.Args = job.Args + sub.State = job.State + sub.Warning = job.Warning + sub.RowCount = job.RowCount +} + +func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror.Error) { + if !isAbnormal(subJob) { + return + } + job.State = model.JobStateRollingback + job.Error = err + // Flush the cancelling state and cancelled state to sub-jobs. + for _, sub := range job.MultiSchemaInfo.SubJobs { + switch sub.State { + case model.JobStateCancelled: + if !sub.Revertible { + sub.State = model.JobStateCancelling + } + case model.JobStateRunning: + sub.State = model.JobStateCancelling + case model.JobStateNone, model.JobStateQueueing: + sub.State = model.JobStateCancelled + } + } +} + +func isAbnormal(job *model.SubJob) bool { + return job.State == model.JobStateCancelling || + job.State == model.JobStateCancelled || + job.State == model.JobStateRollingback || + job.State == model.JobStateRollbackDone +} + +func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error { + err := fillMultiSchemaInfo(m, job) if err != nil { return err } + m.SubJobs = append(m.SubJobs, &model.SubJob{ + Type: job.Type, + Args: job.Args, + RawArgs: job.RawArgs, + SchemaState: job.SchemaState, + SnapshotVer: job.SnapshotVer, + Revertible: true, + CtxVars: job.CtxVars, + }) + return nil +} - return checkAddColumnTooManyColumns(len(t.Cols()) + len(info.AddColumns) - len(info.DropColumns)) +func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error) { + switch job.Type { + case model.ActionAddColumn: + col := job.Args[0].(*table.Column) + pos := job.Args[1].(*ast.ColumnPosition) + info.AddColumns = append(info.AddColumns, col.Name) + for colName := range col.Dependences { + info.RelativeColumns = append(info.RelativeColumns, model.CIStr{L: colName, O: colName}) + } + if pos != nil && pos.Tp == ast.ColumnPositionAfter { + info.PositionColumns = append(info.PositionColumns, pos.RelativeColumn.Name) + } + default: + return dbterror.ErrRunMultiSchemaChanges + } + return nil } func checkOperateSameColAndIdx(info *model.MultiSchemaInfo) error { @@ -106,12 +282,15 @@ func checkOperateSameColAndIdx(info *model.MultiSchemaInfo) error { if err := checkColumns(info.DropColumns, true); err != nil { return err } - if err := checkColumns(info.RelativeColumns, false); err != nil { + if err := checkColumns(info.PositionColumns, false); err != nil { return err } if err := checkColumns(info.ModifyColumns, true); err != nil { return err } + if err := checkColumns(info.RelativeColumns, false); err != nil { + return err + } if err := checkIndexes(info.AddIndexes, true); err != nil { return err @@ -121,3 +300,28 @@ func checkOperateSameColAndIdx(info *model.MultiSchemaInfo) error { } return checkIndexes(info.AlterIndexes, true) } + +func checkMultiSchemaInfo(info *model.MultiSchemaInfo, t table.Table) error { + err := checkOperateSameColAndIdx(info) + if err != nil { + return err + } + + err = checkVisibleColumnCnt(t, len(info.AddColumns), len(info.DropColumns)) + if err != nil { + return err + } + + return checkAddColumnTooManyColumns(len(t.Cols()) + len(info.AddColumns) - len(info.DropColumns)) +} + +func appendMultiChangeWarningsToOwnerCtx(ctx sessionctx.Context, job *model.Job) { + if job.MultiSchemaInfo == nil || job.Type != model.ActionMultiSchemaChange { + return + } + for _, sub := range job.MultiSchemaInfo.SubJobs { + if sub.Warning != nil { + ctx.GetSessionVars().StmtCtx.AppendNote(sub.Warning) + } + } +} diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go new file mode 100644 index 0000000000000..9e3cae966c43f --- /dev/null +++ b/ddl/multi_schema_change_test.go @@ -0,0 +1,100 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl_test + +import ( + "testing" + + "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/testkit" +) + +func TestMultiSchemaChangeAddColumns(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@global.tidb_enable_change_multi_schema = 1") + + // Test add multiple columns in multiple specs. + tk.MustExec("create table t (a int);") + tk.MustExec("insert into t values (1);") + tk.MustExec("alter table t add column b int default 2, add column c int default 3;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) + + // Test add multiple columns in one spec. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int);") + tk.MustExec("insert into t values (1);") + tk.MustExec("alter table t add column (b int default 2, c int default 3);") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int, c int);") + tk.MustExec("insert into t values (1, 2, 3);") + tk.MustExec("alter table t add column (d int default 4, e int default 5);") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3 4 5")) + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int default 1);") + tk.MustExec("insert into t values ();") + tk.MustExec("alter table t add column if not exists (b int default 2, c int default 3);") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) + tk.MustExec("alter table t add column if not exists (c int default 3, d int default 4);") + tk.MustQuery("show warnings;").Check(testkit.Rows("Note 1060 Duplicate column name 'c'")) + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3 4")) + + // Test referencing previous column in multi-schema change is not supported. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int);") + tk.MustGetErrCode("alter table t add column b int after a, add column c int after b", errno.ErrBadField) + tk.MustGetErrCode("alter table t add column c int after b, add column b int", errno.ErrBadField) + + // Test add multiple columns with different position. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int, c int);") + tk.MustExec("insert into t values (1, 2, 3);") + tk.MustExec(`alter table t + add column d int default 4 first, + add column e int default 5 after b, + add column f int default 6 after b;`) + tk.MustQuery("select * from t;").Check(testkit.Rows("4 1 2 6 5 3")) + + // Test [if not exists] for adding columns. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int default 1);") + tk.MustExec("insert into t values ();") + tk.MustExec("alter table t add column b int default 2, add column if not exists a int;") + tk.MustQuery("show warnings;").Check(testkit.Rows("Note 1060 Duplicate column name 'a'")) + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2")) + + // Test add generate column + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int, b int);") + tk.MustExec("insert into t values (1, 2);") + tk.MustExec("alter table t add column c double default 3.0, add column d double as (a + b);") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3 3")) + + // Test add columns with same name + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int default 1, c int default 4);") + tk.MustGetErrCode("alter table t add column b int default 2, add column b int default 3", errno.ErrUnsupportedDDLOperation) + + // Test add generate column dependents on a modifying column + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int, b int);") + tk.MustExec("insert into t values (1, 2);") + tk.MustGetErrCode("alter table t modify column b double, add column c double as (a + b);", errno.ErrUnsupportedDDLOperation) +} diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 1769d3b526d7f..354075729997d 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -270,6 +270,7 @@ type MultiSchemaInfo struct { AlterIndexes []CIStr `json:"-"` RelativeColumns []CIStr `json:"-"` + PositionColumns []CIStr `json:"-"` } func NewMultiSchemaInfo() *MultiSchemaInfo { @@ -301,6 +302,7 @@ type Job struct { SchemaName string `json:"schema_name"` TableName string `json:"table_name"` State JobState `json:"state"` + Warning *terror.Error `json:"warning"` Error *terror.Error `json:"err"` // ErrorCount will be increased, every time we meet an error when running job. ErrorCount int64 `json:"err_count"` @@ -370,6 +372,14 @@ func (job *Job) FinishDBJob(jobState JobState, schemaState SchemaState, ver int6 job.BinlogInfo.AddDBInfo(ver, dbInfo) } +// MarkNonRevertible mark the current job to be non-revertible. +// It means the job cannot be cancelled or rollbacked. +func (job *Job) MarkNonRevertible() { + if job.MultiSchemaInfo != nil { + job.MultiSchemaInfo.Revertible = false + } +} + // TSConvert2Time converts timestamp to time. func TSConvert2Time(ts uint64) time.Time { t := int64(ts >> 18) // 18 is for the logical time. @@ -412,6 +422,18 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { if err != nil { return nil, errors.Trace(err) } + if job.MultiSchemaInfo != nil { + for _, sub := range job.MultiSchemaInfo.SubJobs { + // Only update the args of last executing sub-job. + if sub.Args == nil { + continue + } + sub.RawArgs, err = json.Marshal(sub.Args) + if err != nil { + return nil, errors.Trace(err) + } + } + } } var b []byte From 29e9a22cf4e095adb31141c294be346999fe527f Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 16 Jun 2022 14:53:04 +0800 Subject: [PATCH 02/12] fix integration test --- ddl/column_change_test.go | 8 +++++++- ddl/db_change_test.go | 14 +++++++++++--- ddl/db_integration_test.go | 7 +++---- ddl/ddl_worker.go | 23 ++++++++++++++++------- ddl/index_modify_test.go | 4 ++-- ddl/multi_schema_change.go | 14 ++++++++++---- 6 files changed, 49 insertions(+), 21 deletions(-) diff --git a/ddl/column_change_test.go b/ddl/column_change_test.go index e1f206514366a..a6de4bc964d2f 100644 --- a/ddl/column_change_test.go +++ b/ddl/column_change_test.go @@ -417,7 +417,13 @@ func testCheckJobDone(t *testing.T, store kv.Storage, jobID int64, isAdd bool) { require.NoError(t, err) require.Equal(t, historyJob.State, model.JobStateSynced) if isAdd { - require.Equal(t, historyJob.SchemaState, model.StatePublic) + if historyJob.Type == model.ActionMultiSchemaChange { + for _, sub := range historyJob.MultiSchemaInfo.SubJobs { + require.Equal(t, sub.SchemaState, model.StatePublic) + } + } else { + require.Equal(t, historyJob.SchemaState, model.StatePublic) + } } else { require.Equal(t, historyJob.SchemaState, model.StateNone) } diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index 2ac1d1974eea2..eacd35ac119f1 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -837,11 +837,11 @@ func runTestInSchemaState( _, err = se.Execute(context.Background(), "use test_db_state") require.NoError(t, err) cbFunc := func(job *model.Job) { - if job.SchemaState == prevState || checkErr != nil { + if currentSchemaState(job) == prevState || checkErr != nil { return } - prevState = job.SchemaState - if job.SchemaState != state { + prevState = currentSchemaState(job) + if prevState != state { return } for _, sqlWithErr := range sqlWithErrs { @@ -877,6 +877,14 @@ func runTestInSchemaState( } } +func currentSchemaState(job *model.Job) model.SchemaState { + if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { + subs := job.MultiSchemaInfo.SubJobs + return subs[len(subs)-1].SchemaState + } + return job.SchemaState +} + func TestShowIndex(t *testing.T) { store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 200*time.Millisecond) defer clean() diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index fff7e16f8a33c..d1caa46f2efbd 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -401,7 +401,7 @@ func TestIssue5092(t *testing.T) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) // The following two statements are consistent with MariaDB. tk.MustGetErrCode("alter table t_issue_5092 add column if not exists d int, add column d int", errno.ErrDupFieldName) - tk.MustExec("alter table t_issue_5092 add column dd int, add column if not exists dd int") + tk.MustGetErrCode("alter table t_issue_5092 add column dd int, add column if not exists dd int", errno.ErrUnsupportedDDLOperation) tk.MustExec("alter table t_issue_5092 add column if not exists (d int, e int), add column ff text") tk.MustExec("alter table t_issue_5092 add column b2 int after b1, add column c2 int first") tk.MustQuery("show create table t_issue_5092").Check(testkit.Rows("t_issue_5092 CREATE TABLE `t_issue_5092` (\n" + @@ -417,7 +417,6 @@ func TestIssue5092(t *testing.T) { " `c1` int(11) DEFAULT NULL,\n" + " `f` int(11) DEFAULT NULL,\n" + " `g` int(11) DEFAULT NULL,\n" + - " `dd` int(11) DEFAULT NULL,\n" + " `ff` text DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) tk.MustExec("drop table t_issue_5092") @@ -451,8 +450,8 @@ func TestIssue5092(t *testing.T) { tk.MustExec("create table t_issue_5092 (a int)") tk.MustExec("alter table t_issue_5092 add column (b int, c int)") tk.MustGetErrCode("alter table t_issue_5092 drop column if exists a, drop column b, drop column c", errno.ErrCantRemoveAllFields) - tk.MustGetErrCode("alter table t_issue_5092 drop column if exists c, drop column c", errno.ErrCantDropFieldOrKey) - tk.MustExec("alter table t_issue_5092 drop column c, drop column if exists c") + tk.MustGetErrCode("alter table t_issue_5092 drop column if exists c, drop column c", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t_issue_5092 drop column c, drop column if exists c", errno.ErrUnsupportedDDLOperation) tk.MustExec("drop table t_issue_5092") } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index bd87dd8c71f76..50362923488a8 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -304,13 +304,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { if job.MayNeedReorg() { jobListKey = meta.AddIndexJobListKey } - failpoint.Inject("MockModifyJobArg", func(val failpoint.Value) { - if val.(bool) { - if len(job.Args) > 0 { - job.Args[0] = 1 - } - } - }) + injectModifyJobArgFailPoint(job) if err = t.EnQueueDDLJob(job, jobListKey); err != nil { return errors.Trace(err) } @@ -336,6 +330,21 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { } } +func injectModifyJobArgFailPoint(job *model.Job) { + failpoint.Inject("MockModifyJobArg", func(val failpoint.Value) { + if val.(bool) { + // Corrupt the DDL job argument. + if job.Type == model.ActionMultiSchemaChange { + if len(job.MultiSchemaInfo.SubJobs) > 0 && len(job.MultiSchemaInfo.SubJobs[0].Args) > 0 { + job.MultiSchemaInfo.SubJobs[0].Args[0] = 1 + } + } else if len(job.Args) > 0 { + job.Args[0] = 1 + } + } + }) +} + func setJobStateToQueueing(job *model.Job) { if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { for _, sub := range job.MultiSchemaInfo.SubJobs { diff --git a/ddl/index_modify_test.go b/ddl/index_modify_test.go index 39950e424e1db..3ae2ae16482e0 100644 --- a/ddl/index_modify_test.go +++ b/ddl/index_modify_test.go @@ -882,7 +882,7 @@ func testDropIndexesIfExists(t *testing.T, store kv.Storage) { "[ddl:1091]index i3 doesn't exist", ) tk.MustExec("alter table test_drop_indexes_if_exists drop index i1, drop index if exists i3;") - tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Warning|1091|index i3 doesn't exist")) + tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Note|1091|index i3 doesn't exist")) // Verify the impact of deletion order when dropping duplicate indexes. tk.MustGetErrMsg( @@ -894,7 +894,7 @@ func testDropIndexesIfExists(t *testing.T, store kv.Storage) { "[ddl:1091]index i2 doesn't exist", ) tk.MustExec("alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;") - tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Warning|1091|index i2 doesn't exist")) + tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Note|1091|index i2 doesn't exist")) } func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) { diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 02582bcae9d91..d89702b64965f 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -316,12 +316,18 @@ func checkMultiSchemaInfo(info *model.MultiSchemaInfo, t table.Table) error { } func appendMultiChangeWarningsToOwnerCtx(ctx sessionctx.Context, job *model.Job) { - if job.MultiSchemaInfo == nil || job.Type != model.ActionMultiSchemaChange { + if job.MultiSchemaInfo == nil { return } - for _, sub := range job.MultiSchemaInfo.SubJobs { - if sub.Warning != nil { - ctx.GetSessionVars().StmtCtx.AppendNote(sub.Warning) + if job.Type == model.ActionMultiSchemaChange { + for _, sub := range job.MultiSchemaInfo.SubJobs { + if sub.Warning != nil { + ctx.GetSessionVars().StmtCtx.AppendNote(sub.Warning) + } } } + for _, w := range job.MultiSchemaInfo.Warnings { + ctx.GetSessionVars().StmtCtx.AppendNote(w) + } + } From acae4183e32f502628618964924e638a1eec7591 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 16 Jun 2022 16:21:11 +0800 Subject: [PATCH 03/12] fix integration test --- ddl/db_integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index d1caa46f2efbd..cc4966d60598f 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -450,8 +450,8 @@ func TestIssue5092(t *testing.T) { tk.MustExec("create table t_issue_5092 (a int)") tk.MustExec("alter table t_issue_5092 add column (b int, c int)") tk.MustGetErrCode("alter table t_issue_5092 drop column if exists a, drop column b, drop column c", errno.ErrCantRemoveAllFields) - tk.MustGetErrCode("alter table t_issue_5092 drop column if exists c, drop column c", errno.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t_issue_5092 drop column c, drop column if exists c", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t_issue_5092 drop column if exists c, drop column c", errno.ErrCantDropFieldOrKey) + tk.MustExec("alter table t_issue_5092 drop column c, drop column if exists c") tk.MustExec("drop table t_issue_5092") } From 8a45600a509cbe78f11fa1876ce0ee45395c2a2d Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 17 Jun 2022 16:13:42 +0800 Subject: [PATCH 04/12] rename and add comments --- ddl/column.go | 12 +++++++----- ddl/db_change_test.go | 6 +++--- ddl/ddl_api.go | 2 +- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index bec9f884620c4..e3c16eda49603 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -115,7 +115,7 @@ func createColumnInfoWithPosCheck(tblInfo *model.TableInfo, colInfo *model.Colum return colInfo, pos, offset, nil } -func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) *model.ColumnInfo { +func initColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) *model.ColumnInfo { cols := tblInfo.Columns colInfo.ID = allocateColumnID(tblInfo) colInfo.State = model.StateNone @@ -154,7 +154,7 @@ func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.Colu } } - err = checkPosition(tblInfo, pos) + err = checkAfterPositionExists(tblInfo, pos) if err != nil { job.State = model.JobStateCancelled return nil, nil, nil, nil, false, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name) @@ -179,7 +179,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) } }) - tblInfo, columnInfo, col, pos, ifNotExists, err := checkAddColumn(t, job) + tblInfo, columnInfo, colFromArgs, pos, ifNotExists, err := checkAddColumn(t, job) if err != nil { if ifNotExists && infoschema.ErrColumnExists.Equal(err) { job.Warning = toTError(err) @@ -189,7 +189,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) return ver, errors.Trace(err) } if columnInfo == nil { - columnInfo = createColumnInfo(tblInfo, col) + columnInfo = initColumnInfo(tblInfo, colFromArgs) logutil.BgLogger().Info("[ddl] run add column job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo)) if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil { job.State = model.JobStateCancelled @@ -300,7 +300,9 @@ func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) { } } -func checkPosition(tblInfo *model.TableInfo, pos *ast.ColumnPosition) error { +// checkAfterPositionExists makes sure the column specified in AFTER clause is exists. +// For example, ALTER TABLE t ADD COLUMN c3 INT AFTER c1. +func checkAfterPositionExists(tblInfo *model.TableInfo, pos *ast.ColumnPosition) error { if pos != nil && pos.Tp == ast.ColumnPositionAfter { c := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L) if c == nil { diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index eacd35ac119f1..941554dfd44a1 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -837,10 +837,10 @@ func runTestInSchemaState( _, err = se.Execute(context.Background(), "use test_db_state") require.NoError(t, err) cbFunc := func(job *model.Job) { - if currentSchemaState(job) == prevState || checkErr != nil { + if jobStateOrLastSubJobState(job) == prevState || checkErr != nil { return } - prevState = currentSchemaState(job) + prevState = jobStateOrLastSubJobState(job) if prevState != state { return } @@ -877,7 +877,7 @@ func runTestInSchemaState( } } -func currentSchemaState(job *model.Job) model.SchemaState { +func jobStateOrLastSubJobState(job *model.Job) model.SchemaState { if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { subs := job.MultiSchemaInfo.SubJobs return subs[len(subs)-1].SchemaState diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 65d073fd0db83..27ff77bba3709 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -3540,7 +3540,7 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab if col == nil { return nil } - err = checkPosition(t.Meta(), spec.Position) + err = checkAfterPositionExists(t.Meta(), spec.Position) if err != nil { return errors.Trace(err) } From 31e13d42f2df2a11d645461da71224e3c5a95a85 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 17 Jun 2022 17:06:35 +0800 Subject: [PATCH 05/12] add a comment to model.Job --- parser/model/ddl.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 354075729997d..b4e7b866c77af 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -344,6 +344,9 @@ type Job struct { // SeqNum is the total order in all DDLs, it's used to identify the order of DDL. SeqNum uint64 `json:"seq_num"` + + // NOTE: To add a field, please make sure that the following methods work as expected: + // - ddl.cloneFromSubJob } // FinishTableJob is called when a job is finished. @@ -425,12 +428,12 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { if job.MultiSchemaInfo != nil { for _, sub := range job.MultiSchemaInfo.SubJobs { // Only update the args of last executing sub-job. - if sub.Args == nil { - continue - } - sub.RawArgs, err = json.Marshal(sub.Args) - if err != nil { - return nil, errors.Trace(err) + if sub.Args != nil { + sub.RawArgs, err = json.Marshal(sub.Args) + if err != nil { + return nil, errors.Trace(err) + } + break } } } From 306465d6dcb3df7c0f50c178cb62e98d714cba29 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 20 Jun 2022 14:56:35 +0800 Subject: [PATCH 06/12] add rollback handling to fix integration test --- ddl/cancel_test.go | 10 +++++----- ddl/column.go | 4 ++-- ddl/multi_schema_change.go | 22 ++++++++++++++++++++++ ddl/rollingback.go | 2 ++ parser/model/ddl.go | 16 +++++++++------- 5 files changed, 40 insertions(+), 14 deletions(-) diff --git a/ddl/cancel_test.go b/ddl/cancel_test.go index cca3047083a99..9c53952534bb3 100644 --- a/ddl/cancel_test.go +++ b/ddl/cancel_test.go @@ -170,11 +170,11 @@ var allTestCase = []testCancelJob{ {"alter table t_partition truncate partition p3", true, model.StateNone, true, false, nil}, {"alter table t_partition truncate partition p3", false, model.StatePublic, false, true, nil}, // Add columns. - {"alter table t add column c41 bigint, add column c42 bigint", true, model.StateNone, true, false, nil}, - {"alter table t add column c41 bigint, add column c42 bigint", true, model.StateDeleteOnly, true, true, nil}, - {"alter table t add column c41 bigint, add column c42 bigint", true, model.StateWriteOnly, true, true, nil}, - {"alter table t add column c41 bigint, add column c42 bigint", true, model.StateWriteReorganization, true, true, nil}, - {"alter table t add column c41 bigint, add column c42 bigint", false, model.StatePublic, false, true, nil}, + {"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateNone, model.StateNone}, true, false, nil}, + {"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateDeleteOnly, model.StateNone}, true, true, nil}, + {"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateWriteOnly, model.StateNone}, true, true, nil}, + {"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateWriteReorganization, model.StateNone}, true, true, nil}, + {"alter table t add column c41 bigint, add column c42 bigint", false, subStates{model.StatePublic, model.StatePublic}, false, true, nil}, // Drop columns. // TODO: fix schema state. {"alter table t drop column c41, drop column c42", true, model.StateNone, true, false, nil}, diff --git a/ddl/column.go b/ddl/column.go index e3c16eda49603..356da44109ee5 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -115,7 +115,7 @@ func createColumnInfoWithPosCheck(tblInfo *model.TableInfo, colInfo *model.Colum return colInfo, pos, offset, nil } -func initColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) *model.ColumnInfo { +func initAndAddColumnToTable(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) *model.ColumnInfo { cols := tblInfo.Columns colInfo.ID = allocateColumnID(tblInfo) colInfo.State = model.StateNone @@ -189,7 +189,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) return ver, errors.Trace(err) } if columnInfo == nil { - columnInfo = initColumnInfo(tblInfo, colFromArgs) + columnInfo = initAndAddColumnToTable(tblInfo, colFromArgs) logutil.BgLogger().Info("[ddl] run add column job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo)) if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil { job.State = model.JobStateCancelled diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index d89702b64965f..1fcb9b7b7af62 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -331,3 +331,25 @@ func appendMultiChangeWarningsToOwnerCtx(ctx sessionctx.Context, job *model.Job) } } + +// rollingBackMultiSchemaChange updates a multi-schema change job +// from cancelling state to rollingback state. +func rollingBackMultiSchemaChange(job *model.Job) error { + if !job.MultiSchemaInfo.Revertible { + // Cannot rolling back because the jobs are non-revertible. + // Resume the job state to running. + job.State = model.JobStateRunning + return nil + } + // Mark all the jobs to cancelling. + for _, sub := range job.MultiSchemaInfo.SubJobs { + switch sub.State { + case model.JobStateRunning: + sub.State = model.JobStateCancelling + case model.JobStateNone, model.JobStateQueueing: + sub.State = model.JobStateCancelled + } + } + job.State = model.JobStateRollingback + return dbterror.ErrCancelledDDLJob +} diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 659cf146e62ba..b1ea2da7c7592 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -481,6 +481,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) model.ActionModifyTableAutoIdCache, model.ActionAlterIndexVisibility, model.ActionExchangeTablePartition, model.ActionModifySchemaDefaultPlacement: ver, err = cancelOnlyNotHandledJob(job, model.StateNone) + case model.ActionMultiSchemaChange: + err = rollingBackMultiSchemaChange(job) default: job.State = model.JobStateCancelled err = dbterror.ErrCancelledDDLJob diff --git a/parser/model/ddl.go b/parser/model/ddl.go index b4e7b866c77af..1d400a670192b 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -427,13 +427,13 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { } if job.MultiSchemaInfo != nil { for _, sub := range job.MultiSchemaInfo.SubJobs { - // Only update the args of last executing sub-job. - if sub.Args != nil { - sub.RawArgs, err = json.Marshal(sub.Args) - if err != nil { - return nil, errors.Trace(err) - } - break + // Only update the args of executing sub-jobs. + if sub.Args == nil { + continue + } + sub.RawArgs, err = json.Marshal(sub.Args) + if err != nil { + return nil, errors.Trace(err) } } } @@ -611,6 +611,8 @@ func (job *Job) IsRollbackable() bool { ActionModifySchemaCharsetAndCollate, ActionRepairTable, ActionModifyTableAutoIdCache, ActionModifySchemaDefaultPlacement: return job.SchemaState == StateNone + case ActionMultiSchemaChange: + return job.MultiSchemaInfo.Revertible } return true } From 91968e46b9a8ca8f98783b7f27197a6d6e94a048 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 20 Jun 2022 21:21:11 +0800 Subject: [PATCH 07/12] address comment --- ddl/multi_schema_change.go | 86 +++++++------------------------------- parser/model/ddl.go | 60 ++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 71 deletions(-) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 1fcb9b7b7af62..35c09626c0089 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -15,8 +15,6 @@ package ddl import ( - "sync" - "github.com/pingcap/errors" ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/meta" @@ -66,13 +64,13 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve // Rollback/cancel the sub-jobs in reverse order. for i := len(job.MultiSchemaInfo.SubJobs) - 1; i >= 0; i-- { sub := job.MultiSchemaInfo.SubJobs[i] - if isFinished(sub) { + if sub.IsFinished() { continue } - proxyJob := cloneFromSubJob(job, sub) + proxyJob := sub.ToProxyJob(job) ver, err = w.runDDLJob(d, t, proxyJob) - mergeBackToSubJob(proxyJob, sub) - if i == 0 && isFinished(sub) { + sub.FromProxyJob(proxyJob) + if i == 0 && sub.IsFinished() { job.State = model.JobStateRollbackDone } return ver, err @@ -85,31 +83,31 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve // The sub-jobs are normally running. // Run the first executable sub-job. for _, sub := range job.MultiSchemaInfo.SubJobs { - if !sub.Revertible || isFinished(sub) { + if !sub.Revertible || sub.IsFinished() { // Skip the sub-jobs which related schema states // are in the last revertible point. // If a sub job is finished here, it should be a noop job. continue } - proxyJob := cloneFromSubJob(job, sub) + proxyJob := sub.ToProxyJob(job) ver, err = w.runDDLJob(d, t, proxyJob) - mergeBackToSubJob(proxyJob, sub) + sub.FromProxyJob(proxyJob) handleRevertibleException(job, sub, proxyJob.Error) return ver, err } - // Save tblInfo and subJobs for rollback + // Save tblInfo and subJobs for rollback, because some DDLs update the transaction aggressively. tblInfo, _ := t.GetTable(job.SchemaID, job.TableID) subJobs := make([]model.SubJob, len(job.MultiSchemaInfo.SubJobs)) // Step the sub-jobs to the non-revertible states all at once. for i, sub := range job.MultiSchemaInfo.SubJobs { - if isFinished(sub) { + if sub.IsFinished() { continue } subJobs[i] = *sub - proxyJob := cloneFromSubJob(job, sub) + proxyJob := sub.ToProxyJob(job) ver, err = w.runDDLJob(d, t, proxyJob) - mergeBackToSubJob(proxyJob, sub) + sub.FromProxyJob(proxyJob) if err != nil || proxyJob.Error != nil { for j := i - 1; j >= 0; j-- { job.MultiSchemaInfo.SubJobs[j] = &subJobs[j] @@ -124,67 +122,20 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve } // Run the rest non-revertible sub-jobs one by one. for _, sub := range job.MultiSchemaInfo.SubJobs { - if isFinished(sub) { + if sub.IsFinished() { continue } - proxyJob := cloneFromSubJob(job, sub) + proxyJob := sub.ToProxyJob(job) ver, err = w.runDDLJob(d, t, proxyJob) - mergeBackToSubJob(proxyJob, sub) + sub.FromProxyJob(proxyJob) return ver, err } job.State = model.JobStateDone return ver, err } -func isFinished(job *model.SubJob) bool { - return job.State == model.JobStateDone || - job.State == model.JobStateRollbackDone || - job.State == model.JobStateCancelled -} - -func cloneFromSubJob(job *model.Job, sub *model.SubJob) *model.Job { - return &model.Job{ - ID: job.ID, - Type: sub.Type, - SchemaID: job.SchemaID, - TableID: job.TableID, - SchemaName: job.SchemaName, - State: sub.State, - Warning: sub.Warning, - Error: nil, - ErrorCount: 0, - RowCount: sub.RowCount, - Mu: sync.Mutex{}, - CtxVars: sub.CtxVars, - Args: sub.Args, - RawArgs: sub.RawArgs, - SchemaState: sub.SchemaState, - SnapshotVer: sub.SnapshotVer, - RealStartTS: job.RealStartTS, - StartTS: job.StartTS, - DependencyID: job.DependencyID, - Query: job.Query, - BinlogInfo: job.BinlogInfo, - Version: job.Version, - ReorgMeta: job.ReorgMeta, - MultiSchemaInfo: &model.MultiSchemaInfo{Revertible: sub.Revertible}, - Priority: job.Priority, - SeqNum: job.SeqNum, - } -} - -func mergeBackToSubJob(job *model.Job, sub *model.SubJob) { - sub.Revertible = job.MultiSchemaInfo.Revertible - sub.SchemaState = job.SchemaState - sub.SnapshotVer = job.SnapshotVer - sub.Args = job.Args - sub.State = job.State - sub.Warning = job.Warning - sub.RowCount = job.RowCount -} - func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror.Error) { - if !isAbnormal(subJob) { + if subJob.IsNormal() { return } job.State = model.JobStateRollingback @@ -204,13 +155,6 @@ func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror } } -func isAbnormal(job *model.SubJob) bool { - return job.State == model.JobStateCancelling || - job.State == model.JobStateCancelled || - job.State == model.JobStateRollingback || - job.State == model.JobStateRollbackDone -} - func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error { err := fillMultiSchemaInfo(m, job) if err != nil { diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 1d400a670192b..1462d341b3a1b 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -293,6 +293,66 @@ type SubJob struct { CtxVars []interface{} `json:"-"` } +// IsNormal returns true if the sub-job is normally running. +func (sub *SubJob) IsNormal() bool { + switch sub.State { + case JobStateCancelling, JobStateCancelled, + JobStateRollingback, JobStateRollbackDone: + return false + default: + return true + } +} + +// IsFinished returns true if the job is done. +func (sub *SubJob) IsFinished() bool { + return sub.State == JobStateDone || + sub.State == JobStateRollbackDone || + sub.State == JobStateCancelled +} + +// ToProxyJob converts a sub-job to a proxy job. +func (sub *SubJob) ToProxyJob(parentJob *Job) *Job { + return &Job{ + ID: parentJob.ID, + Type: sub.Type, + SchemaID: parentJob.SchemaID, + TableID: parentJob.TableID, + SchemaName: parentJob.SchemaName, + State: sub.State, + Warning: sub.Warning, + Error: nil, + ErrorCount: 0, + RowCount: sub.RowCount, + Mu: sync.Mutex{}, + CtxVars: sub.CtxVars, + Args: sub.Args, + RawArgs: sub.RawArgs, + SchemaState: sub.SchemaState, + SnapshotVer: sub.SnapshotVer, + RealStartTS: parentJob.RealStartTS, + StartTS: parentJob.StartTS, + DependencyID: parentJob.DependencyID, + Query: parentJob.Query, + BinlogInfo: parentJob.BinlogInfo, + Version: parentJob.Version, + ReorgMeta: parentJob.ReorgMeta, + MultiSchemaInfo: &MultiSchemaInfo{Revertible: sub.Revertible}, + Priority: parentJob.Priority, + SeqNum: parentJob.SeqNum, + } +} + +func (sub *SubJob) FromProxyJob(proxyJob *Job) { + sub.Revertible = proxyJob.MultiSchemaInfo.Revertible + sub.SchemaState = proxyJob.SchemaState + sub.SnapshotVer = proxyJob.SnapshotVer + sub.Args = proxyJob.Args + sub.State = proxyJob.State + sub.Warning = proxyJob.Warning + sub.RowCount = proxyJob.RowCount +} + // Job is for a DDL operation. type Job struct { ID int64 `json:"id"` From 7d2c42a7790e172373b2845312eb8e6b7fd1ea68 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 20 Jun 2022 21:27:05 +0800 Subject: [PATCH 08/12] add a job sizeof test --- parser/model/ddl.go | 3 --- parser/model/model_test.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 1462d341b3a1b..549a8119e6b33 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -404,9 +404,6 @@ type Job struct { // SeqNum is the total order in all DDLs, it's used to identify the order of DDL. SeqNum uint64 `json:"seq_num"` - - // NOTE: To add a field, please make sure that the following methods work as expected: - // - ddl.cloneFromSubJob } // FinishTableJob is called when a job is finished. diff --git a/parser/model/model_test.go b/parser/model/model_test.go index 73a6ec5e782e5..114b3a38bf5bf 100644 --- a/parser/model/model_test.go +++ b/parser/model/model_test.go @@ -18,6 +18,7 @@ import ( "fmt" "testing" "time" + "unsafe" "github.com/pingcap/tidb/parser/charset" "github.com/pingcap/tidb/parser/mysql" @@ -578,3 +579,12 @@ func TestLocation(t *testing.T) { location := time.FixedZone("UTC", loc1.Offset) require.Equal(t, nLoc, location) } + +func TestDDLJobSize(t *testing.T) { + msg := `Please make sure that the following methods work as expected: +- SubJob.FromProxyJob() +- SubJob.ToProxyJob() +` + job := Job{} + require.Equal(t, 288, int(unsafe.Sizeof(job)), msg) +} From 32880da4f51371efe05c62fdee16413212efef80 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 21 Jun 2022 14:47:33 +0800 Subject: [PATCH 09/12] address comments --- ddl/multi_schema_change.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 35c09626c0089..fc7d3ec4ad979 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -96,7 +96,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve return ver, err } - // Save tblInfo and subJobs for rollback, because some DDLs update the transaction aggressively. + // Save table info and sub-jobs for rolling back. tblInfo, _ := t.GetTable(job.SchemaID, job.TableID) subJobs := make([]model.SubJob, len(job.MultiSchemaInfo.SubJobs)) // Step the sub-jobs to the non-revertible states all at once. @@ -113,6 +113,8 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve job.MultiSchemaInfo.SubJobs[j] = &subJobs[j] } handleRevertibleException(job, sub, proxyJob.Error) + // The TableInfo and sub-jobs should be restored + // because some schema changes update the transaction aggressively. return updateVersionAndTableInfo(d, t, job, tblInfo, true) } } @@ -143,10 +145,6 @@ func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror // Flush the cancelling state and cancelled state to sub-jobs. for _, sub := range job.MultiSchemaInfo.SubJobs { switch sub.State { - case model.JobStateCancelled: - if !sub.Revertible { - sub.State = model.JobStateCancelling - } case model.JobStateRunning: sub.State = model.JobStateCancelling case model.JobStateNone, model.JobStateQueueing: From 3ca3ee1360ceb1d1507a95eadb7732eaf7d6f26f Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 21 Jun 2022 14:50:22 +0800 Subject: [PATCH 10/12] rename continuE to useMultiSchemaChange --- ddl/ddl_api.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 27ff77bba3709..ad9dca724521a 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -3062,10 +3062,10 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast } if len(validSpecs) > 1 { - continuE := false + useMultiSchemaChange := false switch validSpecs[0].Tp { case ast.AlterTableAddColumns: - continuE = true + useMultiSchemaChange = true case ast.AlterTableDropColumn: err = d.DropColumns(sctx, ident, validSpecs) case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex: @@ -3076,7 +3076,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast if err != nil { return errors.Trace(err) } - if !continuE { + if !useMultiSchemaChange { return nil } } From 285ab3ae27a4d7a1b11d2da68607397cea0753b1 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 23 Jun 2022 15:00:39 +0800 Subject: [PATCH 11/12] address comment --- ddl/multi_schema_change.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index fc7d3ec4ad979..49044783a50dc 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -97,7 +97,11 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve } // Save table info and sub-jobs for rolling back. - tblInfo, _ := t.GetTable(job.SchemaID, job.TableID) + var tblInfo *model.TableInfo + tblInfo, err = t.GetTable(job.SchemaID, job.TableID) + if err != nil { + return ver, err + } subJobs := make([]model.SubJob, len(job.MultiSchemaInfo.SubJobs)) // Step the sub-jobs to the non-revertible states all at once. for i, sub := range job.MultiSchemaInfo.SubJobs { @@ -119,7 +123,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve } } // All the sub-jobs are non-revertible. - job.MultiSchemaInfo.Revertible = false + job.MarkNonRevertible() return ver, err } // Run the rest non-revertible sub-jobs one by one. From 9204d4bef1673be0aad6c1639f3809cb1dee4ae0 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 23 Jun 2022 15:48:47 +0800 Subject: [PATCH 12/12] remove tiny optimization to improve readability --- ddl/multi_schema_change.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 49044783a50dc..bd0518404530a 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -70,9 +70,6 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve proxyJob := sub.ToProxyJob(job) ver, err = w.runDDLJob(d, t, proxyJob) sub.FromProxyJob(proxyJob) - if i == 0 && sub.IsFinished() { - job.State = model.JobStateRollbackDone - } return ver, err } // The last rollback/cancelling sub-job is done.