Skip to content

Commit

Permalink
change: introduce StorageError. RaftStorage gets rid of anyhow::Error
Browse files Browse the repository at this point in the history
`StorageError` is an `enum` of DefensiveError and StorageIOError.
An error a RaftStorage impl returns could be a defensive check error
or an actual io operation error.

Why:

anyhow::Error is not enough to support the flow control in RaftCore.
It is typeless, thus RaftCore cannot decide what to do next
depending on the returned error.

Inside raft, anyhow::Error should never be used, although it could be used as
`source()` of some other error types.
  • Loading branch information
drmingdrmer committed Sep 13, 2021
1 parent be4b3aa commit 74b1652
Show file tree
Hide file tree
Showing 14 changed files with 635 additions and 228 deletions.
2 changes: 1 addition & 1 deletion async-raft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ futures = "0.3"
log = "0.4"
rand = "0.8"
serde = { version="1", features=["derive"] }
thiserror = "1.0.20"
thiserror = "1.0.29"
tokio = { version="1.8", default-features=false, features=["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync", "time"] }
tracing = "0.1.26"
tracing-futures = "0.2.4"
Expand Down
26 changes: 10 additions & 16 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,11 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

#[tracing::instrument(level = "debug", skip(self))]
async fn delete_logs(&mut self, start: u64) -> RaftResult<()> {
self.storage.delete_logs_from(start..).await.map_err(|err| self.map_fatal_storage_error(err))?;
self.storage.delete_logs_from(start..).await.map_err(|err| self.map_storage_error(err))?;

self.last_log_id = self.get_log_id(start - 1).await?;

let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?;

self.update_membership(membership)?;

Expand All @@ -231,8 +231,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
return Ok(self.last_applied);
}

let entries =
self.storage.get_log_entries(index..=index).await.map_err(|err| self.map_fatal_storage_error(err))?;
let entries = self.storage.get_log_entries(index..=index).await.map_err(|err| self.map_storage_error(err))?;

let entry = entries
.first()
Expand Down Expand Up @@ -300,8 +299,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
msg_entries.summary()
);

let entries =
self.storage.get_log_entries(index..end).await.map_err(|err| self.map_fatal_storage_error(err))?;
let entries = self.storage.get_log_entries(index..end).await.map_err(|err| self.map_storage_error(err))?;

for (i, ent) in entries.iter().enumerate() {
assert_eq!(msg_entries[i].log_id.index, ent.log_id.index);
Expand Down Expand Up @@ -336,8 +334,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
return Ok(LogId { term: 0, index: 0 });
}

let entries =
self.storage.get_log_entries(start..=start).await.map_err(|err| self.map_fatal_storage_error(err))?;
let entries = self.storage.get_log_entries(start..=start).await.map_err(|err| self.map_storage_error(err))?;

let log_id = entries.first().unwrap().log_id;

Expand Down Expand Up @@ -394,7 +391,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// Replicate entries to log (same as append, but in follower mode).
let entry_refs = entries.iter().collect::<Vec<_>>();
self.storage.append_to_log(&entry_refs).await.map_err(|err| self.map_fatal_storage_error(err))?;
self.storage.append_to_log(&entry_refs).await.map_err(|err| self.map_storage_error(err))?;
if let Some(entry) = entries.last() {
self.last_log_id = entry.log_id;
}
Expand Down Expand Up @@ -433,18 +430,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.storage
.get_log_entries(self.last_applied.index + 1..=self.commit_index)
.await
.map_err(|e| self.map_fatal_storage_error(e))?;
.map_err(|e| self.map_storage_error(e))?;

let last_log_id = entries.last().map(|x| x.log_id).unwrap();

tracing::debug!("entries: {}", entries.as_slice().summary());
tracing::debug!(?last_log_id);

let entries_refs: Vec<_> = entries.iter().collect();
self.storage
.apply_to_state_machine(&entries_refs)
.await
.map_err(|e| self.map_fatal_storage_error(e))?;
self.storage.apply_to_state_machine(&entries_refs).await.map_err(|e| self.map_storage_error(e))?;

self.last_applied = last_log_id;

Expand Down Expand Up @@ -474,12 +468,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// Fetch the series of entries which must be applied to the state machine, then apply them.

let entries = storage.get_log_entries(start..stop).await.map_err(|e| self.map_fatal_storage_error(e))?;
let entries = storage.get_log_entries(start..stop).await.map_err(|e| self.map_storage_error(e))?;

let new_last_applied = entries.last().unwrap();

let data_entries: Vec<_> = entries.iter().collect();
storage.apply_to_state_machine(&data_entries).await.map_err(|e| self.map_fatal_storage_error(e))?;
storage.apply_to_state_machine(&data_entries).await.map_err(|e| self.map_storage_error(e))?;

self.last_applied = new_last_applied.log_id;
self.report_metrics(Update::Ignore);
Expand Down
25 changes: 13 additions & 12 deletions async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::LogId;
use crate::MessageSummary;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::StorageError;

