Skip to content

Commit

Permalink
fix the conflict
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed May 30, 2022
1 parent 4e61080 commit 2fcba0c
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 806 deletions.
47 changes: 20 additions & 27 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {

// IsPrepared return true if the prepare checker is ready.
func (c *RaftCluster) IsPrepared() bool {
return c.coordinator.prepareChecker.isPrepared()
return c.prepareChecker.isPrepared()
}

var regionGuide = core.GenerateRegionGuideFunc(true)
Expand Down Expand Up @@ -668,13 +668,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

<<<<<<< HEAD
if isNew {
c.prepareChecker.collect(region)
=======
if !c.IsPrepared() && isNew {
c.coordinator.prepareChecker.collect(region)
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
c.prepareChecker.collect(region)
}

if c.regionStats != nil {
Expand Down Expand Up @@ -728,16 +723,6 @@ func (c *RaftCluster) updateStoreStatusLocked(id uint64) {
c.core.UpdateStoreStatus(id, leaderCount, regionCount, pendingPeerCount, leaderRegionSize, regionSize)
}

<<<<<<< HEAD
//nolint:unused
func (c *RaftCluster) getClusterID() uint64 {
c.RLock()
defer c.RUnlock()
return c.meta.GetId()
}

=======
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
func (c *RaftCluster) putMetaLocked(meta *metapb.Cluster) error {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
Expand Down Expand Up @@ -1458,13 +1443,6 @@ func (c *RaftCluster) GetComponentManager() *component.Manager {
return c.componentManager
}

// isPrepared if the cluster information is collected
func (c *RaftCluster) isPrepared() bool {
c.RLock()
defer c.RUnlock()
return c.prepareChecker.check(c)
}

// GetStoresLoads returns load stats of all stores.
func (c *RaftCluster) GetStoresLoads() map[uint64][]float64 {
c.RLock()
Expand Down Expand Up @@ -1517,10 +1495,11 @@ func (c *RaftCluster) FitRegion(region *core.RegionInfo) *placement.RegionFit {
}

type prepareChecker struct {
sync.RWMutex
reactiveRegions map[uint64]int
start time.Time
sum int
isPrepared bool
prepared bool
}

func newPrepareChecker() *prepareChecker {
Expand All @@ -1532,7 +1511,13 @@ func newPrepareChecker() *prepareChecker {

// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
func (checker *prepareChecker) check(c *RaftCluster) bool {
if checker.isPrepared || time.Since(checker.start) > collectTimeout {
checker.Lock()
defer checker.Unlock()
if checker.prepared {
return true
}
if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
// The number of active regions should be more than total region of all stores * collectFactor
Expand All @@ -1549,17 +1534,25 @@ func (checker *prepareChecker) check(c *RaftCluster) bool {
return false
}
}
checker.isPrepared = true
checker.prepared = true
return true
}

func (checker *prepareChecker) collect(region *core.RegionInfo) {
checker.Lock()
defer checker.Unlock()
for _, p := range region.GetPeers() {
checker.reactiveRegions[p.GetStoreId()]++
}
checker.sum++
}

func (checker *prepareChecker) isPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}

// GetHotWriteRegions gets hot write regions' info.
func (c *RaftCluster) GetHotWriteRegions(storeIDs ...uint64) *statistics.StoreHotPeersInfos {
c.RLock()
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ func (c *coordinator) resetHotSpotMetrics() {
}

func (c *coordinator) shouldRun() bool {
return c.cluster.isPrepared()
return c.cluster.IsPrepared()
}

func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) error {
Expand Down
7 changes: 1 addition & 6 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,8 @@ func (s *testCoordinatorSuite) TestBasic(c *C) {
}

func (s *testCoordinatorSuite) TestDispatch(c *C) {
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.isPrepared = true }, nil, c)
tc, co, cleanup := prepare(nil, func(tc *testCluster) { tc.prepareChecker.prepared = true }, nil, c)
defer cleanup()
<<<<<<< HEAD

=======
co.prepareChecker.prepared = true
>>>>>>> 429b49283 (*: fix scheduling can not immediately start after transfer leader (#4875))
// Transfer peer from store 4 to store 1.
c.Assert(tc.addRegionStore(4, 40), IsNil)
c.Assert(tc.addRegionStore(3, 30), IsNil)
Expand Down
81 changes: 0 additions & 81 deletions server/cluster/prepare_checker.go

This file was deleted.

Loading

0 comments on commit 2fcba0c

Please sign in to comment.