Skip to content

Commit

Permalink
reload region cache when store is resolved from invalid status (#843)
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <[email protected]>
Co-authored-by: disksing <[email protected]>
  • Loading branch information
you06 and disksing committed Jul 14, 2023
1 parent 51aab26 commit 85fc8f3
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 35 deletions.
2 changes: 1 addition & 1 deletion error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ type ErrAssertionFailed struct {
*kvrpcpb.AssertionFailed
}

// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues is not.
// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not.
type ErrLockOnlyIfExistsNoReturnValue struct {
StartTS uint64
ForUpdateTs uint64
Expand Down
91 changes: 76 additions & 15 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type Region struct {
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload atomic.Bool // the region need to be reloaded in async mode
}

// AccessIndex represent the index for accessIndex array
Expand Down Expand Up @@ -420,7 +421,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu {
r.latestVersions = make(map[uint64]RegionVerID)
r.sorted = NewSortedRegions(btreeDegree)
for _, region := range rs {
r.insertRegionToCache(region)
r.insertRegionToCache(region, true)
}
return r
}
Expand Down Expand Up @@ -466,6 +467,11 @@ type RegionCache struct {
// requestLiveness always returns unreachable.
mockRequestLiveness atomic.Pointer[livenessFunc]
}

regionsNeedReload struct {
sync.Mutex
regions []uint64
}
}

// NewRegionCache creates a RegionCache.
Expand Down Expand Up @@ -519,8 +525,8 @@ func (c *RegionCache) clear() {
}

// thread unsafe, should use with lock
func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
c.mu.insertRegionToCache(cachedRegion)
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
}

// Close releases region cache's resource.
Expand All @@ -531,8 +537,13 @@ func (c *RegionCache) Close() {
// asyncCheckAndResolveLoop with
func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
reloadRegionTicker := time.NewTicker(10 * time.Second)
defer func() {
ticker.Stop()
reloadRegionTicker.Stop()
}()
var needCheckStores []*Store
reloadNextLoop := make(map[uint64]struct{})
for {
needCheckStores = needCheckStores[:0]
select {
Expand All @@ -550,6 +561,21 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
// there's a deleted store in the stores map which guaranteed by reReslve().
return state != unresolved && state != tombstone && state != deleted
})
case <-reloadRegionTicker.C:
for regionID := range reloadNextLoop {
c.reloadRegion(regionID)
delete(reloadNextLoop, regionID)
}
c.regionsNeedReload.Lock()
for _, regionID := range c.regionsNeedReload.regions {
// will reload in next tick, wait a while for two reasons:
// 1. there may an unavailable duration while recreating the connection.
// 2. the store may just be started, and wait safe ts synced to avoid the
// possible dataIsNotReady error.
reloadNextLoop[regionID] = struct{}{}
}
c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0]
c.regionsNeedReload.Unlock()
}
}
}
Expand Down Expand Up @@ -1060,7 +1086,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
} else if r.checkNeedReloadAndMarkUpdated() {
// load region when it be marked as need reload.
Expand All @@ -1073,7 +1099,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
}
Expand Down Expand Up @@ -1214,7 +1240,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
} else {
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
}
Expand All @@ -1233,7 +1259,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}

c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
return &KeyLocation{
Region: r.VerID(),
Expand All @@ -1243,6 +1269,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}, nil
}

func (c *RegionCache) scheduleReloadRegion(region *Region) {
if region == nil || !region.asyncReload.CompareAndSwap(false, true) {
// async reload scheduled by other thread.
return
}
regionID := region.GetID()
if regionID > 0 {
c.regionsNeedReload.Lock()
c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID)
c.regionsNeedReload.Unlock()
}
}

func (c *RegionCache) reloadRegion(regionID uint64) {
bo := retry.NewNoopBackoff(context.Background())
lr, err := c.loadRegionByID(bo, regionID)
if err != nil {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
oldRegion.asyncReload.Store(false)
}
return
}
c.mu.Lock()
c.insertRegionToCache(lr, false)
c.mu.Unlock()
}

// GroupKeysByRegion separates keys into groups by their belonging Regions.
// Specially it also returns the first key's region which may be used as the
// 'PrimaryLockKey' and should be committed ahead of others.
Expand Down Expand Up @@ -1327,7 +1383,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey
// TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information
// for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it.
for _, region := range regions {
c.insertRegionToCache(region)
c.insertRegionToCache(region, true)
}

return
Expand Down Expand Up @@ -1401,7 +1457,9 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin

