Skip to content

Commit

Permalink
record the schema version to sub-job
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Jul 18, 2022
1 parent 8dae14e commit 4dc1c58
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
18 changes: 9 additions & 9 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
if err != nil {
return ver, err
}
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
return ver, nil
}
// The last rollback/cancelling sub-job is done.
Expand All @@ -95,7 +95,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := sub.ToProxyJob(job)
ver, err = w.runDDLJob(d, t, &proxyJob)
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
handleRevertibleException(job, sub, proxyJob.Error)
return ver, err
}
Expand All @@ -122,7 +122,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
schemaVersionGenerated = true
}
ver, err = w.runDDLJob(d, t, &proxyJob)
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
if err != nil || proxyJob.Error != nil {
for j := i - 1; j >= 0; j-- {
job.MultiSchemaInfo.SubJobs[j] = &subJobs[j]
Expand All @@ -144,7 +144,7 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := sub.ToProxyJob(job)
ver, err = w.runDDLJob(d, t, &proxyJob)
sub.FromProxyJob(&proxyJob)
sub.FromProxyJob(&proxyJob, ver)
return ver, err
}
return finishMultiSchemaJob(job, t)
Expand Down Expand Up @@ -366,12 +366,12 @@ func rollingBackMultiSchemaChange(job *model.Job) error {
}

func finishMultiSchemaJob(job *model.Job, t *meta.Meta) (ver int64, err error) {
ver, err = t.GetSchemaVersion()
if err != nil {
return ver, err
for _, sub := range job.MultiSchemaInfo.SubJobs {
if ver < sub.SchemaVer {
ver = sub.SchemaVer
}
}
var tblInfo *model.TableInfo
tblInfo, err = t.GetTable(job.SchemaID, job.TableID)
tblInfo, err := t.GetTable(job.SchemaID, job.TableID)
if err != nil {
return ver, err
}
Expand Down
5 changes: 4 additions & 1 deletion parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ type MultiSchemaInfo struct {
SubJobs []*SubJob `json:"sub_jobs"`
Revertible bool `json:"revertible"`

// SkipVersion is used to control whether generating a new schema version for a sub-job.
SkipVersion bool `json:"-"`

AddColumns []CIStr `json:"-"`
Expand Down Expand Up @@ -291,6 +292,7 @@ type SubJob struct {
RowCount int64 `json:"row_count"`
Warning *terror.Error `json:"warning"`
CtxVars []interface{} `json:"-"`
SchemaVer int64 `json:"schema_version"`
}

// IsNormal returns true if the sub-job is normally running.
Expand Down Expand Up @@ -344,14 +346,15 @@ func (sub *SubJob) ToProxyJob(parentJob *Job) Job {
}

// FromProxyJob converts a proxy job to a sub-job.
func (sub *SubJob) FromProxyJob(proxyJob *Job) {
func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) {
sub.Revertible = proxyJob.MultiSchemaInfo.Revertible
sub.SchemaState = proxyJob.SchemaState
sub.SnapshotVer = proxyJob.SnapshotVer
sub.Args = proxyJob.Args
sub.State = proxyJob.State
sub.Warning = proxyJob.Warning
sub.RowCount = proxyJob.RowCount
sub.SchemaVer = ver
}

// Job is for a DDL operation.
Expand Down

0 comments on commit 4dc1c58

Please sign in to comment.