Skip to content

Commit

Permalink
*: use independent lock (#5587)
Browse files Browse the repository at this point in the history
ref #5586

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
rleungx and ti-chi-bot authored Oct 25, 2022
1 parent f4fa6f4 commit 2b51932
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 256 deletions.
30 changes: 8 additions & 22 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,17 +806,15 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
})

var overlaps []*core.RegionInfo
c.Lock()
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
if _, err := c.core.PreCheckPutRegion(region); err != nil {
c.Unlock()
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}
overlaps = c.core.PutRegion(region)

for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
Expand All @@ -835,21 +833,19 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
}
for key := range storeMap {
c.updateStoreStatusLocked(key)
c.core.UpdateStoreStatus(key)
}
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

changedRegions := c.changedRegions
c.Unlock()
if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
}

if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
Expand Down Expand Up @@ -877,24 +873,14 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {

if saveKV || needSync {
select {
case changedRegions <- region:
case c.changedRegions <- region:
default:
}
}

return nil
}

func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
leaderCount := c.core.GetStoreLeaderCount(id)
regionCount := c.core.GetStoreRegionCount(id)
witnessCount := c.core.GetStoreWitnessCount(id)
pendingPeerCount := c.core.GetStorePendingPeerCount(id)
leaderRegionSize := c.core.GetStoreLeaderRegionSize(id)
regionSize := c.core.GetStoreRegionSize(id)
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize, witnessCount)
}

func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
Expand Down
48 changes: 24 additions & 24 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestStoreHeartbeat(t *testing.T) {
for _, region := range regions {
re.NoError(cluster.putRegion(region))
}
re.Equal(int(n), cluster.core.Regions.GetRegionCount())
re.Equal(int(n), cluster.core.Regions.RegionsInfo.GetRegionCount())

for i, store := range stores {
storeStats := &pdpb.StoreStats{
Expand Down Expand Up @@ -700,25 +700,25 @@ func TestRegionHeartbeat(t *testing.T) {
for i, region := range regions {
// region does not exist.
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is the same, not updated.
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])
origin := region
// region is updated.
region = origin.Clone(core.WithIncVersion())
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is stale (Version).
stale := origin.Clone(core.WithIncConfVer())
re.Error(cluster.processRegionHeartbeat(stale))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is updated
Expand All @@ -728,13 +728,13 @@ func TestRegionHeartbeat(t *testing.T) {
)
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is stale (ConfVer).
stale = origin.Clone(core.WithIncConfVer())
re.Error(cluster.processRegionHeartbeat(stale))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// Add a down peer.
Expand All @@ -746,38 +746,38 @@ func TestRegionHeartbeat(t *testing.T) {
}))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Add a pending peer.
region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]}))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Clear down peers.
region = region.Clone(core.WithDownPeers(nil))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Clear pending peers.
region = region.Clone(core.WithPendingPeers(nil))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Remove peers.
origin = region
region = origin.Clone(core.SetPeers(region.GetPeers()[:1]))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])
// Add peers.
region = origin
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// Change one peer to witness
Expand All @@ -787,37 +787,37 @@ func TestRegionHeartbeat(t *testing.T) {
)
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change leader.
region = region.Clone(core.WithLeader(region.GetPeers()[1]))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change ApproximateSize.
region = region.Clone(core.SetApproximateSize(144))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change ApproximateKeys.
region = region.Clone(core.SetApproximateKeys(144000))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change bytes written.
region = region.Clone(core.SetWrittenBytes(24000))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change bytes read.
region = region.Clone(core.SetReadBytes(1080000))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
}

regionCounts := make(map[uint64]int)
Expand Down Expand Up @@ -848,10 +848,10 @@ func TestRegionHeartbeat(t *testing.T) {
}

for _, store := range cluster.core.Stores.GetStores() {
re.Equal(cluster.core.Regions.GetStoreLeaderCount(store.GetID()), store.GetLeaderCount())
re.Equal(cluster.core.Regions.GetStoreRegionCount(store.GetID()), store.GetRegionCount())
re.Equal(cluster.core.Regions.GetStoreLeaderRegionSize(store.GetID()), store.GetLeaderSize())
re.Equal(cluster.core.Regions.GetStoreRegionSize(store.GetID()), store.GetRegionSize())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreLeaderCount(store.GetID()), store.GetLeaderCount())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreRegionCount(store.GetID()), store.GetRegionCount())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreLeaderRegionSize(store.GetID()), store.GetLeaderSize())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreRegionSize(store.GetID()), store.GetRegionSize())
}

// Test with storage.
Expand Down Expand Up @@ -1648,7 +1648,7 @@ func Test(t *testing.T) {
_, opts, err := newTestScheduleConfig()
re.NoError(err)
tc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opts, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cache := tc.core.Regions
cache := tc.core.Regions.RegionsInfo

for i := uint64(0); i < n; i++ {
region := regions[i]
Expand Down
Loading

0 comments on commit 2b51932

Please sign in to comment.