diff --git a/config/config.go b/config/config.go
index a0e01f7841479..f3be588ec2355 100644
--- a/config/config.go
+++ b/config/config.go
@@ -69,6 +69,8 @@ const (
 	DefStoreLivenessTimeout = "5s"
 	// DefTxnScope is the default value for TxnScope
 	DefTxnScope = "global"
+	// DefStoresRefreshInterval is the default value of StoresRefreshInterval
+	DefStoresRefreshInterval = 60
 )
 
 // Valid config maps
@@ -176,6 +178,8 @@ type Config struct {
 	// where M is the element literal length and w is the number of bytes required for the maximum-length character in the character set.
 	// See https://dev.mysql.com/doc/refman/8.0/en/string-type-syntax.html for more details.
 	EnableEnumLengthLimit bool `toml:"enable-enum-length-limit" json:"enable-enum-length-limit"`
+	// StoresRefreshInterval indicates the interval, in seconds, at which the stores info is refreshed.
+	StoresRefreshInterval uint64 `toml:"stores-refresh-interval" json:"stores-refresh-interval"`
 	// EnableTCP4Only enables net.Listen("tcp4",...)
 	// Note that: it can make lvs with toa work and thus tidb can get real client ip.
 	EnableTCP4Only bool `toml:"enable-tcp4-only" json:"enable-tcp4-only"`
@@ -789,6 +793,7 @@ var defaultConf = Config{
 	DeprecateIntegerDisplayWidth: false,
 	TxnScope:                     DefTxnScope,
 	EnableEnumLengthLimit:        true,
+	StoresRefreshInterval:        DefStoresRefreshInterval,
 }
 
 var (
diff --git a/config/config_test.go b/config/config_test.go
index 425bd348e7133..f13bb486f490d 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -196,6 +196,7 @@ skip-register-to-dashboard = true
 deprecate-integer-display-length = true
 txn-scope = "dc-1"
 enable-enum-length-limit = false
+stores-refresh-interval = 30
 [performance]
 txn-total-size-limit=2000
 [tikv-client]
@@ -273,6 +274,7 @@ spilled-file-encryption-method = "plaintext"
 	c.Assert(conf.DeprecateIntegerDisplayWidth, Equals, true)
 	c.Assert(conf.TxnScope, Equals, "dc-1")
 	c.Assert(conf.EnableEnumLengthLimit, Equals, false)
+	c.Assert(conf.StoresRefreshInterval, Equals, uint64(30))
 
 	_, err = f.WriteString(`
 [log.file]
diff --git a/store/mockstore/mocktikv/cluster_manipulate.go b/store/mockstore/mocktikv/cluster_manipulate.go
index bc7a1121ff8da..11512521609f5 100644
--- a/store/mockstore/mocktikv/cluster_manipulate.go
+++ b/store/mockstore/mocktikv/cluster_manipulate.go
@@ -13,7 +13,11 @@
 
 package mocktikv
 
-import "fmt"
+import (
+	"fmt"
+
+	"github.com/pingcap/kvproto/pkg/metapb"
+)
 
 // BootstrapWithSingleStore initializes a Cluster with 1 Region and 1 Store.
 func BootstrapWithSingleStore(cluster *Cluster) (storeID, peerID, regionID uint64) {
@@ -31,7 +35,13 @@ func BootstrapWithMultiStores(cluster *Cluster, n int) (storeIDs, peerIDs []uint
 	leaderPeer = peerIDs[0]
 	regionID = cluster.AllocID()
 	for _, storeID := range storeIDs {
-		cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID))
+		labels := []*metapb.StoreLabel{
+			{
+				Key:   "id",
+				Value: fmt.Sprintf("%v", storeID),
+			},
+		}
+		cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID), labels...)
 	}
 	cluster.Bootstrap(regionID, storeIDs, peerIDs, leaderPeer)
 	return
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 79901a159190d..8bc435aa60c1a 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/util"
@@ -286,7 +287,8 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	c.storeMu.stores = make(map[uint64]*Store)
 	c.notifyCheckCh = make(chan struct{}, 1)
 	c.closeCh = make(chan struct{})
-	go c.asyncCheckAndResolveLoop()
+	interval := config.GetGlobalConfig().StoresRefreshInterval
+	go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
 	return c
 }
 
@@ -296,7 +298,9 @@ func (c *RegionCache) Close() {
 }
 
-// asyncCheckAndResolveLoop with
-func (c *RegionCache) asyncCheckAndResolveLoop() {
+// asyncCheckAndResolveLoop resolves stores on demand and re-resolves all stores periodically.
+func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
 	var needCheckStores []*Store
 	for {
 		select {
@@ -305,6 +309,18 @@ func (c *RegionCache) asyncCheckAndResolveLoop() {
 		case <-c.notifyCheckCh:
 			needCheckStores = needCheckStores[:0]
 			c.checkAndResolve(needCheckStores)
+		case <-ticker.C:
+			// periodically refresh all stores to pick up label and address changes
+			var stores []*Store
+			c.storeMu.RLock()
+			stores = make([]*Store, 0, len(c.storeMu.stores))
+			for _, s := range c.storeMu.stores {
+				stores = append(stores, s)
+			}
+			c.storeMu.RUnlock()
+			for _, store := range stores {
+				store.reResolve(c)
+			}
 		}
 	}
 }
@@ -1130,6 +1146,18 @@ func (c *RegionCache) getStoreByStoreID(storeID uint64) (store *Store) {
 	return
 }
 
+func (c *RegionCache) getStoresByLabels(labels []*metapb.StoreLabel) []*Store {
+	c.storeMu.RLock()
+	defer c.storeMu.RUnlock()
+	s := make([]*Store, 0)
+	for _, store := range c.storeMu.stores {
+		if store.IsLabelsMatch(labels) {
+			s = append(s, store)
+		}
+	}
+	return s
+}
+
 // OnRegionEpochNotMatch removes the old region and inserts new regions into the cache.
 func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) error {
 	// Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff.
@@ -1385,14 +1413,15 @@ func (r *Region) ContainsByEnd(key []byte) bool {
 
 // Store contains a kv process's address.
 type Store struct {
-	addr         string        // loaded store address
-	saddr        string        // loaded store status address
-	storeID      uint64        // store's id
-	state        uint64        // unsafe store storeState
-	resolveMutex sync.Mutex    // protect pd from concurrent init requests
-	epoch        uint32        // store fail epoch, see RegionStore.storeEpochs
-	storeType    kv.StoreType  // type of the store
-	tokenCount   atomic2.Int64 // used store token count
+	addr         string               // loaded store address
+	saddr        string               // loaded store status address
+	storeID      uint64               // store's id
+	state        uint64               // unsafe store storeState
+	labels       []*metapb.StoreLabel // the store's labels
+	resolveMutex sync.Mutex           // protect pd from concurrent init requests
+	epoch        uint32               // store fail epoch, see RegionStore.storeEpochs
+	storeType    kv.StoreType         // type of the store
+	tokenCount   atomic2.Int64        // used store token count
 }
 
 type resolveState uint64
@@ -1439,6 +1468,7 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
 		s.addr = addr
 		s.saddr = store.GetStatusAddress()
 		s.storeType = GetStoreTypeByMeta(store)
+		s.labels = store.GetLabels()
 	retry:
 		state = s.getResolveState()
 		if state != unresolved {
@@ -1491,9 +1521,9 @@ func (s *Store) reResolve(c *RegionCache) {
 
 	storeType := GetStoreTypeByMeta(store)
 	addr = store.GetAddress()
-	if s.addr != addr {
+	if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
 		state := resolved
-		newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType}
+		newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels()}
 		newStore.state = *(*uint64)(&state)
 		c.storeMu.Lock()
 		c.storeMu.stores[newStore.storeID] = newStore
@@ -1547,7 +1577,35 @@ retry:
 	case notifyCheckCh <- struct{}{}:
 	default:
 	}
+}
+
+// IsSameLabels returns whether the store has exactly the same labels as the target labels
+func (s *Store) IsSameLabels(labels []*metapb.StoreLabel) bool {
+	if len(s.labels) != len(labels) {
+		return false
+	}
+	return s.IsLabelsMatch(labels)
+}
+
+// IsLabelsMatch returns whether the store's labels contain all of the target labels
+func (s *Store) IsLabelsMatch(labels []*metapb.StoreLabel) bool {
+	if len(labels) < 1 {
+		return true
+	}
+	for _, targetLabel := range labels {
+		match := false
+		for _, label := range s.labels {
+			if targetLabel.Key == label.Key && targetLabel.Value == label.Value {
+				match = true
+				break
+			}
+		}
+		if !match {
+			return false
+		}
+	}
+	return true
 }
 
 type livenessState uint32
 
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index c05c6b025139c..34da8c0dc1728 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -121,6 +121,34 @@ func (s *testRegionCacheSuite) getAddr(c *C, key []byte, replicaRead kv.ReplicaR
 	return ctx.Addr
 }
 
+func (s *testRegionCacheSuite) TestStoreLabels(c *C) {
+	testcases := []struct {
+		storeID uint64
+	}{
+		{
+			storeID: s.store1,
+		},
+		{
+			storeID: s.store2,
+		},
+	}
+	for _, testcase := range testcases {
+		c.Log(testcase.storeID)
+		store := s.cache.getStoreByStoreID(testcase.storeID)
+		_, err := store.initResolve(s.bo, s.cache)
+		c.Assert(err, IsNil)
+		labels := []*metapb.StoreLabel{
+			{
+				Key:   "id",
+				Value: fmt.Sprintf("%v", testcase.storeID),
+			},
+		}
+		stores := s.cache.getStoresByLabels(labels)
+		c.Assert(len(stores), Equals, 1)
+		c.Assert(stores[0].labels, DeepEquals, labels)
+	}
+}
+
 func (s *testRegionCacheSuite) TestSimple(c *C) {
 	seed := rand.Uint32()
 	r := s.getRegion(c, []byte("a"))
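
For review context, the label-matching semantics introduced above are: `IsLabelsMatch` is a subset test (every target label must be present among the store's labels, and an empty target list matches every store), while `IsSameLabels` additionally requires the two label lists to have the same length, making it effectively an exact-set comparison that `reResolve` uses to detect label changes. Below is a minimal standalone sketch of the same logic; it uses a local `StoreLabel` struct in place of the real `metapb.StoreLabel` and free functions in place of the `Store` methods, purely for illustration.

```go
package main

import "fmt"

// StoreLabel mirrors the key/value shape of metapb.StoreLabel for illustration only.
type StoreLabel struct {
	Key, Value string
}

// isLabelsMatch reports whether every target label appears among the store's
// labels (a subset test); an empty target list matches any store.
func isLabelsMatch(storeLabels, targets []StoreLabel) bool {
	for _, t := range targets {
		found := false
		for _, l := range storeLabels {
			if t.Key == l.Key && t.Value == l.Value {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// isSameLabels additionally requires equal lengths, i.e. an exact-set comparison.
func isSameLabels(storeLabels, targets []StoreLabel) bool {
	return len(storeLabels) == len(targets) && isLabelsMatch(storeLabels, targets)
}

func main() {
	store := []StoreLabel{{"zone", "z1"}, {"host", "h1"}}
	fmt.Println(isLabelsMatch(store, []StoreLabel{{"zone", "z1"}}))                // true: subset match
	fmt.Println(isSameLabels(store, []StoreLabel{{"zone", "z1"}}))                 // false: lengths differ
	fmt.Println(isSameLabels(store, []StoreLabel{{"host", "h1"}, {"zone", "z1"}})) // true: same set, order-insensitive
}
```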