Skip to content

Commit

Permalink
Merge branch 'release-5.1' into cherry-pick-4097-to-release-5.1
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Sep 15, 2021
2 parents 2b91dee + 2b8584e commit c785920
Show file tree
Hide file tree
Showing 18 changed files with 462 additions and 221 deletions.
11 changes: 11 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {

// GetStoresLoads returns the store load statistics held by HotStat, keyed by store ID.
// FilterUnhealthyStore is invoked on HotStat with this mock cluster before the loads are read.
func (mc *Cluster) GetStoresLoads() map[uint64][]float64 {
	mc.HotStat.FilterUnhealthyStore(mc)
	loads := mc.HotStat.GetStoresLoads()
	return loads
}

Expand Down Expand Up @@ -437,6 +438,16 @@ func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
mc.PutStore(newStore)
}

// SetStoreEvictLeader sets whether the given store should evict its leaders.
// When enableEvictLeader is true the store is re-put as a clone with leader transfer paused;
// otherwise it is re-put as a clone with leader transfer resumed.
func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
	store := mc.GetStore(storeID)
	if !enableEvictLeader {
		mc.PutStore(store.Clone(core.ResumeLeaderTransfer()))
		return
	}
	mc.PutStore(store.Clone(core.PauseLeaderTransfer()))
}

// UpdateStoreRegionWeight updates store region weight.
func (mc *Cluster) UpdateStoreRegionWeight(storeID uint64, weight float64) {
store := mc.GetStore(storeID)
Expand Down
13 changes: 13 additions & 0 deletions pkg/typeutil/comparison.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,16 @@ func MinDuration(a, b time.Duration) time.Duration {
}
return b
}

// StringsEqual reports whether two string slices have the same length and the same
// elements in the same order. An empty slice and a nil slice are considered equal.
func StringsEqual(a, b []string) bool {
	n := len(a)
	if n != len(b) {
		return false
	}
	// Lengths match, so indexing b with a's indices is always in range.
	for i, s := range a {
		if s != b[i] {
			return false
		}
	}
	return true
}
5 changes: 4 additions & 1 deletion server/api/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ func (s *testClusterSuite) TestCluster(c *C) {
s.testGetClusterStatus(c)
s.svr.GetPersistOptions().SetPlacementRuleEnabled(true)
s.svr.GetPersistOptions().GetReplicationConfig().LocationLabels = []string{"host"}
rule := s.svr.GetRaftCluster().GetRuleManager().GetRule("pd", "default")
rm := s.svr.GetRaftCluster().GetRuleManager()
rule := rm.GetRule("pd", "default")
rule.LocationLabels = []string{"host"}
rule.Count = 1
rm.SetRule(rule)

// Test set the config
url := fmt.Sprintf("%s/cluster", s.urlPrefix)
c1 := &metapb.Cluster{}
Expand Down
4 changes: 3 additions & 1 deletion server/api/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ func (s *testRuleSuite) TestSetAll(c *C) {
rule6 := placement.Rule{GroupID: "pd", ID: "default", StartKeyHex: "", EndKeyHex: "", Role: "voter", Count: 3}

s.svr.GetPersistOptions().GetReplicationConfig().LocationLabels = []string{"host"}
s.svr.GetRaftCluster().GetRuleManager().GetRule("pd", "default").LocationLabels = []string{"host"}
defaultRule := s.svr.GetRaftCluster().GetRuleManager().GetRule("pd", "default")
defaultRule.LocationLabels = []string{"host"}
s.svr.GetRaftCluster().GetRuleManager().SetRule(defaultRule)

successData, err := json.Marshal([]*placement.Rule{&rule1, &rule2})
c.Assert(err, IsNil)
Expand Down
73 changes: 3 additions & 70 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/component"
Expand Down Expand Up @@ -567,6 +566,8 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

// regionGuide decides which follow-up actions a region heartbeat requires.
// It is generated with logging enabled.
var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RLock()
Expand All @@ -591,75 +592,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
var saveKV, saveCache, isNew, needSync bool
if origin == nil {
log.Debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", core.RegionToHexMeta(region.GetMeta())))
saveKV, saveCache, isNew = true, true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
if r.GetVersion() > o.GetVersion() {
log.Info("region Version changed",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactString("detail", core.DiffRegionKeyInfo(origin, region)),
zap.Uint64("old-version", o.GetVersion()),
zap.Uint64("new-version", r.GetVersion()),
)
saveKV, saveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
log.Info("region ConfVer changed",
zap.Uint64("region-id", region.GetID()),
zap.String("detail", core.DiffRegionPeersInfo(origin, region)),
zap.Uint64("old-confver", o.GetConfVer()),
zap.Uint64("new-confver", r.GetConfVer()),
)
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
} else {
log.Info("leader changed",
zap.Uint64("region-id", region.GetID()),
zap.Uint64("from", origin.GetLeader().GetStoreId()),
zap.Uint64("to", region.GetLeader().GetStoreId()),
)
}
saveCache, needSync = true, true
}
if !core.SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
log.Debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
saveCache, needSync = true, true
}
if !core.SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
log.Debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
saveCache, needSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
}

