region_cache: extract store related fields to store cache (#1279)
Signed-off-by: zyguan <[email protected]>
Co-authored-by: cfzjywxk <[email protected]>
zyguan and cfzjywxk authored Apr 18, 2024
1 parent 09b120c commit 36c8d2c
Showing 8 changed files with 144 additions and 131 deletions.
internal/locate/region_cache.go: 70 changes (25 additions, 45 deletions)
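The diff below consistently replaces store-related fields and helpers on RegionCache with calls on a new stores field of type storeCache. The storeCache declaration itself lives in one of the other changed files, whose diff did not load on this page, so the sketch below is only an inference from the call sites in this file: the method names are taken verbatim from the diff, while the receiver kind (interface vs. struct) and the exact signatures are assumptions.

```go
// Inferred surface of storeCache; not the actual declaration from this commit.
type storeCache interface {
	get(id uint64) (*Store, bool)         // replaces RegionCache.getStore
	getOrInsertDefault(id uint64) *Store  // replaces RegionCache.getStoreOrInsertDefault
	put(s *Store)                         // replaces RegionCache.putStore
	clear()                               // replaces RegionCache.clearStores
	filter(dst []*Store, pred func(*Store) bool) []*Store // replaces RegionCache.filterStores
	forEach(f func(*Store))               // replaces RegionCache.forEachStore
	markStoreNeedCheck(s *Store)
	getCheckStoreEvents() <-chan struct{}
	listTiflashComputeStores() (stores []*Store, needReload bool)
	setTiflashComputeStores(stores []*Store)
	markTiflashComputeStoresNeedReload()
}
```

The same value is what store.initResolve, store.reResolve, and reloadTiFlashComputeStores now receive instead of the whole *RegionCache, which keeps store resolution separate from region bookkeeping.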
@@ -326,11 +326,11 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
var leaderAccessIdx AccessIndex
availablePeers := r.meta.GetPeers()[:0]
for _, p := range r.meta.Peers {
- store, exists := c.getStore(p.StoreId)
+ store, exists := c.stores.get(p.StoreId)
if !exists {
- store = c.getStoreOrInsertDefault(p.StoreId)
+ store = c.stores.getOrInsertDefault(p.StoreId)
}
- addr, err := store.initResolve(bo, c)
+ addr, err := store.initResolve(bo, c.stores)
if err != nil {
return nil, err
}
@@ -633,25 +633,11 @@ type RegionCache struct {

mu regionIndexMu

- storeMu struct {
- sync.RWMutex
- stores map[uint64]*Store
- }
- tiflashComputeStoreMu struct {
- sync.RWMutex
- needReload bool
- stores []*Store
- }
- notifyCheckCh chan struct{}
+ stores storeCache

// runner for background jobs
bg *bgRunner

- testingKnobs struct {
- // Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
- // requestLiveness always returns unreachable.
- mockRequestLiveness atomic.Pointer[livenessFunc]
- }
clusterID uint64
}

@@ -681,10 +667,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
c.codec = codecPDClient.GetCodec()
}

- c.storeMu.stores = make(map[uint64]*Store)
- c.tiflashComputeStoreMu.needReload = true
- c.tiflashComputeStoreMu.stores = make([]*Store, 0)
- c.notifyCheckCh = make(chan struct{}, 1)
+ c.stores = newStoreCache(pdClient)
c.bg = newBackgroundRunner(context.Background())
c.enableForwarding = config.GetGlobalConfig().EnableForwarding
if c.pdClient != nil {
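Judging by the fields removed from the RegionCache struct and by the initialization lines dropped here and again in newTestRegionCache below, the new container appears to absorb exactly that state. A minimal sketch, assuming a plain struct behind the storeCache surface; the concrete type name storeCacheImpl is a placeholder, not a name confirmed by this diff:

```go
// Hypothetical layout: the fields mirror what was removed from RegionCache above.
type storeCacheImpl struct {
	pdClient pd.Client // needed by resolve/reload paths such as store.initResolve

	storeMu struct {
		sync.RWMutex
		stores map[uint64]*Store
	}
	tiflashComputeStoreMu struct {
		sync.RWMutex
		needReload bool
		stores     []*Store
	}
	notifyCheckCh chan struct{}
}

// Sketch of newStoreCache, mirroring the initialization NewRegionCache no longer does inline.
func newStoreCache(pdClient pd.Client) *storeCacheImpl {
	c := &storeCacheImpl{pdClient: pdClient}
	c.storeMu.stores = make(map[uint64]*Store)
	c.tiflashComputeStoreMu.needReload = true
	c.tiflashComputeStoreMu.stores = make([]*Store, 0)
	c.notifyCheckCh = make(chan struct{}, 1)
	return c
}
```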
Expand Down Expand Up @@ -719,7 +702,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
}
needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) })
return false
- }, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents())
+ }, time.Duration(refreshStoreInterval/4)*time.Second, c.stores.getCheckStoreEvents())
if !options.noHealthTick {
c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second)
}
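The background resolver job above now gets its wake-up channel from c.stores.getCheckStoreEvents() rather than from the notifyCheckCh field that used to live on RegionCache. Assuming the channel simply moved into the store cache, the two methods could look roughly like this (the resolve-state transition on *Store is elided):

```go
// markStoreNeedCheck flags a store for address re-resolution and nudges the
// background checker without blocking if a check is already pending.
func (c *storeCacheImpl) markStoreNeedCheck(s *Store) {
	// ... switch s from resolved to needCheck (details elided) ...
	select {
	case c.notifyCheckCh <- struct{}{}:
	default:
	}
}

// getCheckStoreEvents exposes the channel the background job waits on.
func (c *storeCacheImpl) getCheckStoreEvents() <-chan struct{} {
	return c.notifyCheckCh
}
```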
@@ -741,10 +724,6 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
// only used fot test.
func newTestRegionCache() *RegionCache {
c := &RegionCache{}
- c.storeMu.stores = make(map[uint64]*Store)
- c.tiflashComputeStoreMu.needReload = true
- c.tiflashComputeStoreMu.stores = make([]*Store, 0)
- c.notifyCheckCh = make(chan struct{}, 1)
c.bg = newBackgroundRunner(context.Background())
c.mu = *newRegionIndexMu(nil)
return c
@@ -753,7 +732,7 @@ func newTestRegionCache() *RegionCache {
// clear clears all cached data in the RegionCache. It's only used in tests.
func (c *RegionCache) clear() {
c.mu.refresh(nil)
- c.clearStores()
+ c.stores.clear()
}

// thread unsafe, should use with lock
@@ -778,22 +757,23 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
}
}()

- needCheckStores = c.filterStores(needCheckStores, needCheck)
+ needCheckStores = c.stores.filter(needCheckStores, needCheck)
for _, store := range needCheckStores {
- _, err := store.reResolve(c)
+ _, err := store.reResolve(c.stores)
tikverr.Log(err)
}
return needCheckStores
}

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
- c.putStore(newStore(id, addr, peerAddr, "", storeType, resolveState(state), labels))
+ c.stores.put(newStore(id, addr, peerAddr, "", storeType, resolveState(state), labels))
}

// SetPDClient replaces pd client,for testing only
func (c *RegionCache) SetPDClient(client pd.Client) {
c.pdClient = client
+ c.stores = newStoreCache(client)
}

// RPCContext contains data that is needed to send RPC to a region.
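One behavioral detail worth noting in the hunk above: SetPDClient now also recreates the store cache from the new client, so stores registered through the old cache are dropped. In a test this would look roughly like the following (mockPDClient and otherPDClient are placeholder names):

```go
cache := NewRegionCache(mockPDClient)
cache.SetRegionCacheStore(1, "127.0.0.1:20160", "", tikvrpc.TiKV, 1, nil)
cache.SetPDClient(otherPDClient) // c.stores = newStoreCache(otherPDClient); store 1 is no longer cached
```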
@@ -1057,7 +1037,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
return nil, nil
}
if store.getResolveState() == needCheck {
- _, err := store.reResolve(c)
+ _, err := store.reResolve(c.stores)
tikverr.Log(err)
}
regionStore.workTiFlashIdx.Store(int32(accessIdx))
@@ -1406,7 +1386,7 @@ func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *regionS
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
}
// schedule a store addr resolve.
- c.markStoreNeedCheck(s)
+ c.stores.markStoreNeedCheck(s)
return incEpochStoreIdx
}

@@ -1759,14 +1739,14 @@ func (c *RegionCache) searchCachedRegionByID(regionID uint64) (*Region, bool) {

// GetStoresByType gets stores by type `typ`
func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store {
- return c.filterStores(nil, func(s *Store) bool {
+ return c.stores.filter(nil, func(s *Store) bool {
return s.getResolveState() == resolved && s.storeType == typ
})
}

// GetAllStores gets TiKV and TiFlash stores.
func (c *RegionCache) GetAllStores() []*Store {
- return c.filterStores(nil, func(s *Store) bool {
+ return c.stores.filter(nil, func(s *Store) bool {
return s.getResolveState() == resolved && (s.storeType == tikvrpc.TiKV || s.storeType == tikvrpc.TiFlash)
})
}
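GetStoresByType and GetAllStores above (and GetTiFlashStores further down) now go through the same c.stores.filter helper instead of reading the store map directly. Assuming the map kept the shape it had under RegionCache.storeMu, filter and the get lookup used elsewhere in this diff could be sketched as:

```go
// filter appends every cached store matching pred to dst under a read lock.
func (c *storeCacheImpl) filter(dst []*Store, pred func(*Store) bool) []*Store {
	c.storeMu.RLock()
	defer c.storeMu.RUnlock()
	for _, s := range c.storeMu.stores {
		if pred(s) {
			dst = append(dst, s)
		}
	}
	return dst
}

// get is the plain map lookup that replaces RegionCache.getStore.
func (c *storeCacheImpl) get(id uint64) (*Store, bool) {
	c.storeMu.RLock()
	defer c.storeMu.RUnlock()
	s, ok := c.storeMu.stores[id]
	return s, ok
}
```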
@@ -2037,7 +2017,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S
addr = store.addr
return
case unresolved:
- addr, err = store.initResolve(bo, c)
+ addr, err = store.initResolve(bo, c.stores)
return
case deleted:
addr = c.changeToActiveStore(region, store.storeID)
@@ -2094,7 +2074,7 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor
// changeToActiveStore replace the deleted store in the region by an up-to-date store in the stores map.
// The order is guaranteed by reResolve() which adds the new store before marking old store deleted.
func (c *RegionCache) changeToActiveStore(region *Region, storeID uint64) (addr string) {
- store, _ := c.getStore(storeID)
+ store, _ := c.stores.get(storeID)
for {
oldRegionStore := region.getStore()
newRegionStore := oldRegionStore.clone()
@@ -2204,19 +2184,19 @@ func (c *RegionCache) PDClient() pd.Client {
// GetTiFlashStores returns the information of all tiflash nodes. Like `GetAllStores`, the method only returns resolved
// stores so that users won't be bothered by tombstones. (related issue: https://github.com/pingcap/tidb/issues/46602)
func (c *RegionCache) GetTiFlashStores(labelFilter LabelFilter) []*Store {
- return c.filterStores(nil, func(s *Store) bool {
+ return c.stores.filter(nil, func(s *Store) bool {
return s.storeType == tikvrpc.TiFlash && labelFilter(s.labels) && s.getResolveState() == resolved
})
}

// GetTiFlashComputeStores returns all stores with lable <engine, tiflash_compute>.
func (c *RegionCache) GetTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, err error) {
- stores, needReload := c.listTiflashComputeStores()
+ stores, needReload := c.stores.listTiflashComputeStores()

if needReload {
- stores, err = reloadTiFlashComputeStores(bo.GetCtx(), c)
+ stores, err = reloadTiFlashComputeStores(bo.GetCtx(), c.stores)
if err == nil {
- c.setTiflashComputeStores(stores)
+ c.stores.setTiflashComputeStores(stores)
}
return stores, err
}
@@ -2266,7 +2246,7 @@ func (c *RegionCache) InvalidateTiFlashComputeStoresIfGRPCError(err error) bool
// InvalidateTiFlashComputeStores set needReload be true,
// and will refresh tiflash_compute store cache next time.
func (c *RegionCache) InvalidateTiFlashComputeStores() {
- c.markTiflashComputeStoresNeedReload()
+ c.stores.markTiflashComputeStoresNeedReload()
}

// UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if
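GetTiFlashComputeStores keeps its lazy-reload flow: list the cached tiflash_compute stores, and only if the needReload flag is set, fetch them again via reloadTiFlashComputeStores and write the result back; InvalidateTiFlashComputeStores merely raises the flag. A sketch of the store-cache side, reusing the tiflashComputeStoreMu fields removed from RegionCache (storeCacheImpl is still a placeholder name):

```go
func (c *storeCacheImpl) listTiflashComputeStores() (stores []*Store, needReload bool) {
	c.tiflashComputeStoreMu.RLock()
	defer c.tiflashComputeStoreMu.RUnlock()
	return c.tiflashComputeStoreMu.stores, c.tiflashComputeStoreMu.needReload
}

func (c *storeCacheImpl) setTiflashComputeStores(stores []*Store) {
	c.tiflashComputeStoreMu.Lock()
	defer c.tiflashComputeStoreMu.Unlock()
	c.tiflashComputeStoreMu.stores = stores
	c.tiflashComputeStoreMu.needReload = false
}

func (c *storeCacheImpl) markTiflashComputeStoresNeedReload() {
	c.tiflashComputeStoreMu.Lock()
	defer c.tiflashComputeStoreMu.Unlock()
	c.tiflashComputeStoreMu.needReload = true
}
```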
@@ -2652,7 +2632,7 @@ func (c *RegionCache) checkAndUpdateStoreHealthStatus() {
}()
healthDetails := make(map[uint64]HealthStatusDetail)
now := time.Now()
- c.forEachStore(func(store *Store) {
+ c.stores.forEach(func(store *Store) {
store.healthStatus.tick(now)
healthDetails[store.storeID] = store.healthStatus.GetHealthStatusDetail()
})
@@ -2664,7 +2644,7 @@

// reportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
func (c *RegionCache) reportStoreReplicaFlows() {
- c.forEachStore(func(store *Store) {
+ c.stores.forEach(func(store *Store) {
for destType := toLeader; destType < numReplicaFlowsType; destType++ {
metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType)))
store.resetReplicaFlowsStats(destType)
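Both the health-status tick above and the replica-flow metrics here walk all stores through c.stores.forEach instead of ranging over the map on RegionCache. A matching sketch; the real implementation might copy the stores out before invoking the callback, whereas this version simply holds the read lock:

```go
// forEach runs f on every cached store while holding the read lock.
func (c *storeCacheImpl) forEach(f func(*Store)) {
	c.storeMu.RLock()
	defer c.storeMu.RUnlock()
	for _, s := range c.storeMu.stores {
		f(s)
	}
}
```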
@@ -2683,7 +2663,7 @@ func contains(startKey, endKey, key []byte) bool {
}

func (c *RegionCache) onHealthFeedback(feedback *tikvpb.HealthFeedback) {
- store, ok := c.getStore(feedback.GetStoreId())
+ store, ok := c.stores.get(feedback.GetStoreId())
if !ok {
logutil.BgLogger().Info("dropped health feedback info due to unknown store id", zap.Uint64("storeID", feedback.GetStoreId()))
return
(Diffs for the remaining 7 changed files are not shown here.)
