Skip to content

Commit

Permalink
member: avoid frequent campaign times (#7301)
Browse files Browse the repository at this point in the history
close #7251, ref #7377

When a PD leader campaigns for leadership frequently while the etcd leader does not change,
we need to stop this PD member from campaigning and make it resign to another member.

Signed-off-by: husharp <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] authored Nov 16, 2023
1 parent 181fdc9 commit 0ebf4b2
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 16 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type Client interface {
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error)
// StoreGlobalConfig set the config from etcd
StoreGlobalConfig(ctx context.Context, configPath string, items []GlobalConfigItem) error
// WatchGlobalConfig returns an stream with all global config and updates
// WatchGlobalConfig returns a stream with all global config and updates
WatchGlobalConfig(ctx context.Context, configPath string, revision int64) (chan []GlobalConfigItem, error)
// UpdateOption updates the client option.
UpdateOption(option DynamicOption, value interface{}) error
Expand Down
32 changes: 27 additions & 5 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ import (
"go.uber.org/zap"
)

const watchLoopUnhealthyTimeout = 60 * time.Second
const (
watchLoopUnhealthyTimeout = 60 * time.Second
campaignTimesRecordTimeout = 5 * time.Minute
)

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
Expand Down Expand Up @@ -62,20 +65,24 @@ type Leadership struct {
keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCancelFuncLock syncutil.Mutex
// CampaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`.
// It is ordered by time to prevent the leader from campaigning too frequently.
CampaignTimes []time.Time
}

// NewLeadership creates a new Leadership.
func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadership {
leadership := &Leadership{
purpose: purpose,
client: client,
leaderKey: leaderKey,
purpose: purpose,
client: client,
leaderKey: leaderKey,
CampaignTimes: make([]time.Time, 0, 10),
}
return leadership
}

// getLease gets the lease of leadership, only if leadership is valid,
// i.e the owner is a true leader, the lease is not nil.
// i.e. the owner is a true leader, the lease is not nil.
func (ls *Leadership) getLease() *lease {
l := ls.lease.Load()
if l == nil {
Expand Down Expand Up @@ -104,8 +111,23 @@ func (ls *Leadership) GetLeaderKey() string {
return ls.leaderKey
}

// addCampaignTimes records the current time as a new campaign attempt.
//
// Before appending, it drops every recorded time that is older than
// `campaignTimesRecordTimeout`, so that CampaignTimes only holds the attempts
// made within that window plus the one being added now. This function only
// ever appends time.Now(), which keeps the slice in chronological order.
func (ls *Leadership) addCampaignTimes() {
	// Scan from the newest entry backwards: the first expired entry found is
	// the most recent expired one, and because the slice is sorted by time,
	// every entry before it is expired as well.
	for i := len(ls.CampaignTimes) - 1; i >= 0; i-- {
		if time.Since(ls.CampaignTimes[i]) > campaignTimesRecordTimeout {
			// Keep only the entries after index i; slicing from i itself would
			// retain the expired entry and over-count the recent campaigns.
			ls.CampaignTimes = ls.CampaignTimes[i+1:]
			break
		}
	}

	ls.CampaignTimes = append(ls.CampaignTimes, time.Now())
}

// Campaign is used to campaign the leader with given lease and returns a leadership
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.addCampaignTimes()
ls.leaderValue = leaderData
// Create a new lease to campaign
newLease := &lease{
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign resource manager primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-resource-manager-primary-name", s.participant.Name()))
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (s *Server) primaryElectionLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name()))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.participant.CampaignLeader(s.Context(), s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully",
zap.String("campaign-scheduling-primary-name", s.participant.Name()))
Expand Down
12 changes: 11 additions & 1 deletion pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
// The timeout to wait transfer etcd leader to complete.
moveLeaderTimeout = 5 * time.Second
dcLocationConfigEtcdPrefix = "dc-location"
// If the number of campaigns recorded within `campaignTimesRecordTimeout` reaches this value, the PD member resigns the etcd leadership before campaigning again.
campaignLeaderFrequencyTimes = 3
)

