Skip to content

Commit

Permalink
refactor: partitioned_lock's elaboration (#1540)
Browse files Browse the repository at this point in the history
## Rationale
Extended the `try_new` interface while keeping the old one for
compatibility.

## Detailed Changes
* Implemented the `try_new_suggest_cap` method, while changing the old
`try_new` method to `try_new_bit_len` to ensure compatibility.
* Modified structs and functions that call old interfaces.

## Test Plan
* Added new unit tests
* Passed CI test

---------

Co-authored-by: chunhao.ch <[email protected]>
  • Loading branch information
alicorn0618 and chunhao.ch authored Jul 9, 2024
1 parent a1869dc commit 4888f80
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 40 deletions.
4 changes: 2 additions & 2 deletions src/components/object_store/src/disk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl DiskCache {

Ok(Self {
root_dir,
meta_cache: Arc::new(PartitionedMutex::try_new(
meta_cache: Arc::new(PartitionedMutex::try_new_with_bit_len(
init_lru,
partition_bits,
SeaHasherBuilder {},
Expand Down Expand Up @@ -545,7 +545,7 @@ impl DiskCacheStore {
assert!(cap_per_part > 0);
Ok(LruCache::new(cap_per_part))
};
let meta_cache = PartitionedMutex::try_new(
let meta_cache = PartitionedMutex::try_new_with_bit_len(
init_size_lru,
FILE_SIZE_CACHE_PARTITION_BITS,
SeaHasherBuilder,
Expand Down
2 changes: 1 addition & 1 deletion src/components/object_store/src/mem_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl MemCache {
))
};

let inner = PartitionedMutex::try_new(
let inner = PartitionedMutex::try_new_with_bit_len(
init_lru,
partition_bits,
build_fixed_seed_ahasher_builder(),
Expand Down
216 changes: 179 additions & 37 deletions src/components/partitioned_lock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,30 @@ impl<T, B> PartitionedRwLock<T, B>
where
B: BuildHasher,
{
pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) -> Result<Self, E>
/// New cache with capacity set to `2^bit_len`
pub fn try_new_with_bit_len<F, E>(
init_fn: F,
partition_bit_len: usize,
hash_builder: B,
) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
let partition_num = 1 << partition_bit;
let partitions = (1..partition_num)
.map(|_| init_fn(partition_num).map(RwLock::new))
.collect::<Result<Vec<RwLock<T>>, E>>()?;
let partition_num = 1 << partition_bit_len;
PartitionedRwLock::try_new(init_fn, partition_num, hash_builder)
}

Ok(Self {
partitions,
partition_mask: partition_num - 1,
hash_builder,
})
/// New cache with capacity round to `suggest_cap`'s power of 2
pub fn try_new_with_suggest_cap<F, E>(
init_fn: F,
suggest_cap: usize,
hash_builder: B,
) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
let partition_num = suggest_cap.next_power_of_two();
PartitionedRwLock::try_new(init_fn, partition_num, hash_builder)
}

pub fn read<K: Eq + Hash>(&self, key: &K) -> RwLockReadGuard<'_, T> {
Expand All @@ -68,6 +78,22 @@ where
&self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask]
}

#[inline]
fn try_new<F, E>(init_fn: F, partition_num: usize, hash_builder: B) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
let partitions = (0..partition_num)
.map(|_| init_fn(partition_num).map(RwLock::new))
.collect::<Result<Vec<RwLock<T>>, E>>()?;

Ok(Self {
partitions,
partition_mask: partition_num - 1,
hash_builder,
})
}

#[cfg(test)]
fn get_partition_by_index(&self, index: usize) -> &RwLock<T> {
&self.partitions[index]
Expand All @@ -89,20 +115,30 @@ impl<T, B> PartitionedMutex<T, B>
where
B: BuildHasher,
{
pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) -> Result<Self, E>
/// New cache with capacity set to `2^bit_len`
pub fn try_new_with_bit_len<F, E>(
init_fn: F,
partition_bit_len: usize,
hash_builder: B,
) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
let partition_num = 1 << partition_bit;
let partitions = (0..partition_num)
.map(|_| init_fn(partition_num).map(Mutex::new))
.collect::<Result<Vec<Mutex<T>>, E>>()?;
let partition_num = 1 << partition_bit_len;
PartitionedMutex::try_new(init_fn, partition_num, hash_builder)
}

Ok(Self {
partitions,
partition_mask: partition_num - 1,
hash_builder,
})
/// New cache with capacity round to `suggest_cap`'s power of 2
pub fn try_new_with_suggest_cap<F, E>(
init_fn: F,
suggest_cap: usize,
hash_builder: B,
) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
let partition_num = suggest_cap.next_power_of_two();
PartitionedMutex::try_new(init_fn, partition_num, hash_builder)
}

pub fn lock<K: Eq + Hash>(&self, key: &K) -> MutexGuard<'_, T> {
Expand All @@ -115,6 +151,22 @@ where
&self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask]
}

#[inline]
fn try_new<F, E>(init_fn: F, partition_num: usize, hash_builder: B) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
let partitions = (0..partition_num)
.map(|_| init_fn(partition_num).map(Mutex::new))
.collect::<Result<Vec<Mutex<T>>, E>>()?;

Ok(Self {
partitions,
partition_mask: partition_num - 1,
hash_builder,
})
}

