Skip to content

Commit

Permalink
refactor:if there may be more logs to replicate, continue to call sen…
Browse files Browse the repository at this point in the history
…d_append_entries in next loop, no need to wait heartbeat tick
  • Loading branch information
lichuang committed Jan 4, 2022
1 parent 4fa5006 commit a85055f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
3 changes: 0 additions & 3 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ pub enum ReplicationError {
#[error("{0}")]
LackEntry(#[from] LackEntry),

#[error("more logs")]
MoreLogs,

#[error("leader committed index {committed_index} advances target log index {target_index} too many")]
CommittedAdvanceTooMany { committed_index: u64, target_index: u64 },

Expand Down
26 changes: 13 additions & 13 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
ReplicationError::Network { .. } => {
// nothing to do
}
ReplicationError::MoreLogs { .. } => {
// nothing to do
}
};
}
}
Expand Down Expand Up @@ -318,8 +315,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
break (prev_log_id, logs);
};

let log_size = logs.len();

// Build the heartbeat frame to be sent to the follower.
let payload = AppendEntriesRequest {
term: self.term,
Expand Down Expand Up @@ -393,10 +388,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
// Continue to find the matching log id on follower.
self.max_possible_matched_index = conflict.index - 1;

// log_size == max_payload_entries means there may be more logs to replicate
if log_size == self.config.max_payload_entries.try_into().unwrap() {
return Err(ReplicationError::MoreLogs);
}
Ok(())
}

Expand Down Expand Up @@ -467,6 +458,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
}
}

/// Perform a check to see if this replication stream has more log to replicate
#[tracing::instrument(level = "trace", skip(self))]
pub(self) fn has_more_log(&self) -> bool {
return self.last_log_index > self.matched.index;
}

#[tracing::instrument(level = "trace", skip(self))]
pub async fn try_drain_raft_rx(&mut self) -> Result<(), ReplicationError> {
tracing::debug!("try_drain_raft_rx");
Expand Down Expand Up @@ -648,10 +645,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
ReplicationError::Network { .. } => {
break;
}
ReplicationError::MoreLogs { .. } => {
// continue to call send_append_entries in next loop, no need to wait heartbeat tick
continue;
}
_ => {
return Err(err);
}
Expand All @@ -674,6 +667,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
let span = tracing::debug_span!("CHrx:LineRate");
let _en = span.enter();

// Check raft channel to ensure we are staying up-to-date
self.try_drain_raft_rx().await?;
if self.has_more_log() {
// if there is more log, continue to send_append_entries
continue;
}

tokio::select! {
_ = self.heartbeat.tick() => {
tracing::debug!("heartbeat triggered");
Expand Down

0 comments on commit a85055f

Please sign in to comment.