// EmbeddedEtcdMember is used for the election related logic. It implements Member interface.
Expand Down Expand Up @@ -177,7 +179,15 @@ func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {

// CampaignLeader is used to campaign a PD member's leadership
// and make it become a PD leader.
func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error {
// leader should be changed when campaign leader frequently.
func (m *EmbeddedEtcdMember) CampaignLeader(ctx context.Context, leaseTimeout int64) error {
	// Below the frequency threshold: campaign with this member's value as usual.
	if len(m.leadership.CampaignTimes) < campaignLeaderFrequencyTimes {
		return m.leadership.Campaign(leaseTimeout, m.MemberValue())
	}

	// Threshold reached: do not campaign; resign the etcd leadership instead.
	log.Warn("campaign times is too frequent, resign and campaign again",
		zap.String("leader-name", m.Name()),
		zap.String("leader-key", m.GetLeaderPath()))
	// Drop all recorded campaign times before resigning.
	m.leadership.CampaignTimes = nil
	return m.ResignEtcdLeader(ctx, m.Name(), "")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (m *Participant) GetLeadership() *election.Leadership {
}

// CampaignLeader is used to campaign the leadership and make it become a leader.
func (m *Participant) CampaignLeader(leaseTimeout int64) error {
func (m *Participant) CampaignLeader(_ context.Context, leaseTimeout int64) error {
if !m.campaignCheck() {
return errs.ErrCheckCampaign
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (info *DCLocationInfo) clone() DCLocationInfo {
type ElectionMember interface {
// ID returns the unique ID in the election group. For example, it can be unique
// server id of a cluster or the unique keyspace group replica id of the election
// group comprised of the replicas of a keyspace group.
// group composed of the replicas of a keyspace group.
ID() uint64
// ID returns the unique name in the election group.
// Name returns the unique name in the election group.
Name() string
// MemberValue returns the member value.
MemberValue() string
// GetMember() returns the current member
// GetMember returns the current member
GetMember() interface{}
// Client returns the etcd client.
Client() *clientv3.Client
Expand All @@ -124,7 +124,7 @@ type ElectionMember interface {
// KeepLeader is used to keep the leader's leadership.
KeepLeader(ctx context.Context)
// CampaignLeader is used to campaign the leadership and make it become a leader in an election group.
CampaignLeader(leaseTimeout int64) error
CampaignLeader(ctx context.Context, leaseTimeout int64) error
// ResetLeader is used to reset the member's current leadership.
// Basically it will reset the leader lease and unset leader info.
ResetLeader()
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
log.Info("start to campaign the primary",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
zap.String("campaign-tso-primary-name", gta.member.Name()))
if err := gta.am.member.CampaignLeader(gta.am.leaderLease); err != nil {
if err := gta.am.member.CampaignLeader(gta.ctx, gta.am.leaderLease); err != nil {
if errors.Is(err, errs.ErrEtcdTxnConflict) {
log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully",
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0),
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,7 @@ func (s *Server) leaderLoop() {

func (s *Server) campaignLeader() {
log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name()))
if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/API server may campaign successfully", s.mode),
zap.String("campaign-leader-name", s.Name()))
Expand Down
7 changes: 7 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ func (s *TestServer) Destroy() error {
return nil
}

// ResetPDLeader resets the leadership of the server's member.
// It holds the TestServer lock while calling ResetLeader on the member returned
// by s.server.GetMember(); unlike ResignLeader, it does not return an error.
func (s *TestServer) ResetPDLeader() {
s.Lock()
defer s.Unlock()
s.server.GetMember().ResetLeader()
}

// ResignLeader resigns the leader of the server.
func (s *TestServer) ResignLeader() error {
s.Lock()
Expand Down
24 changes: 24 additions & 0 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,30 @@ func TestMoveLeader(t *testing.T) {
}
}

// TestCampaignLeaderFrequently verifies that the PD leader moves to another
// member after the current leader has had its leadership reset several times
// in a row.
func TestCampaignLeaderFrequently(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cluster, err := tests.NewTestCluster(ctx, 5)
	// Check the error before registering the cleanup, so Destroy is never run
	// on a cluster value that may be unusable after a failed creation.
	re.NoError(err)
	defer cluster.Destroy()

	err = cluster.RunInitialServers()
	re.NoError(err)
	cluster.WaitLeader()
	leader := cluster.GetLeader()
	re.NotEmpty(cluster.GetLeader())

	// Repeatedly reset the current leader's leadership and wait for a leader
	// to be elected again, so that campaigns happen in quick succession.
	for i := 0; i < 3; i++ {
		cluster.GetServers()[cluster.GetLeader()].ResetPDLeader()
		cluster.WaitLeader()
	}
	// leader should be changed when campaign leader frequently
	cluster.WaitLeader()
	re.NotEmpty(cluster.GetLeader())
	re.NotEqual(leader, cluster.GetLeader())
}

func TestGetLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 0ebf4b2

Please sign in to comment.