Skip to content

Commit

Permalink
ddl: fix return duplicate schema version during multi-schema change (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 authored Nov 14, 2022
1 parent 9f9d289 commit e939c9b
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 19 deletions.
8 changes: 4 additions & 4 deletions ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Callback interface {
// OnChanged is called after a ddl statement is finished.
OnChanged(err error) error
// OnSchemaStateChanged is called after a schema state is changed.
OnSchemaStateChanged()
OnSchemaStateChanged(schemaVer int64)
// OnJobRunBefore is called before running job.
OnJobRunBefore(job *model.Job)
// OnJobUpdated is called after the running job is updated.
Expand All @@ -71,7 +71,7 @@ func (*BaseCallback) OnChanged(err error) error {
}

// OnSchemaStateChanged implements Callback interface.
func (*BaseCallback) OnSchemaStateChanged() {
func (*BaseCallback) OnSchemaStateChanged(schemaVer int64) {
// Nothing to do.
}

Expand Down Expand Up @@ -129,7 +129,7 @@ func (c *DefaultCallback) OnChanged(err error) error {
}

// OnSchemaStateChanged overrides the ddl Callback interface.
func (c *DefaultCallback) OnSchemaStateChanged() {
func (c *DefaultCallback) OnSchemaStateChanged(schemaVer int64) {
err := c.do.Reload()
if err != nil {
logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err))
Expand Down Expand Up @@ -166,7 +166,7 @@ func (c *ctcCallback) OnChanged(err error) error {
}

// OnSchemaStateChanged overrides the ddl Callback interface.
func (c *ctcCallback) OnSchemaStateChanged() {
func (c *ctcCallback) OnSchemaStateChanged(retVer int64) {
err := c.do.Reload()
if err != nil {
logutil.BgLogger().Error("domain callback failed on schema state changed", zap.Error(err))
Expand Down
22 changes: 14 additions & 8 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ type TestDDLCallback struct {
// domain to reload schema before your ddl stepping into the next state change.
Do DomainReloader

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
OnJobSchemaStateChanged func(int64)
}

// OnChanged mock the same behavior with the main DDL hook.
Expand All @@ -73,12 +74,17 @@ func (tc *TestDDLCallback) OnChanged(err error) error {
}

// OnSchemaStateChanged mock the same behavior with the main ddl hook.
func (tc *TestDDLCallback) OnSchemaStateChanged() {
func (tc *TestDDLCallback) OnSchemaStateChanged(schemaVer int64) {
if tc.Do != nil {
if err := tc.Do.Reload(); err != nil {
logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err))
}
}

if tc.OnJobSchemaStateChanged != nil {
tc.OnJobSchemaStateChanged(schemaVer)
return
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
if RunInGoTest {
// d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously.
d.mu.RLock()
d.mu.hook.OnSchemaStateChanged()
d.mu.hook.OnSchemaStateChanged(schemaVer)
d.mu.RUnlock()
}

Expand Down
2 changes: 1 addition & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
if RunInGoTest {
// d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously.
d.mu.RLock()
d.mu.hook.OnSchemaStateChanged()
d.mu.hook.OnSchemaStateChanged(schemaVer)
d.mu.RUnlock()
}

Expand Down
12 changes: 7 additions & 5 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,13 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
proxyJob := sub.ToProxyJob(job)
if schemaVersionGenerated {
proxyJob.MultiSchemaInfo.SkipVersion = true
} else {
}
proxyJobVer, err := w.runDDLJob(d, t, &proxyJob)
if !schemaVersionGenerated && proxyJobVer != 0 {
schemaVersionGenerated = true
ver = proxyJobVer
}
ver, err = w.runDDLJob(d, t, &proxyJob)
sub.FromProxyJob(&proxyJob, ver)
sub.FromProxyJob(&proxyJob, proxyJobVer)
if err != nil || proxyJob.Error != nil {
for j := i - 1; j >= 0; j-- {
job.MultiSchemaInfo.SubJobs[j] = &subJobs[j]
Expand Down Expand Up @@ -376,8 +378,8 @@ func finishMultiSchemaJob(job *model.Job, t *meta.Meta) (ver int64, err error) {
}
tblInfo, err := t.GetTable(job.SchemaID, job.TableID)
if err != nil {
return ver, err
return 0, err
}
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
return ver, err
return 0, err
}
27 changes: 27 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,33 @@ func TestMultiSchemaChangeUnsupportedType(t *testing.T) {
"[ddl:8200]Unsupported multi schema change for modify auto id cache")
}

func TestMultiSchemaChangeSchemaVersion(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("create table t(a int, b int, c int, d int)")
tk.MustExec("insert into t values (1,2,3,4)")

schemaVerMap := map[int64]struct{}{}

originHook := dom.DDL().GetHook()
hook := &ddl.TestDDLCallback{Do: dom}
hook.OnJobSchemaStateChanged = func(schemaVer int64) {
if schemaVer != 0 {
// No same return schemaVer during multi-schema change
_, ok := schemaVerMap[schemaVer]
assert.False(t, ok)
schemaVerMap[schemaVer] = struct{}{}
}
}
dom.DDL().SetHook(hook)
tk.MustExec("alter table t drop column b, drop column c")
tk.MustExec("alter table t add column b int, add column c int")
tk.MustExec("alter table t add index k(b), add column e int")
tk.MustExec("alter table t alter index k invisible, drop column e")
dom.DDL().SetHook(originHook)
}

func TestMultiSchemaChangeMixedWithUpdate(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down

0 comments on commit e939c9b

Please sign in to comment.