Skip to content

Commit

Permalink
Change: remove RaftNetworkFactory::ConnectionError and AddLearnerErro…
Browse files Browse the repository at this point in the history
…r::NetworkError

`RaftNetworkFactory::new_client()` does not return an error because
openraft can only ignore it.  Therefore it should **not** create a
connection but rather a client that will connect when required.  Thus
there is chance it will build a client that is unable to send out
anything, e.g., in case the Node network address is configured
incorrectly.

Because of the above change, And `AddLearnerError` will not include a
NetworkError any more, because when adding a learner, the connectivity
can not be effectively detected.

Upgrade tip:

Just update the application network implementation so that it compiles.
  • Loading branch information
drmingdrmer committed Feb 12, 2023
1 parent 1a65b48 commit d1b3b23
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 97 deletions.
11 changes: 3 additions & 8 deletions examples/raft-kv-memstore/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,13 @@ impl ExampleNetwork {
#[async_trait]
impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
type Network = ExampleNetworkConnection;
type ConnectionError = NetworkError;

async fn new_client(
&mut self,
target: ExampleNodeId,
node: &BasicNode,
) -> Result<Self::Network, Self::ConnectionError> {
Ok(ExampleNetworkConnection {
async fn new_client(&mut self, target: ExampleNodeId, node: &BasicNode) -> Self::Network {
ExampleNetworkConnection {
owner: ExampleNetwork {},
target,
target_node: node.clone(),
})
}
}
}

Expand Down
9 changes: 2 additions & 7 deletions examples/raft-kv-rocksdb/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,11 @@ pub struct ExampleNetwork {}
#[async_trait]
impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
type Network = ExampleNetworkConnection;
type ConnectionError = NetworkError;

async fn new_client(
&mut self,
target: ExampleNodeId,
node: &ExampleNode,
) -> Result<Self::Network, Self::ConnectionError> {
async fn new_client(&mut self, target: ExampleNodeId, node: &ExampleNode) -> Self::Network {
let addr = format!("ws://{}", node.rpc_addr);
let client = Client::dial_websocket(&addr).await.ok();
Ok(ExampleNetworkConnection { addr, client, target })
ExampleNetworkConnection { addr, client, target }
}
}

Expand Down
53 changes: 13 additions & 40 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::engine::Command;
use crate::engine::Engine;
use crate::engine::SendResult;
use crate::entry::EntryRef;
use crate::error::AddLearnerError;
use crate::error::ChangeMembershipError;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
Expand All @@ -50,7 +49,6 @@ use crate::error::InProgress;
use crate::error::InitializeError;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::error::NetworkError;
use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::Timeout;
Expand Down Expand Up @@ -271,13 +269,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let my_id = self.id;
// Safe unwrap(): target is in membership
let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone();
let mut client = match self.network.new_client(target, &target_node).await {
Ok(n) => n,
Err(e) => {
tracing::error!(target = display(target), "Failed to create client, this is a non recoverable error, the node will be permanently ignored! {}", e);
continue;
}
};
let mut client = self.network.new_client(target, &target_node).await;

let ttl = Duration::from_millis(self.config.heartbeat_interval);

Expand Down Expand Up @@ -414,13 +406,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
return Ok(());
}

// Ensure the a client can successfully be created
if let Err(e) = self.network.new_client(target, &node).await {
let net_err = NetworkError::new(&anyerror::AnyError::new(&e));
let _ = tx.send(Err(AddLearnerError::NetworkError(net_err)));
return Ok(());
}

let curr = &self.engine.state.membership_state.effective().membership;
let new_membership = curr.add_learner(target, node);

Expand Down Expand Up @@ -934,16 +919,16 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
&mut self,
target: C::NodeId,
progress_entry: ProgressEntry<C::NodeId>,
) -> Result<ReplicationHandle<C::NodeId, C::Node, S::SnapshotData>, N::ConnectionError> {
) -> ReplicationHandle<C::NodeId, C::Node, S::SnapshotData> {
// Safe unwrap(): target must be in membership
let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap();

let membership_log_id = self.engine.state.membership_state.effective().log_id;
let network = self.network.new_client(target, target_node).await?;
let network = self.network.new_client(target, target_node).await;

let session_id = ReplicationSessionId::new(*self.engine.state.get_vote(), membership_log_id);

Ok(ReplicationCore::<C, N, S>::spawn(
ReplicationCore::<C, N, S>::spawn(
target,
session_id,
self.config.clone(),
Expand All @@ -953,7 +938,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.storage.get_log_reader().await,
self.tx_api.clone(),
tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(self.id), target=display(target)),
))
)
}

