Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv: check lock timeout again after resolving lock (#14066) #14083

Merged
merged 2 commits into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 17 additions & 22 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,35 +815,30 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
if err1 != nil {
return errors.Trace(err1)
}
// Check lock conflict error for nowait, if nowait set and key locked by others,
// report error immediately and do no more resolve locks.
// if the lock left behind whose related txn is already committed or rollbacked,
// (eg secondary locks not committed or rollbacked yet)
// we cant return "nowait conflict" directly
if lock.LockType == pb.Op_PessimisticLock {
if action.lockWaitTime == kv.LockNoWait {
// 3.0 release not supported yet
return kv.ErrNotImplemented
} else if action.lockWaitTime == kv.LockAlwaysWait {
// do nothing but keep wait
} else {
// the lockWaitTime is set, check the lock wait timeout or not
// the pessimistic lock found could be invalid locks which is timeout but not recycled yet
if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
return ErrLockWaitTimeout
}
}
}
}
locks = append(locks, lock)
}
// Because we already waited on tikv, no need to Backoff here.
_, err = c.store.lockResolver.ResolveLocks(bo, locks)
msBeforeTxnExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}

// If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring
// the pessimistic lock. We should return timeout error if necessary.
if msBeforeTxnExpired > 0 {
if action.lockWaitTime == kv.LockNoWait {
// 3.0 release not supported yet
return kv.ErrNotImplemented
} else if action.lockWaitTime == kv.LockAlwaysWait {
// do nothing but keep wait
} else {
// the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock
if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
return ErrLockWaitTimeout
}
}
}

// Handle the killed flag when waiting for the pessimistic lock.
// When a txn runs into LockKeys() and backoff here, it has no chance to call
// executor.Next() and check the killed flag.
Expand Down
34 changes: 34 additions & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,40 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) {
c.Assert(lockInfo.LockTtl-PessimisticLockTTL, Less, uint64(150))
}

// TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction.
// The lock's own TTL is expired but the primary key is still alive due to heartbeats.
func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
// k1 is the primary lock of txn1
k1 := kv.Key("k1")
// k2 is a secondary lock of txn1 and a key txn2 wants to lock
k2 := kv.Key("k2")

txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
// lock the primary key
err := txn1.LockKeys(context.Background(), nil, txn1.startTS, kv.LockAlwaysWait, time.Now(), k1)
c.Assert(err, IsNil)
// lock the secondary key
err = txn1.LockKeys(context.Background(), nil, txn1.startTS, kv.LockAlwaysWait, time.Now(), k2)
c.Assert(err, IsNil)

// Heartbeats will increase the TTL of the primary key

// wait until secondary key exceeds its own TTL
time.Sleep(time.Duration(PessimisticLockTTL) * time.Millisecond)
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)

// test for wait limited time (300ms)
startTime := time.Now()
err = txn2.LockKeys(context.Background(), nil, txn1.startTS, 300, time.Now(), k2)
elapsed := time.Now().Sub(startTime)
// cannot acquire lock in time thus error
c.Assert(err.Error(), Equals, ErrLockWaitTimeout.Error())
// it should return after about 300ms
c.Assert(elapsed, Less, 350*time.Millisecond)
}

func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
txn := s.begin(c)
err := txn.Set(key, key)
Expand Down