Skip to content

Commit

Permalink
This is an automated cherry-pick of #3808
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
HunDunDM authored and ti-chi-bot committed Aug 16, 2021
1 parent 6864bb3 commit 6e4a041
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 8 deletions.
8 changes: 7 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/component"
Expand Down Expand Up @@ -538,6 +537,8 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RLock()
Expand All @@ -553,6 +554,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
<<<<<<< HEAD
var saveKV, saveCache, isNew, needSync bool
if origin == nil {
log.Debug("insert new region",
Expand Down Expand Up @@ -624,6 +626,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}

if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew {
=======
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !saveKV && !saveCache && !isNew {
>>>>>>> 0affd19af (region_syncer: reduce saveKV of client (#3808))
return nil
}

Expand Down
96 changes: 96 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/logutil"
"go.uber.org/zap"
)

// errRegionIsStale is error info for region is stale.
Expand Down Expand Up @@ -436,12 +439,105 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio
return r.replicationStatus
}

<<<<<<< HEAD
// regionMap wraps a map[uint64]*core.RegionInfo and supports randomly pick a region.
type regionMap struct {
m map[uint64]*RegionInfo
totalSize int64
totalKeys int64
}
=======
// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
noLog := func(msg string, fields ...zap.Field) {}
debug, info := noLog, noLog
if enableLog {
debug = log.Debug
info = log.Info
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
if origin == nil {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
saveKV, saveCache, isNew = true, true, true
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
if r.GetVersion() > o.GetVersion() {
info("region Version changed",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactString("detail", DiffRegionKeyInfo(origin, region)),
zap.Uint64("old-version", o.GetVersion()),
zap.Uint64("new-version", r.GetVersion()),
)
saveKV, saveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
info("region ConfVer changed",
zap.Uint64("region-id", region.GetID()),
zap.String("detail", DiffRegionPeersInfo(origin, region)),
zap.Uint64("old-confver", o.GetConfVer()),
zap.Uint64("new-confver", r.GetConfVer()),
)
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
} else {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
zap.Uint64("from", origin.GetLeader().GetStoreId()),
zap.Uint64("to", region.GetLeader().GetStoreId()),
)
}
saveCache, needSync = true, true
}
if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
saveCache, needSync = true, true
}
if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
saveCache, needSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
}

if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
}
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead() {
saveCache, needSync = true, true
}

if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
(region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() ||
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
}
}
return
}
}

// regionMap wraps a map[uint64]*regionItem and supports randomly pick a region. They are the leaves of regionTree.
type regionMap map[uint64]*regionItem
>>>>>>> 0affd19af (region_syncer: reduce saveKV of client (#3808))

func newRegionMap() *regionMap {
return &regionMap{
Expand Down
27 changes: 22 additions & 5 deletions server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (s *RegionSyncer) syncRegion(conn *grpc.ClientConn) (ClientStream, error) {
return syncStream, nil
}

var regionGuide = core.GenerateRegionGuideFunc(false)

// StartSyncWithLeader starts to sync with leader.
func (s *RegionSyncer) StartSyncWithLeader(addr string) {
s.wg.Add(1)
Expand All @@ -132,7 +134,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
go func() {
defer s.wg.Done()
// used to load region from kv storage to cache storage.
err := s.server.GetStorage().LoadRegionsOnce(s.server.GetBasicCluster().CheckAndPutRegion)
bc := s.server.GetBasicCluster()
storage := s.server.GetStorage()
err := storage.LoadRegionsOnce(bc.CheckAndPutRegion)
if err != nil {
log.Warn("failed to load regions.", errs.ZapError(err))
}
Expand All @@ -149,9 +153,9 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
log.Error("cannot establish connection with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err))
continue
}
defer conn.Close()
break
}
defer conn.Close()

// Start syncing data.
for {
Expand All @@ -172,6 +176,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
time.Sleep(time.Second)
continue
}

log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.GetNextIndex()))
for {
resp, err := stream.Recv()
Expand Down Expand Up @@ -201,7 +206,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
region *core.RegionInfo
regionLeader *metapb.Peer
)
if len(regionLeaders) > i && regionLeaders[i].Id != 0 {
if len(regionLeaders) > i && regionLeaders[i].GetId() != 0 {
regionLeader = regionLeaders[i]
}
if hasStats {
Expand All @@ -215,11 +220,23 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
region = core.NewRegionInfo(r, regionLeader)
}

s.server.GetBasicCluster().CheckAndPutRegion(region)
err = s.server.GetStorage().SaveRegion(r)
origin, err := bc.PreCheckPutRegion(region)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
}
_, saveKV, _, _ := regionGuide(region, origin)
overlaps := bc.PutRegion(region)

if saveKV {
err = storage.SaveRegion(r)
}
if err == nil {
s.history.Record(region)
}
for _, old := range overlaps {
_ = storage.DeleteRegion(old.GetMeta())
}
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions server/region_syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch
return
case first := <-regionNotifier:
requests = append(requests, first.GetMeta())
stats := append(stats, first.GetStat())
leaders := append(leaders, first.GetLeader())
stats = append(stats, first.GetStat())
leaders = append(leaders, first.GetLeader())
startIndex := s.history.GetNextIndex()
s.history.Record(first)
pending := len(regionNotifier)
Expand All @@ -140,6 +140,8 @@ func (s *RegionSyncer) RunServer(regionNotifier <-chan *core.RegionInfo, quit ch
s.broadcast(alive)
}
requests = requests[:0]
stats = stats[:0]
leaders = leaders[:0]
}
}

Expand Down Expand Up @@ -210,9 +212,11 @@ func (s *RegionSyncer) syncHistoryRegion(request *pdpb.SyncRegionRequest, stream
lastIndex += len(metas)
if err := stream.Send(resp); err != nil {
log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err))
return err
}
metas = metas[:0]
stats = stats[:0]
leaders = leaders[:0]
}
log.Info("requested server has completed full synchronization with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Duration("cost", time.Since(start)))
Expand Down

0 comments on commit 6e4a041

Please sign in to comment.