Skip to content

Commit

Permalink
cluster: fix the bug that region statistics are not updated after flo…
Browse files Browse the repository at this point in the history
…w-round-by-digit change (#4304) (#4305)

* cluster: fix the bug that region statistics are not updated after flow-round-by-digit change

close #4295

Signed-off-by: HunDunDM <[email protected]>

* refine some code

Signed-off-by: HunDunDM <[email protected]>

* address comment

Signed-off-by: HunDunDM <[email protected]>

Co-authored-by: HunDunDM <[email protected]>
  • Loading branch information
ti-chi-bot and HunDunDM authored Nov 8, 2021
1 parent a2d6504 commit e8e8c1f
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 6 deletions.
3 changes: 2 additions & 1 deletion server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead() {
region.GetRoundBytesRead() != origin.GetRoundBytesRead() ||
region.flowRoundDivisor < origin.flowRoundDivisor {
saveCache, needSync = true, true
}

Expand Down
3 changes: 2 additions & 1 deletion server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption {

// WithFlowRoundByDigit set the digit, which use to round to the nearest number
func WithFlowRoundByDigit(digit int) RegionCreateOption {
flowRoundDivisor := uint64(math.Pow10(digit))
return func(region *RegionInfo) {
region.flowRoundDivisor = uint64(math.Pow10(digit))
region.flowRoundDivisor = flowRoundDivisor
}
}

Expand Down
85 changes: 85 additions & 0 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,91 @@ func (s *testRegionInfoSuite) TestRegionWriteRate(c *C) {
}
}

var _ = Suite(&testRegionGuideSuite{})

type testRegionGuideSuite struct {
RegionGuide RegionGuideFunc
}

func (s *testRegionGuideSuite) SetUpSuite(c *C) {
s.RegionGuide = GenerateRegionGuideFunc(false)
}

func (s *testRegionGuideSuite) TestNeedSync(c *C) {
meta := &metapb.Region{
Id: 1000,
StartKey: []byte("a"),
EndKey: []byte("z"),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100},
Peers: []*metapb.Peer{
{Id: 11, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 12, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 13, StoreId: 1, Role: metapb.PeerRole_Voter},
},
}
region := NewRegionInfo(meta, meta.Peers[0])

testcases := []struct {
optionsA []RegionCreateOption
optionsB []RegionCreateOption
needSync bool
}{
{
optionsB: []RegionCreateOption{WithLeader(nil)},
needSync: true,
},
{
optionsB: []RegionCreateOption{WithLeader(meta.Peers[1])},
needSync: true,
},
{
optionsB: []RegionCreateOption{WithPendingPeers(meta.Peers[1:2])},
needSync: true,
},
{
optionsB: []RegionCreateOption{WithDownPeers([]*pdpb.PeerStats{{Peer: meta.Peers[1], DownSeconds: 600}})},
needSync: true,
},
{
optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)},
optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(2)},
needSync: true,
},
{
optionsA: []RegionCreateOption{SetWrittenBytes(250), WithFlowRoundByDigit(2)},
optionsB: []RegionCreateOption{SetWrittenBytes(349), WithFlowRoundByDigit(2)},
needSync: false,
},
{
optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(4)},
optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(4)},
needSync: false,
},
{
optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(4)},
optionsB: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)},
needSync: true,
},
{
optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)},
optionsB: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)},
needSync: false,
},
{
optionsA: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)},
optionsB: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)},
needSync: true,
},
}

for _, t := range testcases {
regionA := region.Clone(t.optionsA...)
regionB := region.Clone(t.optionsB...)
_, _, _, needSync := s.RegionGuide(regionA, regionB)
c.Assert(needSync, Equals, t.needSync)
}
}

var _ = Suite(&testRegionMapSuite{})

type testRegionMapSuite struct{}
Expand Down
8 changes: 4 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,9 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) {

// RegionHeartbeat implements gRPC PDServer.
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
server := &heartbeatServer{stream: stream}
FlowRoundByDigit := s.persistOptions.GetPDServerConfig().FlowRoundByDigit
var (
server = &heartbeatServer{stream: stream}
flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit)
forwardStream pdpb.PD_RegionHeartbeatClient
cancel context.CancelFunc
lastForwardedHost string
Expand Down Expand Up @@ -750,11 +750,11 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc()
s.hbStreams.BindStream(storeID, server)
// refresh FlowRoundByDigit
FlowRoundByDigit = s.persistOptions.GetPDServerConfig().FlowRoundByDigit
flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit)
lastBind = time.Now()
}

region := core.RegionFromHeartbeat(request, core.WithFlowRoundByDigit(FlowRoundByDigit))
region := core.RegionFromHeartbeat(request, flowRoundOption)
if region.GetLeader() == nil {
log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()
Expand Down

0 comments on commit e8e8c1f

Please sign in to comment.