diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index a9e09a271ce5e..8992866877d4d 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -35,7 +35,7 @@ type tikvTxn struct { // NewTiKVTxn returns a new Transaction. func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction { - txn.SetOption(tikvstore.KVFilter, TiDBKVFilter{}) + txn.SetKVFilter(TiDBKVFilter{}) entryLimit := atomic.LoadUint64(&kv.TxnEntrySizeLimit) totalLimit := atomic.LoadUint64(&kv.TxnTotalSizeLimit) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index c33bf6d0f6d1f..dafd0847af797 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -327,7 +327,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { sizeHint := txn.us.GetMemBuffer().Len() c.mutations = newMemBufferMutations(sizeHint, memBuf) c.isPessimistic = txn.IsPessimistic() - filter := txn.getKVFilter() + filter := txn.kvFilter var err error for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() { diff --git a/store/tikv/kv/option.go b/store/tikv/kv/option.go index e9479cf609502..a87ddcb7214a4 100644 --- a/store/tikv/kv/option.go +++ b/store/tikv/kv/option.go @@ -59,8 +59,6 @@ const ( IsStalenessReadOnly // MatchStoreLabels indicates the labels the store should be matched MatchStoreLabels - // KVFilter filters out the key-value pairs in the memBuf that is unnecessary to be committed - KVFilter ) // Priority value for transaction priority. diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index 9313e6f0abeb8..60af78db111ff 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -1032,7 +1032,7 @@ func (s *testCommitterSuite) TestResolvePessimisticLock(c *C) { untouchedIndexValue := []byte{0, 0, 0, 0, 0, 0, 0, 1, 49} noValueIndexKey := []byte("t00000001_i000000002") txn := s.begin(c) - txn.SetOption(kv.KVFilter, drivertxn.TiDBKVFilter{}) + txn.SetKVFilter(drivertxn.TiDBKVFilter{}) err := txn.Set(untouchedIndexKey, untouchedIndexValue) c.Assert(err, IsNil) lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait} diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 5133a6deb35f6..e78eaba81febe 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -73,8 +73,8 @@ type KVTxn struct { schemaAmender SchemaAmender // commitCallback is called after current transaction gets committed commitCallback func(info string, err error) - - binlog BinlogExecutor + binlog BinlogExecutor + kvFilter KVFilter } func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) { @@ -198,18 +198,16 @@ func (txn *KVTxn) DelOption(opt int) { txn.us.DelOption(opt) } +// SetKVFilter sets the filter to ignore key-values in memory buffer. +func (txn *KVTxn) SetKVFilter(filter KVFilter) { + txn.kvFilter = filter +} + // IsPessimistic returns true if it is pessimistic. func (txn *KVTxn) IsPessimistic() bool { return txn.us.GetOption(kv.Pessimistic) != nil } -func (txn *KVTxn) getKVFilter() KVFilter { - if filter := txn.us.GetOption(kv.KVFilter); filter != nil { - return filter.(KVFilter) - } - return nil -} - // Commit commits the transaction operations to KV store. func (txn *KVTxn) Commit(ctx context.Context) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {