From 85059c0cb2a7b8decec23b5f2b4cda10f314a6bc Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Fri, 6 Dec 2019 15:17:49 +0800 Subject: [PATCH] executor: change pessimistic lock wait start for one statement --- ddl/index.go | 2 +- executor/adapter.go | 3 ++- executor/admin.go | 3 ++- executor/executor.go | 3 ++- kv/kv.go | 4 +++- kv/mock.go | 3 ++- kv/mock_test.go | 3 ++- session/pessimistic_test.go | 41 +++++++++++++++++++++++++++++++++++ sessionctx/stmtctx/stmtctx.go | 12 +++++++++- store/tikv/2pc.go | 18 ++++++++++----- store/tikv/2pc_test.go | 12 +++++----- store/tikv/ticlient_test.go | 2 +- store/tikv/txn.go | 5 +++-- 13 files changed, 88 insertions(+), 23 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 390d0e5522eff..b9b5f28b8f574 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -969,7 +969,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx // Lock the row key to notify us that someone delete or update the row, // then we should not backfill the index of it, otherwise the adding index is redundant. - err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, idxRecord.key) + err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, time.Now(), idxRecord.key) if err != nil { return errors.Trace(err) } diff --git a/executor/adapter.go b/executor/adapter.go index aac68603173ff..ef042753dd17b 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -460,7 +460,8 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { return nil } forUpdateTS := txnCtx.GetForUpdateTS() - err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, keys...) + err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, + sctx.GetSessionVars().StmtCtx.GetLockWaitStartTime(), keys...) if err == nil { return nil } diff --git a/executor/admin.go b/executor/admin.go index e0528620ad6dc..585fcfdd94709 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -16,6 +16,7 @@ package executor import ( "context" "math" + "time" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" @@ -431,7 +432,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa } recordKey := e.table.RecordKey(row.handle) - err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, recordKey) + err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, time.Now(), recordKey) if err != nil { return result, err } diff --git a/executor/executor.go b/executor/executor.go index 13b9a3769fbbc..a540dd8fe2f66 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -797,7 +797,8 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, lockWaitTime int64, return err } forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS() - return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, lockWaitTime, keys...) + return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, lockWaitTime, + se.GetSessionVars().StmtCtx.GetLockWaitStartTime(), keys...) } // LimitExec represents limit executor diff --git a/kv/kv.go b/kv/kv.go index 9ca01915e806d..c76cc2f00a216 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -15,6 +15,7 @@ package kv import ( "context" + "time" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/tikv/oracle" @@ -137,7 +138,8 @@ type Transaction interface { // String implements fmt.Stringer interface. String() string // LockKeys tries to lock the entries with the keys in KV store. - LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keys ...Key) error + LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, + lockWaitTime int64, waitStartTime time.Time, keys ...Key) error // SetOption sets an option with a value, when val is nil, uses the default // value of this option. SetOption(opt Option, val interface{}) diff --git a/kv/mock.go b/kv/mock.go index 352e3bbd29866..327b683a50dc2 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -15,6 +15,7 @@ package kv import ( "context" + "time" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -39,7 +40,7 @@ func (t *mockTxn) String() string { return "" } -func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ ...Key) error { +func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ time.Time, _ ...Key) error { return nil } diff --git a/kv/mock_test.go b/kv/mock_test.go index b6fd8b192638f..e9a1f16c05207 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -15,6 +15,7 @@ package kv import ( "context" + "time" . "github.com/pingcap/check" ) @@ -37,7 +38,7 @@ func (s testMockSuite) TestInterface(c *C) { transaction, err := storage.Begin() c.Check(err, IsNil) - err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, Key("lock")) + err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, time.Now(), Key("lock")) c.Check(err, IsNil) transaction.SetOption(Option(23), struct{}{}) if mock, ok := transaction.(*mockTxn); ok { diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 386469140b097..54cd89f8aa37a 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -21,6 +21,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/domain" @@ -562,3 +563,43 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) { // clean tk.MustExec("drop table if exists tk") } + +func (s *testPessimisticSuite) TestInnodbLockWaitTimeoutWaitStart(c *C) { + // prepare work + tk := testkit.NewTestKitWithInit(c, s.store) + defer tk.MustExec("drop table if exists tk") + tk.MustExec("drop table if exists tk") + tk.MustExec("create table tk (c1 int primary key, c2 int)") + tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)") + tk.MustExec("set global innodb_lock_wait_timeout = 1") + + // raise pessimistic transaction in tk2 and trigger failpoint returning ErrWriteConflict + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk3 := testkit.NewTestKitWithInit(c, s.store) + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1")) + + // tk3 gets the pessimistic lock + tk3.MustExec("begin pessimistic") + tk3.MustQuery("select * from tk where c1 = 1 for update") + + tk2.MustExec("begin pessimistic") + done := make(chan error) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict", "return"), IsNil) + start := time.Now() + go func() { + var err error + defer func() { + done <- err + }() + _, err = tk2.Exec("select * from tk where c1 = 1 for update") + }() + time.Sleep(time.Millisecond * 30) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict"), IsNil) + waitErr := <-done + tk3.MustExec("commit") + tk2.MustExec("rollback") + c.Assert(waitErr, NotNil) + c.Check(waitErr.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) +} diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 2812ffea60bd7..e2dd000c1f960 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -133,7 +133,8 @@ type StatementContext struct { normalized string digest string } - Tables []TableEntry + Tables []TableEntry + lockWaitStartTime *time.Time // LockWaitStartTime stores the pessimistic lock wait start time } // GetNowTsCached getter for nowTs, if not set get now time and cache it @@ -494,6 +495,15 @@ func (sc *StatementContext) CopTasksDetails() *CopTasksDetails { return d } +// GetLockWaitStartTime returns the statement pessimistic lock wait start time +func (sc *StatementContext) GetLockWaitStartTime() time.Time { + if sc.lockWaitStartTime == nil { + curTime := time.Now() + sc.lockWaitStartTime = &curTime + } + return *sc.lockWaitStartTime +} + //CopTasksDetails collects some useful information of cop-tasks during execution. type CopTasksDetails struct { NumCopTasks int diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index aafca4ae34397..0f07bb6863027 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -50,8 +50,9 @@ type actionPrewrite struct{} type actionCommit struct{} type actionCleanup struct{} type actionPessimisticLock struct { - killed *uint32 - lockWaitTime int64 + killed *uint32 + lockWaitTime int64 + waitStartTime time.Time } type actionPessimisticRollback struct{} @@ -755,7 +756,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * SyncLog: c.syncLog, }, } - lockWaitStartTime := time.Now() + lockWaitStartTime := action.waitStartTime for { // if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit if action.lockWaitTime > 0 { @@ -766,6 +767,10 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * req.PessimisticLock.WaitTimeout = timeLeft } } + failpoint.Inject("PessimisticLockErrWriteConflict", func() error { + time.Sleep(300 * time.Millisecond) + return kv.ErrWriteConflict + }) resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) if err != nil { return errors.Trace(err) @@ -779,7 +784,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * if err != nil { return errors.Trace(err) } - err = c.pessimisticLockKeys(bo, action.killed, action.lockWaitTime, batch.keys) + err = c.pessimisticLockKeys(bo, action.killed, action.lockWaitTime, lockWaitStartTime, batch.keys) return errors.Trace(err) } lockResp := resp.PessimisticLock @@ -1068,8 +1073,9 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { return c.doActionOnKeys(bo, actionCleanup{}, keys) } -func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, lockWaitTime int64, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticLock{killed, lockWaitTime}, keys) +func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, lockWaitTime int64, + waitStartTime time.Time, keys [][]byte) error { + return c.doActionOnKeys(bo, actionPessimisticLock{killed, lockWaitTime, waitStartTime}, keys) } func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index c628a89639f58..065a00e0a1e55 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -512,7 +512,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { _, _ = txn.us.Get(key) c.Assert(txn.Set(key, key), IsNil) txn.DelOption(kv.PresumeKeyNotExists) - err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key) + err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key) c.Assert(err, NotNil) c.Assert(txn.Delete(key), IsNil) key2 := kv.Key("key2") @@ -524,9 +524,9 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) { txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) - err := txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def")) + err := txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, time.Now(), kv.Key("abc"), kv.Key("def")) c.Assert(err, IsNil) - err = txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def")) + err = txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, time.Now(), kv.Key("abc"), kv.Key("def")) c.Assert(err, IsNil) c.Assert(txn.lockKeys, HasLen, 2) } @@ -536,11 +536,11 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) time.Sleep(time.Millisecond * 100) - err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key) + err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key) c.Assert(err, IsNil) time.Sleep(time.Millisecond * 100) key2 := kv.Key("key2") - err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key2) + err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key2) c.Assert(err, IsNil) lockInfo := s.getLockInfo(c, key) msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl) @@ -578,7 +578,7 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) { txn.SetOption(kv.Pessimistic, true) time.Sleep(time.Millisecond * 100) forUpdateTS := oracle.ComposeTS(oracle.ExtractPhysical(txn.startTS)+100, 1) - err := txn.LockKeys(context.Background(), nil, forUpdateTS, kv.LockAlwaysWait, key) + err := txn.LockKeys(context.Background(), nil, forUpdateTS, kv.LockAlwaysWait, time.Now(), key) c.Assert(err, IsNil) lockInfo := s.getLockInfo(c, key) c.Assert(lockInfo.LockTtl-PessimisticLockTTL, GreaterEqual, uint64(100)) diff --git a/store/tikv/ticlient_test.go b/store/tikv/ticlient_test.go index a6869cec71004..bb37e2b98e9c5 100644 --- a/store/tikv/ticlient_test.go +++ b/store/tikv/ticlient_test.go @@ -119,7 +119,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) { txn := s.beginTxn(c) err := txn.Set(encodeKey(s.prefix, "key"), []byte("value")) c.Assert(err, IsNil) - err = txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, encodeKey(s.prefix, "key")) + err = txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, time.Now(), encodeKey(s.prefix, "key")) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 13eb05f265a6b..7b2b41f95f8ea 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -353,8 +353,9 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error { return txn.committer.pessimisticRollbackKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), txn.lockKeys) } +// LockKeys input param lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, - lockWaitTime int64, keysInput ...kv.Key) error { + lockWaitTime int64, waitStartTime time.Time, keysInput ...kv.Key) error { // Exclude keys that are already locked. keys := make([][]byte, 0, len(keysInput)) txn.mu.Lock() @@ -393,7 +394,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS ui // If the number of keys greater than 1, it can be on different region, // concurrently execute on multiple regions may lead to deadlock. txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1 - err := txn.committer.pessimisticLockKeys(bo, killed, lockWaitTime, keys) + err := txn.committer.pessimisticLockKeys(bo, killed, lockWaitTime, waitStartTime, keys) if killed != nil { // If the kill signal is received during waiting for pessimisticLock, // pessimisticLockKeys would handle the error but it doesn't reset the flag.