Skip to content

Commit

Permalink
Introduce a recovery strategy as a replacement for set_discard_if_cor…
Browse files Browse the repository at this point in the history
…rupted

The user can now choose between discarding corrupted data, moving it out
of the way (into another file) and starting with an empty database or
bubbling up the error.

Fixes #234
  • Loading branch information
badboy committed Nov 1, 2023
1 parent 9f930b4 commit e055fb8
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 20 deletions.
13 changes: 13 additions & 0 deletions src/backend/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,16 @@ pub enum WriteFlags {
APPEND,
APPEND_DUP,
}

/// Strategy to use when corrupted data is detected while opening a database.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RecoveryStrategy {
/// Bubble up the error on detecting a corrupted data file. The default.
Error,

/// Discard the corrupted data and start with an empty database.
Discard,

/// Move the corrupted data file to `$file.corrupt` and start with an empty database.
Rename,
}
4 changes: 3 additions & 1 deletion src/backend/impl_lmdb/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::{
DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl, RoTransactionImpl,
RwTransactionImpl, StatImpl,
};
use crate::backend::common::RecoveryStrategy;
use crate::backend::traits::{
BackendEnvironment, BackendEnvironmentBuilder, BackendInfo, BackendIter, BackendRoCursor,
BackendRoCursorTransaction, BackendStat,
Expand Down Expand Up @@ -86,7 +87,8 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}

fn set_discard_if_corrupted(&mut self, _discard_if_corrupted: bool) -> &mut Self {
/// **UNIMPLEMENTED.** Will panic at runtime.
fn set_corruption_recovery_strategy(&mut self, _strategy: RecoveryStrategy) -> &mut Self {
// Unfortunately, when opening a database, LMDB doesn't handle all the ways it could have
// been corrupted. Prefer using the `SafeMode` backend if this is important.
unimplemented!();
Expand Down
49 changes: 34 additions & 15 deletions src/backend/impl_safe/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ use super::{
database::Database, DatabaseFlagsImpl, DatabaseImpl, EnvironmentFlagsImpl, ErrorImpl, InfoImpl,
RoTransactionImpl, RwTransactionImpl, StatImpl,
};
use crate::backend::common::RecoveryStrategy;
use crate::backend::traits::{BackendEnvironment, BackendEnvironmentBuilder};

const DEFAULT_DB_FILENAME: &str = "data.safe.bin";
const DEFAULT_CORRUPT_DB_EXTENSION: &str = "bin.corrupt";

type DatabaseArena = Arena<Database>;
type DatabaseNameMap = HashMap<Option<String>, DatabaseImpl>;
Expand All @@ -38,7 +40,7 @@ pub struct EnvironmentBuilderImpl {
max_dbs: Option<usize>,
map_size: Option<usize>,
make_dir_if_needed: bool,
discard_if_corrupted: bool,
corruption_recovery_strategy: RecoveryStrategy,
}

impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
Expand All @@ -53,7 +55,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
max_dbs: None,
map_size: None,
make_dir_if_needed: false,
discard_if_corrupted: false,
corruption_recovery_strategy: RecoveryStrategy::Error,
}
}

Expand Down Expand Up @@ -85,8 +87,8 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}

fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self {
self.discard_if_corrupted = discard_if_corrupted;
fn set_corruption_recovery_strategy(&mut self, strategy: RecoveryStrategy) -> &mut Self {
self.corruption_recovery_strategy = strategy;
self
}

Expand All @@ -106,7 +108,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self.max_dbs,
self.map_size,
)?;
env.read_from_disk(self.discard_if_corrupted)?;
env.read_from_disk(self.corruption_recovery_strategy)?;
Ok(env)
}
}
Expand Down Expand Up @@ -152,16 +154,32 @@ impl EnvironmentImpl {
Ok(bincode::serialize(&data)?)
}

fn deserialize(
bytes: &[u8],
discard_if_corrupted: bool,
fn load(
path: &Path,
strategy: RecoveryStrategy,
) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
let bytes = fs::read(path)?;

match Self::deserialize(&bytes) {
Ok((arena, name_map)) => Ok((arena, name_map)),
Err(err) => match strategy {
RecoveryStrategy::Error => Err(err),
RecoveryStrategy::Discard => Ok((DatabaseArena::new(), HashMap::new())),
RecoveryStrategy::Rename => {
let corrupted_path = path.with_extension(DEFAULT_CORRUPT_DB_EXTENSION);
fs::rename(path, corrupted_path)?;

Ok((DatabaseArena::new(), HashMap::new()))
}
},
}
}

fn deserialize(bytes: &[u8]) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
let mut arena = DatabaseArena::new();
let mut name_map = HashMap::new();
let data: HashMap<_, _> = match bincode::deserialize(bytes) {
Err(_) if discard_if_corrupted => Ok(HashMap::new()),
result => result,
}?;
let data: HashMap<_, _> = bincode::deserialize(bytes)?;

for (name, db) in data {
name_map.insert(name, DatabaseImpl(arena.alloc(db)));
}
Expand Down Expand Up @@ -199,15 +217,15 @@ impl EnvironmentImpl {
})
}

