From bcf361528c380f200f7094c863e7af0c4ba0538a Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 13 Jun 2022 14:14:32 +0800
Subject: [PATCH 1/3] metrics: fix the residual label (#4824) (#4828)

ref tikv/pd#4824, close tikv/pd#4825

Signed-off-by: Ryan Leung
Co-authored-by: Ryan Leung
---
 server/cluster/coordinator_test.go    | 20 ++++++++++++--------
 server/statistics/store_collection.go |  2 ++
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go
index 5b24633c615..a019976ce33 100644
--- a/server/cluster/coordinator_test.go
+++ b/server/cluster/coordinator_test.go
@@ -972,7 +972,9 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
 	tc.putRegion(region)
 	start := time.Now()
 	{
-		op1 := lb.Schedule(tc)[0]
+		ops := lb.Schedule(tc)
+		c.Assert(ops, HasLen, 1)
+		op1 := ops[0]
 		c.Assert(op1, NotNil)
 		c.Assert(oc.AddOperator(op1), IsTrue)
 		c.Assert(oc.RemoveOperator(op1), IsTrue)
@@ -983,23 +985,25 @@
 		if time.Since(start) > time.Second {
 			break
 		}
-		c.Assert(ops, IsNil)
+		c.Assert(ops, HasLen, 0)
 	}
 
 	// reset all stores' limit
 	// scheduling one time needs 1/10 seconds
 	opt.SetAllStoresLimit(storelimit.AddPeer, 600)
 	opt.SetAllStoresLimit(storelimit.RemovePeer, 600)
+	time.Sleep(time.Second)
 	for i := 0; i < 10; i++ {
-		op1 := lb.Schedule(tc)[0]
-		c.Assert(op1, NotNil)
-		c.Assert(oc.AddOperator(op1), IsTrue)
-		c.Assert(oc.RemoveOperator(op1), IsTrue)
+		ops := lb.Schedule(tc)
+		c.Assert(ops, HasLen, 1)
+		op := ops[0]
+		c.Assert(oc.AddOperator(op), IsTrue)
+		c.Assert(oc.RemoveOperator(op), IsTrue)
 	}
 	// sleep 1 seconds to make sure that the token is filled up
-	time.Sleep(1 * time.Second)
+	time.Sleep(time.Second)
 	for i := 0; i < 100; i++ {
-		c.Assert(lb.Schedule(tc), NotNil)
+		c.Assert(len(lb.Schedule(tc)), Greater, 0)
 	}
 }
 
diff --git a/server/statistics/store_collection.go b/server/statistics/store_collection.go
index ea5483a7578..88207eebc76 100644
--- a/server/statistics/store_collection.go
+++ b/server/statistics/store_collection.go
@@ -126,6 +126,8 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
 }
 
 func (s *storeStatistics) Collect() {
+	placementStatusGauge.Reset()
+
 	metrics := make(map[string]float64)
 	metrics["store_up_count"] = float64(s.Up)
 	metrics["store_disconnected_count"] = float64(s.Disconnect)

From 9a6dec54c35a64a76c892da6e944cd66a01d582c Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 13 Jun 2022 14:54:34 +0800
Subject: [PATCH 2/3] tso: fix the corner case that may cause TSO fallback
 (#4885) (#4894)

close tikv/pd#4884, ref tikv/pd#4885

tso: fix the corner case that may cause TSO fallback

Signed-off-by: JmPotato
Co-authored-by: JmPotato
---
 server/server.go            |  9 ++++++++-
 server/tso/tso.go           | 13 ++++++++++---
 tests/client/client_test.go | 38 +++++++++++++++++++++++++++++++++++++
 3 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/server/server.go b/server/server.go
index e05d2959553..e9b8b9a3849 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1238,7 +1238,14 @@ func (s *Server) campaignLeader() {
 		log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
 		return
 	}
-	defer s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
+	defer func() {
+		s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
+		failpoint.Inject("updateAfterResetTSO", func() {
+			if err = alllocator.UpdateTSO(); err != nil {
+				panic(err)
+			}
+		})
+	}()
 
 	if err := s.reloadConfigFromKV(); err != nil {
 		log.Error("failed to reload configuration", errs.ZapError(err))
diff --git a/server/tso/tso.go b/server/tso/tso.go
index 87206e521a2..2fe03293b86 100644
--- a/server/tso/tso.go
+++ b/server/tso/tso.go
@@ -70,9 +70,13 @@ type timestampOracle struct {
 	dcLocation string
 }
 
-func (t *timestampOracle) setTSOPhysical(next time.Time) {
+func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) {
 	t.tsoMux.Lock()
 	defer t.tsoMux.Unlock()
+	// Do not update the zero physical time if the `force` flag is false.
+	if t.tsoMux.physical == typeutil.ZeroTime && !force {
+		return
+	}
 	// make sure the ts won't fall back
 	if typeutil.SubTSOPhysicalByWallClock(next, t.tsoMux.physical) > 0 {
 		t.tsoMux.physical = next
@@ -190,7 +194,7 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
 	tsoCounter.WithLabelValues("sync_ok", t.dcLocation).Inc()
 	log.Info("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
 	// save into memory
-	t.setTSOPhysical(next)
+	t.setTSOPhysical(next, true)
 	return nil
 }
 
@@ -266,6 +270,9 @@ func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, ts
 // 1. The saved time is monotonically increasing.
 // 2. The physical time is monotonically increasing.
 // 3. The physical time is always less than the saved timestamp.
+//
+// NOTICE: this function should be called after the TSO in memory has been initialized
+// and should not be called when the TSO in memory has been reset anymore.
 func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error {
 	prevPhysical, prevLogical := t.getTSO()
 	tsoGauge.WithLabelValues("tso", t.dcLocation).Set(float64(prevPhysical.Unix()))
@@ -315,7 +322,7 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
 		}
 	}
 	// save into memory
-	t.setTSOPhysical(next)
+	t.setTSOPhysical(next, false)
 	return nil
 }
 
diff --git a/tests/client/client_test.go b/tests/client/client_test.go
index 0ea6ee7e2e9..94dbc9db46c 100644
--- a/tests/client/client_test.go
+++ b/tests/client/client_test.go
@@ -182,6 +182,44 @@ func (s *clientTestSuite) TestLeaderTransfer(c *C) {
 	wg.Wait()
 }
 
+// More details can be found in this issue: https://github.com/tikv/pd/issues/4884
+func (s *clientTestSuite) TestUpdateAfterResetTSO(c *C) {
+	cluster, err := tests.NewTestCluster(s.ctx, 2)
+	c.Assert(err, IsNil)
+	defer cluster.Destroy()
+
+	endpoints := s.runServer(c, cluster)
+	cli := s.setupCli(c, endpoints, false)
+
+	testutil.WaitUntil(c, func(c *C) bool {
+		_, _, err := cli.GetTS(context.TODO())
+		return err == nil
+	})
+	// Transfer leader to trigger the TSO resetting.
+	c.Assert(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"), IsNil)
+	oldLeaderName := cluster.WaitLeader()
+	err = cluster.GetServer(oldLeaderName).ResignLeader()
+	c.Assert(err, IsNil)
+	c.Assert(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"), IsNil)
+	newLeaderName := cluster.WaitLeader()
+	c.Assert(newLeaderName, Not(Equals), oldLeaderName)
+	// Request a new TSO.
+	testutil.WaitUntil(c, func(c *C) bool {
+		_, _, err := cli.GetTS(context.TODO())
+		return err == nil
+	})
+	// Transfer leader back.
+	c.Assert(failpoint.Enable("github.com/tikv/pd/server/tso/delaySyncTimestamp", `return(true)`), IsNil)
+	err = cluster.GetServer(newLeaderName).ResignLeader()
+	c.Assert(err, IsNil)
+	// Should NOT panic here.
+	testutil.WaitUntil(c, func(c *C) bool {
+		_, _, err := cli.GetTS(context.TODO())
+		return err == nil
+	})
+	c.Assert(failpoint.Disable("github.com/tikv/pd/server/tso/delaySyncTimestamp"), IsNil)
+}
+
 func (s *clientTestSuite) TestTSOAllocatorLeader(c *C) {
 	dcLocationConfig := map[string]string{
 		"pd1": "dc-1",

From 498d5dbcf81bd83783131a5f42deabfcdf1ad69d Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 13 Jun 2022 15:16:33 +0800
Subject: [PATCH 3/3] region_scatterer: fix the bug that could generate schedule with too few peers (#4570) (#4578)

close tikv/pd#4565, ref tikv/pd#4570

Signed-off-by: HunDunDM
Co-authored-by: HunDunDM
---
 server/schedule/region_scatterer.go      | 32 +++++++++-----
 server/schedule/region_scatterer_test.go | 56 +++++++++++++++++++++++-
 2 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go
index 527c35376c1..415c051b3c0 100644
--- a/server/schedule/region_scatterer.go
+++ b/server/schedule/region_scatterer.go
@@ -274,7 +274,7 @@ func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string) (*opera
 
 func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *operator.Operator {
 	ordinaryFilter := filter.NewOrdinaryEngineFilter(r.name)
-	ordinaryPeers := make(map[uint64]*metapb.Peer)
+	ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
 	specialPeers := make(map[string]map[uint64]*metapb.Peer)
 	// Group peers by the engine of their stores
 	for _, peer := range region.GetPeers() {
@@ -283,24 +283,36 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 			return nil
 		}
 		if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
-			ordinaryPeers[peer.GetId()] = peer
+			ordinaryPeers[peer.GetStoreId()] = peer
 		} else {
 			engine := store.GetLabelValue(filter.EngineKey)
 			if _, ok := specialPeers[engine]; !ok {
 				specialPeers[engine] = make(map[uint64]*metapb.Peer)
 			}
-			specialPeers[engine][peer.GetId()] = peer
+			specialPeers[engine][peer.GetStoreId()] = peer
 		}
 	}
 
-	targetPeers := make(map[uint64]*metapb.Peer)
-	selectedStores := make(map[uint64]struct{})
-	scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) {
+	targetPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))  // StoreID -> Peer
+	selectedStores := make(map[uint64]struct{}, len(region.GetPeers()))   // StoreID set
+	scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
 		for _, peer := range peers {
-			candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
-			newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
-			targetPeers[newPeer.GetStoreId()] = newPeer
-			selectedStores[newPeer.GetStoreId()] = struct{}{}
+			if _, ok := selectedStores[peer.GetStoreId()]; ok {
+				// It is both sourcePeer and targetPeer itself, no need to select.
+				continue
+			}
+			for {
+				candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
+				newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
+				targetPeers[newPeer.GetStoreId()] = newPeer
+				selectedStores[newPeer.GetStoreId()] = struct{}{}
+				// If the selected peer is a peer other than origin peer in this region,
+				// it is considered that the selected peer select itself.
+				// This origin peer re-selects.
+				if _, ok := peers[newPeer.GetStoreId()]; !ok || peer.GetStoreId() == newPeer.GetStoreId() {
+					break
+				}
+			}
 		}
 	}
 
diff --git a/server/schedule/region_scatterer_test.go b/server/schedule/region_scatterer_test.go
index 2c421939bab..ab5959b744f 100644
--- a/server/schedule/region_scatterer_test.go
+++ b/server/schedule/region_scatterer_test.go
@@ -9,6 +9,7 @@ import (
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/tikv/pd/pkg/mock/mockcluster"
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/server/core"
@@ -354,8 +355,8 @@ func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) {
 		}
 		// For leader, we expect each store have about 20 leader for each group
 		checker(scatterer.ordinaryEngine.selectedLeader, 20, 5)
-		// For peer, we expect each store have about 50 peers for each group
-		checker(scatterer.ordinaryEngine.selectedPeer, 50, 15)
+		// For peer, we expect each store have about 60 peers for each group
+		checker(scatterer.ordinaryEngine.selectedPeer, 60, 15)
 		cancel()
 	}
 }
@@ -476,3 +477,54 @@ func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
 	}
 	check(scatterer.ordinaryEngine.selectedPeer)
 }
+
+// TestSelectedStores tests if the peer count has changed due to the picking strategy.
+// Ref https://github.com/tikv/pd/issues/4565
+func (s *testScatterRegionSuite) TestSelectedStores(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(opt)
+	// Add 4 stores.
+	for i := uint64(1); i <= 4; i++ {
+		tc.AddRegionStore(i, 0)
+		// prevent store from being disconnected
+		tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
+	}
+	group := "group"
+	scatterer := NewRegionScatterer(ctx, tc)
+
+	// Put a lot of regions in Store 1/2/3.
+	for i := uint64(1); i < 100; i++ {
+		region := tc.AddLeaderRegion(i+10, i%3+1, (i+1)%3+1, (i+2)%3+1)
+		peers := make(map[uint64]*metapb.Peer, 3)
+		for _, peer := range region.GetPeers() {
+			peers[peer.GetStoreId()] = peer
+		}
+		scatterer.Put(peers, i%3+1, group)
+	}
+
+	// Try to scatter a region with peer store id 2/3/4
+	for i := uint64(1); i < 20; i++ {
+		region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
+		op := scatterer.scatterRegion(region, group)
+		c.Assert(isPeerCountChanged(op), IsFalse)
+	}
+}
+
+func isPeerCountChanged(op *operator.Operator) bool {
+	if op == nil {
+		return false
+	}
+	add, remove := 0, 0
+	for i := 0; i < op.Len(); i++ {
+		step := op.Step(i)
+		switch step.(type) {
+		case operator.AddPeer, operator.AddLearner, operator.AddLightPeer, operator.AddLightLearner:
+			add++
+		case operator.RemovePeer:
+			remove++
+		}
+	}
+	return add != remove
+}
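
Note on [PATCH 2/3]: the sketch below is a minimal, standalone illustration of the behaviour the patched setTSOPhysical(next, force) is meant to guarantee. It is not PD code; the oracle type, the setPhysical/reset names, and the demo values are assumptions made purely for illustration. The point it demonstrates: once the in-memory TSO has been reset (zero physical time), a non-forced update is ignored, so a racing UpdateTimestamp after ResetAllocatorGroup can no longer repopulate the TSO and cause a fallback; only the forced write done during SyncTimestamp may (re)initialize it.

package main

import (
	"fmt"
	"sync"
	"time"
)

// oracle is a simplified stand-in for a timestamp oracle: it only tracks
// the in-memory physical time behind a mutex.
type oracle struct {
	mu       sync.Mutex
	physical time.Time // the zero value means "reset / not initialized"
}

// setPhysical mirrors the patched setTSOPhysical(next, force) behaviour:
// a non-forced write is dropped while the oracle is reset, and the
// physical time only ever moves forward.
func (o *oracle) setPhysical(next time.Time, force bool) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.physical.IsZero() && !force {
		return // reset in progress: only a forced (re)initialization may write
	}
	if next.After(o.physical) {
		o.physical = next
	}
}

// reset mirrors dropping the in-memory TSO when leadership is lost.
func (o *oracle) reset() {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.physical = time.Time{}
}

func main() {
	var o oracle

	o.setPhysical(time.Unix(100, 0), true)  // sync on leader election: forced initialization
	o.setPhysical(time.Unix(101, 0), false) // periodic update: moves the time forward
	fmt.Println(o.physical.Unix())          // 101

	o.reset()                               // leadership lost, allocator group reset
	o.setPhysical(time.Unix(102, 0), false) // racing update arriving after the reset
	fmt.Println(o.physical.IsZero())        // true: the stale update was ignored
}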