diff --git a/ddl/cancel_test.go b/ddl/cancel_test.go index cca3047083a99..9c53952534bb3 100644 --- a/ddl/cancel_test.go +++ b/ddl/cancel_test.go @@ -170,11 +170,11 @@ var allTestCase = []testCancelJob{ {"alter table t_partition truncate partition p3", true, model.StateNone, true, false, nil}, {"alter table t_partition truncate partition p3", false, model.StatePublic, false, true, nil}, // Add columns. - {"alter table t add column c41 bigint, add column c42 bigint", true, model.StateNone, true, false, nil}, - {"alter table t add column c41 bigint, add column c42 bigint", true, model.StateDeleteOnly, true, true, nil}, - {"alter table t add column c41 bigint, add column c42 bigint", true, model.StateWriteOnly, true, true, nil}, - {"alter table t add column c41 bigint, add column c42 bigint", true, model.StateWriteReorganization, true, true, nil}, - {"alter table t add column c41 bigint, add column c42 bigint", false, model.StatePublic, false, true, nil}, + {"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateNone, model.StateNone}, true, false, nil}, + {"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateDeleteOnly, model.StateNone}, true, true, nil}, + {"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateWriteOnly, model.StateNone}, true, true, nil}, + {"alter table t add column c41 bigint, add column c42 bigint", true, subStates{model.StateWriteReorganization, model.StateNone}, true, true, nil}, + {"alter table t add column c41 bigint, add column c42 bigint", false, subStates{model.StatePublic, model.StatePublic}, false, true, nil}, // Drop columns. // TODO: fix schema state. {"alter table t drop column c41, drop column c42", true, model.StateNone, true, false, nil}, diff --git a/ddl/column.go b/ddl/column.go index 0ac17d69f5bb7..6b9a00ea5b3db 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -83,7 +83,7 @@ func adjustColumnInfoInDropColumn(tblInfo *model.TableInfo, offset int) { tblInfo.Columns = newCols } -func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, *ast.ColumnPosition, int, error) { +func createColumnInfoWithPosCheck(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, *ast.ColumnPosition, int, error) { // Check column name duplicate. cols := tblInfo.Columns offset := len(cols) @@ -115,19 +115,34 @@ func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos * return colInfo, pos, offset, nil } -func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo, *ast.ColumnPosition, int, error) { +func initAndAddColumnToTable(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) *model.ColumnInfo { + cols := tblInfo.Columns + colInfo.ID = allocateColumnID(tblInfo) + colInfo.State = model.StateNone + // To support add column asynchronous, we should mark its offset as the last column. + // So that we can use origin column offset to get value from row. + colInfo.Offset = len(cols) + // Append the column info to the end of the tblInfo.Columns. + // It will reorder to the right offset in "Columns" when it state change to public. + tblInfo.Columns = append(cols, colInfo) + return colInfo +} + +func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo, + *ast.ColumnPosition, bool /* ifNotExists */, error) { schemaID := job.SchemaID tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID) if err != nil { - return nil, nil, nil, nil, 0, errors.Trace(err) + return nil, nil, nil, nil, false, errors.Trace(err) } col := &model.ColumnInfo{} pos := &ast.ColumnPosition{} offset := 0 - err = job.DecodeArgs(col, pos, &offset) + ifNotExists := false + err = job.DecodeArgs(col, pos, &offset, &ifNotExists) if err != nil { job.State = model.JobStateCancelled - return nil, nil, nil, nil, 0, errors.Trace(err) + return nil, nil, nil, nil, false, errors.Trace(err) } columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L) @@ -135,10 +150,17 @@ func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.Colu if columnInfo.State == model.StatePublic { // We already have a column with the same column name. job.State = model.JobStateCancelled - return nil, nil, nil, nil, 0, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name) + return nil, nil, nil, nil, ifNotExists, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name) } } - return tblInfo, columnInfo, col, pos, offset, nil + + err = checkAfterPositionExists(tblInfo, pos) + if err != nil { + job.State = model.JobStateCancelled + return nil, nil, nil, nil, false, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name) + } + + return tblInfo, columnInfo, col, pos, false, nil } func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { @@ -157,21 +179,18 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) } }) - tblInfo, columnInfo, col, pos, offset, err := checkAddColumn(t, job) + tblInfo, columnInfo, colFromArgs, pos, ifNotExists, err := checkAddColumn(t, job) if err != nil { + if ifNotExists && infoschema.ErrColumnExists.Equal(err) { + job.Warning = toTError(err) + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + return ver, nil + } return ver, errors.Trace(err) } if columnInfo == nil { - columnInfo, _, offset, err = createColumnInfo(tblInfo, col, pos) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - logutil.BgLogger().Info("[ddl] run add column job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo), zap.Int("offset", offset)) - // Set offset arg to job. - if offset != 0 { - job.Args = []interface{}{columnInfo, pos, offset} - } + columnInfo = initAndAddColumnToTable(tblInfo, colFromArgs) + logutil.BgLogger().Info("[ddl] run add column job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo)) if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -206,9 +225,14 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) } // Update the job state when all affairs done. job.SchemaState = model.StateWriteReorganization + job.MarkNonRevertible() case model.StateWriteReorganization: // reorganization -> public // Adjust table column offset. + offset, err := locateOffsetToMove(columnInfo.Offset, pos, tblInfo) + if err != nil { + return ver, errors.Trace(err) + } tblInfo.MoveColumnInfo(columnInfo.Offset, offset) columnInfo.State = model.StatePublic ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfo.State) @@ -276,6 +300,18 @@ func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) { } } +// checkAfterPositionExists makes sure the column specified in AFTER clause is exists. +// For example, ALTER TABLE t ADD COLUMN c3 INT AFTER c1. +func checkAfterPositionExists(tblInfo *model.TableInfo, pos *ast.ColumnPosition) error { + if pos != nil && pos.Tp == ast.ColumnPositionAfter { + c := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L) + if c == nil { + return infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name) + } + } + return nil +} + func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) { for _, indexInfo := range indexInfos { indexInfo.State = state @@ -308,7 +344,7 @@ func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error return ver, nil } for i := range columns { - columnInfo, pos, offset, err := createColumnInfo(tblInfo, columns[i], positions[i]) + columnInfo, pos, offset, err := createColumnInfoWithPosCheck(tblInfo, columns[i], positions[i]) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -856,7 +892,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in return ver, errors.Trace(err) } - _, _, _, err = createColumnInfo(tblInfo, modifyInfo.changingCol, changingColPos) + _, _, _, err = createColumnInfoWithPosCheck(tblInfo, modifyInfo.changingCol, changingColPos) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) diff --git a/ddl/column_change_test.go b/ddl/column_change_test.go index e1f206514366a..a6de4bc964d2f 100644 --- a/ddl/column_change_test.go +++ b/ddl/column_change_test.go @@ -417,7 +417,13 @@ func testCheckJobDone(t *testing.T, store kv.Storage, jobID int64, isAdd bool) { require.NoError(t, err) require.Equal(t, historyJob.State, model.JobStateSynced) if isAdd { - require.Equal(t, historyJob.SchemaState, model.StatePublic) + if historyJob.Type == model.ActionMultiSchemaChange { + for _, sub := range historyJob.MultiSchemaInfo.SubJobs { + require.Equal(t, sub.SchemaState, model.StatePublic) + } + } else { + require.Equal(t, historyJob.SchemaState, model.StatePublic) + } } else { require.Equal(t, historyJob.SchemaState, model.StateNone) } diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index e715890e67bca..2863aa2685f5a 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -837,11 +837,11 @@ func runTestInSchemaState( _, err = se.Execute(context.Background(), "use test_db_state") require.NoError(t, err) cbFunc := func(job *model.Job) { - if job.SchemaState == prevState || checkErr != nil { + if jobStateOrLastSubJobState(job) == prevState || checkErr != nil { return } - prevState = job.SchemaState - if job.SchemaState != state { + prevState = jobStateOrLastSubJobState(job) + if prevState != state { return } for _, sqlWithErr := range sqlWithErrs { @@ -877,6 +877,14 @@ func runTestInSchemaState( } } +func jobStateOrLastSubJobState(job *model.Job) model.SchemaState { + if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { + subs := job.MultiSchemaInfo.SubJobs + return subs[len(subs)-1].SchemaState + } + return job.SchemaState +} + func TestShowIndex(t *testing.T) { store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 200*time.Millisecond) defer clean() diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index 23c10805724be..d44f69c29c994 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -401,7 +401,7 @@ func TestIssue5092(t *testing.T) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) // The following two statements are consistent with MariaDB. tk.MustGetErrCode("alter table t_issue_5092 add column if not exists d int, add column d int", errno.ErrDupFieldName) - tk.MustExec("alter table t_issue_5092 add column dd int, add column if not exists dd int") + tk.MustGetErrCode("alter table t_issue_5092 add column dd int, add column if not exists dd int", errno.ErrUnsupportedDDLOperation) tk.MustExec("alter table t_issue_5092 add column if not exists (d int, e int), add column ff text") tk.MustExec("alter table t_issue_5092 add column b2 int after b1, add column c2 int first") tk.MustQuery("show create table t_issue_5092").Check(testkit.Rows("t_issue_5092 CREATE TABLE `t_issue_5092` (\n" + @@ -417,7 +417,6 @@ func TestIssue5092(t *testing.T) { " `c1` int(11) DEFAULT NULL,\n" + " `f` int(11) DEFAULT NULL,\n" + " `g` int(11) DEFAULT NULL,\n" + - " `dd` int(11) DEFAULT NULL,\n" + " `ff` text DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) tk.MustExec("drop table t_issue_5092") diff --git a/ddl/ddl.go b/ddl/ddl.go index d5336e360957d..e77a9da01e4a9 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -712,6 +712,12 @@ func setDDLJobQuery(ctx sessionctx.Context, job *model.Job) { // - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel // - other: found in history DDL job and return that job error func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { + if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil { + // In multiple schema change, we don't run the job. + // Instead, we merge all the jobs into one pending job. + return appendToSubJobs(mci, job) + } + // Get a global job ID and put the DDL job in the queue. setDDLJobQuery(ctx, job) task := &limitJobTask{job, make(chan error)} @@ -786,12 +792,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { } } } - - if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 { - for _, warning := range historyJob.MultiSchemaInfo.Warnings { - ctx.GetSessionVars().StmtCtx.AppendWarning(warning) - } - } + appendMultiChangeWarningsToOwnerCtx(ctx, historyJob) logutil.BgLogger().Info("[ddl] DDL job is finished", zap.Int64("jobID", jobID)) return nil diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 0105ef79f7d0b..5a7758a36ba88 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2903,8 +2903,21 @@ func needToOverwriteColCharset(options []*ast.TableOption) bool { return false } +// resolveAlterTableAddColumns splits "add columns" to multiple spec. For example, +// `ALTER TABLE ADD COLUMN (c1 INT, c2 INT)` is split into +// `ALTER TABLE ADD COLUMN c1 INT, ADD COLUMN c2 INT`. +func resolveAlterTableAddColumns(spec *ast.AlterTableSpec) []*ast.AlterTableSpec { + specs := make([]*ast.AlterTableSpec, len(spec.NewColumns)) + for i, col := range spec.NewColumns { + t := *spec + t.NewColumns = []*ast.ColumnDef{col} + specs[i] = &t + } + return specs +} + // resolveAlterTableSpec resolves alter table algorithm and removes ignore table spec in specs. -// returns valied specs, and the occurred error. +// returns valid specs, and the occurred error. func resolveAlterTableSpec(ctx sessionctx.Context, specs []*ast.AlterTableSpec) ([]*ast.AlterTableSpec, error) { validSpecs := make([]*ast.AlterTableSpec, 0, len(specs)) algorithm := ast.AlgorithmTypeDefault @@ -2916,7 +2929,11 @@ func resolveAlterTableSpec(ctx sessionctx.Context, specs []*ast.AlterTableSpec) if isIgnorableSpec(spec.Tp) { continue } - validSpecs = append(validSpecs, spec) + if spec.Tp == ast.AlterTableAddColumns && len(spec.NewColumns) > 1 { + validSpecs = append(validSpecs, resolveAlterTableAddColumns(spec)...) + } else { + validSpecs = append(validSpecs, spec) + } } // Verify whether the algorithm is supported. @@ -2997,9 +3014,10 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast } if len(validSpecs) > 1 { + useMultiSchemaChange := false switch validSpecs[0].Tp { case ast.AlterTableAddColumns: - err = d.AddColumns(sctx, ident, validSpecs) + useMultiSchemaChange = true case ast.AlterTableDropColumn: err = d.DropColumns(sctx, ident, validSpecs) case ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex: @@ -3010,7 +3028,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast if err != nil { return errors.Trace(err) } - return nil + if !useMultiSchemaChange { + return nil + } } if len(validSpecs) > 1 { @@ -3021,11 +3041,7 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, ident ast var handledCharsetOrCollate bool switch spec.Tp { case ast.AlterTableAddColumns: - if len(spec.NewColumns) != 1 { - err = d.AddColumns(sctx, ident, []*ast.AlterTableSpec{spec}) - } else { - err = d.AddColumn(sctx, ident, spec) - } + err = d.AddColumn(sctx, ident, spec) case ast.AlterTableAddPartitions: err = d.AddTablePartitions(sctx, ident, spec) case ast.AlterTableCoalescePartitions: @@ -3476,6 +3492,10 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab if col == nil { return nil } + err = checkAfterPositionExists(t.Meta(), spec.Position) + if err != nil { + return errors.Trace(err) + } job := &model.Job{ SchemaID: schema.ID, @@ -3484,15 +3504,10 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab TableName: t.Meta().Name.L, Type: model.ActionAddColumn, BinlogInfo: &model.HistoryInfo{}, - Args: []interface{}{col, spec.Position, 0}, + Args: []interface{}{col, spec.Position, 0, spec.IfNotExists}, } err = d.DoDDLJob(ctx, job) - // column exists, but if_not_exists flags is true, so we ignore this error. - if infoschema.ErrColumnExists.Equal(err) && spec.IfNotExists { - ctx.GetSessionVars().StmtCtx.AppendNote(err) - return nil - } err = d.callHookOnChanged(job, err) return errors.Trace(err) } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index d12e304e844a9..50362923488a8 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -296,7 +296,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { job.Version = currentVersion job.StartTS = txn.StartTS() job.ID = ids[i] - job.State = model.JobStateQueueing + setJobStateToQueueing(job) if err = buildJobDependence(t, job); err != nil { return errors.Trace(err) } @@ -304,13 +304,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { if job.MayNeedReorg() { jobListKey = meta.AddIndexJobListKey } - failpoint.Inject("MockModifyJobArg", func(val failpoint.Value) { - if val.(bool) { - if len(job.Args) > 0 { - job.Args[0] = 1 - } - } - }) + injectModifyJobArgFailPoint(job) if err = t.EnQueueDDLJob(job, jobListKey); err != nil { return errors.Trace(err) } @@ -336,6 +330,30 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { } } +func injectModifyJobArgFailPoint(job *model.Job) { + failpoint.Inject("MockModifyJobArg", func(val failpoint.Value) { + if val.(bool) { + // Corrupt the DDL job argument. + if job.Type == model.ActionMultiSchemaChange { + if len(job.MultiSchemaInfo.SubJobs) > 0 && len(job.MultiSchemaInfo.SubJobs[0].Args) > 0 { + job.MultiSchemaInfo.SubJobs[0].Args[0] = 1 + } + } else if len(job.Args) > 0 { + job.Args[0] = 1 + } + } + }) +} + +func setJobStateToQueueing(job *model.Job) { + if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { + for _, sub := range job.MultiSchemaInfo.SubJobs { + sub.State = model.JobStateQueueing + } + } + job.State = model.JobStateQueueing +} + // getHistoryDDLJob gets a DDL job with job's ID from history queue. func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) { se, err := d.sessPool.get() @@ -936,6 +954,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onAlterCacheTable(d, t, job) case model.ActionAlterNoCacheTable: ver, err = onAlterNoCacheTable(d, t, job) + case model.ActionMultiSchemaChange: + ver, err = onMultiSchemaChange(w, d, t, job) default: // Invalid job, cancel it. job.State = model.JobStateCancelled diff --git a/ddl/index.go b/ddl/index.go index 467f84190eb34..9296d2507275c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -470,7 +470,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo if len(hiddenCols) > 0 { pos := &ast.ColumnPosition{Tp: ast.ColumnPositionNone} for _, hiddenCol := range hiddenCols { - _, _, _, err = createColumnInfo(tblInfo, hiddenCol, pos) + _, _, _, err = createColumnInfoWithPosCheck(tblInfo, hiddenCol, pos) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) diff --git a/ddl/index_modify_test.go b/ddl/index_modify_test.go index 39950e424e1db..3ae2ae16482e0 100644 --- a/ddl/index_modify_test.go +++ b/ddl/index_modify_test.go @@ -882,7 +882,7 @@ func testDropIndexesIfExists(t *testing.T, store kv.Storage) { "[ddl:1091]index i3 doesn't exist", ) tk.MustExec("alter table test_drop_indexes_if_exists drop index i1, drop index if exists i3;") - tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Warning|1091|index i3 doesn't exist")) + tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Note|1091|index i3 doesn't exist")) // Verify the impact of deletion order when dropping duplicate indexes. tk.MustGetErrMsg( @@ -894,7 +894,7 @@ func testDropIndexesIfExists(t *testing.T, store kv.Storage) { "[ddl:1091]index i2 doesn't exist", ) tk.MustExec("alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;") - tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Warning|1091|index i2 doesn't exist")) + tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Note|1091|index i2 doesn't exist")) } func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) { diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 414d2f1484909..bd0518404530a 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -17,6 +17,7 @@ package ddl import ( "github.com/pingcap/errors" ddlutil "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/terror" @@ -56,18 +57,136 @@ func (d *ddl) MultiSchemaChange(ctx sessionctx.Context, ti ast.Ident) error { return d.callHookOnChanged(job, err) } -func checkMultiSchemaInfo(info *model.MultiSchemaInfo, t table.Table) error { - err := checkOperateSameColAndIdx(info) - if err != nil { - return err +func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { + if job.MultiSchemaInfo.Revertible { + // Handle the rolling back job. + if job.IsRollingback() { + // Rollback/cancel the sub-jobs in reverse order. + for i := len(job.MultiSchemaInfo.SubJobs) - 1; i >= 0; i-- { + sub := job.MultiSchemaInfo.SubJobs[i] + if sub.IsFinished() { + continue + } + proxyJob := sub.ToProxyJob(job) + ver, err = w.runDDLJob(d, t, proxyJob) + sub.FromProxyJob(proxyJob) + return ver, err + } + // The last rollback/cancelling sub-job is done. + job.State = model.JobStateRollbackDone + return ver, nil + } + + // The sub-jobs are normally running. + // Run the first executable sub-job. + for _, sub := range job.MultiSchemaInfo.SubJobs { + if !sub.Revertible || sub.IsFinished() { + // Skip the sub-jobs which related schema states + // are in the last revertible point. + // If a sub job is finished here, it should be a noop job. + continue + } + proxyJob := sub.ToProxyJob(job) + ver, err = w.runDDLJob(d, t, proxyJob) + sub.FromProxyJob(proxyJob) + handleRevertibleException(job, sub, proxyJob.Error) + return ver, err + } + + // Save table info and sub-jobs for rolling back. + var tblInfo *model.TableInfo + tblInfo, err = t.GetTable(job.SchemaID, job.TableID) + if err != nil { + return ver, err + } + subJobs := make([]model.SubJob, len(job.MultiSchemaInfo.SubJobs)) + // Step the sub-jobs to the non-revertible states all at once. + for i, sub := range job.MultiSchemaInfo.SubJobs { + if sub.IsFinished() { + continue + } + subJobs[i] = *sub + proxyJob := sub.ToProxyJob(job) + ver, err = w.runDDLJob(d, t, proxyJob) + sub.FromProxyJob(proxyJob) + if err != nil || proxyJob.Error != nil { + for j := i - 1; j >= 0; j-- { + job.MultiSchemaInfo.SubJobs[j] = &subJobs[j] + } + handleRevertibleException(job, sub, proxyJob.Error) + // The TableInfo and sub-jobs should be restored + // because some schema changes update the transaction aggressively. + return updateVersionAndTableInfo(d, t, job, tblInfo, true) + } + } + // All the sub-jobs are non-revertible. + job.MarkNonRevertible() + return ver, err + } + // Run the rest non-revertible sub-jobs one by one. + for _, sub := range job.MultiSchemaInfo.SubJobs { + if sub.IsFinished() { + continue + } + proxyJob := sub.ToProxyJob(job) + ver, err = w.runDDLJob(d, t, proxyJob) + sub.FromProxyJob(proxyJob) + return ver, err } + job.State = model.JobStateDone + return ver, err +} - err = checkVisibleColumnCnt(t, len(info.AddColumns), len(info.DropColumns)) +func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror.Error) { + if subJob.IsNormal() { + return + } + job.State = model.JobStateRollingback + job.Error = err + // Flush the cancelling state and cancelled state to sub-jobs. + for _, sub := range job.MultiSchemaInfo.SubJobs { + switch sub.State { + case model.JobStateRunning: + sub.State = model.JobStateCancelling + case model.JobStateNone, model.JobStateQueueing: + sub.State = model.JobStateCancelled + } + } +} + +func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error { + err := fillMultiSchemaInfo(m, job) if err != nil { return err } + m.SubJobs = append(m.SubJobs, &model.SubJob{ + Type: job.Type, + Args: job.Args, + RawArgs: job.RawArgs, + SchemaState: job.SchemaState, + SnapshotVer: job.SnapshotVer, + Revertible: true, + CtxVars: job.CtxVars, + }) + return nil +} - return checkAddColumnTooManyColumns(len(t.Cols()) + len(info.AddColumns) - len(info.DropColumns)) +func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error) { + switch job.Type { + case model.ActionAddColumn: + col := job.Args[0].(*table.Column) + pos := job.Args[1].(*ast.ColumnPosition) + info.AddColumns = append(info.AddColumns, col.Name) + for colName := range col.Dependences { + info.RelativeColumns = append(info.RelativeColumns, model.CIStr{L: colName, O: colName}) + } + if pos != nil && pos.Tp == ast.ColumnPositionAfter { + info.PositionColumns = append(info.PositionColumns, pos.RelativeColumn.Name) + } + default: + return dbterror.ErrRunMultiSchemaChanges + } + return nil } func checkOperateSameColAndIdx(info *model.MultiSchemaInfo) error { @@ -106,12 +225,15 @@ func checkOperateSameColAndIdx(info *model.MultiSchemaInfo) error { if err := checkColumns(info.DropColumns, true); err != nil { return err } - if err := checkColumns(info.RelativeColumns, false); err != nil { + if err := checkColumns(info.PositionColumns, false); err != nil { return err } if err := checkColumns(info.ModifyColumns, true); err != nil { return err } + if err := checkColumns(info.RelativeColumns, false); err != nil { + return err + } if err := checkIndexes(info.AddIndexes, true); err != nil { return err @@ -121,3 +243,56 @@ func checkOperateSameColAndIdx(info *model.MultiSchemaInfo) error { } return checkIndexes(info.AlterIndexes, true) } + +func checkMultiSchemaInfo(info *model.MultiSchemaInfo, t table.Table) error { + err := checkOperateSameColAndIdx(info) + if err != nil { + return err + } + + err = checkVisibleColumnCnt(t, len(info.AddColumns), len(info.DropColumns)) + if err != nil { + return err + } + + return checkAddColumnTooManyColumns(len(t.Cols()) + len(info.AddColumns) - len(info.DropColumns)) +} + +func appendMultiChangeWarningsToOwnerCtx(ctx sessionctx.Context, job *model.Job) { + if job.MultiSchemaInfo == nil { + return + } + if job.Type == model.ActionMultiSchemaChange { + for _, sub := range job.MultiSchemaInfo.SubJobs { + if sub.Warning != nil { + ctx.GetSessionVars().StmtCtx.AppendNote(sub.Warning) + } + } + } + for _, w := range job.MultiSchemaInfo.Warnings { + ctx.GetSessionVars().StmtCtx.AppendNote(w) + } + +} + +// rollingBackMultiSchemaChange updates a multi-schema change job +// from cancelling state to rollingback state. +func rollingBackMultiSchemaChange(job *model.Job) error { + if !job.MultiSchemaInfo.Revertible { + // Cannot rolling back because the jobs are non-revertible. + // Resume the job state to running. + job.State = model.JobStateRunning + return nil + } + // Mark all the jobs to cancelling. + for _, sub := range job.MultiSchemaInfo.SubJobs { + switch sub.State { + case model.JobStateRunning: + sub.State = model.JobStateCancelling + case model.JobStateNone, model.JobStateQueueing: + sub.State = model.JobStateCancelled + } + } + job.State = model.JobStateRollingback + return dbterror.ErrCancelledDDLJob +} diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go new file mode 100644 index 0000000000000..9e3cae966c43f --- /dev/null +++ b/ddl/multi_schema_change_test.go @@ -0,0 +1,100 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl_test + +import ( + "testing" + + "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/testkit" +) + +func TestMultiSchemaChangeAddColumns(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@global.tidb_enable_change_multi_schema = 1") + + // Test add multiple columns in multiple specs. + tk.MustExec("create table t (a int);") + tk.MustExec("insert into t values (1);") + tk.MustExec("alter table t add column b int default 2, add column c int default 3;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) + + // Test add multiple columns in one spec. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int);") + tk.MustExec("insert into t values (1);") + tk.MustExec("alter table t add column (b int default 2, c int default 3);") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int, c int);") + tk.MustExec("insert into t values (1, 2, 3);") + tk.MustExec("alter table t add column (d int default 4, e int default 5);") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3 4 5")) + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int default 1);") + tk.MustExec("insert into t values ();") + tk.MustExec("alter table t add column if not exists (b int default 2, c int default 3);") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) + tk.MustExec("alter table t add column if not exists (c int default 3, d int default 4);") + tk.MustQuery("show warnings;").Check(testkit.Rows("Note 1060 Duplicate column name 'c'")) + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3 4")) + + // Test referencing previous column in multi-schema change is not supported. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int);") + tk.MustGetErrCode("alter table t add column b int after a, add column c int after b", errno.ErrBadField) + tk.MustGetErrCode("alter table t add column c int after b, add column b int", errno.ErrBadField) + + // Test add multiple columns with different position. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int, c int);") + tk.MustExec("insert into t values (1, 2, 3);") + tk.MustExec(`alter table t + add column d int default 4 first, + add column e int default 5 after b, + add column f int default 6 after b;`) + tk.MustQuery("select * from t;").Check(testkit.Rows("4 1 2 6 5 3")) + + // Test [if not exists] for adding columns. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int default 1);") + tk.MustExec("insert into t values ();") + tk.MustExec("alter table t add column b int default 2, add column if not exists a int;") + tk.MustQuery("show warnings;").Check(testkit.Rows("Note 1060 Duplicate column name 'a'")) + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2")) + + // Test add generate column + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int, b int);") + tk.MustExec("insert into t values (1, 2);") + tk.MustExec("alter table t add column c double default 3.0, add column d double as (a + b);") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3 3")) + + // Test add columns with same name + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int default 1, c int default 4);") + tk.MustGetErrCode("alter table t add column b int default 2, add column b int default 3", errno.ErrUnsupportedDDLOperation) + + // Test add generate column dependents on a modifying column + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int, b int);") + tk.MustExec("insert into t values (1, 2);") + tk.MustGetErrCode("alter table t modify column b double, add column c double as (a + b);", errno.ErrUnsupportedDDLOperation) +} diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 659cf146e62ba..b1ea2da7c7592 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -481,6 +481,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) model.ActionModifyTableAutoIdCache, model.ActionAlterIndexVisibility, model.ActionExchangeTablePartition, model.ActionModifySchemaDefaultPlacement: ver, err = cancelOnlyNotHandledJob(job, model.StateNone) + case model.ActionMultiSchemaChange: + err = rollingBackMultiSchemaChange(job) default: job.State = model.JobStateCancelled err = dbterror.ErrCancelledDDLJob diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 1769d3b526d7f..549a8119e6b33 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -270,6 +270,7 @@ type MultiSchemaInfo struct { AlterIndexes []CIStr `json:"-"` RelativeColumns []CIStr `json:"-"` + PositionColumns []CIStr `json:"-"` } func NewMultiSchemaInfo() *MultiSchemaInfo { @@ -292,6 +293,66 @@ type SubJob struct { CtxVars []interface{} `json:"-"` } +// IsNormal returns true if the sub-job is normally running. +func (sub *SubJob) IsNormal() bool { + switch sub.State { + case JobStateCancelling, JobStateCancelled, + JobStateRollingback, JobStateRollbackDone: + return false + default: + return true + } +} + +// IsFinished returns true if the job is done. +func (sub *SubJob) IsFinished() bool { + return sub.State == JobStateDone || + sub.State == JobStateRollbackDone || + sub.State == JobStateCancelled +} + +// ToProxyJob converts a sub-job to a proxy job. +func (sub *SubJob) ToProxyJob(parentJob *Job) *Job { + return &Job{ + ID: parentJob.ID, + Type: sub.Type, + SchemaID: parentJob.SchemaID, + TableID: parentJob.TableID, + SchemaName: parentJob.SchemaName, + State: sub.State, + Warning: sub.Warning, + Error: nil, + ErrorCount: 0, + RowCount: sub.RowCount, + Mu: sync.Mutex{}, + CtxVars: sub.CtxVars, + Args: sub.Args, + RawArgs: sub.RawArgs, + SchemaState: sub.SchemaState, + SnapshotVer: sub.SnapshotVer, + RealStartTS: parentJob.RealStartTS, + StartTS: parentJob.StartTS, + DependencyID: parentJob.DependencyID, + Query: parentJob.Query, + BinlogInfo: parentJob.BinlogInfo, + Version: parentJob.Version, + ReorgMeta: parentJob.ReorgMeta, + MultiSchemaInfo: &MultiSchemaInfo{Revertible: sub.Revertible}, + Priority: parentJob.Priority, + SeqNum: parentJob.SeqNum, + } +} + +func (sub *SubJob) FromProxyJob(proxyJob *Job) { + sub.Revertible = proxyJob.MultiSchemaInfo.Revertible + sub.SchemaState = proxyJob.SchemaState + sub.SnapshotVer = proxyJob.SnapshotVer + sub.Args = proxyJob.Args + sub.State = proxyJob.State + sub.Warning = proxyJob.Warning + sub.RowCount = proxyJob.RowCount +} + // Job is for a DDL operation. type Job struct { ID int64 `json:"id"` @@ -301,6 +362,7 @@ type Job struct { SchemaName string `json:"schema_name"` TableName string `json:"table_name"` State JobState `json:"state"` + Warning *terror.Error `json:"warning"` Error *terror.Error `json:"err"` // ErrorCount will be increased, every time we meet an error when running job. ErrorCount int64 `json:"err_count"` @@ -370,6 +432,14 @@ func (job *Job) FinishDBJob(jobState JobState, schemaState SchemaState, ver int6 job.BinlogInfo.AddDBInfo(ver, dbInfo) } +// MarkNonRevertible mark the current job to be non-revertible. +// It means the job cannot be cancelled or rollbacked. +func (job *Job) MarkNonRevertible() { + if job.MultiSchemaInfo != nil { + job.MultiSchemaInfo.Revertible = false + } +} + // TSConvert2Time converts timestamp to time. func TSConvert2Time(ts uint64) time.Time { t := int64(ts >> 18) // 18 is for the logical time. @@ -412,6 +482,18 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { if err != nil { return nil, errors.Trace(err) } + if job.MultiSchemaInfo != nil { + for _, sub := range job.MultiSchemaInfo.SubJobs { + // Only update the args of executing sub-jobs. + if sub.Args == nil { + continue + } + sub.RawArgs, err = json.Marshal(sub.Args) + if err != nil { + return nil, errors.Trace(err) + } + } + } } var b []byte @@ -586,6 +668,8 @@ func (job *Job) IsRollbackable() bool { ActionModifySchemaCharsetAndCollate, ActionRepairTable, ActionModifyTableAutoIdCache, ActionModifySchemaDefaultPlacement: return job.SchemaState == StateNone + case ActionMultiSchemaChange: + return job.MultiSchemaInfo.Revertible } return true } diff --git a/parser/model/model_test.go b/parser/model/model_test.go index 73a6ec5e782e5..114b3a38bf5bf 100644 --- a/parser/model/model_test.go +++ b/parser/model/model_test.go @@ -18,6 +18,7 @@ import ( "fmt" "testing" "time" + "unsafe" "github.com/pingcap/tidb/parser/charset" "github.com/pingcap/tidb/parser/mysql" @@ -578,3 +579,12 @@ func TestLocation(t *testing.T) { location := time.FixedZone("UTC", loc1.Offset) require.Equal(t, nLoc, location) } + +func TestDDLJobSize(t *testing.T) { + msg := `Please make sure that the following methods work as expected: +- SubJob.FromProxyJob() +- SubJob.ToProxyJob() +` + job := Job{} + require.Equal(t, 288, int(unsafe.Sizeof(job)), msg) +}