*: use independent lock #5587

Merged
7 commits merged on Oct 25, 2022
Changes from 6 commits
30 changes: 8 additions & 22 deletions server/cluster/cluster.go
@@ -806,17 +806,15 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
})

var overlaps []*core.RegionInfo
c.Lock()
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
if _, err := c.core.PreCheckPutRegion(region); err != nil {
c.Unlock()
if overlaps, err = c.core.AtomicCheckAndPutRegion(region); err != nil {
return err
}
overlaps = c.core.PutRegion(region)

for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
@@ -835,21 +833,19 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
}
for key := range storeMap {
c.updateStoreStatusLocked(key)
c.core.UpdateStoreStatus(key)
}
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

changedRegions := c.changedRegions
c.Unlock()
if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
}

if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
@@ -877,24 +873,14 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {

if saveKV || needSync {
select {
case changedRegions <- region:
case c.changedRegions <- region:
default:
}
}

return nil
}

func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
leaderCount := c.core.GetStoreLeaderCount(id)
regionCount := c.core.GetStoreRegionCount(id)
witnessCount := c.core.GetStoreWitnessCount(id)
pendingPeerCount := c.core.GetStorePendingPeerCount(id)
leaderRegionSize := c.core.GetStoreLeaderRegionSize(id)
regionSize := c.core.GetStoreRegionSize(id)
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize, witnessCount)
}

func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
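The heart of this change is the first hunk: `processRegionHeartbeat` no longer takes the `RaftCluster` lock around the region-cache update. The old code locked the whole cluster, re-checked freshness with `PreCheckPutRegion`, and then called `PutRegion`; the new code folds the check and the put into a single `c.core.AtomicCheckAndPutRegion(region)` call, so the region cache is guarded by its own lock inside `core` rather than by the cluster-wide one (hence the PR title). Store status aggregation likewise moves behind `c.core.UpdateStoreStatus(key)`, which makes the `updateStoreStatusLocked` helper unnecessary. Below is a minimal, self-contained sketch of the check-and-put pattern; the types, field names, and staleness check are illustrative stand-ins, not PD's actual implementation.

```go
package core

import (
	"errors"
	"sync"
)

// Illustrative stand-ins for PD's core types: real PD tracks a full region
// epoch and a key-range tree rather than the single version and flat map here.
type RegionInfo struct {
	ID  uint64
	Ver uint64 // stands in for the region epoch version
}

type RegionsInfo struct {
	regions map[uint64]*RegionInfo
}

// RegionsContainer pairs the region data with its own mutex, so callers such
// as RaftCluster no longer need a cluster-wide lock to update the cache.
type RegionsContainer struct {
	mu sync.RWMutex
	*RegionsInfo
}

func NewRegionsContainer() *RegionsContainer {
	return &RegionsContainer{RegionsInfo: &RegionsInfo{regions: make(map[uint64]*RegionInfo)}}
}

var errStaleRegion = errors.New("region is stale")

// AtomicCheckAndPutRegion re-validates and installs the region in one critical
// section, so a concurrent heartbeat of another region cannot slip a stale
// update in between the check and the put.
func (r *RegionsContainer) AtomicCheckAndPutRegion(region *RegionInfo) ([]*RegionInfo, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if old, ok := r.regions[region.ID]; ok && old.Ver > region.Ver {
		return nil, errStaleRegion
	}
	old := r.regions[region.ID]
	r.regions[region.ID] = region
	if old == nil {
		return nil, nil
	}
	// Real PD returns the overlapping regions replaced by the new one; a
	// single predecessor stands in for that set here.
	return []*RegionInfo{old}, nil
}
```

As the retained comment notes, this still cannot resolve racing heartbeats from the same region (the last writer wins), but it closes the window in which another region's heartbeat could interleave between the check and the put.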
48 changes: 24 additions & 24 deletions server/cluster/cluster_test.go
@@ -61,7 +61,7 @@ func TestStoreHeartbeat(t *testing.T) {
for _, region := range regions {
re.NoError(cluster.putRegion(region))
}
re.Equal(int(n), cluster.core.Regions.GetRegionCount())
re.Equal(int(n), cluster.core.Regions.RegionsInfo.GetRegionCount())

for i, store := range stores {
storeStats := &pdpb.StoreStats{
@@ -700,25 +700,25 @@ func TestRegionHeartbeat(t *testing.T) {
for i, region := range regions {
// region does not exist.
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is the same, not updated.
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])
origin := region
// region is updated.
region = origin.Clone(core.WithIncVersion())
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is stale (Version).
stale := origin.Clone(core.WithIncConfVer())
re.Error(cluster.processRegionHeartbeat(stale))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is updated
@@ -728,13 +728,13 @@
)
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// region is stale (ConfVer).
stale = origin.Clone(core.WithIncConfVer())
re.Error(cluster.processRegionHeartbeat(stale))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// Add a down peer.
@@ -746,38 +746,38 @@
}))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Add a pending peer.
region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]}))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Clear down peers.
region = region.Clone(core.WithDownPeers(nil))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Clear pending peers.
region = region.Clone(core.WithPendingPeers(nil))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Remove peers.
origin = region
region = origin.Clone(core.SetPeers(region.GetPeers()[:1]))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])
// Add peers.
region = origin
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
checkRegionsKV(re, cluster.storage, regions[:i+1])

// Change one peer to witness
@@ -787,37 +787,37 @@
)
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change leader.
region = region.Clone(core.WithLeader(region.GetPeers()[1]))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change ApproximateSize.
region = region.Clone(core.SetApproximateSize(144))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change ApproximateKeys.
region = region.Clone(core.SetApproximateKeys(144000))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change bytes written.
region = region.Clone(core.SetWrittenBytes(24000))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])

// Change bytes read.
region = region.Clone(core.SetReadBytes(1080000))
regions[i] = region
re.NoError(cluster.processRegionHeartbeat(region))
checkRegions(re, cluster.core.Regions, regions[:i+1])
checkRegions(re, cluster.core.Regions.RegionsInfo, regions[:i+1])
}

regionCounts := make(map[uint64]int)
@@ -848,10 +848,10 @@ func TestRegionHeartbeat(t *testing.T) {
}

for _, store := range cluster.core.Stores.GetStores() {
re.Equal(cluster.core.Regions.GetStoreLeaderCount(store.GetID()), store.GetLeaderCount())
re.Equal(cluster.core.Regions.GetStoreRegionCount(store.GetID()), store.GetRegionCount())
re.Equal(cluster.core.Regions.GetStoreLeaderRegionSize(store.GetID()), store.GetLeaderSize())
re.Equal(cluster.core.Regions.GetStoreRegionSize(store.GetID()), store.GetRegionSize())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreLeaderCount(store.GetID()), store.GetLeaderCount())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreRegionCount(store.GetID()), store.GetRegionCount())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreLeaderRegionSize(store.GetID()), store.GetLeaderSize())
re.Equal(cluster.core.Regions.RegionsInfo.GetStoreRegionSize(store.GetID()), store.GetRegionSize())
}

// Test with storage.
@@ -1648,7 +1648,7 @@ func Test(t *testing.T) {
_, opts, err := newTestScheduleConfig()
re.NoError(err)
tc := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opts, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cache := tc.core.Regions
cache := tc.core.Regions.RegionsInfo

for i := uint64(0); i < n; i++ {
region := regions[i]
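The test churn is mechanical but telling: every read that previously went through `cluster.core.Regions` now goes through `cluster.core.Regions.RegionsInfo`, which suggests `Regions` is no longer a bare `*RegionsInfo` but a wrapper that bundles the region data with its own lock, much like the container sketched after the cluster.go diff, and the tests reach through it to the embedded `RegionsInfo` for their assertions. The same shift explains the production change from `c.updateStoreStatusLocked(key)` to `c.core.UpdateStoreStatus(key)`: the per-store aggregation the deleted helper performed now happens inside `core`, under its own lock. A separate, self-contained sketch of that narrowed signature follows; the getter names mirror the calls visible in the removed helper, while the struct layout and locking details are assumptions.

```go
package core

import "sync"

// Illustrative stand-ins; PD's real StoreInfo/StoresInfo types differ.
type StoreStats struct {
	LeaderCount, RegionCount, WitnessCount, PendingPeerCount int
	LeaderSize, RegionSize                                    int64
}

// regionStatsSource lists the per-store getters that the deleted
// updateStoreStatusLocked helper called on c.core.
type regionStatsSource interface {
	GetStoreLeaderCount(storeID uint64) int
	GetStoreRegionCount(storeID uint64) int
	GetStoreWitnessCount(storeID uint64) int
	GetStorePendingPeerCount(storeID uint64) int
	GetStoreLeaderRegionSize(storeID uint64) int64
	GetStoreRegionSize(storeID uint64) int64
}

type BasicCluster struct {
	mu      sync.RWMutex
	Regions regionStatsSource
	Stores  map[uint64]*StoreStats
}

// UpdateStoreStatus now needs only the store ID: the aggregation that
// RaftCluster.updateStoreStatusLocked used to do while holding the cluster
// lock happens here, behind the core's own lock.
func (bc *BasicCluster) UpdateStoreStatus(id uint64) {
	bc.mu.Lock()
	defer bc.mu.Unlock()
	store, ok := bc.Stores[id]
	if !ok {
		return
	}
	store.LeaderCount = bc.Regions.GetStoreLeaderCount(id)
	store.RegionCount = bc.Regions.GetStoreRegionCount(id)
	store.WitnessCount = bc.Regions.GetStoreWitnessCount(id)
	store.PendingPeerCount = bc.Regions.GetStorePendingPeerCount(id)
	store.LeaderSize = bc.Regions.GetStoreLeaderRegionSize(id)
	store.RegionSize = bc.Regions.GetStoreRegionSize(id)
}
```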