From dcc5fc7b55d2b097f330ac54b464c5d823d0d4ae Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 31 Oct 2018 15:42:41 +0800 Subject: [PATCH] ddl, domain: make schema correct after canceling jobs (#7997) (#8057) --- ddl/ddl_test.go | 10 ++++ ddl/ddl_worker.go | 25 +++++----- ddl/fail_db_test.go | 116 ++++++++++++++++++++++++++++++++++++++++++++ ddl/table.go | 10 ++++ domain/domain.go | 5 ++ 5 files changed, 155 insertions(+), 11 deletions(-) diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index a353102726404..3968a7d04ffc2 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -39,6 +39,8 @@ import ( type DDLForTest interface { // SetHook sets the hook. SetHook(h Callback) + // GetHook gets the hook. + GetHook() Callback // SetInterceptoror sets the interceptor. SetInterceptoror(h Interceptor) } @@ -51,6 +53,14 @@ func (d *ddl) SetHook(h Callback) { d.mu.hook = h } +// GetHook implements DDL.GetHook interface. +func (d *ddl) GetHook() Callback { + d.mu.Lock() + defer d.mu.Unlock() + + return d.mu.hook +} + // SetInterceptoror implements DDL.SetInterceptoror interface. func (d *ddl) SetInterceptoror(i Interceptor) { d.mu.Lock() diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 4237a2f56c42b..128737f065d74 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -276,18 +276,20 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) }() - switch job.Type { - case model.ActionAddIndex: - if job.State != model.JobStateRollbackDone { - break + if !job.IsCancelled() { + switch job.Type { + case model.ActionAddIndex: + if job.State != model.JobStateRollbackDone { + break + } + // After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data. 
+ err = w.deleteRange(job) + case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition: + err = w.deleteRange(job) + } + if err != nil { + return errors.Trace(err) } - // After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data. - err = w.deleteRange(job) - case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition: - err = w.deleteRange(job) - } - if err != nil { - return errors.Trace(err) } _, err = t.DeQueueDDLJob() @@ -380,6 +382,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { // and retry later if the job is not cancelled. schemaVer, runJobErr = w.runDDLJob(d, t, job) if job.IsCancelled() { + txn.Reset() err = w.finishDDLJob(t, job) return errors.Trace(err) } diff --git a/ddl/fail_db_test.go b/ddl/fail_db_test.go index 086f13af894eb..724d59bdcf621 100644 --- a/ddl/fail_db_test.go +++ b/ddl/fail_db_test.go @@ -15,15 +15,131 @@ package ddl_test import ( "fmt" + "time" gofail "github.com/etcd-io/gofail/runtime" . 
"github.com/pingcap/check" + "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/model" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util/testkit" + "github.com/pingcap/tidb/util/testleak" "golang.org/x/net/context" ) +var _ = Suite(&testFailDBSuite{}) + +type testFailDBSuite struct { + lease time.Duration + store kv.Storage + dom *domain.Domain + se session.Session + p *parser.Parser +} + +func (s *testFailDBSuite) SetUpSuite(c *C) { + testleak.BeforeTest() + s.lease = 200 * time.Millisecond + ddl.WaitTimeWhenErrorOccured = 1 * time.Microsecond + var err error + s.store, err = mockstore.NewMockTikvStore() + c.Assert(err, IsNil) + session.SetSchemaLease(s.lease) + s.dom, err = session.BootstrapSession(s.store) + c.Assert(err, IsNil) + s.se, err = session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + s.p = parser.New() +} + +func (s *testFailDBSuite) TearDownSuite(c *C) { + s.se.Execute(context.Background(), "drop database if exists test_db_state") + s.se.Close() + s.dom.Close() + s.store.Close() + testleak.AfterTest(c)() +} + +// TestHalfwayCancelOperations tests the case that the schema is correct after the execution of operations are cancelled halfway. 
+func (s *testFailDBSuite) TestHalfwayCancelOperations(c *C) {
+	c.Assert(gofail.Enable("github.com/pingcap/tidb/ddl/truncateTableErr", `return(true)`), IsNil)
+	defer gofail.Disable("github.com/pingcap/tidb/ddl/truncateTableErr")
+
+	// Truncate: with the truncateTableErr failpoint on, the job is cancelled and the statement must fail.
+	_, err := s.se.Execute(context.Background(), "create database cancel_job_db")
+	c.Assert(err, IsNil)
+	_, err = s.se.Execute(context.Background(), "use cancel_job_db")
+	c.Assert(err, IsNil)
+	_, err = s.se.Execute(context.Background(), "create table t(a int)")
+	c.Assert(err, IsNil)
+	_, err = s.se.Execute(context.Background(), "insert into t values(1)")
+	c.Assert(err, IsNil)
+	_, err = s.se.Execute(context.Background(), "truncate table t")
+	c.Assert(err, NotNil)
+	// Make sure that the table's data has not been deleted.
+	rs, err := s.se.Execute(context.Background(), "select count(*) from t")
+	c.Assert(err, IsNil)
+	chk := rs[0].NewChunk()
+	err = rs[0].Next(context.Background(), chk)
+	c.Assert(err, IsNil)
+	c.Assert(chk.NumRows() == 0, IsFalse)
+	row := chk.GetRow(0)
+	c.Assert(row.Len(), Equals, 1)
+	c.Assert(row.GetInt64(0), DeepEquals, int64(1))
+	c.Assert(rs[0].Close(), IsNil)
+	// Reload schema from the store through a fresh infoschema handle and a new session.
+	s.dom.ResetHandle(s.store)
+	err = s.dom.DDL().(ddl.DDLForTest).GetHook().OnChanged(nil)
+	c.Assert(err, IsNil)
+	s.se, err = session.CreateSession4Test(s.store)
+	c.Assert(err, IsNil)
+	_, err = s.se.Execute(context.Background(), "use cancel_job_db")
+	c.Assert(err, IsNil)
+	// Test schema is correct: the table must still be resolvable after the reload.
+	_, err = s.se.Execute(context.Background(), "select * from t")
+	c.Assert(err, IsNil)
+
+	// Rename: the failpoint name must match the `gofail: var renameTableErr` marker in ddl/table.go.
+	c.Assert(gofail.Enable("github.com/pingcap/tidb/ddl/renameTableErr", `return(true)`), IsNil)
+	defer gofail.Disable("github.com/pingcap/tidb/ddl/renameTableErr")
+	_, err = s.se.Execute(context.Background(), "create table tx(a int)")
+	c.Assert(err, IsNil)
+	_, err = s.se.Execute(context.Background(), "insert into tx values(1)")
+	c.Assert(err, IsNil)
+	_, err = s.se.Execute(context.Background(), "rename table tx to ty")
+	c.Assert(err, NotNil)
+	// Make sure that the table's data has not been deleted.
+	rs, err = s.se.Execute(context.Background(), "select count(*) from tx")
+	c.Assert(err, IsNil)
+	chk = rs[0].NewChunk()
+	err = rs[0].Next(context.Background(), chk)
+	c.Assert(err, IsNil)
+	c.Assert(chk.NumRows() == 0, IsFalse)
+	row = chk.GetRow(0)
+	c.Assert(row.Len(), Equals, 1)
+	c.Assert(row.GetInt64(0), DeepEquals, int64(1))
+	c.Assert(rs[0].Close(), IsNil)
+	// Reload schema from the store through a fresh infoschema handle and a new session.
+	s.dom.ResetHandle(s.store)
+	err = s.dom.DDL().(ddl.DDLForTest).GetHook().OnChanged(nil)
+	c.Assert(err, IsNil)
+	s.se, err = session.CreateSession4Test(s.store)
+	c.Assert(err, IsNil)
+	_, err = s.se.Execute(context.Background(), "use cancel_job_db")
+	c.Assert(err, IsNil)
+	// Test schema is correct: the table must still be reachable under its old name.
+	_, err = s.se.Execute(context.Background(), "select * from tx")
+	c.Assert(err, IsNil)
+
+	// clean up
+	_, err = s.se.Execute(context.Background(), "drop database cancel_job_db")
+	c.Assert(err, IsNil)
+}
+
 // TestInitializeOffsetAndState tests the case that the column's offset and state don't be initialized in the file of ddl_api.go when
 // doing the operation of 'modify column'.
