Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pending Part #1

Open
wants to merge 81 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
7777f7a
init in_memory_tree
rongma7 Sep 6, 2024
95f4177
replace pending_part with VersionedHashMap
rongma7 Sep 6, 2024
b69c5e1
convert error
rongma7 Sep 6, 2024
8f4ee43
impl get_pending_part
rongma7 Sep 6, 2024
550bd4f
impl add_to_pending_part
rongma7 Sep 6, 2024
59da8c7
rename pending_part
rongma7 Sep 9, 2024
996b0d1
impl pending then history
rongma7 Sep 9, 2024
d03aaab
fix get_historical_part
rongma7 Sep 9, 2024
aaddb8e
VersionedHashMap.history supports delete_commitid
rongma7 Sep 9, 2024
1986b52
Tree's rollbacks to support lazy delete
rongma7 Sep 9, 2024
cdc15f5
impl change_root for pending's Tree
rongma7 Sep 10, 2024
e56791b
Merge branch 'master' into pending
ChenxingLi Sep 10, 2024
5d2006c
impl change_root for Pending's VersionedHashMap
rongma7 Sep 10, 2024
d8b0b14
Merge branch 'master' into pending
ChenxingLi Sep 10, 2024
0b4ea80
cargo fmt
ChenxingLi Sep 10, 2024
67d363e
move pending part to a module in the crate
rongma7 Sep 10, 2024
f046059
Add Hash to the VersionedKeyValueSchema::Key
rongma7 Sep 10, 2024
93ff831
target_commit != commit will not happen
rongma7 Sep 10, 2024
a704229
Adjust the orders in get_pending_part's match
rongma7 Sep 10, 2024
327896c
pending.add_node does not allow repeat keys
rongma7 Sep 10, 2024
89be61f
use PendingKeyValueSchema & fix bug in add_node
rongma7 Sep 10, 2024
a8aa9cd
rename xxx_to_xxx to get_xxx_by_xxx
rongma7 Sep 10, 2024
2873c51
while let Some(parent_slab_index)
rongma7 Sep 11, 2024
82c48dd
while let Some(parent_slab_index) in find_path
rongma7 Sep 11, 2024
a62363e
impl get_slab_index_by_commit_id
rongma7 Sep 11, 2024
61aefca
rename current_up to export_rollback_data
rongma7 Sep 11, 2024
c4feb49
derive Eq for PendingError
rongma7 Sep 11, 2024
81ab0c8
Move history_inner_map outside the loop.
rongma7 Sep 11, 2024
ce0df7f
remove unnecessary bracket
rongma7 Sep 11, 2024
b20b6c3
early report CommitIdAlreadyExists in add_node
rongma7 Sep 11, 2024
cadccf4
use if let Some(x) = xx {} else {} instd of unwrap
rongma7 Sep 11, 2024
68c0e7a
impl get_parent_node()
rongma7 Sep 11, 2024
54f9245
fix cargo clippy
rongma7 Sep 11, 2024
61fbf5a
cargo fmt
rongma7 Sep 11, 2024
707d9a5
define Key<S> then use it to define others
rongma7 Sep 12, 2024
311bced
cargo fmt
rongma7 Sep 12, 2024
69aa823
allow(clippy::type_complexity) History
rongma7 Sep 12, 2024
26967d0
allow(clippy::type_complexity) Current
rongma7 Sep 12, 2024
413103c
allow(clippy::type_complexity) ToCommit
rongma7 Sep 12, 2024
9b49ebd
allow(clippy::type_complexity) CIdVevPair
rongma7 Sep 12, 2024
90d12ea
allow(clippy::type_complexity) RollComm
rongma7 Sep 12, 2024
e02e1d7
Rewrite current maintainence
ChenxingLi Sep 13, 2024
07e7fe2
separate add_root & add_non_root_node
rongma7 Sep 13, 2024
f280dad
remove unused fn
rongma7 Sep 13, 2024
ae659e4
rename get_apply_map_from_root
rongma7 Sep 13, 2024
39b2672
cargo fmt
rongma7 Sep 13, 2024
70b560e
remove unused func
rongma7 Sep 13, 2024
bb464d9
add comments
rongma7 Sep 14, 2024
bd5984e
fix bug in change_root
rongma7 Sep 14, 2024
40a6b37
remove unnecessary pub
rongma7 Sep 14, 2024
9a6220e
impl confirmed_pending_to_history
rongma7 Sep 23, 2024
7027bbb
use height as history number
rongma7 Sep 23, 2024
3730042
index not use bitmap, !height as history_number
rongma7 Sep 24, 2024
659585f
complete confirmed_pending_to_history
rongma7 Sep 24, 2024
c56e6cc
add assertion
rongma7 Sep 24, 2024
c8aa8f7
rename variable
rongma7 Sep 24, 2024
972eccf
update get_pending_part
ChenxingLi Sep 24, 2024
280e04e
remove redundant history in pending part
rongma7 Sep 24, 2024
c0ce2e1
fix history_number as height & make height unique
rongma7 Sep 24, 2024
9dc4316
tablereader as Arc & pending_part as &'db RwLock
rongma7 Sep 25, 2024
4369116
move RwLock into pending_part's current
rongma7 Sep 26, 2024
fa9f914
pending part iter_historical_changes iterator&
rongma7 Sep 26, 2024
f8a5c78
impl iter_historical_changes_history_part
rongma7 Sep 27, 2024
b4e45e7
unify return type of iter history&pending changes
rongma7 Sep 27, 2024
d17811a
impl iter_historical_changes
rongma7 Sep 27, 2024
6dbd856
impl get_versioned_key
rongma7 Sep 27, 2024
2f5392e
impl pending_part's discard
rongma7 Sep 27, 2024
a27a8da
impl discard for VersionedStore
rongma7 Sep 29, 2024
0fa7168
OneStore for VersionedStore: KeyValueStoreManager
rongma7 Sep 29, 2024
bc3e7ea
impl pending part's get_versioned_store
rongma7 Sep 29, 2024
c402d99
impl get_versioned_store for VersionedStore
rongma7 Sep 29, 2024
71617f5
split to key_value_store_manager_impl.rs
rongma7 Sep 30, 2024
e6232d0
rearrange function organization for VersionedStore
rongma7 Sep 30, 2024
9df9ce8
remove unnecessary funcs
rongma7 Sep 30, 2024
ddf3c5d
modify tests for pending part's get_versioned_key
rongma7 Sep 30, 2024
e5dda2f
rename versioned_map
rongma7 Sep 30, 2024
db1275d
split current_map.rs
rongma7 Sep 30, 2024
402ed48
rearrange versioned_map's order of funcs
rongma7 Sep 30, 2024
341377c
split tree and node
rongma7 Sep 30, 2024
d229bc5
rearrange tree's order of funcs
rongma7 Sep 30, 2024
1eafd69
split tree into smaller .rs files
rongma7 Sep 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,11 @@ kvdb-rocksdb = "0.19"
kvdb = "0.13"
parking_lot = "0.12"

