From f28fe4468630a9073d4e7583bfcee3376a482f8b Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 29 Jan 2024 13:03:53 +0000 Subject: [PATCH] tracker: track in-flight commit index This commit adds a Progress.pendingCommit field tracking the highest commit index <= Next-1 which the leader sent to the follower. It is used to distinguish cases when a commit index update needs or doesn't need to be sent to a follower. Signed-off-by: Pavel Kalinnikov --- raft.go | 24 +++++++++++------- testdata/confchange_v2_replace_leader.txt | 4 --- testdata/probe_and_replicate.txt | 12 --------- tracker/progress.go | 30 +++++++++++++++++++++++ 4 files changed, 45 insertions(+), 25 deletions(-) diff --git a/raft.go b/raft.go index 7f591f26..16545836 100644 --- a/raft.go +++ b/raft.go @@ -599,6 +599,10 @@ func (r *raft) sendAppend(to uint64) { // argument controls whether messages with no entries will be sent // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch). +// +// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress +// struct contains all the state necessary for deciding whether to send a +// message. func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { pr := r.trk.Progress[to] if pr.IsPaused() { @@ -641,6 +645,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { Commit: r.raftLog.committed, }) pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents))) + pr.SentCommit(r.raftLog.committed) return true } @@ -675,21 +680,21 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { // sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { + pr := r.trk.Progress[to] // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, // the receiver(follower) might not be matched with the leader // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. - commit := min(r.trk.Progress[to].Match, r.raftLog.committed) - m := pb.Message{ + commit := min(pr.Match, r.raftLog.committed) + r.send(pb.Message{ To: to, Type: pb.MsgHeartbeat, Commit: commit, Context: ctx, - } - - r.send(m) + }) + pr.SentCommit(commit) } // bcastAppend sends RPC, with entries to all peers that are not up-to-date @@ -1480,7 +1485,6 @@ func stepLeader(r *raft, m pb.Message) error { r.sendAppend(m.From) } } else { - oldPaused := pr.IsPaused() // We want to update our tracking if the response updates our // matched index or if the response can move a probing peer back // into StateReplicate (see heartbeat_rep_recovers_from_probing.txt @@ -1517,9 +1521,11 @@ func stepLeader(r *raft, m pb.Message) error { // to respond to pending read index requests releasePendingReadIndexMessages(r) r.bcastAppend() - } else if oldPaused { - // If we were paused before, this node may be missing the - // latest commit index, so send it. + } else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) { + // This node may be missing the latest commit index, so send it. + // NB: this is not strictly necessary because the periodic heartbeat + // messages deliver commit indices too. However, a message sent now + // may arrive earlier than the next heartbeat fires. r.sendAppend(m.From) } // We've updated flow control information above, which may diff --git a/testdata/confchange_v2_replace_leader.txt b/testdata/confchange_v2_replace_leader.txt index ae43ce21..c31adad1 100644 --- a/testdata/confchange_v2_replace_leader.txt +++ b/testdata/confchange_v2_replace_leader.txt @@ -284,12 +284,10 @@ stabilize CommittedEntries: 2/5 EntryNormal "" Messages: - 4->1 MsgApp Term:2 Log:2/5 Commit:4 4->1 MsgApp Term:2 Log:2/5 Commit:5 4->2 MsgApp Term:2 Log:2/5 Commit:5 4->3 MsgApp Term:2 Log:2/5 Commit:5 > 1 receiving messages - 4->1 MsgApp Term:2 Log:2/5 Commit:4 4->1 MsgApp Term:2 Log:2/5 Commit:5 > 2 receiving messages 4->2 MsgApp Term:2 Log:2/5 Commit:5 @@ -302,7 +300,6 @@ stabilize 2/5 EntryNormal "" Messages: 1->4 MsgAppResp Term:2 Log:0/5 - 1->4 MsgAppResp Term:2 Log:0/5 > 2 handling Ready Ready MustSync=false: HardState Term:2 Vote:4 Commit:5 @@ -318,7 +315,6 @@ stabilize Messages: 3->4 MsgAppResp Term:2 Log:0/5 > 4 receiving messages - 1->4 MsgAppResp Term:2 Log:0/5 1->4 MsgAppResp Term:2 Log:0/5 2->4 MsgAppResp Term:2 Log:0/5 3->4 MsgAppResp Term:2 Log:0/5 diff --git a/testdata/probe_and_replicate.txt b/testdata/probe_and_replicate.txt index c4100e97..832be27b 100644 --- a/testdata/probe_and_replicate.txt +++ b/testdata/probe_and_replicate.txt @@ -513,18 +513,6 @@ stabilize 1 2 2->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 2->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:8 Log:8/21 Commit:18 -> 2 receiving messages - 1->2 MsgApp Term:8 Log:8/21 Commit:18 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 2->1 MsgAppResp Term:8 Log:0/21 stabilize 1 3 ---- diff --git a/tracker/progress.go b/tracker/progress.go index cb4312a9..b3b1c56e 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -35,8 +35,20 @@ type Progress struct { // entries with indices in (Match, Next) interval are already in flight. // // Invariant: 0 <= Match < Next. + // NB: it follows that Next >= 1. Next uint64 + // pendingCommit is the highest commit index <= Next-1 in flight to the + // follower. + // + // The actual in-flight commit index can be higher, but we track only up to + // Next-1, because higher indices can be ignored by the peer or reduced to + // Next-1 when all the in-flight entries are appended. When Next regresses, + // pendingCommit regresses too. + // + // Invariant: 0 <= pendingCommit < Next. + pendingCommit uint64 + // State defines how the leader should interact with the follower. // // When in StateProbe, leader sends at most one replication message @@ -127,6 +139,7 @@ func (pr *Progress) BecomeProbe() { } else { pr.ResetState(StateProbe) pr.Next = pr.Match + 1 + pr.pendingCommit = min(pr.pendingCommit, pr.Match) } } @@ -134,6 +147,7 @@ func (pr *Progress) BecomeProbe() { func (pr *Progress) BecomeReplicate() { pr.ResetState(StateReplicate) pr.Next = pr.Match + 1 + pr.pendingCommit = min(pr.pendingCommit, pr.Match) } // BecomeSnapshot moves the Progress to StateSnapshot with the specified pending @@ -170,6 +184,20 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) { } } +// CanBumpCommit returns true if sending the given commit index can potentially +// advance the follower's commit index. +func (pr *Progress) CanBumpCommit(index uint64) bool { + return pr.pendingCommit < min(index, pr.Next-1) +} + +// SentCommit updates the pendingCommit. +func (pr *Progress) SentCommit(commit uint64) { + // Sending the given commit index may bump the follower's commit index up to + // Next-1, or even higher. We track only up to Next-1, and maintain the + // invariant: pendingCommit < Next. + pr.pendingCommit = min(max(pr.pendingCommit, commit), pr.Next-1) +} + // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true. @@ -205,6 +233,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { // // TODO(tbg): why not use matchHint if it's larger? pr.Next = pr.Match + 1 + pr.pendingCommit = min(pr.pendingCommit, pr.Match) return true } @@ -216,6 +245,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { } pr.Next = max(min(rejected, matchHint+1), pr.Match+1) + pr.pendingCommit = min(pr.pendingCommit, pr.Next-1) pr.MsgAppFlowPaused = false return true }