Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: only use one schema version for the non-revertible step #36262

Merged
merged 13 commits into from
Jul 18, 2022
Merged
4 changes: 2 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
if err != nil {
if ifNotExists && infoschema.ErrColumnExists.Equal(err) {
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
job.State = model.JobStateDone
return ver, nil
}
return ver, errors.Trace(err)
Expand Down Expand Up @@ -230,7 +230,7 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
// Convert the "not exists" error to a warning.
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.State = model.JobStateDone
return ver, nil
}
return ver, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func checkMultiSchemaSpecs(_sctx sessionctx.Context, specs []*ast.DatabaseOption
for _, spec := range specs {
if spec.Tp == ast.DatabaseSetTiFlashReplica {
if hasSetTiFlashReplica {
return dbterror.ErrRunMultiSchemaChanges
return dbterror.ErrRunMultiSchemaChanges.FastGenByArgs(model.ActionSetTiFlashReplica.String())
}
hasSetTiFlashReplica = true
}
Expand Down
8 changes: 4 additions & 4 deletions ddl/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,10 +783,10 @@ func TestAlterDatabaseErrorGrammar(t *testing.T) {
defer tear()

tk := testkit.NewTestKit(t, store)
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change for set tiflash replica")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change for set tiflash replica")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change for set tiflash replica")
tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change for set tiflash replica")
}

func TestAlterDatabaseBasic(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
if err != nil {
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
job.Warning = toTError(err)
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.State = model.JobStateDone
return ver, nil
}
return ver, errors.Trace(err)
Expand Down
34 changes: 27 additions & 7 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
if err != nil {
return ver, err
}
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
return ver, nil
}
// The last rollback/cancelling sub-job is done.
Expand All @@ -95,7 +95,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := sub.ToProxyJob(job)
ver, err = w.runDDLJob(d, t, &proxyJob)
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
handleRevertibleException(job, sub, proxyJob.Error)
return ver, err
}
Expand All @@ -106,16 +106,23 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
if err != nil {
return ver, err
}
var schemaVersionGenerated = false
subJobs := make([]model.SubJob, len(job.MultiSchemaInfo.SubJobs))
// Step the sub-jobs to the non-revertible states all at once.
// We only generate 1 schema version for these sub-job.
for i, sub := range job.MultiSchemaInfo.SubJobs {
if sub.IsFinished() {
continue
}
subJobs[i] = *sub
proxyJob := sub.ToProxyJob(job)
if schemaVersionGenerated {
proxyJob.MultiSchemaInfo.SkipVersion = true
} else {
schemaVersionGenerated = true
}
ver, err = w.runDDLJob(d, t, &proxyJob)
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
if err != nil || proxyJob.Error != nil {
for j := i - 1; j >= 0; j-- {
job.MultiSchemaInfo.SubJobs[j] = &subJobs[j]
Expand All @@ -137,11 +144,10 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := sub.ToProxyJob(job)
ver, err = w.runDDLJob(d, t, &proxyJob)
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
return ver, err
}
job.State = model.JobStateDone
return ver, err
return finishMultiSchemaJob(job, t)
}

func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror.Error) {
Expand Down Expand Up @@ -252,7 +258,7 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
info.AlterIndexes = append(info.AlterIndexes, idxName)
case model.ActionRebaseAutoID, model.ActionModifyTableComment, model.ActionModifyTableCharsetAndCollate:
default:
return dbterror.ErrRunMultiSchemaChanges
return dbterror.ErrRunMultiSchemaChanges.FastGenByArgs(job.Type.String())
}
return nil
}
Expand Down Expand Up @@ -358,3 +364,17 @@ func rollingBackMultiSchemaChange(job *model.Job) error {
job.State = model.JobStateRollingback
return dbterror.ErrCancelledDDLJob
}

func finishMultiSchemaJob(job *model.Job, t *meta.Meta) (ver int64, err error) {
for _, sub := range job.MultiSchemaInfo.SubJobs {
if ver < sub.SchemaVer {
ver = sub.SchemaVer
}
}
tblInfo, err := t.GetTable(job.SchemaID, job.TableID)
if err != nil {
return ver, err
}
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
return ver, err
}
11 changes: 11 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,17 @@ func TestMultiSchemaChangeNoSubJobs(t *testing.T) {
require.Equal(t, "create table", rs[0][3])
}

func TestMultiSchemaChangeUnsupportedType(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")

tk.MustExec("create table t (a int, b int);")
tk.MustGetErrMsg("alter table t add column c int, auto_id_cache = 1;",
"[ddl:8200]Unsupported multi schema change for modify auto id cache")
}

type cancelOnceHook struct {
store kv.Storage
triggered bool
Expand Down
2 changes: 1 addition & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ func updateVersionAndTableInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo
default:
}
})
if shouldUpdateVer {
if shouldUpdateVer && (job.MultiSchemaInfo == nil || !job.MultiSchemaInfo.SkipVersion) {
ver, err = updateSchemaVersion(d, t, job)
if err != nil {
return 0, errors.Trace(err)
Expand Down
7 changes: 6 additions & 1 deletion parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ type MultiSchemaInfo struct {
SubJobs []*SubJob `json:"sub_jobs"`
Revertible bool `json:"revertible"`

// SkipVersion is used to control whether generating a new schema version for a sub-job.
SkipVersion bool `json:"-"`

AddColumns []CIStr `json:"-"`
DropColumns []CIStr `json:"-"`
ModifyColumns []CIStr `json:"-"`
Expand Down Expand Up @@ -289,6 +292,7 @@ type SubJob struct {
RowCount int64 `json:"row_count"`
Warning *terror.Error `json:"warning"`
CtxVars []interface{} `json:"-"`
SchemaVer int64 `json:"schema_version"`
}

// IsNormal returns true if the sub-job is normally running.
Expand Down Expand Up @@ -342,14 +346,15 @@ func (sub *SubJob) ToProxyJob(parentJob *Job) Job {
}

// FromProxyJob converts a proxy job to a sub-job.
func (sub *SubJob) FromProxyJob(proxyJob *Job) {
func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) {
sub.Revertible = proxyJob.MultiSchemaInfo.Revertible
sub.SchemaState = proxyJob.SchemaState
sub.SnapshotVer = proxyJob.SnapshotVer
sub.Args = proxyJob.Args
sub.State = proxyJob.State
sub.Warning = proxyJob.Warning
sub.RowCount = proxyJob.RowCount
sub.SchemaVer = ver
}

// Job is for a DDL operation.
Expand Down
2 changes: 1 addition & 1 deletion util/dbterror/ddl_terror.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
// ErrCancelledDDLJob means the DDL job is cancelled.
ErrCancelledDDLJob = ClassDDL.NewStd(mysql.ErrCancelledDDLJob)
// ErrRunMultiSchemaChanges means we run multi schema changes.
ErrRunMultiSchemaChanges = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "multi schema change"), nil))
ErrRunMultiSchemaChanges = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "multi schema change for %s"), nil))
// ErrOperateSameColumn means we change the same columns multiple times in a DDL.
ErrOperateSameColumn = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "operate same column '%s'"), nil))
// ErrOperateSameIndex means we change the same indexes multiple times in a DDL.
Expand Down