Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

region_cache: filter peers on tombstone or dropped stores #24726

Merged
merged 3 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions store/tikv/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package mocktikv

import (
"context"
"fmt"
"math"
"sync"
"time"
Expand Down Expand Up @@ -126,6 +127,13 @@ func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store,
default:
}
store := c.cluster.GetStore(storeID)
// It's same as PD's implementation.
if store == nil {
return nil, fmt.Errorf("invalid store ID %d, not found", storeID)
}
if store.GetState() == metapb.StoreState_Tombstone {
return nil, nil
}
return store, nil
}

Expand Down
183 changes: 104 additions & 79 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (r *RegionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp
}

// init initializes region after constructed.
func (r *Region) init(c *RegionCache) error {
func (r *Region) init(bo *Backoffer, c *RegionCache) error {
// region store pull used store from global store map
// to avoid acquire storeMu in later access.
rs := &RegionStore{
Expand All @@ -188,17 +188,23 @@ func (r *Region) init(c *RegionCache) error {
stores: make([]*Store, 0, len(r.meta.Peers)),
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
availablePeers := r.meta.GetPeers()[:0]
for _, p := range r.meta.Peers {
c.storeMu.RLock()
store, exists := c.storeMu.stores[p.StoreId]
c.storeMu.RUnlock()
if !exists {
store = c.getStoreByStoreID(p.StoreId)
}
_, err := store.initResolve(retry.NewNoopBackoff(context.Background()), c)
addr, err := store.initResolve(bo, c)
if err != nil {
return err
}
// Filter the peer on a tombstone store.
if addr == "" {
continue
}
availablePeers = append(availablePeers, p)
switch store.storeType {
case tikvrpc.TiKV:
rs.accessIndex[TiKVOnly] = append(rs.accessIndex[TiKVOnly], len(rs.stores))
Expand All @@ -208,6 +214,13 @@ func (r *Region) init(c *RegionCache) error {
rs.stores = append(rs.stores, store)
rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
}
// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
// Maybe we need backoff here.
if len(availablePeers) == 0 {
return errors.Errorf("no available peers, region: {%v}", r.meta)
}
r.meta.Peers = availablePeers

atomic.StorePointer(&r.store, unsafe.Pointer(rs))

// mark region has been init accessed.
Expand Down Expand Up @@ -312,6 +325,18 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
return c
}

// clear clears all cached data in the RegionCache. It's only used in tests.
func (c *RegionCache) clear() {
c.mu.Lock()
c.mu.regions = make(map[RegionVerID]*Region)
c.mu.latestVersions = make(map[uint64]RegionVerID)
c.mu.sorted = btree.New(btreeDegree)
c.mu.Unlock()
c.storeMu.Lock()
c.storeMu.stores = make(map[uint64]*Store)
c.storeMu.Unlock()
}

// Close releases region cache's resource.
func (c *RegionCache) Close() {
close(c.closeCh)
Expand All @@ -323,32 +348,29 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
defer ticker.Stop()
var needCheckStores []*Store
for {
needCheckStores = needCheckStores[:0]
select {
case <-c.closeCh:
return
case <-c.notifyCheckCh:
needCheckStores = needCheckStores[:0]
c.checkAndResolve(needCheckStores)
c.checkAndResolve(needCheckStores, func(s *Store) bool {
return s.getResolveState() == needCheck
})
case <-ticker.C:
// refresh store once a minute to update labels
var stores []*Store
c.storeMu.RLock()
stores = make([]*Store, 0, len(c.storeMu.stores))
for _, s := range c.storeMu.stores {
stores = append(stores, s)
}
c.storeMu.RUnlock()
for _, store := range stores {
_, err := store.reResolve(c)
terror.Log(err)
}
// refresh store to update labels.
c.checkAndResolve(needCheckStores, func(s *Store) bool {
state := s.getResolveState()
// Only valid stores should be reResolved. In fact, it's impossible
// there's a deleted store in the stores map which guaranteed by reReslve().
return state != unresolved && state != tombstone && state != deleted
})
}
}
}

// checkAndResolve checks and resolve addr of failed stores.
// this method isn't thread-safe and only be used by one goroutine.
func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*Store) bool) {
defer func() {
r := recover()
if r != nil {
Expand All @@ -360,8 +382,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {

c.storeMu.RLock()
for _, store := range c.storeMu.stores {
state := store.getResolveState()
if state == needCheck {
if needCheck(store) {
needCheckStores = append(needCheckStores, store)
}
}
Expand Down Expand Up @@ -1116,9 +1137,6 @@ func filterUnavailablePeers(region *pd.Region) {
new = append(new, p)
}
}
for i := len(new); i < len(region.Meta.Peers); i++ {
region.Meta.Peers[i] = nil
}
region.Meta.Peers = new
}

Expand Down Expand Up @@ -1171,7 +1189,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
continue
}
region := &Region{meta: reg.Meta}
err = region.init(c)
err = region.init(bo, c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1216,7 +1234,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
return nil, errors.New("receive Region with no available peer")
}
region := &Region{meta: reg.Meta}
err = region.init(c)
err = region.init(bo, c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1267,7 +1285,7 @@ func (c *RegionCache) scanRegions(bo *Backoffer, startKey, endKey []byte, limit
regions := make([]*Region, 0, len(regionsInfo))
for _, r := range regionsInfo {
region := &Region{meta: r.Meta}
err := region.init(c)
err := region.init(bo, c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1308,6 +1326,8 @@ func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store,
case deleted:
addr = c.changeToActiveStore(region, store, storeIdx)
return
case tombstone:
return "", nil
default:
panic("unsupported resolve state")
}
Expand Down Expand Up @@ -1355,6 +1375,8 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *RegionStor
return nil, 0, 0
}

// changeToActiveStore replace the deleted store in the region by an up-to-date store in the stores map.
// The order is guaranteed by reResolve() which adds the new store before marking old store deleted.
func (c *RegionCache) changeToActiveStore(region *Region, store *Store, storeIdx int) (addr string) {
c.storeMu.RLock()
store = c.storeMu.stores[store.storeID]
Expand Down Expand Up @@ -1429,7 +1451,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
}
}
region := &Region{meta: meta}
err := region.init(c)
err := region.init(bo, c)
if err != nil {
return err
}
Expand Down Expand Up @@ -1759,19 +1781,31 @@ type Store struct {
type resolveState uint64

const (
// The store is just created and normally is being resolved.
// Store in this state will only be resolved by initResolve().
unresolved resolveState = iota
// The store is resolved and its address is valid.
resolved
// Request failed on this store and it will be re-resolved by asyncCheckAndResolveLoop().
needCheck
// The store's address or label is changed and marked deleted.
// There is a new store struct replaced it in the RegionCache and should
// call changeToActiveStore() to get the new struct.
deleted
// The store is a tombstone. Should invalidate the region if tries to access it.
tombstone
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
)

// initResolve resolves addr for store that never resolved.
// initResolve resolves the address of the store that never resolved and returns an
// emptry string if it's a tombstone.
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err error) {
s.resolveMutex.Lock()
state := s.getResolveState()
defer s.resolveMutex.Unlock()
if state != unresolved {
addr = s.addr
if state != tombstone {
addr = s.addr
}
return
}
var store *metapb.Store
Expand All @@ -1782,35 +1816,33 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
} else {
metrics.RegionCacheCounterWithGetStoreOK.Inc()
}
if err != nil {
if bo.GetCtx().Err() != nil && errors.Cause(bo.GetCtx().Err()) == context.Canceled {
return
}
if err != nil && !isStoreNotFoundError(err) {
// TODO: more refine PD error status handle.
if errors.Cause(err) == context.Canceled {
return
}
err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", s.storeID, err)
if err = bo.Backoff(retry.BoPDRPC, err); err != nil {
return
}
continue
}
// The store is a tombstone.
if store == nil {
return
s.setResolveState(tombstone)
return "", nil
}
addr = store.GetAddress()
if addr == "" {
return "", errors.Errorf("empty store(%d) address", s.storeID)
}
s.addr = addr
s.saddr = store.GetStatusAddress()
s.storeType = GetStoreTypeByMeta(store)
s.labels = store.GetLabels()
retry:
state = s.getResolveState()
if state != unresolved {
addr = s.addr
return
}
if !s.compareAndSwapState(state, resolved) {
goto retry
}
return
// Shouldn't have other one changing its state concurrently, but we still use changeResolveStateTo for safety.
s.changeResolveStateTo(unresolved, resolved)
return s.addr, nil
}
}

Expand Down Expand Up @@ -1843,41 +1875,22 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
logutil.BgLogger().Info("invalidate regions in removed store",
zap.Uint64("store", s.storeID), zap.String("add", s.addr))
atomic.AddUint32(&s.epoch, 1)
atomic.StoreUint64(&s.state, uint64(deleted))
s.setResolveState(tombstone)
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
return false, nil
}

storeType := GetStoreTypeByMeta(store)
addr = store.GetAddress()
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
state := resolved
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels()}
newStore.state = *(*uint64)(&state)
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
c.storeMu.Lock()
c.storeMu.stores[newStore.storeID] = newStore
c.storeMu.Unlock()
retryMarkDel:
// all region used those
oldState := s.getResolveState()
if oldState == deleted {
return false, nil
}
newState := deleted
if !s.compareAndSwapState(oldState, newState) {
goto retryMarkDel
}
s.setResolveState(deleted)
return false, nil
}
retryMarkResolved:
oldState := s.getResolveState()
if oldState != needCheck {
return true, nil
}
newState := resolved
if !s.compareAndSwapState(oldState, newState) {
goto retryMarkResolved
}
s.changeResolveStateTo(needCheck, resolved)
return true, nil
}

Expand All @@ -1889,23 +1902,35 @@ func (s *Store) getResolveState() resolveState {
return resolveState(atomic.LoadUint64(&s.state))
}

func (s *Store) compareAndSwapState(oldState, newState resolveState) bool {
return atomic.CompareAndSwapUint64(&s.state, uint64(oldState), uint64(newState))
func (s *Store) setResolveState(state resolveState) {
atomic.StoreUint64(&s.state, uint64(state))
}

// changeResolveStateTo changes the store resolveState from the old state to the new state.
// Returns true if it changes the state successfully, and false if the store's state
// is changed by another one.
func (s *Store) changeResolveStateTo(from, to resolveState) bool {
for {
state := s.getResolveState()
if state == to {
return true
}
if state != from {
return false
}
if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
return true
}
}
}

// markNeedCheck marks resolved store to be async resolve to check store addr change.
func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) {
retry:
oldState := s.getResolveState()
if oldState != resolved {
return
}
if !s.compareAndSwapState(oldState, needCheck) {
goto retry
}
select {
case notifyCheckCh <- struct{}{}:
default:
if s.changeResolveStateTo(resolved, needCheck) {
select {
case notifyCheckCh <- struct{}{}:
default:
}
}
}

Expand Down
Loading