Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reload region cache when store is resolved from invalid status (#843) #846

Merged
merged 12 commits into from
Jun 28, 2023
2 changes: 1 addition & 1 deletion error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ type ErrAssertionFailed struct {
*kvrpcpb.AssertionFailed
}

// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues`` is not.
// ErrLockOnlyIfExistsNoReturnValue is used when the flag `LockOnlyIfExists` of `LockCtx` is set, but `ReturnValues` is not.
type ErrLockOnlyIfExistsNoReturnValue struct {
StartTS uint64
ForUpdateTs uint64
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ type checkRequestClient struct {

func (c *checkRequestClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
resp, err := c.Client.SendRequest(ctx, addr, req, timeout)
if err != nil {
return resp, err
}
if c.priority != req.Priority {
if resp.Resp != nil {
if getResp, ok := resp.Resp.(*kvrpcpb.GetResponse); ok {
Expand Down
97 changes: 81 additions & 16 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type Region struct {
syncFlag int32 // region need be sync in next turn
lastAccess int64 // last region access time, see checkRegionCacheTTL
invalidReason InvalidReason // the reason why the region is invalidated
asyncReload int32 // the region need to be reloaded in async mode
}

// AccessIndex represent the index for accessIndex array
Expand Down Expand Up @@ -363,6 +364,8 @@ func (r *Region) isValid() bool {
return r != nil && !r.checkNeedReload() && r.checkRegionCacheTTL(time.Now().Unix())
}

// livenessFunc is the signature of a store liveness check: it takes a Store and
// a Backoffer and returns a livenessState. A pointer to a value of this type is
// what the RegionCache testingKnobs.mockRequestLiveness knob holds so that tests
// can replace the real requestLiveness logic.
type livenessFunc func(s *Store, bo *retry.Backoffer) livenessState

// RegionCache caches Regions loaded from PD.
// All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing
// purposes only.
Expand Down Expand Up @@ -395,7 +398,12 @@ type RegionCache struct {
testingKnobs struct {
// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
// requestLiveness always returns unreachable.
mockRequestLiveness func(s *Store, bo *retry.Backoffer) livenessState
mockRequestLiveness atomic.Value
}

regionsNeedReload struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using a channel, so that the Mutex could be avoided and operations on the RegionCache would already be synchronized?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A channel is bounded: if it is full when we try to schedule a region onto it, the sender will block (the asyncCheckAndResolveLoop may be busy doing something else and unable to drain the channel immediately).

sync.Mutex
regions []uint64
}
}

Expand Down Expand Up @@ -447,8 +455,13 @@ func (c *RegionCache) Close() {
// asyncCheckAndResolveLoop with
func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
reloadRegionTicker := time.NewTicker(10 * time.Second)
defer func() {
ticker.Stop()
reloadRegionTicker.Stop()
}()
var needCheckStores []*Store
reloadNextLoop := make(map[uint64]struct{})
for {
needCheckStores = needCheckStores[:0]
select {
Expand All @@ -466,6 +479,22 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
// there's a deleted store in the stores map, which is guaranteed by reResolve().
return state != unresolved && state != tombstone && state != deleted
})

case <-reloadRegionTicker.C:
for regionID := range reloadNextLoop {
c.reloadRegion(regionID)
delete(reloadNextLoop, regionID)
}
c.regionsNeedReload.Lock()
for _, regionID := range c.regionsNeedReload.regions {
// will reload in the next tick; wait a while for two reasons:
// 1. there may be an unavailable period while the connection is being recreated.
// 2. the store may have just started, so wait for safe ts to be synced to avoid a
// possible dataIsNotReady error.
reloadNextLoop[regionID] = struct{}{}
}
c.regionsNeedReload.regions = c.regionsNeedReload.regions[:0]
c.regionsNeedReload.Unlock()
}
}
}
Expand Down Expand Up @@ -967,7 +996,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
} else if r.checkNeedReloadAndMarkUpdated() {
// load region when it be marked as need reload.
Expand All @@ -980,7 +1009,7 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
}
Expand Down Expand Up @@ -1113,7 +1142,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
} else {
r = lr
c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
}
Expand All @@ -1132,7 +1161,7 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}

c.mu.Lock()
c.insertRegionToCache(r)
c.insertRegionToCache(r, true)
c.mu.Unlock()
return &KeyLocation{
Region: r.VerID(),
Expand All @@ -1142,6 +1171,36 @@ func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*K
}, nil
}

