Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scatter: make peer scatter logic same with the leader #6965

Merged
merged 10 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 39 additions & 67 deletions pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"context"
"fmt"
"math"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -106,26 +107,6 @@
return s.getDistributionByGroupLocked(group)
}

// TotalCountByStore returns the sum, over every group ID reported by
// groupDistribution, of the count recorded for storeID. Groups without a
// distribution, or whose distribution has no entry for storeID, add nothing.
// The read lock is held for the whole traversal.
func (s *selectedStores) TotalCountByStore(storeID uint64) uint64 {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var total uint64
	for _, groupName := range s.groupDistribution.GetAllID() {
		distribution, found := s.getDistributionByGroupLocked(groupName)
		if !found {
			continue
		}
		// Indexing a map with an absent key yields zero, so a store that
		// never appeared in this group leaves the total unchanged.
		total += distribution[storeID]
	}
	return total
}

// getDistributionByGroupLocked should be called with lock
func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]uint64, bool) {
if result, ok := s.groupDistribution.Get(group); ok {
Expand Down Expand Up @@ -332,6 +313,11 @@
selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // selected StoreID set
leaderCandidateStores := make([]uint64, 0, len(region.GetPeers())) // StoreID allowed to become Leader
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
filterLen := len(context.filterFuncs) + 2
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
filters := make([]filter.Filter, filterLen)
for i, filterFunc := range context.filterFuncs {
filters[i] = filterFunc()
}
for _, peer := range peers {
if _, ok := selectedStores[peer.GetStoreId()]; ok {
if allowLeader(oldFit, peer) {
Expand All @@ -340,9 +326,15 @@
// It is both sourcePeer and targetPeer itself, no need to select.
continue
}
sourceStore := r.cluster.GetStore(peer.GetStoreId())
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
continue

Check warning on line 332 in pkg/schedule/scatter/region_scatterer.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/scatter/region_scatterer.go#L331-L332

Added lines #L331 - L332 were not covered by tests
}
filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
for {
candidates := r.selectCandidates(region, oldFit, peer.GetStoreId(), selectedStores, context)
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores)
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
newPeer := r.selectNewPeer(context, group, peer, filters)
targetPeers[newPeer.GetStoreId()] = newPeer
selectedStores[newPeer.GetStoreId()] = struct{}{}
// If the selected peer is a peer other than origin peer in this region,
Expand All @@ -363,7 +355,7 @@
// FIXME: target leader only considers the ordinary stores, maybe we need to consider the
// special engine stores if the engine supports to become a leader. But now there is only
// one engine, tiflash, which does not support the leader, so don't consider it for now.
targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
targetLeader, leaderHit := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
if targetLeader == 0 {
scatterSkipNoLeaderCounter.Inc()
return nil
Expand Down Expand Up @@ -398,6 +390,8 @@
if op != nil {
scatterSuccessCounter.Inc()
r.Put(targetPeers, targetLeader, group)
op.AdditionalInfos["group"] = group
op.AdditionalInfos["leader-hit"] = strconv.FormatUint(leaderHit, 10)
op.SetPriorityLevel(constant.High)
}
return op
Expand Down Expand Up @@ -432,69 +426,46 @@
return region.GetLeader().GetStoreId() == targetLeader
}

func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *placement.RegionFit, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 {
sourceStore := r.cluster.GetStore(sourceStoreID)
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore))
return nil
}
filters := []filter.Filter{
filter.NewExcludedFilter(r.name, nil, selectedStores),
}
scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
for _, filterFunc := range context.filterFuncs {
filters = append(filters, filterFunc())
}
filters = append(filters, scoreGuard)
func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer {
stores := r.cluster.GetStores()
candidates := make([]uint64, 0)
maxStoreTotalCount := uint64(0)
minStoreTotalCount := uint64(math.MaxUint64)
for _, store := range stores {
count := context.selectedPeer.TotalCountByStore(store.GetID())
count := context.selectedPeer.Get(store.GetID(), group)
if count > maxStoreTotalCount {
maxStoreTotalCount = count
}
if count < minStoreTotalCount {
minStoreTotalCount = count
}
}

var newPeer *metapb.Peer
minCount := uint64(math.MaxUint64)
sourceHit := uint64(math.MaxUint64)
for _, store := range stores {
storeCount := context.selectedPeer.TotalCountByStore(store.GetID())
storeCount := context.selectedPeer.Get(store.GetID(), group)
if store.GetID() == peer.GetId() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if store.GetID() == peer.GetStoreId() {

sourceHit = storeCount
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
}
// If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate.
// If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store
// could be selected as candidate.
if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
if filter.Target(r.cluster.GetSharedConfig(), store, filters) {
candidates = append(candidates, store.GetID())
if storeCount < minCount {
minCount = storeCount
newPeer = &metapb.Peer{
StoreId: store.GetID(),
Role: peer.GetRole(),
}
}
}
}
}
return candidates
}