/// A wrapper around a ClientRequest which has been transformed into an Entry, along with its response channel.
pub(super) struct ClientRequestEntry<D: AppData, R: AppDataResponse> {
Expand Down Expand Up @@ -87,8 +88,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
// Thus if a new leader sees only the first one, it needs to append the final config log to let
// the change-membership operation to finish.

let last_logs =
self.core.storage.get_log_entries(last_index..=last_index).await.map_err(RaftError::RaftStorage)?;
let last_logs = self
.core
.storage
.get_log_entries(last_index..=last_index)
.await
.map_err(|x| RaftError::RaftStorage(x.into()))?;
let last_log = &last_logs[0];

let req = match last_log.payload {
Expand Down Expand Up @@ -266,11 +271,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
},
payload,
};
self.core
.storage
.append_to_log(&[&entry])
.await
.map_err(|err| self.core.map_fatal_storage_error(err))?;
self.core.storage.append_to_log(&[&entry]).await.map_err(|err| self.core.map_storage_error(err))?;
self.core.last_log_id.index = entry.log_id.index;

self.leader_report_metrics();
Expand Down Expand Up @@ -436,7 +437,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.storage
.get_log_entries(expected_next_index..index)
.await
.map_err(|err| self.core.map_fatal_storage_error(err))?;
.map_err(|err| self.core.map_storage_error(err))?;

if let Some(entry) = entries.last() {
self.core.last_applied = entry.log_id;
Expand All @@ -448,18 +449,18 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.storage
.apply_to_state_machine(&data_entries)
.await
.map_err(|err| self.core.map_fatal_storage_error(err))?;
.map_err(|err| self.core.map_storage_error(err))?;
}
}

// Apply this entry to the state machine and return its data response.
let res = self.core.storage.apply_to_state_machine(&[entry]).await.map_err(|err| {
if err.downcast_ref::<S::ShutdownError>().is_some() {
if let StorageError::IO { .. } = err {
// If this is a storage I/O error, then trigger shutdown.
self.core.map_fatal_storage_error(err)
self.core.map_storage_error(err)
} else {
// Else, we propagate normally.
RaftError::RaftStorage(err)
RaftError::RaftStorage(err.into())
}
});

Expand Down
8 changes: 3 additions & 5 deletions async-raft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

// Create a new snapshot and begin writing its contents.
let mut snapshot =
self.storage.begin_receiving_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?;
let mut snapshot = self.storage.begin_receiving_snapshot().await.map_err(|err| self.map_storage_error(err))?;
snapshot.as_mut().write_all(&req.data).await?;

// If this was a small snapshot, and it is already done, then finish up.
Expand Down Expand Up @@ -199,7 +198,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.storage
.finalize_snapshot_installation(&req.meta, snapshot)
.await
.map_err(|e| self.map_fatal_storage_error(e))?;
.map_err(|e| self.map_storage_error(e))?;

tracing::debug!("update after apply or install-snapshot: {:?}", changes);

Expand All @@ -216,8 +215,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

// There could be unknown membership in the snapshot.
let membership =
self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
let membership = self.storage.get_membership_config().await.map_err(|err| self.map_storage_error(err))?;

self.update_membership(membership)?;

Expand Down
15 changes: 10 additions & 5 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use crate::MessageSummary;
use crate::NodeId;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::StorageError;
use crate::Update;

/// The core type implementing the Raft protocol.
Expand Down Expand Up @@ -179,7 +180,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
async fn main(mut self) -> RaftResult<()> {
tracing::debug!("raft node is initializing");

let state = self.storage.get_initial_state().await.map_err(|err| self.map_fatal_storage_error(err))?;
let state = self.storage.get_initial_state().await.map_err(|err| self.map_storage_error(err))?;
self.last_log_id = state.last_log_id;
self.current_term = state.hard_state.current_term;
self.voted_for = state.hard_state.voted_for;
Expand All @@ -191,9 +192,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.commit_index = 0;

// Fetch the most recent snapshot in the system.
if let Some(snapshot) =
self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?
{
if let Some(snapshot) = self.storage.get_current_snapshot().await.map_err(|err| self.map_storage_error(err))? {
self.snapshot_last_log_id = snapshot.meta.last_log_id;
self.report_metrics(Update::Ignore);
}
Expand Down Expand Up @@ -286,7 +285,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
current_term: self.current_term,
voted_for: self.voted_for,
};
self.storage.save_hard_state(&hs).await.map_err(|err| self.map_fatal_storage_error(err))
self.storage.save_hard_state(&hs).await.map_err(|err| self.map_storage_error(err))
}

/// Update core's target state, ensuring all invariants are upheld.
Expand Down Expand Up @@ -367,6 +366,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
RaftError::RaftStorage(err)
}

/// Map a `StorageError` from the `RaftStorage` layer into a `RaftError`,
/// treating it as fatal.
///
/// The error is logged, the core's target state is set to `Shutdown` so the
/// main loop will terminate, and the error is wrapped (via `err.into()`) into
/// `RaftError::RaftStorage` for the caller.
fn map_storage_error(&mut self, err: StorageError) -> RaftError {
    tracing::error!({error=?err, id=self.id}, "fatal storage error, shutting down");
    self.set_target_state(State::Shutdown);
    RaftError::RaftStorage(err.into())
}

