diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a11475e1e98e..e4c9938afa1b 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -806,17 +806,15 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { }) var overlaps []*core.RegionInfo - c.Lock() if saveCache { // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, // check its validation again here. // // However it can't solve the race condition of concurrent heartbeats from the same region. - if _, err := c.core.PreCheckPutRegion(region); err != nil { - c.Unlock() + if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { return err } - overlaps = c.core.PutRegion(region) + for _, item := range overlaps { if c.regionStats != nil { c.regionStats.ClearDefunctRegion(item.GetID()) @@ -835,21 +833,19 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } } for key := range storeMap { - c.updateStoreStatusLocked(key) + c.core.UpdateStoreStatus(key) } - regionEventCounter.WithLabelValues("update_cache").Inc() - } - if !c.IsPrepared() && isNew { - c.coordinator.prepareChecker.collect(region) + regionEventCounter.WithLabelValues("update_cache").Inc() } if c.regionStats != nil { c.regionStats.Observe(region, c.getRegionStoresLocked(region)) } - changedRegions := c.changedRegions - c.Unlock() + if !c.IsPrepared() && isNew { + c.coordinator.prepareChecker.collect(region) + } if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if @@ -877,7 +873,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if saveKV || needSync { select { - case changedRegions <- region: + case c.changedRegions <- region: default: } } @@ -885,15 +881,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { return nil } -func (c *RaftCluster) updateStoreStatusLocked(id uint64) { - leaderCount := c.core.GetStoreLeaderCount(id) - regionCount := c.core.GetStoreRegionCount(id) - pendingPeerCount := c.core.GetStorePendingPeerCount(id) - leaderRegionSize := c.core.GetStoreLeaderRegionSize(id) - regionSize := c.core.GetStoreRegionSize(id) - c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize) -} - func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error { if c.storage != nil { if err := c.storage.SaveMeta(meta); err != nil { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index c630434c8cd8..cf13b32e46fe 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -643,25 +643,25 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { for i, region := range regions { // region does not exist. c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is the same, not updated. c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(c, cluster.storage, regions[:i+1]) origin := region // region is updated. region = origin.Clone(core.WithIncVersion()) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is stale (Version). stale := origin.Clone(core.WithIncConfVer()) c.Assert(cluster.processRegionHeartbeat(stale), NotNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is updated. @@ -671,13 +671,13 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { ) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is stale (ConfVer). stale = origin.Clone(core.WithIncConfVer()) c.Assert(cluster.processRegionHeartbeat(stale), NotNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(c, cluster.storage, regions[:i+1]) // Add a down peer. @@ -689,69 +689,69 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { })) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Add a pending peer. region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]})) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Clear down peers. region = region.Clone(core.WithDownPeers(nil)) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Clear pending peers. region = region.Clone(core.WithPendingPeers(nil)) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Remove peers. origin = region region = origin.Clone(core.SetPeers(region.GetPeers()[:1])) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(c, cluster.storage, regions[:i+1]) // Add peers. region = origin regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(c, cluster.storage, regions[:i+1]) // Change leader. region = region.Clone(core.WithLeader(region.GetPeers()[1])) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Change ApproximateSize. region = region.Clone(core.SetApproximateSize(144)) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Change ApproximateKeys. region = region.Clone(core.SetApproximateKeys(144000)) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Change bytes written. region = region.Clone(core.SetWrittenBytes(24000)) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Change bytes read. region = region.Clone(core.SetReadBytes(1080000)) regions[i] = region c.Assert(cluster.processRegionHeartbeat(region), IsNil) - checkRegions(c, cluster.core.Regions, regions[:i+1]) + checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1]) } regionCounts := make(map[uint64]int) @@ -1538,7 +1538,7 @@ func (s *testRegionsInfoSuite) Test(c *C) { _, opts, err := newTestScheduleConfig() c.Assert(err, IsNil) tc := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opts, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) - cache := tc.core.Regions + cache := tc.core.Regions.RegionsInfo for i := uint64(0); i < n; i++ { region := regions[i] diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 95917ec34e25..eee97c11d11d 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -28,72 +28,59 @@ import ( // BasicCluster provides basic data member and interface for a tikv cluster. type BasicCluster struct { - syncutil.RWMutex - Stores *StoresInfo - Regions *RegionsInfo + Stores struct { + mu syncutil.RWMutex + *StoresInfo + } + + Regions struct { + mu syncutil.RWMutex + *RegionsInfo + } } // NewBasicCluster creates a BasicCluster. func NewBasicCluster() *BasicCluster { return &BasicCluster{ - Stores: NewStoresInfo(), - Regions: NewRegionsInfo(), + Stores: struct { + mu syncutil.RWMutex + *StoresInfo + }{StoresInfo: NewStoresInfo()}, + + Regions: struct { + mu syncutil.RWMutex + *RegionsInfo + }{RegionsInfo: NewRegionsInfo()}, } } +/* Stores read operations */ + // GetStores returns all Stores in the cluster. func (bc *BasicCluster) GetStores() []*StoreInfo { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() return bc.Stores.GetStores() } // GetMetaStores gets a complete set of metapb.Store. func (bc *BasicCluster) GetMetaStores() []*metapb.Store { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() return bc.Stores.GetMetaStores() } // GetStore searches for a store by ID. func (bc *BasicCluster) GetStore(storeID uint64) *StoreInfo { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() return bc.Stores.GetStore(storeID) } -// GetRegion searches for a region by ID. -func (bc *BasicCluster) GetRegion(regionID uint64) *RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetRegion(regionID) -} - -// GetRegions gets all RegionInfo from regionMap. -func (bc *BasicCluster) GetRegions() []*RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetRegions() -} - -// GetMetaRegions gets a set of metapb.Region from regionMap. -func (bc *BasicCluster) GetMetaRegions() []*metapb.Region { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetMetaRegions() -} - -// GetStoreRegions gets all RegionInfo with a given storeID. -func (bc *BasicCluster) GetStoreRegions(storeID uint64) []*RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetStoreRegions(storeID) -} - // GetRegionStores returns all Stores that contains the region's peer. func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() var Stores []*StoreInfo for id := range region.GetStoreIds() { if store := bc.Stores.GetStore(id); store != nil { @@ -105,8 +92,8 @@ func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo { // GetFollowerStores returns all Stores that contains the region's follower peer. func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() var Stores []*StoreInfo for id := range region.GetFollowers() { if store := bc.Stores.GetStore(id); store != nil { @@ -116,114 +103,185 @@ func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo { return Stores } -// GetLeaderStoreByRegionID returns the leader store of the given region. -func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo { - bc.RLock() - defer bc.RUnlock() - region := bc.Regions.GetRegion(regionID) - if region == nil || region.GetLeader() == nil { - return nil - } - return bc.Stores.GetStore(region.GetLeader().GetStoreId()) -} - // GetLeaderStore returns all Stores that contains the region's leader peer. func (bc *BasicCluster) GetLeaderStore(region *RegionInfo) *StoreInfo { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() return bc.Stores.GetStore(region.GetLeader().GetStoreId()) } -// GetAdjacentRegions returns region's info that is adjacent with specific region. -func (bc *BasicCluster) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetAdjacentRegions(region) +// GetStoreCount returns the total count of storeInfo. +func (bc *BasicCluster) GetStoreCount() int { + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() + return bc.Stores.GetStoreCount() } -// GetRangeHoles returns all range holes, i.e the key ranges without any region info. -func (bc *BasicCluster) GetRangeHoles() [][]string { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetRangeHoles() -} +/* Stores Write operations */ // PauseLeaderTransfer prevents the store from been selected as source or // target store of TransferLeader. func (bc *BasicCluster) PauseLeaderTransfer(storeID uint64) error { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() return bc.Stores.PauseLeaderTransfer(storeID) } // ResumeLeaderTransfer cleans a store's pause state. The store can be selected // as source or target of TransferLeader again. func (bc *BasicCluster) ResumeLeaderTransfer(storeID uint64) { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() bc.Stores.ResumeLeaderTransfer(storeID) } // SlowStoreEvicted marks a store as a slow store and prevents transferring // leader to the store func (bc *BasicCluster) SlowStoreEvicted(storeID uint64) error { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() return bc.Stores.SlowStoreEvicted(storeID) } // SlowStoreRecovered cleans the evicted state of a store. func (bc *BasicCluster) SlowStoreRecovered(storeID uint64) { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() bc.Stores.SlowStoreRecovered(storeID) } // ResetStoreLimit resets the limit for a specific store. func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Type, ratePerSec ...float64) { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() bc.Stores.ResetStoreLimit(storeID, limitType, ratePerSec...) } // UpdateStoreStatus updates the information of the store. -func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regionCount int, pendingPeerCount int, leaderSize int64, regionSize int64) { - bc.Lock() - defer bc.Unlock() - bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, pendingPeerCount, leaderSize, regionSize) +func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) { + bc.Regions.mu.RLock() + leaderCount := bc.Regions.GetStoreLeaderCount(storeID) + regionCount := bc.Regions.GetStoreRegionCount(storeID) + pendingPeerCount := bc.Regions.GetStorePendingPeerCount(storeID) + leaderRegionSize := bc.Regions.GetStoreLeaderRegionSize(storeID) + regionSize := bc.Regions.GetStoreRegionSize(storeID) + bc.Regions.mu.RUnlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() + bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize) +} + +// PutStore put a store. +func (bc *BasicCluster) PutStore(store *StoreInfo) { + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() + bc.Stores.SetStore(store) +} + +// ResetStores resets the store cache. +func (bc *BasicCluster) ResetStores() { + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() + bc.Stores.StoresInfo = NewStoresInfo() +} + +// DeleteStore deletes a store. +func (bc *BasicCluster) DeleteStore(store *StoreInfo) { + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() + bc.Stores.DeleteStore(store) +} + +/* Regions read operations */ + +// GetRegion searches for a region by ID. +func (bc *BasicCluster) GetRegion(regionID uint64) *RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetRegion(regionID) +} + +// GetRegions gets all RegionInfo from regionMap. +func (bc *BasicCluster) GetRegions() []*RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetRegions() +} + +// GetMetaRegions gets a set of metapb.Region from regionMap. +func (bc *BasicCluster) GetMetaRegions() []*metapb.Region { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetMetaRegions() +} + +// GetStoreRegions gets all RegionInfo with a given storeID. +func (bc *BasicCluster) GetStoreRegions(storeID uint64) []*RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetStoreRegions(storeID) +} + +// GetLeaderStoreByRegionID returns the leader store of the given region. +func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo { + bc.Regions.mu.RLock() + region := bc.Regions.GetRegion(regionID) + if region == nil || region.GetLeader() == nil { + bc.Regions.mu.RUnlock() + return nil + } + bc.Regions.mu.RUnlock() + + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() + return bc.Stores.GetStore(region.GetLeader().GetStoreId()) +} + +// GetAdjacentRegions returns region's info that is adjacent with specific region. +func (bc *BasicCluster) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetAdjacentRegions(region) +} + +// GetRangeHoles returns all range holes, i.e the key ranges without any region info. +func (bc *BasicCluster) GetRangeHoles() [][]string { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetRangeHoles() } const randomRegionMaxRetry = 10 // RandFollowerRegion returns a random region that has a follower on the store. func (bc *BasicCluster) RandFollowerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo { - bc.RLock() + bc.Regions.mu.RLock() regions := bc.Regions.RandFollowerRegions(storeID, ranges, randomRegionMaxRetry) - bc.RUnlock() + bc.Regions.mu.RUnlock() return bc.selectRegion(regions, opts...) } // RandLeaderRegion returns a random region that has leader on the store. func (bc *BasicCluster) RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo { - bc.RLock() + bc.Regions.mu.RLock() regions := bc.Regions.RandLeaderRegions(storeID, ranges, randomRegionMaxRetry) - bc.RUnlock() + bc.Regions.mu.RUnlock() return bc.selectRegion(regions, opts...) } // RandPendingRegion returns a random region that has a pending peer on the store. func (bc *BasicCluster) RandPendingRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo { - bc.RLock() + bc.Regions.mu.RLock() regions := bc.Regions.RandPendingRegions(storeID, ranges, randomRegionMaxRetry) - bc.RUnlock() + bc.Regions.mu.RUnlock() return bc.selectRegion(regions, opts...) } // RandLearnerRegion returns a random region that has a learner peer on the store. func (bc *BasicCluster) RandLearnerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo { - bc.RLock() + bc.Regions.mu.RLock() regions := bc.Regions.RandLearnerRegions(storeID, ranges, randomRegionMaxRetry) - bc.RUnlock() + bc.Regions.mu.RUnlock() return bc.selectRegion(regions, opts...) } @@ -241,80 +299,112 @@ func (bc *BasicCluster) selectRegion(regions []*RegionInfo, opts ...RegionOption // GetRegionCount gets the total count of RegionInfo of regionMap. func (bc *BasicCluster) GetRegionCount() int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetRegionCount() } -// GetStoreCount returns the total count of storeInfo. -func (bc *BasicCluster) GetStoreCount() int { - bc.RLock() - defer bc.RUnlock() - return bc.Stores.GetStoreCount() -} - // GetStoreRegionCount gets the total count of a store's leader and follower RegionInfo by storeID. func (bc *BasicCluster) GetStoreRegionCount(storeID uint64) int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreLeaderCount(storeID) + bc.Regions.GetStoreFollowerCount(storeID) + bc.Regions.GetStoreLearnerCount(storeID) } // GetStoreLeaderCount get the total count of a store's leader RegionInfo. func (bc *BasicCluster) GetStoreLeaderCount(storeID uint64) int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreLeaderCount(storeID) } // GetStoreFollowerCount get the total count of a store's follower RegionInfo. func (bc *BasicCluster) GetStoreFollowerCount(storeID uint64) int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreFollowerCount(storeID) } // GetStorePendingPeerCount gets the total count of a store's region that includes pending peer. func (bc *BasicCluster) GetStorePendingPeerCount(storeID uint64) int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStorePendingPeerCount(storeID) } // GetStoreLeaderRegionSize get total size of store's leader regions. func (bc *BasicCluster) GetStoreLeaderRegionSize(storeID uint64) int64 { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreLeaderRegionSize(storeID) } // GetStoreRegionSize get total size of store's regions. func (bc *BasicCluster) GetStoreRegionSize(storeID uint64) int64 { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreRegionSize(storeID) } // GetAverageRegionSize returns the average region approximate size. func (bc *BasicCluster) GetAverageRegionSize() int64 { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetAverageRegionSize() } +// GetRegionByKey searches RegionInfo from regionTree. +func (bc *BasicCluster) GetRegionByKey(regionKey []byte) *RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetRegionByKey(regionKey) +} + +// GetPrevRegionByKey searches previous RegionInfo from regionTree. +func (bc *BasicCluster) GetPrevRegionByKey(regionKey []byte) *RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetPrevRegionByKey(regionKey) +} + +// ScanRange scans regions intersecting [start key, end key), returns at most +// `limit` regions. limit <= 0 means no limit. +func (bc *BasicCluster) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.ScanRange(startKey, endKey, limit) +} + +// GetOverlaps returns the regions which are overlapped with the specified region range. +func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetOverlaps(region) +} + +// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. +func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetRegionSizeByRange(startKey, endKey) +} + func (bc *BasicCluster) getWriteRate( f func(storeID uint64) (bytesRate, keysRate float64), ) (storeIDs []uint64, bytesRates, keysRates []float64) { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() count := len(bc.Stores.stores) storeIDs = make([]uint64, 0, count) + for _, store := range bc.Stores.stores { + storeIDs = append(storeIDs, store.GetID()) + } + bc.Stores.mu.RUnlock() bytesRates = make([]float64, 0, count) keysRates = make([]float64, 0, count) - for _, store := range bc.Stores.stores { - id := store.GetID() + for _, id := range storeIDs { + bc.Regions.mu.RLock() bytesRate, keysRate := f(id) - storeIDs = append(storeIDs, id) + bc.Regions.mu.RUnlock() bytesRates = append(bytesRates, bytesRate) keysRates = append(keysRates, keysRate) } @@ -323,8 +413,8 @@ func (bc *BasicCluster) getWriteRate( // GetRangeCount returns the number of regions that overlap with the range [startKey, endKey). func (bc *BasicCluster) GetRangeCount(startKey, endKey []byte) int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetRangeCount(startKey, endKey) } @@ -338,30 +428,7 @@ func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, key return bc.getWriteRate(bc.Regions.GetStoreWriteRate) } -// PutStore put a store. -func (bc *BasicCluster) PutStore(store *StoreInfo) { - bc.Lock() - defer bc.Unlock() - bc.Stores.SetStore(store) -} - -// ResetStores resets the store cache. -func (bc *BasicCluster) ResetStores() { - bc.Lock() - defer bc.Unlock() - bc.Stores = NewStoresInfo() -} - -// DeleteStore deletes a store. -func (bc *BasicCluster) DeleteStore(store *StoreInfo) { - bc.Lock() - defer bc.Unlock() - bc.Stores.DeleteStore(store) -} - -func (bc *BasicCluster) getRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { - bc.RLock() - defer bc.RUnlock() +func (bc *BasicCluster) getRelevantRegionsLocked(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { origin = bc.Regions.GetRegion(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { overlaps = bc.Regions.GetOverlaps(region) @@ -369,22 +436,20 @@ func (bc *BasicCluster) getRelevantRegions(region *RegionInfo) (origin *RegionIn return } -func isRegionRecreated(region *RegionInfo) bool { - // Regions recreated by online unsafe recover have both ver and conf ver equal to 1. To - // prevent stale bootstrap region (first region in a cluster which covers the entire key - // range) from reporting stale info, we exclude regions that covers the entire key range - // here. Technically, it is possible for unsafe recover to recreate such region, but that - // means the entire key range is unavailable, and we don't expect unsafe recover to perform - // better than recreating the cluster. - return region.GetRegionEpoch().GetVersion() == 1 && region.GetRegionEpoch().GetConfVer() == 1 && (len(region.GetStartKey()) != 0 || len(region.GetEndKey()) != 0) -} +/* Regions write operations */ // PreCheckPutRegion checks if the region is valid to put. func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) { - origin, overlaps := bc.getRelevantRegions(region) + bc.Regions.mu.RLock() + origin, overlaps := bc.getRelevantRegionsLocked(region) + bc.Regions.mu.RUnlock() + return bc.check(region, origin, overlaps) +} + +func (bc *BasicCluster) check(region, origin *RegionInfo, overlaps []*RegionInfo) (*RegionInfo, error) { for _, item := range overlaps { // PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation. - if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !isRegionRecreated(region) { + if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() { return nil, errRegionIsStale(region.GetMeta(), item.GetMeta()) } } @@ -397,27 +462,13 @@ func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, erro // TiKV reports term after v3.0 isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm() // Region meta is stale, return an error. - if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !isRegionRecreated(region) { + if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !region.isRegionRecreated() { return origin, errRegionIsStale(region.GetMeta(), origin.GetMeta()) } return origin, nil } -// PutRegion put a region. -func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo { - bc.Lock() - defer bc.Unlock() - return bc.Regions.SetRegion(region) -} - -// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. -func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetRegionSizeByRange(startKey, endKey) -} - // CheckAndPutRegion checks if the region is valid to put, if valid then put. func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { origin, err := bc.PreCheckPutRegion(region) @@ -429,10 +480,29 @@ func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { return bc.PutRegion(region) } +// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put. +func (bc *BasicCluster) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) { + bc.Regions.mu.Lock() + defer bc.Regions.mu.Unlock() + origin, overlaps := bc.getRelevantRegionsLocked(region) + _, err := bc.check(region, origin, overlaps) + if err != nil { + return nil, err + } + return bc.Regions.SetRegion(region), nil +} + +// PutRegion put a region. +func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo { + bc.Regions.mu.Lock() + defer bc.Regions.mu.Unlock() + return bc.Regions.SetRegion(region) +} + // RemoveRegionIfExist removes RegionInfo from regionTree and regionMap if exists. func (bc *BasicCluster) RemoveRegionIfExist(id uint64) { - bc.Lock() - defer bc.Unlock() + bc.Regions.mu.Lock() + defer bc.Regions.mu.Unlock() if r := bc.Regions.GetRegion(id); r != nil { bc.Regions.RemoveRegion(r) } @@ -440,47 +510,18 @@ func (bc *BasicCluster) RemoveRegionIfExist(id uint64) { // ResetRegionCache drops all region cache. func (bc *BasicCluster) ResetRegionCache() { - bc.Lock() - defer bc.Unlock() - bc.Regions = NewRegionsInfo() + bc.Regions.mu.Lock() + defer bc.Regions.mu.Unlock() + bc.Regions.RegionsInfo = NewRegionsInfo() } // RemoveRegion removes RegionInfo from regionTree and regionMap. func (bc *BasicCluster) RemoveRegion(region *RegionInfo) { - bc.Lock() - defer bc.Unlock() + bc.Regions.mu.Lock() + defer bc.Regions.mu.Unlock() bc.Regions.RemoveRegion(region) } -// GetRegionByKey searches RegionInfo from regionTree. -func (bc *BasicCluster) GetRegionByKey(regionKey []byte) *RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetRegionByKey(regionKey) -} - -// GetPrevRegionByKey searches previous RegionInfo from regionTree. -func (bc *BasicCluster) GetPrevRegionByKey(regionKey []byte) *RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetPrevRegionByKey(regionKey) -} - -// ScanRange scans regions intersecting [start key, end key), returns at most -// `limit` regions. limit <= 0 means no limit. -func (bc *BasicCluster) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.ScanRange(startKey, endKey, limit) -} - -// GetOverlaps returns the regions which are overlapped with the specified region range. -func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetOverlaps(region) -} - // RegionSetInformer provides access to a shared informer of regions. type RegionSetInformer interface { GetRegionCount() int diff --git a/server/core/region.go b/server/core/region.go index 84aaea41acfe..e43ddd032ad9 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -558,6 +558,20 @@ func (r *RegionInfo) IsFromHeartbeat() bool { return r.fromHeartbeat } +func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool { + return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0)) +} + +func (r *RegionInfo) isRegionRecreated() bool { + // Regions recreated by online unsafe recover have both ver and conf ver equal to 1. To + // prevent stale bootstrap region (first region in a cluster which covers the entire key + // range) from reporting stale info, we exclude regions that covers the entire key range + // here. Technically, it is possible for unsafe recover to recreate such region, but that + // means the entire key range is unavailable, and we don't expect unsafe recover to perform + // better than recreating the cluster. + return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0) +} + // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) @@ -1283,10 +1297,6 @@ func DiffRegionKeyInfo(origin *RegionInfo, other *RegionInfo) string { return strings.Join(ret, ", ") } -func isInvolved(region *RegionInfo, startKey, endKey []byte) bool { - return bytes.Compare(region.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(region.GetEndKey()) > 0 && bytes.Compare(region.GetEndKey(), endKey) <= 0)) -} - // String converts slice of bytes to string without copy. func String(b []byte) (s string) { if len(b) == 0 { diff --git a/server/core/region_tree.go b/server/core/region_tree.go index aa32b8a10430..a5d717a93e98 100644 --- a/server/core/region_tree.go +++ b/server/core/region_tree.go @@ -273,7 +273,7 @@ func (t *regionTree) RandomRegion(ranges []KeyRange) *RegionInfo { } index := rand.Intn(endIndex-startIndex) + startIndex region := t.tree.GetAt(index).(*regionItem).region - if isInvolved(region, startKey, endKey) { + if region.isInvolved(startKey, endKey) { return region } }