// insertRegionToCache tries to insert the Region to cache.
// It should be protected by c.mu.l.Lock().
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) {
// if `invalidateOldRegion` is false, the old region cache should be still valid,
// and it may still be used by some kv requests.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
store := cachedRegion.getStore()
Expand All @@ -1416,8 +1474,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) {
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
}
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
}
// Don't refresh TiFlash work idx for region. Otherwise, it will always goto a invalid store which
// is under transferring regions.
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
Expand Down Expand Up @@ -1939,7 +2000,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext

c.mu.Lock()
for _, region := range newRegions {
c.insertRegionToCache(region)
c.insertRegionToCache(region, true)
}
c.mu.Unlock()

Expand Down Expand Up @@ -2057,7 +2118,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
return
}
c.mu.Lock()
c.insertRegionToCache(new)
c.insertRegionToCache(new, true)
c.mu.Unlock()
}()
}
Expand Down Expand Up @@ -2527,8 +2588,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
}

func (s *Store) getResolveState() resolveState {
var state resolveState
if s == nil {
var state resolveState
return state
}
return resolveState(atomic.LoadUint64(&s.state))
Expand Down
18 changes: 9 additions & 9 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() {
region := createSampleRegion([]byte("k1"), []byte("k2"))
region.meta.Id = 1
region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10}
cache.insertRegionToCache(region)
cache.insertRegionToCache(region, true)

r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}}
r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}}
Expand Down Expand Up @@ -1257,7 +1257,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
filterUnavailablePeers(cpRegion)
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)

// OnSendFail should not panic
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() {
cpRegion := &pd.Region{Meta: cpMeta}
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)

// OnSendFail should not panic
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
Expand Down Expand Up @@ -1466,12 +1466,12 @@ func (s *testRegionCacheSuite) TestBuckets() {
fakeRegion.setStore(cachedRegion.getStore().clone())
// no buckets
fakeRegion.getStore().buckets = nil
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// stale buckets
fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1}
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// new buckets
Expand All @@ -1481,7 +1481,7 @@ func (s *testRegionCacheSuite) TestBuckets() {
Keys: buckets.Keys,
}
fakeRegion.getStore().buckets = newBuckets
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(newBuckets, cachedRegion.getStore().buckets)

Expand Down Expand Up @@ -1614,7 +1614,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
Expand All @@ -1625,7 +1625,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
Expand Down Expand Up @@ -1739,7 +1739,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
v2 := region.Region.confVer + 1
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
st := &Store{storeID: s.store}
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()})
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)

r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
s.Equal(len(r), 2)
Expand Down
33 changes: 27 additions & 6 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,18 +572,39 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
if state.option.preferLeader {
state.lastIdx = state.leaderIdx
}
var offset int
if state.lastIdx >= 0 {
offset = rand.Intn(replicaSize)
}
reloadRegion := false
for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
idx := AccessIndex((int(state.lastIdx) + i) % replicaSize)
// If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader
// as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution.
for cnt := 0; cnt < replicaSize && !state.isCandidate(idx, selector.replicas[idx]); cnt++ {
idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize)
var idx AccessIndex
if state.option.preferLeader {
if i == 0 {
idx = state.lastIdx
} else {
// randomly select next replica, but skip state.lastIdx
if (i+offset)%replicaSize == 0 {
offset++
}
}
} else {
idx = AccessIndex((int(state.lastIdx) + i) % replicaSize)
}
if state.isCandidate(idx, selector.replicas[idx]) {
selectReplica := selector.replicas[idx]
if state.isCandidate(idx, selectReplica) {
state.lastIdx = idx
selector.targetIdx = idx
break
}
if selectReplica.isEpochStale() &&
selectReplica.store.getResolveState() == resolved &&
selectReplica.store.getLivenessState() == reachable {
reloadRegion = true
}
}
if reloadRegion {
selector.regionCache.scheduleReloadRegion(selector.region)
}
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
Expand Down
4 changes: 2 additions & 2 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
cache := NewRegionCache(s.cache.pdClient)
defer cache.Close()
cache.mu.Lock()
cache.insertRegionToCache(region)
cache.insertRegionToCache(region, true)
cache.mu.Unlock()

// Test accessFollower state with kv.ReplicaReadLearner request type.
Expand Down Expand Up @@ -373,7 +373,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
cache := NewRegionCache(s.cache.pdClient)
defer cache.Close()
cache.mu.Lock()
cache.insertRegionToCache(region)
cache.insertRegionToCache(region, true)
cache.mu.Unlock()

// Verify creating the replicaSelector.
Expand Down
4 changes: 2 additions & 2 deletions internal/locate/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
v2 := region.Region.confVer + 1
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
st := &Store{storeID: s.store}
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()})
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
region, err = s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
Expand All @@ -618,7 +618,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
v3 := region.Region.confVer + 1
r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}}
st = &Store{storeID: s.store}
s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()})
s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
region, err = s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
Expand Down

0 comments on commit 85fc8f3

Please sign in to comment.