From 59181bd9d8dcb4269a6cc39bc30f3375e64511d3 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 11 Oct 2022 14:17:28 +0800 Subject: [PATCH 1/6] use independent lock Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 15 +- server/cluster/cluster_test.go | 48 ++-- server/core/basic_cluster.go | 409 +++++++++++++++++---------------- server/core/region.go | 10 + 4 files changed, 254 insertions(+), 228 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a29b35fac9b..13afe66188b 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -806,8 +806,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { }) var overlaps []*core.RegionInfo - c.Lock() if saveCache { + c.Lock() // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, // check its validation again here. // @@ -837,19 +837,18 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { for key := range storeMap { c.updateStoreStatusLocked(key) } - regionEventCounter.WithLabelValues("update_cache").Inc() - } - if !c.IsPrepared() && isNew { - c.coordinator.prepareChecker.collect(region) + c.Unlock() + regionEventCounter.WithLabelValues("update_cache").Inc() } if c.regionStats != nil { c.regionStats.Observe(region, c.getRegionStoresLocked(region)) } - changedRegions := c.changedRegions - c.Unlock() + if !c.IsPrepared() && isNew { + c.coordinator.prepareChecker.collect(region) + } if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if @@ -877,7 +876,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { if saveKV || needSync { select { - case changedRegions <- region: + case c.changedRegions <- region: default: } } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 50fa24e1c89..dabbe5e2f91 100644 --- a/server/cluster/cluster_test.go +++ 
b/server/cluster/cluster_test.go @@ -61,7 +61,7 @@ func TestStoreHeartbeat(t *testing.T) { for _, region := range regions { re.NoError(cluster.putRegion(region)) } - re.Equal(int(n), cluster.core.Regions.GetRegionCount()) + re.Equal(int(n), cluster.core.Regions.RegionsInfo.GetRegionCount()) for i, store := range stores { storeStats := &pdpb.StoreStats{ @@ -700,25 +700,25 @@ func TestRegionHeartbeat(t *testing.T) { for i, region := range regions { // region does not exist. re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is the same, not updated. re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) origin := region // region is updated. region = origin.Clone(core.WithIncVersion()) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is stale (Version). stale := origin.Clone(core.WithIncConfVer()) re.Error(cluster.processRegionHeartbeat(stale)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is updated @@ -728,13 +728,13 @@ func TestRegionHeartbeat(t *testing.T) { ) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is stale (ConfVer). 
stale = origin.Clone(core.WithIncConfVer()) re.Error(cluster.processRegionHeartbeat(stale)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // Add a down peer. @@ -746,38 +746,38 @@ func TestRegionHeartbeat(t *testing.T) { })) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Add a pending peer. region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]})) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Clear down peers. region = region.Clone(core.WithDownPeers(nil)) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Clear pending peers. region = region.Clone(core.WithPendingPeers(nil)) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Remove peers. origin = region region = origin.Clone(core.SetPeers(region.GetPeers()[:1])) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // Add peers. 
region = origin regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // Change one peer to witness @@ -787,37 +787,37 @@ func TestRegionHeartbeat(t *testing.T) { ) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Change leader. region = region.Clone(core.WithLeader(region.GetPeers()[1])) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Change ApproximateSize. region = region.Clone(core.SetApproximateSize(144)) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Change ApproximateKeys. region = region.Clone(core.SetApproximateKeys(144000)) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Change bytes written. region = region.Clone(core.SetWrittenBytes(24000)) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) // Change bytes read. 
region = region.Clone(core.SetReadBytes(1080000)) regions[i] = region re.NoError(cluster.processRegionHeartbeat(region)) - checkRegions(re, cluster.core.Regions, regions[:i+1]) + checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1]) } regionCounts := make(map[uint64]int) @@ -848,10 +848,10 @@ func TestRegionHeartbeat(t *testing.T) { } for _, store := range cluster.core.Stores.GetStores() { - re.Equal(cluster.core.Regions.GetStoreLeaderCount(store.GetID()), store.GetLeaderCount()) - re.Equal(cluster.core.Regions.GetStoreRegionCount(store.GetID()), store.GetRegionCount()) - re.Equal(cluster.core.Regions.GetStoreLeaderRegionSize(store.GetID()), store.GetLeaderSize()) - re.Equal(cluster.core.Regions.GetStoreRegionSize(store.GetID()), store.GetRegionSize()) + re.Equal(cluster.core.Regions.RegionsInfo.GetStoreLeaderCount(store.GetID()), store.GetLeaderCount()) + re.Equal(cluster.core.Regions.RegionsInfo.GetStoreRegionCount(store.GetID()), store.GetRegionCount()) + re.Equal(cluster.core.Regions.RegionsInfo.GetStoreLeaderRegionSize(store.GetID()), store.GetLeaderSize()) + re.Equal(cluster.core.Regions.RegionsInfo.GetStoreRegionSize(store.GetID()), store.GetRegionSize()) } // Test with storage. @@ -1648,7 +1648,7 @@ func Test(t *testing.T) { _, opts, err := newTestScheduleConfig() re.NoError(err) tc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opts, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) - cache := tc.core.Regions + cache := tc.core.Regions.RegionsInfo for i := uint64(0); i < n; i++ { region := regions[i] diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 5f4ad579fa4..9dc844bcc7e 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -27,72 +27,59 @@ import ( // BasicCluster provides basic data member and interface for a tikv cluster. 
type BasicCluster struct { - syncutil.RWMutex - Stores *StoresInfo - Regions *RegionsInfo + Stores struct { + mu syncutil.RWMutex + *StoresInfo + } + + Regions struct { + mu syncutil.RWMutex + *RegionsInfo + } } // NewBasicCluster creates a BasicCluster. func NewBasicCluster() *BasicCluster { return &BasicCluster{ - Stores: NewStoresInfo(), - Regions: NewRegionsInfo(), + Stores: struct { + mu syncutil.RWMutex + *StoresInfo + }{StoresInfo: NewStoresInfo()}, + + Regions: struct { + mu syncutil.RWMutex + *RegionsInfo + }{RegionsInfo: NewRegionsInfo()}, } } +/* Stores read operations */ + // GetStores returns all Stores in the cluster. func (bc *BasicCluster) GetStores() []*StoreInfo { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() return bc.Stores.GetStores() } // GetMetaStores gets a complete set of metapb.Store. func (bc *BasicCluster) GetMetaStores() []*metapb.Store { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() return bc.Stores.GetMetaStores() } // GetStore searches for a store by ID. func (bc *BasicCluster) GetStore(storeID uint64) *StoreInfo { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() return bc.Stores.GetStore(storeID) } -// GetRegion searches for a region by ID. -func (bc *BasicCluster) GetRegion(regionID uint64) *RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetRegion(regionID) -} - -// GetRegions gets all RegionInfo from regionMap. -func (bc *BasicCluster) GetRegions() []*RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetRegions() -} - -// GetMetaRegions gets a set of metapb.Region from regionMap. -func (bc *BasicCluster) GetMetaRegions() []*metapb.Region { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetMetaRegions() -} - -// GetStoreRegions gets all RegionInfo with a given storeID. 
-func (bc *BasicCluster) GetStoreRegions(storeID uint64) []*RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetStoreRegions(storeID) -} - // GetRegionStores returns all Stores that contains the region's peer. func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() var Stores []*StoreInfo for id := range region.GetStoreIDs() { if store := bc.Stores.GetStore(id); store != nil { @@ -104,8 +91,8 @@ func (bc *BasicCluster) GetRegionStores(region *RegionInfo) []*StoreInfo { // GetFollowerStores returns all Stores that contains the region's follower peer. func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() var Stores []*StoreInfo for id := range region.GetFollowers() { if store := bc.Stores.GetStore(id); store != nil { @@ -115,188 +102,279 @@ func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo { return Stores } -// GetLeaderStoreByRegionID returns the leader store of the given region. -func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo { - bc.RLock() - defer bc.RUnlock() - region := bc.Regions.GetRegion(regionID) - if region == nil || region.GetLeader() == nil { - return nil - } - return bc.Stores.GetStore(region.GetLeader().GetStoreId()) -} - // GetLeaderStore returns all Stores that contains the region's leader peer. func (bc *BasicCluster) GetLeaderStore(region *RegionInfo) *StoreInfo { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() return bc.Stores.GetStore(region.GetLeader().GetStoreId()) } -// GetAdjacentRegions returns region's info that is adjacent with specific region. 
-func (bc *BasicCluster) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetAdjacentRegions(region) +// GetStoreCount returns the total count of storeInfo. +func (bc *BasicCluster) GetStoreCount() int { + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() + return bc.Stores.GetStoreCount() } -// GetRangeHoles returns all range holes, i.e the key ranges without any region info. -func (bc *BasicCluster) GetRangeHoles() [][]string { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetRangeHoles() -} +/* Stores Write operations */ // PauseLeaderTransfer prevents the store from been selected as source or // target store of TransferLeader. func (bc *BasicCluster) PauseLeaderTransfer(storeID uint64) error { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() return bc.Stores.PauseLeaderTransfer(storeID) } // ResumeLeaderTransfer cleans a store's pause state. The store can be selected // as source or target of TransferLeader again. func (bc *BasicCluster) ResumeLeaderTransfer(storeID uint64) { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() bc.Stores.ResumeLeaderTransfer(storeID) } // SlowStoreEvicted marks a store as a slow store and prevents transferring // leader to the store func (bc *BasicCluster) SlowStoreEvicted(storeID uint64) error { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() return bc.Stores.SlowStoreEvicted(storeID) } // SlowStoreRecovered cleans the evicted state of a store. func (bc *BasicCluster) SlowStoreRecovered(storeID uint64) { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() bc.Stores.SlowStoreRecovered(storeID) } // ResetStoreLimit resets the limit for a specific store. 
func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Type, ratePerSec ...float64) { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() bc.Stores.ResetStoreLimit(storeID, limitType, ratePerSec...) } // UpdateStoreStatus updates the information of the store. func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regionCount int, pendingPeerCount int, leaderSize int64, regionSize int64, witnessCount int) { - bc.Lock() - defer bc.Unlock() + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, pendingPeerCount, leaderSize, regionSize, witnessCount) } +// PutStore put a store. +func (bc *BasicCluster) PutStore(store *StoreInfo) { + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() + bc.Stores.SetStore(store) +} + +// ResetStores resets the store cache. +func (bc *BasicCluster) ResetStores() { + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() + bc.Stores.StoresInfo = NewStoresInfo() +} + +// DeleteStore deletes a store. +func (bc *BasicCluster) DeleteStore(store *StoreInfo) { + bc.Stores.mu.Lock() + defer bc.Stores.mu.Unlock() + bc.Stores.DeleteStore(store) +} + +/* Regions read operations */ + +// GetRegion searches for a region by ID. +func (bc *BasicCluster) GetRegion(regionID uint64) *RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetRegion(regionID) +} + +// GetRegions gets all RegionInfo from regionMap. +func (bc *BasicCluster) GetRegions() []*RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetRegions() +} + +// GetMetaRegions gets a set of metapb.Region from regionMap. +func (bc *BasicCluster) GetMetaRegions() []*metapb.Region { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetMetaRegions() +} + +// GetStoreRegions gets all RegionInfo with a given storeID. 
+func (bc *BasicCluster) GetStoreRegions(storeID uint64) []*RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetStoreRegions(storeID) +} + +// GetLeaderStoreByRegionID returns the leader store of the given region. +func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo { + bc.Regions.mu.RLock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() + defer bc.Regions.mu.RUnlock() + region := bc.Regions.GetRegion(regionID) + if region == nil || region.GetLeader() == nil { + return nil + } + return bc.Stores.GetStore(region.GetLeader().GetStoreId()) +} + +// GetAdjacentRegions returns region's info that is adjacent with specific region. +func (bc *BasicCluster) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetAdjacentRegions(region) +} + +// GetRangeHoles returns all range holes, i.e the key ranges without any region info. +func (bc *BasicCluster) GetRangeHoles() [][]string { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetRangeHoles() +} + const randomRegionMaxRetry = 10 // RandFollowerRegions returns a random region that has a follower on the store. func (bc *BasicCluster) RandFollowerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.RandFollowerRegions(storeID, ranges, randomRegionMaxRetry) } // RandLeaderRegions returns a random region that has leader on the store. func (bc *BasicCluster) RandLeaderRegions(storeID uint64, ranges []KeyRange) []*RegionInfo { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.RandLeaderRegions(storeID, ranges, randomRegionMaxRetry) } // RandPendingRegions returns a random region that has a pending peer on the store. 
func (bc *BasicCluster) RandPendingRegions(storeID uint64, ranges []KeyRange) []*RegionInfo { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.RandPendingRegions(storeID, ranges, randomRegionMaxRetry) } // RandLearnerRegions returns a random region that has a learner peer on the store. func (bc *BasicCluster) RandLearnerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.RandLearnerRegions(storeID, ranges, randomRegionMaxRetry) } // GetRegionCount gets the total count of RegionInfo of regionMap. func (bc *BasicCluster) GetRegionCount() int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetRegionCount() } -// GetStoreCount returns the total count of storeInfo. -func (bc *BasicCluster) GetStoreCount() int { - bc.RLock() - defer bc.RUnlock() - return bc.Stores.GetStoreCount() -} - // GetStoreRegionCount gets the total count of a store's leader and follower RegionInfo by storeID. func (bc *BasicCluster) GetStoreRegionCount(storeID uint64) int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreLeaderCount(storeID) + bc.Regions.GetStoreFollowerCount(storeID) + bc.Regions.GetStoreLearnerCount(storeID) } // GetStoreLeaderCount get the total count of a store's leader RegionInfo. func (bc *BasicCluster) GetStoreLeaderCount(storeID uint64) int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreLeaderCount(storeID) } // GetStoreFollowerCount get the total count of a store's follower RegionInfo. 
func (bc *BasicCluster) GetStoreFollowerCount(storeID uint64) int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreFollowerCount(storeID) } // GetStorePendingPeerCount gets the total count of a store's region that includes pending peer. func (bc *BasicCluster) GetStorePendingPeerCount(storeID uint64) int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStorePendingPeerCount(storeID) } // GetStoreWitnessCount gets the total count of a store's witness RegionInfo. func (bc *BasicCluster) GetStoreWitnessCount(storeID uint64) int { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreWitnessCount(storeID) } // GetStoreLeaderRegionSize get total size of store's leader regions. func (bc *BasicCluster) GetStoreLeaderRegionSize(storeID uint64) int64 { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreLeaderRegionSize(storeID) } // GetStoreRegionSize get total size of store's regions. func (bc *BasicCluster) GetStoreRegionSize(storeID uint64) int64 { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetStoreRegionSize(storeID) } // GetAverageRegionSize returns the average region approximate size. func (bc *BasicCluster) GetAverageRegionSize() int64 { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.Regions.GetAverageRegionSize() } +// GetRegionByKey searches RegionInfo from regionTree. +func (bc *BasicCluster) GetRegionByKey(regionKey []byte) *RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetRegionByKey(regionKey) +} + +// GetPrevRegionByKey searches previous RegionInfo from regionTree. 
+func (bc *BasicCluster) GetPrevRegionByKey(regionKey []byte) *RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetPrevRegionByKey(regionKey) +} + +// ScanRange scans regions intersecting [start key, end key), returns at most +// `limit` regions. limit <= 0 means no limit. +func (bc *BasicCluster) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.ScanRange(startKey, endKey, limit) +} + +// GetOverlaps returns the regions which are overlapped with the specified region range. +func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetOverlaps(region) +} + +// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. +func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() + return bc.Regions.GetRegionSizeByRange(startKey, endKey) +} + func (bc *BasicCluster) getWriteRate( f func(storeID uint64) (bytesRate, keysRate float64), ) (storeIDs []uint64, bytesRates, keysRates []float64) { - bc.RLock() - defer bc.RUnlock() + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() count := len(bc.Stores.stores) storeIDs = make([]uint64, 0, count) bytesRates = make([]float64, 0, count) @@ -313,38 +391,21 @@ func (bc *BasicCluster) getWriteRate( // GetStoresLeaderWriteRate get total write rate of each store's leaders. func (bc *BasicCluster) GetStoresLeaderWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.getWriteRate(bc.Regions.GetStoreLeaderWriteRate) } // GetStoresWriteRate get total write rate of each store's regions. 
func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) { + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() return bc.getWriteRate(bc.Regions.GetStoreWriteRate) } -// PutStore put a store. -func (bc *BasicCluster) PutStore(store *StoreInfo) { - bc.Lock() - defer bc.Unlock() - bc.Stores.SetStore(store) -} - -// ResetStores resets the store cache. -func (bc *BasicCluster) ResetStores() { - bc.Lock() - defer bc.Unlock() - bc.Stores = NewStoresInfo() -} - -// DeleteStore deletes a store. -func (bc *BasicCluster) DeleteStore(store *StoreInfo) { - bc.Lock() - defer bc.Unlock() - bc.Stores.DeleteStore(store) -} - func (bc *BasicCluster) getRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { - bc.RLock() - defer bc.RUnlock() + bc.Regions.mu.RLock() + defer bc.Regions.mu.RUnlock() origin = bc.Regions.GetRegion(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { overlaps = bc.Regions.GetOverlaps(region) @@ -352,15 +413,7 @@ func (bc *BasicCluster) getRelevantRegions(region *RegionInfo) (origin *RegionIn return } -func isRegionRecreated(region *RegionInfo) bool { - // Regions recreated by online unsafe recover have both ver and conf ver equal to 1. To - // prevent stale bootstrap region (first region in a cluster which covers the entire key - // range) from reporting stale info, we exclude regions that covers the entire key range - // here. Technically, it is possible for unsafe recover to recreate such region, but that - // means the entire key range is unavailable, and we don't expect unsafe recover to perform - // better than recreating the cluster. 
- return region.GetRegionEpoch().GetVersion() == 1 && region.GetRegionEpoch().GetConfVer() == 1 && (len(region.GetStartKey()) != 0 || len(region.GetEndKey()) != 0) -} +/* Regions write operations */ // PreCheckPutRegion checks if the region is valid to put. func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) { @@ -387,20 +440,6 @@ func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, erro return origin, nil } -// PutRegion put a region. -func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo { - bc.Lock() - defer bc.Unlock() - return bc.Regions.SetRegion(region) -} - -// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. -func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetRegionSizeByRange(startKey, endKey) -} - // CheckAndPutRegion checks if the region is valid to put, if valid then put. func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { origin, err := bc.PreCheckPutRegion(region) @@ -412,10 +451,17 @@ func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { return bc.PutRegion(region) } +// PutRegion put a region. +func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo { + bc.Regions.mu.Lock() + defer bc.Regions.mu.Unlock() + return bc.Regions.SetRegion(region) +} + // RemoveRegionIfExist removes RegionInfo from regionTree and regionMap if exists. func (bc *BasicCluster) RemoveRegionIfExist(id uint64) { - bc.Lock() - defer bc.Unlock() + bc.Regions.mu.Lock() + defer bc.Regions.mu.Unlock() if r := bc.Regions.GetRegion(id); r != nil { bc.Regions.RemoveRegion(r) } @@ -423,47 +469,18 @@ func (bc *BasicCluster) RemoveRegionIfExist(id uint64) { // ResetRegionCache drops all region cache. 
func (bc *BasicCluster) ResetRegionCache() { - bc.Lock() - defer bc.Unlock() - bc.Regions = NewRegionsInfo() + bc.Regions.mu.Lock() + defer bc.Regions.mu.Unlock() + bc.Regions.RegionsInfo = NewRegionsInfo() } // RemoveRegion removes RegionInfo from regionTree and regionMap. func (bc *BasicCluster) RemoveRegion(region *RegionInfo) { - bc.Lock() - defer bc.Unlock() + bc.Regions.mu.Lock() + defer bc.Regions.mu.Unlock() bc.Regions.RemoveRegion(region) } -// GetRegionByKey searches RegionInfo from regionTree. -func (bc *BasicCluster) GetRegionByKey(regionKey []byte) *RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetRegionByKey(regionKey) -} - -// GetPrevRegionByKey searches previous RegionInfo from regionTree. -func (bc *BasicCluster) GetPrevRegionByKey(regionKey []byte) *RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetPrevRegionByKey(regionKey) -} - -// ScanRange scans regions intersecting [start key, end key), returns at most -// `limit` regions. limit <= 0 means no limit. -func (bc *BasicCluster) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.ScanRange(startKey, endKey, limit) -} - -// GetOverlaps returns the regions which are overlapped with the specified region range. -func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo { - bc.RLock() - defer bc.RUnlock() - return bc.Regions.GetOverlaps(region) -} - // RegionSetInformer provides access to a shared informer of regions. 
type RegionSetInformer interface { GetRegionCount() int diff --git a/server/core/region.go b/server/core/region.go index 5ddee39865f..94fc22fd57d 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -1293,6 +1293,16 @@ func isInvolved(region *RegionInfo, startKey, endKey []byte) bool { return bytes.Compare(region.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(region.GetEndKey()) > 0 && bytes.Compare(region.GetEndKey(), endKey) <= 0)) } +func isRegionRecreated(region *RegionInfo) bool { + // Regions recreated by online unsafe recover have both ver and conf ver equal to 1. To + // prevent stale bootstrap region (first region in a cluster which covers the entire key + // range) from reporting stale info, we exclude regions that covers the entire key range + // here. Technically, it is possible for unsafe recover to recreate such region, but that + // means the entire key range is unavailable, and we don't expect unsafe recover to perform + // better than recreating the cluster. + return region.GetRegionEpoch().GetVersion() == 1 && region.GetRegionEpoch().GetConfVer() == 1 && (len(region.GetStartKey()) != 0 || len(region.GetEndKey()) != 0) +} + // String converts slice of bytes to string without copy. 
func String(b []byte) (s string) { if len(b) == 0 { From bcd7f31f2fec2e26200e4232fac67abd34ec1fae Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 13 Oct 2022 11:52:51 +0800 Subject: [PATCH 2/6] remove raft cluster lock from region heartbeat Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 19 +++------------- server/core/basic_cluster.go | 44 ++++++++++++++++++++++++++++++++++-- 2 files changed, 45 insertions(+), 18 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 13afe66188b..984c37812c1 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -807,16 +807,14 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { var overlaps []*core.RegionInfo if saveCache { - c.Lock() // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, // check its validation again here. // // However it can't solve the race condition of concurrent heartbeats from the same region. 
- if _, err := c.core.PreCheckPutRegion(region); err != nil { - c.Unlock() + if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil { return err } - overlaps = c.core.PutRegion(region) + for _, item := range overlaps { if c.regionStats != nil { c.regionStats.ClearDefunctRegion(item.GetID()) @@ -835,10 +833,9 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } } for key := range storeMap { - c.updateStoreStatusLocked(key) + c.core.UpdateStoreStatus(key) } - c.Unlock() regionEventCounter.WithLabelValues("update_cache").Inc() } @@ -884,16 +881,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { return nil } -func (c *RaftCluster) updateStoreStatusLocked(id uint64) { - leaderCount := c.core.GetStoreLeaderCount(id) - regionCount := c.core.GetStoreRegionCount(id) - witnessCount := c.core.GetStoreWitnessCount(id) - pendingPeerCount := c.core.GetStorePendingPeerCount(id) - leaderRegionSize := c.core.GetStoreLeaderRegionSize(id) - regionSize := c.core.GetStoreRegionSize(id) - c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize, witnessCount) -} - func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error { if c.storage != nil { if err := c.storage.SaveMeta(meta); err != nil { diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 9dc844bcc7e..8a53e8005f1 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -157,10 +157,18 @@ func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Typ } // UpdateStoreStatus updates the information of the store. 
-func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regionCount int, pendingPeerCount int, leaderSize int64, regionSize int64, witnessCount int) { +func (bc *BasicCluster) UpdateStoreStatus(storeID uint64) { + bc.Regions.mu.RLock() + leaderCount := bc.Regions.GetStoreLeaderCount(storeID) + regionCount := bc.Regions.GetStoreRegionCount(storeID) + witnessCount := bc.Regions.GetStoreWitnessCount(storeID) + pendingPeerCount := bc.Regions.GetStorePendingPeerCount(storeID) + leaderRegionSize := bc.Regions.GetStoreLeaderRegionSize(storeID) + regionSize := bc.Regions.GetStoreRegionSize(storeID) + bc.Regions.mu.RUnlock() bc.Stores.mu.Lock() defer bc.Stores.mu.Unlock() - bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, pendingPeerCount, leaderSize, regionSize, witnessCount) + bc.Stores.UpdateStoreStatus(storeID, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize, witnessCount) } // PutStore put a store. @@ -451,6 +459,38 @@ func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { return bc.PutRegion(region) } +// AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put. +func (bc *BasicCluster) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) { + bc.Regions.mu.Lock() + defer bc.Regions.mu.Unlock() + var overlaps []*RegionInfo + origin := bc.Regions.GetRegion(region.GetID()) + if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { + overlaps = bc.Regions.GetOverlaps(region) + } + for _, item := range overlaps { + // PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation. 
+ if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !isRegionRecreated(region) { + return nil, errRegionIsStale(region.GetMeta(), item.GetMeta()) + } + } + + if origin == nil { + return bc.Regions.SetRegion(region), nil + } + + r := region.GetRegionEpoch() + o := origin.GetRegionEpoch() + // TiKV reports term after v3.0 + isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm() + // Region meta is stale, return an error. + if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !isRegionRecreated(region) { + return nil, errRegionIsStale(region.GetMeta(), origin.GetMeta()) + } + + return bc.Regions.SetRegion(region), nil +} + // PutRegion put a region. func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo { bc.Regions.mu.Lock() From 86108651f8f5e25cdb34a89578656f829dec4256 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 17 Oct 2022 14:54:02 +0800 Subject: [PATCH 3/6] address the comment Signed-off-by: Ryan Leung --- server/core/basic_cluster.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 8a53e8005f1..2ec2cc054d5 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -225,13 +225,14 @@ func (bc *BasicCluster) GetStoreRegions(storeID uint64) []*RegionInfo { // GetLeaderStoreByRegionID returns the leader store of the given region. 
func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo { bc.Regions.mu.RLock() - bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() - defer bc.Regions.mu.RUnlock() region := bc.Regions.GetRegion(regionID) if region == nil || region.GetLeader() == nil { return nil } + bc.Regions.mu.RUnlock() + + bc.Stores.mu.RLock() + defer bc.Stores.mu.RUnlock() return bc.Stores.GetStore(region.GetLeader().GetStoreId()) } @@ -400,15 +401,17 @@ func (bc *BasicCluster) getWriteRate( // GetStoresLeaderWriteRate get total write rate of each store's leaders. func (bc *BasicCluster) GetStoresLeaderWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) { bc.Regions.mu.RLock() - defer bc.Regions.mu.RUnlock() - return bc.getWriteRate(bc.Regions.GetStoreLeaderWriteRate) + storeLeaderWriteRate := bc.Regions.GetStoreLeaderWriteRate + bc.Regions.mu.RUnlock() + return bc.getWriteRate(storeLeaderWriteRate) } // GetStoresWriteRate get total write rate of each store's regions. func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) { bc.Regions.mu.RLock() - defer bc.Regions.mu.RUnlock() - return bc.getWriteRate(bc.Regions.GetStoreWriteRate) + storeWriteRate := bc.Regions.GetStoreWriteRate + bc.Regions.mu.RUnlock() + return bc.getWriteRate(storeWriteRate) } func (bc *BasicCluster) getRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { From aa2489e6fe229eb16166733610cda7e9af466b0d Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 18 Oct 2022 10:55:43 +0800 Subject: [PATCH 4/6] address the comment Signed-off-by: Ryan Leung --- server/core/basic_cluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 2ec2cc054d5..070edbb86ba 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -227,6 +227,7 @@ func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo { bc.Regions.mu.RLock() region 
:= bc.Regions.GetRegion(regionID) if region == nil || region.GetLeader() == nil { + bc.Regions.mu.RUnlock() return nil } bc.Regions.mu.RUnlock() From 8af4e0041136399ff8c69e56fe79b7fd666b7ee4 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 21 Oct 2022 13:08:39 +0800 Subject: [PATCH 5/6] address comments Signed-off-by: Ryan Leung --- server/core/basic_cluster.go | 20 ++++++++------------ server/core/region.go | 28 ++++++++++++++-------------- server/core/region_tree.go | 2 +- 3 files changed, 23 insertions(+), 27 deletions(-) diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 070edbb86ba..8b7b9acf9dd 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -391,7 +391,9 @@ func (bc *BasicCluster) getWriteRate( keysRates = make([]float64, 0, count) for _, store := range bc.Stores.stores { id := store.GetID() + bc.Regions.mu.RLock() bytesRate, keysRate := f(id) + bc.Regions.mu.RUnlock() storeIDs = append(storeIDs, id) bytesRates = append(bytesRates, bytesRate) keysRates = append(keysRates, keysRate) @@ -401,18 +403,12 @@ func (bc *BasicCluster) getWriteRate( // GetStoresLeaderWriteRate get total write rate of each store's leaders. func (bc *BasicCluster) GetStoresLeaderWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) { - bc.Regions.mu.RLock() - storeLeaderWriteRate := bc.Regions.GetStoreLeaderWriteRate - bc.Regions.mu.RUnlock() - return bc.getWriteRate(storeLeaderWriteRate) + return bc.getWriteRate(bc.Regions.GetStoreLeaderWriteRate) } // GetStoresWriteRate get total write rate of each store's regions. 
func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, keysRates []float64) { - bc.Regions.mu.RLock() - storeWriteRate := bc.Regions.GetStoreWriteRate - bc.Regions.mu.RUnlock() - return bc.getWriteRate(storeWriteRate) + return bc.getWriteRate(bc.Regions.GetStoreWriteRate) } func (bc *BasicCluster) getRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { @@ -432,7 +428,7 @@ func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, erro origin, overlaps := bc.getRelevantRegions(region) for _, item := range overlaps { // PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation. - if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !isRegionRecreated(region) { + if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() { return nil, errRegionIsStale(region.GetMeta(), item.GetMeta()) } } @@ -445,7 +441,7 @@ func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, erro // TiKV reports term after v3.0 isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm() // Region meta is stale, return an error. - if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !isRegionRecreated(region) { + if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !region.isRegionRecreated() { return origin, errRegionIsStale(region.GetMeta(), origin.GetMeta()) } @@ -474,7 +470,7 @@ func (bc *BasicCluster) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionIn } for _, item := range overlaps { // PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation. 
- if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !isRegionRecreated(region) {
+ if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() {
 return nil, errRegionIsStale(region.GetMeta(), item.GetMeta())
 }
 }
@@ -488,7 +484,7 @@ func (bc *BasicCluster) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionIn
 // TiKV reports term after v3.0
 isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
 // Region meta is stale, return an error.
- if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !isRegionRecreated(region) {
+ if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !region.isRegionRecreated() {
 return nil, errRegionIsStale(region.GetMeta(), origin.GetMeta())
 }
 diff --git a/server/core/region.go b/server/core/region.go
index 94fc22fd57d..d041ede2cb9 100644
--- a/server/core/region.go
+++ b/server/core/region.go
@@ -573,6 +573,20 @@ func (r *RegionInfo) IsFromHeartbeat() bool {
 return r.fromHeartbeat
 }
+func (r *RegionInfo) isInvolved(startKey, endKey []byte) bool {
+ return bytes.Compare(r.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(r.GetEndKey()) > 0 && bytes.Compare(r.GetEndKey(), endKey) <= 0))
+}
+
+func (r *RegionInfo) isRegionRecreated() bool {
+ // Regions recreated by online unsafe recover have both ver and conf ver equal to 1. To
+ // prevent stale bootstrap region (first region in a cluster which covers the entire key
+ // range) from reporting stale info, we exclude regions that cover the entire key range
+ // here. Technically, it is possible for unsafe recover to recreate such region, but that
+ // means the entire key range is unavailable, and we don't expect unsafe recover to perform
+ // better than recreating the cluster.
+ return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0) +} + // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) @@ -1289,20 +1303,6 @@ func DiffRegionKeyInfo(origin *RegionInfo, other *RegionInfo) string { return strings.Join(ret, ", ") } -func isInvolved(region *RegionInfo, startKey, endKey []byte) bool { - return bytes.Compare(region.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(region.GetEndKey()) > 0 && bytes.Compare(region.GetEndKey(), endKey) <= 0)) -} - -func isRegionRecreated(region *RegionInfo) bool { - // Regions recreated by online unsafe recover have both ver and conf ver equal to 1. To - // prevent stale bootstrap region (first region in a cluster which covers the entire key - // range) from reporting stale info, we exclude regions that covers the entire key range - // here. Technically, it is possible for unsafe recover to recreate such region, but that - // means the entire key range is unavailable, and we don't expect unsafe recover to perform - // better than recreating the cluster. - return region.GetRegionEpoch().GetVersion() == 1 && region.GetRegionEpoch().GetConfVer() == 1 && (len(region.GetStartKey()) != 0 || len(region.GetEndKey()) != 0) -} - // String converts slice of bytes to string without copy. 
func String(b []byte) (s string) { if len(b) == 0 { diff --git a/server/core/region_tree.go b/server/core/region_tree.go index aa32b8a1043..a5d717a93e9 100644 --- a/server/core/region_tree.go +++ b/server/core/region_tree.go @@ -273,7 +273,7 @@ func (t *regionTree) RandomRegion(ranges []KeyRange) *RegionInfo { } index := rand.Intn(endIndex-startIndex) + startIndex region := t.tree.GetAt(index).(*regionItem).region - if isInvolved(region, startKey, endKey) { + if region.isInvolved(startKey, endKey) { return region } } From 29c9e5bc4b3a19892e3a8f8bf50e135f4bc142b2 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 21 Oct 2022 14:43:12 +0800 Subject: [PATCH 6/6] address comments Signed-off-by: Ryan Leung --- server/core/basic_cluster.go | 49 +++++++++++++----------------------- 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 8b7b9acf9dd..997434b649f 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -384,17 +384,18 @@ func (bc *BasicCluster) getWriteRate( f func(storeID uint64) (bytesRate, keysRate float64), ) (storeIDs []uint64, bytesRates, keysRates []float64) { bc.Stores.mu.RLock() - defer bc.Stores.mu.RUnlock() count := len(bc.Stores.stores) storeIDs = make([]uint64, 0, count) + for _, store := range bc.Stores.stores { + storeIDs = append(storeIDs, store.GetID()) + } + bc.Stores.mu.RUnlock() bytesRates = make([]float64, 0, count) keysRates = make([]float64, 0, count) - for _, store := range bc.Stores.stores { - id := store.GetID() + for _, id := range storeIDs { bc.Regions.mu.RLock() bytesRate, keysRate := f(id) bc.Regions.mu.RUnlock() - storeIDs = append(storeIDs, id) bytesRates = append(bytesRates, bytesRate) keysRates = append(keysRates, keysRate) } @@ -411,9 +412,7 @@ func (bc *BasicCluster) GetStoresWriteRate() (storeIDs []uint64, bytesRates, key return bc.getWriteRate(bc.Regions.GetStoreWriteRate) } -func (bc *BasicCluster) 
getRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { - bc.Regions.mu.RLock() - defer bc.Regions.mu.RUnlock() +func (bc *BasicCluster) getRelevantRegionsLocked(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { origin = bc.Regions.GetRegion(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { overlaps = bc.Regions.GetOverlaps(region) @@ -425,7 +424,13 @@ func (bc *BasicCluster) getRelevantRegions(region *RegionInfo) (origin *RegionIn // PreCheckPutRegion checks if the region is valid to put. func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) { - origin, overlaps := bc.getRelevantRegions(region) + bc.Regions.mu.RLock() + origin, overlaps := bc.getRelevantRegionsLocked(region) + bc.Regions.mu.RUnlock() + return bc.check(region, origin, overlaps) +} + +func (bc *BasicCluster) check(region, origin *RegionInfo, overlaps []*RegionInfo) (*RegionInfo, error) { for _, item := range overlaps { // PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation. if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() { @@ -463,31 +468,11 @@ func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { func (bc *BasicCluster) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) { bc.Regions.mu.Lock() defer bc.Regions.mu.Unlock() - var overlaps []*RegionInfo - origin := bc.Regions.GetRegion(region.GetID()) - if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { - overlaps = bc.Regions.GetOverlaps(region) - } - for _, item := range overlaps { - // PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation. 
- if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() { - return nil, errRegionIsStale(region.GetMeta(), item.GetMeta()) - } - } - - if origin == nil { - return bc.Regions.SetRegion(region), nil - } - - r := region.GetRegionEpoch() - o := origin.GetRegionEpoch() - // TiKV reports term after v3.0 - isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm() - // Region meta is stale, return an error. - if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !region.isRegionRecreated() { - return nil, errRegionIsStale(region.GetMeta(), origin.GetMeta()) + origin, overlaps := bc.getRelevantRegionsLocked(region) + _, err := bc.check(region, origin, overlaps) + if err != nil { + return nil, err } - return bc.Regions.SetRegion(region), nil }