From 4888f80c537c8d65d771abf16f384b743972ab2b Mon Sep 17 00:00:00 2001 From: Hugh Chern <60749105+alicorn0618@users.noreply.github.com> Date: Tue, 9 Jul 2024 14:56:08 +0800 Subject: [PATCH] refactor: partitioned_lock's elaboration (#1540) ## Rationale Extended the `try_new` interface while keeping the old one for compatibility. ## Detailed Changes * Implemented the `try_new_with_suggest_cap` method, while changing the old `try_new` method to `try_new_with_bit_len` to ensure compatibility. * Modified structs and functions that call old interfaces. ## Test Plan * Added new unit tests * Passed CI test --------- Co-authored-by: chunhao.ch --- src/components/object_store/src/disk_cache.rs | 4 +- src/components/object_store/src/mem_cache.rs | 2 +- src/components/partitioned_lock/src/lib.rs | 216 +++++++++++++++--- 3 files changed, 182 insertions(+), 40 deletions(-) diff --git a/src/components/object_store/src/disk_cache.rs b/src/components/object_store/src/disk_cache.rs index 981e6d0648..b0c7ba13ac 100644 --- a/src/components/object_store/src/disk_cache.rs +++ b/src/components/object_store/src/disk_cache.rs @@ -296,7 +296,7 @@ impl DiskCache { Ok(Self { root_dir, - meta_cache: Arc::new(PartitionedMutex::try_new( + meta_cache: Arc::new(PartitionedMutex::try_new_with_bit_len( init_lru, partition_bits, SeaHasherBuilder {}, @@ -545,7 +545,7 @@ impl DiskCacheStore { assert!(cap_per_part > 0); Ok(LruCache::new(cap_per_part)) }; - let meta_cache = PartitionedMutex::try_new( + let meta_cache = PartitionedMutex::try_new_with_bit_len( init_size_lru, FILE_SIZE_CACHE_PARTITION_BITS, SeaHasherBuilder, diff --git a/src/components/object_store/src/mem_cache.rs b/src/components/object_store/src/mem_cache.rs index 001be2ab8a..f602eee66e 100644 --- a/src/components/object_store/src/mem_cache.rs +++ b/src/components/object_store/src/mem_cache.rs @@ -81,7 +81,7 @@ impl MemCache { )) }; - let inner = PartitionedMutex::try_new( + let inner = PartitionedMutex::try_new_with_bit_len( init_lru, 
partition_bits, build_fixed_seed_ahasher_builder(), diff --git a/src/components/partitioned_lock/src/lib.rs b/src/components/partitioned_lock/src/lib.rs index de7ba3454e..22273b9709 100644 --- a/src/components/partitioned_lock/src/lib.rs +++ b/src/components/partitioned_lock/src/lib.rs @@ -36,20 +36,30 @@ impl PartitionedRwLock where B: BuildHasher, { - pub fn try_new(init_fn: F, partition_bit: usize, hash_builder: B) -> Result + /// New cache with capacity set to `2^bit_len` + pub fn try_new_with_bit_len( + init_fn: F, + partition_bit_len: usize, + hash_builder: B, + ) -> Result where F: Fn(usize) -> Result, { - let partition_num = 1 << partition_bit; - let partitions = (1..partition_num) - .map(|_| init_fn(partition_num).map(RwLock::new)) - .collect::>, E>>()?; + let partition_num = 1 << partition_bit_len; + PartitionedRwLock::try_new(init_fn, partition_num, hash_builder) + } - Ok(Self { - partitions, - partition_mask: partition_num - 1, - hash_builder, - }) + /// New cache with capacity round to `suggest_cap`'s power of 2 + pub fn try_new_with_suggest_cap( + init_fn: F, + suggest_cap: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = suggest_cap.next_power_of_two(); + PartitionedRwLock::try_new(init_fn, partition_num, hash_builder) } pub fn read(&self, key: &K) -> RwLockReadGuard<'_, T> { @@ -68,6 +78,22 @@ where &self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask] } + #[inline] + fn try_new(init_fn: F, partition_num: usize, hash_builder: B) -> Result + where + F: Fn(usize) -> Result, + { + let partitions = (0..partition_num) + .map(|_| init_fn(partition_num).map(RwLock::new)) + .collect::>, E>>()?; + + Ok(Self { + partitions, + partition_mask: partition_num - 1, + hash_builder, + }) + } + #[cfg(test)] fn get_partition_by_index(&self, index: usize) -> &RwLock { &self.partitions[index] @@ -89,20 +115,30 @@ impl PartitionedMutex where B: BuildHasher, { - pub fn try_new(init_fn: F, 
partition_bit: usize, hash_builder: B) -> Result + /// New cache with capacity set to `2^bit_len` + pub fn try_new_with_bit_len( + init_fn: F, + partition_bit_len: usize, + hash_builder: B, + ) -> Result where F: Fn(usize) -> Result, { - let partition_num = 1 << partition_bit; - let partitions = (0..partition_num) - .map(|_| init_fn(partition_num).map(Mutex::new)) - .collect::>, E>>()?; + let partition_num = 1 << partition_bit_len; + PartitionedMutex::try_new(init_fn, partition_num, hash_builder) + } - Ok(Self { - partitions, - partition_mask: partition_num - 1, - hash_builder, - }) + /// New cache with capacity round to `suggest_cap`'s power of 2 + pub fn try_new_with_suggest_cap( + init_fn: F, + suggest_cap: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = suggest_cap.next_power_of_two(); + PartitionedMutex::try_new(init_fn, partition_num, hash_builder) } pub fn lock(&self, key: &K) -> MutexGuard<'_, T> { @@ -115,6 +151,22 @@ where &self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask] } + #[inline] + fn try_new(init_fn: F, partition_num: usize, hash_builder: B) -> Result + where + F: Fn(usize) -> Result, + { + let partitions = (0..partition_num) + .map(|_| init_fn(partition_num).map(Mutex::new)) + .collect::>, E>>()?; + + Ok(Self { + partitions, + partition_mask: partition_num - 1, + hash_builder, + }) + } + #[cfg(test)] fn get_partition_by_index(&self, index: usize) -> &Mutex { &self.partitions[index] @@ -140,11 +192,43 @@ impl PartitionedMutexAsync where B: BuildHasher, { - pub fn try_new(init_fn: F, partition_bit: usize, hash_builder: B) -> Result + /// New cache with capacity set to `2^bit_len` + pub fn try_new_with_bit_len( + init_fn: F, + partition_bit_len: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = 1 << partition_bit_len; + PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder) + } + + /// New cache with 
capacity round to `suggest_cap`'s power of 2 + pub fn try_new_with_suggest_cap( + init_fn: F, + suggest_cap: usize, + hash_builder: B, + ) -> Result + where + F: Fn(usize) -> Result, + { + let partition_num = suggest_cap.next_power_of_two(); + PartitionedMutexAsync::try_new(init_fn, partition_num, hash_builder) + } + + pub async fn lock(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> { + let mutex = self.get_partition(key); + + mutex.lock().await + } + + #[inline] + fn try_new(init_fn: F, partition_num: usize, hash_builder: B) -> Result where F: Fn(usize) -> Result, { - let partition_num = 1 << partition_bit; let partitions = (0..partition_num) .map(|_| init_fn(partition_num).map(tokio::sync::Mutex::new)) .collect::>, E>>()?; @@ -156,12 +240,6 @@ where }) } - pub async fn lock(&self, key: &K) -> tokio::sync::MutexGuard<'_, T> { - let mutex = self.get_partition(key); - - mutex.lock().await - } - fn get_partition(&self, key: &K) -> &tokio::sync::Mutex { &self.partitions[(self.hash_builder.hash_one(key) as usize) & self.partition_mask] } @@ -181,11 +259,66 @@ mod tests { use super::*; + #[test] + fn test_new_equivalence() { + let init_42 = |_: usize| Ok::<_, ()>(42); + + let test_rwlock_42_bit_len = + PartitionedRwLock::try_new_with_bit_len(init_42, 4, build_fixed_seed_ahasher_builder()) + .unwrap(); + let test_rwlock_42_suggest_cap = PartitionedRwLock::try_new_with_suggest_cap( + init_42, + 13, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + + let test_mutex_42_bit_len = + PartitionedMutex::try_new_with_bit_len(init_42, 4, build_fixed_seed_ahasher_builder()) + .unwrap(); + let test_mutex_42_suggest_cap = PartitionedMutex::try_new_with_suggest_cap( + init_42, + 16, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + + let test_mutex_async_42_bit_len = PartitionedMutexAsync::try_new_with_bit_len( + init_42, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + let test_mutex_async_42_suggest_cap = PartitionedMutexAsync::try_new_with_suggest_cap( 
+ init_42, + 13, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); + + assert_eq!( + test_rwlock_42_bit_len.partition_mask, + test_rwlock_42_suggest_cap.partition_mask + ); + assert_eq!( + test_mutex_42_bit_len.partition_mask, + test_mutex_42_suggest_cap.partition_mask + ); + assert_eq!( + test_mutex_async_42_bit_len.partition_mask, + test_mutex_async_42_suggest_cap.partition_mask + ); + } + #[test] fn test_partitioned_rwlock() { let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new()); - let test_locked_map = - PartitionedRwLock::try_new(init_hmap, 4, build_fixed_seed_ahasher_builder()).unwrap(); + let test_locked_map = PartitionedRwLock::try_new_with_bit_len( + init_hmap, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); let test_key = "test_key".to_string(); let test_value = "test_value".to_string(); @@ -203,8 +336,12 @@ mod tests { #[test] fn test_partitioned_mutex() { let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new()); - let test_locked_map = - PartitionedMutex::try_new(init_hmap, 4, build_fixed_seed_ahasher_builder()).unwrap(); + let test_locked_map = PartitionedMutex::try_new_with_bit_len( + init_hmap, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); let test_key = "test_key".to_string(); let test_value = "test_value".to_string(); @@ -223,7 +360,7 @@ mod tests { async fn test_partitioned_mutex_async() { let init_hmap = |_: usize| Ok::<_, ()>(HashMap::new()); let test_locked_map = - PartitionedMutexAsync::try_new(init_hmap, 4, SeaHasherBuilder).unwrap(); + PartitionedMutexAsync::try_new_with_bit_len(init_hmap, 4, SeaHasherBuilder).unwrap(); let test_key = "test_key".to_string(); let test_value = "test_value".to_string(); @@ -242,7 +379,8 @@ mod tests { fn test_partitioned_mutex_vis_different_partition() { let init_vec = |_: usize| Ok::<_, ()>(Vec::::new()); let test_locked_map = - PartitionedMutex::try_new(init_vec, 4, build_fixed_seed_ahasher_builder()).unwrap(); + PartitionedMutex::try_new_with_bit_len(init_vec, 4, 
build_fixed_seed_ahasher_builder()) + .unwrap(); let mutex_first = test_locked_map.get_partition_by_index(0); let mut _tmp_data = mutex_first.lock().unwrap(); @@ -256,8 +394,12 @@ mod tests { #[test] fn test_partitioned_rwmutex_vis_different_partition() { let init_vec = |_: usize| Ok::<_, ()>(Vec::::new()); - let test_locked_map = - PartitionedRwLock::try_new(init_vec, 4, build_fixed_seed_ahasher_builder()).unwrap(); + let test_locked_map = PartitionedRwLock::try_new_with_bit_len( + init_vec, + 4, + build_fixed_seed_ahasher_builder(), + ) + .unwrap(); let mutex_first = test_locked_map.get_partition_by_index(0); let mut _tmp = mutex_first.write().unwrap(); assert!(mutex_first.try_write().is_err()); @@ -271,7 +413,7 @@ mod tests { async fn test_partitioned_mutex_async_vis_different_partition() { let init_vec = |_: usize| Ok::<_, ()>(Vec::::new()); let test_locked_map = - PartitionedMutexAsync::try_new(init_vec, 4, SeaHasherBuilder).unwrap(); + PartitionedMutexAsync::try_new_with_bit_len(init_vec, 4, SeaHasherBuilder).unwrap(); let mutex_first = test_locked_map.get_partition_by_index(0).await; let mut _tmp_data = mutex_first.lock().await;