Skip to content

Commit

Permalink
raft: consolidate all append message sending
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Feb 8, 2024
1 parent 8d90676 commit 580374c
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 116 deletions.
153 changes: 83 additions & 70 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,54 +588,60 @@ func (r *raft) send(m pb.Message) {
}
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
// sendAppend sends an append RPC with new entries to the given peer, if
// necessary. Returns true if a message was sent.
//
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
// struct contains all the state necessary for deciding whether to send a
// message.
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() {
// This may send an empty append message (with no entries) if replication to
// this follower is throttled, or there are no new entries but the commit index
// for the follower can be bumped.
func (r *raft) sendAppend(to uint64, pr *tracker.Progress) bool {
if pr.State == tracker.StateProbe {
return !pr.MsgAppFlowPaused && r.maybeSendAppend(to, pr)
} else if pr.State != tracker.StateReplicate {
return false
}
} // only StateReplicate below

// If there are any pending entries and the inflight tracking is not
// saturated, send a regular append message (or snapshot).
if pr.Next <= r.raftLog.lastIndex() && !pr.Inflights.Full() {
return r.maybeSendAppend(to, pr)
}
// NB: the commit index is periodically sent in the heartbeat messages, so
// technically we don't need the CanBumpCommit clause here to guarantee commit
// index convergence on the follower. However, sending it via MsgApp here
// allows faster (no heartbeat interval delay) convergence in some cases.
if pr.CanBumpCommit(r.raftLog.committed) {
return r.maybeSendEmptyAppend(to, pr)
}
// In a throttled StateReplicate, send an empty append message if we haven't
// done so recently.
//
// We must send periodic appends so that eventually the follower either
// accepts or rejects it. If we don't do so, replication can stall if all the
// in-flight appends are lost/dropped.
return !pr.MsgAppFlowPaused && pr.Match < r.raftLog.lastIndex() &&
r.maybeSendEmptyAppend(to, pr)
}

// maybeSendAppend sends a non-empty append message to the given follower. It
// may send a snapshot instead if the required section of the log is no longer
// available in this leader's log. Returns true if a message was sent.
func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
// TODO(pav-kv): when pr.Next is updated, we always know the term of entry
// pr.Next-1, because the previous append message contains it. We should store
// (Next-1, Term) in Progress, instead of just Next. Then we don't have to
// fetch the term here, and may avoid an unnecessary snapshot.
prevIndex := pr.Next - 1
prevTerm, err := r.raftLog.term(prevIndex)
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
return r.maybeSendSnapshot(to, pr)
}

var ents []pb.Entry
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if we had a full Inflights and all inflight messages were in
// fact dropped, replication to that follower would stall. Instead, an empty
// MsgApp will eventually reach the follower (heartbeats responses prompt the
// leader to send an append), allowing it to be acked or rejected, both of
// which will clear out Inflights.
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
}
if len(ents) == 0 && !sendIfEmpty {
return false
}
// TODO(pav-kv): move this check up to where err is returned.
ents, err := r.raftLog.entries(pr.Next, r.maxMsgSize)
if err != nil { // send a snapshot if we failed to get the entries
return r.maybeSendSnapshot(to, pr)
}

// Send the actual MsgApp otherwise, and update the progress accordingly.
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Expand All @@ -649,6 +655,29 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
return true
}

func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress) bool {
// TODO(pav-kv): when pr.Next is updated, we always know the term of entry
// pr.Next-1, because the append message contains it. Store (Next-1, Term) in
// Progress, instead of just Next. Then we don't have to fetch the term and
// send a potentially unnecessary snapshot here.
prevTerm, err := r.raftLog.term(pr.Next - 1)
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
return r.maybeSendSnapshot(to, pr)
}
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Index: pr.Next - 1,
LogTerm: prevTerm,
Commit: r.raftLog.committed,
})
pr.SentEntries(0, 0)
pr.SentCommit(r.raftLog.committed)
return true
}

// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given
// node. Returns true iff the snapshot message has been emitted successfully.
func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
Expand Down Expand Up @@ -700,11 +729,11 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.trk.
func (r *raft) bcastAppend() {
r.trk.Visit(func(id uint64, _ *tracker.Progress) {
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id == r.id {
return
}
r.sendAppend(id)
r.sendAppend(id, pr)
})
}

Expand Down Expand Up @@ -1482,7 +1511,7 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.sendAppend(m.From)
r.sendAppend(m.From, pr)
}
} else {
// We want to update our tracking if the response updates our
Expand Down Expand Up @@ -1521,21 +1550,13 @@ func stepLeader(r *raft, m pb.Message) error {
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
// This node may be missing the latest commit index, so send it.
// NB: this is not strictly necessary because the periodic heartbeat
// messages deliver commit indices too. However, a message sent now
// may arrive earlier than the next heartbeat fires.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
// We've updated flow control information above, which may allow us to
// send multiple (size-limited) in-flight messages at once (such as when
// transitioning from StateProbe to StateReplicate). Send as many
// messages as we can.
if r.id != m.From {
for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
for r.sendAppend(m.From, pr) {
}
}
// Transfer leadership is in progress.
Expand All @@ -1562,9 +1583,7 @@ func stepLeader(r *raft, m pb.Message) error {
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
// no-op.
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
r.sendAppend(m.From)
}
r.sendAppend(m.From, pr)

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
Expand Down Expand Up @@ -1634,7 +1653,8 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
pr.MsgAppFlowPaused = false // force a MsgApp
r.sendAppend(leadTransferee, pr)
}
}
return nil
Expand Down Expand Up @@ -1985,21 +2005,14 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co
return cs
}

