reload region cache when store is resolved from invalid status #843

Merged · 14 commits · Jul 14, 2023
2 changes: 1 addition & 1 deletion error/error.go
@@ -246,7 +246,7 @@ type ErrAssertionFailed struct {
*kvrpcpb.AssertionFailed
}

// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues is not.
// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not.
type ErrLockOnlyIfExistsNoReturnValue struct {
StartTS uint64
ForUpdateTs uint64
91 changes: 76 additions & 15 deletions internal/locate/region_cache.go
@@ -153,6 +153,7 @@ type Region struct {
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload atomic.Bool // whether the region needs to be reloaded asynchronously
}

// AccessIndex represent the index for accessIndex array
@@ -420,7 +421,7 @@ func newRegionIndexMu(rs []*Region) *regionIndexMu {
r.latestVersions = make(map[uint64]RegionVerID)
r.sorted = NewSortedRegions(btreeDegree)
for _, region := range rs {
r.insertRegionToCache(region)
r.insertRegionToCache(region, true)
}
return r
}
@@ -466,6 +467,11 @@ type RegionCache struct {
// requestLiveness always returns unreachable.
mockRequestLiveness atomic.Pointer[livenessFunc]
}

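// regionsNeedReload holds the IDs of regions scheduled for an asynchronous reload;
// it is drained periodically by asyncCheckAndResolveLoop.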
regionsNeedReload struct {
sync.Mutex
regions []uint64
}
}

// NewRegionCache creates a RegionCache.
@@ -519,8 +525,8 @@ func (c *RegionCache) clear() {
}

// thread unsafe, should use with lock
func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
c.mu.insertRegionToCache(cachedRegion)
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
}

// Close releases region cache's resource.
@@ -531,8 +537,13 @@ func (c *RegionCache) Close() {
// asyncCheckAndResolveLoop with
func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
reloadRegionTicker := time.NewTicker(10 * time.Second)
defer func() {
ticker.Stop()
reloadRegionTicker.Stop()
}()
var needCheckStores []*Store
reloadNextLoop := make(map[uint64]struct{})
for {
needCheckStores = needCheckStores[:0]
select {
@@ -550,6 +561,21 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
// there's a deleted store in the stores map, which is guaranteed by reResolve().
return state != unresolved && state != tombstone && state != deleted
})
case <-reloadRegionTicker.C:
for regionID := range reloadNextLoop {
c.reloadRegion(regionID)
delete(reloadNextLoop, regionID)
}
c.regionsNeedReload.Lock()
for _, regionID := range c.regionsNeedReload.regions {
// will reload in the next tick; wait a while for two reasons:
// 1. there may be an unavailable duration while the connection is being recreated.
// 2. the store may have just started, so wait for the safe ts to be synced to avoid a
//    possible dataIsNotReady error.
reloadNextLoop[regionID] = struct{}{}
}
c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0]
c.regionsNeedReload.Unlock()
}
}
}
@@ -1060,7 +1086,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
} else if r.checkNeedReloadAndMarkUpdated() {
// load the region when it is marked as needing reload.
@@ -1073,7 +1099,7 @@
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
}
@@ -1214,7 +1240,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
} else {
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
}
@@ -1233,7 +1259,7 @@ }
}

c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
return &KeyLocation{
Region: r.VerID(),
@@ -1243,6 +1269,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}, nil
}

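// scheduleReloadRegion marks the region for an asynchronous reload and queues its ID;
// the reload itself is performed later by asyncCheckAndResolveLoop, so callers never block.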
func (c *RegionCache) scheduleReloadRegion(region *Region) {
if region == nil || !region.asyncReload.CompareAndSwap(false, true) {
// an async reload has already been scheduled by another goroutine.
return
}
regionID := region.GetID()
if regionID > 0 {
c.regionsNeedReload.Lock()
c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, regionID)
c.regionsNeedReload.Unlock()
}
}

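// reloadRegion loads the latest region information from PD and inserts it into the cache
// without invalidating the existing entry, so in-flight requests can keep using the old
// region until the new one takes effect.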
func (c *RegionCache) reloadRegion(regionID uint64) {
bo := retry.NewNoopBackoff(context.Background())
lr, err := c.loadRegionByID(bo, regionID)
if err != nil {
// ignore error and use old region info.
logutil.Logger(bo.GetCtx()).Error("load region failure",
zap.Uint64("regionID", regionID), zap.Error(err))
if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
oldRegion.asyncReload.Store(false)
}
return
}
c.mu.Lock()
c.insertRegionToCache(lr, false)
c.mu.Unlock()
}

// GroupKeysByRegion separates keys into groups by their belonging Regions.
// Specially it also returns the first key's region which may be used as the
// 'PrimaryLockKey' and should be committed ahead of others.
@@ -1327,7 +1383,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey
// TODO(youjiali1995): scanRegions always fetches regions from PD, and to reduce traffic these regions don't
// carry bucket information, so newly inserted regions in the region cache lack bucket information. We should improve it.
for _, region := range regions {
c.insertRegionToCache(region)
c.insertRegionToCache(region, true)
}

return
@@ -1401,7 +1457,9 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin

// insertRegionToCache tries to insert the Region to cache.
// It should be protected by c.mu.l.Lock().
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) {
// If `invalidateOldRegion` is false, the old region cache entry should still be valid,
// and it may still be used by some KV requests.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
store := cachedRegion.getStore()
@@ -1416,8 +1474,11 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region) {
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
}
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
}
// Don't refresh the TiFlash work idx for the region. Otherwise, it will always go to an invalid store which
// is in the middle of transferring regions.
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
@@ -1939,7 +2000,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext

c.mu.Lock()
for _, region := range newRegions {
c.insertRegionToCache(region)
c.insertRegionToCache(region, true)
}
c.mu.Unlock()

@@ -2057,7 +2118,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
return
}
c.mu.Lock()
c.insertRegionToCache(new)
c.insertRegionToCache(new, true)
c.mu.Unlock()
}()
}
@@ -2527,8 +2588,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
}

func (s *Store) getResolveState() resolveState {
var state resolveState
if s == nil {
var state resolveState
return state
}
return resolveState(atomic.LoadUint64(&s.state))
18 changes: 9 additions & 9 deletions internal/locate/region_cache_test.go
@@ -966,7 +966,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() {
region := createSampleRegion([]byte("k1"), []byte("k2"))
region.meta.Id = 1
region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10}
cache.insertRegionToCache(region)
cache.insertRegionToCache(region, true)

r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}}
r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}}
@@ -1257,7 +1257,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
filterUnavailablePeers(cpRegion)
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)

// OnSendFail should not panic
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
@@ -1293,7 +1293,7 @@ func (s *testRegionCacheSuite) TestPeersLenChangedByWitness() {
cpRegion := &pd.Region{Meta: cpMeta}
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)

// OnSendFail should not panic
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
@@ -1466,12 +1466,12 @@ func (s *testRegionCacheSuite) TestBuckets() {
fakeRegion.setStore(cachedRegion.getStore().clone())
// no buckets
fakeRegion.getStore().buckets = nil
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// stale buckets
fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1}
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// new buckets
@@ -1481,7 +1481,7 @@
Keys: buckets.Keys,
}
fakeRegion.getStore().buckets = newBuckets
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(newBuckets, cachedRegion.getStore().buckets)

@@ -1614,7 +1614,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
@@ -1625,7 +1625,7 @@
region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
@@ -1739,7 +1739,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
v2 := region.Region.confVer + 1
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
st := &Store{storeID: s.store}
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()})
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)

r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
s.Equal(len(r), 2)
33 changes: 27 additions & 6 deletions internal/locate/region_request.go
@@ -572,18 +572,39 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
if state.option.preferLeader {
state.lastIdx = state.leaderIdx
}
var offset int
Review comment (Contributor):
Peer selection can become quite complex when the accessFollower is used for all follower read and stale read processing, with various configurations. To prevent any potential issues, it may be beneficial to separate them into different categories.

ReplicaSelector itself is quite complex too.

Review comment (@cfzjywxk, Contributor, Jun 26, 2023):
After discussing with @you06, we could do some refactoring on the master branch like

match access_type {
     leader_only =>
     follower_read_random =>
     follower_read_replica_only =>
     follower_read_closest =>
     stale_read_random =>
     stale_read_closest =>
     stale_read_retry =>
}

while keeping the change on release-6.5 as simple as possible to reduce the risk.
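As a purely illustrative sketch (none of the identifiers below exist in client-go, and the selection rules are simplified placeholders), such a dispatch over access types might look roughly like this in Go:

package main

import (
	"fmt"
	"math/rand"
)

type accessType int

const (
	leaderOnly accessType = iota
	followerReadRandom
	followerReadReplicaOnly
	followerReadClosest
	staleReadRandom
	staleReadClosest
	staleReadRetry
)

// pickReplica routes each access type to its own selection rule instead of funneling
// every mode through a single accessFollower state machine.
func pickReplica(t accessType, replicas []string, leaderIdx int) string {
	followers := make([]string, 0, len(replicas))
	for i, r := range replicas {
		if i != leaderIdx {
			followers = append(followers, r)
		}
	}
	switch t {
	case leaderOnly, staleReadRetry:
		// stale-read retries fall back to the leader for a consistent snapshot.
		return replicas[leaderIdx]
	case followerReadRandom, staleReadRandom:
		if len(followers) == 0 {
			return replicas[leaderIdx]
		}
		return followers[rand.Intn(len(followers))]
	case followerReadReplicaOnly:
		if len(followers) == 0 {
			return "" // no follower available; a real implementation would back off or report an error.
		}
		return followers[0]
	case followerReadClosest, staleReadClosest:
		// placeholder: a real closest-replica rule would consult store labels or observed latency.
		return replicas[0]
	default:
		return replicas[leaderIdx]
	}
}

func main() {
	replicas := []string{"store-1", "store-2", "store-3"}
	fmt.Println(pickReplica(followerReadRandom, replicas, 0))
}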

Review comment (Contributor, PR author):
Maybe the refactor can be in another PR?

Review comment (Contributor):
Ok, let's merge this first.

if state.lastIdx >= 0 {
offset = rand.Intn(replicaSize)
}
reloadRegion := false
for i := 0; i < replicaSize && !state.option.leaderOnly; i++ {
idx := AccessIndex((int(state.lastIdx) + i) % replicaSize)
// If the given store is abnormal to be accessed under `ReplicaReadMixed` mode, we should choose other followers or leader
// as candidates to serve the Read request. Meanwhile, we should make the choice of next() meet Uniform Distribution.
for cnt := 0; cnt < replicaSize && !state.isCandidate(idx, selector.replicas[idx]); cnt++ {
idx = AccessIndex((int(idx) + rand.Intn(replicaSize)) % replicaSize)
var idx AccessIndex
Review comment (Contributor):
Still recommend reverting to the original one; the current implementation is a bit more complex and hard to read.

if state.option.preferLeader {
if i == 0 {
idx = state.lastIdx
} else {
// randomly select next replica, but skip state.lastIdx
if (i+offset)%replicaSize == 0 {
offset++
}
}
} else {
idx = AccessIndex((int(state.lastIdx) + i) % replicaSize)
}
if state.isCandidate(idx, selector.replicas[idx]) {
selectReplica := selector.replicas[idx]
if state.isCandidate(idx, selectReplica) {
state.lastIdx = idx
selector.targetIdx = idx
break
}
if selectReplica.isEpochStale() &&
selectReplica.store.getResolveState() == resolved &&
selectReplica.store.getLivenessState() == reachable {
reloadRegion = true
}
}
if reloadRegion {
selector.regionCache.scheduleReloadRegion(selector.region)
}
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
4 changes: 2 additions & 2 deletions internal/locate/region_request3_test.go
@@ -322,7 +322,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
cache := NewRegionCache(s.cache.pdClient)
defer cache.Close()
cache.mu.Lock()
cache.insertRegionToCache(region)
cache.insertRegionToCache(region, true)
cache.mu.Unlock()

// Test accessFollower state with kv.ReplicaReadLearner request type.
@@ -373,7 +373,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
cache := NewRegionCache(s.cache.pdClient)
defer cache.Close()
cache.mu.Lock()
cache.insertRegionToCache(region)
cache.insertRegionToCache(region, true)
cache.mu.Unlock()

// Verify creating the replicaSelector.
4 changes: 2 additions & 2 deletions internal/locate/region_request_test.go
@@ -608,7 +608,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
v2 := region.Region.confVer + 1
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
st := &Store{storeID: s.store}
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()})
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
region, err = s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)
@@ -618,7 +618,7 @@
v3 := region.Region.confVer + 1
r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}}
st = &Store{storeID: s.store}
s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()})
s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), lastAccess: time.Now().Unix()}, true)
region, err = s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
s.NotNil(region)