Skip to content

Commit

Permalink
ddl, domain: make schema correct after canceling jobs (#7997) (#8057)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and zz-jason committed Oct 31, 2018
1 parent 01c9d82 commit dcc5fc7
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 11 deletions.
10 changes: 10 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
type DDLForTest interface {
	// GetHook returns the callback hook that is currently installed.
	GetHook() Callback
	// SetHook installs h as the callback hook.
	SetHook(h Callback)
	// SetInterceptoror installs h as the interceptor.
	SetInterceptoror(h Interceptor)
}
Expand All @@ -51,6 +53,14 @@ func (d *ddl) SetHook(h Callback) {
d.mu.hook = h
}

// GetHook implements the DDLForTest.GetHook interface.
// The hook is read while d.mu is held and returned after the lock is released.
func (d *ddl) GetHook() Callback {
	d.mu.Lock()
	hook := d.mu.hook
	d.mu.Unlock()

	return hook
}

// SetInterceptoror implements DDL.SetInterceptoror interface.
func (d *ddl) SetInterceptoror(i Interceptor) {
d.mu.Lock()
Expand Down
25 changes: 14 additions & 11 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,20 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()

switch job.Type {
case model.ActionAddIndex:
if job.State != model.JobStateRollbackDone {
break
if !job.IsCancelled() {
switch job.Type {
case model.ActionAddIndex:
if job.State != model.JobStateRollbackDone {
break
}
// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
err = w.deleteRange(job)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition:
err = w.deleteRange(job)
}
if err != nil {
return errors.Trace(err)
}
// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
err = w.deleteRange(job)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition:
err = w.deleteRange(job)
}
if err != nil {
return errors.Trace(err)
}

_, err = t.DeQueueDDLJob()
Expand Down Expand Up @@ -380,6 +382,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
// and retry later if the job is not cancelled.
schemaVer, runJobErr = w.runDDLJob(d, t, job)
if job.IsCancelled() {
txn.Reset()
err = w.finishDDLJob(t, job)
return errors.Trace(err)
}
Expand Down
116 changes: 116 additions & 0 deletions ddl/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,131 @@ package ddl_test

import (
"fmt"
"time"

gofail "github.com/etcd-io/gofail/runtime"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"golang.org/x/net/context"
)

// Register testFailDBSuite with the check test runner.
var _ = Suite(&testFailDBSuite{})

// testFailDBSuite holds the fixtures shared by the failpoint-driven DDL tests.
// All fields are populated in SetUpSuite.
type testFailDBSuite struct {
	// lease is the schema lease handed to session.SetSchemaLease.
	lease time.Duration
	// store is the mock TiKV storage backing the suite.
	store kv.Storage
	// dom is the domain returned by bootstrapping a session on store.
	dom *domain.Domain
	// se is the session used to execute SQL; tests may replace it after
	// reloading the schema.
	se session.Session
	// p is a SQL parser created once for the suite.
	p *parser.Parser
}

// SetUpSuite builds the fixtures used by every test in the suite: a mock TiKV
// store, a bootstrapped domain, a test session and a parser. It also shortens
// the DDL error back-off so that failing jobs are retried almost immediately.
func (s *testFailDBSuite) SetUpSuite(c *C) {
	testleak.BeforeTest()
	ddl.WaitTimeWhenErrorOccured = time.Microsecond
	s.lease = 200 * time.Millisecond

	var err error
	s.store, err = mockstore.NewMockTikvStore()
	c.Assert(err, IsNil)

	// The schema lease must be configured before the domain is bootstrapped.
	session.SetSchemaLease(s.lease)
	s.dom, err = session.BootstrapSession(s.store)
	c.Assert(err, IsNil)

	s.se, err = session.CreateSession4Test(s.store)
	c.Assert(err, IsNil)

	s.p = parser.New()
}

// TearDownSuite releases the suite fixtures in reverse order of creation and
// then runs the leak check registered by testleak.
func (s *testFailDBSuite) TearDownSuite(c *C) {
	ctx := context.Background()
	// Best-effort cleanup: the result set and the error are intentionally discarded.
	_, _ = s.se.Execute(ctx, "drop database if exists test_db_state")

	s.se.Close()
	s.dom.Close()
	s.store.Close()

	checkLeak := testleak.AfterTest(c)
	checkLeak()
}

// TestHalfwayCancelOperations tests that the schema is still correct after a
// DDL operation has been cancelled halfway through its execution.
//
// For both "truncate table" and "rename table" it arms a failpoint that cancels
// the job after the table has been dropped, and then verifies that:
//   - the statement reports an error,
//   - the original table still holds its data, and
//   - the table is still usable after the schema has been reloaded from storage.
func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
	ctx := context.Background()

	// mustExec runs sql on the suite's current session and asserts success.
	mustExec := func(sql string) {
		_, err := s.se.Execute(ctx, sql)
		c.Assert(err, IsNil, Commentf("sql: %s", sql))
	}
	// mustFail runs sql on the suite's current session and asserts that it fails.
	mustFail := func(sql string) {
		_, err := s.se.Execute(ctx, sql)
		c.Assert(err, NotNil, Commentf("sql: %s", sql))
	}
	// checkRowCount asserts that "select count(*)" on table yields expected.
	checkRowCount := func(table string, expected int64) {
		rs, err := s.se.Execute(ctx, "select count(*) from "+table)
		c.Assert(err, IsNil)
		chk := rs[0].NewChunk()
		err = rs[0].Next(ctx, chk)
		c.Assert(err, IsNil)
		c.Assert(chk.NumRows() == 0, IsFalse)
		row := chk.GetRow(0)
		c.Assert(row.Len(), Equals, 1)
		c.Assert(row.GetInt64(0), DeepEquals, expected)
		c.Assert(rs[0].Close(), IsNil)
	}
	// reloadSchema resets the domain's infoschema handle, triggers the DDL
	// hook's OnChanged callback, and switches to a fresh session on
	// cancel_job_db.
	reloadSchema := func() {
		s.dom.ResetHandle(s.store)
		err := s.dom.DDL().(ddl.DDLForTest).GetHook().OnChanged(nil)
		c.Assert(err, IsNil)
		s.se, err = session.CreateSession4Test(s.store)
		c.Assert(err, IsNil)
		mustExec("use cancel_job_db")
	}

	// Check the Enable result: a misspelled or missing failpoint must fail the
	// test here instead of silently leaving the failure path unexercised.
	err := gofail.Enable("github.com/pingcap/tidb/ddl/truncateTableErr", `return(true)`)
	c.Assert(err, IsNil)
	defer gofail.Disable("github.com/pingcap/tidb/ddl/truncateTableErr")

	// test for truncating table
	mustExec("create database cancel_job_db")
	mustExec("use cancel_job_db")
	mustExec("create table t(a int)")
	mustExec("insert into t values(1)")
	mustFail("truncate table t")
	// Make sure that the table's data has not been deleted.
	checkRowCount("t", 1)
	reloadSchema()
	// Test schema is correct.
	mustExec("select * from t")

	// test for renaming table
	// The failpoint name must match the gofail variable declared in
	// onRenameTable (renameTableErr).
	err = gofail.Enable("github.com/pingcap/tidb/ddl/renameTableErr", `return(true)`)
	c.Assert(err, IsNil)
	defer gofail.Disable("github.com/pingcap/tidb/ddl/renameTableErr")
	mustExec("create table tx(a int)")
	mustExec("insert into tx values(1)")
	mustFail("rename table tx to ty")
	// Make sure that the table's data has not been deleted.
	checkRowCount("tx", 1)
	reloadSchema()
	// Test schema is correct.
	mustExec("select * from tx")

	// clean up
	mustExec("drop database cancel_job_db")
}

// TestInitializeOffsetAndState tests the case that the column's offset and state are not initialized in ddl_api.go when
// performing a 'modify column' operation.
func (s *testStateChangeSuite) TestInitializeOffsetAndState(c *C) {
Expand Down
10 changes: 10 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// gofail: var truncateTableErr bool
// if truncateTableErr {
// job.State = model.JobStateCancelled
// return ver, errors.New("occur an error after dropping table.")
// }

var oldPartitionIDs []int64
if tblInfo.GetPartitionInfo() != nil {
Expand Down Expand Up @@ -334,6 +339,11 @@ func onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// gofail: var renameTableErr bool
// if renameTableErr {
// job.State = model.JobStateCancelled
// return ver, errors.New("occur an error after renaming table.")
// }
tblInfo.Name = tableName
err = t.CreateTable(newSchemaID, tblInfo)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
}
}

// ResetHandle replaces the domain's infoschema handle with a new handle
// created for store. It is used for testing.
func (do *Domain) ResetHandle(store kv.Storage) {
	freshHandle := infoschema.NewHandle(store)
	do.infoHandle = freshHandle
}

// Init initializes a domain.
func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error {
if ebd, ok := do.store.(EtcdBackend); ok {
Expand Down

0 comments on commit dcc5fc7

Please sign in to comment.