Skip to content

Commit

Permalink
lock_resolver: support verifying primary for check_txn_status (tikv#777)
Browse files Browse the repository at this point in the history
* support verifying primary for check_txn_status

Signed-off-by: MyonKeminta <[email protected]>

* update kvproto

Signed-off-by: MyonKeminta <[email protected]>

* add more failpoint usages

Signed-off-by: MyonKeminta <[email protected]>

* update depencency and fix test

Signed-off-by: MyonKeminta <[email protected]>

* Do not skip for unistore; refine logs

Signed-off-by: MyonKeminta <[email protected]>

* Address comments

Signed-off-by: MyonKeminta <[email protected]>

---------

Signed-off-by: MyonKeminta <[email protected]>
Co-authored-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta and MyonKeminta committed May 26, 2023
1 parent d4aab1e commit 4ee50ea
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 1 deletion.
111 changes: 111 additions & 0 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/transaction"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/util"
)

var getMaxBackoff = tikv.ConfigProbe{}.GetGetMaxBackoff()
Expand All @@ -65,6 +66,10 @@ func TestLock(t *testing.T) {
suite.Run(t, new(testLockSuite))
}

func TestLockWithTiKV(t *testing.T) {
suite.Run(t, new(testLockWithTiKVSuite))
}

type testLockSuite struct {
suite.Suite
store tikv.StoreProbe
Expand Down Expand Up @@ -1007,3 +1012,109 @@ func (s *testLockSuite) TestLockWaitTimeLimit() {
s.Nil(txn1.Rollback())
s.Nil(txn2.Rollback())
}

type testLockWithTiKVSuite struct {
suite.Suite
store tikv.StoreProbe
}

func (s *testLockWithTiKVSuite) SetupTest() {
if *withTiKV {
s.store = tikv.StoreProbe{KVStore: NewTestStore(s.T())}
} else {
s.store = tikv.StoreProbe{KVStore: NewTestUniStore(s.T())}
}
}

func (s *testLockWithTiKVSuite) TearDownTest() {
s.store.Close()
}

func (s *testLockWithTiKVSuite) TestCheckTxnStatusSentToSecondary() {
s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", "return"))
s.NoError(failpoint.Enable("tikvclient/shortPessimisticLockTTL", "return"))
s.NoError(failpoint.Enable("tikvclient/twoPCShortLockTTL", "return"))
defer func() {
s.NoError(failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
s.NoError(failpoint.Disable("tikvclient/shortPessimisticLockTTL"))
s.NoError(failpoint.Disable("tikvclient/twoPCShortLockTTL"))
}()

k1 := []byte("k1")
k2 := []byte("k2")
k3 := []byte("k3")

ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))

txn, err := s.store.Begin()
s.NoError(err)
txn.SetPessimistic(true)

// Construct write conflict to make the LockKeys operation fail.
{
txn2, err := s.store.Begin()
s.NoError(err)
s.NoError(txn2.Set(k3, []byte("v3")))
s.NoError(txn2.Commit(ctx))
}

lockCtx := kv.NewLockCtx(txn.StartTS(), 200, time.Now())
err = txn.LockKeys(ctx, lockCtx, k1, k2, k3)
s.IsType(&tikverr.ErrWriteConflict{}, errors.Cause(err))

// At this time: txn's primary is unsetted, and the keys:
// * k1: stale pessimistic lock, primary
// * k2: stale pessimistic lock, primary -> k1

forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now())
err = txn.LockKeys(ctx, lockCtx, k3) // k3 becomes primary
err = txn.LockKeys(ctx, lockCtx, k1)
s.Equal(k3, txn.GetCommitter().GetPrimaryKey())

// At this time:
// * k1: pessimistic lock, primary -> k3
// * k2: stale pessimistic lock, primary -> k1
// * k3: pessimistic lock, primary

s.NoError(txn.Set(k1, []byte("v1-1")))
s.NoError(txn.Set(k3, []byte("v3-1")))

