Skip to content

Commit

Permalink
Refactor: memstore stores serialized entry
Browse files Browse the repository at this point in the history
So that Entry does not need to be `Clone`. Therefore `Clone` can be
removed from `AppData`.
  • Loading branch information
drmingdrmer committed Mar 26, 2023
1 parent 9093414 commit 19c94f4
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use openraft::storage::Snapshot;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::RaftLogId;
use openraft::RaftStorage;
use openraft::RaftStorageDebug;
use openraft::SnapshotMeta;
Expand Down Expand Up @@ -100,8 +101,8 @@ pub struct MemStoreStateMachine {
pub struct MemStore {
last_purged_log_id: RwLock<Option<LogId<MemNodeId>>>,

/// The Raft log.
log: RwLock<BTreeMap<u64, Entry<Config>>>,
/// The Raft log. Logs are stored in serialized json.
log: RwLock<BTreeMap<u64, String>>,

/// The Raft state machine.
sm: RwLock<MemStoreStateMachine>,
Expand Down Expand Up @@ -157,27 +158,39 @@ impl RaftLogReader<Config> for Arc<MemStore> {
&mut self,
range: RB,
) -> Result<Vec<Entry<Config>>, StorageError<MemNodeId>> {
let res = {
let mut entries = vec![];
{
let log = self.log.read().await;
log.range(range.clone()).map(|(_, val)| val.clone()).collect::<Vec<_>>()
for (_, serialized) in log.range(range.clone()) {
let ent = serde_json::from_str(serialized).map_err(|e| StorageIOError::read_logs(&e))?;
entries.push(ent);
}
};

Ok(res)
Ok(entries)
}

async fn get_log_state(&mut self) -> Result<LogState<Config>, StorageError<MemNodeId>> {
let log = self.log.read().await;
let last = log.iter().rev().next().map(|(_, ent)| ent.log_id);
let last_serialized = log.iter().rev().next().map(|(_, ent)| ent);

let last = match last_serialized {
None => None,
Some(serialized) => {
let ent: Entry<Config> = serde_json::from_str(serialized).map_err(|e| StorageIOError::read_logs(&e))?;
Some(*ent.get_log_id())
}
};

let last_deleted = *self.last_purged_log_id.read().await;
let last_purged = *self.last_purged_log_id.read().await;

let last = match last {
None => last_deleted,
None => last_purged,
Some(x) => Some(x),
};

Ok(LogState {
last_purged_log_id: last_deleted,
last_purged_log_id: last_purged,
last_log_id: last,
})
}
Expand Down Expand Up @@ -305,7 +318,9 @@ impl RaftStorage<Config> for Arc<MemStore> {
async fn append_to_log(&mut self, entries: &[Entry<Config>]) -> Result<(), StorageError<MemNodeId>> {
let mut log = self.log.write().await;
for entry in entries {
log.insert(entry.log_id.index, (*entry).clone());
let s =
serde_json::to_string(entry).map_err(|e| StorageIOError::write_log_entry(*entry.get_log_id(), &e))?;
log.insert(entry.log_id.index, s);
}
Ok(())
}
Expand Down

0 comments on commit 19c94f4

Please sign in to comment.