diff --git a/go.mod b/go.mod
index a3c777853ac6d..cfaee0249ae0f 100644
--- a/go.mod
+++ b/go.mod
@@ -28,6 +28,7 @@ require (
     github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
     github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
     github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
+    github.com/kisielk/errcheck v1.2.0 // indirect
     github.com/klauspost/cpuid v1.2.1
     github.com/kr/pretty v0.2.0 // indirect
     github.com/mattn/go-colorable v0.1.7 // indirect
diff --git a/go.sum b/go.sum
index 21136a84328d0..6510d00a57e95 100644
--- a/go.sum
+++ b/go.sum
@@ -271,6 +271,7 @@ github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY=
 github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
+github.com/kisielk/errcheck v1.2.0 h1:reN85Pxc5larApoH1keMBiu2GWtPqXQ1nc9gx+jOU+E=
 github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
diff --git a/store/mockstore/mocktikv/pd.go b/store/mockstore/mocktikv/pd.go
index 6c589f252d0b3..c9f0777484e6e 100644
--- a/store/mockstore/mocktikv/pd.go
+++ b/store/mockstore/mocktikv/pd.go
@@ -119,6 +119,7 @@ func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store,
     default:
     }
     store := c.cluster.GetStore(storeID)
+    // It's the same as PD's implementation.
     if store == nil {
         return nil, fmt.Errorf("invalid store ID %d, not found", storeID)
     }
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index f1262f0d83a79..d6b0d4a07ed4d 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -30,6 +30,7 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
     "github.com/pingcap/kvproto/pkg/metapb"
+    "github.com/pingcap/parser/terror"
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/metrics"
     "github.com/pingcap/tidb/util"
@@ -180,7 +181,7 @@ func (r *RegionStore) kvPeer(seed uint32) AccessIndex {
 }
 
 // init initializes region after constructed.
-func (r *Region) init(c *RegionCache) error {
+func (r *Region) init(bo *Backoffer, c *RegionCache) error {
     // region store pull used store from global store map
     // to avoid acquire storeMu in later access.
     rs := &RegionStore{
@@ -189,6 +190,7 @@
         stores:      make([]*Store, 0, len(r.meta.Peers)),
         storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
     }
+    availablePeers := r.meta.GetPeers()[:0]
     for _, p := range r.meta.Peers {
         c.storeMu.RLock()
         store, exists := c.storeMu.stores[p.StoreId]
@@ -196,10 +198,15 @@
         if !exists {
             store = c.getStoreByStoreID(p.StoreId)
         }
-        _, err := store.initResolve(NewNoopBackoff(context.Background()), c)
+        addr, err := store.initResolve(bo, c)
         if err != nil {
             return err
         }
+        // Filter the peer on a tombstone store.
+        if addr == "" {
+            continue
+        }
+        availablePeers = append(availablePeers, p)
         switch store.storeType {
         case kv.TiKV:
             rs.accessIndex[TiKvOnly] = append(rs.accessIndex[TiKvOnly], len(rs.stores))
@@ -209,6 +216,13 @@
         rs.stores = append(rs.stores, store)
         rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
     }
+    // TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
+    // Maybe we need backoff here.
+    if len(availablePeers) == 0 {
+        return errors.Errorf("no available peers, region: {%v}", r.meta)
+    }
+    r.meta.Peers = availablePeers
+
     atomic.StorePointer(&r.store, unsafe.Pointer(rs))
 
     // mark region has been init accessed.
@@ -294,6 +308,17 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
     return c
 }
 
+// clear clears all cached data in the RegionCache. It's only used in tests.
+func (c *RegionCache) clear() {
+    c.mu.Lock()
+    c.mu.regions = make(map[RegionVerID]*Region)
+    c.mu.sorted = btree.New(btreeDegree)
+    c.mu.Unlock()
+    c.storeMu.Lock()
+    c.storeMu.stores = make(map[uint64]*Store)
+    c.storeMu.Unlock()
+}
+
 // Close releases region cache's resource.
 func (c *RegionCache) Close() {
     close(c.closeCh)
@@ -303,19 +328,21 @@ func (c *RegionCache) asyncCheckAndResolveLoop() {
     var needCheckStores []*Store
     for {
+        needCheckStores = needCheckStores[:0]
         select {
         case <-c.closeCh:
             return
         case <-c.notifyCheckCh:
-            needCheckStores = needCheckStores[:0]
-            c.checkAndResolve(needCheckStores)
+            c.checkAndResolve(needCheckStores, func(s *Store) bool {
+                return s.getResolveState() == needCheck
+            })
         }
     }
 }
 
 // checkAndResolve checks and resolve addr of failed stores.
 // this method isn't thread-safe and only be used by one goroutine.
-func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
+func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*Store) bool) {
     defer func() {
         r := recover()
         if r != nil {
@@ -327,15 +354,15 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
 
     c.storeMu.RLock()
     for _, store := range c.storeMu.stores {
-        state := store.getResolveState()
-        if state == needCheck {
+        if needCheck(store) {
             needCheckStores = append(needCheckStores, store)
         }
     }
     c.storeMu.RUnlock()
 
     for _, store := range needCheckStores {
-        store.reResolve(c)
+        _, err := store.reResolve(c)
+        terror.Log(err)
     }
 }
 
@@ -461,7 +488,8 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
         return nil, nil
     }
     if store.getResolveState() == needCheck {
-        store.reResolve(c)
+        _, err := store.reResolve(c)
+        terror.Log(err)
     }
     atomic.StoreInt32(&regionStore.workTiFlashIdx, int32(accessIdx))
     peer := cachedRegion.meta.Peers[storeIdx]
@@ -945,9 +973,6 @@ func filterUnavailablePeers(region *pd.Region) {
             new = append(new, p)
         }
     }
-    for i := len(new); i < len(region.Meta.Peers); i++ {
-        region.Meta.Peers[i] = nil
-    }
     region.Meta.Peers = new
 }
 
@@ -1000,7 +1025,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
             continue
         }
         region := &Region{meta: reg.Meta}
-        err = region.init(c)
+        err = region.init(bo, c)
         if err != nil {
             return nil, err
         }
@@ -1045,7 +1070,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
             return nil, errors.New("receive Region with no available peer")
         }
         region := &Region{meta: reg.Meta}
-        err = region.init(c)
+        err = region.init(bo, c)
         if err != nil {
             return nil, err
         }
@@ -1099,7 +1124,7 @@ func (c *RegionCache) scanRegions(bo *Backoffer, startKey, endKey []byte, limit
     regions := make([]*Region, 0, len(metas))
     for i, meta := range metas {
         region := &Region{meta: meta}
-        err := region.init(c)
+        err := region.init(bo, c)
         if err != nil {
             return nil, err
         }
@@ -1140,11 +1165,15 @@ func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store,
     case deleted:
         addr = c.changeToActiveStore(region, store, storeIdx)
         return
+    case tombstone:
+        return "", nil
     default:
         panic("unsupported resolve state")
     }
 }
 
+// changeToActiveStore replaces the deleted store in the region with an up-to-date store from the stores map.
+// The order is guaranteed by reResolve(), which adds the new store before marking the old store deleted.
 func (c *RegionCache) changeToActiveStore(region *Region, store *Store, storeIdx int) (addr string) {
     c.storeMu.RLock()
     store = c.storeMu.stores[store.storeID]
@@ -1207,7 +1236,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
         }
     }
     region := &Region{meta: meta}
-    err := region.init(c)
+    err := region.init(bo, c)
     if err != nil {
         return err
     }
@@ -1460,19 +1489,31 @@ type Store struct {
 type resolveState uint64
 
 const (
+    // The store is just created and normally is being resolved.
+    // A store in this state will only be resolved by initResolve().
     unresolved resolveState = iota
+    // The store is resolved and its address is valid.
     resolved
+    // A request failed on this store and it will be re-resolved by asyncCheckAndResolveLoop().
     needCheck
+    // The store's address or label has changed and the store is marked deleted.
+    // A new store struct has replaced it in the RegionCache, and callers should
+    // call changeToActiveStore() to get the new struct.
     deleted
+    // The store is a tombstone. Any region that tries to access it should be invalidated.
+    tombstone
 )
 
-// initResolve resolves addr for store that never resolved.
+// initResolve resolves the address of a store that has never been resolved, and returns an
+// empty string if the store is a tombstone.
 func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err error) {
     s.resolveMutex.Lock()
     state := s.getResolveState()
     defer s.resolveMutex.Unlock()
     if state != unresolved {
-        addr = s.addr
+        if state != tombstone {
+            addr = s.addr
+        }
         return
     }
     var store *metapb.Store
@@ -1483,34 +1524,32 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
         } else {
             tikvRegionCacheCounterWithGetStoreOK.Inc()
         }
+        if bo.ctx.Err() != nil && errors.Cause(bo.ctx.Err()) == context.Canceled {
+            return
+        }
         if err != nil && !isStoreNotFoundError(err) {
             // TODO: more refine PD error status handle.
-            if errors.Cause(err) == context.Canceled {
-                return
-            }
             err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", s.storeID, err)
             if err = bo.Backoff(BoPDRPC, err); err != nil {
                 return
             }
             continue
         }
+        // The store is a tombstone.
         if store == nil {
+            s.setResolveState(tombstone)
             return "", nil
         }
         addr = store.GetAddress()
+        if addr == "" {
+            return "", errors.Errorf("empty store(%d) address", s.storeID)
+        }
         s.addr = addr
         s.saddr = store.GetStatusAddress()
         s.storeType = GetStoreTypeByMeta(store)
-    retry:
-        state = s.getResolveState()
-        if state != unresolved {
-            addr = s.addr
-            return
-        }
-        if !s.compareAndSwapState(state, resolved) {
-            goto retry
-        }
-        return
+        // No other goroutine should be changing its state concurrently, but we still use changeResolveStateTo for safety.
+        s.changeResolveStateTo(unresolved, resolved)
+        return s.addr, nil
     }
 }
 
@@ -1534,8 +1573,9 @@ func isStoreNotFoundError(err error) bool {
     return strings.Contains(err.Error(), "invalid store ID") && strings.Contains(err.Error(), "not found")
 }
 
-// reResolve try to resolve addr for store that need check.
-func (s *Store) reResolve(c *RegionCache) {
+// reResolve tries to resolve the addr of a store that needs to be checked. Returns false if the store
+// is in tombstone state or is deleted.
+func (s *Store) reResolve(c *RegionCache) (bool, error) {
     var addr string
     store, err := c.pdClient.GetStore(context.Background(), s.storeID)
     if err != nil {
@@ -1549,48 +1589,30 @@ func (s *Store) reResolve(c *RegionCache) {
     if err != nil && !isStoreNotFoundError(err) {
         logutil.BgLogger().Error("loadStore from PD failed", zap.Uint64("id", s.storeID), zap.Error(err))
         // we cannot do backoff in reResolve loop but try check other store and wait tick.
-        return
+        return false, err
     }
     if store == nil {
         // store has be removed in PD, we should invalidate all regions using those store.
         logutil.BgLogger().Info("invalidate regions in removed store", zap.Uint64("store", s.storeID), zap.String("add", s.addr))
         atomic.AddUint32(&s.epoch, 1)
-        atomic.StoreUint64(&s.state, uint64(deleted))
+        s.setResolveState(tombstone)
         tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
-        return
+        return false, nil
     }
     storeType := GetStoreTypeByMeta(store)
     addr = store.GetAddress()
     if s.addr != addr {
-        state := resolved
-        newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType}
-        newStore.state = *(*uint64)(&state)
+        newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, state: uint64(resolved)}
         c.storeMu.Lock()
         c.storeMu.stores[newStore.storeID] = newStore
         c.storeMu.Unlock()
-    retryMarkDel:
-        // all region used those
-        oldState := s.getResolveState()
-        if oldState == deleted {
-            return
-        }
-        newState := deleted
-        if !s.compareAndSwapState(oldState, newState) {
-            goto retryMarkDel
-        }
-        return
-    }
-retryMarkResolved:
-    oldState := s.getResolveState()
-    if oldState != needCheck {
-        return
-    }
-    newState := resolved
-    if !s.compareAndSwapState(oldState, newState) {
-        goto retryMarkResolved
+        s.setResolveState(deleted)
+        return false, nil
     }
+    s.changeResolveStateTo(needCheck, resolved)
+    return true, nil
 }
 
 func (s *Store) getResolveState() resolveState {
@@ -1601,23 +1623,35 @@ func (s *Store) getResolveState() resolveState {
     return resolveState(atomic.LoadUint64(&s.state))
 }
 
-func (s *Store) compareAndSwapState(oldState, newState resolveState) bool {
-    return atomic.CompareAndSwapUint64(&s.state, uint64(oldState), uint64(newState))
+func (s *Store) setResolveState(state resolveState) {
+    atomic.StoreUint64(&s.state, uint64(state))
+}
+
+// changeResolveStateTo changes the store's resolveState from the old state to the new state.
+// Returns true if it changes the state successfully, and false if the state has already been
+// changed by someone else.
+func (s *Store) changeResolveStateTo(from, to resolveState) bool {
+    for {
+        state := s.getResolveState()
+        if state == to {
+            return true
+        }
+        if state != from {
+            return false
+        }
+        if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
+            return true
+        }
+    }
 }
 
 // markNeedCheck marks resolved store to be async resolve to check store addr change.
 func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) {
-retry:
-    oldState := s.getResolveState()
-    if oldState != resolved {
-        return
-    }
-    if !s.compareAndSwapState(oldState, needCheck) {
-        goto retry
-    }
-    select {
-    case notifyCheckCh <- struct{}{}:
-    default:
+    if s.changeResolveStateTo(resolved, needCheck) {
+        select {
+        case notifyCheckCh <- struct{}{}:
+        default:
+        }
     }
 }
 
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index 68107d5bee4a6..b9aafe6e084a9 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -153,32 +153,174 @@ func (s *testRegionCacheSuite) TestSimple(c *C) {
     c.Assert(r, IsNil)
 }
 
-func (s *testRegionCacheSuite) TestDropStore(c *C) {
-    bo := NewBackofferWithVars(context.Background(), 100, nil)
+// TestResolveStateTransition verifies the store's resolve state transitions. For example,
+// a newly added store is in unresolved state and will be resolved soon if it's an up store,
+// or in tombstone state if it's a tombstone.
+func (s *testRegionCacheSuite) TestResolveStateTransition(c *C) {
+    cache := s.cache
+    bo := NewNoopBackoff(context.Background())
+
+    // Check resolving normal stores. The resolve state should be resolved.
+    for _, storeMeta := range s.cluster.GetAllStores() {
+        store := cache.getStoreByStoreID(storeMeta.GetId())
+        c.Assert(store.getResolveState(), Equals, unresolved)
+        addr, err := store.initResolve(bo, cache)
+        c.Assert(err, IsNil)
+        c.Assert(addr, Equals, storeMeta.GetAddress())
+        c.Assert(store.getResolveState(), Equals, resolved)
+    }
+
+    waitResolve := func(s *Store) {
+        for i := 0; i < 10; i++ {
+            if s.getResolveState() != needCheck {
+                break
+            }
+            time.Sleep(50 * time.Millisecond)
+        }
+    }
+
+    // Mark the store needCheck. The resolve state should be resolved soon.
+    store := cache.getStoreByStoreID(s.store1)
+    store.markNeedCheck(cache.notifyCheckCh)
+    waitResolve(store)
+    c.Assert(store.getResolveState(), Equals, resolved)
+
+    // Mark the store needCheck and it becomes a tombstone. The resolve state should be tombstone.
+    s.cluster.MarkTombstone(s.store1)
+    store.markNeedCheck(cache.notifyCheckCh)
+    waitResolve(store)
+    c.Assert(store.getResolveState(), Equals, tombstone)
+    s.cluster.StartStore(s.store1)
+
+    // Mark the store needCheck and it's deleted from PD. The resolve state should be tombstone.
+    cache.clear()
+    store = cache.getStoreByStoreID(s.store1)
+    store.initResolve(bo, cache)
+    c.Assert(store.getResolveState(), Equals, resolved)
+    storeMeta := s.cluster.GetStore(s.store1)
     s.cluster.RemoveStore(s.store1)
-    loc, err := s.cache.LocateKey(bo, []byte("a"))
-    c.Assert(err, IsNil)
-    ctx, err := s.cache.GetTiKVRPCContext(bo, loc.Region, kv.ReplicaReadLeader, 0)
+    store.markNeedCheck(cache.notifyCheckCh)
+    waitResolve(store)
+    c.Assert(store.getResolveState(), Equals, tombstone)
+    s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress())
+
+    // Mark the store needCheck and its address and labels are changed.
+    // The resolve state should be deleted and a new store is added to the cache.
+    cache.clear()
+    store = cache.getStoreByStoreID(s.store1)
+    store.initResolve(bo, cache)
+    c.Assert(store.getResolveState(), Equals, resolved)
+    s.cluster.UpdateStoreAddr(s.store1, store.addr+"0", &metapb.StoreLabel{Key: "k", Value: "v"})
+    store.markNeedCheck(cache.notifyCheckCh)
+    waitResolve(store)
+    c.Assert(store.getResolveState(), Equals, deleted)
+    newStore := cache.getStoreByStoreID(s.store1)
+    c.Assert(newStore.getResolveState(), Equals, resolved)
+    c.Assert(newStore.addr, Equals, store.addr+"0")
+
+    // Check initResolve()ing a tombstone store. The resolve state should be tombstone.
+    cache.clear()
+    s.cluster.MarkTombstone(s.store1)
+    store = cache.getStoreByStoreID(s.store1)
+    for i := 0; i < 2; i++ {
+        addr, err := store.initResolve(bo, cache)
+        c.Assert(err, IsNil)
+        c.Assert(addr, Equals, "")
+        c.Assert(store.getResolveState(), Equals, tombstone)
+    }
+    s.cluster.StartStore(s.store1)
+    cache.clear()
+
+    // Check initResolve()ing a dropped store. The resolve state should be tombstone.
+    cache.clear()
+    storeMeta = s.cluster.GetStore(s.store1)
+    s.cluster.RemoveStore(s.store1)
+    store = cache.getStoreByStoreID(s.store1)
+    for i := 0; i < 2; i++ {
+        addr, err := store.initResolve(bo, cache)
+        c.Assert(err, IsNil)
+        c.Assert(addr, Equals, "")
+        c.Assert(store.getResolveState(), Equals, tombstone)
+    }
+}
+
+// TestFilterDownPeersOrPeersOnTombstoneOrDroppedStores verifies that the RegionCache filters
+// out a region's down peers and the peers on tombstone or dropped stores. The RegionCache
+// shouldn't report errors in such cases if there are still available peers.
+func (s *testRegionCacheSuite) TestFilterDownPeersOrPeersOnTombstoneOrDroppedStores(c *C) {
+    key := []byte("a")
+    bo := NewBackofferWithVars(context.Background(), 100, nil)
+
+    verifyGetRPCCtx := func(meta *metapb.Region) {
+        loc, err := s.cache.LocateKey(bo, key)
+        c.Assert(loc, NotNil)
+        c.Assert(err, IsNil)
+        ctx, err := s.cache.GetTiKVRPCContext(bo, loc.Region, kv.ReplicaReadLeader, 0)
+        c.Assert(err, IsNil)
+        c.Assert(ctx, NotNil)
+        c.Assert(ctx.Meta, DeepEquals, meta)
+        ctx, err = s.cache.GetTiKVRPCContext(bo, loc.Region, kv.ReplicaReadFollower, rand.Uint32())
+        c.Assert(err, IsNil)
+        c.Assert(ctx, NotNil)
+        c.Assert(ctx.Meta, DeepEquals, meta)
+    }
+
+    // When all peers are normal, the cached region should contain all peers.
+    reg, err := s.cache.findRegionByKey(bo, key, false)
+    c.Assert(reg, NotNil)
     c.Assert(err, IsNil)
-    c.Assert(ctx, IsNil)
-    ctx, err = s.cache.GetTiKVRPCContext(bo, loc.Region, kv.ReplicaReadFollower, rand.Uint32())
+    regInPD, _ := s.cluster.GetRegion(reg.GetID())
+    c.Assert(reg.meta, DeepEquals, regInPD)
+    c.Assert(len(reg.meta.GetPeers()), Equals, len(reg.getStore().stores))
+    verifyGetRPCCtx(reg.meta)
+    s.checkCache(c, 1)
+    s.cache.clear()
+
+    // Shouldn't contain the peer on the tombstone store.
+    s.cluster.MarkTombstone(s.store1)
+    reg, err = s.cache.findRegionByKey(bo, key, false)
+    c.Assert(reg, NotNil)
     c.Assert(err, IsNil)
-    c.Assert(ctx, IsNil)
-    s.checkCache(c, 0)
-}
+    c.Assert(len(reg.meta.GetPeers()), Equals, len(regInPD.GetPeers())-1)
+    c.Assert(len(reg.meta.GetPeers()), Equals, len(reg.getStore().stores))
+    for _, peer := range reg.meta.GetPeers() {
+        c.Assert(peer.GetStoreId(), Not(Equals), s.store1)
+    }
+    for _, store := range reg.getStore().stores {
+        c.Assert(store.storeID, Not(Equals), s.store1)
+    }
+    verifyGetRPCCtx(reg.meta)
+    s.checkCache(c, 1)
+    s.cache.clear()
+    s.cluster.StartStore(s.store1)
 
-func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) {
+    // Shouldn't contain the peer on the dropped store.
+    store := s.cluster.GetStore(s.store1)
     s.cluster.RemoveStore(s.store1)
-    done := make(chan struct{})
-    go func() {
-        time.Sleep(time.Millisecond * 10)
-        s.cluster.AddStore(s.store1, s.storeAddr(s.store1))
-        close(done)
-    }()
-    loc, err := s.cache.LocateKey(s.bo, []byte("a"))
+    reg, err = s.cache.findRegionByKey(bo, key, false)
+    c.Assert(reg, NotNil)
     c.Assert(err, IsNil)
-    c.Assert(loc.Region.id, Equals, s.region1)
-    <-done
+    c.Assert(len(reg.meta.GetPeers()), Equals, len(regInPD.GetPeers())-1)
+    c.Assert(len(reg.meta.GetPeers()), Equals, len(reg.getStore().stores))
+    for _, peer := range reg.meta.GetPeers() {
+        c.Assert(peer.GetStoreId(), Not(Equals), s.store1)
+    }
+    for _, store := range reg.getStore().stores {
+        c.Assert(store.storeID, Not(Equals), s.store1)
+    }
+    verifyGetRPCCtx(reg.meta)
+    s.checkCache(c, 1)
+    s.cache.clear()
+    s.cluster.AddStore(store.GetId(), store.GetAddress())
+
+    // Report an error when there are no available peers.
+    s.cluster.MarkTombstone(s.store1)
+    s.cluster.MarkTombstone(s.store2)
+    _, err = s.cache.findRegionByKey(bo, key, false)
+    c.Assert(err, NotNil)
+    c.Assert(err.Error(), Matches, ".*no available peers.*")
+    s.cluster.StartStore(s.store1)
+    s.cluster.StartStore(s.store2)
 }
 
 func (s *testRegionCacheSuite) TestUpdateLeader(c *C) {
@@ -284,7 +426,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
     c.Assert(err, IsNil)
     c.Assert(ctx.Addr, Equals, "store2")
     s.cache.OnSendFail(NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
-    s.cache.checkAndResolve(nil)
+    s.cache.checkAndResolve(nil, func(*Store) bool { return true })
     s.cache.UpdateLeader(loc.Region, s.store2, 0)
     addr := s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0)
     c.Assert(addr, Equals, "")
@@ -1152,7 +1294,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange(c *C) {
     }
     filterUnavailablePeers(cpRegion)
     region := &Region{meta: cpRegion.Meta}
-    err = region.init(s.cache)
+    err = region.init(s.bo, s.cache)
     c.Assert(err, IsNil)
     s.cache.insertRegionToCache(region)
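Reviewer note (not part of the patch): the sketch below is a minimal, standalone illustration of the resolve-state machine this change relies on. The state names and the changeResolveStateTo/setResolveState logic mirror the diff above; the stripped-down store type and the main driver are hypothetical, added only to make the snippet compile and run on its own.

// resolve_state_sketch.go -- illustrative only, assumes a simplified store type.
package main

import (
    "fmt"
    "sync/atomic"
)

type resolveState uint64

const (
    unresolved resolveState = iota // just created, resolved by initResolve()
    resolved                       // address is valid
    needCheck                      // a request failed; re-resolved asynchronously
    deleted                        // address/label changed; a new store struct replaces it
    tombstone                      // store removed or tombstoned in PD
)

// store is a stand-in carrying only the atomic state word.
type store struct {
    state uint64
}

func (s *store) getResolveState() resolveState {
    return resolveState(atomic.LoadUint64(&s.state))
}

func (s *store) setResolveState(state resolveState) {
    atomic.StoreUint64(&s.state, uint64(state))
}

// changeResolveStateTo retries a CAS until the state is already `to` (success),
// no longer `from` (failure), or the swap itself succeeds.
func (s *store) changeResolveStateTo(from, to resolveState) bool {
    for {
        state := s.getResolveState()
        if state == to {
            return true
        }
        if state != from {
            return false
        }
        if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
            return true
        }
    }
}

func main() {
    s := &store{}
    fmt.Println(s.changeResolveStateTo(unresolved, resolved)) // true: the normal initResolve path
    fmt.Println(s.changeResolveStateTo(resolved, needCheck))  // true: markNeedCheck
    s.setResolveState(tombstone)                              // reResolve found the store removed in PD
    fmt.Println(s.changeResolveStateTo(needCheck, resolved))  // false: the state moved on concurrently
}

Under this model, Region.init can drop peers whose initResolve returns an empty address (tombstone), and getStoreAddr can return "" for a tombstone store so the caller invalidates the cached region instead of retrying a dead peer.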