diff --git a/raft.go b/raft.go index 912c5c05..3764ffae 100644 --- a/raft.go +++ b/raft.go @@ -601,9 +601,7 @@ func (r *raft) send(m pb.Message) { // Returns true if a message was sent, or false otherwise. A message is not sent // if the follower log and commit index are up-to-date, the flow is paused (for // reasons like in-flight limits), or the message could not be constructed. -func (r *raft) maybeSendAppend(to uint64) bool { - pr := r.trk.Progress[to] - +func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { last, commit := r.raftLog.lastIndex(), r.raftLog.committed if !pr.ShouldSendMsgApp(last, commit) { return false @@ -690,11 +688,11 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.trk. func (r *raft) bcastAppend() { - r.trk.Visit(func(id uint64, _ *tracker.Progress) { + r.trk.Visit(func(id uint64, pr *tracker.Progress) { if id == r.id { return } - r.maybeSendAppend(id) + r.maybeSendAppend(id, pr) }) } @@ -1472,7 +1470,7 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State == tracker.StateReplicate { pr.BecomeProbe() } - r.maybeSendAppend(m.From) + r.maybeSendAppend(m.From, pr) } } else { // We want to update our tracking if the response updates our @@ -1517,7 +1515,7 @@ func stepLeader(r *raft, m pb.Message) error { // transitioning from probe to replicate, or when freeTo() covers // multiple messages). Send as many messages as we can. if r.id != m.From { - for r.maybeSendAppend(m.From) { + for r.maybeSendAppend(m.From, pr) { } } // Transfer leadership is in progress. @@ -1530,7 +1528,7 @@ func stepLeader(r *raft, m pb.Message) error { case pb.MsgHeartbeatResp: pr.RecentActive = true pr.PauseMsgAppProbes(false) - r.maybeSendAppend(m.From) + r.maybeSendAppend(m.From, pr) if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { return nil @@ -1601,7 +1599,7 @@ func stepLeader(r *raft, m pb.Message) error { r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { pr.PauseMsgAppProbes(false) - r.maybeSendAppend(leadTransferee) + r.maybeSendAppend(leadTransferee, pr) } } return nil diff --git a/raft_test.go b/raft_test.go index c6b65896..c25bd14d 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2764,7 +2764,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeProbe() + pr2 := r.trk.Progress[2] + pr2.BecomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2773,7 +2774,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { // loop. After that, the follower is paused until a heartbeat response is // received. mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) + r.maybeSendAppend(2, pr2) msg := r.readMessages() if len(msg) != 1 { t.Errorf("len(msg) = %d, want %d", len(msg), 1) @@ -2788,7 +2789,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) + r.maybeSendAppend(2, pr2) if l := len(r.readMessages()); l != 0 { t.Errorf("len(msg) = %d, want %d", l, 0) } @@ -2831,11 +2832,12 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeReplicate() + pr2 := r.trk.Progress[2] + pr2.BecomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) + r.maybeSendAppend(2, pr2) msgs := r.readMessages() if len(msgs) != 1 { t.Errorf("len(msg) = %d, want %d", len(msgs), 1) @@ -2848,11 +2850,12 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeSnapshot(10) + pr2 := r.trk.Progress[2] + pr2.BecomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) + r.maybeSendAppend(2, pr2) msgs := r.readMessages() if len(msgs) != 0 { t.Errorf("len(msg) = %d, want %d", len(msgs), 0) @@ -4671,16 +4674,17 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) { r1.becomeCandidate() r1.becomeLeader() r1.readMessages() - r1.trk.Progress[2].BecomeReplicate() + pr2 := r1.trk.Progress[2] + pr2.BecomeReplicate() r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2))) // r1 sends 2 MsgApp messages to r2. mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) - r1.maybeSendAppend(2) + r1.maybeSendAppend(2, pr2) req1 := expectOneMessage(t, r1) mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) - r1.maybeSendAppend(2) + r1.maybeSendAppend(2, pr2) req2 := expectOneMessage(t, r1) // r2 receives the second MsgApp first due to reordering.