schedule: placement rules cache reduce cache low usage fit (#5879)
ref #5860, ref #5864

Signed-off-by: HunDunDM <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
HunDunDM and ti-chi-bot authored Feb 9, 2023
1 parent 6d23a31 commit a3bb320
Showing 7 changed files with 98 additions and 45 deletions.
26 changes: 20 additions & 6 deletions server/schedule/checker/checker_controller.go
@@ -18,6 +18,7 @@ import (
"context"
"time"

"github.com/pingcap/failpoint"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
@@ -91,13 +92,26 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
}

if c.opts.IsPlacementRulesEnabled() {
fit := c.priorityInspector.Inspect(region)
if op := c.ruleChecker.CheckWithFit(region, fit); op != nil {
if opController.OperatorCount(operator.OpReplica) < c.opts.GetReplicaScheduleLimit() {
return []*operator.Operator{op}
skipRuleCheck := c.cluster.GetOpts().IsPlacementRulesCacheEnabled() &&
c.cluster.GetRuleManager().IsRegionFitCached(c.cluster, region)
if skipRuleCheck {
// If the fit is fetched from the cache, the region doesn't need to be checked again
failpoint.Inject("assertShouldNotCache", func() {
panic("cached shouldn't be used")
})
ruleCheckerGetCacheCounter.Inc()
} else {
failpoint.Inject("assertShouldCache", func() {
panic("cached should be used")
})
fit := c.priorityInspector.Inspect(region)
if op := c.ruleChecker.CheckWithFit(region, fit); op != nil {
if opController.OperatorCount(operator.OpReplica) < c.opts.GetReplicaScheduleLimit() {
return []*operator.Operator{op}
}
operator.OperatorLimitCounter.WithLabelValues(c.ruleChecker.GetType(), operator.OpReplica.String()).Inc()
c.regionWaitingList.Put(region.GetID(), nil)
}
operator.OperatorLimitCounter.WithLabelValues(c.ruleChecker.GetType(), operator.OpReplica.String()).Inc()
c.regionWaitingList.Put(region.GetID(), nil)
}
} else {
if op := c.learnerChecker.Check(region); op != nil {
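
The cache check, together with the assertShouldCache/assertShouldNotCache failpoints, moves from RuleChecker.CheckWithFit up into Controller.CheckRegion. Below is a rough, self-contained sketch of the resulting control flow; the types are hypothetical stand-ins, not the real PD signatures.

```go
package main

import "fmt"

// Simplified stand-ins for the real PD types; the actual signatures live in
// server/schedule/checker and server/schedule/placement.
type regionInfo struct{ id uint64 }

type ruleManager interface {
	IsRegionFitCached(region regionInfo) bool
}

// alwaysCached is a toy rule manager whose cache answer is fixed.
type alwaysCached bool

func (c alwaysCached) IsRegionFitCached(regionInfo) bool { return bool(c) }

// checkRegion sketches the new Controller.CheckRegion path: when the
// placement-rules cache is enabled and the rule manager reports a still-valid
// cached fit for the region, the full rule check is skipped entirely.
func checkRegion(cacheEnabled bool, rm ruleManager, region regionInfo, runRuleCheck func(regionInfo)) {
	if cacheEnabled && rm.IsRegionFitCached(region) {
		// A valid cache entry means the region already satisfies its placement rules.
		fmt.Printf("region %d: fit cached, skipping rule check\n", region.id)
		return
	}
	runRuleCheck(region)
}

func main() {
	check := func(r regionInfo) { fmt.Printf("region %d: running full rule check\n", r.id) }
	checkRegion(true, alwaysCached(true), regionInfo{id: 1}, check)  // skips the checker
	checkRegion(true, alwaysCached(false), regionInfo{id: 2}, check) // runs the checker
}
```
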
12 changes: 0 additions & 12 deletions server/schedule/checker/rule_checker.go
@@ -20,7 +20,6 @@ import (
"math"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
@@ -128,17 +127,6 @@ func (c *RuleChecker) CheckWithFit(region *core.RegionInfo, fit *placement.Regio
log.Debug("fail to check region", zap.Uint64("region-id", region.GetID()), zap.Error(errRegionNoLeader))
return
}
// If the fit is fetched from cache, it seems that the region doesn't need cache
if c.cluster.GetOpts().IsPlacementRulesCacheEnabled() && fit.IsCached() {
failpoint.Inject("assertShouldNotCache", func() {
panic("cached shouldn't be used")
})
ruleCheckerGetCacheCounter.Inc()
return nil
}
failpoint.Inject("assertShouldCache", func() {
panic("cached should be used")
})

// If the fit is calculated by FitRegion, which means we get a new fit result, thus we should
// invalid the cache if it exists
19 changes: 0 additions & 19 deletions server/schedule/placement/fit.go
@@ -21,7 +21,6 @@ import (

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/utils/syncutil"
)

const replicaBaseScore = 100
@@ -30,30 +29,12 @@ const replicaBaseScore = 100
// All peers are divided into corresponding rules according to the matching
// rules, and the remaining Peers are placed in the OrphanPeers list.
type RegionFit struct {
mu struct {
syncutil.RWMutex
cached bool
}
RuleFits []*RuleFit `json:"rule-fits"`
OrphanPeers []*metapb.Peer `json:"orphan-peers"`
regionStores []*core.StoreInfo
rules []*Rule
}

// SetCached indicates this RegionFit is fetch form cache
func (f *RegionFit) SetCached(cached bool) {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.cached = cached
}

// IsCached indicates whether this result is fetched from caches
func (f *RegionFit) IsCached() bool {
f.mu.RLock()
defer f.mu.RUnlock()
return f.mu.cached
}

// Replace return true if the replacement store is fit all constraints and isolation score is not less than the origin.
func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo) bool {
fit := f.getRuleFitByStoreID(srcStoreID)
4 changes: 3 additions & 1 deletion server/schedule/placement/fit_test.go
@@ -19,6 +19,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/assert"
@@ -28,6 +29,7 @@ import (

func makeStores() StoreSet {
stores := core.NewStoresInfo()
now := time.Now()
for zone := 1; zone <= 5; zone++ {
for rack := 1; rack <= 5; rack++ {
for host := 1; host <= 5; host++ {
@@ -42,7 +44,7 @@ func makeStores() StoreSet {
if x == 5 {
labels["engine"] = "tiflash"
}
stores.SetStore(core.NewStoreInfoWithLabel(id, labels))
stores.SetStore(core.NewStoreInfoWithLabel(id, labels).Clone(core.SetLastHeartbeatTS(now)))
}
}
}
18 changes: 15 additions & 3 deletions server/schedule/placement/region_rule_cache.go
@@ -21,6 +21,10 @@ import (
"github.com/tikv/pd/pkg/utils/syncutil"
)

const (
minHitCountToCacheHit = 10 // RegionFit is cached only when the number of hits reaches this
)

// RegionRuleFitCacheManager stores each region's RegionFit Result and involving variables
// only when the RegionFit result is satisfied with its rules
// RegionRuleFitCacheManager caches RegionFit result for each region only when:
@@ -65,7 +69,7 @@ func (manager *RegionRuleFitCacheManager) CheckAndGetCache(region *core.RegionIn
}
manager.mu.RLock()
defer manager.mu.RUnlock()
if cache, ok := manager.regionCaches[region.GetID()]; ok && cache.bestFit != nil {
if cache, ok := manager.regionCaches[region.GetID()]; ok {
if cache.IsUnchanged(region, rules, stores) {
return true, cache.bestFit
}
@@ -80,7 +84,13 @@ func (manager *RegionRuleFitCacheManager) SetCache(region *core.RegionInfo, fit
}
manager.mu.Lock()
defer manager.mu.Unlock()
fit.SetCached(true)
if cache, ok := manager.regionCaches[region.GetID()]; ok {
cache.hitCount++
if cache.hitCount >= minHitCountToCacheHit {
cache.bestFit = fit
}
return
}
manager.regionCaches[region.GetID()] = manager.toRegionRuleFitCache(region, fit)
}

@@ -90,6 +100,7 @@ type regionRuleFitCache struct {
regionStores []*storeCache
rules []ruleCache
bestFit *RegionFit
hitCount uint32
}

// IsUnchanged checks whether the region and rules unchanged for the cache
@@ -132,7 +143,8 @@ func (manager *RegionRuleFitCacheManager) toRegionRuleFitCache(region *core.Regi
region: toRegionCache(region),
regionStores: manager.toStoreCacheList(fit.regionStores),
rules: toRuleCacheList(fit.rules),
bestFit: fit,
bestFit: nil,
hitCount: 0,
}
}

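
The commit title's "reduce cache low usage fit" appears to refer to this change: SetCache no longer stores the RegionFit on the first call. It only records a lightweight entry plus a hit counter, and keeps the full fit once the entry has been hit minHitCountToCacheHit times, so fits for regions that rarely reuse the cache never occupy memory. A minimal sketch of that policy, using hypothetical simplified types rather than the real PD ones:

```go
package main

import "fmt"

// Mirror of the constant introduced by this commit.
const minHitCountToCacheHit = 10

// fitCacheEntry is a simplified stand-in for regionRuleFitCache: it tracks how
// often the entry was hit and keeps the (comparatively large) fit only after
// enough hits.
type fitCacheEntry struct {
	hitCount uint32
	bestFit  *string // stand-in for *placement.RegionFit
}

type fitCacheManager struct {
	entries map[uint64]*fitCacheEntry
}

// setCache mirrors the reworked SetCache: the first call only records a
// lightweight entry without the fit; later calls bump the hit count, and the
// fit is stored once the count reaches minHitCountToCacheHit.
func (m *fitCacheManager) setCache(regionID uint64, fit *string) {
	if entry, ok := m.entries[regionID]; ok {
		entry.hitCount++
		if entry.hitCount >= minHitCountToCacheHit {
			entry.bestFit = fit
		}
		return
	}
	m.entries[regionID] = &fitCacheEntry{}
}

func main() {
	m := &fitCacheManager{entries: map[uint64]*fitCacheEntry{}}
	fit := "computed fit"
	for i := 0; i <= minHitCountToCacheHit; i++ {
		m.setCache(1, &fit)
	}
	e := m.entries[1]
	fmt.Printf("hits=%d fitStored=%v\n", e.hitCount, e.bestFit != nil) // hits=10 fitStored=true
}
```

Keeping only the hit counter until the threshold is reached trades a few extra fitRegion computations for lower memory usage on regions whose fit is rarely reusable.
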
18 changes: 15 additions & 3 deletions server/schedule/placement/rule_manager.go
@@ -325,18 +325,30 @@ func (m *RuleManager) GetRulesForApplyRange(start, end []byte) []*Rule {
return m.ruleList.getRulesForApplyRange(start, end)
}

// IsRegionFitCached returns whether the RegionFit can be cached.
func (m *RuleManager) IsRegionFitCached(storeSet StoreSet, region *core.RegionInfo) bool {
regionStores := getStoresByRegion(storeSet, region)
rules := m.GetRulesForApplyRegion(region)
isCached, _ := m.cache.CheckAndGetCache(region, rules, regionStores)
return isCached
}

// FitRegion fits a region to the rules it matches.
func (m *RuleManager) FitRegion(storeSet StoreSet, region *core.RegionInfo) *RegionFit {
func (m *RuleManager) FitRegion(storeSet StoreSet, region *core.RegionInfo) (fit *RegionFit) {
regionStores := getStoresByRegion(storeSet, region)
rules := m.GetRulesForApplyRegion(region)
var isCached bool
if m.opt.IsPlacementRulesCacheEnabled() {
if ok, fit := m.cache.CheckAndGetCache(region, rules, regionStores); fit != nil && ok {
if isCached, fit = m.cache.CheckAndGetCache(region, rules, regionStores); isCached && fit != nil {
return fit
}
}
fit := fitRegion(regionStores, region, rules, m.opt.IsWitnessAllowed())
fit = fitRegion(regionStores, region, rules, m.opt.IsWitnessAllowed())
fit.regionStores = regionStores
fit.rules = rules
if isCached {
m.SetRegionFitCache(region, fit)
}
return fit
}

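
FitRegion now distinguishes three cases: a valid cache entry with a stored fit (return it directly), a valid entry whose fit is not stored yet (recompute, then call SetRegionFitCache to bump the hit count), and no valid entry (just recompute). A hedged, self-contained sketch of that decision flow with simplified stand-in types:

```go
package main

import "fmt"

// Simplified stand-ins; the real types are placement.RegionFit and the
// region-rule-fit cache touched by this commit.
type regionFit struct{ desc string }

type fitCache struct {
	valid   bool       // the cached region/rules/stores are still unchanged
	bestFit *regionFit // only present once the entry has enough hits
}

// fitRegionSketch mirrors the reworked RuleManager.FitRegion: return the
// stored fit when the cache has one, otherwise compute a fresh fit and, if the
// region still matches an existing cache entry, feed the result back via
// setCache so the entry's hit count can grow toward the caching threshold.
func fitRegionSketch(cacheEnabled bool, cache *fitCache, compute func() *regionFit, setCache func(*regionFit)) *regionFit {
	isCached := false
	if cacheEnabled && cache.valid {
		isCached = true
		if cache.bestFit != nil {
			return cache.bestFit // cheap path: reuse the stored fit
		}
	}
	fit := compute() // expensive path: full fitRegion calculation
	if isCached {
		setCache(fit) // bump the hit count; the fit may be stored once the threshold is reached
	}
	return fit
}

func main() {
	cache := &fitCache{valid: true} // entry exists but no fit stored yet
	fit := fitRegionSketch(true, cache,
		func() *regionFit { return &regionFit{desc: "fresh fit"} },
		func(f *regionFit) { fmt.Println("SetRegionFitCache called with:", f.desc) })
	fmt.Println("FitRegion returned:", fit.desc)
}
```
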
46 changes: 45 additions & 1 deletion server/schedule/placement/rule_manager_test.go
@@ -24,13 +24,14 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/server/config"
)

func newTestManager(t *testing.T) (endpoint.RuleStorage, *RuleManager) {
re := require.New(t)
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
var err error
manager := NewRuleManager(store, nil, nil)
manager := NewRuleManager(store, nil, config.NewTestOptions())
err = manager.Initialize(3, []string{"zone", "rack", "host"})
re.NoError(err)
return store, manager
@@ -421,6 +422,49 @@ func TestCheckApplyRules(t *testing.T) {
re.Regexp("needs at least one leader or voter", err.Error())
}

func TestCacheManager(t *testing.T) {
re := require.New(t)
_, manager := newTestManager(t)
manager.opt.SetPlacementRulesCacheEnabled(true)
rules := addExtraRules(0)
re.NoError(manager.SetRules(rules))
stores := makeStores()

regionMeta := &metapb.Region{
Id: 1,
StartKey: []byte(""),
EndKey: []byte(""),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 0, Version: 0},
Peers: []*metapb.Peer{
{Id: 11, StoreId: 1111, Role: metapb.PeerRole_Voter},
{Id: 12, StoreId: 2111, Role: metapb.PeerRole_Voter},
{Id: 13, StoreId: 3111, Role: metapb.PeerRole_Voter},
},
}
region := core.NewRegionInfo(regionMeta, regionMeta.Peers[0])
fit := manager.FitRegion(stores, region)
manager.SetRegionFitCache(region, fit)
// bestFit is not stored when the total number of hits is insufficient.
for i := 1; i < minHitCountToCacheHit/2; i++ {
manager.FitRegion(stores, region)
re.True(manager.IsRegionFitCached(stores, region))
cache := manager.cache.regionCaches[1]
re.Equal(uint32(i), cache.hitCount)
re.Nil(cache.bestFit)
}
// Store bestFit when the total number of hits is sufficient.
for i := 0; i < minHitCountToCacheHit; i++ {
manager.FitRegion(stores, region)
}
cache := manager.cache.regionCaches[1]
re.Equal(uint32(minHitCountToCacheHit), cache.hitCount)
re.NotNil(cache.bestFit)
// Cache invalidation after change
regionMeta.Peers[2] = &metapb.Peer{Id: 14, StoreId: 4111, Role: metapb.PeerRole_Voter}
region = core.NewRegionInfo(regionMeta, regionMeta.Peers[0])
re.False(manager.IsRegionFitCached(stores, region))
}

func dhex(hk string) []byte {
k, err := hex.DecodeString(hk)
if err != nil {
