Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: make kv pairs converted from locks invisible #42409

Merged
merged 3 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,15 +450,17 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
membuf := e.txn.GetMemBuffer()
for _, idxKey := range indexKeys {
handleVal := handleVals[string(idxKey)]
if len(handleVal) == 0 {
continue
}
err = membuf.Set(idxKey, handleVal)
if err != nil {
return err
membuf, ok := e.txn.GetMemBuffer().(interface{ ChangeLockIntoPut(kv.Key, []byte) error })
if ok {
for _, idxKey := range indexKeys {
handleVal := handleVals[string(idxKey)]
if len(handleVal) == 0 {
continue
}
err = membuf.ChangeLockIntoPut(idxKey, handleVal)
if err != nil {
return err
}
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,12 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
memBuffer := e.txn.GetMemBuffer()
err = memBuffer.Set(e.idxKey, e.handleVal)
if err != nil {
return err
membuf, ok := e.txn.GetMemBuffer().(interface{ ChangeLockIntoPut(kv.Key, []byte) error })
if ok {
err = membuf.ChangeLockIntoPut(e.idxKey, e.handleVal)
if err != nil {
return err
}
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
type tikvTxn struct {
*tikv.KVTxn
idxNameCache map[int64]*model.TableInfo
invisibleKeys map[string]struct{}
snapshotInterceptor kv.SnapshotInterceptor
// columnMapsCache is a cache used for the mutation checker
columnMapsCache interface{}
Expand All @@ -54,7 +55,7 @@ func NewTiKVTxn(txn *tikv.KVTxn) kv.Transaction {
totalLimit := atomic.LoadUint64(&kv.TxnTotalSizeLimit)
txn.GetUnionStore().SetEntrySizeLimit(entryLimit, totalLimit)

return &tikvTxn{txn, make(map[int64]*model.TableInfo), nil, nil}
return &tikvTxn{txn, make(map[int64]*model.TableInfo), make(map[string]struct{}), nil, nil}
}

func (txn *tikvTxn) GetTableInfo(id int64) *model.TableInfo {
Expand Down Expand Up @@ -171,7 +172,7 @@ func (txn *tikvTxn) Set(k kv.Key, v []byte) error {
}

func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer {
return newMemBuffer(txn.KVTxn.GetMemBuffer())
return newMemBuffer(txn.KVTxn.GetMemBuffer(), txn.invisibleKeys)
}

func (txn *tikvTxn) SetOption(opt int, val interface{}) {
Expand Down
96 changes: 87 additions & 9 deletions store/driver/txn/unionstore_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,73 @@ import (

"github.com/pingcap/tidb/kv"
derr "github.com/pingcap/tidb/store/driver/error"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/util"
)

type visibilityChecker interface {
isVisible(kv.Key) bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is a bit vague. The status is 3-valued: (1)exist + visible, (2)exist + invisible, (3)absent(unknown).
It returns true iff (1) || (3).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about to rename it to invisible?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Or something like isVisibleOrUnknown(?)

}

// memBuffer wraps tikv.MemDB as kv.MemBuffer.
type memBuffer struct {
*tikv.MemDB
invisibleKeys map[string]struct{}
}

func newMemBuffer(m *tikv.MemDB) kv.MemBuffer {
func newMemBuffer(m *tikv.MemDB, invisibleKeys map[string]struct{}) kv.MemBuffer {
if m == nil {
return nil
}
return &memBuffer{MemDB: m}
return &memBuffer{MemDB: m, invisibleKeys: invisibleKeys}
}

func (m *memBuffer) addInvisibleKey(k kv.Key) {
m.Lock()
m.invisibleKeys[util.String(k)] = struct{}{}
m.Unlock()
}

func (m *memBuffer) delInvisibleKey(k kv.Key) {
m.Lock()
delete(m.invisibleKeys, util.String(k))
m.Unlock()
}

func (m *memBuffer) isVisible(k kv.Key) bool {
// shall be protected by MemBuffer.RLock
_, ok := m.invisibleKeys[util.String(k)]
return !ok
}

func (m *memBuffer) Size() int {
return m.MemDB.Size()
}

func (m *memBuffer) Delete(k kv.Key) error {
return m.MemDB.Delete(k)
err := m.MemDB.Delete(k)
m.delInvisibleKey(k)
return derr.ToTiDBErr(err)
}

func (m *memBuffer) DeleteWithFlags(k kv.Key, ops ...kv.FlagsOp) error {
err := m.MemDB.DeleteWithFlags(k, getTiKVFlagsOps(ops)...)
m.delInvisibleKey(k)
return derr.ToTiDBErr(err)
}

func (m *memBuffer) Get(_ context.Context, key kv.Key) ([]byte, error) {
if !m.isVisible(key) {
return nil, kv.ErrNotExist
}
data, err := m.MemDB.Get(key)
return data, derr.ToTiDBErr(err)
}

func (m *memBuffer) GetFlags(key kv.Key) (kv.KeyFlags, error) {
// do not check `invisibleKeys` here since LockKeys may set flags on keys and those flags are always visible.
data, err := m.MemDB.GetFlags(key)
return getTiDBKeyFlags(data), derr.ToTiDBErr(err)
}
Expand All @@ -79,21 +111,32 @@ func (m *memBuffer) InspectStage(handle kv.StagingHandle, f func(kv.Key, kv.KeyF

func (m *memBuffer) Set(key kv.Key, value []byte) error {
err := m.MemDB.Set(key, value)
m.delInvisibleKey(key)
return derr.ToTiDBErr(err)
}

func (m *memBuffer) SetWithFlags(key kv.Key, value []byte, ops ...kv.FlagsOp) error {
err := m.MemDB.SetWithFlags(key, value, getTiKVFlagsOps(ops)...)
m.delInvisibleKey(key)
return derr.ToTiDBErr(err)
}

func (m *memBuffer) ChangeLockIntoPut(key kv.Key, value []byte) error {
// only change LOCK into PUT when the key does not existed, otherwise, we may mark a visible key as invisible.
if _, err := m.MemDB.Get(key); tikverr.IsErrNotFound(err) {
m.addInvisibleKey(key)
return derr.ToTiDBErr(m.MemDB.Set(key, value))
}
return nil
}

// Iter creates an Iterator positioned on the first entry that k <= entry's key.
// If such entry is not found, it returns an invalid Iterator with no error.
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
// The Iterator must be Closed after use.
func (m *memBuffer) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
it, err := m.MemDB.Iter(k, upperBound)
return &tikvIterator{Iterator: it}, derr.ToTiDBErr(err)
return newKVIterator(it, m), derr.ToTiDBErr(err)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
Expand All @@ -102,42 +145,77 @@ func (m *memBuffer) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
// TODO: Add lower bound limit
func (m *memBuffer) IterReverse(k kv.Key) (kv.Iterator, error) {
it, err := m.MemDB.IterReverse(k)
return &tikvIterator{Iterator: it}, derr.ToTiDBErr(err)
return newKVIterator(it, m), derr.ToTiDBErr(err)
}

// SnapshotIter returns a Iterator for a snapshot of MemBuffer.
func (m *memBuffer) SnapshotIter(k, upperbound kv.Key) kv.Iterator {
it := m.MemDB.SnapshotIter(k, upperbound)
return &tikvIterator{Iterator: it}
return newKVIterator(it, m)
}

// SnapshotGetter returns a Getter for a snapshot of MemBuffer.
func (m *memBuffer) SnapshotGetter() kv.Getter {
return newKVGetter(m.MemDB.SnapshotGetter())
return newKVGetter(m.MemDB.SnapshotGetter(), m)
}

type tikvGetter struct {
tikv.Getter
checker visibilityChecker
}

func newKVGetter(getter tikv.Getter) kv.Getter {
return &tikvGetter{Getter: getter}
func newKVGetter(getter tikv.Getter, checker visibilityChecker) kv.Getter {
return &tikvGetter{getter, checker}
}

func (g *tikvGetter) Get(_ context.Context, k kv.Key) ([]byte, error) {
if !g.checker.isVisible(k) {
return nil, kv.ErrNotExist
}
data, err := g.Getter.Get(k)
return data, derr.ToTiDBErr(err)
}

// tikvIterator wraps tikv.Iterator as kv.Iterator
type tikvIterator struct {
tikv.Iterator
checker visibilityChecker
initErr error
}

func newKVIterator(iterator tikv.Iterator, checker visibilityChecker) kv.Iterator {
it := &tikvIterator{iterator, checker, nil}
if it.Valid() && !it.checker.isVisible(it.Key()) {
// skip first invisible key
it.initErr = it.Next()
}
return it
}

func (it *tikvIterator) Key() kv.Key {
return kv.Key(it.Iterator.Key())
}

func (it *tikvIterator) Next() error {
if it.initErr != nil {
err := it.initErr
it.initErr = nil
return err
}
for {
err := it.Iterator.Next()
if err != nil {
return err
}
if !it.Valid() {
return nil
}
if it.checker.isVisible(it.Key()) {
return nil
}
}
}

func getTiDBKeyFlags(flag tikvstore.KeyFlags) kv.KeyFlags {
var v kv.KeyFlags
if flag.HasPresumeKeyNotExists() {
Expand Down
36 changes: 35 additions & 1 deletion tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2864,7 +2864,7 @@ func TestChangeLockToPut(t *testing.T) {
tk.MustExec("use test")
tk2.MustExec("use test")

tk.MustExec("drop table if exists tk")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 varchar(20) key, c2 int, c3 int, unique key k1(c2), key k2(c3))")
tk.MustExec(`insert into t1 values ("1", 1, 1), ("2", 2, 2), ("3", 3, 3)`)

Expand Down Expand Up @@ -2912,6 +2912,40 @@ func TestChangeLockToPut(t *testing.T) {
tk.MustExec("admin check table t1")
}

func TestIssue28011(t *testing.T) {
store, clean := realtikvtest.CreateMockStoreAndSetup(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

for _, tt := range []struct {
name string
lockQuery string
finalRows [][]interface{}
}{
{"Update", "update t set b = 'x' where a = 'a'", testkit.Rows("a x", "b y", "c z")},
{"BatchUpdate", "update t set b = 'x' where a in ('a', 'b', 'c')", testkit.Rows("a x", "b y", "c x")},
{"SelectForUpdate", "select a from t where a = 'a' for update", testkit.Rows("a x", "b y", "c z")},
{"BatchSelectForUpdate", "select a from t where a in ('a', 'b', 'c') for update", testkit.Rows("a x", "b y", "c z")},
} {
t.Run(tt.name, func(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a varchar(10) primary key nonclustered, b varchar(10))")
tk.MustExec("insert into t values ('a', 'x'), ('b', 'x'), ('c', 'z')")
tk.MustExec("begin")
tk.MustExec(tt.lockQuery)
tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c"))
tk.MustExec("replace into t values ('b', 'y')")
tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c"))
tk.MustQuery("select a, b from t order by a").Check(tt.finalRows)
tk.MustExec("commit")
tk.MustQuery("select a, b from t order by a").Check(tt.finalRows)
tk.MustExec("admin check table t")
})
}
}

func createTable(part bool, columnNames []string, columnTypes []string) string {
var str string
str = "create table t("
Expand Down