/// Update the node's current membership config & save hard state.
#[tracing::instrument(level = "trace", skip(self))]
fn update_membership(&mut self, cfg: MembershipConfig) -> RaftResult<()> {
Expand Down
8 changes: 2 additions & 6 deletions async-raft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
};

// Check for existence of current snapshot.
let current_snapshot_opt = self
.core
.storage
.get_current_snapshot()
.await
.map_err(|err| self.core.map_fatal_storage_error(err))?;
let current_snapshot_opt =
self.core.storage.get_current_snapshot().await.map_err(|err| self.core.map_storage_error(err))?;

if let Some(snapshot) = current_snapshot_opt {
// If snapshot exists, ensure its distance from the leader's last log index is <= half
Expand Down
25 changes: 16 additions & 9 deletions async-raft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

use std::fmt;

use thiserror::Error;

use crate::raft_types::SnapshotSegmentId;
use crate::AppData;
use crate::NodeId;
Expand All @@ -12,7 +10,7 @@ use crate::NodeId;
pub type RaftResult<T> = std::result::Result<T, RaftError>;

/// Error variants related to the internals of Raft.
#[derive(Debug, Error)]
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum RaftError {
// Streaming-snapshot encountered mismatched snapshot_id/offset
Expand All @@ -21,12 +19,15 @@ pub enum RaftError {
expect: SnapshotSegmentId,
got: SnapshotSegmentId,
},

/// An error which has come from the `RaftStorage` layer.
#[error("{0}")]
RaftStorage(anyhow::Error),

/// An error which has come from the `RaftNetwork` layer.
#[error("{0}")]
RaftNetwork(anyhow::Error),

/// An internal Raft error indicating that Raft is shutting down.
#[error("Raft is shutting down")]
ShuttingDown,
Expand All @@ -39,7 +40,7 @@ impl From<tokio::io::Error> for RaftError {
}

/// An error related to a client read request.
#[derive(Debug, Error)]
#[derive(Debug, thiserror::Error)]
pub enum ClientReadError {
/// A Raft error.
#[error("{0}")]
Expand All @@ -50,7 +51,7 @@ pub enum ClientReadError {
}

/// An error related to a client write request.
#[derive(Error)]
#[derive(thiserror::Error)]
pub enum ClientWriteError<D: AppData> {
/// A Raft error.
#[error("{0}")]
Expand All @@ -72,13 +73,14 @@ impl<D: AppData> fmt::Debug for ClientWriteError<D> {
}

/// Error variants related to configuration.
#[derive(Debug, Error, Eq, PartialEq)]
#[derive(Debug, thiserror::Error, Eq, PartialEq)]
#[non_exhaustive]
pub enum ConfigError {
/// A configuration error indicating that the given values for election timeout min & max are invalid: max must be
/// greater than min.
#[error("given values for election timeout min & max are invalid: max must be greater than min")]
InvalidElectionTimeoutMinMax,

/// The given value for max_payload_entries is too small, must be > 0.
#[error("the given value for max_payload_entries is too small, must be > 0")]
MaxPayloadEntriesTooSmall,
Expand All @@ -90,19 +92,20 @@ pub enum ConfigError {
}

/// The set of errors which may take place when initializing a pristine Raft node.
#[derive(Debug, Error)]
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum InitializeError {
/// An internal error has taken place.
#[error("{0}")]
RaftError(#[from] RaftError),

/// The requested action is not allowed due to the Raft node's current state.
#[error("the requested action is not allowed due to the Raft node's current state")]
NotAllowed,
}

/// The set of errors which may take place when requesting to propose a config change.
#[derive(Debug, Error)]
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ChangeConfigError {
/// An error related to the processing of the config change request.
Expand All @@ -111,19 +114,23 @@ pub enum ChangeConfigError {
/// to the Raft log and the process related to that workflow.
#[error("{0}")]
RaftError(#[from] RaftError),

/// The cluster is already undergoing a configuration change.
#[error("the cluster is already undergoing a configuration change")]
ConfigChangeInProgress,

/// The given config would leave the cluster in an inoperable state.
///
/// This error will be returned if the full set of changes, once fully applied, would leave
/// the cluster in an inoperable state.
#[error("the given config would leave the cluster in an inoperable state")]
InoperableConfig,

/// The node the config change proposal was sent to was not the leader of the cluster. The ID
/// of the current leader is returned if known.
#[error("this node is not the Raft leader")]
NodeNotLeader(Option<NodeId>),

/// The proposed config changes would make no difference to the current config.
///
/// This takes into account a current joint consensus and the end result of the config.
Expand All @@ -141,7 +148,7 @@ impl<D: AppData> From<ClientWriteError<D>> for ChangeConfigError {
}

// An error wrapper of every type of error that will be sent to the caller.
#[derive(Debug, Error)]
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ResponseError {
#[error(transparent)]
Expand Down
Loading

0 comments on commit 74b1652

Please sign in to comment.