if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
}
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead() {
saveCache, needSync = true, true
}

if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
(region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() ||
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
}
}

isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache && !isNew {
return nil
}
Expand Down
91 changes: 91 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/logutil"
"go.uber.org/zap"
)

// errRegionIsStale is error info for region is stale.
Expand Down Expand Up @@ -458,6 +461,94 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio
return r.replicationStatus
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
//
// region is the newly reported region and origin is the previously known one (nil when the region has not
// been seen before). The results are:
//   - isNew: the region is new, or the known region had no leader;
//   - saveKV: the region should be saved to storage;
//   - saveCache: the region should be saved to the cache;
//   - needSync: presumably whether the change must be propagated to region syncers — TODO confirm against callers.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc builds a RegionGuideFunc.
// When enableLog is true the returned function reports its findings through log.Debug and log.Info;
// when it is false every log call is routed to a no-op and nothing is printed.
func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
	// Both loggers start as a no-op so the closure never has to branch on enableLog.
	discard := func(msg string, fields ...zap.Field) {}
	debug, info := discard, discard
	if enableLog {
		debug, info = log.Debug, log.Info
	}

	// Rules applied by the returned function:
	//   - save to storage if the meta is updated;
	//   - save to cache if the meta or leader is updated, or down/pending peers changed;
	//   - mark isNew if the region in cache does not have a leader.
	return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
		// No previous record: a brand-new region is persisted and cached.
		if origin == nil {
			debug("insert new region",
				zap.Uint64("region-id", region.GetID()),
				logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
			return true, true, true, false
		}

		// Epoch changes mean the meta changed, so both storage and cache are refreshed.
		newEpoch, oldEpoch := region.GetRegionEpoch(), origin.GetRegionEpoch()
		if newEpoch.GetVersion() > oldEpoch.GetVersion() {
			info("region Version changed",
				zap.Uint64("region-id", region.GetID()),
				logutil.ZapRedactString("detail", DiffRegionKeyInfo(origin, region)),
				zap.Uint64("old-version", oldEpoch.GetVersion()),
				zap.Uint64("new-version", newEpoch.GetVersion()),
			)
			saveKV, saveCache = true, true
		}
		if newEpoch.GetConfVer() > oldEpoch.GetConfVer() {
			info("region ConfVer changed",
				zap.Uint64("region-id", region.GetID()),
				zap.String("detail", DiffRegionPeersInfo(origin, region)),
				zap.Uint64("old-confver", oldEpoch.GetConfVer()),
				zap.Uint64("new-confver", newEpoch.GetConfVer()),
			)
			saveKV, saveCache = true, true
		}

		// Leader change: a previously leaderless region counts as new and is not logged.
		if oldLeader := origin.GetLeader(); region.GetLeader().GetId() != oldLeader.GetId() {
			if oldLeader.GetId() != 0 {
				info("leader changed",
					zap.Uint64("region-id", region.GetID()),
					zap.Uint64("from", oldLeader.GetStoreId()),
					zap.Uint64("to", region.GetLeader().GetStoreId()),
				)
			} else {
				isNew = true
			}
			saveCache, needSync = true, true
		}

		if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
			debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
			saveCache, needSync = true, true
		}
		if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
			debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
			saveCache, needSync = true, true
		}

		// A differing peer count is persisted even if the epoch comparison above did not fire.
		if len(region.GetPeers()) != len(origin.GetPeers()) {
			saveKV, saveCache = true, true
		}

		sizeChanged := region.GetApproximateSize() != origin.GetApproximateSize()
		keysChanged := region.GetApproximateKeys() != origin.GetApproximateKeys()
		if sizeChanged || keysChanged {
			saveCache = true
		}

		// Once flow has changed, the cache is updated.
		// Because keys and bytes are strongly related, only bytes are compared.
		writtenChanged := region.GetRoundBytesWritten() != origin.GetRoundBytesWritten()
		readChanged := region.GetRoundBytesRead() != origin.GetRoundBytesRead()
		if writtenChanged || readChanged {
			saveCache, needSync = true, true
		}

		// A known replication state that differs from the cached one (state or state ID) refreshes the cache.
		newStatus, oldStatus := region.GetReplicationStatus(), origin.GetReplicationStatus()
		if newStatus.GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
			(newStatus.GetState() != oldStatus.GetState() ||
				newStatus.GetStateId() != oldStatus.GetStateId()) {
			saveCache = true
		}

		return isNew, saveKV, saveCache, needSync
	}
}