if r.maybeCommit() {
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
r.bcastAppend()
} else {
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id == r.id {
return
}
r.maybeSendAppend(id, false /* sendIfEmpty */)
})
}
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
//
// Otherwise, still probe the newly added replicas; there's no reason to let
// them wait out a heartbeat interval (or the next incoming proposal).
r.maybeCommit()
r.bcastAppend()

// If the leadTransferee was removed or demoted, abort the leadership transfer.
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
Expand Down
26 changes: 10 additions & 16 deletions raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ package raft

import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"reflect"
"sort"
"testing"
Expand Down Expand Up @@ -448,25 +450,17 @@ func TestLeaderCommitEntry(t *testing.T) {
r.Step(acceptAndReply(m))
}

if g := r.raftLog.committed; g != li+1 {
t.Errorf("committed = %d, want %d", g, li+1)
}
wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
if g := r.raftLog.nextCommittedEnts(true); !reflect.DeepEqual(g, wents) {
t.Errorf("nextCommittedEnts = %+v, want %+v", g, wents)
}
require.Equal(t, li+1, r.raftLog.committed)
require.Equal(t, []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}},
r.raftLog.nextCommittedEnts(true))

msgs := r.readMessages()
require.Len(t, msgs, 2)
sort.Sort(messageSlice(msgs))
for i, m := range msgs {
if w := uint64(i + 2); m.To != w {
t.Errorf("to = %x, want %x", m.To, w)
}
if m.Type != pb.MsgApp {
t.Errorf("type = %v, want %v", m.Type, pb.MsgApp)
}
if m.Commit != li+1 {
t.Errorf("commit = %d, want %d", m.Commit, li+1)
}
assert.Equal(t, pb.MsgApp, m.Type)
assert.Equal(t, uint64(i+2), m.To)
assert.Equal(t, li+1, m.Commit)
}
}

Expand Down
54 changes: 29 additions & 25 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,22 +128,24 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()

r.trk.Progress[2].MsgAppFlowPaused = true
pr := r.trk.Progress[2]
pr.MsgAppFlowPaused = true

r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
if !r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
}
require.True(t, pr.MsgAppFlowPaused)
pr.BecomeReplicate()
require.False(t, pr.MsgAppFlowPaused)
pr.MsgAppFlowPaused = true
_ = r.readMessages()

r.trk.Progress[2].BecomeReplicate()
if r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused)
}
r.trk.Progress[2].MsgAppFlowPaused = true
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
if r.trk.Progress[2].MsgAppFlowPaused {
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused)
}
msgs := r.readMessages()
require.Len(t, msgs, 1)
require.Equal(t, pb.MsgApp, msgs[0].Type)
require.Equal(t, uint64(2), msgs[0].To)
require.Len(t, msgs[0].Entries, 1)

require.True(t, pr.MsgAppFlowPaused)
}

func TestProgressPaused(t *testing.T) {
Expand Down Expand Up @@ -2764,7 +2766,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.trk.Progress[2].BecomeProbe()
pr := r.trk.Progress[2]
pr.BecomeProbe()

// each round is a heartbeat
for i := 0; i < 3; i++ {
Expand All @@ -2773,7 +2776,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
// loop. After that, the follower is paused until a heartbeat response is
// received.
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.sendAppend(2, pr)
msg := r.readMessages()
if len(msg) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msg), 1)
Expand All @@ -2788,7 +2791,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
}
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.sendAppend(2, pr)
if l := len(r.readMessages()); l != 0 {
t.Errorf("len(msg) = %d, want %d", l, 0)
}
Expand Down Expand Up @@ -2831,11 +2834,12 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.trk.Progress[2].BecomeReplicate()
pr := r.trk.Progress[2]
pr.BecomeReplicate()

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.sendAppend(2, pr)
msgs := r.readMessages()
if len(msgs) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
Expand All @@ -2848,11 +2852,12 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.trk.Progress[2].BecomeSnapshot(10)
pr := r.trk.Progress[2]
pr.BecomeSnapshot(10)

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.sendAppend(2, pr)
msgs := r.readMessages()
if len(msgs) != 0 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
Expand Down Expand Up @@ -3678,9 +3683,7 @@ func TestLeaderTransferToSlowFollower(t *testing.T) {

nt.recover()
lead := nt.peers[1].(*raft)
if lead.trk.Progress[3].Match != 1 {
t.Fatalf("node 1 has match %x for node 3, want %x", lead.trk.Progress[3].Match, 1)
}
require.Equal(t, uint64(1), lead.trk.Progress[3].Match)

// Transfer leadership to 3 when node 3 is lack of log.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
Expand Down Expand Up @@ -4671,16 +4674,17 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) {
r1.becomeCandidate()
r1.becomeLeader()
r1.readMessages()
r1.trk.Progress[2].BecomeReplicate()
pr := r1.trk.Progress[2]
pr.BecomeReplicate()

r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2)))

// r1 sends 2 MsgApp messages to r2.
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
r1.sendAppend(2)
r1.sendAppend(2, pr)
req1 := expectOneMessage(t, r1)
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
r1.sendAppend(2)
r1.sendAppend(2, pr)
req2 := expectOneMessage(t, r1)

// r2 receives the second MsgApp first due to reordering.
Expand Down
Loading

0 comments on commit 580374c

Please sign in to comment.