Skip to content

Commit

Permalink
tmp: refactor maybeSendAppend
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Jan 25, 2024
1 parent 8e5b410 commit eeefed5
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 172 deletions.
3 changes: 1 addition & 2 deletions confchange/confchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ func (c Changer) initProgress(cfg *tracker.Config, trk tracker.ProgressMap, id u
// at all (and will thus likely need a snapshot), though the app may
// have applied a snapshot out of band before adding the replica (thus
// making the first index the better choice).
Next: c.LastIndex,
Match: 0,
Watermark: tracker.Watermark{Next: c.LastIndex},
Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes),
IsLearner: isLearner,
// When a node is first added, we should mark it as recently active.
Expand Down
208 changes: 113 additions & 95 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,85 +586,119 @@ func (r *raft) send(m pb.Message) {
}
}

// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.trk.Progress[to]
if pr.IsPaused() {
// maybeSendAppend sends an append RPC with new entries and commit index to the
// given peer, if necessary. Returns true if a message was sent.
//
// May send an empty message, to convey an update Commit index.
func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
if to == r.id {
return false
}
if st := pr.State; st != tracker.StateProbe && st != tracker.StateReplicate {
return false
} else if st == tracker.StateProbe && pr.MsgAppFlowPaused {
return false
}

lastIndex, nextIndex := pr.Next-1, pr.Next
lastTerm, errt := r.raftLog.term(lastIndex)

var ents []pb.Entry
var erre error
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if we had a full Inflights and all inflight messages were in
// fact dropped, replication to that follower would stall. Instead, an empty
// MsgApp will eventually reach the follower (heartbeats responses prompt the
// leader to send an append), allowing it to be acked or rejected, both of
// which will clear out Inflights.
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize)
prevTerm, err := r.raftLog.term(pr.Next - 1)
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
return r.sendSnapshot(to, pr)
}
if pr.State == tracker.StateProbe { // !pr.MsgAppFlowPaused
pr.MsgAppFlowPaused = true
return r.sendAppend(to, pr, prevTerm)
}
// StateReplicate
repl := (*tracker.ProgressReplicate)(pr)

if len(ents) == 0 && !sendIfEmpty {
lastIndex, commit := r.raftLog.lastIndex(), r.raftLog.committed
if repl.UpToDate(lastIndex, commit) {
return false
}

if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
}

snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return false
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
return true
if !repl.IsThrottled(lastIndex) {
return r.sendAppend(to, pr, prevTerm)
}
if repl.ShouldSendCommit(commit) {
return r.sendMsgAppPing(to, pr, prevTerm)
}
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if all the inflight messages are dropped, replication to that
// follower stalls. We send an empty MsgApp periodically, so that eventually
// it reaches the follower, and the latter acks or rejects it. The MsgAppFlowPaused flag
// is reset by a HeartbeatResp message, which is guaranteed if the follower is
// connected.
//
// Also, ensure sending an empty MsgApp if the follower's commit index can be
// moved forward.
if !repl.MsgAppFlowPaused {
return r.sendMsgAppPing(to, pr, prevTerm)
}
return false
}

func (r *raft) sendMsgAppPing(to uint64, pr *tracker.Progress, prevTerm uint64) bool {
commit := r.raftLog.committed
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Index: pr.Next - 1,
LogTerm: prevTerm,
Commit: commit,
})
pr.MsgAppFlowPaused = true
pr.Commit.Sent(min(commit, pr.Next-1))
return true
}

// Send the actual MsgApp otherwise, and update the progress accordingly.
if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil {
r.logger.Panicf("%x: %v", r.id, err)
func (r *raft) sendAppend(to uint64, pr *tracker.Progress, prevTerm uint64) bool {
entries, err := r.raftLog.entries(pr.Next, r.maxMsgSize)
if err != nil { // send snapshot if we failed to get entries
return r.sendSnapshot(to, pr)
}
// NB: pr has been updated, but we make sure to only use its old values below.
prevIndex := pr.Next - 1
pr.UpdateOnEntriesSend(len(entries), uint64(payloadsSize(entries)))
commit := r.raftLog.committed
pr.Commit.Sent(min(commit, pr.Next-1))
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Index: lastIndex,
LogTerm: lastTerm,
Entries: ents,
Commit: r.raftLog.committed,
Index: prevIndex,
LogTerm: prevTerm,
Entries: entries,
Commit: commit,
})
return true
}

func (r *raft) sendSnapshot(to uint64, pr *tracker.Progress) bool {
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
}

snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return false
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
return true
}

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// Attach the commit as min(to.matched, r.committed).
Expand All @@ -687,11 +721,8 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.trk.
func (r *raft) bcastAppend() {
r.trk.Visit(func(id uint64, _ *tracker.Progress) {
if id == r.id {
return
}
r.sendAppend(id)
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
r.maybeSendAppend(id, pr)
})
}

