
Commit

Merge branch 'master' into master-stale-patch
you06 authored Jul 5, 2023
2 parents 034f36e + c0cf773 commit 84e83df
Showing 7 changed files with 167 additions and 42 deletions.
61 changes: 45 additions & 16 deletions integration_tests/lock_test.go
@@ -1472,52 +1472,81 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {

s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return`))
s.NoError(failpoint.Enable("tikvclient/onRollback", `return("skipRollbackPessimisticLock")`))
defer func() {
s.NoError(failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
s.NoError(failpoint.Disable("tikvclient/beforeCommitSecondaries"))
s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
s.NoError(failpoint.Disable("tikvclient/onRollback"))
}()

k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
k1, k2, k3, k4 := []byte("k1"), []byte("k2"), []byte("k3"), []byte("k4")
v2, v3 := []byte("v2"), []byte("v3")

ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))

txn, err := s.store.Begin()
txn1, err := s.store.Begin()
s.NoError(err)
txn.SetPessimistic(true)
txn1.SetPessimistic(true)

{
// Produce write conflict on key k2
txn2, err := s.store.Begin()
helperTxn, err := s.store.Begin()
s.NoError(err)
s.NoError(txn2.Set(k2, []byte("v0")))
s.NoError(txn2.Commit(ctx))
s.NoError(helperTxn.Set(k2, []byte("v0")))
s.NoError(helperTxn.Commit(ctx))
}

lockCtx := kv.NewLockCtx(txn.StartTS(), 200, time.Now())
err = txn.LockKeys(ctx, lockCtx, k1, k2)
lockCtx := kv.NewLockCtx(txn1.StartTS(), 200, time.Now())
err = txn1.LockKeys(ctx, lockCtx, k1, k2)
s.IsType(&tikverr.ErrWriteConflict{}, errors.Cause(err))

// k1 has txn's stale pessimistic lock now.
// k1 has txn1's stale pessimistic lock now.

forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now())
s.NoError(txn.LockKeys(ctx, lockCtx, k2, k3))
s.NoError(txn1.LockKeys(ctx, lockCtx, k2, k3))

s.NoError(txn.Set(k2, v2))
s.NoError(txn.Set(k3, v3))
s.NoError(txn.Commit(ctx))
s.NoError(txn1.Set(k2, v2))
s.NoError(txn1.Set(k3, v3))
s.NoError(txn1.Commit(ctx))

// k3 has txn's stale prewrite lock now.
// k3 has txn1's stale prewrite lock now.

// Perform ScanLock - BatchResolveLock.
txn2, err := s.store.Begin()
txn2.SetPessimistic(true)
s.NoError(err)
lockCtx = kv.NewLockCtx(txn1.StartTS(), 200, time.Now())
err = txn2.LockKeys(ctx, lockCtx, k4)
s.NoError(err)
s.NoError(txn2.Rollback())

// k4 has txn2's stale primary pessimistic lock now.
currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)

remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
s.NoError(err)

s.Len(remainingLocks, 3)
s.Equal(remainingLocks[0].Key, k1)
s.Equal(remainingLocks[0].LockType, kvrpcpb.Op_PessimisticLock)
s.Equal(remainingLocks[1].Key, k3)
s.Equal(remainingLocks[1].LockType, kvrpcpb.Op_Put)
s.Equal(remainingLocks[2].Key, k4)
s.Equal(remainingLocks[2].LockType, kvrpcpb.Op_PessimisticLock)
s.Equal(remainingLocks[2].Primary, k4)

// Perform ScanLock - BatchResolveLock.
s.NoError(err)
s.NoError(s.store.GCResolveLockPhase(ctx, currentTS, 1))

// Do ScanLock again to make sure no locks are left.
remainingLocks, err = s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
s.NoError(err)
s.Empty(remainingLocks)

// Check data consistency
readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
snapshot := s.store.GetSnapshot(readTS)
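
The test above drives the client entirely through failpoints. As a point of reference (not part of this commit), a minimal sketch of the enable/run/disable pattern it relies on, assuming the github.com/pingcap/failpoint package and a hypothetical helper name:

package example

import "github.com/pingcap/failpoint"

// withFailpoint enables a named failpoint with the given action string, runs fn,
// and always disables the failpoint afterwards, mirroring the Enable/defer-Disable
// pattern used in the test.
func withFailpoint(name, action string, fn func()) error {
    // e.g. name = "tikvclient/beforeAsyncPessimisticRollback", action = `return("skip")`
    if err := failpoint.Enable(name, action); err != nil {
        return err
    }
    defer func() {
        _ = failpoint.Disable(name)
    }()
    fn()
    return nil
}
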
7 changes: 5 additions & 2 deletions internal/locate/region_cache.go
@@ -1356,7 +1356,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey
return
}
if len(regions) == 0 {
err = errors.New("PD returned no region")
err = errors.Errorf("PD returned no region, startKey: %q, endKey: %q", startKey, endKey)
return
}

@@ -1780,7 +1780,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
metrics.RegionCacheCounterWithScanRegionsOK.Inc()

if len(regionsInfo) == 0 {
return nil, errors.New("PD returned no region")
return nil, errors.Errorf(
"PD returned no region, startKey: %q, endKey: %q, limit: %q",
startKey, endKey, limit,
)
}
regions := make([]*Region, 0, len(regionsInfo))
for _, r := range regionsInfo {
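
For context on the new error messages (an illustrative snippet, not part of the commit): %q quotes and escapes raw key bytes, which keeps keys containing non-printable bytes readable in logs.

package main

import "fmt"

func main() {
    startKey := []byte("k")
    endKey := []byte{'l', 0x00, 0xff} // region keys may contain arbitrary bytes
    // %q renders byte slices as escaped, double-quoted Go strings.
    fmt.Printf("PD returned no region, startKey: %q, endKey: %q\n", startKey, endKey)
    // Prints: PD returned no region, startKey: "k", endKey: "l\x00\xff"
}
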
32 changes: 15 additions & 17 deletions internal/locate/region_request.go
@@ -532,12 +532,12 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) {
type accessFollower struct {
stateBase
// If tryLeader is true, the request can also be sent to the leader when !leader.isSlow()
tryLeader bool
isGlobalStaleRead bool
option storeSelectorOp
leaderIdx AccessIndex
lastIdx AccessIndex
learnerOnly bool
tryLeader bool
isStaleRead bool
option storeSelectorOp
leaderIdx AccessIndex
lastIdx AccessIndex
learnerOnly bool
}

func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
@@ -558,12 +558,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
}
} else {
// Stale Read request will retry the leader or next peer on error,
// if txnScope is global, we will only retry the leader by using the WithLeaderOnly option,
// if txnScope is local, we will retry both other peers and the leader by the strategy of replicaSelector.
if state.isGlobalStaleRead {
// Stale Read request will retry the leader only by using the WithLeaderOnly option.
if state.isStaleRead {
WithLeaderOnly()(&state.option)
// retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read
// retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read.
resetStaleRead = true
}
state.lastIdx++
@@ -787,12 +785,12 @@ func newReplicaSelector(
}
tryLeader := req.ReplicaReadType == kv.ReplicaReadMixed || req.ReplicaReadType == kv.ReplicaReadPreferLeader
state = &accessFollower{
tryLeader: tryLeader,
isGlobalStaleRead: req.IsGlobalStaleRead(),
option: option,
leaderIdx: regionStore.workTiKVIdx,
lastIdx: -1,
learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner,
tryLeader: tryLeader,
isStaleRead: req.StaleRead,
option: option,
leaderIdx: regionStore.workTiKVIdx,
lastIdx: -1,
learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner,
}
}

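
The net effect of renaming isGlobalStaleRead to isStaleRead is that every stale-read request, not only a global one, now falls back to a plain leader read on retry. A condensed standalone sketch of that decision, under assumed names rather than the selector's real state machine:

package example

// pickRetryTarget models the retry behaviour: the first attempt keeps the
// stale-read hint on the chosen replica; on retry, a stale read is redirected to
// the leader with the stale-read flag cleared, since the leader can always serve
// the read and DataIsNotReady can no longer occur.
func pickRetryTarget(attempt int, isStaleRead bool, leader, replica string) (target string, staleRead bool) {
    if attempt == 0 {
        return replica, isStaleRead
    }
    if isStaleRead {
        return leader, false
    }
    return replica, false
}
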
36 changes: 36 additions & 0 deletions internal/locate/region_request_test.go
@@ -657,3 +657,39 @@ func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch
s.NotNil(regionErr)
s.Equal(target, client.closedAddr)
}

func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() {
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Key: []byte("key"),
})
req.EnableStaleRead()
req.ReadReplicaScope = "z1" // not global stale read.
region, err := s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)

oc := s.regionRequestSender.client
defer func() {
s.regionRequestSender.client = oc
}()

s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if req.StaleRead {
// Mock: stale-read requests always return a DataIsNotReady error, as when TiKV's `ResolvedTS` is blocked.
response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}},
}}
} else {
response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}
}
return response, nil
}}

bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
regionErr, _ := resp.GetRegionError()
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
}
37 changes: 37 additions & 0 deletions tikv/test_probe.go
@@ -35,6 +35,7 @@
package tikv

import (
"bytes"
"context"

"github.com/pingcap/kvproto/pkg/metapb"
@@ -114,6 +115,42 @@ func (s StoreProbe) GCResolveLockPhase(ctx context.Context, safepoint uint64, co
return s.resolveLocks(ctx, safepoint, concurrency)
}

// ScanLocks scans locks in the key range [startKey, endKey) with the given maxVersion. It is exported for testing purposes.
func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxVersion uint64) ([]*txnlock.Lock, error) {
bo := NewGcResolveLockMaxBackoffer(ctx)
const limit = 1024

var result []*txnlock.Lock

outerLoop:
for {
locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit)
if err != nil {
return nil, err
}
for _, l := range locks {
if bytes.Compare(endKey, l.Key) <= 0 {
// Finished scanning the given range.
break outerLoop
}
result = append(result, l)
}

if len(locks) < limit {
if len(loc.EndKey) == 0 {
// Scanned to the very end.
break outerLoop
}
// The current region is completely scanned.
startKey = loc.EndKey
} else {
// The current region may still have more locks.
startKey = append(locks[len(locks)-1].Key, 0)
}
}

return result, nil
}

// LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose.
type LockResolverProbe struct {
*txnlock.LockResolverProbe
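
One non-obvious line in ScanLocks is how the next start key is chosen when a region returns a full page of locks. A tiny standalone illustration (not from the commit) of the successor-key trick used there:

package example

// nextScanKey returns the smallest key strictly greater than k in byte order.
// Appending a single zero byte means the next scan resumes immediately after k
// without returning the same lock twice.
func nextScanKey(k []byte) []byte {
    next := make([]byte, 0, len(k)+1)
    next = append(next, k...)
    return append(next, 0)
}
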
19 changes: 18 additions & 1 deletion txnkv/transaction/txn.go
@@ -556,10 +556,27 @@ func (txn *KVTxn) Rollback() error {
txn.CancelAggressiveLocking(context.Background())
}

// `skipPessimisticRollback` may be true only when set by a failpoint in tests.
skipPessimisticRollback := false
if val, err := util.EvalFailpoint("onRollback"); err == nil {
if s, ok := val.(string); ok {
if s == "skipRollbackPessimisticLock" {
logutil.BgLogger().Info("[failpoint] injected skip pessimistic rollback on explicit rollback",
zap.Uint64("txnStartTS", txn.startTS))
skipPessimisticRollback = true
} else {
panic(fmt.Sprintf("unknown instruction %s for failpoint \"onRollback\"", s))
}
}
}

start := time.Now()
// Clean up pessimistic lock.
if txn.IsPessimistic() && txn.committer != nil {
err := txn.rollbackPessimisticLocks()
var err error
if !skipPessimisticRollback {
err = txn.rollbackPessimisticLocks()
}
txn.committer.ttlManager.close()
if err != nil {
logutil.BgLogger().Error(err.Error())
17 changes: 11 additions & 6 deletions txnkv/txnlock/lock_resolver.go
@@ -245,26 +245,29 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
}
metrics.LockResolverCountWithExpired.Inc()

// Using currentTS = math.MaxUint64 means rolling back the txn, no matter whether the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}

if l.LockType == kvrpcpb.Op_PessimisticLock {
// BatchResolveLocks forces resolving the locks, ignoring whether they are expired.
// For pessimistic locks, committing them makes no sense, but it won't affect transaction
// correctness if we always roll them back.
// Pessimistic locks need special handling logic because their primary may not point
// to the real primary of that transaction, and their state cannot be put in `txnInfos`.
// (see: https://github.com/pingcap/tidb/issues/42937).
//
// `resolvePessimisticLock` should be called after calling `getTxnStatus`.
// See: https://github.com/pingcap/tidb/issues/45134
err := lr.resolvePessimisticLock(bo, l)
if err != nil {
return false, err
}
continue
}

// Using currentTS = math.MaxUint64 means rolling back the txn, no matter whether the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {
return false, err
}

// If the transaction uses async commit, CheckTxnStatus will reject rolling back the primary lock.
// Then we need to check the secondary locks to determine the final status of the transaction.
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit {
@@ -1173,6 +1176,8 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
}
}

// resolvePessimisticLock handles pessimistic locks after checking txn status.
// Note that this function assumes `CheckTxnStatus` is done (or `getTxnStatusFromLock` has been called) on the lock.
func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock) error {
metrics.LockResolverCountWithResolveLocks.Inc()
// The lock has been resolved by getTxnStatusFromLock.
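
To summarize the reordering above (a condensed sketch under hypothetical helper names, not the library's actual code): the transaction status check now runs for every lock before any resolution, and pessimistic locks are rolled back individually instead of being grouped by a primary that may not be the transaction's real primary.

package example

import "github.com/pingcap/kvproto/pkg/kvrpcpb"

type lockInfo struct {
    LockType kvrpcpb.Op
}

// resolveExpiredLocks mirrors the new control flow of BatchResolveLocks.
func resolveExpiredLocks(
    locks []*lockInfo,
    checkTxnStatus func(*lockInfo) error, // CheckTxnStatus on the lock's primary, forcing rollback
    pessimisticRollback func(*lockInfo) error, // single-lock pessimistic rollback
    batchResolve func(*lockInfo) error, // grouped commit/rollback for ordinary locks
) error {
    for _, l := range locks {
        if err := checkTxnStatus(l); err != nil { // always before resolving
            return err
        }
        if l.LockType == kvrpcpb.Op_PessimisticLock {
            if err := pessimisticRollback(l); err != nil {
                return err
            }
            continue
        }
        if err := batchResolve(l); err != nil {
            return err
        }
    }
    return nil
}
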