// scheduleReloadRegion queues region for an asynchronous reload.
//
// The region's asyncReload flag is flipped from 0 to 1 with a compare-and-swap,
// so a nil region, or one whose flag is already set, is ignored. Otherwise the
// region ID is appended to c.regionsNeedReload.regions while holding that
// struct's mutex. A region whose ID is 0 is not queued.
func (c *RegionCache) scheduleReloadRegion(region *Region) {
	if region == nil {
		return
	}
	if !atomic.CompareAndSwapInt32(&region.asyncReload, 0, 1) {
		// Another caller has already requested an async reload of this region.
		return
	}

	id := region.GetID()
	if id == 0 {
		return
	}

	c.regionsNeedReload.Lock()
	c.regionsNeedReload.regions = append(c.regionsNeedReload.regions, id)
	c.regionsNeedReload.Unlock()
}

// reloadRegion loads the region identified by regionID via loadRegionByID,
// using a no-op backoffer, and stores the result in the cache.
//
// On success the loaded region is inserted under c.mu with
// invalidateOldRegion=false. On failure the error is logged and otherwise
// ignored, and the asyncReload flag of the cached region (if one is found) is
// reset to 0.
func (c *RegionCache) reloadRegion(regionID uint64) {
	noopBo := retry.NewNoopBackoff(c.ctx)
	fresh, loadErr := c.loadRegionByID(noopBo, regionID)
	if loadErr == nil {
		c.mu.Lock()
		c.insertRegionToCache(fresh, false)
		c.mu.Unlock()
		return
	}

	// Loading failed: keep using the old region info.
	logutil.Logger(noopBo.GetCtx()).Error("load region failure",
		zap.Uint64("regionID", regionID), zap.Error(loadErr))
	// NOTE(review): this lookup is done without holding c.mu — confirm that
	// getRegionByIDFromCache does not require the lock.
	cached := c.getRegionByIDFromCache(regionID)
	if cached != nil {
		atomic.StoreInt32(&cached.asyncReload, 0)
	}
}

// GroupKeysByRegion separates keys into groups by their belonging Regions.
// Specially it also returns the first key's region which may be used as the
// 'PrimaryLockKey' and should be committed ahead of others.
Expand Down Expand Up @@ -1226,7 +1285,7 @@ func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey
// TODO(youjiali1995): scanRegions always fetch regions from PD and these regions don't contain buckets information
// for less traffic, so newly inserted regions in region cache don't have buckets information. We should improve it.
for _, region := range regions {
c.insertRegionToCache(region)
c.insertRegionToCache(region, true)
}

return
Expand Down Expand Up @@ -1300,7 +1359,7 @@ func (c *RegionCache) removeVersionFromCache(oldVer RegionVerID, regionID uint64

// insertRegionToCache tries to insert the Region to cache.
// It should be protected by c.mu.Lock().
func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to comment about the new parameter.

oldRegion := c.mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
store := cachedRegion.getStore()
Expand All @@ -1315,8 +1374,11 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region) {
if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
}
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
// If the region info is async reloaded, the old region is still valid.
if invalidateOldRegion {
// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
oldRegion.invalidate(Other)
}
// Don't refresh the TiFlash work idx for the region. Otherwise, it will always go to an invalid store which
// is in the middle of transferring regions.
store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())
Expand Down Expand Up @@ -1804,7 +1866,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext

