Skip to content

Commit

Permalink
raft: pass Progress to maybeSendAppend
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Feb 23, 2024
1 parent 626b5c8 commit c38c1b7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 19 deletions.
16 changes: 7 additions & 9 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,9 +601,7 @@ func (r *raft) send(m pb.Message) {
// Returns true if a message was sent, or false otherwise. A message is not sent
// if the follower log and commit index are up-to-date, the flow is paused (for
// reasons like in-flight limits), or the message could not be constructed.
func (r *raft) maybeSendAppend(to uint64) bool {
pr := r.trk.Progress[to]

func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
last, commit := r.raftLog.lastIndex(), r.raftLog.committed
if !pr.ShouldSendMsgApp(last, commit) {
return false
Expand Down Expand Up @@ -690,11 +688,11 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.trk.
func (r *raft) bcastAppend() {
r.trk.Visit(func(id uint64, _ *tracker.Progress) {
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id == r.id {
return
}
r.maybeSendAppend(id)
r.maybeSendAppend(id, pr)
})
}

Expand Down Expand Up @@ -1472,7 +1470,7 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.maybeSendAppend(m.From)
r.maybeSendAppend(m.From, pr)
}
} else {
// We want to update our tracking if the response updates our
Expand Down Expand Up @@ -1517,7 +1515,7 @@ func stepLeader(r *raft, m pb.Message) error {
// transitioning from probe to replicate, or when freeTo() covers
// multiple messages). Send as many messages as we can.
if r.id != m.From {
for r.maybeSendAppend(m.From) {
for r.maybeSendAppend(m.From, pr) {
}
}
// Transfer leadership is in progress.
Expand All @@ -1530,7 +1528,7 @@ func stepLeader(r *raft, m pb.Message) error {
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.MsgAppFlowPaused = false
r.maybeSendAppend(m.From)
r.maybeSendAppend(m.From, pr)

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
Expand Down Expand Up @@ -1601,7 +1599,7 @@ func stepLeader(r *raft, m pb.Message) error {
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
pr.MsgAppFlowPaused = false
r.maybeSendAppend(leadTransferee)
r.maybeSendAppend(leadTransferee, pr)
}
}
return nil
Expand Down
24 changes: 14 additions & 10 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2764,7 +2764,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.trk.Progress[2].BecomeProbe()
pr2 := r.trk.Progress[2]
pr2.BecomeProbe()

// each round is a heartbeat
for i := 0; i < 3; i++ {
Expand All @@ -2773,7 +2774,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
// loop. After that, the follower is paused until a heartbeat response is
// received.
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.maybeSendAppend(2)
r.maybeSendAppend(2, pr2)
msg := r.readMessages()
if len(msg) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msg), 1)
Expand All @@ -2788,7 +2789,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
}
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.maybeSendAppend(2)
r.maybeSendAppend(2, pr2)
if l := len(r.readMessages()); l != 0 {
t.Errorf("len(msg) = %d, want %d", l, 0)
}
Expand Down Expand Up @@ -2831,11 +2832,12 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.trk.Progress[2].BecomeReplicate()
pr2 := r.trk.Progress[2]
pr2.BecomeReplicate()

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.maybeSendAppend(2)
r.maybeSendAppend(2, pr2)
msgs := r.readMessages()
if len(msgs) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
Expand All @@ -2848,11 +2850,12 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.trk.Progress[2].BecomeSnapshot(10)
pr2 := r.trk.Progress[2]
pr2.BecomeSnapshot(10)

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.maybeSendAppend(2)
r.maybeSendAppend(2, pr2)
msgs := r.readMessages()
if len(msgs) != 0 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
Expand Down Expand Up @@ -4671,16 +4674,17 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) {
r1.becomeCandidate()
r1.becomeLeader()
r1.readMessages()
r1.trk.Progress[2].BecomeReplicate()
pr2 := r1.trk.Progress[2]
pr2.BecomeReplicate()

r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2)))

// r1 sends 2 MsgApp messages to r2.
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
r1.maybeSendAppend(2)
r1.maybeSendAppend(2, pr2)
req1 := expectOneMessage(t, r1)
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
r1.maybeSendAppend(2)
r1.maybeSendAppend(2, pr2)
req2 := expectOneMessage(t, r1)

// r2 receives the second MsgApp first due to reordering.
Expand Down

0 comments on commit c38c1b7

Please sign in to comment.