Skip to content

Commit

Permalink
Change: rename RaftStorage methods do_log_compaction: build_snapshot,…
Browse files Browse the repository at this point in the history
… delete_logs_from: delete_log
  • Loading branch information
drmingdrmer committed Jan 15, 2022
1 parent 7e204ec commit abda0d1
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 37 deletions.
4 changes: 2 additions & 2 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "debug", skip(self, range), fields(range=?range))]
async fn delete_logs_from<R: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn delete_log<R: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&self,
range: R,
) -> Result<(), StorageError> {
Expand Down Expand Up @@ -231,7 +231,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn do_log_compaction(&self) -> Result<Snapshot<Self::SnapshotData>, StorageError> {
async fn build_snapshot(&self) -> Result<Snapshot<Self::SnapshotData>, StorageError> {
let (data, last_applied_log);

{
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// - keep track of last_log_id, first_log_id,
// RaftStorage should only provides the least basic APIs.

self.storage.delete_logs_from(start..).await?;
self.storage.delete_log(start..).await?;

self.last_log_id = if start == 0 {
None
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

tokio::spawn(
async move {
let f = storage.do_log_compaction();
let f = storage.build_snapshot();
let res = Abortable::new(f, reg).await;
match res {
Ok(res) => match res {
Expand Down Expand Up @@ -654,7 +654,7 @@ where
tracing::debug!(%last_applied, max_keep, delete_lt = x, "delete_applied_logs");

if x > 0 {
sto.delete_logs_from(..x).await
sto.delete_log(..x).await
} else {
Ok(())
}
Expand Down
17 changes: 8 additions & 9 deletions openraft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ where
/// Delete all logs in a `range`.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn delete_logs_from<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn delete_log<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&self,
range: RB,
) -> Result<(), StorageError>;
Expand Down Expand Up @@ -293,15 +293,14 @@ where

// --- Snapshot

/// Perform log compaction, returning a handle to the generated snapshot.
/// Build snapshot
///
/// ### implementation guide
/// When performing log compaction, the compaction can only cover the breadth of the log up to
/// the last applied log and under write load this value may change quickly. As such, the
/// storage implementation should export/checkpoint/snapshot its state machine, and then use
/// the value of that export's last applied log as the metadata indicating the breadth of the
/// log covered by the snapshot.
async fn do_log_compaction(&self) -> Result<Snapshot<Self::SnapshotData>, StorageError>;
/// A snapshot has to contain information about exactly all logs upto the last applied.
///
/// Building snapshot can be done by:
/// - Performing log compaction, e.g. merge log entries that operates on the same key, like a LSM-tree does,
/// - or by fetching a snapshot from the state machine.
async fn build_snapshot(&self) -> Result<Snapshot<Self::SnapshotData>, StorageError>;

/// Create a new blank snapshot, returning a writable handle to the snapshot object.
///
Expand Down
8 changes: 4 additions & 4 deletions openraft/src/store_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ where
}

#[tracing::instrument(level = "trace", skip(self))]
async fn delete_logs_from<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
async fn delete_log<RB: RangeBounds<u64> + Clone + Debug + Send + Sync>(
&self,
range: RB,
) -> Result<(), StorageError> {
self.defensive_nonempty_range(range.clone()).await?;
self.defensive_half_open_range(range.clone()).await?;

self.inner().delete_logs_from(range).await
self.inner().delete_log(range).await
}

#[tracing::instrument(level = "trace", skip(self, entries), fields(entries=%entries.summary()))]
Expand All @@ -147,8 +147,8 @@ where
}

#[tracing::instrument(level = "trace", skip(self))]
async fn do_log_compaction(&self) -> Result<Snapshot<Self::SnapshotData>, StorageError> {
self.inner().do_log_compaction().await
async fn build_snapshot(&self) -> Result<Snapshot<Self::SnapshotData>, StorageError> {
self.inner().build_snapshot().await
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down
38 changes: 19 additions & 19 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ where
let store = builder.build().await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(0..=0).await?;
store.delete_log(0..=0).await?;

let ent = store.try_get_log_entry(3).await?;
assert_eq!(Some(LogId { term: 1, index: 3 }), ent.map(|x| x.log_id));
Expand Down Expand Up @@ -505,7 +505,7 @@ where
{
store.append_to_log(&[&blank(1, 1), &blank(1, 2)]).await?;

store.delete_logs_from(0..2).await?;
store.delete_log(0..2).await?;

// NOTE: it assumes non applied logs always exist.
let log_id = store.first_known_log_id().await?;
Expand All @@ -523,7 +523,7 @@ where
let log_id = store.first_known_log_id().await?;
assert_eq!(Some(LogId::new(1, 2)), log_id, "least id is in log");

store.delete_logs_from(0..3).await?;
store.delete_log(0..3).await?;
let log_id = store.first_known_log_id().await?;
assert_eq!(Some(LogId::new(1, 3)), log_id, "no logs, returns last applied log id");
}
Expand Down Expand Up @@ -571,7 +571,7 @@ where

tracing::info!("--- no logs, return default");
{
store.delete_logs_from(..).await?;
store.delete_log(..).await?;

let (first_log_id, last_log_id) = store.get_log_state().await?;
assert_eq!(None, first_log_id);
Expand Down Expand Up @@ -608,7 +608,7 @@ where

tracing::info!("--- no logs, return default");
{
store.delete_logs_from(..).await?;
store.delete_log(..).await?;

let log_id = store.first_id_in_log().await?;
assert_eq!(None, log_id);
Expand Down Expand Up @@ -656,7 +656,7 @@ where

tracing::info!("--- no logs, return default");
{
store.delete_logs_from(..).await?;
store.delete_log(..).await?;

let log_id = store.last_id_in_log().await?;
assert_eq!(None, log_id);
Expand Down Expand Up @@ -721,7 +721,7 @@ where
let store = builder.build().await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(1..1).await?;
store.delete_log(1..1).await?;

let logs = store.get_log_entries(1..11).await?;
assert_eq!(logs.len(), 10, "expected all (10) logs to be preserved");
Expand All @@ -732,9 +732,9 @@ where
let store = builder.build().await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(..=0).await?;
store.delete_log(..=0).await?;

store.delete_logs_from(1..4).await?;
store.delete_log(1..4).await?;

let logs = store.try_get_log_entries(0..100).await?;
assert_eq!(logs.len(), 7);
Expand All @@ -746,9 +746,9 @@ where
let store = builder.build().await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(..=0).await?;
store.delete_log(..=0).await?;

store.delete_logs_from(1..1000).await?;
store.delete_log(1..1000).await?;
let logs = store.try_get_log_entries(0..).await?;

assert_eq!(logs.len(), 0);
Expand All @@ -759,9 +759,9 @@ where
let store = builder.build().await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(..=0).await?;
store.delete_log(..=0).await?;

store.delete_logs_from(1..).await?;
store.delete_log(1..).await?;
let logs = store.try_get_log_entries(0..100).await?;

assert_eq!(logs.len(), 0);
Expand All @@ -774,7 +774,7 @@ where
let store = builder.build().await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(..=0).await?;
store.delete_log(..=0).await?;

store
.append_to_log(&[&Entry {
Expand Down Expand Up @@ -1171,7 +1171,7 @@ where
let store = builder.build().await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(..=0).await?;
store.delete_log(..=0).await?;

store.get_log_entries(..).await?;
store.get_log_entries(5..).await?;
Expand Down Expand Up @@ -1233,7 +1233,7 @@ where
let store = builder.build().await;
Self::feed_10_logs_vote_self(&store).await?;

let res = store.delete_logs_from(10..10).await;
let res = store.delete_log(10..10).await;

let e = res.unwrap_err().into_defensive().unwrap();
assert_eq!(ErrorSubject::Logs, e.subject);
Expand All @@ -1245,7 +1245,7 @@ where
e.violation
);

let res = store.delete_logs_from(1..5).await;
let res = store.delete_log(1..5).await;

let e = res.unwrap_err().into_defensive().unwrap();
assert_eq!(ErrorSubject::Logs, e.subject);
Expand Down Expand Up @@ -1341,7 +1341,7 @@ where

store.apply_to_state_machine(&[&blank(0, 0), &blank(1, 1), &blank(1, 2)]).await?;

store.delete_logs_from(1..).await?;
store.delete_log(1..).await?;

let res = store
.append_to_log(&[&Entry {
Expand Down Expand Up @@ -1395,7 +1395,7 @@ where

store.apply_to_state_machine(&[&blank(0, 0), &blank(2, 1), &blank(2, 2)]).await?;

store.delete_logs_from(1..).await?;
store.delete_log(1..).await?;

let res = store.append_to_log(&[&blank(1, 3)]).await;

Expand Down

0 comments on commit abda0d1

Please sign in to comment.