*: reduce raft cluster lock and replace some place with atomic (#8786)
close #8785

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
lhy1024 and ti-chi-bot[bot] authored Nov 18, 2024
1 parent c8c7551 commit 1b4b21a
Showing 3 changed files with 50 additions and 43 deletions.
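
The change applies one pattern throughout: a scalar field that was guarded by a syncutil.RWMutex (or by the cluster-wide RaftCluster lock) becomes an atomic.Value, so readers no longer contend with writers. A minimal sketch of that pattern, with illustrative names rather than PD's own:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// stats mirrors the shape of the refactor: the mutex and the field it
// guarded are replaced by a single atomic.Value.
type stats struct {
	// duration always holds a time.Duration; atomic.Value requires
	// every Store to use the same concrete type.
	duration atomic.Value
}

func newStats() *stats {
	s := &stats{}
	// Seed the value so Load never returns nil and the type assertion
	// in get cannot panic (the same reason NewController below stores
	// time.Duration(0) before returning).
	s.duration.Store(time.Duration(0))
	return s
}

func (s *stats) set(d time.Duration) { s.duration.Store(d) }

func (s *stats) get() time.Duration { return s.duration.Load().(time.Duration) }

func main() {
	s := newStats()
	s.set(3 * time.Second)
	fmt.Println(s.get()) // 3s
}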
20 changes: 8 additions & 12 deletions pkg/schedule/checker/checker_controller.go
@@ -19,6 +19,7 @@ import (
 	"context"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/pingcap/failpoint"
@@ -33,7 +34,6 @@ import (
 	"github.com/tikv/pd/pkg/schedule/placement"
 	"github.com/tikv/pd/pkg/utils/keyutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
-	"github.com/tikv/pd/pkg/utils/syncutil"
 	"go.uber.org/zap"
 )

@@ -78,10 +78,8 @@ type Controller struct {
 
 	// duration is the duration of the last patrol round.
 	// It's exported, so it should be protected by a mutex.
-	mu struct {
-		syncutil.RWMutex
-		duration time.Duration
-	}
+	duration atomic.Value // Store as time.Duration
 
 	// interval is the config interval of patrol regions.
 	// It's used to update the ticker, so we need to
 	// record it to avoid updating the ticker frequently.
@@ -96,7 +94,7 @@ type Controller struct {
 // NewController create a new Controller.
 func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
 	pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)
-	return &Controller{
+	c := &Controller{
 		ctx: ctx,
 		cluster: cluster,
 		conf: conf,
@@ -114,6 +112,8 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
 		interval: cluster.GetCheckerConfig().GetPatrolRegionInterval(),
 		patrolRegionScanLimit: calculateScanLimit(cluster),
 	}
+	c.duration.Store(time.Duration(0))
+	return c
 }
 
 // PatrolRegions is used to scan regions.
@@ -210,15 +210,11 @@ func (c *Controller) updatePatrolWorkersIfNeeded() {
 
 // GetPatrolRegionsDuration returns the duration of the last patrol region round.
 func (c *Controller) GetPatrolRegionsDuration() time.Duration {
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-	return c.mu.duration
+	return c.duration.Load().(time.Duration)
 }
 
 func (c *Controller) setPatrolRegionsDuration(dur time.Duration) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-	c.mu.duration = dur
+	c.duration.Store(dur)
 }
 
 func (c *Controller) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
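
A side note on the representation: since Go 1.19 the standard library also offers typed atomics, which avoid both the interface allocation in Store and the type assertion in Load. A hypothetical variant of the duration field (not what this commit does) could keep nanoseconds in an atomic.Int64:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// patrolDuration is a hypothetical alternative to the atomic.Value field
// above, using a typed atomic (Go 1.19+) that stores nanoseconds.
type patrolDuration struct {
	nanos atomic.Int64
}

func (p *patrolDuration) set(d time.Duration) { p.nanos.Store(int64(d)) }

func (p *patrolDuration) get() time.Duration { return time.Duration(p.nanos.Load()) }

func main() {
	var p patrolDuration
	p.set(500 * time.Millisecond)
	fmt.Println(p.get()) // 500ms
}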
14 changes: 14 additions & 0 deletions pkg/utils/typeutil/clone_test.go
@@ -1,3 +1,17 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package typeutil
 
 import (
59 changes: 28 additions & 31 deletions server/cluster/cluster.go
@@ -26,6 +26,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/coreos/go-semver/semver"
@@ -157,8 +158,8 @@ type RaftCluster struct {
 	isAPIServiceMode bool
 	meta             *metapb.Cluster
 	storage          storage.Storage
-	minResolvedTS    uint64
-	externalTS       uint64
+	minResolvedTS    atomic.Value // Store as uint64
+	externalTS       atomic.Value // Store as uint64
 
 	// Keep the previous store limit settings when removing a store.
 	prevStoreLimit map[uint64]map[storelimit.Type]float64
@@ -354,10 +355,8 @@ func (c *RaftCluster) Start(s Server) error {
 		return err
 	}
 	c.limiter = NewStoreLimiter(s.GetPersistOptions())
-	c.externalTS, err = c.storage.LoadExternalTS()
-	if err != nil {
-		log.Error("load external timestamp meets error", zap.Error(err))
-	}
+	c.loadExternalTS()
+	c.loadMinResolvedTS()
 
 	if c.isAPIServiceMode {
 		// bootstrap keyspace group manager after starting other parts successfully.
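
Moving both load calls into Start (loadMinResolvedTS previously ran at the top of runMinResolvedTSJob, as the later hunk shows) seeds the two atomic.Value fields before the background jobs and API readers start calling Load. That matters because Load returns nil until the first Store, and the getters below assert the result to uint64 without a nil check.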
@@ -2260,28 +2259,27 @@ func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error {
 }
 
 // CheckAndUpdateMinResolvedTS checks and updates the min resolved ts of the cluster.
+// It only be called by the background job runMinResolvedTSJob.
 // This is exported for testing purpose.
 func (c *RaftCluster) CheckAndUpdateMinResolvedTS() (uint64, bool) {
-	c.Lock()
-	defer c.Unlock()
-
 	if !c.isInitialized() {
 		return math.MaxUint64, false
 	}
-	curMinResolvedTS := uint64(math.MaxUint64)
+	newMinResolvedTS := uint64(math.MaxUint64)
 	for _, s := range c.GetStores() {
 		if !core.IsAvailableForMinResolvedTS(s) {
 			continue
 		}
-		if curMinResolvedTS > s.GetMinResolvedTS() {
-			curMinResolvedTS = s.GetMinResolvedTS()
+		if newMinResolvedTS > s.GetMinResolvedTS() {
+			newMinResolvedTS = s.GetMinResolvedTS()
 		}
 	}
-	if curMinResolvedTS == math.MaxUint64 || curMinResolvedTS <= c.minResolvedTS {
-		return c.minResolvedTS, false
+	oldMinResolvedTS := c.minResolvedTS.Load().(uint64)
+	if newMinResolvedTS == math.MaxUint64 || newMinResolvedTS <= oldMinResolvedTS {
+		return oldMinResolvedTS, false
 	}
-	c.minResolvedTS = curMinResolvedTS
-	return c.minResolvedTS, true
+	c.minResolvedTS.Store(newMinResolvedTS)
+	return newMinResolvedTS, true
 }
 
 func (c *RaftCluster) runMinResolvedTSJob() {
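
Dropping the lock here is sound only because, as the new comment notes, a single background job calls this function: the Load-then-Store on minResolvedTS is not atomic as a pair. If a second writer were ever added, a compare-and-swap loop would be needed instead. A hypothetical sketch using the typed atomic.Uint64 (Go 1.19+), not part of this commit:

package main

import (
	"fmt"
	"sync/atomic"
)

// advance raises ts to candidate unless another writer got there first
// with an equal or larger value. Safe for any number of writers.
func advance(ts *atomic.Uint64, candidate uint64) bool {
	for {
		old := ts.Load()
		if candidate <= old {
			return false // not an advance; keep the larger value
		}
		if ts.CompareAndSwap(old, candidate) {
			return true // won the race
		}
		// Lost a race with another writer; reload and retry.
	}
}

func main() {
	var ts atomic.Uint64
	fmt.Println(advance(&ts, 42)) // true
	fmt.Println(advance(&ts, 7))  // false: 42 is already newer
}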
@@ -2295,7 +2293,6 @@ func (c *RaftCluster) runMinResolvedTSJob() {
 	ticker := time.NewTicker(interval)
 	defer ticker.Stop()
 
-	c.loadMinResolvedTS()
 	for {
 		select {
 		case <-c.ctx.Done():
@@ -2325,25 +2322,19 @@ func (c *RaftCluster) loadMinResolvedTS() {
 		log.Error("load min resolved ts meet error", errs.ZapError(err))
 		return
 	}
-	c.Lock()
-	defer c.Unlock()
-	c.minResolvedTS = minResolvedTS
+	c.minResolvedTS.Store(minResolvedTS)
 }
 
 // GetMinResolvedTS returns the min resolved ts of the cluster.
 func (c *RaftCluster) GetMinResolvedTS() uint64 {
-	c.RLock()
-	defer c.RUnlock()
 	if !c.isInitialized() {
 		return math.MaxUint64
 	}
-	return c.minResolvedTS
+	return c.minResolvedTS.Load().(uint64)
 }
 
 // GetStoreMinResolvedTS returns the min resolved ts of the store.
 func (c *RaftCluster) GetStoreMinResolvedTS(storeID uint64) uint64 {
-	c.RLock()
-	defer c.RUnlock()
 	store := c.GetStore(storeID)
 	if store == nil {
 		return math.MaxUint64
@@ -2371,22 +2362,28 @@ func (c *RaftCluster) GetMinResolvedTSByStoreIDs(ids []uint64) (uint64, map[uint
 
 // GetExternalTS returns the external timestamp.
 func (c *RaftCluster) GetExternalTS() uint64 {
-	c.RLock()
-	defer c.RUnlock()
 	if !c.isInitialized() {
 		return math.MaxUint64
 	}
-	return c.externalTS
+	return c.externalTS.Load().(uint64)
 }
 
 // SetExternalTS sets the external timestamp.
 func (c *RaftCluster) SetExternalTS(timestamp uint64) error {
-	c.Lock()
-	defer c.Unlock()
-	c.externalTS = timestamp
+	c.externalTS.Store(timestamp)
 	return c.storage.SaveExternalTS(timestamp)
 }
 
+func (c *RaftCluster) loadExternalTS() {
+	// Use `c.GetStorage()` here to prevent from the data race in test.
+	externalTS, err := c.GetStorage().LoadExternalTS()
+	if err != nil {
+		log.Error("load external ts meet error", errs.ZapError(err))
+		return
+	}
+	c.externalTS.Store(externalTS)
+}
+
 // SetStoreLimit sets a store limit for a given type and rate.
 func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) error {
 	old := c.opt.GetScheduleConfig().Clone()
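
One subtlety in the new loadExternalTS: on a storage error it returns without storing anything, so the field can stay unseeded. The getter is guarded by isInitialized(), but any path that reached Load before a Store would hit a nil type assertion. A defensive read helper that makes the never-stored case explicit, as a hedged sketch rather than anything in the commit:

package main

import (
	"fmt"
	"sync/atomic"
)

// loadUint64 reads an atomic.Value expected to hold a uint64, tolerating
// the never-stored case instead of panicking on the type assertion.
// A hypothetical helper, not part of the PD codebase.
func loadUint64(v *atomic.Value, fallback uint64) uint64 {
	if x := v.Load(); x != nil {
		return x.(uint64)
	}
	return fallback
}

func main() {
	var ts atomic.Value
	fmt.Println(loadUint64(&ts, 0)) // 0: nothing stored yet
	ts.Store(uint64(100))
	fmt.Println(loadUint64(&ts, 0)) // 100
}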