Expand Down Expand Up @@ -773,8 +804,7 @@ func (r *raft) reset(term uint64) {
r.trk.ResetVotes()
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
*pr = tracker.Progress{
Match: 0,
Next: r.raftLog.lastIndex() + 1,
Watermark: tracker.Watermark{Next: r.raftLog.lastIndex() + 1},
Inflights: tracker.NewInflights(r.trk.MaxInflight, r.trk.MaxInflightBytes),
IsLearner: pr.IsLearner,
}
Expand Down Expand Up @@ -1462,11 +1492,10 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.sendAppend(m.From)
r.maybeSendAppend(m.From, pr)
}
} else {
oldPaused := pr.IsPaused()
pr.UpdateCommit(m.Commit)
pr.Commit.Update(m.Commit)
// We want to update our tracking if the response updates our
// matched index or if the response can move a probing peer back
// into StateReplicate (see heartbeat_rep_recovers_from_probing.txt
Expand Down Expand Up @@ -1503,9 +1532,6 @@ func stepLeader(r *raft, m pb.Message) error {
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
} else if oldPaused && r.id != m.From && pr.Commit < r.raftLog.committed {
// The node is potentially missing the latest commit index. Send it.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
Expand All @@ -1514,7 +1540,7 @@ func stepLeader(r *raft, m pb.Message) error {
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
if r.id != m.From {
for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
for r.maybeSendAppend(m.From, pr) {
}
}
// Transfer leadership is in progress.
Expand All @@ -1541,9 +1567,7 @@ func stepLeader(r *raft, m pb.Message) error {
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
// no-op.
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
r.sendAppend(m.From)
}
r.maybeSendAppend(m.From, pr)

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
Expand Down Expand Up @@ -1613,7 +1637,7 @@ func stepLeader(r *raft, m pb.Message) error {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
r.maybeSendAppend(leadTransferee, pr)
}
}
return nil
Expand Down Expand Up @@ -1947,21 +1971,15 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co
return cs
}

if r.maybeCommit() {
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
r.bcastAppend()
} else {
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id == r.id {
return
}
r.maybeSendAppend(id, false /* sendIfEmpty */)
})
}
r.maybeCommit()
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
//
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
r.bcastAppend()

// If the leadTransferee was removed or demoted, abort the leadership transfer.
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()
Expand Down
19 changes: 11 additions & 8 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2794,7 +2794,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.trk.Progress[2].BecomeProbe()
pr2 := r.trk.Progress[2]
pr2.BecomeProbe()

// each round is a heartbeat
for i := 0; i < 3; i++ {
Expand All @@ -2803,7 +2804,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
// loop. After that, the follower is paused until a heartbeat response is
// received.
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2, pr2)
msg := r.readMessages()
if len(msg) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msg), 1)
Expand All @@ -2818,7 +2819,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
}
for j := 0; j < 10; j++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2, pr2)
if l := len(r.readMessages()); l != 0 {
t.Errorf("len(msg) = %d, want %d", l, 0)
}
Expand Down Expand Up @@ -2861,11 +2862,12 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.trk.Progress[2].BecomeReplicate()
pr2 := r.trk.Progress[2]
pr2.BecomeReplicate()

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2, pr2)
msgs := r.readMessages()
if len(msgs) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
Expand All @@ -2878,11 +2880,12 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.trk.Progress[2].BecomeSnapshot(10)
pr2 := r.trk.Progress[2]
pr2.BecomeSnapshot(10)

for i := 0; i < 10; i++ {
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
r.maybeSendAppend(2, pr2)
msgs := r.readMessages()
if len(msgs) != 0 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
Expand All @@ -2901,7 +2904,7 @@ func TestRecvMsgUnreachable(t *testing.T) {
// set node 2 to state replicate
r.trk.Progress[2].Match = 3
r.trk.Progress[2].BecomeReplicate()
r.trk.Progress[2].OptimisticUpdate(5)
r.trk.Progress[2].Update(5)

r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})

Expand Down
4 changes: 4 additions & 0 deletions testdata/confchange_v2_add_double_auto.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,20 @@ stabilize 1 2
Ready MustSync=false:
Messages:
1->2 MsgApp Term:1 Log:1/4 Commit:4 Entries:[1/5 EntryConfChangeV2]
1->2 MsgApp Term:1 Log:1/5 Commit:4
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/4 Commit:4 Entries:[1/5 EntryConfChangeV2]
1->2 MsgApp Term:1 Log:1/5 Commit:4
> 2 handling Ready
Ready MustSync=true:
Entries:
1/5 EntryConfChangeV2
Messages:
2->1 MsgAppResp Term:1 Log:0/5 Commit:4
2->1 MsgAppResp Term:1 Log:0/5 Commit:4
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/5 Commit:4
2->1 MsgAppResp Term:1 Log:0/5 Commit:4
> 1 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:5
Expand Down
Loading

0 comments on commit eeefed5

Please sign in to comment.