func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceStoreID uint64, candidates []uint64, context engineContext) *metapb.Peer {
if len(candidates) < 1 {
if sourceHit <= minCount {
return peer
}
var newPeer *metapb.Peer
minCount := uint64(math.MaxUint64)
for _, storeID := range candidates {
count := context.selectedPeer.Get(storeID, group)
if count < minCount {
minCount = count
newPeer = &metapb.Peer{
StoreId: storeID,
Role: peer.GetRole(),
}
}
}
// if the source store has the least count, we don't need to scatter this peer
for _, storeID := range candidates {
if storeID == sourceStoreID && context.selectedPeer.Get(sourceStoreID, group) <= minCount {
return peer
}
}
if newPeer == nil {
return peer
}
Expand All @@ -503,11 +474,12 @@

// selectAvailableLeaderStore selects the target leader store from the candidates. The candidates are collected from
// the stores of the existing peers, based on the leader counts at the group level. Please use this func before scattering special engines.
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 {
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo,
leaderCandidateStores []uint64, context engineContext) (leaderID uint64, hit uint64) {
sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId())
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
return 0
return 0, 0

Check warning on line 482 in pkg/schedule/scatter/region_scatterer.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/scatter/region_scatterer.go#L482

Added line #L482 was not covered by tests
}
minStoreGroupLeader := uint64(math.MaxUint64)
id := uint64(0)
Expand All @@ -522,7 +494,7 @@
id = storeID
}
}
return id
return id, minStoreGroupLeader
}

// Put puts the final distribution in the context regardless of whether the operator was created
Expand Down
51 changes: 8 additions & 43 deletions pkg/schedule/scatter/region_scatterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"math"
"math/rand"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -533,48 +532,11 @@ func TestSelectedStoreGC(t *testing.T) {
re.False(ok)
}

// TestRegionFromDifferentGroups tests multiple regions, where each region has its own group.
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
// After scattering, the distribution across the whole cluster should be well balanced.
func TestRegionFromDifferentGroups(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Build the mock cluster and the operator controller the scatterer needs.
	tc := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
	stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
	oc := operator.NewController(ctx, tc.GetBasicCluster(), tc.GetSharedConfig(), stream)

	const (
		numStores  = 6
		numRegions = 50
	)
	// Add stores with IDs 1..numStores.
	for storeID := uint64(1); storeID <= numStores; storeID++ {
		tc.AddRegionStore(storeID, 0)
	}

	scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions)
	for regionID := 1; regionID <= numRegions; regionID++ {
		// A random permutation gives three distinct store IDs per region;
		// every region is scattered under its own group name.
		order := rand.Perm(numStores)
		region := tc.AddLeaderRegion(uint64(regionID), uint64(order[0])+1, uint64(order[1])+1, uint64(order[2])+1)
		scatterer.scatterRegion(region, fmt.Sprintf("t%d", regionID), false)
	}

	// Across all groups, the per-store totals may differ by at most 2.
	selected := scatterer.ordinaryEngine.selectedPeer
	highest, lowest := uint64(0), uint64(math.MaxUint64)
	for storeID := uint64(1); storeID <= numStores; storeID++ {
		n := selected.TotalCountByStore(storeID)
		if n > highest {
			highest = n
		}
		if n < lowest {
			lowest = n
		}
	}
	re.LessOrEqual(highest-lowest, uint64(2))
}

func TestRegionHasLearner(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
group := "group"
opt := mockconfig.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
Expand Down Expand Up @@ -617,14 +579,14 @@ func TestRegionHasLearner(t *testing.T) {
scatterer := NewRegionScatterer(ctx, tc, oc, tc.AddSuspectRegions)
regionCount := 50
for i := 1; i <= regionCount; i++ {
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group", false)
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), group, false)
re.NoError(err)
}
check := func(ss *selectedStores) {
max := uint64(0)
min := uint64(math.MaxUint64)
for i := uint64(1); i <= max; i++ {
count := ss.TotalCountByStore(i)
count := ss.Get(i, group)
if count > max {
max = count
}
Expand All @@ -639,7 +601,7 @@ func TestRegionHasLearner(t *testing.T) {
max := uint64(0)
min := uint64(math.MaxUint64)
for i := uint64(1); i <= voterCount; i++ {
count := ss.TotalCountByStore(i)
count := ss.Get(i, group)
if count > max {
max = count
}
Expand All @@ -650,7 +612,7 @@ func TestRegionHasLearner(t *testing.T) {
re.LessOrEqual(max-2, uint64(regionCount)/voterCount)
re.LessOrEqual(min-1, uint64(regionCount)/voterCount)
for i := voterCount + 1; i <= storeCount; i++ {
count := ss.TotalCountByStore(i)
count := ss.Get(i, group)
re.LessOrEqual(count, uint64(0))
}
}
Expand Down Expand Up @@ -691,6 +653,9 @@ func TestSelectedStoresTooFewPeers(t *testing.T) {
region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
op := scatterer.scatterRegion(region, group, false)
re.False(isPeerCountChanged(op))
if op != nil {
re.Equal(group, op.AdditionalInfos["group"])
}
}
}

Expand Down
Loading