Skip to content

Commit

Permalink
change: AppendEntriesRequest: merge prev_log_{term,index} into prev_l…
Browse files Browse the repository at this point in the history
…og: LogId
  • Loading branch information
drmingdrmer committed Jul 9, 2021
1 parent 9c5f3d7 commit 58d8e3a
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 23 deletions.
18 changes: 9 additions & 9 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// See `receiver implementation: AppendEntries RPC` in raft-essentials.md in this repo.
#[tracing::instrument(
level="trace", skip(self, msg),
fields(term=msg.term, leader_id=msg.leader_id, prev_log_index=msg.prev_log_index, prev_log_term=msg.prev_log_term, leader_commit=msg.leader_commit),
fields(term=msg.term, leader_id=msg.leader_id, prev_log_index=msg.prev_log.index, prev_log_term=msg.prev_log.term, leader_commit=msg.leader_commit),
)]
pub(super) async fn handle_append_entries_request(
&mut self,
Expand Down Expand Up @@ -61,9 +61,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// If RPC's `prev_log_index` is 0, or the RPC's previous log info matches the local
// log info, then replication is g2g.
let msg_prev_index_is_min = msg.prev_log_index == u64::min_value();
let msg_prev_index_is_min = msg.prev_log.index == u64::min_value();
let msg_index_and_term_match =
(msg.prev_log_index == self.last_log.index) && (msg.prev_log_term == self.last_log.term);
(msg.prev_log.index == self.last_log.index) && (msg.prev_log.term == self.last_log.term);
if msg_prev_index_is_min || msg_index_and_term_match {
self.append_log_entries(&msg.entries).await?;
self.replicate_to_state_machine_if_needed(msg.entries).await;
Expand All @@ -85,7 +85,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// result.
let entries = self
.storage
.get_log_entries(msg.prev_log_index, msg.prev_log_index + 1)
.get_log_entries(msg.prev_log.index, msg.prev_log.index + 1)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
let target_entry = match entries.first() {
Expand All @@ -108,7 +108,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
};

// The target entry was found. Compare its term with target term to ensure everything is consistent.
if target_entry.term == msg.prev_log_term {
if target_entry.term == msg.prev_log.term {
// We've found a point of agreement with the leader. If we have any logs present
// with an index greater than this, then we must delete them per §5.3.
if self.last_log.index > target_entry.index {
Expand All @@ -124,17 +124,17 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// The target entry does not have the same term. Fetch the last 50 logs, and use the last
// entry of that payload which is still in the target term for conflict optimization.
else {
let start = if msg.prev_log_index >= 50 {
msg.prev_log_index - 50
let start = if msg.prev_log.index >= 50 {
msg.prev_log.index - 50
} else {
0
};
let old_entries = self
.storage
.get_log_entries(start, msg.prev_log_index)
.get_log_entries(start, msg.prev_log.index)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
let opt = match old_entries.iter().find(|entry| entry.term == msg.prev_log_term) {
let opt = match old_entries.iter().find(|entry| entry.term == msg.prev_log.term) {
Some(entry) => Some(ConflictOpt {
term: entry.term,
index: entry.index,
Expand Down
3 changes: 1 addition & 2 deletions async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let rpc = AppendEntriesRequest {
term: self.core.current_term,
leader_id: self.core.id,
prev_log_index: node.matched.index,
prev_log_term: node.matched.term,
prev_log: node.matched,
entries: vec![],
leader_commit: self.core.commit_index,
};
Expand Down
8 changes: 4 additions & 4 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,10 @@ pub struct AppendEntriesRequest<D: AppData> {
pub term: u64,
/// The leader's ID. Useful in redirecting clients.
pub leader_id: u64,
/// The index of the log entry immediately preceding the new entries.
pub prev_log_index: u64,
/// The term of the `prev_log_index` entry.
pub prev_log_term: u64,

/// The log entry immediately preceding the new entries.
pub prev_log: LogId,

/// The new log entries to store.
///
/// This may be empty when the leader is sending heartbeats. Entries
Expand Down
3 changes: 1 addition & 2 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
let payload = AppendEntriesRequest {
term: self.term,
leader_id: self.id,
prev_log_index: self.matched.index,
prev_log_term: self.matched.term,
prev_log: self.matched,
leader_commit: self.commit_index,
entries: self.outbound_buffer.iter().map(|entry| entry.as_ref().clone()).collect(),
};
Expand Down
10 changes: 4 additions & 6 deletions async-raft/tests/conflict_with_empty_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use async_raft::raft::Entry;
use async_raft::raft::EntryNormal;
use async_raft::raft::EntryPayload;
use async_raft::Config;
use async_raft::LogId;
use async_raft::RaftNetwork;
use fixtures::RaftRouter;
use memstore::ClientRequest;
Expand Down Expand Up @@ -49,8 +50,7 @@ async fn conflict_with_empty_entries() -> Result<()> {
let rpc = AppendEntriesRequest::<memstore::ClientRequest> {
term: 1,
leader_id: 1,
prev_log_index: 5,
prev_log_term: 1,
prev_log: LogId { term: 1, index: 5 },
entries: vec![],
leader_commit: 5,
};
Expand All @@ -66,8 +66,7 @@ async fn conflict_with_empty_entries() -> Result<()> {
let rpc = AppendEntriesRequest::<memstore::ClientRequest> {
term: 1,
leader_id: 1,
prev_log_index: 0,
prev_log_term: 1,
prev_log: LogId { term: 1, index: 0 },
entries: vec![
Entry {
term: 1,
Expand Down Expand Up @@ -98,8 +97,7 @@ async fn conflict_with_empty_entries() -> Result<()> {
let rpc = AppendEntriesRequest::<memstore::ClientRequest> {
term: 1,
leader_id: 1,
prev_log_index: 3,
prev_log_term: 1,
prev_log: LogId { term: 1, index: 3 },
entries: vec![],
leader_commit: 5,
};
Expand Down

0 comments on commit 58d8e3a

Please sign in to comment.