Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: fix the leader cannot election after pd leader lost while etcd leader intact (#6447) #6461

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type EmbeddedEtcdMember struct {
// etcd leader key when the PD node is successfully elected as the PD leader
// of the cluster. Every write will use it to check PD leadership.
memberValue string
// lastLeaderUpdatedTime is the last time when the leader is updated.
lastLeaderUpdatedTime atomic.Value
}

// NewMember create a new Member.
Expand Down Expand Up @@ -140,11 +142,13 @@ func (m *EmbeddedEtcdMember) GetLeader() *pdpb.Member {
// setLeader sets the member's PD leader.
func (m *EmbeddedEtcdMember) setLeader(member *pdpb.Member) {
m.leader.Store(member)
m.lastLeaderUpdatedTime.Store(time.Now())
}

// unsetLeader unsets the member's PD leader.
func (m *EmbeddedEtcdMember) unsetLeader() {
m.leader.Store(&pdpb.Member{})
m.lastLeaderUpdatedTime.Store(time.Now())
}

// EnableLeader sets the member itself to a PD leader.
Expand All @@ -162,6 +166,15 @@ func (m *EmbeddedEtcdMember) GetLeadership() *election.Leadership {
return m.leadership
}

// GetLastLeaderUpdatedTime returns the last time when the leader is updated.
func (m *EmbeddedEtcdMember) GetLastLeaderUpdatedTime() time.Time {
lastLeaderUpdatedTime := m.lastLeaderUpdatedTime.Load()
if lastLeaderUpdatedTime == nil {
return time.Time{}
}
return lastLeaderUpdatedTime.(time.Time)
}

// CampaignLeader is used to campaign a PD member's leadership
// and make it become a PD leader.
func (m *EmbeddedEtcdMember) CampaignLeader(leaseTimeout int64) error {
Expand Down
16 changes: 15 additions & 1 deletion pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Participant struct {
// preCampaignChecker is called before the campaign. If it returns false, the
// campaign will be skipped.
preCampaignChecker leadershipCheckFunc
// lastLeaderUpdatedTime is the last time when the leader is updated.
lastLeaderUpdatedTime atomic.Value
}

// NewParticipant create a new Participant.
Expand Down Expand Up @@ -78,7 +80,8 @@ func (m *Participant) InitInfo(name string, id uint64, rootPath string, leaderNa
m.rootPath = rootPath
m.leaderPath = path.Join(rootPath, leaderName)
m.leadership = election.NewLeadership(m.client, m.GetLeaderPath(), purpose)
log.Info("Participant joining election", zap.Stringer("participant-info", m.member), zap.String("leader-path", m.leaderPath))
m.lastLeaderUpdatedTime.Store(time.Now())
log.Info("participant joining election", zap.Stringer("participant-info", m.member), zap.String("leader-path", m.leaderPath))
}

// ID returns the unique ID for this participant in the election group
Expand Down Expand Up @@ -143,11 +146,13 @@ func (m *Participant) GetLeader() *tsopb.Participant {
// setLeader sets the member's leader.
func (m *Participant) setLeader(member *tsopb.Participant) {
m.leader.Store(member)
m.lastLeaderUpdatedTime.Store(time.Now())
}

// unsetLeader unsets the member's leader.
func (m *Participant) unsetLeader() {
m.leader.Store(&tsopb.Participant{})
m.lastLeaderUpdatedTime.Store(time.Now())
}

// EnableLeader declares the member itself to be the leader.
Expand All @@ -160,6 +165,15 @@ func (m *Participant) GetLeaderPath() string {
return m.leaderPath
}

// GetLastLeaderUpdatedTime returns the last time when the leader is updated.
func (m *Participant) GetLastLeaderUpdatedTime() time.Time {
lastLeaderUpdatedTime := m.lastLeaderUpdatedTime.Load()
if lastLeaderUpdatedTime == nil {
return time.Time{}
}
return lastLeaderUpdatedTime.(time.Time)
}

// GetLeadership returns the leadership of the member.
func (m *Participant) GetLeadership() *election.Leadership {
return m.leadership
Expand Down
2 changes: 2 additions & 0 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type ElectionMember interface {
GetLeaderPath() string
// GetLeadership returns the leadership of the PD member.
GetLeadership() *election.Leadership
// GetLastLeaderUpdatedTime returns the last time when the leader is updated.
GetLastLeaderUpdatedTime() time.Time
// GetDCLocationPathPrefix returns the dc-location path prefix of the cluster.
GetDCLocationPathPrefix() string
// GetDCLocationPath returns the dc-location path of a member with the given member ID.
Expand Down
41 changes: 41 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"math/rand"
"net/http"
"os"
"path"
Expand Down Expand Up @@ -106,6 +107,9 @@ const (
retryIntervalGetServicePrimary = 100 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second

lostPDLeaderMaxTimeoutSecs = 10
lostPDLeaderReElectionFactor = 10
)

// EtcdStartTimeout the timeout of the startup etcd.
Expand Down Expand Up @@ -1408,6 +1412,14 @@ func (s *Server) leaderLoop() {
}

leader, checkAgain := s.member.CheckLeader()
// add failpoint to test leader check go to stuck.
failpoint.Inject("leaderLoopCheckAgain", func(val failpoint.Value) {
memberString := val.(string)
memberID, _ := strconv.ParseUint(memberString, 10, 64)
if s.member.ID() == memberID {
checkAgain = true
}
})
if checkAgain {
continue
}
Expand Down Expand Up @@ -1435,6 +1447,25 @@ func (s *Server) leaderLoop() {
// To make sure the etcd leader and PD leader are on the same server.
etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
if s.member.GetLeader() == nil {
lastUpdated := s.member.GetLastLeaderUpdatedTime()
// use random timeout to avoid leader campaigning storm.
randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
// add failpoint to test the campaign leader logic.
failpoint.Inject("timeoutWaitPDLeader", func() {
log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader")
randomTimeout = time.Duration(rand.Intn(10))*time.Millisecond + 100*time.Millisecond
})
if lastUpdated.Add(randomTimeout).Before(time.Now()) && !lastUpdated.IsZero() && etcdLeader != 0 {
log.Info("the pd leader is lost for a long time, try to re-campaign a pd leader with resign etcd leader",
zap.Duration("timeout", randomTimeout),
zap.Time("last-updated", lastUpdated),
zap.String("current-leader-member-id", types.ID(etcdLeader).String()),
zap.String("transferee-member-id", types.ID(s.member.ID()).String()),
)
s.member.MoveEtcdLeader(s.ctx, etcdLeader, s.member.ID())
}
}
log.Info("skip campaigning of pd leader and check later",
zap.String("server-name", s.Name()),
zap.Uint64("etcd-leader-id", etcdLeader),
Expand Down Expand Up @@ -1551,6 +1582,16 @@ func (s *Server) campaignLeader() {
log.Info("no longer a leader because lease has expired, pd leader will step down")
return
}
// add failpoint to test exit leader, failpoint judge the member is the give value, then break
failpoint.Inject("exitCampaignLeader", func(val failpoint.Value) {
memberString := val.(string)
memberID, _ := strconv.ParseUint(memberString, 10, 64)
if s.member.ID() == memberID {
log.Info("exit PD leader")
failpoint.Return()
}
})

etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
Expand Down
24 changes: 24 additions & 0 deletions tests/server/member/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,30 @@ func TestLeaderResignWithBlock(t *testing.T) {
re.NoError(failpoint.Disable("github.com/tikv/pd/server/raftclusterIsBusy"))
}

func TestPDLeaderLostWhileEtcdLeaderIntact(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 2)
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)

leader1 := cluster.WaitLeader()
memberID := cluster.GetServer(leader1).GetLeader().GetMemberId()

re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`))
leader2 := waitLeaderChange(re, cluster, leader1)
re.NotEqual(leader1, leader2)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader"))
}

func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string) string {
var leader string
testutil.Eventually(re, func() bool {
Expand Down