/// Remove all replication.
Expand Down Expand Up @@ -1063,13 +1048,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

// Safe unwrap(): target must be in membership
let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone();
let mut client = match self.network.new_client(target, &target_node).await {
Ok(n) => n,
Err(err) => {
tracing::error!({error=%err, target=display(target)}, "while requesting vote");
continue;
}
};
let mut client = self.network.new_client(target, &target_node).await;

let tx = self.tx_api.clone();

Expand Down Expand Up @@ -1551,19 +1530,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
self.remove_all_replication().await;

for (target, matching) in targets.iter() {
match self.spawn_replication_stream(*target, *matching).await {
Ok(state) => {
if let Some(l) = &mut self.leader_data {
l.nodes.insert(*target, state);
} else {
unreachable!("it has to be a leader!!!");
}
}
Err(e) => {
tracing::error!({node = % target}, "cannot connect to {:?}", e);
// cannot return Err, or raft fail completely
}
};
let handle = self.spawn_replication_stream(*target, *matching).await;

if let Some(l) = &mut self.leader_data {
l.nodes.insert(*target, handle);
} else {
unreachable!("it has to be a leader!!!");
}
}
}
Command::UpdateProgressMetrics { target, matching } => {
Expand Down
11 changes: 2 additions & 9 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,6 @@ where
{
#[error(transparent)]
ForwardToLeader(#[from] ForwardToLeader<NID, N>),

// TODO: do we really need this error? An app may check an target node if it wants to.
#[error(transparent)]
NetworkError(#[from] NetworkError),
}

impl<NID, N> TryAsRef<ForwardToLeader<NID, N>> for AddLearnerError<NID, N>
Expand All @@ -223,7 +219,6 @@ where
fn try_as_ref(&self) -> Option<&ForwardToLeader<NID, N>> {
match self {
Self::ForwardToLeader(f) => Some(f),
_ => None,
}
}
}
Expand All @@ -236,10 +231,8 @@ where
type Error = AddLearnerError<NID, N>;

fn try_from(value: AddLearnerError<NID, N>) -> Result<Self, Self::Error> {
if let AddLearnerError::ForwardToLeader(e) = value {
return Ok(e);
}
Err(value)
let AddLearnerError::ForwardToLeader(e) = value;
Ok(e)
}
}

Expand Down
17 changes: 4 additions & 13 deletions openraft/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! The Raft network interface.

use std::error::Error;
use std::fmt::Formatter;

use async_trait::async_trait;
Expand Down Expand Up @@ -81,22 +80,14 @@ where C: RaftTypeConfig
/// Actual type of the network handling a single connection.
type Network: RaftNetwork<C>;

/// The error that an implementation returns when `connect()` fails.
// TODO: renaming it to `create()` would be better?
type ConnectionError: Error + Send + Sync;

/// Create a new network instance sending RPCs to the target node.
///
/// This function should **not** create a connection but rather a client that will connect when
/// required
/// required. Therefore there is chance it will build a client that is unable to send out
/// anything, e.g., in case the Node network address is configured incorrectly. But this method
/// does not return an error because openraft can only ignore it.
///
/// The method is intentionally async to give the implementation a chance to use asynchronous
/// sync primitives to serialize access to the common internal object, if needed.
///
/// # Errors
/// When this function errors, it indicates a non-recoverable error in the network, no retries
/// will be attempted, and the corresponding node will be constantly unavailable.
/// Any recoverable error (such as unreachable host) should be returned as `Ok` with a client
/// that will perform the reconnect
async fn new_client(&mut self, target: C::NodeId, node: &C::Node) -> Result<Self::Network, Self::ConnectionError>;
async fn new_client(&mut self, target: C::NodeId, node: &C::Node) -> Self::Network;
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn conflict_with_empty_entries() -> Result<()> {
leader_commit: Some(LogId::new(LeaderId::new(1, 0), 5)),
};

let resp = router.new_client(0, &()).await?.send_append_entries(rpc).await?;
let resp = router.new_client(0, &()).await.send_append_entries(rpc).await?;
assert!(!resp.is_success());
assert!(resp.is_conflict());

Expand All @@ -77,7 +77,7 @@ async fn conflict_with_empty_entries() -> Result<()> {
leader_commit: Some(LogId::new(LeaderId::new(1, 0), 5)),
};

let resp = router.new_client(0, &()).await?.send_append_entries(rpc).await?;
let resp = router.new_client(0, &()).await.send_append_entries(rpc).await?;
assert!(resp.is_success());
assert!(!resp.is_conflict());

Expand All @@ -90,7 +90,7 @@ async fn conflict_with_empty_entries() -> Result<()> {
leader_commit: Some(LogId::new(LeaderId::new(1, 0), 5)),
};

let resp = router.new_client(0, &()).await?.send_append_entries(rpc).await?;
let resp = router.new_client(0, &()).await.send_append_entries(rpc).await?;
assert!(!resp.is_success());
assert!(resp.is_conflict());

Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/append_entries/t10_see_higher_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn append_sees_higher_vote() -> Result<()> {
{
router
.new_client(1, &())
.await?
.await
.send_vote(VoteRequest {
vote: Vote::new(10, 1),
last_log_id: Some(LogId::new(LeaderId::new(10, 1), 5)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn append_entries_with_bigger_term() -> Result<()> {
leader_commit: Some(LogId::new(LeaderId::new(1, 0), log_index)),
};

let resp = router.new_client(0, &()).await?.send_append_entries(req).await?;
let resp = router.new_client(0, &()).await.send_append_entries(req).await?;
assert!(resp.is_success());

// after append entries, check hard state in term 2 and vote for node 1
Expand Down
14 changes: 3 additions & 11 deletions openraft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,20 +955,12 @@ where
S: Default + Clone,
{
type Network = RaftRouterNetwork<C, S>;
type ConnectionError = NetworkError;

async fn new_client(&mut self, target: C::NodeId, _node: &C::Node) -> Result<Self::Network, NetworkError> {
{
let unreachable = self.unconnectable.lock().unwrap();
if unreachable.contains(&target) {
let e = NetworkError::new(&AnyError::error(format!("failed to connect: {}", target)));
return Err(e);
}
}
Ok(RaftRouterNetwork {
async fn new_client(&mut self, target: C::NodeId, _node: &C::Node) -> Self::Network {
RaftRouterNetwork {
target,
owner: self.clone(),
})
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/log_compaction/t10_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn compaction() -> Result<()> {
{
let res = router
.new_client(1, &())
.await?
.await
.send_append_entries(AppendEntriesRequest {
vote: Vote::new_committed(1, 0),
prev_log_id: Some(LogId::new(LeaderId::new(1, 0), 2)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async fn snapshot_overrides_membership() -> Result<()> {
}],
leader_commit: Some(LogId::new(LeaderId::new(0, 0), 0)),
};
router.new_client(1, &()).await?.send_append_entries(req).await?;
router.new_client(1, &()).await.send_append_entries(req).await?;

tracing::info!("--- check that learner membership is affected");
{
Expand Down
4 changes: 2 additions & 2 deletions openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> {
],
leader_commit: Some(LogId::new(LeaderId::new(1, 0), 2)),
};
router.new_client(1, &()).await?.send_append_entries(req).await?;
router.new_client(1, &()).await.send_append_entries(req).await?;

tracing::info!("--- check that learner membership is affected");
{
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> {
done: true,
};

router.new_client(1, &()).await?.send_install_snapshot(req).await?;
router.new_client(1, &()).await.send_install_snapshot(req).await?;

tracing::info!("--- DONE installing snapshot");

Expand Down

0 comments on commit d1b3b23

Please sign in to comment.