Skip to content

Commit

Permalink
tikv: check lock timeout again after resolving lock (#14066) (#14083)
Browse files Browse the repository at this point in the history
  • Loading branch information
sticnarf authored and sre-bot committed Dec 17, 2019
1 parent 5d5b071 commit 2ed7510
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 22 deletions.
39 changes: 17 additions & 22 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,35 +815,30 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
if err1 != nil {
return errors.Trace(err1)
}
// Check lock conflict error for nowait, if nowait set and key locked by others,
// report error immediately and do no more resolve locks.
// if the lock left behind whose related txn is already committed or rollbacked,
// (eg secondary locks not committed or rollbacked yet)
// we cant return "nowait conflict" directly
if lock.LockType == pb.Op_PessimisticLock {
if action.lockWaitTime == kv.LockNoWait {
// 3.0 release not supported yet
return kv.ErrNotImplemented
} else if action.lockWaitTime == kv.LockAlwaysWait {
// do nothing but keep wait
} else {
// the lockWaitTime is set, check the lock wait timeout or not
// the pessimistic lock found could be invalid locks which is timeout but not recycled yet
if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
return ErrLockWaitTimeout
}
}
}
}
locks = append(locks, lock)
}
// Because we already waited on tikv, no need to Backoff here.
_, err = c.store.lockResolver.ResolveLocks(bo, locks)
msBeforeTxnExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}

// If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring
// the pessimistic lock. We should return timeout error if necessary.
if msBeforeTxnExpired > 0 {
if action.lockWaitTime == kv.LockNoWait {
// 3.0 release not supported yet
return kv.ErrNotImplemented
} else if action.lockWaitTime == kv.LockAlwaysWait {
// do nothing but keep wait
} else {
// the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock
if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
return ErrLockWaitTimeout
}
}
}

// Handle the killed flag when waiting for the pessimistic lock.
// When a txn runs into LockKeys() and backoff here, it has no chance to call
// executor.Next() and check the killed flag.
Expand Down
34 changes: 34 additions & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,40 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) {
c.Assert(lockInfo.LockTtl-PessimisticLockTTL, Less, uint64(150))
}

// TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction.
// The lock's own TTL is expired but the primary key is still alive due to heartbeats.
func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
// k1 is the primary lock of txn1
k1 := kv.Key("k1")
// k2 is a secondary lock of txn1 and a key txn2 wants to lock
k2 := kv.Key("k2")

txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
// lock the primary key
err := txn1.LockKeys(context.Background(), nil, txn1.startTS, kv.LockAlwaysWait, time.Now(), k1)
c.Assert(err, IsNil)
// lock the secondary key
err = txn1.LockKeys(context.Background(), nil, txn1.startTS, kv.LockAlwaysWait, time.Now(), k2)
c.Assert(err, IsNil)

// Heartbeats will increase the TTL of the primary key

// wait until secondary key exceeds its own TTL
time.Sleep(time.Duration(PessimisticLockTTL) * time.Millisecond)
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)

// test for wait limited time (300ms)
startTime := time.Now()
err = txn2.LockKeys(context.Background(), nil, txn1.startTS, 300, time.Now(), k2)
elapsed := time.Now().Sub(startTime)
// cannot acquire lock in time thus error
c.Assert(err.Error(), Equals, ErrLockWaitTimeout.Error())
// it should return after about 300ms
c.Assert(elapsed, Less, 350*time.Millisecond)
}

func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
txn := s.begin(c)
err := txn.Set(key, key)
Expand Down

0 comments on commit 2ed7510

Please sign in to comment.