c.mu.Lock()
for _, region := range newRegions {
c.insertRegionToCache(region)
c.insertRegionToCache(region, true)
}
c.mu.Unlock()

Expand Down Expand Up @@ -1918,7 +1980,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
return
}
c.mu.Lock()
c.insertRegionToCache(new)
c.insertRegionToCache(new, true)
c.mu.Unlock()
}()
}
Expand Down Expand Up @@ -2371,9 +2433,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
}

func (s *Store) getResolveState() resolveState {
var state resolveState
if s == nil {
return state
return unresolved
}
return resolveState(atomic.LoadUint64(&s.state))
}
Expand Down Expand Up @@ -2544,8 +2605,12 @@ func (s *Store) requestLiveness(bo *retry.Backoffer, c *RegionCache) (l liveness
return unknown
}
}
if c != nil && c.testingKnobs.mockRequestLiveness != nil {
return c.testingKnobs.mockRequestLiveness(s, bo)

if c != nil {
lf := c.testingKnobs.mockRequestLiveness.Load()
if lf != nil {
return (*lf.(*livenessFunc))(s, bo)
}
}

if storeLivenessTimeout == 0 {
Expand Down
14 changes: 7 additions & 7 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV() {
region := createSampleRegion([]byte("k1"), []byte("k2"))
region.meta.Id = 1
region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10}
cache.insertRegionToCache(region)
cache.insertRegionToCache(region, true)

r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}}
r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}}
Expand Down Expand Up @@ -1255,7 +1255,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange() {
filterUnavailablePeers(cpRegion)
region, err := newRegion(s.bo, s.cache, cpRegion)
s.Nil(err)
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)

// OnSendFail should not panic
s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
Expand Down Expand Up @@ -1428,12 +1428,12 @@ func (s *testRegionCacheSuite) TestBuckets() {
fakeRegion.setStore(cachedRegion.getStore().clone())
// no buckets
fakeRegion.getStore().buckets = nil
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// stale buckets
fakeRegion.getStore().buckets = &metapb.Buckets{Version: defaultBuckets.Version - 1}
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(defaultBuckets, cachedRegion.getStore().buckets)
// new buckets
Expand All @@ -1443,7 +1443,7 @@ func (s *testRegionCacheSuite) TestBuckets() {
Keys: buckets.Keys,
}
fakeRegion.getStore().buckets = newBuckets
s.cache.insertRegionToCache(fakeRegion)
s.cache.insertRegionToCache(fakeRegion, true)
cachedRegion = s.getRegion([]byte("a"))
s.Equal(newBuckets, cachedRegion.getStore().buckets)

Expand Down Expand Up @@ -1576,7 +1576,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
region, err := s.cache.loadRegion(s.bo, []byte("c"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'c'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
Expand All @@ -1587,7 +1587,7 @@ func (s *testRegionCacheSuite) TestRemoveIntersectingRegions() {
region, err = s.cache.loadRegion(s.bo, []byte("e"), false)
s.Nil(err)
s.Equal(region.GetID(), regions[0])
s.cache.insertRegionToCache(region)
s.cache.insertRegionToCache(region, true)
loc, err = s.cache.LocateKey(s.bo, []byte{'e'})
s.Nil(err)
s.Equal(loc.Region.GetID(), regions[0])
Expand Down
12 changes: 11 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,13 +551,23 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
state.lastIdx++
}

reloadRegion := false
for i := 0; i < len(selector.replicas) && !state.option.leaderOnly; i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if state.isCandidate(idx, selector.replicas[idx]) {
selectReplica := selector.replicas[idx]
if state.isCandidate(idx, selectReplica) {
state.lastIdx = idx
selector.targetIdx = idx
break
}
if selectReplica.isEpochStale() &&
selectReplica.store.getResolveState() == resolved &&
selectReplica.store.getLivenessState() == reachable {
reloadRegion = true
}
}
if reloadRegion {
selector.regionCache.scheduleReloadRegion(selector.region)
}
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
Expand Down
Loading
Loading