diff --git a/src/common.rs b/src/common.rs index 048963fe..9748e86f 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + pub(crate) mod builder_utils; pub(crate) mod concurrent; pub(crate) mod deque; @@ -10,6 +12,11 @@ pub(crate) mod timer_wheel; #[cfg(test)] pub(crate) mod test_utils; +use self::concurrent::constants::{ + DEFAULT_EVICTION_BATCH_SIZE, DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS, + DEFAULT_MAX_LOG_SYNC_REPEATS, +}; + // Note: `CacheRegion` cannot have more than four enum variants. This is because // `crate::{sync,unsync}::DeqNodes` uses a `tagptr::TagNonNull, 2>` // pointer, where the 2-bit tag is `CacheRegion`. @@ -56,6 +63,54 @@ impl PartialEq for CacheRegion { } } +#[derive(Clone, Debug)] +pub(crate) struct HousekeeperConfig { + /// The timeout duration for the `run_pending_tasks` method. This is a safe-guard + /// to prevent cache read/write operations (that may call `run_pending_tasks` + /// internally) from being blocked for a long time when the user wrote a slow + /// eviction listener closure. + /// + /// Used only when the eviction listener closure is set for the cache instance. + /// + /// Default: `DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS` + pub(crate) maintenance_task_timeout: Duration, + /// The maximum repeat count for receiving operation logs from the read and write + /// log channels. Default: `MAX_LOG_SYNC_REPEATS`. + pub(crate) max_log_sync_repeats: usize, + /// The batch size of entries to be processed by each internal eviction method. + /// Default: `EVICTION_BATCH_SIZE`. + pub(crate) eviction_batch_size: usize, +} + +impl Default for HousekeeperConfig { + fn default() -> Self { + Self { + maintenance_task_timeout: Duration::from_millis( + DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS, + ), + max_log_sync_repeats: DEFAULT_MAX_LOG_SYNC_REPEATS, + eviction_batch_size: DEFAULT_EVICTION_BATCH_SIZE, + } + } +} + +impl HousekeeperConfig { + #[cfg(test)] + pub(crate) fn new( + maintenance_task_timeout: Option, + max_log_sync_repeats: Option, + eviction_batch_size: Option, + ) -> Self { + Self { + maintenance_task_timeout: maintenance_task_timeout.unwrap_or(Duration::from_millis( + DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS, + )), + max_log_sync_repeats: max_log_sync_repeats.unwrap_or(DEFAULT_MAX_LOG_SYNC_REPEATS), + eviction_batch_size: eviction_batch_size.unwrap_or(DEFAULT_EVICTION_BATCH_SIZE), + } + } +} + // Ensures the value fits in a range of `128u32..=u32::MAX`. pub(crate) fn sketch_capacity(max_capacity: u64) -> u32 { max_capacity.try_into().unwrap_or(u32::MAX).max(128) diff --git a/src/common/concurrent/constants.rs b/src/common/concurrent/constants.rs index 242b10cf..cc3b094f 100644 --- a/src/common/concurrent/constants.rs +++ b/src/common/concurrent/constants.rs @@ -1,19 +1,23 @@ -pub(crate) const MAX_SYNC_REPEATS: usize = 4; -pub(crate) const PERIODICAL_SYNC_INITIAL_DELAY_MILLIS: u64 = 300; +pub(crate) const DEFAULT_MAX_LOG_SYNC_REPEATS: usize = 4; +pub(crate) const LOG_SYNC_INTERVAL_MILLIS: u64 = 300; pub(crate) const READ_LOG_FLUSH_POINT: usize = 64; -pub(crate) const READ_LOG_SIZE: usize = READ_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2); // 384 - pub(crate) const WRITE_LOG_FLUSH_POINT: usize = 64; -pub(crate) const WRITE_LOG_SIZE: usize = WRITE_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2); // 384 + +// 384 elements +pub(crate) const READ_LOG_CH_SIZE: usize = + READ_LOG_FLUSH_POINT * (DEFAULT_MAX_LOG_SYNC_REPEATS + 2); + +// 384 elements +pub(crate) const WRITE_LOG_CH_SIZE: usize = + WRITE_LOG_FLUSH_POINT * (DEFAULT_MAX_LOG_SYNC_REPEATS + 2); // TODO: Calculate the batch size based on the number of entries in the cache (or an // estimated number of entries to evict) -pub(crate) const EVICTION_BATCH_SIZE: usize = WRITE_LOG_SIZE; -pub(crate) const INVALIDATION_BATCH_SIZE: usize = WRITE_LOG_SIZE; +pub(crate) const DEFAULT_EVICTION_BATCH_SIZE: usize = WRITE_LOG_CH_SIZE; /// The default timeout duration for the `run_pending_tasks` method. -pub(crate) const DEFAULT_RUN_PENDING_TASKS_TIMEOUT_MILLIS: u64 = 100; +pub(crate) const DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS: u64 = 100; #[cfg(feature = "sync")] pub(crate) const WRITE_RETRY_INTERVAL_MICROS: u64 = 50; diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs index 2ded85f7..706e34ad 100644 --- a/src/common/concurrent/housekeeper.rs +++ b/src/common/concurrent/housekeeper.rs @@ -1,13 +1,11 @@ -use super::constants::{ - DEFAULT_RUN_PENDING_TASKS_TIMEOUT_MILLIS, MAX_SYNC_REPEATS, - PERIODICAL_SYNC_INITIAL_DELAY_MILLIS, -}; +use super::constants::LOG_SYNC_INTERVAL_MILLIS; use super::{ atomic_time::AtomicInstant, constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT}, }; use crate::common::time::{CheckedTimeOps, Instant}; +use crate::common::HousekeeperConfig; use parking_lot::{Mutex, MutexGuard}; use std::{ @@ -17,7 +15,12 @@ use std::{ pub(crate) trait InnerSync { /// Runs the pending tasks. Returns `true` if there are more entries to evict. - fn run_pending_tasks(&self, max_sync_repeats: usize, timeout: Option) -> bool; + fn run_pending_tasks( + &self, + timeout: Option, + max_log_sync_repeats: usize, + eviction_batch_size: usize, + ) -> bool; fn now(&self) -> Instant; } @@ -25,11 +28,11 @@ pub(crate) trait InnerSync { pub(crate) struct Housekeeper { run_lock: Mutex<()>, run_after: AtomicInstant, - /// A flag to indicate if the last `run_pending_tasks` call left more entries to - /// evict. + /// A flag to indicate if the last call on `run_pending_tasks` method left some + /// entries to evict. /// /// Used only when the eviction listener closure is set for this cache instance - /// because, if not, `run_pending_tasks` will never leave more entries to evict. + /// because, if not, `run_pending_tasks` will never leave entries to evict. more_entries_to_evict: Option, /// The timeout duration for the `run_pending_tasks` method. This is a safe-guard /// to prevent cache read/write operations (that may call `run_pending_tasks` @@ -38,17 +41,21 @@ pub(crate) struct Housekeeper { /// /// Used only when the eviction listener closure is set for this cache instance. maintenance_task_timeout: Option, + /// The maximum repeat count for receiving operation logs from the read and write + /// log channels. Default: `MAX_LOG_SYNC_REPEATS`. + max_log_sync_repeats: usize, + /// The batch size of entries to be processed by each internal eviction method. + /// Default: `EVICTION_BATCH_SIZE`. + eviction_batch_size: usize, auto_run_enabled: AtomicBool, } impl Housekeeper { - pub(crate) fn new(is_eviction_listener_enabled: bool) -> Self { + pub(crate) fn new(is_eviction_listener_enabled: bool, config: HousekeeperConfig) -> Self { let (more_entries_to_evict, maintenance_task_timeout) = if is_eviction_listener_enabled { ( Some(AtomicBool::new(false)), - Some(Duration::from_millis( - DEFAULT_RUN_PENDING_TASKS_TIMEOUT_MILLIS, - )), + Some(config.maintenance_task_timeout), ) } else { (None, None) @@ -59,6 +66,8 @@ impl Housekeeper { run_after: AtomicInstant::new(Self::sync_after(Instant::now())), more_entries_to_evict, maintenance_task_timeout, + max_log_sync_repeats: config.max_log_sync_repeats, + eviction_batch_size: config.eviction_batch_size, auto_run_enabled: AtomicBool::new(true), } } @@ -109,12 +118,14 @@ impl Housekeeper { let now = cache.now(); self.run_after.set_instant(Self::sync_after(now)); let timeout = self.maintenance_task_timeout; - let more_to_evict = cache.run_pending_tasks(MAX_SYNC_REPEATS, timeout); + let repeats = self.max_log_sync_repeats; + let batch_size = self.eviction_batch_size; + let more_to_evict = cache.run_pending_tasks(timeout, repeats, batch_size); self.set_more_entries_to_evict(more_to_evict); } fn sync_after(now: Instant) -> Instant { - let dur = Duration::from_millis(PERIODICAL_SYNC_INITIAL_DELAY_MILLIS); + let dur = Duration::from_millis(LOG_SYNC_INTERVAL_MILLIS); let ts = now.checked_add(dur); // Assuming that `now` is current wall clock time, this should never fail at // least next millions of years. diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index 30b244bf..57e8d5d6 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -12,8 +12,7 @@ use crate::{ concurrent::{ atomic_time::AtomicInstant, constants::{ - EVICTION_BATCH_SIZE, INVALIDATION_BATCH_SIZE, READ_LOG_FLUSH_POINT, READ_LOG_SIZE, - WRITE_LOG_FLUSH_POINT, WRITE_LOG_SIZE, + READ_LOG_CH_SIZE, READ_LOG_FLUSH_POINT, WRITE_LOG_CH_SIZE, WRITE_LOG_FLUSH_POINT, }, deques::Deques, entry_info::EntryInfo, @@ -24,7 +23,7 @@ use crate::{ frequency_sketch::FrequencySketch, time::{CheckedTimeOps, Clock, Instant}, timer_wheel::{ReschedulingResult, TimerWheel}, - CacheRegion, + CacheRegion, HousekeeperConfig, }, future::CancelGuard, notification::{AsyncEvictionListener, RemovalCause}, @@ -171,12 +170,13 @@ where eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, + housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, ) -> Self { let (r_size, w_size) = if max_capacity == Some(0) { (0, 0) } else { - (READ_LOG_SIZE, WRITE_LOG_SIZE) + (READ_LOG_CH_SIZE, WRITE_LOG_CH_SIZE) }; let is_eviction_listener_enabled = eviction_listener.is_some(); @@ -204,7 +204,10 @@ where write_op_ch: w_snd, interrupted_op_ch_snd: i_snd, interrupted_op_ch_rcv: i_rcv, - housekeeper: Some(Arc::new(Housekeeper::new(is_eviction_listener_enabled))), + housekeeper: Some(Arc::new(Housekeeper::new( + is_eviction_listener_enabled, + housekeeper_config, + ))), } } @@ -1211,7 +1214,7 @@ where (1, 0) } else { let ic = initial_capacity - .map(|cap| cap + WRITE_LOG_SIZE) + .map(|cap| cap + WRITE_LOG_CH_SIZE) .unwrap_or_default(); (64, ic) }; @@ -1398,8 +1401,14 @@ where S: BuildHasher + Clone + Send + Sync + 'static, { /// Runs the pending tasks. Returns `true` if there are more entries to evict. - async fn run_pending_tasks(&self, max_repeats: usize, timeout: Option) -> bool { - self.do_run_pending_tasks(max_repeats, timeout).await + async fn run_pending_tasks( + &self, + timeout: Option, + max_log_sync_repeats: usize, + eviction_batch_size: usize, + ) -> bool { + self.do_run_pending_tasks(timeout, max_log_sync_repeats, eviction_batch_size) + .await } /// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method @@ -1420,7 +1429,12 @@ where S: BuildHasher + Clone + Send + Sync + 'static, { /// Runs the pending tasks. Returns `true` if there are more entries to evict. - async fn do_run_pending_tasks(&self, max_repeats: usize, timeout: Option) -> bool { + async fn do_run_pending_tasks( + &self, + timeout: Option, + max_log_sync_repeats: usize, + eviction_batch_size: usize, + ) -> bool { if self.max_capacity == Some(0) { return false; } @@ -1464,7 +1478,7 @@ where // method for the write op channel to have enough room, notify them. let listeners = self.write_op_ch_ready_event.total_listeners(); if listeners > 0 { - let n = listeners.min(WRITE_LOG_SIZE - self.write_op_ch.len()); + let n = listeners.min(WRITE_LOG_CH_SIZE - self.write_op_ch.len()); // Notify the `n` listeners. The `notify` method accepts 0, so no // need to check if `n` is greater than 0. self.write_op_ch_ready_event.notify(n); @@ -1492,7 +1506,7 @@ where self.evict_expired_entries_using_deqs( &mut deqs, &mut timer_wheel, - EVICTION_BATCH_SIZE, + eviction_batch_size, &mut eviction_state, ) .await; @@ -1506,7 +1520,7 @@ where invalidator, &mut deqs, &mut timer_wheel, - INVALIDATION_BATCH_SIZE, + eviction_batch_size, &mut eviction_state, ) .await; @@ -1519,7 +1533,7 @@ where self.evict_lru_entries( &mut deqs, &mut timer_wheel, - EVICTION_BATCH_SIZE, + eviction_batch_size, weights_to_evict, &mut eviction_state, ) @@ -1528,7 +1542,7 @@ where // Check whether to continue this loop or not. - let should_process_logs = calls <= max_repeats + let should_process_logs = calls <= max_log_sync_repeats && (self.read_op_ch.len() >= READ_LOG_FLUSH_POINT || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT); @@ -2840,7 +2854,10 @@ fn is_expired_by_ttl( #[cfg(test)] mod tests { - use crate::policy::{EvictionPolicy, ExpirationPolicy}; + use crate::{ + common::HousekeeperConfig, + policy::{EvictionPolicy, ExpirationPolicy}, + }; use super::BaseCache; @@ -2862,6 +2879,7 @@ mod tests { EvictionPolicy::default(), None, ExpirationPolicy::default(), + HousekeeperConfig::default(), false, ); cache.inner.enable_frequency_sketch_for_testing().await; @@ -2949,19 +2967,19 @@ mod tests { ($cache:ident, $key:ident, $hash:ident, $mock:ident, $duration_secs:expr) => { // Increment the time. $mock.increment(Duration::from_millis($duration_secs * 1000 - 1)); - $cache.inner.do_run_pending_tasks(1, None).await; + $cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!($cache.contains_key_with_hash(&$key, $hash)); assert_eq!($cache.entry_count(), 1); // Increment the time by 1ms (3). The entry should be expired. $mock.increment(Duration::from_millis(1)); - $cache.inner.do_run_pending_tasks(1, None).await; + $cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(!$cache.contains_key_with_hash(&$key, $hash)); // Increment the time again to ensure the entry has been evicted from the // cache. $mock.increment(Duration::from_secs(1)); - $cache.inner.do_run_pending_tasks(1, None).await; + $cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!($cache.entry_count(), 0); }; } @@ -3215,6 +3233,7 @@ mod tests { Some(Duration::from_secs(TTI)), expiry, ), + HousekeeperConfig::default(), false, ); cache.reconfigure_for_testing().await; @@ -3245,7 +3264,7 @@ mod tests { insert(&cache, key, hash, value).await; // Run a sync to register the entry to the internal data structures including // the timer wheel. - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 1); @@ -3267,12 +3286,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); // Read the entry (2). @@ -3292,7 +3311,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_expiry!(cache, key, hash, mock, 3); @@ -3314,12 +3333,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); // Read the entry (2). @@ -3339,11 +3358,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; // Increment the time. mock.increment(Duration::from_secs(2)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3358,7 +3377,7 @@ mod tests { Some(3), ); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 3); @@ -3381,12 +3400,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3407,11 +3426,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; // Increment the time. mock.increment(Duration::from_secs(2)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3426,7 +3445,7 @@ mod tests { None, ); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 7); @@ -3448,12 +3467,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8)); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(5)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3474,7 +3493,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_expiry!(cache, key, hash, mock, 7); @@ -3496,12 +3515,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8)); let inserted_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(5)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3522,11 +3541,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3547,7 +3566,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_expiry!(cache, key, hash, mock, 5); @@ -3568,12 +3587,12 @@ mod tests { *expectation.lock().unwrap() = ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(9)); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3589,12 +3608,12 @@ mod tests { ); let updated_at = current_time(&cache); insert(&cache, key, hash, value).await; - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3615,11 +3634,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3640,7 +3659,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.do_run_pending_tasks(1, None).await; + cache.inner.do_run_pending_tasks(None, 1, 10).await; assert_expiry!(cache, key, hash, mock, 4); } diff --git a/src/future/builder.rs b/src/future/builder.rs index 7c6943de..6a99c428 100644 --- a/src/future/builder.rs +++ b/src/future/builder.rs @@ -1,6 +1,6 @@ use super::{Cache, FutureExt}; use crate::{ - common::{builder_utils, concurrent::Weigher}, + common::{builder_utils, concurrent::Weigher, HousekeeperConfig}, notification::{AsyncEvictionListener, ListenerFuture, RemovalCause}, policy::{EvictionPolicy, ExpirationPolicy}, Expiry, @@ -63,6 +63,7 @@ pub struct CacheBuilder { eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, + housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, cache_type: PhantomData, } @@ -81,6 +82,7 @@ where eviction_policy: EvictionPolicy::default(), eviction_listener: None, expiration_policy: ExpirationPolicy::default(), + housekeeper_config: HousekeeperConfig::default(), invalidator_enabled: false, cache_type: PhantomData, } @@ -97,7 +99,7 @@ where pub fn new(max_capacity: u64) -> Self { Self { max_capacity: Some(max_capacity), - ..Default::default() + ..Self::default() } } @@ -121,6 +123,7 @@ where self.eviction_policy, self.eviction_listener, self.expiration_policy, + self.housekeeper_config, self.invalidator_enabled, ) } @@ -218,6 +221,7 @@ where self.eviction_policy, self.eviction_listener, self.expiration_policy, + self.housekeeper_config, self.invalidator_enabled, ) } @@ -394,6 +398,14 @@ impl CacheBuilder { builder } + #[cfg(test)] + pub(crate) fn housekeeper_config(self, conf: HousekeeperConfig) -> Self { + Self { + housekeeper_config: conf, + ..self + } + } + /// Enables support for [`Cache::invalidate_entries_if`][cache-invalidate-if] /// method. /// diff --git a/src/future/cache.rs b/src/future/cache.rs index 40112f88..812e26fb 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -5,7 +5,7 @@ use super::{ WriteOp, }; use crate::{ - common::concurrent::Weigher, + common::{concurrent::Weigher, HousekeeperConfig}, notification::AsyncEvictionListener, ops::compute::{self, CompResult}, policy::{EvictionPolicy, ExpirationPolicy}, @@ -789,6 +789,7 @@ where EvictionPolicy::default(), None, ExpirationPolicy::default(), + HousekeeperConfig::default(), false, ) } @@ -819,6 +820,7 @@ where eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, + housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, ) -> Self { Self { @@ -831,6 +833,7 @@ where eviction_policy, eviction_listener, expiration_policy, + housekeeper_config, invalidator_enabled, ), value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)), @@ -2113,7 +2116,7 @@ fn never_ignore<'a, V>() -> Option<&'a mut fn(&V) -> bool> { mod tests { use super::Cache; use crate::{ - common::time::Clock, + common::{time::Clock, HousekeeperConfig}, future::FutureExt, notification::{ListenerFuture, RemovalCause}, ops::compute, @@ -2125,7 +2128,7 @@ mod tests { use std::{ convert::Infallible, sync::{ - atomic::{AtomicU32, Ordering}, + atomic::{AtomicU32, AtomicU8, Ordering}, Arc, }, time::{Duration, Instant as StdInstant}, @@ -5103,6 +5106,159 @@ mod tests { verify_notification_vec(&cache, actual, &expected).await; } + // When the eviction listener is not set, calling `run_pending_tasks` once should + // evict all entries that can be removed. + #[tokio::test] + async fn no_batch_size_limit_on_eviction() { + const MAX_CAPACITY: u64 = 20; + + const EVICTION_TIMEOUT: Duration = Duration::from_millis(30); + const MAX_LOG_SYNC_REPEATS: usize = 1; + const EVICTION_BATCH_SIZE: usize = 1; + + let hk_conf = HousekeeperConfig::new( + Some(EVICTION_TIMEOUT), + Some(MAX_LOG_SYNC_REPEATS), + Some(EVICTION_BATCH_SIZE), + ); + + // Create a cache with the LRU policy. + let mut cache = Cache::builder() + .max_capacity(MAX_CAPACITY) + .eviction_policy(EvictionPolicy::lru()) + .housekeeper_config(hk_conf) + .build(); + cache.reconfigure_for_testing().await; + + // Make the cache exterior immutable. + let cache = cache; + + // Fill the cache. + for i in 0..MAX_CAPACITY { + let v = format!("v{i}"); + cache.insert(i, v).await + } + // The max capacity should not change because we have not called + // `run_pending_tasks` yet. + assert_eq!(cache.entry_count(), 0); + + cache.run_pending_tasks().await; + assert_eq!(cache.entry_count(), MAX_CAPACITY); + + // Insert more items the cache. + for i in MAX_CAPACITY..(MAX_CAPACITY * 2) { + let v = format!("v{i}"); + cache.insert(i, v).await + } + // The max capacity should not change because we have not called + // `run_pending_tasks` yet. + assert_eq!(cache.entry_count(), MAX_CAPACITY); + // Both old and new keys should exist. + assert!(cache.contains_key(&0)); // old + assert!(cache.contains_key(&(MAX_CAPACITY - 1))); // old + assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); // new + + // Process the remaining write op logs (there should be MAX_CAPACITY logs), + // and evict the LRU entries. + cache.run_pending_tasks().await; + assert_eq!(cache.entry_count(), MAX_CAPACITY); + + // Now the old keys should be gone. + assert!(!cache.contains_key(&0)); + assert!(!cache.contains_key(&(MAX_CAPACITY - 1))); + // And the new keys should exist. + assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); + } + + #[tokio::test] + async fn slow_eviction_listener() { + const MAX_CAPACITY: u64 = 20; + + const EVICTION_TIMEOUT: Duration = Duration::from_millis(30); + const LISTENER_DELAY: Duration = Duration::from_millis(11); + const MAX_LOG_SYNC_REPEATS: usize = 1; + const EVICTION_BATCH_SIZE: usize = 1; + + let hk_conf = HousekeeperConfig::new( + Some(EVICTION_TIMEOUT), + Some(MAX_LOG_SYNC_REPEATS), + Some(EVICTION_BATCH_SIZE), + ); + + let (clock, mock) = Clock::mock(); + let listener_call_count = Arc::new(AtomicU8::new(0)); + let lcc = Arc::clone(&listener_call_count); + + // A slow eviction listener that spend `LISTENER_DELAY` to process a removal + // notification. + let listener = move |_k, _v, _cause| { + mock.increment(LISTENER_DELAY); + lcc.fetch_add(1, Ordering::AcqRel); + }; + + // Create a cache with the LRU policy. + let mut cache = Cache::builder() + .max_capacity(MAX_CAPACITY) + .eviction_policy(EvictionPolicy::lru()) + .eviction_listener(listener) + .housekeeper_config(hk_conf) + .build(); + cache.reconfigure_for_testing().await; + cache.set_expiration_clock(Some(clock)).await; + + // Make the cache exterior immutable. + let cache = cache; + + // Fill the cache. + for i in 0..MAX_CAPACITY { + let v = format!("v{i}"); + cache.insert(i, v).await + } + // The max capacity should not change because we have not called + // `run_pending_tasks` yet. + assert_eq!(cache.entry_count(), 0); + + cache.run_pending_tasks().await; + assert_eq!(listener_call_count.load(Ordering::Acquire), 0); + assert_eq!(cache.entry_count(), MAX_CAPACITY); + + // Insert more items the cache. + for i in MAX_CAPACITY..(MAX_CAPACITY * 2) { + let v = format!("v{i}"); + cache.insert(i, v).await + } + assert_eq!(cache.entry_count(), MAX_CAPACITY); + + cache.run_pending_tasks().await; + // Because of the slow listener, cache should get an over capacity. + let mut expected_call_count = 3; + assert_eq!( + listener_call_count.load(Ordering::Acquire) as u64, + expected_call_count + ); + assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count); + + loop { + cache.run_pending_tasks().await; + + expected_call_count += 3; + if expected_call_count > MAX_CAPACITY { + expected_call_count = MAX_CAPACITY; + } + + let actual_count = listener_call_count.load(Ordering::Acquire) as u64; + assert_eq!(actual_count, expected_call_count); + let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count; + assert_eq!(cache.entry_count(), expected_entry_count); + + if expected_call_count >= MAX_CAPACITY { + break; + } + } + + assert_eq!(cache.entry_count(), MAX_CAPACITY); + } + // NOTE: To enable the panic logging, run the following command: // // RUST_LOG=moka=info cargo test --features 'future, logging' -- \ diff --git a/src/future/housekeeper.rs b/src/future/housekeeper.rs index 2fd81e44..d20cad31 100644 --- a/src/future/housekeeper.rs +++ b/src/future/housekeeper.rs @@ -1,12 +1,10 @@ use crate::common::{ concurrent::{ atomic_time::AtomicInstant, - constants::{ - DEFAULT_RUN_PENDING_TASKS_TIMEOUT_MILLIS, MAX_SYNC_REPEATS, - PERIODICAL_SYNC_INITIAL_DELAY_MILLIS, READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT, - }, + constants::{LOG_SYNC_INTERVAL_MILLIS, READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT}, }, time::{CheckedTimeOps, Instant}, + HousekeeperConfig, }; use std::{ @@ -27,7 +25,12 @@ use futures_util::future::{BoxFuture, Shared}; #[async_trait] pub(crate) trait InnerSync { /// Runs the pending tasks. Returns `true` if there are more entries to evict. - async fn run_pending_tasks(&self, max_sync_repeats: usize, timeout: Option) -> bool; + async fn run_pending_tasks( + &self, + timeout: Option, + max_log_sync_repeats: usize, + eviction_batch_size: usize, + ) -> bool; /// Notifies all the async tasks waiting in `BaseCache::schedule_write_op` method /// for the write op channel to have enough room. @@ -40,11 +43,11 @@ pub(crate) struct Housekeeper { /// A shared `Future` of the maintenance task that is currently being resolved. current_task: Mutex>>>, run_after: AtomicInstant, - /// A flag to indicate if the last `run_pending_tasks` call left more entries to - /// evict. + /// A flag to indicate if the last call on `run_pending_tasks` method left some + /// entries to evict. /// /// Used only when the eviction listener closure is set for this cache instance - /// because, if not, `run_pending_tasks` will never leave more entries to evict. + /// because, if not, `run_pending_tasks` will never leave entries to evict. more_entries_to_evict: Option, /// The timeout duration for the `run_pending_tasks` method. This is a safe-guard /// to prevent cache read/write operations (that may call `run_pending_tasks` @@ -53,6 +56,12 @@ pub(crate) struct Housekeeper { /// /// Used only when the eviction listener closure is set for this cache instance. maintenance_task_timeout: Option, + /// The maximum repeat count for receiving operation logs from the read and write + /// log channels. Default: `MAX_LOG_SYNC_REPEATS`. + max_log_sync_repeats: usize, + /// The batch size of entries to be processed by each internal eviction method. + /// Default: `EVICTION_BATCH_SIZE`. + eviction_batch_size: usize, auto_run_enabled: AtomicBool, #[cfg(test)] pub(crate) start_count: AtomicUsize, @@ -61,13 +70,11 @@ pub(crate) struct Housekeeper { } impl Housekeeper { - pub(crate) fn new(is_eviction_listener_enabled: bool) -> Self { + pub(crate) fn new(is_eviction_listener_enabled: bool, config: HousekeeperConfig) -> Self { let (more_entries_to_evict, maintenance_task_timeout) = if is_eviction_listener_enabled { ( Some(AtomicBool::new(false)), - Some(Duration::from_millis( - DEFAULT_RUN_PENDING_TASKS_TIMEOUT_MILLIS, - )), + Some(config.maintenance_task_timeout), ) } else { (None, None) @@ -78,6 +85,8 @@ impl Housekeeper { run_after: AtomicInstant::new(Self::sync_after(Instant::now())), more_entries_to_evict, maintenance_task_timeout, + max_log_sync_repeats: config.max_log_sync_repeats, + eviction_batch_size: config.eviction_batch_size, auto_run_enabled: AtomicBool::new(true), #[cfg(test)] start_count: Default::default(), @@ -168,8 +177,10 @@ impl Housekeeper { more_to_evict = task.clone().await; } else { let timeout = self.maintenance_task_timeout; + let repeats = self.max_log_sync_repeats; + let batch_size = self.eviction_batch_size; // Create a new maintenance task and await it. - let task = async move { cache.run_pending_tasks(MAX_SYNC_REPEATS, timeout).await } + let task = async move { cache.run_pending_tasks(timeout, repeats, batch_size).await } .boxed() .shared(); *current_task = Some(task.clone()); @@ -191,7 +202,7 @@ impl Housekeeper { } fn sync_after(now: Instant) -> Instant { - let dur = Duration::from_millis(PERIODICAL_SYNC_INITIAL_DELAY_MILLIS); + let dur = Duration::from_millis(LOG_SYNC_INTERVAL_MILLIS); let ts = now.checked_add(dur); // Assuming that `now` is current wall clock time, this should never fail at // least next millions of years. diff --git a/src/sync/builder.rs b/src/sync/builder.rs index cd26dd23..4fec063e 100644 --- a/src/sync/builder.rs +++ b/src/sync/builder.rs @@ -1,6 +1,6 @@ use super::{Cache, SegmentedCache}; use crate::{ - common::{builder_utils, concurrent::Weigher}, + common::{builder_utils, concurrent::Weigher, HousekeeperConfig}, notification::{EvictionListener, RemovalCause}, policy::{EvictionPolicy, ExpirationPolicy}, Expiry, @@ -56,6 +56,7 @@ pub struct CacheBuilder { eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, + housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, cache_type: PhantomData, } @@ -75,6 +76,7 @@ where eviction_listener: None, eviction_policy: EvictionPolicy::default(), expiration_policy: ExpirationPolicy::default(), + housekeeper_config: HousekeeperConfig::default(), invalidator_enabled: false, cache_type: PhantomData, } @@ -115,6 +117,7 @@ where eviction_policy: self.eviction_policy, eviction_listener: self.eviction_listener, expiration_policy: self.expiration_policy, + housekeeper_config: self.housekeeper_config, invalidator_enabled: self.invalidator_enabled, cache_type: PhantomData, } @@ -143,6 +146,7 @@ where self.eviction_policy, self.eviction_listener, self.expiration_policy, + self.housekeeper_config, self.invalidator_enabled, ) } @@ -230,6 +234,7 @@ where self.eviction_policy, self.eviction_listener, self.expiration_policy, + self.housekeeper_config, self.invalidator_enabled, ) } @@ -264,6 +269,7 @@ where self.eviction_policy, self.eviction_listener, self.expiration_policy, + self.housekeeper_config, self.invalidator_enabled, ) } @@ -353,6 +359,7 @@ where self.eviction_policy, self.eviction_listener, self.expiration_policy, + self.housekeeper_config, self.invalidator_enabled, ) } @@ -476,6 +483,14 @@ impl CacheBuilder { builder } + #[cfg(test)] + pub(crate) fn housekeeper_config(self, conf: HousekeeperConfig) -> Self { + Self { + housekeeper_config: conf, + ..self + } + } + /// Enables support for [`Cache::invalidate_entries_if`][cache-invalidate-if] /// method. /// diff --git a/src/sync/cache.rs b/src/sync/cache.rs index af3547f1..bd231251 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -8,6 +8,7 @@ use crate::{ constants::WRITE_RETRY_INTERVAL_MICROS, housekeeper::InnerSync, Weigher, WriteOp, }, time::Instant, + HousekeeperConfig, }, notification::EvictionListener, ops::compute::{self, CompResult}, @@ -709,6 +710,7 @@ where EvictionPolicy::default(), None, ExpirationPolicy::default(), + HousekeeperConfig::default(), false, ) } @@ -739,6 +741,7 @@ where eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, + housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, ) -> Self { Self { @@ -751,6 +754,7 @@ where eviction_policy, eviction_listener, expiration_policy, + housekeeper_config, invalidator_enabled, ), value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)), @@ -1906,7 +1910,7 @@ where mod tests { use super::Cache; use crate::{ - common::time::Clock, + common::{time::Clock, HousekeeperConfig}, notification::RemovalCause, policy::{test_utils::ExpiryCallCounters, EvictionPolicy}, Expiry, @@ -1915,7 +1919,10 @@ mod tests { use parking_lot::Mutex; use std::{ convert::Infallible, - sync::Arc, + sync::{ + atomic::{AtomicU8, Ordering}, + Arc, + }, time::{Duration, Instant as StdInstant}, }; @@ -4789,6 +4796,159 @@ mod tests { assert!(cache.key_locks_map_is_empty()); } + // When the eviction listener is not set, calling `run_pending_tasks` once should + // evict all entries that can be removed. + #[test] + fn no_batch_size_limit_on_eviction() { + const MAX_CAPACITY: u64 = 20; + + const EVICTION_TIMEOUT: Duration = Duration::from_millis(30); + const MAX_LOG_SYNC_REPEATS: usize = 1; + const EVICTION_BATCH_SIZE: usize = 1; + + let hk_conf = HousekeeperConfig::new( + Some(EVICTION_TIMEOUT), + Some(MAX_LOG_SYNC_REPEATS), + Some(EVICTION_BATCH_SIZE), + ); + + // Create a cache with the LRU policy. + let mut cache = Cache::builder() + .max_capacity(MAX_CAPACITY) + .eviction_policy(EvictionPolicy::lru()) + .housekeeper_config(hk_conf) + .build(); + cache.reconfigure_for_testing(); + + // Make the cache exterior immutable. + let cache = cache; + + // Fill the cache. + for i in 0..MAX_CAPACITY { + let v = format!("v{i}"); + cache.insert(i, v) + } + // The max capacity should not change because we have not called + // `run_pending_tasks` yet. + assert_eq!(cache.entry_count(), 0); + + cache.run_pending_tasks(); + assert_eq!(cache.entry_count(), MAX_CAPACITY); + + // Insert more items the cache. + for i in MAX_CAPACITY..(MAX_CAPACITY * 2) { + let v = format!("v{i}"); + cache.insert(i, v) + } + // The max capacity should not change because we have not called + // `run_pending_tasks` yet. + assert_eq!(cache.entry_count(), MAX_CAPACITY); + // Both old and new keys should exist. + assert!(cache.contains_key(&0)); // old + assert!(cache.contains_key(&(MAX_CAPACITY - 1))); // old + assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); // new + + // Process the remaining write op logs (there should be MAX_CAPACITY logs), + // and evict the LRU entries. + cache.run_pending_tasks(); + assert_eq!(cache.entry_count(), MAX_CAPACITY); + + // Now the old keys should be gone. + assert!(!cache.contains_key(&0)); + assert!(!cache.contains_key(&(MAX_CAPACITY - 1))); + // And the new keys should exist. + assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); + } + + #[test] + fn slow_eviction_listener() { + const MAX_CAPACITY: u64 = 20; + + const EVICTION_TIMEOUT: Duration = Duration::from_millis(30); + const LISTENER_DELAY: Duration = Duration::from_millis(11); + const MAX_LOG_SYNC_REPEATS: usize = 1; + const EVICTION_BATCH_SIZE: usize = 1; + + let hk_conf = HousekeeperConfig::new( + Some(EVICTION_TIMEOUT), + Some(MAX_LOG_SYNC_REPEATS), + Some(EVICTION_BATCH_SIZE), + ); + + let (clock, mock) = Clock::mock(); + let listener_call_count = Arc::new(AtomicU8::new(0)); + let lcc = Arc::clone(&listener_call_count); + + // A slow eviction listener that spend `LISTENER_DELAY` to process a removal + // notification. + let listener = move |_k, _v, _cause| { + mock.increment(LISTENER_DELAY); + lcc.fetch_add(1, Ordering::AcqRel); + }; + + // Create a cache with the LRU policy. + let mut cache = Cache::builder() + .max_capacity(MAX_CAPACITY) + .eviction_policy(EvictionPolicy::lru()) + .eviction_listener(listener) + .housekeeper_config(hk_conf) + .build(); + cache.reconfigure_for_testing(); + cache.set_expiration_clock(Some(clock)); + + // Make the cache exterior immutable. + let cache = cache; + + // Fill the cache. + for i in 0..MAX_CAPACITY { + let v = format!("v{i}"); + cache.insert(i, v) + } + // The max capacity should not change because we have not called + // `run_pending_tasks` yet. + assert_eq!(cache.entry_count(), 0); + + cache.run_pending_tasks(); + assert_eq!(listener_call_count.load(Ordering::Acquire), 0); + assert_eq!(cache.entry_count(), MAX_CAPACITY); + + // Insert more items the cache. + for i in MAX_CAPACITY..(MAX_CAPACITY * 2) { + let v = format!("v{i}"); + cache.insert(i, v); + } + assert_eq!(cache.entry_count(), MAX_CAPACITY); + + cache.run_pending_tasks(); + // Because of the slow listener, cache should get an over capacity. + let mut expected_call_count = 3; + assert_eq!( + listener_call_count.load(Ordering::Acquire) as u64, + expected_call_count + ); + assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count); + + loop { + cache.run_pending_tasks(); + + expected_call_count += 3; + if expected_call_count > MAX_CAPACITY { + expected_call_count = MAX_CAPACITY; + } + + let actual_count = listener_call_count.load(Ordering::Acquire) as u64; + assert_eq!(actual_count, expected_call_count); + let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count; + assert_eq!(cache.entry_count(), expected_entry_count); + + if expected_call_count >= MAX_CAPACITY { + break; + } + } + + assert_eq!(cache.entry_count(), MAX_CAPACITY); + } + // NOTE: To enable the panic logging, run the following command: // // RUST_LOG=moka=info cargo test --features 'logging' -- \ diff --git a/src/sync/segment.rs b/src/sync/segment.rs index c22acd9a..f3bd626b 100644 --- a/src/sync/segment.rs +++ b/src/sync/segment.rs @@ -1,6 +1,7 @@ use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector}; +use crate::common::concurrent::Weigher; use crate::{ - common::concurrent::Weigher, + common::HousekeeperConfig, notification::EvictionListener, policy::{EvictionPolicy, ExpirationPolicy}, sync_base::iter::{Iter, ScanningGet}, @@ -105,6 +106,7 @@ where EvictionPolicy::default(), None, ExpirationPolicy::default(), + HousekeeperConfig::default(), false, ) } @@ -211,6 +213,7 @@ where eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, + housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, ) -> Self { Self { @@ -224,6 +227,7 @@ where eviction_policy, eviction_listener, expiration_policy, + housekeeper_config, invalidator_enabled, )), } @@ -735,6 +739,7 @@ where eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, + housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, ) -> Self { assert!(num_segments > 0); @@ -758,6 +763,7 @@ where eviction_policy.clone(), eviction_listener.clone(), expiration_policy.clone(), + housekeeper_config.clone(), invalidator_enabled, ) }) diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 61d557cc..fe6cc9ab 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -11,8 +11,7 @@ use crate::{ concurrent::{ atomic_time::AtomicInstant, constants::{ - EVICTION_BATCH_SIZE, INVALIDATION_BATCH_SIZE, READ_LOG_FLUSH_POINT, READ_LOG_SIZE, - WRITE_LOG_FLUSH_POINT, WRITE_LOG_SIZE, + READ_LOG_CH_SIZE, READ_LOG_FLUSH_POINT, WRITE_LOG_CH_SIZE, WRITE_LOG_FLUSH_POINT, }, deques::Deques, entry_info::EntryInfo, @@ -24,7 +23,7 @@ use crate::{ frequency_sketch::FrequencySketch, time::{CheckedTimeOps, Clock, Instant}, timer_wheel::{ReschedulingResult, TimerWheel}, - CacheRegion, + CacheRegion, HousekeeperConfig, }, notification::{notifier::RemovalNotifier, EvictionListener, RemovalCause}, policy::{EvictionPolicy, EvictionPolicyConfig, ExpirationPolicy}, @@ -146,12 +145,13 @@ where eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, + housekeeper_config: HousekeeperConfig, invalidator_enabled: bool, ) -> Self { let (r_size, w_size) = if max_capacity == Some(0) { (0, 0) } else { - (READ_LOG_SIZE, WRITE_LOG_SIZE) + (READ_LOG_CH_SIZE, WRITE_LOG_CH_SIZE) }; let is_eviction_listener_enabled = eviction_listener.is_some(); @@ -176,7 +176,10 @@ where inner, read_op_ch: r_snd, write_op_ch: w_snd, - housekeeper: Some(Arc::new(Housekeeper::new(is_eviction_listener_enabled))), + housekeeper: Some(Arc::new(Housekeeper::new( + is_eviction_listener_enabled, + housekeeper_config, + ))), } } @@ -1058,7 +1061,7 @@ where (1, 0) } else { let ic = initial_capacity - .map(|cap| cap + WRITE_LOG_SIZE) + .map(|cap| cap + WRITE_LOG_CH_SIZE) .unwrap_or_default(); (64, ic) }; @@ -1236,8 +1239,13 @@ where V: Clone + Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static, { - fn run_pending_tasks(&self, max_repeats: usize, timeout: Option) -> bool { - self.do_run_pending_tasks(max_repeats, timeout) + fn run_pending_tasks( + &self, + timeout: Option, + max_log_sync_repeats: usize, + eviction_batch_size: usize, + ) -> bool { + self.do_run_pending_tasks(timeout, max_log_sync_repeats, eviction_batch_size) } fn now(&self) -> Instant { @@ -1251,7 +1259,12 @@ where V: Clone + Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static, { - fn do_run_pending_tasks(&self, max_repeats: usize, timeout: Option) -> bool { + fn do_run_pending_tasks( + &self, + timeout: Option, + max_log_sync_repeats: usize, + eviction_batch_size: usize, + ) -> bool { if self.max_capacity == Some(0) { return false; } @@ -1311,7 +1324,7 @@ where self.evict_expired_entries_using_deqs( &mut deqs, &mut timer_wheel, - EVICTION_BATCH_SIZE, + eviction_batch_size, &mut eviction_state, ); } @@ -1324,7 +1337,7 @@ where invalidator, &mut deqs, &mut timer_wheel, - INVALIDATION_BATCH_SIZE, + eviction_batch_size, &mut eviction_state, ); } @@ -1336,7 +1349,7 @@ where self.evict_lru_entries( &mut deqs, &mut timer_wheel, - EVICTION_BATCH_SIZE, + eviction_batch_size, weights_to_evict, &mut eviction_state, ); @@ -1344,7 +1357,7 @@ where // Check whether to continue this loop or not. - let should_process_logs = calls <= max_repeats + let should_process_logs = calls <= max_log_sync_repeats && (self.read_op_ch.len() >= READ_LOG_FLUSH_POINT || self.write_op_ch.len() >= WRITE_LOG_FLUSH_POINT); @@ -2565,7 +2578,10 @@ fn is_expired_by_ttl( #[cfg(test)] mod tests { - use crate::policy::{EvictionPolicy, ExpirationPolicy}; + use crate::{ + common::HousekeeperConfig, + policy::{EvictionPolicy, ExpirationPolicy}, + }; use super::BaseCache; @@ -2587,6 +2603,7 @@ mod tests { EvictionPolicy::default(), None, ExpirationPolicy::default(), + HousekeeperConfig::default(), false, ); cache.inner.enable_frequency_sketch_for_testing(); @@ -2671,19 +2688,19 @@ mod tests { ($cache:ident, $key:ident, $hash:ident, $mock:ident, $duration_secs:expr) => { // Increment the time. $mock.increment(Duration::from_millis($duration_secs * 1000 - 1)); - $cache.inner.run_pending_tasks(1, None); + $cache.inner.run_pending_tasks(None, 1, 10); assert!($cache.contains_key_with_hash(&$key, $hash)); assert_eq!($cache.entry_count(), 1); // Increment the time by 1ms (3). The entry should be expired. $mock.increment(Duration::from_millis(1)); - $cache.inner.run_pending_tasks(1, None); + $cache.inner.run_pending_tasks(None, 1, 10); assert!(!$cache.contains_key_with_hash(&$key, $hash)); // Increment the time again to ensure the entry has been evicted from the // cache. $mock.increment(Duration::from_secs(1)); - $cache.inner.run_pending_tasks(1, None); + $cache.inner.run_pending_tasks(None, 1, 10); assert_eq!($cache.entry_count(), 0); }; } @@ -2937,6 +2954,7 @@ mod tests { Some(Duration::from_secs(TTI)), expiry, ), + HousekeeperConfig::default(), false, ); cache.reconfigure_for_testing(); @@ -2967,7 +2985,7 @@ mod tests { insert(&cache, key, hash, value); // Run a sync to register the entry to the internal data structures including // the timer wheel. - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 1); @@ -2989,12 +3007,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); // Read the entry (2). @@ -3013,7 +3031,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_expiry!(cache, key, hash, mock, 3); @@ -3035,12 +3053,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); // Read the entry (2). @@ -3059,11 +3077,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); // Increment the time. mock.increment(Duration::from_secs(2)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3078,7 +3096,7 @@ mod tests { Some(3), ); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 3); @@ -3101,12 +3119,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None); let inserted_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(1)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3126,11 +3144,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); // Increment the time. mock.increment(Duration::from_secs(2)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3145,7 +3163,7 @@ mod tests { None, ); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_eq!(cache.entry_count(), 1); assert_expiry!(cache, key, hash, mock, 7); @@ -3167,12 +3185,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8)); let inserted_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(5)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3192,7 +3210,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_expiry!(cache, key, hash, mock, 7); @@ -3214,12 +3232,12 @@ mod tests { ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8)); let inserted_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(5)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3239,11 +3257,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3263,7 +3281,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_expiry!(cache, key, hash, mock, 5); @@ -3284,12 +3302,12 @@ mod tests { *expectation.lock().unwrap() = ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(9)); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3305,12 +3323,12 @@ mod tests { ); let updated_at = current_time(&cache); insert(&cache, key, hash, value); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_eq!(cache.entry_count(), 1); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3330,11 +3348,11 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); // Increment the time. mock.increment(Duration::from_secs(6)); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert!(cache.contains_key_with_hash(&key, hash)); assert_eq!(cache.entry_count(), 1); @@ -3354,7 +3372,7 @@ mod tests { .map(Entry::into_value), Some(value) ); - cache.inner.run_pending_tasks(1, None); + cache.inner.run_pending_tasks(None, 1, 10); assert_expiry!(cache, key, hash, mock, 4); }