store: support maintaining labels in Store #21565

Merged 19 commits on Dec 10, 2020
Changes from 10 commits
5 changes: 5 additions & 0 deletions config/config.go
@@ -69,6 +69,8 @@ const (
DefStoreLivenessTimeout = "5s"
// DefTxnScope is the default value for TxnScope
DefTxnScope = "global"
// DefStoresRefreshInterval is the default value of StoresRefreshInterval
DefStoresRefreshInterval = 60
)

// Valid config maps
@@ -176,6 +178,8 @@ type Config struct {
// where M is the element literal length and w is the number of bytes required for the maximum-length character in the character set.
// See https://dev.mysql.com/doc/refman/8.0/en/string-type-syntax.html for more details.
EnableEnumLengthLimit bool `toml:"enable-enum-length-limit" json:"enable-enum-length-limit"`
// StoresRefreshInterval indicates the interval, in seconds, at which store info is refreshed.
StoresRefreshInterval uint64 `toml:"stores-refresh-interval" json:"stores-refresh-interval"`
}

// UpdateTempStoragePath updates `TempStoragePath` if port/statusPort has changed
@@ -786,6 +790,7 @@ var defaultConf = Config{
DeprecateIntegerDisplayWidth: false,
TxnScope: DefTxnScope,
EnableEnumLengthLimit: true,
StoresRefreshInterval: DefStoresRefreshInterval,
}

var (
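With the new option in place, NewRegionCache picks the interval up from the global config. As a quick illustration, here is a minimal sketch of overriding the default at runtime; it assumes the existing config.UpdateGlobal helper and is not part of this diff:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/config"
)

func main() {
	// Sketch: override the refresh interval (in seconds) before the region
	// cache is created; NewRegionCache reads it via config.GetGlobalConfig().
	// config.UpdateGlobal is assumed to be available and is not part of this diff.
	config.UpdateGlobal(func(conf *config.Config) {
		conf.StoresRefreshInterval = 30
	})
	fmt.Println(config.GetGlobalConfig().StoresRefreshInterval) // prints 30
}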
2 changes: 2 additions & 0 deletions config/config_test.go
@@ -196,6 +196,7 @@ skip-register-to-dashboard = true
deprecate-integer-display-length = true
txn-scope = "dc-1"
enable-enum-length-limit = false
stores-refresh-interval = 30
[performance]
txn-total-size-limit=2000
[tikv-client]
@@ -273,6 +274,7 @@ spilled-file-encryption-method = "plaintext"
c.Assert(conf.DeprecateIntegerDisplayWidth, Equals, true)
c.Assert(conf.TxnScope, Equals, "dc-1")
c.Assert(conf.EnableEnumLengthLimit, Equals, false)
c.Assert(conf.StoresRefreshInterval, Equals, uint64(30))

_, err = f.WriteString(`
[log.file]
14 changes: 12 additions & 2 deletions store/mockstore/mocktikv/cluster_manipulate.go
@@ -13,7 +13,11 @@

package mocktikv

import "fmt"
import (
"fmt"

"github.com/pingcap/kvproto/pkg/metapb"
)

// BootstrapWithSingleStore initializes a Cluster with 1 Region and 1 Store.
func BootstrapWithSingleStore(cluster *Cluster) (storeID, peerID, regionID uint64) {
@@ -31,7 +35,13 @@ func BootstrapWithMultiStores(cluster *Cluster, n int) (storeIDs, peerIDs []uint
leaderPeer = peerIDs[0]
regionID = cluster.AllocID()
for _, storeID := range storeIDs {
cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID))
labels := []*metapb.StoreLabel{
{
Key: "id",
Value: fmt.Sprintf("%v", storeID),
},
}
cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID), labels...)
}
cluster.Bootstrap(regionID, storeIDs, peerIDs, leaderPeer)
return
81 changes: 69 additions & 12 deletions store/tikv/region_cache.go
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util"
@@ -286,7 +287,8 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.storeMu.stores = make(map[uint64]*Store)
c.notifyCheckCh = make(chan struct{}, 1)
c.closeCh = make(chan struct{})
go c.asyncCheckAndResolveLoop()
interval := config.GetGlobalConfig().StoresRefreshInterval
go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
return c
}

@@ -296,7 +298,9 @@ func (c *RegionCache) Close() {
}

// asyncCheckAndResolveLoop checks and resolves stores that need it, and periodically refreshes store info.
func (c *RegionCache) asyncCheckAndResolveLoop() {
func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
var needCheckStores []*Store
for {
select {
@@ -305,6 +309,18 @@ func (c *RegionCache) asyncCheckAndResolveLoop() {
case <-c.notifyCheckCh:
needCheckStores = needCheckStores[:0]
c.checkAndResolve(needCheckStores)
case <-ticker.C:
// refresh store info periodically (once a minute by default) to pick up label changes
var stores []*Store
c.storeMu.RLock()
stores = make([]*Store, 0, len(c.storeMu.stores))
for _, s := range c.storeMu.stores {
stores = append(stores, s)
}
c.storeMu.RUnlock()
for _, store := range stores {
store.reResolve(c)
}
}
}
}
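A caveat not covered by this hunk: time.NewTicker panics on a non-positive duration, so a StoresRefreshInterval of zero would crash this loop at startup. Below is a small, self-contained sketch of a guard; the helper name and the 60-second fallback are assumptions that simply mirror DefStoresRefreshInterval:

package main

import (
	"fmt"
	"time"
)

// storesRefreshDuration converts the configured interval (in seconds) into a
// time.Duration, falling back to a default because time.NewTicker panics on
// non-positive durations. The helper is illustrative only.
func storesRefreshDuration(seconds uint64) time.Duration {
	if seconds == 0 {
		seconds = 60 // mirrors DefStoresRefreshInterval
	}
	return time.Duration(seconds) * time.Second
}

func main() {
	fmt.Println(storesRefreshDuration(0))  // 1m0s
	fmt.Println(storesRefreshDuration(30)) // 30s
}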
@@ -1130,6 +1146,18 @@ func (c *RegionCache) getStoreByStoreID(storeID uint64) (store *Store) {
return
}

func (c *RegionCache) getStoresByLabels(labels []*metapb.StoreLabel) []*Store {
c.storeMu.RLock()
defer c.storeMu.RUnlock()
s := make([]*Store, 0)
for _, store := range c.storeMu.stores {
if store.IsLabelsMatch(labels) {
s = append(s, store)
}
}
return s
}

// OnRegionEpochNotMatch removes the old region and inserts new regions into the cache.
func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) error {
// Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff.
@@ -1385,14 +1413,15 @@ func (r *Region) ContainsByEnd(key []byte) bool {

// Store contains a kv process's address.
type Store struct {
addr string // loaded store address
saddr string // loaded store status address
storeID uint64 // store's id
state uint64 // unsafe store storeState
resolveMutex sync.Mutex // protect pd from concurrent init requests
epoch uint32 // store fail epoch, see RegionStore.storeEpochs
storeType kv.StoreType // type of the store
tokenCount atomic2.Int64 // used store token count
addr string // loaded store address
saddr string // loaded store status address
storeID uint64 // store's id
state uint64 // unsafe store storeState
labels []*metapb.StoreLabel // the store's labels
resolveMutex sync.Mutex // protect pd from concurrent init requests
epoch uint32 // store fail epoch, see RegionStore.storeEpochs
storeType kv.StoreType // type of the store
tokenCount atomic2.Int64 // used store token count
}

type resolveState uint64
@@ -1439,6 +1468,7 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
s.addr = addr
s.saddr = store.GetStatusAddress()
s.storeType = GetStoreTypeByMeta(store)
s.labels = store.GetLabels()
retry:
state = s.getResolveState()
if state != unresolved {
@@ -1491,9 +1521,9 @@ func (s *Store) reResolve(c *RegionCache) {

storeType := GetStoreTypeByMeta(store)
addr = store.GetAddress()
if s.addr != addr {
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
state := resolved
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType}
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels()}
newStore.state = *(*uint64)(&state)
c.storeMu.Lock()
c.storeMu.stores[newStore.storeID] = newStore
@@ -1547,7 +1577,34 @@ retry:
case notifyCheckCh <- struct{}{}:
default:
}
}

// IsSameLabels returns whether the store has exactly the same labels as the target labels.
func (s *Store) IsSameLabels(labels []*metapb.StoreLabel) bool {
if len(s.labels) != len(labels) {
return false
}
return s.IsLabelsMatch(labels)
}

// IsLabelsMatch returns whether the store's labels contain all the target labels; empty target labels match any store.
func (s *Store) IsLabelsMatch(labels []*metapb.StoreLabel) bool {
if len(labels) < 1 {
return true
}
for _, targetLabel := range labels {
match := false
for _, label := range s.labels {
if targetLabel.Key == label.Key && targetLabel.Value == label.Value {
match = true
break
}
}
if !match {
return false
}
}
return true
}

type livenessState uint32
28 changes: 28 additions & 0 deletions store/tikv/region_cache_test.go
@@ -121,6 +121,34 @@ func (s *testRegionCacheSuite) getAddr(c *C, key []byte, replicaRead kv.ReplicaR
return ctx.Addr
}

func (s *testRegionCacheSuite) TestStoreLabels(c *C) {
testcases := []struct {
storeID uint64
}{
{
storeID: s.store1,
},
{
storeID: s.store2,
},
}
for _, testcase := range testcases {
c.Log(testcase.storeID)
store := s.cache.getStoreByStoreID(testcase.storeID)
_, err := store.initResolve(s.bo, s.cache)
c.Assert(err, IsNil)
labels := []*metapb.StoreLabel{
{
Key: "id",
Value: fmt.Sprintf("%v", testcase.storeID),
},
}
stores := s.cache.getStoresByLabels(labels)
c.Assert(len(stores), Equals, 1)
c.Assert(stores[0].labels, DeepEquals, labels)
}
}
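
The new test asserts exact label equality on the stores returned by getStoresByLabels. A possible follow-up, sketched below under the assumption that it lives in this same file (package tikv, where Store.labels is visible), would also exercise the subset semantics of IsLabelsMatch versus the exact-match semantics of IsSameLabels:

// Sketch of a follow-up test; not part of this diff.
func (s *testRegionCacheSuite) TestLabelMatchSemantics(c *C) {
	store := &Store{labels: []*metapb.StoreLabel{
		{Key: "zone", Value: "z1"},
		{Key: "id", Value: "1"},
	}}
	// A subset of the store's labels is enough for IsLabelsMatch...
	c.Assert(store.IsLabelsMatch([]*metapb.StoreLabel{{Key: "zone", Value: "z1"}}), Equals, true)
	// ...and empty target labels match any store.
	c.Assert(store.IsLabelsMatch(nil), Equals, true)
	// IsSameLabels additionally requires the label sets to be the same size.
	c.Assert(store.IsSameLabels([]*metapb.StoreLabel{{Key: "zone", Value: "z1"}}), Equals, false)
}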

func (s *testRegionCacheSuite) TestSimple(c *C) {
seed := rand.Uint32()
r := s.getRegion(c, []byte("a"))