Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#5587
Browse files Browse the repository at this point in the history
ref tikv#5586

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 22, 2022
1 parent 24dcebb commit f02f21f
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 249 deletions.
29 changes: 8 additions & 21 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,17 +806,15 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
})

var overlaps []*core.RegionInfo
c.Lock()
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
if _, err := c.core.PreCheckPutRegion(region); err != nil {
c.Unlock()
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}
overlaps = c.core.PutRegion(region)

for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
Expand All @@ -835,21 +833,19 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
}
for key := range storeMap {
c.updateStoreStatusLocked(key)
c.core.UpdateStoreStatus(key)
}
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

changedRegions := c.changedRegions
c.Unlock()
if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
}

if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
Expand Down Expand Up @@ -877,23 +873,14 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {

if saveKV || needSync {
select {
case changedRegions <- region:
case c.changedRegions <- region:
default:
}
}

return nil
}

func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
leaderCount := c.core.GetStoreLeaderCount(id)
regionCount := c.core.GetStoreRegionCount(id)
pendingPeerCount := c.core.GetStorePendingPeerCount(id)
leaderRegionSize := c.core.GetStoreLeaderRegionSize(id)
regionSize := c.core.GetStoreRegionSize(id)
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
Expand Down
36 changes: 18 additions & 18 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,25 +643,25 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
for i, region := range regions {
// region does not exist.
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])

// region is the same, not updated.
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])
origin := region
// region is updated.
region = origin.Clone(core.WithIncVersion())
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])

// region is stale (Version).
stale := origin.Clone(core.WithIncConfVer())
c.Assert(cluster.processRegionHeartbeat(stale), NotNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])

// region is updated.
Expand All @@ -671,13 +671,13 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
)
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])

// region is stale (ConfVer).
stale = origin.Clone(core.WithIncConfVer())
c.Assert(cluster.processRegionHeartbeat(stale), NotNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])

// Add a down peer.
Expand All @@ -689,69 +689,69 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
}))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Add a pending peer.
region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]}))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Clear down peers.
region = region.Clone(core.WithDownPeers(nil))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Clear pending peers.
region = region.Clone(core.WithPendingPeers(nil))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Remove peers.
origin = region
region = origin.Clone(core.SetPeers(region.GetPeers()[:1]))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])
// Add peers.
region = origin
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])

// Change leader.
region = region.Clone(core.WithLeader(region.GetPeers()[1]))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change ApproximateSize.
region = region.Clone(core.SetApproximateSize(144))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change ApproximateKeys.
region = region.Clone(core.SetApproximateKeys(144000))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change bytes written.
region = region.Clone(core.SetWrittenBytes(24000))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change bytes read.
region = region.Clone(core.SetReadBytes(1080000))
regions[i] = region
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegions(c, cluster.core.Regions.RegionsInfo, regions[:i+1])
}

regionCounts := make(map[uint64]int)
Expand Down Expand Up @@ -1538,7 +1538,7 @@ func (s *testRegionsInfoSuite) Test(c *C) {
_, opts, err := newTestScheduleConfig()
c.Assert(err, IsNil)
tc := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opts, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cache := tc.core.Regions
cache := tc.core.Regions.RegionsInfo

for i := uint64(0); i < n; i++ {
region := regions[i]
Expand Down
Loading

0 comments on commit f02f21f

Please sign in to comment.