s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`))
defer func() {
s.NoError(failpoint.Disable("tikvclient/beforeCommitSecondaries"))
}()

s.NoError(txn.Commit(ctx))

// At this time:
// * k1: prewritten, primary -> k3
// * k2: stale pessimistic lock, primary -> k1
// * k3: committed

// Trigger resolving lock on k2
{
txn2, err := s.store.Begin()
s.NoError(err)
txn2.SetPessimistic(true)
lockCtx = kv.NewLockCtx(txn2.StartTS(), 200, time.Now())
s.NoError(txn2.LockKeys(ctx, lockCtx, k2))
s.NoError(txn2.Rollback())
}

// Check data consistency
readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
snapshot := s.store.GetSnapshot(readTS)
v, err := snapshot.Get(ctx, k3)
s.NoError(err)
s.Equal([]byte("v3-1"), v)
_, err = snapshot.Get(ctx, k2)
s.Equal(tikverr.ErrNotExist, err)
v, err = snapshot.Get(ctx, k1)
s.NoError(err)
s.Equal([]byte("v1-1"), v)
}
15 changes: 15 additions & 0 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"encoding/hex"
"math"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1641,6 +1642,20 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
logutil.Logger(ctx).Info("[failpoint] injected delay before commit",
zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration))
time.Sleep(duration)
} else if strings.HasPrefix(action, "delay(") && strings.HasSuffix(action, ")") {
durationStr := action[6:]
durationStr = durationStr[:len(durationStr)-1]
millis, err := strconv.ParseUint(durationStr, 10, 64)
if err != nil {
panic("failed to parse delay duration: " + durationStr)
}
duration := time.Millisecond * time.Duration(millis)
logutil.Logger(ctx).Info("[failpoint] injected delay before commit",
zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration))
time.Sleep(duration)
} else {
logutil.Logger(ctx).Info("[failpoint] unknown failpoint config",
zap.Uint64("txnStartTS", c.startTS), zap.String("config", action))
}
}
}
Expand Down
25 changes: 24 additions & 1 deletion txnkv/txnlock/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,15 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
var resolve func(*Lock, bool) (TxnStatus, error)
resolve = func(l *Lock, forceSyncCommit bool) (TxnStatus, error) {
status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit, detail)
if err != nil {

if _, ok := errors.Cause(err).(primaryMismatch); ok {
if l.LockType != kvrpcpb.Op_PessimisticLock {
logutil.BgLogger().Info("unexpected primaryMismatch error occurred on a non-pessimistic lock", zap.Stringer("lock", l), zap.Error(err))
return TxnStatus{}, err
}
// Pessimistic rollback the pessimistic lock as it points to an invalid primary.
status, err = TxnStatus{}, nil
} else if err != nil {
return TxnStatus{}, err
}
if status.ttl != 0 {
Expand Down Expand Up @@ -672,6 +680,14 @@ func (e txnNotFoundErr) Error() string {
return e.TxnNotFound.String()
}

type primaryMismatch struct {
currentLock *kvrpcpb.LockInfo
}

func (e primaryMismatch) Error() string {
return "primary mismatch, current lock: " + e.currentLock.String()
}

// getTxnStatus sends the CheckTxnStatus request to the TiKV server.
// When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error.
func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary []byte,
Expand Down Expand Up @@ -701,6 +717,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
RollbackIfNotExist: rollbackIfNotExist,
ForceSyncCommit: forceSyncCommit,
ResolvingPessimisticLock: resolvingPessimisticLock,
VerifyIsPrimary: true,
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(bo.GetCtx()),
})
Expand Down Expand Up @@ -735,6 +752,12 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
return status, txnNotFoundErr{txnNotFound}
}

if p := keyErr.GetPrimaryMismatch(); p != nil && resolvingPessimisticLock {
err = primaryMismatch{currentLock: p.GetLockInfo()}
logutil.BgLogger().Info("getTxnStatus was called on secondary lock", zap.Error(err))
return status, err
}

err = errors.Errorf("unexpected err: %s, tid: %v", keyErr, txnID)
logutil.BgLogger().Error("getTxnStatus error", zap.Error(err))
return status, err
Expand Down

0 comments on commit 4ee50ea

Please sign in to comment.