Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: remove Pessimistic option #24332

Merged
merged 4 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn: txn.KVTxn,
binInfo: val.(*binloginfo.BinlogInfo), // val cannot be other type.
})
case tikvstore.Pessimistic:
txn.SetPessimistic(val.(bool))
default:
txn.KVTxn.SetOption(opt, val)
}
Expand Down
44 changes: 22 additions & 22 deletions store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
// This test checks that the isPessimisticLock field is set in the request even when no keys are pessimistic lock.
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
err := txn.Set([]byte("t1"), []byte("v1"))
c.Assert(err, IsNil)
committer, err := txn.NewCommitter(0)
Expand All @@ -636,7 +636,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
c.Assert(txn.Commit(context.Background()), IsNil)

txn = s.begin(c)
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
_, _ = txn.GetUnionStore().Get(context.TODO(), key)
c.Assert(txn.GetMemBuffer().SetWithFlags(key, key, kv.SetPresumeKeyNotExists), IsNil)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
Expand All @@ -651,7 +651,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {

func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
lockCtx := &kv.LockCtx{ForUpdateTS: 100, WaitStartTime: time.Now()}
err := txn.LockKeys(context.Background(), lockCtx, []byte("abc"), []byte("def"))
c.Assert(err, IsNil)
Expand All @@ -664,7 +664,7 @@ func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
key := []byte("key")
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
time.Sleep(time.Millisecond * 100)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
err := txn.LockKeys(context.Background(), lockCtx, key)
Expand Down Expand Up @@ -710,7 +710,7 @@ func (s *testCommitterSuite) TestPessimisticLockReturnValues(c *C) {
c.Assert(txn.Set(key2, key2), IsNil)
c.Assert(txn.Commit(context.Background()), IsNil)
txn = s.begin(c)
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
lockCtx.ReturnValues = true
lockCtx.Values = map[string]kv.ReturnedValue{}
Expand All @@ -725,7 +725,7 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) {
key := []byte("key")
txn := s.begin(c)
txn.SetStartTS(oracle.ComposeTS(oracle.GetPhysical(time.Now().Add(time.Second*10)), 1))
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
time.Sleep(time.Millisecond * 100)
lockCtx := &kv.LockCtx{
ForUpdateTS: oracle.ComposeTS(oracle.ExtractPhysical(txn.StartTS())+100, 1),
Expand All @@ -746,7 +746,7 @@ func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) {

// insert k1, k2, k3 and delete k1
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetPessimistic(false)
s.store.ClearTxnLatches()
txn1.Get(context.Background(), k1)
txn1.GetMemBuffer().SetWithFlags(k1, []byte{0}, kv.SetPresumeKeyNotExists)
Expand All @@ -771,7 +771,7 @@ func (s *testCommitterSuite) TestDeleteYourWriteCauseGhostPrimary(c *C) {

// start txn2 to read k3(prewrite success and primary should be committed)
txn2 := s.begin(c)
txn2.DelOption(kv.Pessimistic)
txn2.SetPessimistic(false)
s.store.ClearTxnLatches()
v, err := txn2.Get(context.Background(), k3)
c.Assert(err, IsNil) // should resolve lock and read txn1 k3 result instead of rollback it.
Expand All @@ -788,7 +788,7 @@ func (s *testCommitterSuite) TestDeleteAllYourWrites(c *C) {

// insert k1, k2, k3 and delete k1, k2, k3
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetPessimistic(false)
s.store.ClearTxnLatches()
txn1.GetMemBuffer().SetWithFlags(k1, []byte{0}, kv.SetPresumeKeyNotExists)
txn1.Delete(k1)
Expand All @@ -808,7 +808,7 @@ func (s *testCommitterSuite) TestDeleteAllYourWritesWithSFU(c *C) {

// insert k1, k2, k2 and delete k1
txn1 := s.begin(c)
txn1.DelOption(kv.Pessimistic)
txn1.SetPessimistic(false)
s.store.ClearTxnLatches()
txn1.GetMemBuffer().SetWithFlags(k1, []byte{0}, kv.SetPresumeKeyNotExists)
txn1.Delete(k1)
Expand All @@ -832,7 +832,7 @@ func (s *testCommitterSuite) TestDeleteAllYourWritesWithSFU(c *C) {
<-ac
// start txn2 to read k3
txn2 := s.begin(c)
txn2.DelOption(kv.Pessimistic)
txn2.SetPessimistic(false)
s.store.ClearTxnLatches()
err = txn2.Set(k3, []byte{33})
c.Assert(err, IsNil)
Expand Down Expand Up @@ -860,7 +860,7 @@ func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
k2 := []byte("k2")

txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
txn1.SetPessimistic(true)
// lock the primary key
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
err := txn1.LockKeys(context.Background(), lockCtx, k1)
Expand All @@ -875,7 +875,7 @@ func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
// wait until secondary key exceeds its own TTL
time.Sleep(time.Duration(atomic.LoadUint64(&tikv.ManagedLockTTL)) * time.Millisecond)
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)
txn2.SetPessimistic(true)

// test no wait
lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), LockWaitTime: tikv.LockNoWait, WaitStartTime: time.Now()}
Expand Down Expand Up @@ -921,7 +921,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) {
k3 := []byte("k3")

txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
txn1.SetPessimistic(true)
// lock the primary key.
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
err := txn1.LockKeys(ctx, lockCtx, k1)
Expand Down Expand Up @@ -957,7 +957,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) {
c.Assert(err, IsNil)
c.Assert(status.Action(), Equals, kvrpcpb.Action_LockNotExistDoNothing)
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)
txn2.SetPessimistic(true)
lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now()}
err = txn2.LockKeys(ctx, lockCtx, k2)
c.Assert(err, IsNil)
Expand All @@ -981,7 +981,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) {
// After disable fail point, the rollbackIfNotExist flag will be set, and the resolve should succeed. In this
// case, the returned action of TxnStatus should be LockNotExistDoNothing, and lock on k3 could be resolved.
txn3 := s.begin(c)
txn3.SetOption(kv.Pessimistic, true)
txn3.SetPessimistic(true)
lockCtx = &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait}
err = txn3.LockKeys(ctx, lockCtx, k3)
c.Assert(err, IsNil)
Expand All @@ -997,7 +997,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) {
k2 := []byte("b")

txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
txn1.SetPessimistic(true)
// txn1 lock k1
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
err := txn1.LockKeys(context.Background(), lockCtx, k1)
Expand All @@ -1008,7 +1008,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) {
doneCh := make(chan error)
go func() {
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)
txn2.SetPessimistic(true)
lockCtx2 := &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now(), LockWaitTime: 200}
waitErr := txn2.LockKeys(context.Background(), lockCtx2, k1, k2)
doneCh <- waitErr
Expand All @@ -1017,7 +1017,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) {

// txn3 should locks k2 successfully using no wait
txn3 := s.begin(c)
txn3.SetOption(kv.Pessimistic, true)
txn3.SetPessimistic(true)
lockCtx3 := &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait}
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/txnNotFoundRetTTL", "return"), IsNil)
err = txn3.LockKeys(context.Background(), lockCtx3, k2)
Expand Down Expand Up @@ -1108,7 +1108,7 @@ func (s *testCommitterSuite) TestPushPessimisticLock(c *C) {
ctx := context.Background()

txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
txn1.SetPessimistic(true)
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
err := txn1.LockKeys(context.Background(), lockCtx, k1, k2)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -1160,7 +1160,7 @@ func (s *testCommitterSuite) TestResolveMixed(c *C) {

// make the optimistic and pessimistic lock left with primary lock not found
txn1 := s.begin(c)
txn1.SetOption(kv.Pessimistic, true)
txn1.SetPessimistic(true)
// lock the primary key
lockCtx := &kv.LockCtx{ForUpdateTS: txn1.StartTS(), WaitStartTime: time.Now()}
err := txn1.LockKeys(context.Background(), lockCtx, pk)
Expand Down Expand Up @@ -1202,7 +1202,7 @@ func (s *testCommitterSuite) TestResolveMixed(c *C) {

// txn2 tries to lock the pessimisticLockKey, the lock should has been resolved in clean whole region resolve
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)
txn2.SetPessimistic(true)
lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait}
err = txn2.LockKeys(context.Background(), lockCtx, pessimisticLockKey)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (s *testAsyncCommitSuite) TestRepeatableRead(c *C) {
sessionID++
ctx := context.WithValue(context.Background(), util.SessionID, sessionID)
txn1 := s.beginAsyncCommit(c)
txn1.SetOption(kv.Pessimistic, isPessimistic)
txn1.SetPessimistic(isPessimistic)
s.mustGetFromTxn(c, txn1, []byte("k1"), []byte("v1"))
txn1.Set([]byte("k1"), []byte("v2"))

Expand Down
2 changes: 1 addition & 1 deletion store/tikv/tests/ticlient_slow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *testTiclientSuite) TestSplitRegionIn2PC(c *C) {
checkKeyRegion(bo, startKey, endKey, Equals)
txn := s.beginTxn(c)
if m == "pessimistic" {
txn.SetOption(kv.Pessimistic, true)
txn.SetPessimistic(true)
lockCtx := &kv.LockCtx{}
lockCtx.ForUpdateTS = txn.StartTS()
keys := make([][]byte, 0, preSplitThresholdInTest)
Expand Down
10 changes: 8 additions & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ type KVTxn struct {
// commitCallback is called after current transaction gets committed
commitCallback func(info string, err error)

binlog BinlogExecutor
binlog BinlogExecutor
isPessimistic bool
}

func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) {
Expand Down Expand Up @@ -198,9 +199,14 @@ func (txn *KVTxn) DelOption(opt int) {
txn.us.DelOption(opt)
}

// SetPessimistic indicates if the transaction should use pessimictic lock.
func (txn *KVTxn) SetPessimistic(b bool) {
txn.isPessimistic = b
}

// IsPessimistic returns true if it is pessimistic.
func (txn *KVTxn) IsPessimistic() bool {
return txn.us.GetOption(kv.Pessimistic) != nil
return txn.isPessimistic
}

func (txn *KVTxn) getKVFilter() KVFilter {
Expand Down