Skip to content

Commit

Permalink
refactor: Improve logging and snapshot management
Browse files Browse the repository at this point in the history
This commit introduces several enhancements to the snapshot management
system:

- Improved Progress Logging for Snapshot Building

- **Introduction of `SMEntry`:**
   - A new structure, `SMEntry`, has been introduced to represent all
     variants of a state-machine record. `RaftStoreEntry` is a superset
     of `SMEntry` that includes both state-machine records and log
     records.

- **Asynchronous Snapshot Writing:**
   - Added the `spawn_writer_thread()` function to write snapshots in
     a separate thread, thereby improving performance.
  • Loading branch information
drmingdrmer committed May 20, 2024
1 parent b39b311 commit 285600e
Show file tree
Hide file tree
Showing 10 changed files with 329 additions and 240 deletions.
27 changes: 14 additions & 13 deletions src/binaries/metactl/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use databend_common_meta_raft_store::ondisk::DataVersion;
use databend_common_meta_raft_store::ondisk::OnDisk;
use databend_common_meta_raft_store::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
use databend_common_meta_raft_store::sm_v002::SnapshotStoreV002;
use databend_common_meta_raft_store::sm_v002::WriteEntry;
use databend_common_meta_raft_store::state::RaftState;
use databend_common_meta_sled_store::get_sled_db;
use databend_common_meta_sled_store::init_sled_db;
Expand Down Expand Up @@ -172,18 +173,20 @@ async fn import_v002(
let mut max_log_id: Option<LogId> = None;
let mut trees = BTreeMap::new();

let mut snapshot_store = SnapshotStoreV002::new(DataVersion::V002, raft_config);
let mut writer = snapshot_store.new_writer()?;
let snapshot_store = SnapshotStoreV002::new(DataVersion::V002, raft_config);

let (tx, join_handle) = snapshot_store.spawn_writer_thread("import_v002");

for line in lines {
let l = line?;
let (tree_name, kv_entry): (String, RaftStoreEntry) = serde_json::from_str(&l)?;

if tree_name.starts_with("state_machine/") {
// Write to snapshot
writer
.write_entry_results::<io::Error>(futures::stream::iter([Ok(kv_entry)]))
.await?;
let sm_entry = kv_entry.try_into().map_err(|err_str| {
anyhow::anyhow!("Failed to convert RaftStoreEntry to SMEntry: {}", err_str)
})?;
tx.send(WriteEntry::Data(sm_entry)).await?;
} else {
// Write to sled tree
if !trees.contains_key(&tree_name) {
Expand All @@ -208,14 +211,12 @@ async fn import_v002(
for tree in trees.values() {
tree.flush()?;
}
let (snapshot_id, snapshot_size) = writer.commit(None)?;

eprintln!(
"Imported {} records, snapshot id: {}; snapshot size: {}",
n,
snapshot_id.to_string(),
snapshot_size
);

tx.send(WriteEntry::Finish).await?;

let (_snapshot_store, snapshot_stat) = join_handle.await??;

eprintln!("Imported {} records, snapshot: {}", n, snapshot_stat,);
Ok(max_log_id)
}

Expand Down
144 changes: 111 additions & 33 deletions src/meta/raft-store/src/key_spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,86 @@ impl SledKeySpace for DataHeader {
type V = Header;
}

/// Serialize one key-value pair through the given `SledKeySpace` type.
///
/// Expands to `Ok((key_bytes, value_bytes))`, where the two halves come from
/// `<key space>::serialize_key` and `<key space>::serialize_value`.
///
/// Both calls are followed by `?`, so the macro may only be used inside a
/// function returning a `Result` whose error type accepts their errors.
macro_rules! serialize_for_sled {
    ($space:tt, $k:expr, $v:expr) => {
        Ok((
            $space::serialize_key($k)?,
            $space::serialize_value($v)?,
        ))
    };
}

/// Decode `(prefix, key bytes, value bytes)` into the variant of the enclosing enum
/// whose key space declares that prefix.
///
/// The candidate key spaces are tried in the order given. For the first one whose
/// `SledKeySpace::PREFIX` equals the prefix, the key is decoded with
/// `SledOrderedSerde::de`, the value with `SledSerde::de`, and the macro
/// **returns from the enclosing function** with `Ok(Self::<KeySpace> { key, value })`.
///
/// If no key space matches, the macro expands to nothing further and control falls
/// through to whatever follows the invocation.
///
/// It must be used inside an `impl` of an enum that has one `{ key, value }` variant
/// per listed key space, in a function returning `Result<Self, _>`.
macro_rules! deserialize_by_prefix {
    ($tag:expr, $raw_key:expr, $raw_value:expr, $($space:tt),+ ) => {
        $(
            if <$space as SledKeySpace>::PREFIX == $tag {
                // `Self` resolves to the enum whose impl invokes this macro.
                return Ok(Self::$space {
                    key: SledOrderedSerde::de($raw_key)?,
                    value: SledSerde::de($raw_value)?,
                });
            }
        )+
    };
}

/// Enum of key-value pairs that are used in the raft state machine.
///
/// It is a subset of [`RaftStoreEntry`] and contains only the entry types used by the
/// state machine. The `RaftStoreEntry` variants `Logs`, `RaftStateKV`, `ClientLastResps`
/// and `LogMeta` have no counterpart here.
///
/// Each variant is named after a key space and carries that key space's
/// `SledKeySpace::K` as `key` and `SledKeySpace::V` as `value`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SMEntry {
DataHeader { key: <DataHeader as SledKeySpace>::K, value: <DataHeader as SledKeySpace>::V, },
Nodes { key: <Nodes as SledKeySpace>::K, value: <Nodes as SledKeySpace>::V, },
StateMachineMeta { key: <StateMachineMeta as SledKeySpace>::K, value: <StateMachineMeta as SledKeySpace>::V, },
Expire { key: <Expire as SledKeySpace>::K, value: <Expire as SledKeySpace>::V, },
GenericKV { key: <GenericKV as SledKeySpace>::K, value: <GenericKV as SledKeySpace>::V, },
Sequences { key: <Sequences as SledKeySpace>::K, value: <Sequences as SledKeySpace>::V, },
}

impl SMEntry {
/// Serialize a key-value entry into a `(key, value)` pair of [`sled::IVec`].
///
/// Dispatches on the variant and delegates to `serialize_for_sled!`, which calls
/// `serialize_key` and `serialize_value` of the key space named like the variant.
///
/// # Errors
///
/// Returns a [`MetaStorageError`] if serializing either the key or the value fails.
#[rustfmt::skip]
pub fn serialize(kv: &SMEntry) -> Result<(sled::IVec, sled::IVec), MetaStorageError> {

match kv {
Self::DataHeader { key, value } => serialize_for_sled!(DataHeader, key, value),
Self::Nodes { key, value } => serialize_for_sled!(Nodes, key, value),
Self::StateMachineMeta { key, value } => serialize_for_sled!(StateMachineMeta, key, value),
Self::Expire { key, value } => serialize_for_sled!(Expire, key, value),
Self::GenericKV { key, value } => serialize_for_sled!(GenericKV, key, value),
Self::Sequences { key, value } => serialize_for_sled!(Sequences, key, value),
}
}

/// Deserialize a serialized key-value entry `[key, value]`.
///
/// The first byte of `prefix_key` is the key-space prefix that selects the variant;
/// the remaining bytes are the serialized key. `vec_value` is the serialized value.
///
/// It is able to deserialize openraft-v7 or openraft-v8 key-value pairs.
/// The compatibility is provided by [`SledSerde`] implementation for value types.
///
/// # Errors
///
/// Returns a [`MetaStorageError`] if decoding the key or the value fails.
///
/// # Panics
///
/// Panics if `prefix_key` is empty, or if its first byte does not match the prefix
/// of any key space listed in the body (this includes the log-related key spaces,
/// which are not part of `SMEntry`).
pub fn deserialize(prefix_key: &[u8], vec_value: &[u8]) -> Result<Self, MetaStorageError> {
// Indexing panics on an empty `prefix_key`.
let prefix = prefix_key[0];
let vec_key = &prefix_key[1..];

// Returns early with the matching variant; falls through if no prefix matches.
deserialize_by_prefix!(
prefix,
vec_key,
vec_value,
// Available key spaces:
DataHeader,
Nodes,
StateMachineMeta,
Expire,
GenericKV,
Sequences
);

// Reached only when no key space above declares `prefix`.
unreachable!("unknown prefix: {}", prefix);
}
}

/// Enum of key-value pairs that are used in the raft storage impl for meta-service.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -151,23 +231,18 @@ impl RaftStoreEntry {
/// Serialize a key-value entry into a two elt vec of vec<u8>: `[key, value]`.
#[rustfmt::skip]
pub fn serialize(kv: &RaftStoreEntry) -> Result<(sled::IVec, sled::IVec), MetaStorageError> {
macro_rules! ser {
($ks:tt, $key:expr, $value:expr) => {
Ok(($ks::serialize_key($key)?, $ks::serialize_value($value)?))
};
}

match kv {
Self::DataHeader { key, value } => ser!(DataHeader, key, value),
Self::Logs { key, value } => ser!(Logs, key, value),
Self::Nodes { key, value } => ser!(Nodes, key, value),
Self::StateMachineMeta { key, value } => ser!(StateMachineMeta, key, value),
Self::RaftStateKV { key, value } => ser!(RaftStateKV, key, value),
Self::Expire { key, value } => ser!(Expire, key, value),
Self::GenericKV { key, value } => ser!(GenericKV, key, value),
Self::Sequences { key, value } => ser!(Sequences, key, value),
Self::ClientLastResps { key, value } => ser!(ClientLastResps, key, value),
Self::LogMeta { key, value } => ser!(LogMeta, key, value),
Self::DataHeader { key, value } => serialize_for_sled!(DataHeader, key, value),
Self::Logs { key, value } => serialize_for_sled!(Logs, key, value),
Self::Nodes { key, value } => serialize_for_sled!(Nodes, key, value),
Self::StateMachineMeta { key, value } => serialize_for_sled!(StateMachineMeta, key, value),
Self::RaftStateKV { key, value } => serialize_for_sled!(RaftStateKV, key, value),
Self::Expire { key, value } => serialize_for_sled!(Expire, key, value),
Self::GenericKV { key, value } => serialize_for_sled!(GenericKV, key, value),
Self::Sequences { key, value } => serialize_for_sled!(Sequences, key, value),
Self::ClientLastResps { key, value } => serialize_for_sled!(ClientLastResps, key, value),
Self::LogMeta { key, value } => serialize_for_sled!(LogMeta, key, value),
}
}

Expand All @@ -179,24 +254,6 @@ impl RaftStoreEntry {
let prefix = prefix_key[0];
let vec_key = &prefix_key[1..];

// Convert (sub_tree_prefix, key, value, key_space1, key_space2...) into a [`RaftStoreEntry`].
//
// It compares the sub_tree_prefix with prefix defined by every key space to determine which key space it belongs to.
macro_rules! deserialize_by_prefix {
($prefix: expr, $vec_key: expr, $vec_value: expr, $($key_space: tt),+ ) => {
$(

if <$key_space as SledKeySpace>::PREFIX == $prefix {

let key = SledOrderedSerde::de($vec_key)?;
let value = SledSerde::de($vec_value)?;

return Ok(RaftStoreEntry::$key_space { key, value, });
}
)+
};
}

deserialize_by_prefix!(
prefix,
vec_key,
Expand All @@ -217,3 +274,24 @@ impl RaftStoreEntry {
unreachable!("unknown prefix: {}", prefix);
}
}

impl TryInto<SMEntry> for RaftStoreEntry {
type Error = String;

#[rustfmt::skip]
fn try_into(self) -> Result<SMEntry, Self::Error> {
match self {
Self::DataHeader { key, value } => Ok(SMEntry::DataHeader { key, value }),
Self::Nodes { key, value } => Ok(SMEntry::Nodes { key, value }),
Self::StateMachineMeta { key, value } => Ok(SMEntry::StateMachineMeta { key, value }),
Self::Expire { key, value } => Ok(SMEntry::Expire { key, value }),
Self::GenericKV { key, value } => Ok(SMEntry::GenericKV { key, value }),
Self::Sequences { key, value } => Ok(SMEntry::Sequences { key, value }),

Self::Logs { .. } => {Err("SMEntry does not contain Logs".to_string())},
Self::RaftStateKV { .. } => {Err("SMEntry does not contain RaftStateKV".to_string())}
Self::ClientLastResps { .. } => {Err("SMEntry does not contain ClientLastResps".to_string())}
Self::LogMeta { .. } => {Err("SMEntry does not contain LogMeta".to_string())}
}
}
}
57 changes: 28 additions & 29 deletions src/meta/raft-store/src/ondisk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ pub use header::Header;
use log::debug;
use log::info;
use openraft::AnyError;
use tokio::io;

use crate::config::RaftConfig;
use crate::key_spaces::DataHeader;
use crate::key_spaces::RaftStoreEntry;
use crate::key_spaces::SMEntry;
use crate::log::TREE_RAFT_LOG;
use crate::sm_v002::SnapshotStoreV002;
use crate::sm_v002::WriteEntry;
use crate::state::TREE_RAFT_STATE;
use crate::state_machine::StateMachineMetaKey;

Expand Down Expand Up @@ -276,31 +276,33 @@ impl OnDisk {
&mut self,
sm_tree_name: &str,
) -> Result<(), MetaStorageError> {
let mut cnt = 0;
// Helper function to create a snapshot error.
fn snap_err(e: impl std::error::Error + 'static, context: &str) -> MetaStorageError {
let ae = AnyError::new(&e).add_context(|| context);
MetaStorageError::SnapshotError(ae)
}

let tree = self.db.open_tree(sm_tree_name)?;

let mut snapshot_store = SnapshotStoreV002::new(DataVersion::V002, self.config.clone());
let snapshot_store = SnapshotStoreV002::new(DataVersion::V002, self.config.clone());

let mut writer = snapshot_store.new_writer().map_err(|e| {
let ae = AnyError::new(&e).add_context(|| "new snapshot writer");
MetaStorageError::SnapshotError(ae)
})?;
let (tx, join_handle) = snapshot_store.spawn_writer_thread("upgrade-v001-to-v002-snapshot");

for ivec_pair_res in tree.iter() {
let kv_entry = {
let sm_entry = {
let (k_ivec, v_ivec) = ivec_pair_res?;
RaftStoreEntry::deserialize(&k_ivec, &v_ivec)?
SMEntry::deserialize(&k_ivec, &v_ivec)?
};

debug!(
kv_entry :? =(&kv_entry);
kv_entry :? =(&sm_entry);
"upgrade kv from {:?}", self.header.version
);

if let RaftStoreEntry::StateMachineMeta {
if let SMEntry::StateMachineMeta {
key: StateMachineMetaKey::Initialized,
..
} = kv_entry
} = sm_entry
{
self.progress(format_args!(
"Skip no longer used state machine key: {}",
Expand All @@ -309,27 +311,24 @@ impl OnDisk {
continue;
}

writer
.write_entry_results::<io::Error>(futures::stream::iter([Ok(kv_entry)]))
tx.send(WriteEntry::Data(sm_entry))
.await
.map_err(|e| {
let ae = AnyError::new(&e).add_context(|| "write snapshot entry");
MetaStorageError::SnapshotError(ae)
})?;

cnt += 1;
.map_err(|e| snap_err(e, "send SMEntry"))?;
}

let (snapshot_id, file_size) = writer.commit(None).map_err(|e| {
let ae = AnyError::new(&e).add_context(|| "commit snapshot");
MetaStorageError::SnapshotError(ae)
})?;
tx.send(WriteEntry::Finish)
.await
.map_err(|e| snap_err(e, "send Commit"))?;

let (snapshot_store, snapshot_stat) = join_handle
.await
.map_err(|e| snap_err(e, "join snapshot writer thread"))?
.map_err(|e| snap_err(e, "writer error"))?;

self.progress(format_args!(
"Written {} records to snapshot, filesize: {}, path: {}",
cnt,
file_size,
snapshot_store.snapshot_path(&snapshot_id.to_string())
"Written to snapshot: {}, path: {}",
snapshot_stat,
snapshot_store.snapshot_path(&snapshot_stat.snapshot_id.to_string())
));

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions src/meta/raft-store/src/sm_v002/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod leveled_store;
pub(in crate::sm_v002) mod marked;
#[allow(clippy::module_inception)]
mod sm_v002;
mod snapshot_stat;
mod snapshot_store;
mod snapshot_view_v002;
mod writer_v002;
Expand All @@ -29,6 +30,7 @@ mod snapshot_view_v002_test;

pub use importer::Importer;
pub use sm_v002::SMV002;
pub use snapshot_stat::SnapshotStat;
pub use snapshot_store::SnapshotStoreError;
pub use snapshot_store::SnapshotStoreV002;
pub use snapshot_view_v002::SnapshotViewV002;
Expand Down
44 changes: 44 additions & 0 deletions src/meta/raft-store/src/sm_v002/snapshot_stat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::fmt::Formatter;

use crate::state_machine::MetaSnapshotId;

/// Summary of a snapshot: its id, how many entries it holds, and its file size.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SnapshotStat {
/// Id of the snapshot this stat describes.
pub snapshot_id: MetaSnapshotId,

/// Total number of entries in the snapshot.
///
/// Including meta entries, such as seq, nodes, generic kv, and expire index
pub entry_cnt: u64,

/// Size in bytes of the snapshot file
pub size: u64,
}

impl fmt::Display for SnapshotStat {
    /// Renders as `{ snapshot_id: <id>, entry_cnt: <count>, size: <bytes> }`.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self {
            snapshot_id,
            entry_cnt,
            size,
        } = self;

        // The id is rendered through its own `to_string()`.
        let id = snapshot_id.to_string();

        write!(
            f,
            "{{ snapshot_id: {}, entry_cnt: {}, size: {} }}",
            id, entry_cnt, size
        )
    }
}
Loading

0 comments on commit 285600e

Please sign in to comment.