ethereum-types = "0.12"
ethereum-types = "0.12"

in-memory-tree = { path = "src/pending_part" }

[workspace]
members = [
"src/pending_part"
]
6 changes: 6 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use in_memory_tree::PendingError;
use thiserror::Error;

use crate::middlewares::CommitID;

#[derive(Error, Debug)]
pub enum StorageError {
#[error("unknown version")]
Expand All @@ -13,6 +16,9 @@ pub enum StorageError {

#[error("decode error {0:?}")]
DecodeError(#[from] DecodeError),

#[error("pending error {0:?}")]
PendingError(#[from] PendingError<CommitID>),
}

pub type Result<T> = ::std::result::Result<T, StorageError>;
Expand Down
46 changes: 40 additions & 6 deletions src/middlewares/versioned_flat_key_value/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
mod serde;
mod table_schema;

use in_memory_tree::VersionedHashMap;
use std::hash::Hash;

use self::table_schema::{HistoryChangeTable, HistoryIndicesTable, VersionedKeyValueSchema};

use super::ChangeKey;
use super::CommitIDSchema;
use crate::backends::TableReader;
Expand All @@ -23,18 +27,48 @@ impl HistoryIndices {
}
}

struct PendingPart;
// struct PendingPart;

pub struct VersionedStore<'db, T: VersionedKeyValueSchema> {
pending_part: PendingPart,
pub struct VersionedStore<'db, T: VersionedKeyValueSchema>
where
T::Key: Hash,
{
pending_part: VersionedHashMap<T::Key, CommitID, T::Value>,
history_index_table: TableReader<'db, HistoryIndicesTable<T>>,
commit_id_table: TableReader<'db, CommitIDSchema>,
change_history_table: KeyValueStoreBulks<'db, HistoryChangeTable<T>>,
}

impl<'db, T: VersionedKeyValueSchema> VersionedStore<'db, T> {
pub fn get_pending_part(&self, commit: CommitID, key: &T::Key) -> Result<Option<T::Value>> {
todo!()
impl<'db, T: VersionedKeyValueSchema> VersionedStore<'db, T>
where
T::Key: Hash,
{
pub fn get_pending_part(&mut self, commit: CommitID, key: &T::Key) -> Result<Option<T::Value>> {
let res_value = self.pending_part.query(commit, key);
let history_commit = match res_value {
Ok(None) => self.pending_part.get_parent_of_root(),
Err(in_memory_tree::PendingError::CommitIDNotFound(target_commit))
if target_commit == commit =>
{
Some(commit)
}
Ok(Some(value)) => return Ok(value),
Err(e) => return Err(StorageError::PendingError(e)),
};
if let Some(history_commit) = history_commit {
self.get_historical_part(history_commit, key)
} else {
Ok(None)
}
}

pub fn add_to_pending_part(
&mut self,
parent_commit: Option<CommitID>,
commit: CommitID,
updates: Vec<(T::Key, Option<T::Value>)>,
) -> Result<()> {
Ok(self.pending_part.add_node(updates, commit, parent_commit)?)
}

pub fn get_historical_part(&self, commit: CommitID, key: &T::Key) -> Result<Option<T::Value>> {
Expand Down
12 changes: 12 additions & 0 deletions src/pending_part/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "in-memory-tree"
version = "0.1.0"
edition = "2021"

[dependencies]
slab = "0.4.9"
thiserror = "1"

[dev-dependencies]
rand = "0.8.0"
rand_distr = "0.4.0"
284 changes: 284 additions & 0 deletions src/pending_part/src/commit_tree.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::{fmt::Debug, hash::Hash};

use slab::Slab;

use crate::PendingError;

type SlabIndex = usize;

pub struct TreeNode<Key: Eq + Hash + Clone, CommitId: Debug + Eq + Hash + Copy, Value: Clone> {
parent: Option<SlabIndex>,
children: BTreeSet<SlabIndex>,

// todo: test lazy height
// height will not be changed even when root is changed
// height is only used for lca
height: usize,

commit_id: CommitId,
// before current node, the old value of this key is modified by which commit_id,
// if none, this key is absent before current node
// here must use CommitID instead of SlabIndex (which may be reused, see slab doc)
modifications: Vec<(Key, Option<Value>, Option<CommitId>)>,
}

pub struct Tree<Key: Eq + Hash + Clone, CommitId: Debug + Eq + Hash + Copy, Value: Clone> {
nodes: Slab<TreeNode<Key, CommitId, Value>>,
index_map: HashMap<CommitId, SlabIndex>,
}

impl<Key: Eq + Hash + Clone, CommitId: Debug + Eq + Hash + Copy, Value: Clone>
Tree<Key, CommitId, Value>
{
pub fn new() -> Self {
Tree {
nodes: Slab::new(),
index_map: HashMap::new(),
}
}

fn contains_commit_id(&self, commit_id: &CommitId) -> bool {
self.index_map.contains_key(commit_id)
}

fn commit_id_to_slab_index(
&self,
commit_id: CommitId,
) -> Result<SlabIndex, PendingError<CommitId>> {
let slab_index = *self
.index_map
.get(&commit_id)
.ok_or_else(|| PendingError::CommitIDNotFound(commit_id))?;
Ok(slab_index)
}

fn slab_index_to_node(&self, slab_index: SlabIndex) -> &TreeNode<Key, CommitId, Value> {
&self.nodes[slab_index]
}

fn has_root(&self) -> bool {
!self.index_map.is_empty()
}

pub fn get_parent_commit_id(&self, node: &TreeNode<Key, CommitId, Value>) -> Option<CommitId> {
node.parent
.and_then(|p_slab_index| Some(self.nodes[p_slab_index].commit_id))
}
}

impl<Key: Eq + Hash + Clone, CommitId: Debug + Eq + Hash + Copy, Value: Clone>
Tree<Key, CommitId, Value>
{
pub fn add_node(
&mut self,
commit_id: CommitId,
parent_commit_id: Option<CommitId>,
modifications: Vec<(Key, Option<Value>, Option<CommitId>)>,
) -> Result<(), PendingError<CommitId>> {
// return error if Some(parent_commit_id) but parent_commit_id does not exist
let (parent_slab_index, parent_height) = if let Some(parent_commit_id) = parent_commit_id {
let p_slab_index = self.commit_id_to_slab_index(parent_commit_id)?;
let p_height = self.slab_index_to_node(p_slab_index).height;
(Some(p_slab_index), p_height)
} else {
// return error if want to add root but there has been a root
if self.has_root() {
return Err(PendingError::MultipleRootsNotAllowed);
}
(None, 0)
};
// return error if commit_id exists
if self.index_map.contains_key(&commit_id) {
return Err(PendingError::CommitIdAlreadyExists(commit_id));
}
let node = TreeNode::new(commit_id, parent_slab_index, parent_height, modifications);

let slab_index = self.nodes.insert(node);
self.index_map.insert(commit_id, slab_index);
if let Some(parent_slab_index) = parent_slab_index {
self.nodes[parent_slab_index].children.insert(slab_index);
}
Ok(())
}
}

impl<Key: Eq + Hash + Clone, CommitId: Debug + Eq + Hash + Copy, Value: Clone>
Tree<Key, CommitId, Value>
{
fn bfs_subtree(&self, subroot_slab_index: SlabIndex) -> Vec<SlabIndex> {
let mut slab_indices = vec![subroot_slab_index];
let mut head = 0;
while head < slab_indices.len() {
let node = self.slab_index_to_node(slab_indices[head]);

for &child_index in &node.children {
slab_indices.push(child_index);
}

head += 1;
}

slab_indices
}

fn find_path_nodes(&self, target_slab_index: SlabIndex) -> (Vec<CommitId>, HashSet<SlabIndex>) {
let mut target_node = self.slab_index_to_node(target_slab_index);
let mut path = Vec::new();
let mut set = HashSet::new();
while target_node.parent.is_some() {
let slab_index = target_node.parent.unwrap();
set.insert(slab_index);
target_node = self.slab_index_to_node(slab_index);
path.push(target_node.commit_id);
}
(path, set)
}

// todo: test
pub fn change_root(
&mut self,
commit_id: CommitId,
) -> Result<(Vec<CommitId>, Vec<CommitId>), PendingError<CommitId>> {
let slab_index = self.commit_id_to_slab_index(commit_id)?;

// (root)..=(new_root's parent)
let (to_commit_rev, to_commit_set) = self.find_path_nodes(slab_index);

// subtree of new_root
let to_maintain_vec = self.bfs_subtree(slab_index);
let to_maintain = BTreeSet::from_iter(to_maintain_vec.into_iter());

// tree - subtree of new_root - (root)..-(new_root's parent)
let mut to_remove_indices = Vec::new();
for (idx, _) in self.nodes.iter() {
if !to_maintain.contains(&idx) && !to_commit_set.contains(&idx) {
to_remove_indices.push(idx);
}
}
let mut to_remove = Vec::new();
for idx in to_remove_indices.into_iter() {
let to_remove_node = self.nodes.remove(idx);
self.index_map.remove(&to_remove_node.commit_id);
to_remove.push(to_remove_node.commit_id);
}

// set new_root's parent as None
self.nodes[slab_index].parent = None;

Ok((to_commit_rev, to_remove))
}
}

impl<Key: Eq + Hash + Clone, CommitId: Debug + Eq + Hash + Copy, Value: Clone>
Tree<Key, CommitId, Value>
{
pub fn find_path(
&self,
target_commit_id: CommitId,
) -> Result<HashMap<Key, (CommitId, Option<Value>)>, PendingError<CommitId>> {
let target_slab_index = self.commit_id_to_slab_index(target_commit_id)?;
let mut target_node = self.slab_index_to_node(target_slab_index);
let mut commits_rev = HashMap::new();
loop {
target_node.target_up(&mut commits_rev);
if target_node.parent.is_none() {
break;
}
target_node = self.slab_index_to_node(target_node.parent.unwrap());
}
Ok(commits_rev)
}

// correctness based on single root
pub fn lca(
&self,
current_commit_id: CommitId,
target_commit_id: CommitId,
) -> Result<
(
HashMap<Key, Option<CommitId>>,
HashMap<Key, (CommitId, Option<Value>)>,
),
PendingError<CommitId>,
> {
let current_slab_index = self.commit_id_to_slab_index(current_commit_id)?;
let target_slab_index = self.commit_id_to_slab_index(target_commit_id)?;
let mut current_node = self.slab_index_to_node(current_slab_index);
let mut target_node = self.slab_index_to_node(target_slab_index);
let mut rollbacks = HashMap::new();
let mut commits_rev = HashMap::new();
while current_node.height > target_node.height {
current_node.current_up(&mut rollbacks);
current_node = self.slab_index_to_node(current_node.parent.unwrap());
}
while target_node.height > current_node.height {
target_node.target_up(&mut commits_rev);
target_node = self.slab_index_to_node(target_node.parent.unwrap());
}
while current_node.commit_id != target_node.commit_id {
current_node.current_up(&mut rollbacks);
current_node = self.slab_index_to_node(current_node.parent.unwrap());
target_node.target_up(&mut commits_rev);
target_node = self.slab_index_to_node(target_node.parent.unwrap());
}
// check rollbacks' old_commit_id because TreeNodes are deleted
// in a lazy way with respect to TreeNodes.modifications
// todo: test this lazy method
for (_, old_commit_id_option) in rollbacks.iter_mut() {
if let Some(ref old_commit_id) = old_commit_id_option {
if !self.contains_commit_id(old_commit_id) {
*old_commit_id_option = None;
}
}
}
// rollbacks or commits_rev may be empty,
// they contain current and target (if they are not lca), respectively,
// but they do not contain lca
Ok((rollbacks, commits_rev))
}
}

impl<Key: Eq + Hash + Clone, CommitId: Debug + Eq + Hash + Copy, Value: Clone>
TreeNode<Key, CommitId, Value>
{
pub fn new(
commit_id: CommitId,
parent: Option<SlabIndex>,
parent_height: usize,
modifications: Vec<(Key, Option<Value>, Option<CommitId>)>,
) -> Self {
Self {
height: parent_height + 1,
commit_id,
parent,
children: BTreeSet::new(),
modifications,
}
}

pub fn get_commit_id(&self) -> CommitId {
self.commit_id
}

pub fn get_modifications(
&self,
) -> impl Iterator<Item = &(Key, Option<Value>, Option<CommitId>)> {
self.modifications.iter()
}

pub fn current_up(&self, rollbacks: &mut HashMap<Key, Option<CommitId>>) {
for (key, _, old_commit_id) in self.get_modifications() {
rollbacks.insert(key.clone(), *old_commit_id);
}
}

pub fn target_up(&self, commits_rev: &mut HashMap<Key, (CommitId, Option<Value>)>) {
let commit_id = self.commit_id;
for (key, value, _) in self.get_modifications() {
commits_rev
.entry(key.clone())
.or_insert_with(|| (commit_id, value.clone()));
}
}
}
Loading