#[cfg(test)]
fn get_partition_by_index(&self, index: usize) -> &Mutex<T> {
&self.partitions[index]
Expand All @@ -140,11 +192,43 @@ impl<T, B> PartitionedMutexAsync<T, B>
where
B: BuildHasher,
{
pub fn try_new<F, E>(init_fn: F, partition_bit: usize, hash_builder: B) -> Result<Self, E>
/// New cache with capacity set to `2^bit_len`
pub fn try_new_with_bit_len<F, E>(
init_fn: F,
partition_bit_len: usize,
hash_builder: B,
) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
let partition_num = 1 << partition_bit_len;
PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder)
}

/// New cache with capacity round to `suggest_cap`'s power of 2
pub fn try_new_with_suggest_cap<F, E>(
init_fn: F,
suggest_cap: usize,
hash_builder: B,
) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
let partition_num = suggest_cap.next_power_of_two();
PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder)
}

pub async fn lock<K: Eq + Hash>(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> {
let mutex = self.get_partition(key);

mutex.lock().await
}

#[inline]
fn try_new<F, E>(init_fn: F, partition_num: usize, hash_builder: B) -> Result<Self, E>
where
F: Fn(usize) -> Result<T, E>,
{
let partition_num = 1 << partition_bit;
let partitions = (0..partition_num)
.map(|_| init_fn(partition_num).map(tokio::sync::Mutex::new))
.collect::<Result<Vec<tokio::sync::Mutex<T>>, E>>()?;
Expand All @@ -156,12 +240,6 @@ where
})
}