func (s *testStateChangeSuite) TestInitializeOffsetAndState(c *C) { diff --git a/ddl/table.go b/ddl/table.go index 0bba885b07c53..cf7643f1a5ff4 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -208,6 +208,11 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro job.State = model.JobStateCancelled return ver, errors.Trace(err) } + // gofail: var truncateTableErr bool + // if truncateTableErr { + // job.State = model.JobStateCancelled + // return ver, errors.New("occur an error after dropping table.") + // } var oldPartitionIDs []int64 if tblInfo.GetPartitionInfo() != nil { @@ -334,6 +339,11 @@ func onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + // gofail: var renameTableErr bool + // if renameTableErr { + // job.State = model.JobStateCancelled + // return ver, errors.New("occur an error after renaming table.") + // } tblInfo.Name = tableName err = t.CreateTable(newSchemaID, tblInfo) if err != nil { diff --git a/domain/domain.go b/domain/domain.go index 8061751fc852b..280112c45a76f 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -533,6 +533,11 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio } } +// ResetHandle resets the domain's infoschema handle. It is used for testing. +func (do *Domain) ResetHandle(store kv.Storage) { + do.infoHandle = infoschema.NewHandle(store) +} + // Init initializes a domain. func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.Resource, error)) error { if ebd, ok := do.store.(EtcdBackend); ok {