Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txnkv: add new API for lock->put optimization #1

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 109 additions & 1 deletion integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"fmt"
"math"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -1731,7 +1732,7 @@ func (s *testCommitterSuite) TestFlagsInMemBufferMutations() {

forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) {
handle := db.IterWithFlags(key, nil).Handle()
mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, handle)
mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, handle, nil)
})

forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) {
Expand All @@ -1742,3 +1743,110 @@ func (s *testCommitterSuite) TestFlagsInMemBufferMutations() {
s.Equal(assertNotExist, mutations.IsAssertNotExist(i))
})
}

func (s *testCommitterSuite) TestSetLockedKeyValue() {
ctx := context.Background()
k1 := []byte("k1")
v1 := []byte("v1")
v2 := []byte("v2")

mustLockKey := func(txn transaction.TxnProbe, key []byte) {
s.Require().NoError(txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}, key))
}
checkByOpVals := func(opVals ...interface{}) func(m transaction.CommitterMutations) {
s.Require().Equal(0, len(opVals)%2)
return func(m transaction.CommitterMutations) {
s.Require().Equal(m.Len(), len(opVals)/2)
for i := 0; i < len(opVals); i += 2 {
s.Require().Equal(opVals[i], m.GetOp(0))
if opVals[i+1] == nil {
s.Require().Nil(m.GetValue(0))
} else {
s.Require().Equal(opVals[i+1], m.GetValue(0))
}
}
}
}

for _, tt := range []struct {
name string
actions []func(txn transaction.TxnProbe)
checkPessimistic func(m transaction.CommitterMutations)
checkOptimisitc func(m transaction.CommitterMutations)
}{
{
"NoLock",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) },
},
checkByOpVals(),
checkByOpVals(),
},
{
"LockOnly",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) },
func(txn transaction.TxnProbe) { mustLockKey(txn, k1) },
},
checkByOpVals(kvrpcpb.Op_Put, v1),
checkByOpVals(kvrpcpb.Op_Lock, nil),
},
{
"LockAndSet",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) },
func(txn transaction.TxnProbe) { mustLockKey(txn, k1) },
func(txn transaction.TxnProbe) { s.Require().NoError(txn.Set(k1, v2)) },
},
checkByOpVals(kvrpcpb.Op_Put, v2),
checkByOpVals(kvrpcpb.Op_Put, v2),
},
{
"LockAndDelete",
[]func(txn transaction.TxnProbe){
func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) },
func(txn transaction.TxnProbe) { mustLockKey(txn, k1) },
func(txn transaction.TxnProbe) { s.Require().NoError(txn.Delete(k1)) },
},
checkByOpVals(kvrpcpb.Op_Del, []byte{}),
checkByOpVals(kvrpcpb.Op_Del, []byte{}),
},

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to test in different order, like calling txn.SetLockedKeyValue after calling LockKeys or any mutations?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I submitted the review to wrong place

} {
var testAll func(name string, state []bool, actions []func(txn transaction.TxnProbe))
testAll = func(name string, state []bool, actions []func(txn transaction.TxnProbe)) {
if len(actions) == len(tt.actions) {
s.Run("Pessimistic"+name, func() {
txn := s.begin()
txn.SetPessimistic(true)
for _, action := range actions {
action(txn)
}
c, err := txn.NewCommitter(1)
s.Require().NoError(err)
tt.checkPessimistic(c.GetMutations())
s.Require().NoError(txn.Rollback())
})
s.Run("Optimistic"+name, func() {
txn := s.begin()
for _, action := range actions {
action(txn)
}
c, err := txn.NewCommitter(1)
s.Require().NoError(err)
tt.checkOptimisitc(c.GetMutations())
s.Require().NoError(txn.Rollback())
})
return
}
for i, used := range state {
if used {
continue
}
state[i] = true
testAll(name+"-"+strconv.Itoa(i), state, append(actions, tt.actions[i]))
state[i] = false
}
}
testAll(tt.name, make([]bool, len(tt.actions)), nil)
}
}
52 changes: 42 additions & 10 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ type memBufferMutations struct {
// MSB LSB
// [13 bits: Op][1 bit: assertNotExist][1 bit: assertExist][1 bit: isPessimisticLock]
handles []unionstore.MemKeyHandle
// overlay of mutation values
overlay map[unionstore.MemKeyHandle][]byte
}

