diff --git a/ddl/column.go b/ddl/column.go index 792b1ca54c057..5c40c4d10e397 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -119,7 +119,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) if err != nil { if ifNotExists && infoschema.ErrColumnExists.Equal(err) { job.Warning = toTError(err) - job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + job.State = model.JobStateDone return ver, nil } return ver, errors.Trace(err) @@ -230,7 +230,7 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) { // Convert the "not exists" error to a warning. job.Warning = toTError(err) - job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) + job.State = model.JobStateDone return ver, nil } return ver, errors.Trace(err) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index db6935a658360..dd2a63215d2ee 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -511,7 +511,7 @@ func checkMultiSchemaSpecs(_sctx sessionctx.Context, specs []*ast.DatabaseOption for _, spec := range specs { if spec.Tp == ast.DatabaseSetTiFlashReplica { if hasSetTiFlashReplica { - return dbterror.ErrRunMultiSchemaChanges + return dbterror.ErrRunMultiSchemaChanges.FastGenByArgs(model.ActionSetTiFlashReplica.String()) } hasSetTiFlashReplica = true } diff --git a/ddl/ddl_tiflash_test.go b/ddl/ddl_tiflash_test.go index 949b09f7738fd..2db35d19cb744 100644 --- a/ddl/ddl_tiflash_test.go +++ b/ddl/ddl_tiflash_test.go @@ -783,10 +783,10 @@ func TestAlterDatabaseErrorGrammar(t *testing.T) { defer tear() tk := testkit.NewTestKit(t, store) - tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change") - tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change") - tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change") - tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change") + tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change for set tiflash replica") + tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change for set tiflash replica") + tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change for set tiflash replica") + tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change for set tiflash replica") } func TestAlterDatabaseBasic(t *testing.T) { diff --git a/ddl/index.go b/ddl/index.go index 40b8b3cf11ebc..f89fa95ad9c0d 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -695,7 +695,7 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { if err != nil { if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) { job.Warning = toTError(err) - job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) + job.State = model.JobStateDone return ver, nil } return ver, errors.Trace(err) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index ddb298c2f92e3..894b926ba512e 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -76,7 +76,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve if err != nil { return ver, err } - sub.FromProxyJob(&proxyJob) + sub.FromProxyJob(&proxyJob, ver) return ver, nil } // The last rollback/cancelling sub-job is done. @@ -95,7 +95,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve } proxyJob := sub.ToProxyJob(job) ver, err = w.runDDLJob(d, t, &proxyJob) - sub.FromProxyJob(&proxyJob) + sub.FromProxyJob(&proxyJob, ver) handleRevertibleException(job, sub, proxyJob.Error) return ver, err } @@ -106,16 +106,23 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve if err != nil { return ver, err } + var schemaVersionGenerated = false subJobs := make([]model.SubJob, len(job.MultiSchemaInfo.SubJobs)) // Step the sub-jobs to the non-revertible states all at once. + // We only generate 1 schema version for these sub-job. for i, sub := range job.MultiSchemaInfo.SubJobs { if sub.IsFinished() { continue } subJobs[i] = *sub proxyJob := sub.ToProxyJob(job) + if schemaVersionGenerated { + proxyJob.MultiSchemaInfo.SkipVersion = true + } else { + schemaVersionGenerated = true + } ver, err = w.runDDLJob(d, t, &proxyJob) - sub.FromProxyJob(&proxyJob) + sub.FromProxyJob(&proxyJob, ver) if err != nil || proxyJob.Error != nil { for j := i - 1; j >= 0; j-- { job.MultiSchemaInfo.SubJobs[j] = &subJobs[j] @@ -137,11 +144,10 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve } proxyJob := sub.ToProxyJob(job) ver, err = w.runDDLJob(d, t, &proxyJob) - sub.FromProxyJob(&proxyJob) + sub.FromProxyJob(&proxyJob, ver) return ver, err } - job.State = model.JobStateDone - return ver, err + return finishMultiSchemaJob(job, t) } func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror.Error) { @@ -252,7 +258,7 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error info.AlterIndexes = append(info.AlterIndexes, idxName) case model.ActionRebaseAutoID, model.ActionModifyTableComment, model.ActionModifyTableCharsetAndCollate: default: - return dbterror.ErrRunMultiSchemaChanges + return dbterror.ErrRunMultiSchemaChanges.FastGenByArgs(job.Type.String()) } return nil } @@ -358,3 +364,17 @@ func rollingBackMultiSchemaChange(job *model.Job) error { job.State = model.JobStateRollingback return dbterror.ErrCancelledDDLJob } + +func finishMultiSchemaJob(job *model.Job, t *meta.Meta) (ver int64, err error) { + for _, sub := range job.MultiSchemaInfo.SubJobs { + if ver < sub.SchemaVer { + ver = sub.SchemaVer + } + } + tblInfo, err := t.GetTable(job.SchemaID, job.TableID) + if err != nil { + return ver, err + } + job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) + return ver, err +} diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index 60ba161ef09d6..e0e166e62e9b5 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -1137,6 +1137,17 @@ func TestMultiSchemaChangeNoSubJobs(t *testing.T) { require.Equal(t, "create table", rs[0][3]) } +func TestMultiSchemaChangeUnsupportedType(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + + tk.MustExec("create table t (a int, b int);") + tk.MustGetErrMsg("alter table t add column c int, auto_id_cache = 1;", + "[ddl:8200]Unsupported multi schema change for modify auto id cache") +} + type cancelOnceHook struct { store kv.Storage triggered bool diff --git a/ddl/table.go b/ddl/table.go index 4391ccd416f49..b253554516976 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -1334,7 +1334,7 @@ func updateVersionAndTableInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo default: } }) - if shouldUpdateVer { + if shouldUpdateVer && (job.MultiSchemaInfo == nil || !job.MultiSchemaInfo.SkipVersion) { ver, err = updateSchemaVersion(d, t, job) if err != nil { return 0, errors.Trace(err) diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 92c7280649318..1bbddf1382bf6 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -258,6 +258,9 @@ type MultiSchemaInfo struct { SubJobs []*SubJob `json:"sub_jobs"` Revertible bool `json:"revertible"` + // SkipVersion is used to control whether generating a new schema version for a sub-job. + SkipVersion bool `json:"-"` + AddColumns []CIStr `json:"-"` DropColumns []CIStr `json:"-"` ModifyColumns []CIStr `json:"-"` @@ -289,6 +292,7 @@ type SubJob struct { RowCount int64 `json:"row_count"` Warning *terror.Error `json:"warning"` CtxVars []interface{} `json:"-"` + SchemaVer int64 `json:"schema_version"` } // IsNormal returns true if the sub-job is normally running. @@ -342,7 +346,7 @@ func (sub *SubJob) ToProxyJob(parentJob *Job) Job { } // FromProxyJob converts a proxy job to a sub-job. -func (sub *SubJob) FromProxyJob(proxyJob *Job) { +func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) { sub.Revertible = proxyJob.MultiSchemaInfo.Revertible sub.SchemaState = proxyJob.SchemaState sub.SnapshotVer = proxyJob.SnapshotVer @@ -350,6 +354,7 @@ func (sub *SubJob) FromProxyJob(proxyJob *Job) { sub.State = proxyJob.State sub.Warning = proxyJob.Warning sub.RowCount = proxyJob.RowCount + sub.SchemaVer = ver } // Job is for a DDL operation. diff --git a/util/dbterror/ddl_terror.go b/util/dbterror/ddl_terror.go index b65f516bf71c4..2e90d25ffdd2c 100644 --- a/util/dbterror/ddl_terror.go +++ b/util/dbterror/ddl_terror.go @@ -33,7 +33,7 @@ var ( // ErrCancelledDDLJob means the DDL job is cancelled. ErrCancelledDDLJob = ClassDDL.NewStd(mysql.ErrCancelledDDLJob) // ErrRunMultiSchemaChanges means we run multi schema changes. - ErrRunMultiSchemaChanges = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "multi schema change"), nil)) + ErrRunMultiSchemaChanges = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "multi schema change for %s"), nil)) // ErrOperateSameColumn means we change the same columns multiple times in a DDL. ErrOperateSameColumn = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "operate same column '%s'"), nil)) // ErrOperateSameIndex means we change the same indexes multiple times in a DDL.