
Merge branch 'release-5.0' into cherry-pick-4942-to-release-5.0
rleungx authored Jun 14, 2022
2 parents c897834 + 498d5db commit 8e853f7
Showing 7 changed files with 146 additions and 24 deletions.
20 changes: 12 additions & 8 deletions server/cluster/coordinator_test.go
@@ -972,7 +972,9 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
tc.putRegion(region)
start := time.Now()
{
op1 := lb.Schedule(tc)[0]
ops := lb.Schedule(tc)
c.Assert(ops, HasLen, 1)
op1 := ops[0]
c.Assert(op1, NotNil)
c.Assert(oc.AddOperator(op1), IsTrue)
c.Assert(oc.RemoveOperator(op1), IsTrue)
@@ -983,23 +985,25 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
if time.Since(start) > time.Second {
break
}
c.Assert(ops, IsNil)
c.Assert(ops, HasLen, 0)
}

// reset all stores' limit
// scheduling once takes 1/10 of a second
opt.SetAllStoresLimit(storelimit.AddPeer, 600)
opt.SetAllStoresLimit(storelimit.RemovePeer, 600)
time.Sleep(time.Second)
for i := 0; i < 10; i++ {
op1 := lb.Schedule(tc)[0]
c.Assert(op1, NotNil)
c.Assert(oc.AddOperator(op1), IsTrue)
c.Assert(oc.RemoveOperator(op1), IsTrue)
ops := lb.Schedule(tc)
c.Assert(ops, HasLen, 1)
op := ops[0]
c.Assert(oc.AddOperator(op), IsTrue)
c.Assert(oc.RemoveOperator(op), IsTrue)
}
// sleep one second to make sure that the token bucket is filled up
time.Sleep(1 * time.Second)
time.Sleep(time.Second)
for i := 0; i < 100; i++ {
c.Assert(lb.Schedule(tc), NotNil)
c.Assert(len(lb.Schedule(tc)), Greater, 0)
}
}
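
A note on the limits used here: SetAllStoresLimit(storelimit.AddPeer, 600) appears to be a per-minute budget, i.e. about ten operators per second per store, which is why the comment above says one scheduling round takes 1/10 of a second and why the test sleeps to let the budget refill. A minimal token-bucket sketch of that idea follows; it is an illustration only, not PD's actual storelimit package.

import "time"

// tokenBucket is a simplified store limit: each scheduled operator consumes
// one token, and tokens are replenished at ratePerSec.
type tokenBucket struct {
	tokens     float64
	capacity   float64
	ratePerSec float64 // e.g. a limit of 600 per minute refills 10 tokens per second
	lastRefill time.Time
}

// take reports whether an operator may be scheduled now and, if so, consumes one token.
func (b *tokenBucket) take(now time.Time) bool {
	b.tokens += now.Sub(b.lastRefill).Seconds() * b.ratePerSec
	if b.tokens > b.capacity {
		b.tokens = b.capacity
	}
	b.lastRefill = now
	if b.tokens < 1 {
		return false // store overloaded: Schedule would return no operators
	}
	b.tokens--
	return true
}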

32 changes: 22 additions & 10 deletions server/schedule/region_scatterer.go
@@ -274,7 +274,7 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*opera

func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *operator.Operator {
ordinaryFilter := filter.NewOrdinaryEngineFilter(r.name)
ordinaryPeers := make(map[uint64]*metapb.Peer)
ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
specialPeers := make(map[string]map[uint64]*metapb.Peer)
// Group peers by the engine of their stores
for _, peer := range region.GetPeers() {
@@ -283,24 +283,36 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
return nil
}
if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
ordinaryPeers[peer.GetId()] = peer
ordinaryPeers[peer.GetStoreId()] = peer
} else {
engine := store.GetLabelValue(filter.EngineKey)
if _, ok := specialPeers[engine]; !ok {
specialPeers[engine] = make(map[uint64]*metapb.Peer)
}
specialPeers[engine][peer.GetId()] = peer
specialPeers[engine][peer.GetStoreId()] = peer
}
}

targetPeers := make(map[uint64]*metapb.Peer)
selectedStores := make(map[uint64]struct{})
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) {
targetPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers())) // StoreID -> Peer
selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // StoreID set
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
for _, peer := range peers {
candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
targetPeers[newPeer.GetStoreId()] = newPeer
selectedStores[newPeer.GetStoreId()] = struct{}{}
if _, ok := selectedStores[peer.GetStoreId()]; ok {
// This peer's store has already been chosen as a target, so it is both the source and a target; no need to select again.
continue
}
for {
candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
targetPeers[newPeer.GetStoreId()] = newPeer
selectedStores[newPeer.GetStoreId()] = struct{}{}
// If the selected store already hosts another peer of this region,
// that peer is treated as having selected its own store,
// and this origin peer re-selects a different target.
if _, ok := peers[newPeer.GetStoreId()]; !ok || peer.GetStoreId() == newPeer.GetStoreId() {
break
}
}
}
}
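
The inner for loop added above keeps re-selecting whenever the chosen target store already holds another peer of the region being scattered; without the retry, two peers could end up mapped to the same target store and the resulting operator could change the region's peer count (see TestSelectedStores below). A rough standalone sketch of that retry rule, with a hypothetical pickCandidate helper rather than the scatterer's real selectCandidates/selectStore:

// pickTarget re-selects until the candidate store is either the origin store
// itself or a store that does not already hold one of this region's peers.
func pickTarget(originStoreID uint64, peerStores map[uint64]struct{}, pickCandidate func() uint64) uint64 {
	for {
		target := pickCandidate() // hypothetical: returns a candidate store ID
		if _, taken := peerStores[target]; !taken || target == originStoreID {
			return target
		}
		// The candidate already hosts another peer of this region: try again.
	}
}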

56 changes: 54 additions & 2 deletions server/schedule/region_scatterer_test.go
@@ -9,6 +9,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
@@ -354,8 +355,8 @@ func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) {
}
// For leaders, we expect each store to have about 20 leaders for each group
checker(scatterer.ordinaryEngine.selectedLeader, 20, 5)
// For peers, we expect each store to have about 50 peers for each group
checker(scatterer.ordinaryEngine.selectedPeer, 50, 15)
// For peers, we expect each store to have about 60 peers for each group
checker(scatterer.ordinaryEngine.selectedPeer, 60, 15)
cancel()
}
}
@@ -476,3 +477,54 @@ func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
}
check(scatterer.ordinaryEngine.selectedPeer)
}

// TestSelectedStores checks that the picking strategy does not change a region's peer count.
// Ref https://github.com/tikv/pd/issues/4565
func (s *testScatterRegionSuite) TestSelectedStores(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(opt)
// Add 4 stores.
for i := uint64(1); i <= 4; i++ {
tc.AddRegionStore(i, 0)
// prevent store from being disconnected
tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
}
group := "group"
scatterer := NewRegionScatterer(ctx, tc)

// Put a lot of regions in Store 1/2/3.
for i := uint64(1); i < 100; i++ {
region := tc.AddLeaderRegion(i+10, i%3+1, (i+1)%3+1, (i+2)%3+1)
peers := make(map[uint64]*metapb.Peer, 3)
for _, peer := range region.GetPeers() {
peers[peer.GetStoreId()] = peer
}
scatterer.Put(peers, i%3+1, group)
}

// Try to scatter a region with peer store id 2/3/4
for i := uint64(1); i < 20; i++ {
region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
op := scatterer.scatterRegion(region, group)
c.Assert(isPeerCountChanged(op), IsFalse)
}
}

func isPeerCountChanged(op *operator.Operator) bool {
if op == nil {
return false
}
add, remove := 0, 0
for i := 0; i < op.Len(); i++ {
step := op.Step(i)
switch step.(type) {
case operator.AddPeer, operator.AddLearner, operator.AddLightPeer, operator.AddLightLearner:
add++
case operator.RemovePeer:
remove++
}
}
return add != remove
}
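
isPeerCountChanged above accepts an operator only when every added replica is paired with a removed one. A toy illustration of that invariant, using plain string step names instead of the operator package's step types:

// peerCountChanged mirrors the check above: a balanced move (add plus remove)
// keeps the replica count, while a lone add or remove changes it.
func peerCountChanged(steps []string) bool {
	add, remove := 0, 0
	for _, s := range steps {
		switch s {
		case "AddPeer", "AddLearner", "AddLightPeer", "AddLightLearner":
			add++
		case "RemovePeer":
			remove++
		}
	}
	return add != remove
}

// peerCountChanged([]string{"AddPeer", "TransferLeader", "RemovePeer"}) == false
// peerCountChanged([]string{"AddPeer"}) == true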
9 changes: 8 additions & 1 deletion server/server.go
@@ -1238,7 +1238,14 @@ func (s *Server) campaignLeader() {
log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
return
}
defer s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
defer func() {
s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
failpoint.Inject("updateAfterResetTSO", func() {
if err = allocator.UpdateTSO(); err != nil {
panic(err)
}
})
}()

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
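
The failpoint.Inject call in this deferred function is an inert marker in normal builds; it only fires when the binary is built with failpoints activated and a test enables the named failpoint at run time, which is how TestUpdateAfterResetTSO below forces a TSO update right after the allocator group has been reset. A minimal sketch of the pattern with github.com/pingcap/failpoint, using hypothetical stand-in functions rather than the server's real helpers:

import "github.com/pingcap/failpoint"

// Hypothetical stand-ins for ResetAllocatorGroup and allocator.UpdateTSO.
func resetAllocatorGroup() {}

func forceTSOUpdate() error { return nil }

func resignAndReset() {
	resetAllocatorGroup()
	failpoint.Inject("updateAfterResetTSO", func() {
		// Runs only when a test has enabled the failpoint, e.g. via
		// failpoint.Enable("<module path>/updateAfterResetTSO", "return(true)").
		if err := forceTSOUpdate(); err != nil {
			panic(err)
		}
	})
}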
2 changes: 2 additions & 0 deletions server/statistics/store_collection.go
@@ -126,6 +126,8 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
}

func (s *storeStatistics) Collect() {
placementStatusGauge.Reset()

metrics := make(map[string]float64)
metrics["store_up_count"] = float64(s.Up)
metrics["store_disconnected_count"] = float64(s.Disconnect)
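
Resetting the gauge at the top of Collect drops every label combination exported in previous rounds, so series for stores or placement rules that no longer exist stop reporting stale values. A sketch of the pattern with prometheus/client_golang; the real placementStatusGauge's metric name and label set may differ:

import "github.com/prometheus/client_golang/prometheus"

var statusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "placement_status"}, // assumed metric name
	[]string{"type", "name"},
)

// collect re-exports only the series that still exist in this round.
func collect(current map[string]float64) {
	statusGauge.Reset() // drop all previously-set label combinations
	for name, value := range current {
		statusGauge.WithLabelValues("label-constraint", name).Set(value)
	}
}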
13 changes: 10 additions & 3 deletions server/tso/tso.go
@@ -70,9 +70,13 @@ type timestampOracle struct {
dcLocation string
}

func (t *timestampOracle) setTSOPhysical(next time.Time) {
func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) {
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
// Do not update the zero physical time if the `force` flag is false.
if t.tsoMux.physical == typeutil.ZeroTime && !force {
return
}
// make sure the ts won't fall back
if typeutil.SubTSOPhysicalByWallClock(next, t.tsoMux.physical) > 0 {
t.tsoMux.physical = next
@@ -190,7 +194,7 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
tsoCounter.WithLabelValues("sync_ok", t.dcLocation).Inc()
log.Info("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
// save into memory
t.setTSOPhysical(next)
t.setTSOPhysical(next, true)
return nil
}

@@ -266,6 +270,9 @@ func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, ts
// 1. The saved time is monotonically increasing.
// 2. The physical time is monotonically increasing.
// 3. The physical time is always less than the saved timestamp.
//
// NOTICE: this function should only be called after the TSO in memory has been
// initialized, and must not be called again once the TSO in memory has been reset.
func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error {
prevPhysical, prevLogical := t.getTSO()
tsoGauge.WithLabelValues("tso", t.dcLocation).Set(float64(prevPhysical.Unix()))
@@ -315,7 +322,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
}
}
// save into memory
t.setTSOPhysical(next)
t.setTSOPhysical(next, false)

return nil
}
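
After ResetAllocatorGroup the in-memory physical time is typeutil.ZeroTime; with force set to false, setTSOPhysical now refuses to overwrite that zero value, so only SyncTimestamp (which passes true) can re-initialize the TSO, and a late UpdateTimestamp can no longer resurrect it after a reset. A simplified standalone sketch of the guard, using a toy struct rather than timestampOracle:

import "time"

type tsoState struct {
	physical time.Time // the zero value means "reset / not initialized"
}

// setPhysical mirrors the guarded update: a zero physical time is only
// overwritten when force is true, and the time never moves backwards.
func (s *tsoState) setPhysical(next time.Time, force bool) {
	if s.physical.IsZero() && !force {
		return // reset state: only a forced sync may re-initialize it
	}
	if next.After(s.physical) {
		s.physical = next
	}
}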
38 changes: 38 additions & 0 deletions tests/client/client_test.go
Expand Up @@ -182,6 +182,44 @@ func (s *clientTestSuite) TestLeaderTransfer(c *C) {
wg.Wait()
}

// More details can be found in this issue: https://github.com/tikv/pd/issues/4884
func (s *clientTestSuite) TestUpdateAfterResetTSO(c *C) {
cluster, err := tests.NewTestCluster(s.ctx, 2)
c.Assert(err, IsNil)
defer cluster.Destroy()

endpoints := s.runServer(c, cluster)
cli := s.setupCli(c, endpoints, false)

testutil.WaitUntil(c, func(c *C) bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
// Transfer leader to trigger the TSO resetting.
c.Assert(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"), IsNil)
oldLeaderName := cluster.WaitLeader()
err = cluster.GetServer(oldLeaderName).ResignLeader()
c.Assert(err, IsNil)
c.Assert(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"), IsNil)
newLeaderName := cluster.WaitLeader()
c.Assert(newLeaderName, Not(Equals), oldLeaderName)
// Request a new TSO.
testutil.WaitUntil(c, func(c *C) bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
// Transfer leader back.
c.Assert(failpoint.Enable("github.com/tikv/pd/server/tso/delaySyncTimestamp", `return(true)`), IsNil)
err = cluster.GetServer(newLeaderName).ResignLeader()
c.Assert(err, IsNil)
// Should NOT panic here.
testutil.WaitUntil(c, func(c *C) bool {
_, _, err := cli.GetTS(context.TODO())
return err == nil
})
c.Assert(failpoint.Disable("github.com/tikv/pd/server/tso/delaySyncTimestamp"), IsNil)
}

func (s *clientTestSuite) TestTSOAllocatorLeader(c *C) {
dcLocationConfig := map[string]string{
"pd1": "dc-1",