pub async fn lock<K: Eq + Hash>(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> {
let mutex = self.get_partition(key);

mutex.lock().await
}

fn get_partition<K: Eq + Hash>(&self, key: &K) -> &tokio::sync::Mutex<T> {
&self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask]
}
Expand All @@ -181,11 +259,66 @@ mod tests {

use super::*;

#[test]
fn test_new_equivalence() {
let init_42 = |_: usize| Ok::<_, ()>(42);

let test_rwlock_42_bit_len =
PartitionedRwLock::try_new_with_bit_len(init_42, 4, build_fixed_seed_ahasher_builder())
.unwrap();
let test_rwlock_42_suggest_cap = PartitionedRwLock::try_new_with_suggest_cap(
init_42,
13,
build_fixed_seed_ahasher_builder(),
)
.unwrap();

let test_mutex_42_bit_len =
PartitionedMutex::try_new_with_bit_len(init_42, 4, build_fixed_seed_ahasher_builder())
.unwrap();
let test_mutex_42_suggest_cap = PartitionedMutex::try_new_with_suggest_cap(
init_42,
16,
build_fixed_seed_ahasher_builder(),
)
.unwrap();

let test_mutex_async_42_bit_len = PartitionedMutexAsync::try_new_with_bit_len(
init_42,
4,
build_fixed_seed_ahasher_builder(),
)
.unwrap();
let test_mutex_async_42_suggest_cap = PartitionedMutexAsync::try_new_with_suggest_cap(
init_42,
13,
build_fixed_seed_ahasher_builder(),
)
.unwrap();

assert_eq!(
test_rwlock_42_bit_len.partition_mask,
test_rwlock_42_suggest_cap.partition_mask
);
assert_eq!(
test_mutex_42_bit_len.partition_mask,
test_mutex_42_suggest_cap.partition_mask
);
assert_eq!(
test_mutex_async_42_bit_len.partition_mask,
test_mutex_async_42_suggest_cap.partition_mask
);
}

#[test]
fn test_partitioned_rwlock() {
let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
let test_locked_map =
PartitionedRwLock::try_new(init_hmap, 4, build_fixed_seed_ahasher_builder()).unwrap();
let test_locked_map = PartitionedRwLock::try_new_with_bit_len(
init_hmap,
4,
build_fixed_seed_ahasher_builder(),
)
.unwrap();
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();

Expand All @@ -203,8 +336,12 @@ mod tests {
#[test]
fn test_partitioned_mutex() {
let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
let test_locked_map =
PartitionedMutex::try_new(init_hmap, 4, build_fixed_seed_ahasher_builder()).unwrap();
let test_locked_map = PartitionedMutex::try_new_with_bit_len(
init_hmap,
4,
build_fixed_seed_ahasher_builder(),
)
.unwrap();
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();

Expand All @@ -223,7 +360,7 @@ mod tests {
async fn test_partitioned_mutex_async() {
let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new());
let test_locked_map =
PartitionedMutexAsync::try_new(init_hmap, 4, SeaHasherBuilder).unwrap();
PartitionedMutexAsync::try_new_with_bit_len(init_hmap, 4, SeaHasherBuilder).unwrap();
let test_key = "test_key".to_string();
let test_value = "test_value".to_string();

Expand All @@ -242,7 +379,8 @@ mod tests {
fn test_partitioned_mutex_vis_different_partition() {
let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
let test_locked_map =
PartitionedMutex::try_new(init_vec, 4, build_fixed_seed_ahasher_builder()).unwrap();
PartitionedMutex::try_new_with_bit_len(init_vec, 4, build_fixed_seed_ahasher_builder())
.unwrap();
let mutex_first = test_locked_map.get_partition_by_index(0);

let mut _tmp_data = mutex_first.lock().unwrap();
Expand All @@ -256,8 +394,12 @@ mod tests {
#[test]
fn test_partitioned_rwmutex_vis_different_partition() {
let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
let test_locked_map =
PartitionedRwLock::try_new(init_vec, 4, build_fixed_seed_ahasher_builder()).unwrap();
let test_locked_map = PartitionedRwLock::try_new_with_bit_len(
init_vec,
4,
build_fixed_seed_ahasher_builder(),
)
.unwrap();
let mutex_first = test_locked_map.get_partition_by_index(0);
let mut _tmp = mutex_first.write().unwrap();
assert!(mutex_first.try_write().is_err());
Expand All @@ -271,7 +413,7 @@ mod tests {
async fn test_partitioned_mutex_async_vis_different_partition() {
let init_vec = |_: usize| Ok::<_, ()>(Vec::<i32>::new());
let test_locked_map =
PartitionedMutexAsync::try_new(init_vec, 4, SeaHasherBuilder).unwrap();
PartitionedMutexAsync::try_new_with_bit_len(init_vec, 4, SeaHasherBuilder).unwrap();
let mutex_first = test_locked_map.get_partition_by_index(0).await;

let mut _tmp_data = mutex_first.lock().await;
Expand Down

0 comments on commit 4888f80

Please sign in to comment.