diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go index a4e49a2fbc274..49a8a8a9427d1 100644 --- a/br/pkg/lightning/restore/check_info.go +++ b/br/pkg/lightning/restore/check_info.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table" @@ -179,6 +178,15 @@ func (rc *Controller) ClusterIsAvailable(ctx context.Context) error { return nil } +func isTiFlash(store *api.MetaStore) bool { + for _, label := range store.Labels { + if label.Key == "engine" && label.Value == "tiflash" { + return true + } + } + return false +} + func (rc *Controller) checkEmptyRegion(ctx context.Context) error { passed := true message := "Cluster doesn't have too many empty regions" @@ -206,7 +214,7 @@ func (rc *Controller) checkEmptyRegion(ctx context.Context) error { } } for _, store := range storeInfo.Stores { - stores[store.Store.Id] = store + stores[store.Store.StoreID] = store } tableCount := 0 for _, db := range rc.dbMetas { @@ -224,10 +232,10 @@ func (rc *Controller) checkEmptyRegion(ctx context.Context) error { ) for storeID, regionCnt := range regions { if store, ok := stores[storeID]; ok { - if store.Store.State != metapb.StoreState_Up { + if metapb.StoreState(metapb.StoreState_value[store.Store.StateName]) != metapb.StoreState_Up { continue } - if version.IsTiFlash(store.Store.Store) { + if isTiFlash(store.Store) { continue } if regionCnt > errorThrehold { @@ -269,10 +277,10 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error { } stores := make([]*api.StoreInfo, 0, len(result.Stores)) for _, store := range result.Stores { - if store.Store.State != metapb.StoreState_Up { + if metapb.StoreState(metapb.StoreState_value[store.Store.StateName]) != metapb.StoreState_Up { continue } - if version.IsTiFlash(store.Store.Store) { + if isTiFlash(store.Store) { continue } stores = append(stores, store) @@ -302,11 +310,11 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error { passed = false message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+ "with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it must not be less than %v", - minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio) + minStore.Store.StoreID, minStore.Status.RegionCount, maxStore.Store.StoreID, maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio) } else if ratio < warnRegionCntMinMaxRatio { message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+ "with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it should not be less than %v", - minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio) + minStore.Store.StoreID, minStore.Status.RegionCount, maxStore.Store.StoreID, maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio) } return nil } diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index 19c68b9db287f..d66c7c9685aa5 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -1979,7 +1979,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) { testCases := []testCase{ { stores: api.StoresInfo{Stores: []*api.StoreInfo{ - {Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 200}}, + {Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 200}}, }}, emptyRegions: api.RegionsInfo{ Regions: append([]api.RegionInfo(nil), makeRegions(100, 1)...), @@ -1990,9 +1990,9 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) { }, { stores: api.StoresInfo{Stores: []*api.StoreInfo{ - {Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 2000}}, - {Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3100}}, - {Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}}, + {Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 2000}}, + {Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 3100}}, + {Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}}, }}, emptyRegions: api.RegionsInfo{ Regions: append(append(append([]api.RegionInfo(nil), @@ -2010,9 +2010,9 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) { }, { stores: api.StoresInfo{Stores: []*api.StoreInfo{ - {Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 1200}}, - {Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3000}}, - {Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}}, + {Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 1200}}, + {Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 3000}}, + {Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}}, }}, expectMsgs: []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"}, expectResult: false, @@ -2020,9 +2020,9 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) { }, { stores: api.StoresInfo{Stores: []*api.StoreInfo{ - {Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 0}}, - {Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 2800}}, - {Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}}, + {Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 0}}, + {Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 2800}}, + {Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}}, }}, expectMsgs: []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"}, expectResult: false, diff --git a/go.mod b/go.mod index 4f83f251d4d54..b0216c411f6e3 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3 github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20211029081837-3c7bd947cf9b + github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible @@ -66,7 +66,7 @@ require ( github.com/stretchr/testify v1.7.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211118154139-b11da6307c6f - github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379 + github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee github.com/twmb/murmur3 v1.1.3 github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible // indirect @@ -76,7 +76,7 @@ require ( go.etcd.io/etcd v0.5.0-alpha.5.0.20210512015243-d19fbe541bf9 go.uber.org/atomic v1.9.0 go.uber.org/automaxprocs v1.4.0 - go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 + go.uber.org/goleak v1.1.12 go.uber.org/multierr v1.7.0 go.uber.org/zap v1.19.1 golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420 diff --git a/go.sum b/go.sum index 4e8fbc01390bd..ccf7a5cb71fa6 100644 --- a/go.sum +++ b/go.sum @@ -582,8 +582,9 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210819164333-bd5706b9d9f2/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20211029081837-3c7bd947cf9b h1:/aj6ITlHSJZmsm4hIMOgJAAZti+Dmq11tCyKedA6Dcs= -github.com/pingcap/kvproto v0.0.0-20211029081837-3c7bd947cf9b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f h1:hjInxK1Ie6CYx7Jy2pYnBdEnWI8jIfr423l9Yh6LRy8= +github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= @@ -594,7 +595,7 @@ github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041 github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 h1:7rvAtZe/ZUzOKzgriNPQoBNvleJXBk4z7L3Z47+tS98= github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5/go.mod h1:XsOaV712rUk63aOEKYP9PhXTIE3FMNHmC2r1wX5wElY= github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= -github.com/pingcap/tidb-dashboard v0.0.0-20211031170437-08e58c069a2a/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= +github.com/pingcap/tidb-dashboard v0.0.0-20211107164327-80363dfbe884/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible h1:c7+izmker91NkjkZ6FgTlmD4k1A5FLOAq+li6Ki2/GY= github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20211116093845-e9b045a0bdf8 h1:Vu/6oq8EFNWgyXRHiclNzTKIu+YKHPCSI/Ba5oVrLtM= @@ -713,8 +714,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211118154139-b11da6307c6f h1:UyJjp3wGIjf1edGiQiIdAtL5QFqaqR4+s3LDwUZU7NY= github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211118154139-b11da6307c6f/go.mod h1:BEAS0vXm5BorlF/HTndqGwcGDvaiwe7B7BkfgwwZMJ4= github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ= -github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379 h1:nFm1jQDz1iRktoyV2SyM5zVk6+PJHQNunJZ7ZJcqzAo= -github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379/go.mod h1:y+09hAUXJbrd4c0nktL74zXDDuD7atGtfOKxL90PCOE= +github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs= +github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI= github.com/tklauser/go-sysconf v0.3.4 h1:HT8SVixZd3IzLdfs/xlpq0jeSfTX57g1v6wB1EuzV7M= github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek= github.com/tklauser/numcpus v0.2.1 h1:ct88eFm+Q7m2ZfXJdan1xYoXKlmwsfP+k88q05KvlZc= @@ -804,8 +805,9 @@ go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw= go.uber.org/fx v1.10.0/go.mod h1:vLRicqpG/qQEzno4SYU86iCwfT95EZza+Eba0ItuxqY= go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= diff --git a/server/conn.go b/server/conn.go index f74c00a5550cc..31ccba25cf881 100644 --- a/server/conn.go +++ b/server/conn.go @@ -888,7 +888,7 @@ func (cc *clientConn) checkAuthPlugin(ctx context.Context, resp *handshakeRespon func (cc *clientConn) PeerHost(hasPassword string) (host, port string, err error) { if len(cc.peerHost) > 0 { - return cc.peerHost, "", nil + return cc.peerHost, cc.peerPort, nil } host = variable.DefHostname if cc.isUnixSocket { diff --git a/server/server_test.go b/server/server_test.go index 210e58caed3f8..f9d22e866458a 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -779,7 +779,7 @@ func (cli *testServerClient) runTestLoadDataForListColumnPartition2(t *testing.T }) } -func (cli *testServerClient) checkRows(t *testing.T, rows *sql.Rows, expectedRows ...string) { +func (cli *testServerClient) Rows(t *testing.T, rows *sql.Rows) []string { buf := bytes.NewBuffer(nil) result := make([]string, 0, 2) for rows.Next() { @@ -806,7 +806,11 @@ func (cli *testServerClient) checkRows(t *testing.T, rows *sql.Rows, expectedRow } result = append(result, buf.String()) } + return result +} +func (cli *testServerClient) checkRows(t *testing.T, rows *sql.Rows, expectedRows ...string) { + result := cli.Rows(t, rows) require.Equal(t, strings.Join(expectedRows, "\n"), strings.Join(result, "\n")) } diff --git a/server/tidb_serial_test.go b/server/tidb_serial_test.go index 0431baa32fa8f..b5f2483584052 100644 --- a/server/tidb_serial_test.go +++ b/server/tidb_serial_test.go @@ -11,6 +11,8 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +//go:build !race +// +build !race package server diff --git a/server/tidb_test.go b/server/tidb_test.go index 01a19d70df6d1..2536752d3b0fe 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -528,6 +528,9 @@ func TestSocketAndIp(t *testing.T) { cli.checkRows(t, rows, "user1@127.0.0.1") rows = dbt.MustQuery("show grants") cli.checkRows(t, rows, "GRANT USAGE ON *.* TO 'user1'@'%'\nGRANT SELECT ON test.* TO 'user1'@'%'") + rows = dbt.MustQuery("select host from information_schema.processlist where user = 'user1'") + records := cli.Rows(t, rows) + require.Contains(t, records[0], ":", "Missing : in is.processlist") }) // Test with unix domain socket file connection with all hosts cli.runTests(t, func(config *mysql.Config) { diff --git a/store/mockstore/unistore/tikv/mvcc.go b/store/mockstore/unistore/tikv/mvcc.go index 47a59a3cb1f75..bba8d53409352 100644 --- a/store/mockstore/unistore/tikv/mvcc.go +++ b/store/mockstore/unistore/tikv/mvcc.go @@ -240,7 +240,7 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi for _, m := range mutations { lock, err := store.checkConflictInLockStore(reqCtx, m, startTS) if err != nil { - var resourceGroupTag []byte = nil + var resourceGroupTag []byte if req.Context != nil { resourceGroupTag = req.Context.ResourceGroupTag } @@ -1098,34 +1098,56 @@ func (store *MVCCStore) checkCommitted(reader *dbreader.DBReader, key []byte, st return 0, nil } -func checkLock(lock mvcc.Lock, key []byte, startTS uint64, resolved []uint64) error { - if isResolved(lock.StartTS, resolved) { - return nil +// LockPair contains a pair of key and lock. It's used for reading through locks. +type LockPair struct { + key []byte + lock *mvcc.Lock +} + +func getValueFromLock(lock *mvcc.Lock) []byte { + if lock.Op == byte(kvrpcpb.Op_Put) { + // lock owns the value so needn't to safeCopy it. + return lock.Value + } + return nil +} + +// *LockPair is not nil if the lock in the committed timestamp set. Read operations can get value from it without deep copy. +func checkLock(lock mvcc.Lock, key []byte, startTS uint64, resolved []uint64, committed []uint64) (*LockPair, error) { + if inTSSet(lock.StartTS, resolved) { + return nil, nil } lockVisible := lock.StartTS <= startTS isWriteLock := lock.Op == uint8(kvrpcpb.Op_Put) || lock.Op == uint8(kvrpcpb.Op_Del) isPrimaryGet := startTS == maxSystemTS && bytes.Equal(lock.Primary, key) && !lock.UseAsyncCommit if lockVisible && isWriteLock && !isPrimaryGet { - return BuildLockErr(safeCopy(key), &lock) + if inTSSet(lock.StartTS, committed) { + return &LockPair{safeCopy(key), &lock}, nil + } + return nil, BuildLockErr(safeCopy(key), &lock) } - return nil + return nil, nil } // CheckKeysLock implements the MVCCStore interface. -func (store *MVCCStore) CheckKeysLock(startTS uint64, resolved []uint64, keys ...[]byte) error { +func (store *MVCCStore) CheckKeysLock(startTS uint64, resolved, committed []uint64, keys ...[]byte) ([]*LockPair, error) { var buf []byte + var lockPairs []*LockPair for _, key := range keys { buf = store.lockStore.Get(key, buf) if len(buf) == 0 { continue } lock := mvcc.DecodeLock(buf) - err := checkLock(lock, key, startTS, resolved) + lockPair, err := checkLock(lock, key, startTS, resolved, committed) + if lockPair != nil { + lockPairs = append(lockPairs, lockPair) + } if err != nil { - return err + return nil, err } } - return nil + return lockPairs, nil } // CheckRangeLock implements the MVCCStore interface. @@ -1136,7 +1158,7 @@ func (store *MVCCStore) CheckRangeLock(startTS uint64, startKey, endKey []byte, break } lock := mvcc.DecodeLock(it.Value()) - err := checkLock(lock, it.Key(), startTS, resolved) + _, err := checkLock(lock, it.Key(), startTS, resolved, nil) if err != nil { return err } @@ -1386,14 +1408,32 @@ func (store *MVCCStore) DeleteFileInRange(start, end []byte) { store.db.DeleteFilesInRange(start, end) } +// Get implements the MVCCStore interface. +func (store *MVCCStore) Get(reqCtx *requestCtx, key []byte, version uint64) ([]byte, error) { + lockPairs, err := store.CheckKeysLock(version, reqCtx.rpcCtx.ResolvedLocks, reqCtx.rpcCtx.CommittedLocks, key) + if err != nil { + return nil, err + } + if len(lockPairs) != 0 { + return getValueFromLock(lockPairs[0].lock), nil + } + val, err := reqCtx.getDBReader().Get(key, version) + return safeCopy(val), err +} + // BatchGet implements the MVCCStore interface. func (store *MVCCStore) BatchGet(reqCtx *requestCtx, keys [][]byte, version uint64) []*kvrpcpb.KvPair { pairs := make([]*kvrpcpb.KvPair, 0, len(keys)) remain := make([][]byte, 0, len(keys)) for _, key := range keys { - err := store.CheckKeysLock(version, reqCtx.rpcCtx.ResolvedLocks, key) + lockPairs, err := store.CheckKeysLock(version, reqCtx.rpcCtx.ResolvedLocks, reqCtx.rpcCtx.CommittedLocks, key) if err != nil { pairs = append(pairs, &kvrpcpb.KvPair{Key: key, Error: convertToKeyError(err)}) + } else if len(lockPairs) != 0 { + value := getValueFromLock(lockPairs[0].lock) + if value != nil { + pairs = append(pairs, &kvrpcpb.KvPair{Key: key, Value: value}) + } } else { remain = append(remain, key) } @@ -1411,7 +1451,7 @@ func (store *MVCCStore) BatchGet(reqCtx *requestCtx, keys [][]byte, version uint return pairs } -func (store *MVCCStore) collectRangeLock(startTS uint64, startKey, endKey []byte, resolved []uint64) []*kvrpcpb.KvPair { +func (store *MVCCStore) collectRangeLock(startTS uint64, startKey, endKey []byte, resolved, committed []uint64) []*kvrpcpb.KvPair { var pairs []*kvrpcpb.KvPair it := store.lockStore.NewIterator() for it.Seek(startKey); it.Valid(); it.Next() { @@ -1419,8 +1459,14 @@ func (store *MVCCStore) collectRangeLock(startTS uint64, startKey, endKey []byte break } lock := mvcc.DecodeLock(it.Value()) - err := checkLock(lock, it.Key(), startTS, resolved) - if err != nil { + lockPair, err := checkLock(lock, it.Key(), startTS, resolved, committed) + if lockPair != nil { + pairs = append(pairs, &kvrpcpb.KvPair{ + Key: lockPair.key, + // deleted key's value is nil + Value: getValueFromLock(lockPair.lock), + }) + } else if err != nil { pairs = append(pairs, &kvrpcpb.KvPair{ Error: convertToKeyError(err), Key: safeCopy(it.Key()), @@ -1430,8 +1476,8 @@ func (store *MVCCStore) collectRangeLock(startTS uint64, startKey, endKey []byte return pairs } -func isResolved(startTS uint64, resolved []uint64) bool { - for _, v := range resolved { +func inTSSet(startTS uint64, tsSet []uint64) bool { + for _, v := range tsSet { if startTS == v { return true } @@ -1486,7 +1532,7 @@ func (store *MVCCStore) Scan(reqCtx *requestCtx, req *kvrpcpb.ScanRequest) []*kv var lockPairs []*kvrpcpb.KvPair limit := req.GetLimit() if req.SampleStep == 0 { - lockPairs = store.collectRangeLock(req.GetVersion(), startKey, endKey, req.Context.ResolvedLocks) + lockPairs = store.collectRangeLock(req.GetVersion(), startKey, endKey, reqCtx.rpcCtx.ResolvedLocks, reqCtx.rpcCtx.CommittedLocks) } else { limit = req.SampleStep * limit } @@ -1506,31 +1552,26 @@ func (store *MVCCStore) Scan(reqCtx *requestCtx, req *kvrpcpb.ScanRequest) []*kv }) return scanProc.pairs } - pairs := append(scanProc.pairs, lockPairs...) - sort.Slice(pairs, func(i, j int) bool { + pairs := append(lockPairs, scanProc.pairs...) + sort.SliceStable(pairs, func(i, j int) bool { cmp := bytes.Compare(pairs[i].Key, pairs[j].Key) if req.Reverse { cmp = -cmp } - if cmp < 0 { - return true - } else if cmp > 0 { - return false - } - return pairs[i].Error != nil + return cmp < 0 }) validPairs := pairs[:0] - var prevErr *kvrpcpb.KvPair + var prev *kvrpcpb.KvPair for _, pair := range pairs { - if prevErr != nil && bytes.Equal(prevErr.Key, pair.Key) { + if prev != nil && bytes.Equal(prev.Key, pair.Key) { continue } - if pair.Error != nil { - prevErr = pair - } - validPairs = append(validPairs, pair) - if len(validPairs) >= int(limit) { - break + prev = pair + if pair.Error != nil || len(pair.Value) != 0 { + validPairs = append(validPairs, pair) + if len(validPairs) >= int(limit) { + break + } } } return validPairs diff --git a/store/mockstore/unistore/tikv/mvcc_test.go b/store/mockstore/unistore/tikv/mvcc_test.go index 1f2f4fe15d8de..f9d681511ced4 100644 --- a/store/mockstore/unistore/tikv/mvcc_test.go +++ b/store/mockstore/unistore/tikv/mvcc_test.go @@ -153,8 +153,12 @@ func PessimisticLock(pk []byte, key []byte, startTs uint64, lockTTL uint64, forU // PrewriteOptimistic raises optimistic prewrite requests on store func PrewriteOptimistic(pk []byte, key []byte, value []byte, startTs uint64, lockTTL uint64, minCommitTs uint64, useAsyncCommit bool, secondaries [][]byte, store *TestStore) error { + op := kvrpcpb.Op_Put + if value == nil { + op = kvrpcpb.Op_Del + } prewriteReq := &kvrpcpb.PrewriteRequest{ - Mutations: []*kvrpcpb.Mutation{newMutation(kvrpcpb.Op_Put, key, value)}, + Mutations: []*kvrpcpb.Mutation{newMutation(op, key, value)}, PrimaryLock: pk, StartVersion: startTs, LockTtl: lockTTL, @@ -416,21 +420,19 @@ func MustGetVal(key, val []byte, startTs uint64, store *TestStore) { } func MustGetErr(key []byte, startTs uint64, store *TestStore) { - _, err := kvGet(key, startTs, store) + _, err := kvGet(key, startTs, nil, nil, store) require.Error(store.t, err) } -func kvGet(key []byte, readTs uint64, store *TestStore) ([]byte, error) { - err := store.MvccStore.CheckKeysLock(readTs, nil, key) - if err != nil { - return nil, err - } - getVal, err := store.newReqCtx().getDBReader().Get(key, readTs) - return getVal, err +func kvGet(key []byte, readTs uint64, resolved, committed []uint64, store *TestStore) ([]byte, error) { + reqCtx := store.newReqCtx() + reqCtx.rpcCtx.ResolvedLocks = resolved + reqCtx.rpcCtx.CommittedLocks = committed + return store.MvccStore.Get(reqCtx, key, readTs) } func MustGet(key []byte, readTs uint64, store *TestStore) (val []byte) { - val, err := kvGet(key, readTs, store) + val, err := kvGet(key, readTs, nil, nil, store) require.NoError(store.t, err) return val } @@ -1550,3 +1552,100 @@ func TestAsyncCommitPrewrite(t *testing.T) { require.Greater(t, secLock.MinCommitTS, uint64(0)) require.Equal(t, 0, bytes.Compare(secLock.Value, secVal2)) } + +func TestAccessCommittedLocks(t *testing.T) { + t.Parallel() + store, close := NewTestStore("basic_optimistic_db", "basic_optimistic_log", t) + defer close() + + k0 := []byte("t0") + v0 := []byte("v0") + MustLoad(10, 20, store, "t0:v0") + // delete + MustPrewriteDelete(k0, k0, 30, store) + MustGetErr(k0, 40, store) + // meet lock + val, err := kvGet(k0, 40, []uint64{20}, nil, store) + require.Error(store.t, err) + require.Nil(store.t, val) + val, err = kvGet(k0, 40, []uint64{20}, []uint64{20}, store) + require.Error(store.t, err) + require.Nil(store.t, val) + // ignore lock + val, err = kvGet(k0, 40, []uint64{30}, nil, store) + require.NoError(store.t, err) + require.Equal(store.t, v0, val) + // access lock + val, err = kvGet(k0, 40, nil, []uint64{30}, store) + require.NoError(store.t, err) + require.Nil(store.t, val) + + k1 := []byte("t1") + v1 := []byte("v1") + // put + MustPrewritePut(k1, k1, v1, 50, store) + // ignore lock + val, err = kvGet(k1, 60, []uint64{50}, nil, store) + require.NoError(store.t, err) + require.Len(store.t, val, 0) + // access lock + val, err = kvGet(k1, 60, nil, []uint64{50}, store) + require.NoError(store.t, err) + require.Equal(store.t, v1, val) + + // locked + k2 := []byte("t2") + v2 := []byte("v2") + MustPrewritePut(k2, k2, v2, 70, store) + + // lock for ingore + k3 := []byte("t3") + v3 := []byte("v3") + MustPrewritePut(k3, k3, v3, 80, store) + + // No lock + k4 := []byte("t4") + v4 := []byte("v4") + MustLoad(80, 90, store, "t4:v4") + + keys := [][]byte{k0, k1, k2, k3, k4} + expected := []struct { + key []byte + val []byte + err bool + }{{k1, v1, false}, {k2, nil, true}, {k4, v4, false}} + reqCtx := store.newReqCtx() + reqCtx.rpcCtx.ResolvedLocks = []uint64{80} + reqCtx.rpcCtx.CommittedLocks = []uint64{30, 50} + pairs := store.MvccStore.BatchGet(reqCtx, keys, 100) + require.Equal(store.t, len(expected), len(pairs)) + for i, pair := range pairs { + e := expected[i] + require.Equal(store.t, pair.Key, e.key) + require.Equal(store.t, pair.Value, e.val) + if e.err { + require.NotNil(store.t, pair.Error) + } else { + require.Nil(store.t, pair.Error) + } + } + + scanReq := &kvrpcpb.ScanRequest{ + StartKey: []byte("t0"), + EndKey: []byte("t5"), + Limit: 100, + Version: 100, + } + pairs = store.MvccStore.Scan(reqCtx, scanReq) + require.Equal(store.t, len(expected), len(pairs)) + for i, pair := range pairs { + e := expected[i] + require.Equal(store.t, pair.Key, e.key) + require.Equal(store.t, pair.Value, e.val) + if e.err { + require.NotNil(store.t, pair.Error) + } else { + require.Nil(store.t, pair.Error) + } + } +} diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go index 8038acac31922..940f2770ecf0a 100644 --- a/store/mockstore/unistore/tikv/server.go +++ b/store/mockstore/unistore/tikv/server.go @@ -155,20 +155,10 @@ func (svr *Server) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb if reqCtx.regErr != nil { return &kvrpcpb.GetResponse{RegionError: reqCtx.regErr}, nil } - err = svr.mvccStore.CheckKeysLock(req.GetVersion(), req.Context.ResolvedLocks, req.Key) - if err != nil { - return &kvrpcpb.GetResponse{Error: convertToKeyError(err)}, nil - } - reader := reqCtx.getDBReader() - val, err := reader.Get(req.Key, req.GetVersion()) - if err != nil { - return &kvrpcpb.GetResponse{ - Error: convertToKeyError(err), - }, nil - } - val = safeCopy(val) + val, err := svr.mvccStore.Get(reqCtx, req.Key, req.Version) return &kvrpcpb.GetResponse{ Value: val, + Error: convertToKeyError(err), }, nil }