diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 178dcefd0..96e402f3c 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -39,6 +39,7 @@ package tikv_test import ( "bytes" "context" + stderrs "errors" "fmt" "math" "math/rand" @@ -310,7 +311,7 @@ func (s *testCommitterSuite) TestContextCancel2() { cancel() // Secondary keys should not be canceled. s.Eventually(func() bool { - return !s.isKeyLocked([]byte("b")) + return !s.isKeyOptimisticLocked([]byte("b")) }, 2*time.Second, 20*time.Millisecond, "Secondary locks are not committed after 2 seconds") } @@ -370,7 +371,7 @@ func (s *testCommitterSuite) mustGetRegionID(key []byte) uint64 { return loc.Region.GetID() } -func (s *testCommitterSuite) isKeyLocked(key []byte) bool { +func (s *testCommitterSuite) isKeyOptimisticLocked(key []byte) bool { ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), 500, nil) @@ -387,6 +388,34 @@ func (s *testCommitterSuite) isKeyLocked(key []byte) bool { return keyErr.GetLocked() != nil } +func (s *testCommitterSuite) checkIsKeyLocked(key []byte, expectedLocked bool) { + // To be aware of the result of async operations (e.g. async pessimistic rollback), retry if the check fails. + for i := 0; i < 5; i++ { + txn := s.begin() + txn.SetPessimistic(true) + + lockCtx := kv.NewLockCtx(txn.StartTS(), kv.LockNoWait, time.Now()) + err := txn.LockKeys(context.Background(), lockCtx, key) + + var isCheckSuccess bool + if err != nil && stderrs.Is(err, tikverr.ErrLockAcquireFailAndNoWaitSet) { + isCheckSuccess = expectedLocked + } else { + s.Nil(err) + isCheckSuccess = !expectedLocked + } + + if isCheckSuccess { + s.Nil(txn.Rollback()) + return + } + + s.Nil(txn.Rollback()) + time.Sleep(time.Millisecond * 50) + } + s.Fail(fmt.Sprintf("expected key %q locked = %v, but the actual result not match", string(key), expectedLocked)) +} + func (s *testCommitterSuite) TestPrewriteCancel() { // Setup region delays for key "b" and "c". delays := map[uint64]time.Duration{ @@ -416,7 +445,7 @@ func (s *testCommitterSuite) TestPrewriteCancel() { s.NotNil(err) // "c" should be cleaned up in reasonable time. s.Eventually(func() bool { - return !s.isKeyLocked([]byte("c")) + return !s.isKeyOptimisticLocked([]byte("c")) }, 500*time.Millisecond, 10*time.Millisecond) } @@ -1112,6 +1141,318 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflictError() { } } +func (s *testCommitterSuite) TestAggressiveLocking() { + for _, finalIsDone := range []bool{false, true} { + txn := s.begin() + txn.SetPessimistic(true) + s.False(txn.IsInAggressiveLockingMode()) + + // Lock some keys in normal way. + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2"))) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + + // Enter aggressive locking mode and lock some keys. + txn.StartAggressiveLocking() + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + for _, key := range []string{"k2", "k3", "k4"} { + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte(key))) + s.checkIsKeyLocked([]byte(key), true) + } + s.True(!txn.IsInAggressiveLockingStage([]byte("k2"))) + s.True(txn.IsInAggressiveLockingStage([]byte("k3"))) + s.True(txn.IsInAggressiveLockingStage([]byte("k4"))) + + // Retry and change some of the keys to be locked. + txn.RetryAggressiveLocking(context.Background()) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + s.checkIsKeyLocked([]byte("k3"), true) + s.checkIsKeyLocked([]byte("k4"), true) + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k4"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5"))) + s.checkIsKeyLocked([]byte("k4"), true) + s.checkIsKeyLocked([]byte("k5"), true) + + // Retry again, then the unnecessary locks acquired in the previous stage should be released. + txn.RetryAggressiveLocking(context.Background()) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + s.checkIsKeyLocked([]byte("k3"), false) + s.checkIsKeyLocked([]byte("k4"), true) + s.checkIsKeyLocked([]byte("k5"), true) + + // Lock some different keys again and then done or cancel. + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k6"))) + + if finalIsDone { + txn.DoneAggressiveLocking(context.Background()) + time.Sleep(time.Millisecond * 50) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + s.checkIsKeyLocked([]byte("k3"), false) + s.checkIsKeyLocked([]byte("k4"), false) + s.checkIsKeyLocked([]byte("k5"), true) + s.checkIsKeyLocked([]byte("k6"), true) + } else { + txn.CancelAggressiveLocking(context.Background()) + time.Sleep(time.Millisecond * 50) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + s.checkIsKeyLocked([]byte("k3"), false) + s.checkIsKeyLocked([]byte("k4"), false) + s.checkIsKeyLocked([]byte("k5"), false) + s.checkIsKeyLocked([]byte("k6"), false) + } + + s.NoError(txn.Rollback()) + } +} + +func (s *testCommitterSuite) TestAggressiveLockingInsert() { + txn0 := s.begin() + s.NoError(txn0.Set([]byte("k1"), []byte("v1"))) + s.NoError(txn0.Set([]byte("k3"), []byte("v3"))) + s.NoError(txn0.Set([]byte("k6"), []byte("v6"))) + s.NoError(txn0.Set([]byte("k8"), []byte("v8"))) + s.NoError(txn0.Commit(context.Background())) + + txn := s.begin() + txn.SetPessimistic(true) + + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + lockCtx.InitReturnValues(2) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2"))) + s.NoError(txn.Set([]byte("k5"), []byte("v5"))) + s.NoError(txn.Delete([]byte("k6"))) + + insertPessimisticLock := func(lockCtx *kv.LockCtx, key string) error { + txn.GetMemBuffer().UpdateFlags([]byte(key), kv.SetPresumeKeyNotExists) + if lockCtx == nil { + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + } + return txn.LockKeys(context.Background(), lockCtx, []byte(key)) + } + + mustAlreadyExist := func(err error) { + if _, ok := errors.Cause(err).(*tikverr.ErrKeyExist); !ok { + s.Fail(fmt.Sprintf("expected KeyExist error, but got: %+q", err)) + } + } + + txn.StartAggressiveLocking() + // Already-locked before aggressive locking. + mustAlreadyExist(insertPessimisticLock(nil, "k1")) + s.NoError(insertPessimisticLock(nil, "k2")) + // Acquiring new locks normally. + mustAlreadyExist(insertPessimisticLock(nil, "k3")) + s.NoError(insertPessimisticLock(nil, "k4")) + // The key added or deleted in the same transaction before entering aggressive locking. + // Since TiDB can detect it before invoking LockKeys, client-go actually didn't handle this case for now (no matter + // if in aggressive locking or not). So skip this test case here, and it can be uncommented if someday client-go + // supports such check. + // mustAlreadyExist(insertPessimisticLock(nil, "k5")) + // s.NoError(insertPessimisticLock(nil, "k6")) + + // Locked with conflict and then do pessimistic retry. + txn2 := s.begin() + s.NoError(txn2.Set([]byte("k7"), []byte("v7"))) + s.NoError(txn2.Delete([]byte("k8"))) + s.NoError(txn2.Commit(context.Background())) + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + err := insertPessimisticLock(lockCtx, "k7") + s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{}) + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(insertPessimisticLock(lockCtx, "k8")) + s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) + s.Equal(txn2.GetCommitTS(), lockCtx.Values["k8"].LockedWithConflictTS) + // Update forUpdateTS to simulate a pessimistic retry. + newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + s.Nil(err) + s.GreaterOrEqual(newForUpdateTS, txn2.GetCommitTS()) + lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()} + mustAlreadyExist(insertPessimisticLock(lockCtx, "k7")) + s.NoError(insertPessimisticLock(lockCtx, "k8")) + + txn.CancelAggressiveLocking(context.Background()) + s.NoError(txn.Rollback()) +} + +func (s *testCommitterSuite) TestAggressiveLockingSwitchPrimary() { + txn := s.begin() + txn.SetPessimistic(true) + checkPrimary := func(key string, expectedPrimary string) { + lockInfo := s.getLockInfo([]byte(key)) + s.Equal(kvrpcpb.Op_PessimisticLock, lockInfo.LockType) + s.Equal(expectedPrimary, string(lockInfo.PrimaryLock)) + } + + forUpdateTS := txn.StartTS() + txn.StartAggressiveLocking() + lockCtx := &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + checkPrimary("k1", "k1") + checkPrimary("k2", "k1") + + // Primary not changed. + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k3"))) + checkPrimary("k1", "k1") + checkPrimary("k3", "k1") + + // Primary changed and is not in the set of previously locked keys. + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k4"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5"))) + checkPrimary("k4", "k4") + checkPrimary("k5", "k4") + // Previously locked keys that are not in the most recent aggressive locking stage will be released. + s.checkIsKeyLocked([]byte("k2"), false) + + // Primary changed and is in the set of previously locked keys. + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k6"))) + checkPrimary("k5", "k5") + checkPrimary("k6", "k5") + s.checkIsKeyLocked([]byte("k1"), false) + s.checkIsKeyLocked([]byte("k3"), false) + + // Primary changed and is locked *before* the previous aggressive locking stage (suppose it's the n-th retry, + // the expected primary is locked during the (n-2)-th retry). + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k7"))) + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k6"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5"))) + checkPrimary("k5", "k6") + checkPrimary("k6", "k6") + + txn.CancelAggressiveLocking(context.Background()) + // Check all released. + for i := 0; i < 6; i++ { + key := []byte{byte('k'), byte('1') + byte(i)} + s.checkIsKeyLocked(key, false) + } + s.NoError(txn.Rollback()) + + // Also test the primary-switching logic won't misbehave when the primary is already selected before entering + // aggressive locking. + txn = s.begin() + txn.SetPessimistic(true) + forUpdateTS = txn.StartTS() + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2"))) + checkPrimary("k1", "k1") + checkPrimary("k2", "k1") + + txn.StartAggressiveLocking() + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k3"))) + checkPrimary("k2", "k1") + checkPrimary("k3", "k1") + + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k3"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k4"))) + checkPrimary("k3", "k1") + checkPrimary("k4", "k1") + + txn.CancelAggressiveLocking(context.Background()) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + s.checkIsKeyLocked([]byte("k3"), false) + s.checkIsKeyLocked([]byte("k4"), false) + s.NoError(txn.Rollback()) + s.checkIsKeyLocked([]byte("k1"), false) + s.checkIsKeyLocked([]byte("k2"), false) + +} + +func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() { + txn0 := s.begin() + s.NoError(txn0.Set([]byte("k2"), []byte("v2"))) + s.NoError(txn0.Commit(context.Background())) + + for _, firstAttemptLockedWithConflict := range []bool{false, true} { + txn := s.begin() + txn.SetPessimistic(true) + + // Make the primary deterministic to avoid the following test code involves primary re-selecting logic. + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k0"))) + + forUpdateTS := txn.StartTS() + txn.StartAggressiveLocking() + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + + var txn2 transaction.TxnProbe + if firstAttemptLockedWithConflict { + txn2 = s.begin() + s.NoError(txn2.Delete([]byte("k1"))) + s.NoError(txn2.Set([]byte("k2"), []byte("v2"))) + s.NoError(txn2.Commit(context.Background())) + } + + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + + if firstAttemptLockedWithConflict { + s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) + s.Equal(txn2.GetCommitTS(), lockCtx.Values["k1"].LockedWithConflictTS) + s.Equal(txn2.GetCommitTS(), lockCtx.Values["k2"].LockedWithConflictTS) + } + + if firstAttemptLockedWithConflict { + forUpdateTS = txn2.GetCommitTS() + 1 + } else { + forUpdateTS++ + } + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + lockCtx.InitCheckExistence(2) + txn.RetryAggressiveLocking(context.Background()) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + s.Equal(uint64(0), lockCtx.MaxLockedWithConflictTS) + s.Equal(false, lockCtx.Values["k1"].Exists) + s.Equal(true, lockCtx.Values["k2"].Exists) + + forUpdateTS++ + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + lockCtx.InitReturnValues(2) + txn.RetryAggressiveLocking(context.Background()) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + s.Equal(uint64(0), lockCtx.MaxLockedWithConflictTS) + s.Equal(false, lockCtx.Values["k1"].Exists) + s.Equal(true, lockCtx.Values["k2"].Exists) + s.Equal([]byte("v2"), lockCtx.Values["k2"].Value) + + txn.CancelAggressiveLocking(context.Background()) + s.NoError(txn.Rollback()) + } +} + // TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time. func (s *testCommitterSuite) TestElapsedTTL() { key := []byte("key") diff --git a/internal/mockstore/mocktikv/errors.go b/internal/mockstore/mocktikv/errors.go index 0b290b66e..8fb674867 100644 --- a/internal/mockstore/mocktikv/errors.go +++ b/internal/mockstore/mocktikv/errors.go @@ -107,6 +107,7 @@ type ErrConflict struct { ConflictTS uint64 ConflictCommitTS uint64 Key []byte + CanForceLock bool } func (e *ErrConflict) Error() string { diff --git a/internal/mockstore/mocktikv/mvcc_leveldb.go b/internal/mockstore/mocktikv/mvcc_leveldb.go index 82ea4a0d6..6891809b1 100644 --- a/internal/mockstore/mocktikv/mvcc_leveldb.go +++ b/internal/mockstore/mocktikv/mvcc_leveldb.go @@ -650,12 +650,13 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation dec := lockDecoder{ expectKey: mutation.Key, } - ok, err := dec.Decode(iter) + alreadyLocked, err := dec.Decode(iter) if err != nil { return err } - if ok { + if alreadyLocked { if dec.lock.startTS != startTS { + // Locked by another transaction. errDeadlock := mvcc.deadlockDetector.Detect(startTS, dec.lock.startTS, farm.Fingerprint64(mutation.Key)) if errDeadlock != nil { return &ErrDeadlock{ @@ -666,14 +667,15 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation } return dec.lock.lockErr(mutation.Key) } - return nil } - // For pessimisticLockMutation, check the correspond rollback record, there may be rollbackLock + // For pessimisticLockMutation, check the corresponding rollback record, there may be rollbackLock // operation between startTS and forUpdateTS + // It's also possible that the key is already locked by the same transaction. Also do the conflict check to + // provide an idempotent result. val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off, lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock) if err != nil { - if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok { + if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok && conflict.CanForceLock { lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{ Type: kvrpcpb.PessimisticLockKeyResultType_LockResultLockedWithConflict, Value: val, @@ -709,21 +711,23 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation return nil } - lock := mvccLock{ - startTS: startTS, - primary: lctx.primary, - op: kvrpcpb.Op_PessimisticLock, - ttl: lctx.ttl, - forUpdateTS: forUpdateTS, - minCommitTS: lctx.minCommitTs, - } - writeKey := mvccEncode(mutation.Key, lockVer) - writeValue, err := lock.MarshalBinary() - if err != nil { - return err + if !alreadyLocked || dec.lock.forUpdateTS < forUpdateTS { + lock := mvccLock{ + startTS: startTS, + primary: lctx.primary, + op: kvrpcpb.Op_PessimisticLock, + ttl: lctx.ttl, + forUpdateTS: forUpdateTS, + minCommitTS: lctx.minCommitTs, + } + writeKey := mvccEncode(mutation.Key, lockVer) + writeValue, err := lock.MarshalBinary() + if err != nil { + return err + } + batch.Put(writeKey, writeValue) } - batch.Put(writeKey, writeValue) return nil } @@ -899,12 +903,11 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, if dec.value.valueType == typePut || dec.value.valueType == typeLock { if needCheckShouldNotExistForPessimisticLock { - return nil, &ErrAssertionFailed{ - StartTS: startTS, - Key: m.Key, - Assertion: m.Assertion, - ExistingStartTS: dec.value.startTS, - ExistingCommitTS: dec.value.commitTS, + if writeConflictErr != nil { + return nil, writeConflictErr + } + return nil, &ErrKeyAlreadyExist{ + Key: m.Key, } } if needCheckAssertionForPrewerite && m.Assertion == kvrpcpb.Assertion_NotExist { @@ -947,6 +950,9 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, } // writeConflictErr is not nil only when write conflict is found and `allowLockWithConflict is set to true. + if writeConflictErr != nil { + writeConflictErr.(*ErrConflict).CanForceLock = true + } if getVal { return retVal, writeConflictErr } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index a8bb345db..3237ddbca 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -818,7 +818,9 @@ func (txn *KVTxn) filterAggressiveLockedKeys(lockCtx *tikv.LockCtx, allKeys [][] !txn.mayAggressiveLockingLastLockedKeysExpire() { // We can skip locking it since it's already locked during last attempt to aggressive locking, and // we already have the information that we need. - lockCtx.Values[keyStr] = lastResult.Value + if lockCtx.Values != nil { + lockCtx.Values[keyStr] = lastResult.Value + } txn.aggressiveLockingContext.currentLockedKeys[keyStr] = lastResult continue } @@ -886,16 +888,24 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput checkKeyExists = flags.HasNeedCheckExists() } // If the key is locked in the current aggressive locking stage, override the information in memBuf. + isInLastAggressiveLockingStage := false if txn.aggressiveLockingContext != nil { if entry, ok := txn.aggressiveLockingContext.currentLockedKeys[string(key)]; ok { locked = true valueExist = entry.Value.Exists + } else if entry, ok := txn.aggressiveLockingContext.lastRetryUnnecessaryLocks[string(key)]; ok { + locked = true + valueExist = entry.Value.Exists + isInLastAggressiveLockingStage = true } } - if !locked { + if !locked || isInLastAggressiveLockingStage { + // Locks acquired in the previous aggressive locking stage might need to be updated later in + // `filterAggressiveLockedKeys`. keys = append(keys, key) - } else if txn.IsPessimistic() { + } + if locked && txn.IsPessimistic() { if checkKeyExists && valueExist { alreadyExist := kvrpcpb.AlreadyExist{Key: key} e := &tikverr.ErrKeyExist{AlreadyExist: &alreadyExist}