// regionMap wraps a map[uint64]*regionItem and supports randomly pick a region. They are the leaves of regionTree.
type regionMap map[uint64]*regionItem

Expand Down
27 changes: 22 additions & 5 deletions server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (s *RegionSyncer) syncRegion(conn *grpc.ClientConn) (ClientStream, error) {
return syncStream, nil
}

// regionGuide decides which follow-up actions a region received from the leader requires.
// It is generated with logging disabled.
var regionGuide = core.GenerateRegionGuideFunc(false)

// StartSyncWithLeader starts to sync with leader.
func (s *RegionSyncer) StartSyncWithLeader(addr string) {
s.wg.Add(1)
Expand All @@ -132,7 +134,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
go func() {
defer s.wg.Done()
// used to load region from kv storage to cache storage.
err := s.server.GetStorage().LoadRegionsOnce(s.server.GetBasicCluster().CheckAndPutRegion)
bc := s.server.GetBasicCluster()
storage := s.server.GetStorage()
err := storage.LoadRegionsOnce(bc.CheckAndPutRegion)
if err != nil {
log.Warn("failed to load regions.", errs.ZapError(err))
}
Expand All @@ -149,9 +153,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Error("cannot establish connection with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err))
continue
}
defer conn.Close()
break
}
defer conn.Close()

// Start syncing data.
for {
Expand All @@ -172,6 +176,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
time.Sleep(time.Second)
continue
}

log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex()))
for {
resp, err := stream.Recv()
Expand Down Expand Up @@ -201,7 +206,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
region *core.RegionInfo
regionLeader *metapb.Peer
)
if len(regionLeaders) > i && regionLeaders[i].Id != 0 {
if len(regionLeaders) > i && regionLeaders[i].GetId() != 0 {
regionLeader = regionLeaders[i]
}
if hasStats {
Expand All @@ -215,11 +220,23 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
region = core.NewRegionInfo(r, regionLeader)
}

s.server.GetBasicCluster().CheckAndPutRegion(region)
err = s.server.GetStorage().SaveRegion(r)
origin, err := bc.PreCheckPutRegion(region)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
_, saveKV, _, _ := regionGuide(region, origin)
overlaps := bc.PutRegion(region)

if saveKV {
err = storage.SaveRegion(r)
}
if err == nil {
s.history.Record(region)
}
for _, old := range overlaps {
_ = storage.DeleteRegion(old.GetMeta())
}
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions server/region_syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch
return
case first := <-regionNotifier:
requests = append(requests, first.GetMeta())
stats := append(stats, first.GetStat())
leaders := append(leaders, first.GetLeader())
stats = append(stats, first.GetStat())
leaders = append(leaders, first.GetLeader())
startIndex := s.history.GetNextIndex()
s.history.Record(first)
pending := len(regionNotifier)
Expand All @@ -140,6 +140,8 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch
s.broadcast(alive)
}
requests = requests[:0]
stats = stats[:0]
leaders = leaders[:0]
}
}

Expand Down Expand Up @@ -210,9 +212,11 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
lastIndex += len(metas)
if err := stream.Send(resp); err != nil {
log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err))
return err
}
metas = metas[:0]
stats = stats[:0]
leaders = leaders[:0]
}
log.Info("requested server has completed full synchronization with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Duration("cost", time.Since(start)))
Expand Down
12 changes: 12 additions & 0 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ func (o *Operator) Status() OpStatus {
return o.status.Status()
}

// CheckAndGetStatus returns the operator status after running `CheckExpired` and `CheckTimeout`.
// `CheckExpired` is evaluated first; `CheckTimeout` is only evaluated when the operator has not expired.
// If neither check fires, the current status is returned.
func (o *Operator) CheckAndGetStatus() OpStatus {
	if o.CheckExpired() {
		return EXPIRED
	}
	if o.CheckTimeout() {
		return TIMEOUT
	}
	return o.Status()
}

// GetReachTimeOf returns the time when operator reaches the given status.
func (o *Operator) GetReachTimeOf(st OpStatus) time.Time {
return o.status.ReachTimeOf(st)
Expand Down
Loading

0 comments on commit c785920

Please sign in to comment.