pub(crate) fn read_from_disk(&mut self, discard_if_corrupted: bool) -> Result<(), ErrorImpl> {
pub(crate) fn read_from_disk(&mut self, strategy: RecoveryStrategy) -> Result<(), ErrorImpl> {
let mut path = Cow::from(&self.path);
if fs::metadata(&path)?.is_dir() {
path.to_mut().push(DEFAULT_DB_FILENAME);
};
if fs::metadata(&path).is_err() {
return Ok(());
};
let (arena, name_map) = Self::deserialize(&fs::read(&path)?, discard_if_corrupted)?;
let (arena, name_map) = Self::load(&path, strategy)?;
self.dbs = RwLock::new(EnvironmentDbs { arena, name_map });
Ok(())
}
Expand Down Expand Up @@ -272,7 +290,8 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
// TOOD: don't reallocate `name`.
let key = name.map(String::from);
let mut dbs = self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError)?;
if dbs.name_map.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name.is_some() {
if dbs.name_map.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name.is_some()
{
return Err(ErrorImpl::DbsFull);
}
let parts = EnvironmentDbsRefMut::from(dbs.deref_mut());
Expand Down
5 changes: 3 additions & 2 deletions src/backend/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
};

use crate::{
backend::common::{DatabaseFlags, EnvironmentFlags, WriteFlags},
backend::common::{DatabaseFlags, EnvironmentFlags, RecoveryStrategy, WriteFlags},
error::StoreError,
};

Expand Down Expand Up @@ -83,7 +83,8 @@ pub trait BackendEnvironmentBuilder<'b>: Debug + Eq + PartialEq + Copy + Clone {

fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self;

fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self;
/// Set the corruption recovery strategy. See [`RecoveryStrategy`] for details.
fn set_corruption_recovery_strategy(&mut self, strategy: RecoveryStrategy) -> &mut Self;

fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error>;
}
Expand Down
95 changes: 93 additions & 2 deletions tests/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tempfile::Builder;
#[cfg(feature = "lmdb")]
use rkv::backend::{Lmdb, LmdbEnvironment};
use rkv::{
backend::{BackendEnvironmentBuilder, SafeMode, SafeModeEnvironment},
backend::{BackendEnvironmentBuilder, RecoveryStrategy, SafeMode, SafeModeEnvironment},
CloseOptions, Rkv, StoreOptions, Value,
};

Expand Down Expand Up @@ -247,7 +247,7 @@ fn test_safe_mode_corrupt_while_open_1() {

// But we can use a builder and pass `discard_if_corrupted` to deal with it.
let mut builder = Rkv::environment_builder::<SafeMode>();
builder.set_discard_if_corrupted(true);
builder.set_corruption_recovery_strategy(RecoveryStrategy::Discard);
manager
.get_or_create_from_builder(root.path(), builder, Rkv::from_builder::<SafeMode>)
.expect("created");
Expand Down Expand Up @@ -378,3 +378,94 @@ fn test_safe_mode_corrupt_while_open_2() {
Some(Value::Str("byé, yöu"))
);
}

/// Test how the manager can discard corrupted databases, while moving the corrupted one aside for
/// later inspection.
#[test]
fn test_safe_mode_corrupt_while_open_3() {
type Manager = rkv::Manager<SafeModeEnvironment>;

let root = Builder::new()
.prefix("test_safe_mode_corrupt_while_open_3")
.tempdir()
.expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");

let mut safebin = root.path().to_path_buf();
safebin.push("data.safe.bin");

// Oops, corruption.
fs::write(&safebin, "bogus").expect("dbfile corrupted");
assert!(safebin.exists(), "Corrupted database file was written to");

// Create environment.
let mut manager = Manager::singleton().write().unwrap();

// Recreating environment fails.
manager
.get_or_create(root.path(), Rkv::new::<SafeMode>)
.expect_err("not created");
assert!(manager.get(root.path()).expect("success").is_none());

// But we can use a builder and pass `RecoveryStrategy::Rename` to deal with it.
let mut builder = Rkv::environment_builder::<SafeMode>();
builder.set_corruption_recovery_strategy(RecoveryStrategy::Rename);
manager
.get_or_create_from_builder(root.path(), builder, Rkv::from_builder::<SafeMode>)
.expect("created");
assert!(manager.get(root.path()).expect("success").is_some());

assert!(!safebin.exists(), "Database file was moved out of the way");

let mut corruptbin = root.path().to_path_buf();
corruptbin.push("data.safe.bin.corrupt");
assert!(corruptbin.exists(), "Corrupted database file exists");

let shared_env = manager
.get_or_create(root.path(), Rkv::new::<SafeMode>)
.expect("created");
let env = shared_env.read().unwrap();

// Writing still works.
let store = env
.open_single("store", StoreOptions::create())
.expect("opened");

let reader = env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), None, "Nothing to be read");

// We can write.
let mut writer = env.write().expect("writer");
store
.put(&mut writer, "foo", &Value::I64(5678))
.expect("wrote");
writer.commit().expect("committed");
env.sync(true).expect("synced");

assert!(safebin.exists(), "Database file exists");

// Close everything.
drop(env);
drop(shared_env);
manager
.try_close(root.path(), CloseOptions::default())
.expect("closed without deleting");
assert!(manager.get(root.path()).expect("success").is_none());

// Recreate environment.
let shared_env = manager
.get_or_create(root.path(), Rkv::new::<SafeMode>)
.expect("created");
let env = shared_env.read().unwrap();

// Verify that the dbfile is not corrupted.
let store = env
.open_single("store", StoreOptions::default())
.expect("opened");
let reader = env.read().expect("reader");
assert_eq!(
store.get(&reader, "foo").expect("read"),
Some(Value::I64(5678)),
"Database contains expected value"
);
}

0 comments on commit e055fb8

Please sign in to comment.