diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index a4f40ef8e..690cd7cb4 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -287,11 +287,9 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio var leaderAccessIdx AccessIndex availablePeers := r.meta.GetPeers()[:0] for _, p := range r.meta.Peers { - c.storeMu.RLock() - store, exists := c.storeMu.stores[p.StoreId] - c.storeMu.RUnlock() + store, exists := c.getStore(p.StoreId) if !exists { - store = c.getStoreByStoreID(p.StoreId) + store = c.getStoreOrInsertDefault(p.StoreId) } addr, err := store.initResolve(bo, c) if err != nil { @@ -334,11 +332,9 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio } for _, p := range pdRegion.DownPeers { - c.storeMu.RLock() - store, exists := c.storeMu.stores[p.StoreId] - c.storeMu.RUnlock() + store, exists := c.getStore(p.StoreId) if !exists { - store = c.getStoreByStoreID(p.StoreId) + store = c.getStoreOrInsertDefault(p.StoreId) } addr, err := store.initResolve(bo, c) if err != nil { @@ -468,7 +464,7 @@ func (mu *regionIndexMu) refresh(r []*Region) { mu.sorted = newMu.sorted } -type livenessFunc func(s *Store, bo *retry.Backoffer) livenessState +type livenessFunc func(ctx context.Context, s *Store) livenessState // RegionCache caches Regions loaded from PD. // All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing @@ -564,9 +560,7 @@ func newTestRegionCache() *RegionCache { // clear clears all cached data in the RegionCache. It's only used in tests. func (c *RegionCache) clear() { c.mu = *newRegionIndexMu(nil) - c.storeMu.Lock() - c.storeMu.stores = make(map[uint64]*Store) - c.storeMu.Unlock() + c.clearStores() } // thread unsafe, should use with lock @@ -596,7 +590,7 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { select { case <-c.ctx.Done(): return - case <-c.notifyCheckCh: + case <-c.getCheckStoreEvents(): c.checkAndResolve(needCheckStores, func(s *Store) bool { return s.getResolveState() == needCheck }) @@ -639,15 +633,10 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(* } }() - c.storeMu.RLock() - for _, store := range c.storeMu.stores { - if needCheck(store) { - needCheckStores = append(needCheckStores, store) - } - } - c.storeMu.RUnlock() - - for _, store := range needCheckStores { + for _, store := range c.listStores(listStoreOptions{ + target: needCheckStores, + filter: needCheck, + }) { _, err := store.reResolve(c) tikverr.Log(err) } @@ -655,16 +644,14 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(* // SetRegionCacheStore is used to set a store in region cache, for testing only func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) { - c.storeMu.Lock() - defer c.storeMu.Unlock() - c.storeMu.stores[id] = &Store{ + c.putStore(&Store{ storeID: id, storeType: storeType, state: state, labels: labels, addr: addr, peerAddr: peerAddr, - } + }) } // SetPDClient replaces pd client,for testing only @@ -1224,7 +1211,7 @@ func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *regionS metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() } // schedule a store addr resolve. - s.markNeedCheck(c.notifyCheckCh) + c.markStoreNeedCheck(s) return incEpochStoreIdx } @@ -1625,41 +1612,23 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { } // GetStoresByType gets stores by type `typ` -// TODO: revise it by get store by closure. func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store { - c.storeMu.Lock() - defer c.storeMu.Unlock() - stores := make([]*Store, 0) - for _, store := range c.storeMu.stores { - if store.getResolveState() != resolved { - continue - } - if store.storeType == typ { - //TODO: revise it with store.clone() - storeLabel := make([]*metapb.StoreLabel, 0) - for _, label := range store.labels { - storeLabel = append(storeLabel, &metapb.StoreLabel{ - Key: label.Key, - Value: label.Value, - }) - } - stores = append(stores, &Store{ - addr: store.addr, - peerAddr: store.peerAddr, - storeID: store.storeID, - labels: storeLabel, - storeType: typ, - }) - } - } - return stores + return c.listStores(listStoreOptions{ + filter: func(s *Store) bool { + return s.getResolveState() == resolved && s.storeType == typ + }, + copy: true, + }) } // GetAllStores gets TiKV and TiFlash stores. func (c *RegionCache) GetAllStores() []*Store { - stores := c.GetStoresByType(tikvrpc.TiKV) - tiflashStores := c.GetStoresByType(tikvrpc.TiFlash) - return append(stores, tiflashStores...) + return c.listStores(listStoreOptions{ + filter: func(s *Store) bool { + return s.getResolveState() == resolved && (s.storeType == tikvrpc.TiKV || s.storeType == tikvrpc.TiFlash) + }, + copy: true, + }) } func filterUnavailablePeers(region *pd.Region) { @@ -1927,7 +1896,7 @@ func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *S addr, err = store.initResolve(bo, c) return case deleted: - addr = c.changeToActiveStore(region, store) + addr = c.changeToActiveStore(region, store.storeID) return case tombstone: return "", nil @@ -1980,10 +1949,8 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStor // changeToActiveStore replace the deleted store in the region by an up-to-date store in the stores map. // The order is guaranteed by reResolve() which adds the new store before marking old store deleted. -func (c *RegionCache) changeToActiveStore(region *Region, store *Store) (addr string) { - c.storeMu.RLock() - store = c.storeMu.stores[store.storeID] - c.storeMu.RUnlock() +func (c *RegionCache) changeToActiveStore(region *Region, storeID uint64) (addr string) { + store, _ := c.getStore(storeID) for { oldRegionStore := region.getStore() newRegionStore := oldRegionStore.clone() @@ -2003,32 +1970,6 @@ func (c *RegionCache) changeToActiveStore(region *Region, store *Store) (addr st return } -func (c *RegionCache) getStoreByStoreID(storeID uint64) (store *Store) { - var ok bool - c.storeMu.Lock() - store, ok = c.storeMu.stores[storeID] - if ok { - c.storeMu.Unlock() - return - } - store = &Store{storeID: storeID} - c.storeMu.stores[storeID] = store - c.storeMu.Unlock() - return -} - -func (c *RegionCache) getStoresByLabels(labels []*metapb.StoreLabel) []*Store { - c.storeMu.RLock() - defer c.storeMu.RUnlock() - s := make([]*Store, 0) - for _, store := range c.storeMu.stores { - if store.IsLabelsMatch(labels) { - s = append(s, store) - } - } - return s -} - // OnBucketVersionNotMatch removes the old buckets meta if the version is stale. func (c *RegionCache) OnBucketVersionNotMatch(ctx *RPCContext, version uint64, keys [][]byte) { r := c.GetCachedRegionWithRLock(ctx.Region) @@ -2118,35 +2059,29 @@ func (c *RegionCache) PDClient() pd.Client { // GetTiFlashStores returns the information of all tiflash nodes. func (c *RegionCache) GetTiFlashStores(labelFilter LabelFilter) []*Store { - c.storeMu.RLock() - defer c.storeMu.RUnlock() - var stores []*Store - for _, s := range c.storeMu.stores { - if s.storeType == tikvrpc.TiFlash { - if !labelFilter(s.labels) { - continue - } - stores = append(stores, s) - } - } - return stores + return c.listStores(listStoreOptions{ + filter: func(s *Store) bool { + return s.storeType == tikvrpc.TiFlash && labelFilter(s.labels) + }, + }) } // GetTiFlashComputeStores returns all stores with lable . func (c *RegionCache) GetTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, err error) { - c.tiflashComputeStoreMu.RLock() - needReload := c.tiflashComputeStoreMu.needReload - stores := c.tiflashComputeStoreMu.stores - c.tiflashComputeStoreMu.RUnlock() + stores, needReload := c.listTiflashComputeStores() if needReload { - return c.reloadTiFlashComputeStores(bo) + stores, err = reloadTiFlashComputeStores(bo.GetCtx(), c.pdClient) + if err == nil { + c.setTiflashCoumputStores(stores) + } + return stores, err } return stores, nil } -func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, _ error) { - stores, err := c.pdClient.GetAllStores(bo.GetCtx()) +func reloadTiFlashComputeStores(ctx context.Context, pdClient pd.Client) (res []*Store, _ error) { + stores, err := pdClient.GetAllStores(ctx) if err != nil { return nil, err } @@ -2163,10 +2098,6 @@ func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*St }) } } - - c.tiflashComputeStoreMu.Lock() - c.tiflashComputeStoreMu.stores = res - c.tiflashComputeStoreMu.Unlock() return res, nil } @@ -2192,9 +2123,7 @@ func (c *RegionCache) InvalidateTiFlashComputeStoresIfGRPCError(err error) bool // InvalidateTiFlashComputeStores set needReload be true, // and will refresh tiflash_compute store cache next time. func (c *RegionCache) InvalidateTiFlashComputeStores() { - c.tiflashComputeStoreMu.Lock() - defer c.tiflashComputeStoreMu.Unlock() - c.tiflashComputeStoreMu.needReload = true + c.markTiflashComputeStoresNeedReload() } // UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if @@ -2700,12 +2629,10 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { addr = store.GetAddress() if s.addr != addr || !s.IsSameLabels(store.GetLabels()) { newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)} - c.storeMu.Lock() if s.addr == addr { newStore.slowScore = s.slowScore } - c.storeMu.stores[newStore.storeID] = newStore - c.storeMu.Unlock() + c.putStore(newStore) s.setResolveState(deleted) return false, nil } @@ -2749,16 +2676,6 @@ func (s *Store) changeResolveStateTo(from, to resolveState) bool { } } -// markNeedCheck marks resolved store to be async resolve to check store addr change. -func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) { - if s.changeResolveStateTo(resolved, needCheck) { - select { - case notifyCheckCh <- struct{}{}: - default: - } - } -} - // IsSameLabels returns whether the store have the same labels with target labels func (s *Store) IsSameLabels(labels []*metapb.StoreLabel) bool { if len(s.labels) != len(labels) { @@ -2822,6 +2739,10 @@ func (s *Store) getLivenessState() livenessState { type livenessState uint32 +func (l livenessState) injectConstantLiveness(tk testingKnobs) { + tk.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { return l }) +} + var ( livenessSf singleflight.Group // storeLivenessTimeout is the max duration of resolving liveness of a TiKV instance. @@ -2858,7 +2779,11 @@ func (s livenessState) String() string { } } -func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) { +func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, c *RegionCache) (liveness livenessState) { + liveness = s.requestLiveness(bo.GetCtx(), c) + if liveness == reachable { + return + } // This mechanism doesn't support non-TiKV stores currently. if s.storeType != tikvrpc.TiKV { logutil.BgLogger().Info("[health check] skip running health check loop for non-tikv store", @@ -2871,6 +2796,7 @@ func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessSt s.unreachableSince = time.Now() go s.checkUntilHealth(c) } + return } func (s *Store) checkUntilHealth(c *RegionCache) { @@ -2897,8 +2823,7 @@ func (s *Store) checkUntilHealth(c *RegionCache) { } } - bo := retry.NewNoopBackoff(c.ctx) - l := s.requestLiveness(bo, c) + l := s.requestLiveness(c.ctx, c) if l == reachable { logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID)) @@ -2909,7 +2834,7 @@ func (s *Store) checkUntilHealth(c *RegionCache) { } } -func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l livenessState) { +func (s *Store) requestLiveness(ctx context.Context, tk testingKnobs) (l livenessState) { // It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead. if val, err := util.EvalFailpoint("injectLiveness"); err == nil { switch val.(string) { @@ -2922,10 +2847,10 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l liveness } } - if c != nil { - livenessFunc := c.testingKnobs.mockRequestLiveness.Load() + if tk != nil { + livenessFunc := tk.getMockRequestLiveness() if livenessFunc != nil { - return (*livenessFunc)(s, bo) + return livenessFunc(ctx, s) } } @@ -2941,12 +2866,6 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l liveness rsCh := livenessSf.DoChan(addr, func() (interface{}, error) { return invokeKVStatusAPI(addr, storeLivenessTimeout), nil }) - var ctx context.Context - if bo != nil { - ctx = bo.GetCtx() - } else { - ctx = context.Background() - } select { case rs := <-rsCh: l = rs.Val.(livenessState) @@ -3083,12 +3002,10 @@ func (c *RegionCache) checkAndUpdateStoreSlowScores() { } }() slowScoreMetrics := make(map[string]float64) - c.storeMu.RLock() - for _, store := range c.storeMu.stores { + c.forEachStore(func(store *Store) { store.updateSlowScoreStat() slowScoreMetrics[store.addr] = float64(store.getSlowScore()) - } - c.storeMu.RUnlock() + }) for store, score := range slowScoreMetrics { metrics.TiKVStoreSlowScoreGauge.WithLabelValues(store).Set(score) } @@ -3118,14 +3035,12 @@ func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) { case <-c.ctx.Done(): return case <-ticker.C: - c.storeMu.RLock() - for _, store := range c.storeMu.stores { + c.forEachStore(func(store *Store) { for destType := toLeader; destType < numReplicaFlowsType; destType++ { metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType))) store.resetReplicaFlowsStats(destType) } - } - c.storeMu.RUnlock() + }) } } } @@ -3184,3 +3099,128 @@ func contains(startKey, endKey, key []byte) bool { return bytes.Compare(startKey, key) <= 0 && (bytes.Compare(key, endKey) < 0 || len(endKey) == 0) } + +type testingKnobs interface { + getMockRequestLiveness() livenessFunc + setMockRequestLiveness(f livenessFunc) +} + +func (c *RegionCache) getMockRequestLiveness() livenessFunc { + return *c.testingKnobs.mockRequestLiveness.Load() +} + +func (c *RegionCache) setMockRequestLiveness(f livenessFunc) { + c.testingKnobs.mockRequestLiveness.Store(&f) +} + +func (c *RegionCache) getStore(id uint64) (store *Store, exists bool) { + c.storeMu.RLock() + store, exists = c.storeMu.stores[id] + c.storeMu.RUnlock() + return +} + +func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store { + c.storeMu.Lock() + store, exists := c.storeMu.stores[id] + if !exists { + store = &Store{storeID: id} + c.storeMu.stores[id] = store + } + c.storeMu.Unlock() + return store +} + +func (c *RegionCache) putStore(store *Store) { + c.storeMu.Lock() + c.storeMu.stores[store.storeID] = store + c.storeMu.Unlock() +} + +func (c *RegionCache) clearStores() { + c.storeMu.Lock() + c.storeMu.stores = make(map[uint64]*Store) + c.storeMu.Unlock() +} + +func (c *RegionCache) forEachStore(f func(*Store)) { + c.storeMu.RLock() + defer c.storeMu.RUnlock() + for _, s := range c.storeMu.stores { + f(s) + } +} + +func (c *RegionCache) listStores(opts listStoreOptions) []*Store { + c.storeMu.RLock() + for _, store := range c.storeMu.stores { + if opts.filter == nil || opts.filter(store) { + opts.append(store) + } + } + c.storeMu.RUnlock() + return opts.target +} + +type listStoreOptions struct { + target []*Store + filter func(store *Store) bool + // TODO(zyguan): why we need copy semantics when GetStoresByType and GetAllStores? + // ref https://github.com/pingcap/tidb/commit/75913fdc9cce920a5a49c532bf55078b2928fc92#diff-625dbb0d6c1c2727cc16393b810d430d9a4fb0f99b35129071d99f04541b5b76R1083-R1087 + copy bool +} + +func (opts *listStoreOptions) append(store *Store) { + if !opts.copy { + opts.target = append(opts.target, store) + return + } + labels := make([]*metapb.StoreLabel, len(store.labels)) + for _, label := range store.labels { + labels = append(labels, &metapb.StoreLabel{ + Key: label.Key, + Value: label.Value, + }) + } + opts.target = append(opts.target, &Store{ + addr: store.addr, + peerAddr: store.peerAddr, + storeID: store.storeID, + storeType: store.storeType, + labels: labels, + }) +} + +func (c *RegionCache) listTiflashComputeStores() (stores []*Store, needReload bool) { + c.tiflashComputeStoreMu.RLock() + needReload = c.tiflashComputeStoreMu.needReload + stores = c.tiflashComputeStoreMu.stores + c.tiflashComputeStoreMu.RUnlock() + return +} + +func (c *RegionCache) setTiflashCoumputStores(stores []*Store) { + // TODO(zyguan): needReload seems to be always true, do we need to set it to false here? + c.tiflashComputeStoreMu.Lock() + c.tiflashComputeStoreMu.stores = stores + c.tiflashComputeStoreMu.Unlock() +} + +func (c *RegionCache) markTiflashComputeStoresNeedReload() { + c.tiflashComputeStoreMu.Lock() + c.tiflashComputeStoreMu.needReload = true + c.tiflashComputeStoreMu.Unlock() +} + +func (c *RegionCache) markStoreNeedCheck(store *Store) { + if store.changeResolveStateTo(resolved, needCheck) { + select { + case c.notifyCheckCh <- struct{}{}: + default: + } + } +} + +func (c *RegionCache) getCheckStoreEvents() <-chan struct{} { + return c.notifyCheckCh +} diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index c84c4e77b..c299c6c99 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -170,7 +170,7 @@ func (s *testRegionCacheSuite) TestStoreLabels() { } for _, testcase := range testcases { s.T().Log(testcase.storeID) - store := s.cache.getStoreByStoreID(testcase.storeID) + store := s.cache.getStoreOrInsertDefault(testcase.storeID) _, err := store.initResolve(s.bo, s.cache) s.Nil(err) labels := []*metapb.StoreLabel{ @@ -179,7 +179,9 @@ func (s *testRegionCacheSuite) TestStoreLabels() { Value: fmt.Sprintf("%v", testcase.storeID), }, } - stores := s.cache.getStoresByLabels(labels) + stores := s.cache.listStores(listStoreOptions{ + filter: func(s *Store) bool { return s.IsLabelsMatch(labels) }, + }) s.Equal(len(stores), 1) s.Equal(stores[0].labels, labels) } @@ -209,7 +211,7 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { // Check resolving normal stores. The resolve state should be resolved. for _, storeMeta := range s.cluster.GetAllStores() { - store := cache.getStoreByStoreID(storeMeta.GetId()) + store := cache.getStoreOrInsertDefault(storeMeta.GetId()) s.Equal(store.getResolveState(), unresolved) addr, err := store.initResolve(bo, cache) s.Nil(err) @@ -227,26 +229,26 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { } // Mark the store needCheck. The resolve state should be resolved soon. - store := cache.getStoreByStoreID(s.store1) - store.markNeedCheck(cache.notifyCheckCh) + store := cache.getStoreOrInsertDefault(s.store1) + cache.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), resolved) // Mark the store needCheck and it becomes a tombstone. The resolve state should be tombstone. s.cluster.MarkTombstone(s.store1) - store.markNeedCheck(cache.notifyCheckCh) + cache.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), tombstone) s.cluster.StartStore(s.store1) // Mark the store needCheck and it's deleted from PD. The resolve state should be tombstone. cache.clear() - store = cache.getStoreByStoreID(s.store1) + store = cache.getStoreOrInsertDefault(s.store1) store.initResolve(bo, cache) s.Equal(store.getResolveState(), resolved) storeMeta := s.cluster.GetStore(s.store1) s.cluster.RemoveStore(s.store1) - store.markNeedCheck(cache.notifyCheckCh) + cache.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), tombstone) s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...) @@ -254,14 +256,14 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { // Mark the store needCheck and its address and labels are changed. // The resolve state should be deleted and a new store is added to the cache. cache.clear() - store = cache.getStoreByStoreID(s.store1) + store = cache.getStoreOrInsertDefault(s.store1) store.initResolve(bo, cache) s.Equal(store.getResolveState(), resolved) s.cluster.UpdateStoreAddr(s.store1, store.addr+"0", &metapb.StoreLabel{Key: "k", Value: "v"}) - store.markNeedCheck(cache.notifyCheckCh) + cache.markStoreNeedCheck(store) waitResolve(store) s.Equal(store.getResolveState(), deleted) - newStore := cache.getStoreByStoreID(s.store1) + newStore := cache.getStoreOrInsertDefault(s.store1) s.Equal(newStore.getResolveState(), resolved) s.Equal(newStore.addr, store.addr+"0") s.Equal(newStore.labels, []*metapb.StoreLabel{{Key: "k", Value: "v"}}) @@ -269,7 +271,7 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { // Check initResolve()ing a tombstone store. The resolve state should be tombstone. cache.clear() s.cluster.MarkTombstone(s.store1) - store = cache.getStoreByStoreID(s.store1) + store = cache.getStoreOrInsertDefault(s.store1) for i := 0; i < 2; i++ { addr, err := store.initResolve(bo, cache) s.Nil(err) @@ -283,7 +285,7 @@ func (s *testRegionCacheSuite) TestResolveStateTransition() { cache.clear() storeMeta = s.cluster.GetStore(s.store1) s.cluster.RemoveStore(s.store1) - store = cache.getStoreByStoreID(s.store1) + store = cache.getStoreOrInsertDefault(s.store1) for i := 0; i < 2; i++ { addr, err := store.initResolve(bo, cache) s.Nil(err) @@ -1542,7 +1544,7 @@ func (s *testRegionCacheSuite) TestBuckets() { newMeta := proto.Clone(cachedRegion.meta).(*metapb.Region) newMeta.RegionEpoch.Version++ newMeta.RegionEpoch.ConfVer++ - _, err = s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: cachedRegion.VerID(), Store: s.cache.getStoreByStoreID(s.store1)}, []*metapb.Region{newMeta}) + _, err = s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: cachedRegion.VerID(), Store: s.cache.getStoreOrInsertDefault(s.store1)}, []*metapb.Region{newMeta}) s.Nil(err) cachedRegion = s.getRegion([]byte("a")) s.Equal(newBuckets, cachedRegion.getStore().buckets) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 214a4a05b..c8f7a617f 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1120,12 +1120,7 @@ func (s *replicaSelector) onReadReqConfigurableTimeout(req *tikvrpc.Request) boo } func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState { - store := accessReplica.store - liveness := store.requestLiveness(bo, s.regionCache) - if liveness != reachable { - store.startHealthCheckLoopIfNeeded(s.regionCache, liveness) - } - return liveness + return accessReplica.store.requestLivenessAndStartHealthCheckLoopIfNeeded(bo, s.regionCache) } func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error) { @@ -1139,7 +1134,7 @@ func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error) ) metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() // schedule a store addr resolve. - store.markNeedCheck(s.regionCache.notifyCheckCh) + s.regionCache.markStoreNeedCheck(store) store.markAlreadySlow() } } @@ -2168,7 +2163,7 @@ func (s *RegionRequestSender) onRegionError( zap.Stringer("storeNotMatch", storeNotMatch), zap.Stringer("ctx", ctx), ) - ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh) + s.regionCache.markStoreNeedCheck(ctx.Store) s.regionCache.InvalidateCachedRegion(ctx.Region) // It's possible the address of store is not changed but the DNS resolves to a different address in k8s environment, // so we always reconnect in this case. diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index b5486bc76..3aa263823 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -100,7 +100,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() { s.NotNil(region) oldStoreLimit := kv.StoreLimit.Load() kv.StoreLimit.Store(500) - s.cache.getStoreByStoreID(s.storeIDs[0]).tokenCount.Store(500) + s.cache.getStoreOrInsertDefault(s.storeIDs[0]).tokenCount.Store(500) // cause there is only one region in this cluster, regionID maps this leader. resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.NotNil(err) @@ -170,13 +170,12 @@ func (s *testRegionRequestToThreeStoresSuite) TestForwarding() { return innerClient.SendRequest(ctx, addr, req, timeout) }} var storeState = uint32(unreachable) - tf := func(s *Store, bo *retry.Backoffer) livenessState { + s.regionRequestSender.regionCache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState { if s.addr == leaderAddr { return livenessState(atomic.LoadUint32(&storeState)) } return reachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + }) loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("k")) s.Nil(err) @@ -444,10 +443,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) - tf := func(s *Store, bo *retry.Backoffer) livenessState { - return unreachable - } - cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + unreachable.injectConstantLiveness(cache) s.IsType(&accessKnownLeader{}, replicaSelector.state) _, err = replicaSelector.next(s.bo) s.Nil(err) @@ -483,11 +479,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Do not try to use proxy if livenessState is unknown instead of unreachable. refreshEpochs(regionStore) cache.enableForwarding = true - tf = func(s *Store, bo *retry.Backoffer) livenessState { - return unknown - } - cache.testingKnobs.mockRequestLiveness.Store( - (*livenessFunc)(&tf)) + unknown.injectConstantLiveness(cache) replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) @@ -509,10 +501,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) - tf = func(s *Store, bo *retry.Backoffer) livenessState { - return unreachable - } - cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + unreachable.injectConstantLiveness(cache) s.Eventually(func() bool { return regionStore.stores[regionStore.workTiKVIdx].getLivenessState() == unreachable }, 3*time.Second, 200*time.Millisecond) @@ -772,11 +761,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.cluster.ChangeLeader(s.regionID, s.peerIDs[0]) // The leader store is alive but can't provide service. - - tf := func(s *Store, bo *retry.Backoffer) livenessState { - return reachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + reachable.injectConstantLiveness(s.cache) s.Eventually(func() bool { stores := s.regionRequestSender.replicaSelector.regionStore.stores return stores[0].getLivenessState() == reachable && @@ -902,10 +887,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { } // Runs out of all replicas and then returns a send error. - tf = func(s *Store, bo *retry.Backoffer) livenessState { - return unreachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + unreachable.injectConstantLiveness(s.cache) reloadRegion() for _, store := range s.storeIDs { s.cluster.StopStore(store) @@ -922,10 +904,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { // Verify switch to the leader immediately when stale read requests with global txn scope meet region errors. s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0]) - tf = func(s *Store, bo *retry.Backoffer) livenessState { - return reachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + reachable.injectConstantLiveness(s.cache) s.Eventually(func() bool { stores := s.regionRequestSender.replicaSelector.regionStore.stores return stores[0].getLivenessState() == reachable && @@ -1290,10 +1269,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { } // Test for write request. - tf := func(s *Store, bo *retry.Backoffer) livenessState { - return reachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + reachable.injectConstantLiveness(s.cache) resetStats() req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{}) req.ReplicaReadType = kv.ReplicaReadLeader diff --git a/internal/locate/region_request_state_test.go b/internal/locate/region_request_state_test.go index f012452b1..3636355e2 100644 --- a/internal/locate/region_request_state_test.go +++ b/internal/locate/region_request_state_test.go @@ -84,7 +84,7 @@ func (s *testRegionCacheStaleReadSuite) SetupTest() { } func (s *testRegionCacheStaleReadSuite) TearDownTest() { - s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(nil)) + s.cache.setMockRequestLiveness(nil) s.cache.Close() s.mvccStore.Close() } @@ -222,14 +222,13 @@ func (s *testRegionCacheStaleReadSuite) setClient() { return }} - tf := func(store *Store, bo *retry.Backoffer) livenessState { + s.cache.setMockRequestLiveness(func(ctx context.Context, store *Store) livenessState { _, ok := s.injection.unavailableStoreIDs[store.storeID] if ok { return unreachable } return reachable - } - s.cache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + }) } func (s *testRegionCacheStaleReadSuite) extractResp(resp *tikvrpc.Response) (uint64, string, SuccessReadType) { diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 53f9fe1cc..b79689fb2 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -748,9 +748,6 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) }} - tf := func(s *Store, bo *retry.Backoffer) livenessState { - return reachable - } defer func() { rpcClient.Close() @@ -775,7 +772,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { }() req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1}) regionRequestSender := NewRegionRequestSender(s.cache, fnClient) - regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + reachable.injectConstantLiveness(regionRequestSender.regionCache) regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) } }()