Skip to content

Commit

Permalink
Fix the issue that primary pessimistic lock may be left not cleared a…
Browse files Browse the repository at this point in the history
…fter GC (#866)

* Fix the issue that primary pessimistic lock may be left not cleared after GC

Signed-off-by: MyonKeminta <[email protected]>

* Fix mysteriously shown up thing that makes compilation failed

Signed-off-by: MyonKeminta <[email protected]>

* Fix test effectiveness (forgot to set txn2 to pessimistic txn); add more strict checks

Signed-off-by: MyonKeminta <[email protected]>

* Address comments

Signed-off-by: MyonKeminta <[email protected]>

---------

Signed-off-by: MyonKeminta <[email protected]>
Co-authored-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta and MyonKeminta committed Jul 4, 2023
1 parent fbec023 commit c0cf773
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 23 deletions.
61 changes: 45 additions & 16 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,52 +1472,81 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {

s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return`))
s.NoError(failpoint.Enable("tikvclient/onRollback", `return("skipRollbackPessimisticLock")`))
defer func() {
s.NoError(failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
s.NoError(failpoint.Disable("tikvclient/beforeCommitSecondaries"))
s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
s.NoError(failpoint.Disable("tikvclient/onRollback"))
}()

k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
k1, k2, k3, k4 := []byte("k1"), []byte("k2"), []byte("k3"), []byte("k4")
v2, v3 := []byte("v2"), []byte("v3")

ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))

txn, err := s.store.Begin()
txn1, err := s.store.Begin()
s.NoError(err)
txn.SetPessimistic(true)
txn1.SetPessimistic(true)

{
// Produce write conflict on key k2
txn2, err := s.store.Begin()
helperTxn, err := s.store.Begin()
s.NoError(err)
s.NoError(txn2.Set(k2, []byte("v0")))
s.NoError(txn2.Commit(ctx))
s.NoError(helperTxn.Set(k2, []byte("v0")))
s.NoError(helperTxn.Commit(ctx))
}

lockCtx := kv.NewLockCtx(txn.StartTS(), 200, time.Now())
err = txn.LockKeys(ctx, lockCtx, k1, k2)
lockCtx := kv.NewLockCtx(txn1.StartTS(), 200, time.Now())
err = txn1.LockKeys(ctx, lockCtx, k1, k2)
s.IsType(&tikverr.ErrWriteConflict{}, errors.Cause(err))

// k1 has txn's stale pessimistic lock now.
// k1 has txn1's stale pessimistic lock now.

forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now())
s.NoError(txn.LockKeys(ctx, lockCtx, k2, k3))
s.NoError(txn1.LockKeys(ctx, lockCtx, k2, k3))

s.NoError(txn.Set(k2, v2))
s.NoError(txn.Set(k3, v3))
s.NoError(txn.Commit(ctx))
s.NoError(txn1.Set(k2, v2))
s.NoError(txn1.Set(k3, v3))
s.NoError(txn1.Commit(ctx))

// k3 has txn's stale prewrite lock now.
// k3 has txn1's stale prewrite lock now.

// Perform ScanLock - BatchResolveLock.
txn2, err := s.store.Begin()
txn2.SetPessimistic(true)
s.NoError(err)
lockCtx = kv.NewLockCtx(txn1.StartTS(), 200, time.Now())
err = txn2.LockKeys(ctx, lockCtx, k4)
s.NoError(err)
s.NoError(txn2.Rollback())

// k4 has txn2's stale primary pessimistic lock now.
currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)

remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
s.NoError(err)

s.Len(remainingLocks, 3)
s.Equal(remainingLocks[0].Key, k1)
s.Equal(remainingLocks[0].LockType, kvrpcpb.Op_PessimisticLock)
s.Equal(remainingLocks[1].Key, k3)
s.Equal(remainingLocks[1].LockType, kvrpcpb.Op_Put)
s.Equal(remainingLocks[2].Key, k4)
s.Equal(remainingLocks[2].LockType, kvrpcpb.Op_PessimisticLock)
s.Equal(remainingLocks[2].Primary, k4)

// Perform ScanLock - BatchResolveLock.
s.NoError(err)
s.NoError(s.store.GCResolveLockPhase(ctx, currentTS, 1))

// Do ScanLock again to make sure no locks are left.
remainingLocks, err = s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
s.NoError(err)
s.Empty(remainingLocks)

// Check data consistency
readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
snapshot := s.store.GetSnapshot(readTS)
Expand Down
37 changes: 37 additions & 0 deletions tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
package tikv

import (
"bytes"
"context"

"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -114,6 +115,42 @@ func (s StoreProbe) GCResolveLockPhase(ctx context.Context, safepoint uint64, co
return s.resolveLocks(ctx, safepoint, concurrency)
}

func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxVersion uint64) ([]*txnlock.Lock, error) {
bo := NewGcResolveLockMaxBackoffer(ctx)
const limit = 1024

var result []*txnlock.Lock

outerLoop:
for {
locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit)
if err != nil {
return nil, err
}
for _, l := range locks {
if bytes.Compare(endKey, l.Key) <= 0 {
// Finished scanning the given range.
break outerLoop
}
result = append(result, l)
}

if len(locks) < limit {
if len(loc.EndKey) == 0 {
// Scanned to the very end.
break outerLoop
}
// The current region is completely scanned.
startKey = loc.EndKey
} else {
// The current region may still have more locks.
startKey = append(locks[len(locks)-1].Key, 0)
}
}

return result, nil
}

// LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose.
type LockResolverProbe struct {
*txnlock.LockResolverProbe
Expand Down
19 changes: 18 additions & 1 deletion txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,10 +556,27 @@ func (txn *KVTxn) Rollback() error {
txn.CancelAggressiveLocking(context.Background())
}

// `skipPessimisticRollback` may be true only when set by failpoint in tests.
skipPessimisticRollback := false
if val, err := util.EvalFailpoint("onRollback"); err == nil {
if s, ok := val.(string); ok {
if s == "skipRollbackPessimisticLock" {
logutil.BgLogger().Info("[failpoint] injected skip pessimistic rollback on explicit rollback",
zap.Uint64("txnStartTS", txn.startTS))
skipPessimisticRollback = true
} else {
panic(fmt.Sprintf("unknown instruction %s for failpoint \"onRollback\"", s))
}
}
}

start := time.Now()
// Clean up pessimistic lock.
if txn.IsPessimistic() && txn.committer != nil {
err := txn.rollbackPessimisticLocks()
var err error
if !skipPessimisticRollback {
err = txn.rollbackPessimisticLocks()
}
txn.committer.ttlManager.close()
if err != nil {
logutil.BgLogger().Error(err.Error())
Expand Down
17 changes: 11 additions & 6 deletions txnkv/txnlock/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,26 +245,29 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
}
metrics.LockResolverCountWithExpired.Inc()

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}

if l.LockType == kvrpcpb.Op_PessimisticLock {
// BatchResolveLocks forces resolving the locks ignoring whether whey are expired.
// For pessimistic locks, committing them makes no sense, but it won't affect transaction
// correctness if we always roll back them.
// Pessimistic locks needs special handling logic because their primary may not point
// to the real primary of that transaction, and their state cannot be put in `txnInfos`.
// (see: https://github.com/pingcap/tidb/issues/42937).
//
// `resolvePessimisticLock` should be called after calling `getTxnStatus`.
// See: https://github.com/pingcap/tidb/issues/45134
err := lr.resolvePessimisticLock(bo, l)
if err != nil {
return false, err
}
continue
}

// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}

// If the transaction uses async commit, CheckTxnStatus will reject rolling back the primary lock.
// Then we need to check the secondary locks to determine the final status of the transaction.
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit {
Expand Down Expand Up @@ -1173,6 +1176,8 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
}
}

// resolvePessimisticLock handles pessimistic locks after checking txn status.
// Note that this function assumes `CheckTxnStatus` is done (or `getTxnStatusFromLock` has been called) on the lock.
func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) error {
metrics.LockResolverCountWithResolveLocks.Inc()
// The lock has been resolved by getTxnStatusFromLock.
Expand Down

0 comments on commit c0cf773

Please sign in to comment.