func newMemBufferMutations(sizeHint int, storage *unionstore.MemDB) *memBufferMutations {
Expand All @@ -211,7 +213,13 @@ func (m *memBufferMutations) GetKeys() [][]byte {
}

func (m *memBufferMutations) GetValue(i int) []byte {
v, _ := m.storage.GetValueByHandle(m.handles[i])
h := m.handles[i]
if m.overlay != nil {
if v, ok := m.overlay[h]; ok {
return v
}
}
v, _ := m.storage.GetValueByHandle(h)
return v
}

Expand All @@ -235,10 +243,11 @@ func (m *memBufferMutations) Slice(from, to int) CommitterMutations {
return &memBufferMutations{
handles: m.handles[from:to],
storage: m.storage,
overlay: m.overlay,
}
}

func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle) {
func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle, value []byte) {
// See comments of `m.handles` field about the format of the user data `aux`.
aux := uint16(op) << 3
if isPessimisticLock {
Expand All @@ -252,6 +261,18 @@ func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist,
}
handle.UserData = aux
m.handles = append(m.handles, handle)
if len(value) > 0 {
if op != kvrpcpb.Op_Put {
panic("op must be PUT when pushing with value")
}
if !isPessimisticLock {
panic("key must be locked when pushing with value")
}
if m.overlay == nil {
m.overlay = make(map[unionstore.MemKeyHandle][]byte)
}
m.overlay[handle] = value
}
}

// CommitterMutationFlags represents various bit flags of mutations.
Expand Down Expand Up @@ -493,7 +514,7 @@ func (c *twoPhaseCommitter) checkSchemaOnAssertionFail(ctx context.Context, asse
}

func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
var size, putCnt, delCnt, lockCnt, checkCnt int
var size, putCnt, delCnt, lockCnt, checkCnt, putFromLockCnt int

txn := c.txn
memBuf := txn.GetMemBuffer()
Expand All @@ -508,15 +529,25 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
_ = err
key := it.Key()
flags := it.Flags()
var value []byte
var op kvrpcpb.Op
var (
value []byte
cachedValue []byte = nil
op kvrpcpb.Op
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's safer to add a flag to indicate if the value is from the lock to put,

valueFromLockToPut := false


if !it.HasValue() {
if !flags.HasLocked() {
continue
}
op = kvrpcpb.Op_Lock
lockCnt++
if val, ok := txn.getValueByLockedKey(key); ok && len(val) > 0 && c.isPessimistic {
// Change the LOCK into PUT if the value of this key has a cached value.
cachedValue = val
op = kvrpcpb.Op_Put
putFromLockCnt++
} else {
op = kvrpcpb.Op_Lock
lockCnt++
}
} else {
value = it.Value()
var isUnnecessaryKV bool
Expand Down Expand Up @@ -581,8 +612,8 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
if c.txn.schemaAmender != nil || c.txn.assertionLevel == kvrpcpb.AssertionLevel_Off {
mustExist, mustNotExist, hasAssertUnknown = false, false, false
}
c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle())
size += len(key) + len(value)
c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle(), cachedValue)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then use the valueFromLockToPut mentioned above as the input parameter.

size += len(key) + len(value) + len(cachedValue)

if c.txn.assertionLevel != kvrpcpb.AssertionLevel_Off {
// Check mutations for pessimistic-locked keys with the read results of pessimistic lock requests.
Expand Down Expand Up @@ -635,6 +666,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
zap.Int("dels", delCnt),
zap.Int("locks", lockCnt),
zap.Int("checks", checkCnt),
zap.Int("putsFromLocks", putFromLockCnt),
zap.Uint64("txnStartTS", txn.startTS))
}

Expand Down Expand Up @@ -1758,7 +1790,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch
return false, err
}
handle := c.txn.GetMemBuffer().IterWithFlags(key, nil).Handle()
c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle)
c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle, nil)
}
}
return false, nil
Expand Down
26 changes: 25 additions & 1 deletion txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,14 @@ type KVTxn struct {
startTS uint64
startTime time.Time // Monotonic timestamp for recording txn time consuming.
commitTS uint64
mu sync.Mutex // For thread-safe LockKeys function.
mu sync.Mutex // For thread-safe LockKeys, SetLockedKeyValue functions.
setCnt int64
vars *tikv.Variables
committer *twoPhaseCommitter
lockedCnt int
// lockedKV is used to cache kv pairs that have been locked, the 2pc committer will read this map when init
// mutations, convert lock into put if needed.
lockedKVs map[string][]byte

valid bool

Expand Down Expand Up @@ -749,6 +752,27 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
return nil
}

// SetLockedKeyValue caches a key-value pair whose key has been locked. Those key-value pairs may be turned to PUT
// record if possible.
func (txn *KVTxn) SetLockedKeyValue(key []byte, value []byte) {
txn.mu.Lock()
if txn.lockedKVs == nil {
txn.lockedKVs = make(map[string][]byte)
}
txn.lockedKVs[string(key)] = value
txn.mu.Unlock()
}

// getValueByLockedKey returns the cached value of the given locked key.
func (txn *KVTxn) getValueByLockedKey(key []byte) (value []byte, ok bool) {
txn.mu.Lock()
if txn.lockedKVs != nil {
value, ok = txn.lockedKVs[string(key)]
}
txn.mu.Unlock()
return
}

// deduplicateKeys deduplicate the keys, it use sort instead of map to avoid memory allocation.
func deduplicateKeys(keys [][]byte) [][]byte {
sort.Slice(keys, func(i, j int) bool {
Expand Down