From 580374c628ac237a1b1c13f9c66fc3ab310b7e95 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 29 Jan 2024 17:13:14 +0000 Subject: [PATCH] raft: consolidate all append message sending Signed-off-by: Pavel Kalinnikov --- raft.go | 153 +++++++++++--------- raft_paper_test.go | 26 ++-- raft_test.go | 54 +++---- testdata/replicate_pause.txt | 3 + testdata/slow_follower_after_compaction.txt | 2 + tracker/progress.go | 8 +- 6 files changed, 130 insertions(+), 116 deletions(-) diff --git a/raft.go b/raft.go index 79a37966..1e823416 100644 --- a/raft.go +++ b/raft.go @@ -588,27 +588,49 @@ func (r *raft) send(m pb.Message) { } } -// sendAppend sends an append RPC with new entries (if any) and the -// current commit index to the given peer. -func (r *raft) sendAppend(to uint64) { - r.maybeSendAppend(to, true) -} - -// maybeSendAppend sends an append RPC with new entries to the given peer, -// if necessary. Returns true if a message was sent. The sendIfEmpty -// argument controls whether messages with no entries will be sent -// ("empty" messages are useful to convey updated Commit indexes, but -// are undesirable when we're sending multiple messages in a batch). +// sendAppend sends an append RPC with new entries to the given peer, if +// necessary. Returns true if a message was sent. // -// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress -// struct contains all the state necessary for deciding whether to send a -// message. -func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { - pr := r.trk.Progress[to] - if pr.IsPaused() { +// This may send an empty append message (with no entries) if replication to +// this follower is throttled, or there are no new entries but the commit index +// for the follower can be bumped. +func (r *raft) sendAppend(to uint64, pr *tracker.Progress) bool { + if pr.State == tracker.StateProbe { + return !pr.MsgAppFlowPaused && r.maybeSendAppend(to, pr) + } else if pr.State != tracker.StateReplicate { return false - } + } // only StateReplicate below + + // If there are any pending entries and the inflight tracking is not + // saturated, send a regular append message (or snapshot). + if pr.Next <= r.raftLog.lastIndex() && !pr.Inflights.Full() { + return r.maybeSendAppend(to, pr) + } + // NB: the commit index is periodically sent in the heartbeat messages, so + // technically we don't need the CanBumpCommit clause here to guarantee commit + // index convergence on the follower. However, sending it via MsgApp here + // allows faster (no heartbeat interval delay) convergence in some cases. + if pr.CanBumpCommit(r.raftLog.committed) { + return r.maybeSendEmptyAppend(to, pr) + } + // In a throttled StateReplicate, send an empty append message if we haven't + // done so recently. + // + // We must send periodic appends so that eventually the follower either + // accepts or rejects it. If we don't do so, replication can stall if all the + // in-flight appends are lost/dropped. + return !pr.MsgAppFlowPaused && pr.Match < r.raftLog.lastIndex() && + r.maybeSendEmptyAppend(to, pr) +} +// maybeSendAppend sends a non-empty append message to the given follower. It +// may send a snapshot instead if the required section of the log is no longer +// available in this leader's log. Returns true if a message was sent. +func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { + // TODO(pav-kv): when pr.Next is updated, we always know the term of entry + // pr.Next-1, because the previous append message contains it. We should store + // (Next-1, Term) in Progress, instead of just Next. Then we don't have to + // fetch the term here, and may avoid an unnecessary snapshot. prevIndex := pr.Next - 1 prevTerm, err := r.raftLog.term(prevIndex) if err != nil { @@ -616,26 +638,10 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // follower log anymore. Send a snapshot instead. return r.maybeSendSnapshot(to, pr) } - - var ents []pb.Entry - // In a throttled StateReplicate only send empty MsgApp, to ensure progress. - // Otherwise, if we had a full Inflights and all inflight messages were in - // fact dropped, replication to that follower would stall. Instead, an empty - // MsgApp will eventually reach the follower (heartbeats responses prompt the - // leader to send an append), allowing it to be acked or rejected, both of - // which will clear out Inflights. - if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { - ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize) - } - if len(ents) == 0 && !sendIfEmpty { - return false - } - // TODO(pav-kv): move this check up to where err is returned. + ents, err := r.raftLog.entries(pr.Next, r.maxMsgSize) if err != nil { // send a snapshot if we failed to get the entries return r.maybeSendSnapshot(to, pr) } - - // Send the actual MsgApp otherwise, and update the progress accordingly. r.send(pb.Message{ To: to, Type: pb.MsgApp, @@ -649,6 +655,29 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return true } +func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress) bool { + // TODO(pav-kv): when pr.Next is updated, we always know the term of entry + // pr.Next-1, because the append message contains it. Store (Next-1, Term) in + // Progress, instead of just Next. Then we don't have to fetch the term and + // send a potentially unnecessary snapshot here. + prevTerm, err := r.raftLog.term(pr.Next - 1) + if err != nil { + // The log probably got truncated at >= pr.Next, so we can't catch up the + // follower log anymore. Send a snapshot instead. + return r.maybeSendSnapshot(to, pr) + } + r.send(pb.Message{ + To: to, + Type: pb.MsgApp, + Index: pr.Next - 1, + LogTerm: prevTerm, + Commit: r.raftLog.committed, + }) + pr.SentEntries(0, 0) + pr.SentCommit(r.raftLog.committed) + return true +} + // maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given // node. Returns true iff the snapshot message has been emitted successfully. func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { @@ -700,11 +729,11 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.trk. func (r *raft) bcastAppend() { - r.trk.Visit(func(id uint64, _ *tracker.Progress) { + r.trk.Visit(func(id uint64, pr *tracker.Progress) { if id == r.id { return } - r.sendAppend(id) + r.sendAppend(id, pr) }) } @@ -1482,7 +1511,7 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State == tracker.StateReplicate { pr.BecomeProbe() } - r.sendAppend(m.From) + r.sendAppend(m.From, pr) } } else { // We want to update our tracking if the response updates our @@ -1521,21 +1550,13 @@ func stepLeader(r *raft, m pb.Message) error { // to respond to pending read index requests releasePendingReadIndexMessages(r) r.bcastAppend() - } else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) { - // This node may be missing the latest commit index, so send it. - // NB: this is not strictly necessary because the periodic heartbeat - // messages deliver commit indices too. However, a message sent now - // may arrive earlier than the next heartbeat fires. - r.sendAppend(m.From) } - // We've updated flow control information above, which may - // allow us to send multiple (size-limited) in-flight messages - // at once (such as when transitioning from probe to - // replicate, or when freeTo() covers multiple messages). If - // we have more entries to send, send as many messages as we - // can (without sending empty messages for the commit index) + // We've updated flow control information above, which may allow us to + // send multiple (size-limited) in-flight messages at once (such as when + // transitioning from StateProbe to StateReplicate). Send as many + // messages as we can. if r.id != m.From { - for r.maybeSendAppend(m.From, false /* sendIfEmpty */) { + for r.sendAppend(m.From, pr) { } } // Transfer leadership is in progress. @@ -1562,9 +1583,7 @@ func stepLeader(r *raft, m pb.Message) error { // Note that StateSnapshot typically satisfies pr.Match < lastIndex, but // `pr.Paused()` is always true for StateSnapshot, so sendAppend is a // no-op. - if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe { - r.sendAppend(m.From) - } + r.sendAppend(m.From, pr) if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { return nil @@ -1634,7 +1653,8 @@ func stepLeader(r *raft, m pb.Message) error { r.sendTimeoutNow(leadTransferee) r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { - r.sendAppend(leadTransferee) + pr.MsgAppFlowPaused = false // force a MsgApp + r.sendAppend(leadTransferee, pr) } } return nil @@ -1985,21 +2005,14 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co return cs } - if r.maybeCommit() { - // If the configuration change means that more entries are committed now, - // broadcast/append to everyone in the updated config. - r.bcastAppend() - } else { - // Otherwise, still probe the newly added replicas; there's no reason to - // let them wait out a heartbeat interval (or the next incoming - // proposal). - r.trk.Visit(func(id uint64, pr *tracker.Progress) { - if id == r.id { - return - } - r.maybeSendAppend(id, false /* sendIfEmpty */) - }) - } + // If the configuration change means that more entries are committed now, + // broadcast/append to everyone in the updated config. + // + // Otherwise, still probe the newly added replicas; there's no reason to let + // them wait out a heartbeat interval (or the next incoming proposal). + r.maybeCommit() + r.bcastAppend() + // If the leadTransferee was removed or demoted, abort the leadership transfer. if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 { r.abortLeaderTransfer() diff --git a/raft_paper_test.go b/raft_paper_test.go index eff31f63..eab29786 100644 --- a/raft_paper_test.go +++ b/raft_paper_test.go @@ -28,6 +28,8 @@ package raft import ( "fmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "reflect" "sort" "testing" @@ -448,25 +450,17 @@ func TestLeaderCommitEntry(t *testing.T) { r.Step(acceptAndReply(m)) } - if g := r.raftLog.committed; g != li+1 { - t.Errorf("committed = %d, want %d", g, li+1) - } - wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}} - if g := r.raftLog.nextCommittedEnts(true); !reflect.DeepEqual(g, wents) { - t.Errorf("nextCommittedEnts = %+v, want %+v", g, wents) - } + require.Equal(t, li+1, r.raftLog.committed) + require.Equal(t, []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}, + r.raftLog.nextCommittedEnts(true)) + msgs := r.readMessages() + require.Len(t, msgs, 2) sort.Sort(messageSlice(msgs)) for i, m := range msgs { - if w := uint64(i + 2); m.To != w { - t.Errorf("to = %x, want %x", m.To, w) - } - if m.Type != pb.MsgApp { - t.Errorf("type = %v, want %v", m.Type, pb.MsgApp) - } - if m.Commit != li+1 { - t.Errorf("commit = %d, want %d", m.Commit, li+1) - } + assert.Equal(t, pb.MsgApp, m.Type) + assert.Equal(t, uint64(i+2), m.To) + assert.Equal(t, li+1, m.Commit) } } diff --git a/raft_test.go b/raft_test.go index 5a258e56..1be939fc 100644 --- a/raft_test.go +++ b/raft_test.go @@ -128,22 +128,24 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.trk.Progress[2].MsgAppFlowPaused = true + pr := r.trk.Progress[2] + pr.MsgAppFlowPaused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) - } + require.True(t, pr.MsgAppFlowPaused) + pr.BecomeReplicate() + require.False(t, pr.MsgAppFlowPaused) + pr.MsgAppFlowPaused = true + _ = r.readMessages() - r.trk.Progress[2].BecomeReplicate() - if r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused) - } - r.trk.Progress[2].MsgAppFlowPaused = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused) - } + msgs := r.readMessages() + require.Len(t, msgs, 1) + require.Equal(t, pb.MsgApp, msgs[0].Type) + require.Equal(t, uint64(2), msgs[0].To) + require.Len(t, msgs[0].Entries, 1) + + require.True(t, pr.MsgAppFlowPaused) } func TestProgressPaused(t *testing.T) { @@ -2764,7 +2766,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeProbe() + pr := r.trk.Progress[2] + pr.BecomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2773,7 +2776,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { // loop. After that, the follower is paused until a heartbeat response is // received. mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.sendAppend(2, pr) msg := r.readMessages() if len(msg) != 1 { t.Errorf("len(msg) = %d, want %d", len(msg), 1) @@ -2788,7 +2791,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.sendAppend(2, pr) if l := len(r.readMessages()); l != 0 { t.Errorf("len(msg) = %d, want %d", l, 0) } @@ -2831,11 +2834,12 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeReplicate() + pr := r.trk.Progress[2] + pr.BecomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.sendAppend(2, pr) msgs := r.readMessages() if len(msgs) != 1 { t.Errorf("len(msg) = %d, want %d", len(msgs), 1) @@ -2848,11 +2852,12 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeSnapshot(10) + pr := r.trk.Progress[2] + pr.BecomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.sendAppend(2, pr) msgs := r.readMessages() if len(msgs) != 0 { t.Errorf("len(msg) = %d, want %d", len(msgs), 0) @@ -3678,9 +3683,7 @@ func TestLeaderTransferToSlowFollower(t *testing.T) { nt.recover() lead := nt.peers[1].(*raft) - if lead.trk.Progress[3].Match != 1 { - t.Fatalf("node 1 has match %x for node 3, want %x", lead.trk.Progress[3].Match, 1) - } + require.Equal(t, uint64(1), lead.trk.Progress[3].Match) // Transfer leadership to 3 when node 3 is lack of log. nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) @@ -4671,16 +4674,17 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) { r1.becomeCandidate() r1.becomeLeader() r1.readMessages() - r1.trk.Progress[2].BecomeReplicate() + pr := r1.trk.Progress[2] + pr.BecomeReplicate() r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2))) // r1 sends 2 MsgApp messages to r2. mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) - r1.sendAppend(2) + r1.sendAppend(2, pr) req1 := expectOneMessage(t, r1) mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) - r1.sendAppend(2) + r1.sendAppend(2, pr) req2 := expectOneMessage(t, r1) // r2 receives the second MsgApp first due to reordering. diff --git a/testdata/replicate_pause.txt b/testdata/replicate_pause.txt index d9cee59f..4931480e 100644 --- a/testdata/replicate_pause.txt +++ b/testdata/replicate_pause.txt @@ -76,6 +76,9 @@ deliver-msgs drop=3 dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"] dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"] dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"] +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:12 +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:13 +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 # Repeat committing 3 entries. diff --git a/testdata/slow_follower_after_compaction.txt b/testdata/slow_follower_after_compaction.txt index 0d3d48c8..2ce02ada 100644 --- a/testdata/slow_follower_after_compaction.txt +++ b/testdata/slow_follower_after_compaction.txt @@ -88,6 +88,8 @@ deliver-msgs drop=3 ---- dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"] dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"] +dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:15 +dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:16 # Truncate the leader's log beyond node 3 log size. compact 1 17 diff --git a/tracker/progress.go b/tracker/progress.go index d70e5780..ba89be25 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -133,6 +133,7 @@ func (pr *Progress) BecomeProbe() { pr.Next = pr.Match + 1 pr.pendingCommit = min(pr.pendingCommit, pr.Match) } + pr.pendingCommit = min(pr.pendingCommit, pr.Match) } // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. @@ -161,9 +162,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) { pr.Next += uint64(entries) pr.Inflights.Add(pr.Next-1, bytes) } - // If this message overflows the in-flights tracker, or it was already full, - // consider this message being a probe, so that the flow is paused. - pr.MsgAppFlowPaused = pr.Inflights.Full() + pr.MsgAppFlowPaused = true case StateProbe: pr.MsgAppFlowPaused = true default: @@ -192,7 +191,6 @@ func (pr *Progress) MaybeUpdate(n uint64) bool { if pr.Match < n { pr.Match = n updated = true - pr.MsgAppFlowPaused = false } pr.Next = max(pr.Next, n+1) return updated @@ -249,7 +247,7 @@ func (pr *Progress) IsPaused() bool { case StateProbe: return pr.MsgAppFlowPaused case StateReplicate: - return pr.MsgAppFlowPaused + return pr.Inflights.Full() case StateSnapshot: return true default: