Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#5587
Browse files Browse the repository at this point in the history
ref tikv#5586

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
rleungx committed Nov 22, 2022
1 parent 24dcebb commit 3bae6b0
Show file tree
Hide file tree
Showing 5 changed files with 423 additions and 208 deletions.
23 changes: 11 additions & 12 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,17 +806,15 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
})

var overlaps []*core.RegionInfo
c.Lock()
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
if _, err := c.core.PreCheckPutRegion(region); err != nil {
c.Unlock()
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}
overlaps = c.core.PutRegion(region)

for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
Expand All @@ -835,21 +833,19 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
}
for key := range storeMap {
c.updateStoreStatusLocked(key)
c.core.UpdateStoreStatus(key)
}
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

changedRegions := c.changedRegions
c.Unlock()
if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
}

if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
Expand Down Expand Up @@ -877,14 +873,15 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {

if saveKV || needSync {
select {
case changedRegions <- region:
case c.changedRegions <- region:
default:
}
}

return nil
}

<<<<<<< HEAD
func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
leaderCount := c.core.GetStoreLeaderCount(id)
regionCount := c.core.GetStoreRegionCount(id)
Expand All @@ -894,6 +891,8 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

=======
>>>>>>> 2b519327b (*: use independent lock (#5587))
func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
Expand Down
118 changes: 118 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
for _, region := range regions {
c.Assert(cluster.putRegion(region), IsNil)
}
<<<<<<< HEAD
c.Assert(cluster.core.Regions.GetRegionCount(), Equals, int(n))
=======
re.Equal(int(n), cluster.core.Regions.RegionsInfo.GetRegionCount())
>>>>>>> 2b519327b (*: use independent lock (#5587))

for i, store := range stores {
storeStats := &pdpb.StoreStats{
Expand Down Expand Up @@ -642,6 +646,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {

for i, region := range regions {
// region does not exist.
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])
Expand All @@ -650,10 +655,21 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is the same, not updated.
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))
origin := region
// region is updated.
region = origin.Clone(core.WithIncVersion())
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])
Expand All @@ -663,13 +679,25 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
c.Assert(cluster.processRegionHeartbeat(stale), NotNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is stale (Version).
stale := origin.Clone(core.WithIncConfVer())
re.Error(cluster.processRegionHeartbeat(stale))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// region is updated.
region = origin.Clone(
core.WithIncVersion(),
core.WithIncConfVer(),
)
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])
Expand All @@ -679,6 +707,17 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
c.Assert(cluster.processRegionHeartbeat(stale), NotNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is stale (ConfVer).
stale = origin.Clone(core.WithIncConfVer())
re.Error(cluster.processRegionHeartbeat(stale))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// Add a down peer.
region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{
Expand All @@ -688,31 +727,52 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
},
}))
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// Add a pending peer.
region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]}))
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// Clear down peers.
region = region.Clone(core.WithDownPeers(nil))
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// Clear pending peers.
region = region.Clone(core.WithPendingPeers(nil))
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// Remove peers.
origin = region
region = origin.Clone(core.SetPeers(region.GetPeers()[:1]))
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])
Expand All @@ -722,36 +782,81 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegionsKV(c, cluster.storage, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])
// Add peers.
region = origin
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// Change one peer to witness
region = region.Clone(
core.WithWitnesses([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]}),
core.WithIncConfVer(),
)
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// Change leader.
region = region.Clone(core.WithLeader(region.GetPeers()[1]))
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// Change ApproximateSize.
region = region.Clone(core.SetApproximateSize(144))
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// Change ApproximateKeys.
region = region.Clone(core.SetApproximateKeys(144000))
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// Change bytes written.
region = region.Clone(core.SetWrittenBytes(24000))
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))

// Change bytes read.
region = region.Clone(core.SetReadBytes(1080000))
regions[i] = region
<<<<<<< HEAD
c.Assert(cluster.processRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
=======
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
>>>>>>> 2b519327b (*: use independent lock (#5587))
}

regionCounts := make(map[uint64]int)
Expand Down Expand Up @@ -782,10 +887,17 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
}

for _, store := range cluster.core.Stores.GetStores() {
<<<<<<< HEAD
c.Assert(store.GetLeaderCount(), Equals, cluster.core.Regions.GetStoreLeaderCount(store.GetID()))
c.Assert(store.GetRegionCount(), Equals, cluster.core.Regions.GetStoreRegionCount(store.GetID()))
c.Assert(store.GetLeaderSize(), Equals, cluster.core.Regions.GetStoreLeaderRegionSize(store.GetID()))
c.Assert(store.GetRegionSize(), Equals, cluster.core.Regions.GetStoreRegionSize(store.GetID()))
=======
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreLeaderCount(store.GetID()), store.GetLeaderCount())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreRegionCount(store.GetID()), store.GetRegionCount())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreLeaderRegionSize(store.GetID()), store.GetLeaderSize())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreRegionSize(store.GetID()), store.GetRegionSize())
>>>>>>> 2b519327b (*: use independent lock (#5587))
}

// Test with storage.
Expand Down Expand Up @@ -1536,9 +1648,15 @@ func (s *testRegionsInfoSuite) Test(c *C) {
n, np := uint64(10), uint64(3)
regions := newTestRegions(n, n, np)
_, opts, err := newTestScheduleConfig()
<<<<<<< HEAD
c.Assert(err, IsNil)
tc := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opts, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cache := tc.core.Regions
=======
re.NoError(err)
tc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opts, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cache := tc.core.Regions.RegionsInfo
>>>>>>> 2b519327b (*: use independent lock (#5587))

for i := uint64(0); i < n; i++ {
region := regions[i]
Expand Down
Loading

0 comments on commit 3bae6b0

Please sign in to comment.