diff --git a/error/error.go b/error/error.go index 761ebef6d..62a75ff65 100644 --- a/error/error.go +++ b/error/error.go @@ -246,7 +246,7 @@ type ErrAssertionFailed struct { *kvrpcpb.AssertionFailed } -// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues“ is not. +// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not. type ErrLockOnlyIfExistsNoReturnValue struct { StartTS uint64 ForUpdateTs uint64 diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 8581dc5b2..a5fb471af 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -153,6 +153,7 @@ type Region struct { syncFlag int32 // region need be sync in next turn lastAccess int64 // last region access time, see checkRegionCacheTTL invalidReason InvalidReason // the reason why the region is invalidated + asyncReload atomic.Bool // the region need to be reloaded in async mode } // AccessIndex represent the index for accessIndex array @@ -420,7 +421,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu { r.latestVersions = make(map[uint64]RegionVerID) r.sorted = NewSortedRegions(btreeDegree) for _, region := range rs { - r.insertRegionToCache(region) + r.insertRegionToCache(region, true) } return r } @@ -466,6 +467,11 @@ type RegionCache struct { // requestLiveness always returns unreachable. mockRequestLiveness atomic.Pointer[livenessFunc] } + + regionsNeedReload struct { + sync.Mutex + regions []uint64 + } } // NewRegionCache creates a RegionCache. @@ -519,8 +525,8 @@ func (c *RegionCache) clear() { } // thread unsafe, should use with lock -func (c *RegionCache) insertRegionToCache(cachedRegion *Region) { - c.mu.insertRegionToCache(cachedRegion) +func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { + c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion) } // Close releases region cache's resource. @@ -531,8 +537,13 @@ func (c *RegionCache) Close() { // asyncCheckAndResolveLoop with func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { ticker := time.NewTicker(interval) - defer ticker.Stop() + reloadRegionTicker := time.NewTicker(10 * time.Second) + defer func() { + ticker.Stop() + reloadRegionTicker.Stop() + }() var needCheckStores []*Store + reloadNextLoop := make(map[uint64]struct{}) for { needCheckStores = needCheckStores[:0] select { @@ -550,6 +561,21 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) { // there's a deleted store in the stores map which guaranteed by reReslve(). return state != unresolved && state != tombstone && state != deleted }) + case <-reloadRegionTicker.C: + for regionID := range reloadNextLoop { + c.reloadRegion(regionID) + delete(reloadNextLoop, regionID) + } + c.regionsNeedReload.Lock() + for _, regionID := range c.regionsNeedReload.regions { + // will reload in next tick, wait a while for two reasons: + // 1. there may an unavailable duration while recreating the connection. + // 2. the store may just be started, and wait safe ts synced to avoid the + // possible dataIsNotReady error. + reloadNextLoop[regionID] = struct{}{} + } + c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0] + c.regionsNeedReload.Unlock() } } } @@ -1060,7 +1086,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID()) r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } else if r.checkNeedReloadAndMarkUpdated() { // load region when it be marked as need reload. @@ -1073,7 +1099,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID()) r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } } @@ -1214,7 +1240,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K } else { r = lr c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() } } @@ -1233,7 +1259,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K } c.mu.Lock() - c.insertRegionToCache(r) + c.insertRegionToCache(r, true) c.mu.Unlock() return &KeyLocation{ Region: r.VerID(), @@ -1243,6 +1269,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K }, nil } +func (c *RegionCache) scheduleReloadRegion(region *Region) { + if region == nil || !region.asyncReload.CompareAndSwap(false, true) { + // async reload scheduled by other thread. + return + } + regionID := region.GetID() + if regionID > 0 { + c.regionsNeedReload.Lock() + c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID) + c.regionsNeedReload.Unlock() + } +} + +func (c *RegionCache) reloadRegion(regionID uint64) { + bo := retry.NewNoopBackoff(context.Background()) + lr, err := c.loadRegionByID(bo, regionID) + if err != nil { + // ignore error and use old region info. + logutil.Logger(bo.GetCtx()).Error("load region failure", + zap.Uint64("regionID", regionID), zap.Error(err)) + if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil { + oldRegion.asyncReload.Store(false) + } + return + } + c.mu.Lock() + c.insertRegionToCache(lr, false) + c.mu.Unlock() +} + // GroupKeysByRegion separates keys into groups by their belonging Regions. // Specially it also returns the first key's region which may be used as the // 'PrimaryLockKey' and should be committed ahead of others. @@ -1327,7 +1383,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey // TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information // for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it. for _, region := range regions { - c.insertRegionToCache(region) + c.insertRegionToCache(region, true) } return @@ -1401,7 +1457,9 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin // insertRegionToCache tries to insert the Region to cache. // It should be protected by c.mu.l.Lock(). -func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) { +// if `invalidateOldRegion` is false, the old region cache should be still valid, +// and it may still be used by some kv requests. +func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) { oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion) if oldRegion != nil { store := cachedRegion.getStore() @@ -1416,8 +1474,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) { if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader { store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly)) } - // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. - oldRegion.invalidate(Other) + // If the old region is still valid, do not invalidate it to avoid unnecessary backoff. + if invalidateOldRegion { + // Invalidate the old region in case it's not invalidated and some requests try with the stale region information. + oldRegion.invalidate(Other) + } // Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which // is under transferring regions. store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load()) @@ -1939,7 +2000,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext c.mu.Lock() for _, region := range newRegions { - c.insertRegionToCache(region) + c.insertRegionToCache(region, true) } c.mu.Unlock() @@ -2057,7 +2118,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV return } c.mu.Lock() - c.insertRegionToCache(new) + c.insertRegionToCache(new, true) c.mu.Unlock() }() } @@ -2527,8 +2588,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { } func (s *Store) getResolveState() resolveState { - var state resolveState if s == nil { + var state resolveState return state } return resolveState(atomic.LoadUint64(&s.state)) diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 03ef3e309..619da2d2e 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -966,7 +966,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() { region := createSampleRegion([]byte("k1"), []byte("k2")) region.meta.Id = 1 region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10} - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}} r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}} @@ -1257,7 +1257,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() { filterUnavailablePeers(cpRegion) region, err := newRegion(s.bo, s.cache, cpRegion) s.Nil(err) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) // OnSendFail should not panic s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) @@ -1293,7 +1293,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() { cpRegion := &pd.Region{Meta: cpMeta} region, err := newRegion(s.bo, s.cache, cpRegion) s.Nil(err) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) // OnSendFail should not panic s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail")) @@ -1466,12 +1466,12 @@ func (s *testRegionCacheSuite) TestBuckets() { fakeRegion.setStore(cachedRegion.getStore().clone()) // no buckets fakeRegion.getStore().buckets = nil - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(defaultBuckets, cachedRegion.getStore().buckets) // stale buckets fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1} - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(defaultBuckets, cachedRegion.getStore().buckets) // new buckets @@ -1481,7 +1481,7 @@ func (s *testRegionCacheSuite) TestBuckets() { Keys: buckets.Keys, } fakeRegion.getStore().buckets = newBuckets - s.cache.insertRegionToCache(fakeRegion) + s.cache.insertRegionToCache(fakeRegion, true) cachedRegion = s.getRegion([]byte("a")) s.Equal(newBuckets, cachedRegion.getStore().buckets) @@ -1614,7 +1614,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() { region, err := s.cache.loadRegion(s.bo, []byte("c"), false) s.Nil(err) s.Equal(region.GetID(), regions[0]) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) loc, err = s.cache.LocateKey(s.bo, []byte{'c'}) s.Nil(err) s.Equal(loc.Region.GetID(), regions[0]) @@ -1625,7 +1625,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() { region, err = s.cache.loadRegion(s.bo, []byte("e"), false) s.Nil(err) s.Equal(region.GetID(), regions[0]) - s.cache.insertRegionToCache(region) + s.cache.insertRegionToCache(region, true) loc, err = s.cache.LocateKey(s.bo, []byte{'e'}) s.Nil(err) s.Equal(loc.Region.GetID(), regions[0]) @@ -1739,7 +1739,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() { v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10) s.Equal(len(r), 2) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 8e4cb1cab..9d3193431 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -572,18 +572,39 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector if state.option.preferLeader { state.lastIdx = state.leaderIdx } + var offset int + if state.lastIdx >= 0 { + offset = rand.Intn(replicaSize) + } + reloadRegion := false for i := 0; i < replicaSize && !state.option.leaderOnly; i++ { - idx := AccessIndex((int(state.lastIdx) + i) % replicaSize) - // If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader - // as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution. - for cnt := 0; cnt < replicaSize && !state.isCandidate(idx, selector.replicas[idx]); cnt++ { - idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize) + var idx AccessIndex + if state.option.preferLeader { + if i == 0 { + idx = state.lastIdx + } else { + // randomly select next replica, but skip state.lastIdx + if (i+offset)%replicaSize == 0 { + offset++ + } + } + } else { + idx = AccessIndex((int(state.lastIdx) + i) % replicaSize) } - if state.isCandidate(idx, selector.replicas[idx]) { + selectReplica := selector.replicas[idx] + if state.isCandidate(idx, selectReplica) { state.lastIdx = idx selector.targetIdx = idx break } + if selectReplica.isEpochStale() && + selectReplica.store.getResolveState() == resolved && + selectReplica.store.getLivenessState() == reachable { + reloadRegion = true + } + } + if reloadRegion { + selector.regionCache.scheduleReloadRegion(selector.region) } // If there is no candidate, fallback to the leader. if selector.targetIdx < 0 { diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index e05c2cac5..ad29e4710 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -322,7 +322,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { cache := NewRegionCache(s.cache.pdClient) defer cache.Close() cache.mu.Lock() - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) cache.mu.Unlock() // Test accessFollower state with kv.ReplicaReadLearner request type. @@ -373,7 +373,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { cache := NewRegionCache(s.cache.pdClient) defer cache.Close() cache.mu.Lock() - cache.insertRegionToCache(region) + cache.insertRegionToCache(region, true) cache.mu.Unlock() // Verify creating the replicaSelector. diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 7f52d34cd..42c8ceb34 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -608,7 +608,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) @@ -618,7 +618,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v3 := region.Region.confVer + 1 r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}} st = &Store{storeID: s.store} - s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}) + s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region)