From 2ed7510ab123cd6031e5153733b74bc8524e0140 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Tue, 17 Dec 2019 11:14:25 +0800 Subject: [PATCH] tikv: check lock timeout again after resolving lock (#14066) (#14083) --- store/tikv/2pc.go | 39 +++++++++++++++++---------------------- store/tikv/2pc_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 22 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 0f07bb6863027..fff7fbed84c43 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -815,35 +815,30 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * if err1 != nil { return errors.Trace(err1) } - // Check lock conflict error for nowait, if nowait set and key locked by others, - // report error immediately and do no more resolve locks. - // if the lock left behind whose related txn is already committed or rollbacked, - // (eg secondary locks not committed or rollbacked yet) - // we cant return "nowait conflict" directly - if lock.LockType == pb.Op_PessimisticLock { - if action.lockWaitTime == kv.LockNoWait { - // 3.0 release not supported yet - return kv.ErrNotImplemented - } else if action.lockWaitTime == kv.LockAlwaysWait { - // do nothing but keep wait - } else { - // the lockWaitTime is set, check the lock wait timeout or not - // the pessimistic lock found could be invalid locks which is timeout but not recycled yet - if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { - if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime { - return ErrLockWaitTimeout - } - } - } - } locks = append(locks, lock) } // Because we already waited on tikv, no need to Backoff here. - _, err = c.store.lockResolver.ResolveLocks(bo, locks) + msBeforeTxnExpired, err := c.store.lockResolver.ResolveLocks(bo, locks) if err != nil { return errors.Trace(err) } + // If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring + // the pessimistic lock. We should return timeout error if necessary. + if msBeforeTxnExpired > 0 { + if action.lockWaitTime == kv.LockNoWait { + // 3.0 release not supported yet + return kv.ErrNotImplemented + } else if action.lockWaitTime == kv.LockAlwaysWait { + // do nothing but keep wait + } else { + // the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock + if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime { + return ErrLockWaitTimeout + } + } + } + // Handle the killed flag when waiting for the pessimistic lock. // When a txn runs into LockKeys() and backoff here, it has no chance to call // executor.Next() and check the killed flag. diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 065a00e0a1e55..52989752881b9 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -585,6 +585,40 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) { c.Assert(lockInfo.LockTtl-PessimisticLockTTL, Less, uint64(150)) } +// TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction. +// The lock's own TTL is expired but the primary key is still alive due to heartbeats. +func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) { + // k1 is the primary lock of txn1 + k1 := kv.Key("k1") + // k2 is a secondary lock of txn1 and a key txn2 wants to lock + k2 := kv.Key("k2") + + txn1 := s.begin(c) + txn1.SetOption(kv.Pessimistic, true) + // lock the primary key + err := txn1.LockKeys(context.Background(), nil, txn1.startTS, kv.LockAlwaysWait, time.Now(), k1) + c.Assert(err, IsNil) + // lock the secondary key + err = txn1.LockKeys(context.Background(), nil, txn1.startTS, kv.LockAlwaysWait, time.Now(), k2) + c.Assert(err, IsNil) + + // Heartbeats will increase the TTL of the primary key + + // wait until secondary key exceeds its own TTL + time.Sleep(time.Duration(PessimisticLockTTL) * time.Millisecond) + txn2 := s.begin(c) + txn2.SetOption(kv.Pessimistic, true) + + // test for wait limited time (300ms) + startTime := time.Now() + err = txn2.LockKeys(context.Background(), nil, txn1.startTS, 300, time.Now(), k2) + elapsed := time.Now().Sub(startTime) + // cannot acquire lock in time thus error + c.Assert(err.Error(), Equals, ErrLockWaitTimeout.Error()) + // it should return after about 300ms + c.Assert(elapsed, Less, 350*time.Millisecond) +} + func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo { txn := s.begin(c) err := txn.Set(key, key)