From 92babec345258e496615272bf4a7d1cf8b53caa6 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sun, 4 Dec 2022 23:38:51 +0800 Subject: [PATCH] refactor(layer/cache): Allow users implement cache by themselves (#1040) Signed-off-by: Xuanwo Signed-off-by: Xuanwo --- src/layers/cache/accessor.rs | 342 ++--------------------------------- src/layers/cache/mod.rs | 6 - src/layers/cache/policy.rs | 234 +++++++++--------------- src/ops.rs | 4 +- 4 files changed, 101 insertions(+), 485 deletions(-) diff --git a/src/layers/cache/accessor.rs b/src/layers/cache/accessor.rs index 39f44d752e5..5780516d881 100644 --- a/src/layers/cache/accessor.rs +++ b/src/layers/cache/accessor.rs @@ -13,24 +13,10 @@ // limitations under the License. use std::fmt::Debug; -use std::pin::Pin; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; use async_trait::async_trait; -use bytes::Bytes; -use futures::future::BoxFuture; -use futures::io; -use futures::io::Cursor; -use futures::ready; -use futures::AsyncRead; -use futures::AsyncReadExt; -use futures::FutureExt; -use super::policy::CacheReadEntry; -use super::policy::CacheReadEntryIterator; -use super::policy::CacheUpdateMethod; use super::*; use crate::raw::*; use crate::*; @@ -54,22 +40,6 @@ impl CacheAccessor { policy, } } - - #[inline] - async fn update_cache(&self, path: &str, op: Operation) { - let it = self.policy.on_update(path, op).await; - for entry in it { - match entry.update_method { - CacheUpdateMethod::Skip => continue, - CacheUpdateMethod::Delete => { - let _ = self - .cache - .delete(&entry.cache_path, OpDelete::default()) - .await; - } - } - } - } } #[async_trait] @@ -79,45 +49,21 @@ impl Accessor for CacheAccessor { } async fn create(&self, path: &str, args: OpCreate) -> Result { - self.update_cache(path, Operation::Create).await; - - self.inner.create(path, args).await + self.policy + .on_create(self.inner.clone(), self.cache.clone(), path, args) + .await } - async fn read(&self, path: &str, mut args: OpRead) -> Result<(RpRead, BytesReader)> { - let total_size = if let Some(total_size) = args.total_size_hint() { - total_size - } else { - let rp = self.inner.stat(path, OpStat::default()).await?; - rp.into_metadata().content_length() - }; - - let bcr = BytesContentRange::from_bytes_range(total_size, args.range()); - let br = bcr.to_bytes_range().expect("bytes range must be valid"); - args = args.with_range(br); - - let (offset, size) = ( - args.range().offset().expect("offset must be valid"), - args.range().size().expect("size must be valid"), - ); - - let it = self.policy.on_read(path, offset, size, total_size).await; - - Ok(( - RpRead::new(size), - Box::new(CacheReader::new( - self.inner.clone(), - self.cache.clone(), - path, - it, - )) as BytesReader, - )) + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, BytesReader)> { + self.policy + .on_read(self.inner.clone(), self.cache.clone(), path, args) + .await } async fn write(&self, path: &str, args: OpWrite, r: BytesReader) -> Result { - self.update_cache(path, Operation::Write).await; - - self.inner.write(path, args, r).await + self.policy + .on_write(self.inner.clone(), self.cache.clone(), path, args, r) + .await } async fn stat(&self, path: &str, args: OpStat) -> Result { @@ -125,270 +71,8 @@ impl Accessor for CacheAccessor { } async fn delete(&self, path: &str, args: OpDelete) -> Result { - self.update_cache(path, Operation::Delete).await; - - self.inner.delete(path, args).await - } -} - -struct CacheReader { - inner: Arc, - cache: Arc, - - path: String, - it: 
CacheReadEntryIterator, - state: CacheState, -} - -enum CacheState { - Idle, - Polling(BoxFuture<'static, Result<(RpRead, BytesReader)>>), - Reading((RpRead, BytesReader)), -} - -impl CacheReader { - fn new( - inner: Arc, - cache: Arc, - path: &str, - it: CacheReadEntryIterator, - ) -> Self { - Self { - inner, - cache, - path: path.to_string(), - it, - state: CacheState::Idle, - } - } -} - -impl AsyncRead for CacheReader { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let cache = self.cache.clone(); - let inner = self.inner.clone(); - let path = self.path.clone(); - - match &mut self.state { - CacheState::Idle => { - let entry = match self.it.next() { - Some(entry) => entry, - None => return Poll::Ready(Ok(0)), - }; - - let fut = Box::pin(read_cache_entry(inner, cache, path, entry)); - self.state = CacheState::Polling(fut); - self.poll_read(cx, buf) - } - CacheState::Polling(fut) => { - let r = ready!(fut.poll_unpin(cx))?; - self.state = CacheState::Reading(r); - self.poll_read(cx, buf) - } - CacheState::Reading((_, r)) => match ready!(Pin::new(r).poll_read(cx, buf)) { - Ok(n) if n == 0 => { - self.state = CacheState::Idle; - self.poll_read(cx, buf) - } - Ok(n) => Poll::Ready(Ok(n)), - Err(err) => Poll::Ready(Err(err)), - }, - } - } -} - -async fn read_cache_entry( - inner: Arc, - cache: Arc, - path: String, - entry: CacheReadEntry, -) -> Result<(RpRead, BytesReader)> { - // If we don't need to fill the cache, we can read with inner_read_cache - // directly. - if entry.fill_method == CacheFillMethod::Skip { - let (rp, r, _) = read_for_load_cache(&inner, &cache, &path, &entry).await?; - - return Ok((rp, r)); - } - - // If we need to fill cache in sync way, we can fill cache first - // and try to load from cache. - if entry.fill_method == CacheFillMethod::Sync { - let (rp, mut r, cache_hit) = read_for_fill_cache(&inner, &cache, &path, &entry).await?; - if cache_hit { - return Ok((rp, r)); - } - - let meta = rp.into_metadata(); - let size = meta.content_length(); - // if the size is small enough, we can load in memory to avoid - // load from cache again. Otherwise, we will fallback to write - // in to cache first and than read from cache. - // - // TODO: make this a config value. - if size < 8 * 1024 * 1024 { - let mut bs = Vec::with_capacity(size as usize); - r.read_to_end(&mut bs).await.map_err(|err| { - Error::new(ErrorKind::Unexpected, "read from underlying storage") - .set_source(err) - .set_temporary() - })?; - let bs = Bytes::from(bs); - - // Ignore error happened during writing cache. - let _ = cache - .write( - &entry.cache_path, - OpWrite::new(size), - Box::new(Cursor::new(bs.clone())), - ) - .await; - - // Make sure the reading range has been applied on cache. - let bs = entry.cache_read_range.apply_on_bytes(bs); - - return Ok((RpRead::new(bs.len() as u64), Box::new(Cursor::new(bs)))); - } else { - // Ignore error happened during writing cache. - let _ = cache.write(&entry.cache_path, OpWrite::new(size), r).await; - - let (rp, r, _) = read_for_load_cache(&inner, &cache, &path, &entry).await?; - - return Ok((rp, r)); - } - } - - // If we need to fill cache in async way. - let (rp, r, cache_hit) = read_for_load_cache(&inner, &cache, &path, &entry).await?; - if cache_hit { - return Ok((rp, r)); - } - - // # Notes - // - // If a `JoinHandle` is dropped, then the task continues running - // in the background and its return value is lost. - // - // It's safe to just drop the handle here. 
- // - // # Todo - // - // We can support other runtime in the future. - let moved_inner = inner.clone(); - let moved_cache = cache.clone(); - let moved_path = path.clone(); - let moved_entry = entry.clone(); - let _ = tokio::spawn(async move { - let (rp, r) = moved_inner - .read(&moved_path, moved_entry.cache_fill_op()) - .await?; - let length = rp.into_metadata().content_length(); - moved_cache - .write(&moved_entry.cache_path, OpWrite::new(length), r) - .await?; - - Ok::<(), Error>(()) - }); - - Ok((rp, r)) -} - -/// Read for loading cache. -/// -/// This function is used to load cache. -async fn read_for_load_cache( - inner: &Arc, - cache: &Arc, - path: &str, - entry: &CacheReadEntry, -) -> Result<(RpRead, BytesReader, bool)> { - if !entry.read_cache { - let (rp, r) = inner.read(path, entry.inner_read_op()).await?; - - return Ok((rp, r, false)); - } - - let res = match cache.read(&entry.cache_path, entry.cache_read_op()).await { - Ok((rp, r)) => (rp, r, true), - Err(_) => { - let (rp, r) = inner.read(path, entry.inner_read_op()).await?; - (rp, r, false) - } - }; - - Ok(res) -} - -/// Read for filling cache. -/// -/// This function is used to read data that can by cached. -/// -/// - If the cache is exist, we will return the real content. -/// - If the cache is missing or not read from cache, we will return the data -/// for filling cache. -async fn read_for_fill_cache( - inner: &Arc, - cache: &Arc, - path: &str, - entry: &CacheReadEntry, -) -> Result<(RpRead, BytesReader, bool)> { - if !entry.read_cache { - let (rp, r) = inner.read(path, entry.cache_fill_op()).await?; - - return Ok((rp, r, false)); - } - - // If cache does exists. - if let Ok((rp, r)) = cache.read(&entry.cache_path, entry.cache_read_op()).await { - return Ok((rp, r, true)); - } - - let (rp, r) = inner.read(path, entry.cache_fill_op()).await?; - - Ok((rp, r, false)) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::services::memory; - use crate::Operator; - - #[tokio::test] - async fn test_default_content_cache() -> anyhow::Result<()> { - let op = Operator::new(memory::Builder::default().build()?); - - let cache_layer = CacheLayer::new(Arc::new(memory::Builder::default().build()?).into()); - let cached_op = op.clone().layer(cache_layer); - - // Write a new object into op. - op.object("test_exist") - .write("Hello, World!".as_bytes()) - .await?; - - // Read from cached op. - let data = cached_op.object("test_exist").read().await?; - assert_eq!(data.len(), 13); - - // Write into cache op. - cached_op - .object("test_exist") - .write("Hello, Xuanwo!".as_bytes()) - .await?; - // op and cached op should have same data. - let data = op.object("test_exist").read().await?; - assert_eq!(data.len(), 14); - let data = cached_op.object("test_exist").read().await?; - assert_eq!(data.len(), 14); - - // Read not exist object. 
- let data = cached_op.object("test_not_exist").read().await; - assert_eq!(data.unwrap_err().kind(), ErrorKind::ObjectNotFound); - - Ok(()) + self.policy + .on_delete(self.inner.clone(), self.cache.clone(), path, args) + .await } } diff --git a/src/layers/cache/mod.rs b/src/layers/cache/mod.rs index f4210587c9f..aafa012935a 100644 --- a/src/layers/cache/mod.rs +++ b/src/layers/cache/mod.rs @@ -19,11 +19,5 @@ mod accessor; use accessor::CacheAccessor; mod policy; -pub use policy::CacheFillMethod; pub use policy::CachePolicy; -pub use policy::CacheReadEntry; -pub use policy::CacheReadEntryIterator; -pub use policy::CacheUpdateEntry; -pub use policy::CacheUpdateEntryIterator; -pub use policy::CacheUpdateMethod; use policy::DefaultCachePolicy; diff --git a/src/layers/cache/policy.rs b/src/layers/cache/policy.rs index 66fa2a19e36..e3521f6e937 100644 --- a/src/layers/cache/policy.rs +++ b/src/layers/cache/policy.rs @@ -13,181 +13,119 @@ // limitations under the License. use std::fmt::Debug; +use std::sync::Arc; -use async_trait::async_trait; +use futures::future::BoxFuture; use crate::raw::*; use crate::*; +pub type CacheResult = BoxFuture<'static, Result>; + /// CachePolicy allows user to specify the policy while caching. -#[async_trait] pub trait CachePolicy: Send + Sync + Debug + 'static { - /// The policy for reading cache. - /// - /// `on_read` will return a [`CacheReadEntryIterator`] which can iterate - /// serval [`CacheReadEntry`]. Cache layer will take different operations - /// as specified by [`CacheReadEntry`]. - /// - /// # Notes - /// - /// It's implementor's abailty to make sure the returning entry is - /// correct. - async fn on_read( + /// on_create returns the cache policy on create operation. + fn on_create( &self, + inner: Arc, + cache: Arc, path: &str, - offset: u64, - size: u64, - total_size: u64, - ) -> CacheReadEntryIterator; - - /// The policy for updating cache. - /// - /// `on_update` will return a [`CacheUpdateEntryIterator`] which can - /// iterate serval [`CacheUpdateEntry`]. Cache layer will take different - /// operations as specified by [`CacheUpdateEntry`]. - /// - /// # Notes - /// - /// It's implementor's abailty to make sure the returning entry is - /// correct. - /// - /// on_update will be called on `create`, `write` and `delete`. - async fn on_update(&self, path: &str, op: Operation) -> CacheUpdateEntryIterator; -} - -#[derive(Debug)] -pub struct DefaultCachePolicy; - -#[async_trait] -impl CachePolicy for DefaultCachePolicy { - async fn on_read(&self, path: &str, offset: u64, size: u64, _: u64) -> CacheReadEntryIterator { - let br: BytesRange = (offset..offset + size).into(); + args: OpCreate, + ) -> CacheResult { + let _ = cache; - Box::new( - vec![CacheReadEntry { - cache_path: path.to_string(), + let path = path.to_string(); + Box::pin(async move { inner.create(&path, args).await }) + } - read_cache: true, - cache_read_range: br, - inner_read_range: br, + /// on_read returns the cache policy on read operation. + fn on_read( + &self, + inner: Arc, + cache: Arc, + path: &str, + args: OpRead, + ) -> CacheResult<(RpRead, BytesReader)> { + let _ = cache; - fill_method: CacheFillMethod::Async, - cache_fill_range: br, - }] - .into_iter(), - ) + let path = path.to_string(); + Box::pin(async move { inner.read(&path, args).await }) } - async fn on_update(&self, path: &str, _: Operation) -> CacheUpdateEntryIterator { - Box::new( - vec![CacheUpdateEntry { - cache_path: path.to_string(), + /// on_write returns the cache policy on write operation. 
+ fn on_write( + &self, + inner: Arc, + cache: Arc, + path: &str, + args: OpWrite, + r: BytesReader, + ) -> CacheResult { + let _ = cache; - update_method: CacheUpdateMethod::Delete, - }] - .into_iter(), - ) + let path = path.to_string(); + Box::pin(async move { inner.write(&path, args, r).await }) } -} -/// CacheFillMethod specify the cache fill method while cache missing. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum CacheFillMethod { - /// Don't fill cache. - /// - /// Return data from inner directly without fill into the cache. - Skip, - /// Fill cache in sync way. - /// - /// Write data into cache first and than read from cache. - Sync, - /// Fill cache in async way. - /// - /// Spawn an async task to runtime and return data directly. - Async, -} - -/// CacheReadEntryIterator is a boxed iterator for [`CacheReadEntry`]. -pub type CacheReadEntryIterator = Box + Send>; - -/// CacheReadEntry indicates the operations that cache layer needs to take. -/// -/// # TODO -/// -/// Add debug_assert to make sure: -/// -/// - cache_read_range.size() == inner_read_range.size() -/// - cache_fill_range contains inner_read_range ? -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct CacheReadEntry { - /// cache_path is the path that we need to read or fill. - pub cache_path: String, - - /// read_method indicates that do we need to read from cache. - pub read_cache: bool, - /// the range to read from cache file if we decide to read cache. - pub cache_read_range: BytesRange, - /// the range to read from inner file if we decide to skip cache - /// or cache missed. - pub inner_read_range: BytesRange, - - /// fill_method indicates that how we will fill the cache. - pub fill_method: CacheFillMethod, - /// the range to read from inner file to fill the cache. - pub cache_fill_range: BytesRange, -} + /// on_delete returns the cache policy on delete operation. + fn on_delete( + &self, + inner: Arc, + cache: Arc, + path: &str, + args: OpDelete, + ) -> CacheResult { + let _ = cache; -impl CacheReadEntry { - /// Build an OpRead from cache read range. - pub fn cache_read_op(&self) -> OpRead { - OpRead::new().with_range(self.cache_read_range) + let path = path.to_string(); + Box::pin(async move { inner.delete(&path, args).await }) } +} - /// Build an OpRead from inner read range. - pub fn inner_read_op(&self) -> OpRead { - OpRead::new().with_range(self.inner_read_range) +impl CachePolicy for Arc { + fn on_create( + &self, + inner: Arc, + cache: Arc, + path: &str, + args: OpCreate, + ) -> CacheResult { + self.as_ref().on_create(inner, cache, path, args) } - /// The size for cache fill. - pub fn inner_read_size(&self) -> u64 { - self.inner_read_range.size().expect("size must be valid") + fn on_read( + &self, + inner: Arc, + cache: Arc, + path: &str, + args: OpRead, + ) -> CacheResult<(RpRead, BytesReader)> { + self.as_ref().on_read(inner, cache, path, args) } - /// Build an OpRead from cache fill range. - pub fn cache_fill_op(&self) -> OpRead { - OpRead::new().with_range(self.cache_fill_range) + fn on_write( + &self, + inner: Arc, + cache: Arc, + path: &str, + args: OpWrite, + r: BytesReader, + ) -> CacheResult { + self.as_ref().on_write(inner, cache, path, args, r) } - /// The size for cache fill. 
- pub fn cache_fill_size(&self) -> u64 { - self.cache_fill_range.size().expect("size must be valid") + fn on_delete( + &self, + inner: Arc, + cache: Arc, + path: &str, + args: OpDelete, + ) -> CacheResult { + self.as_ref().on_delete(inner, cache, path, args) } } -/// CacheUpdateEntryIterator is a boxed iterator for [`CacheUpdateEntry`]. -pub type CacheUpdateEntryIterator = Box + Send>; - -#[derive(Debug, Clone, PartialEq, Eq)] -/// CacheUpdateEntry indicates the operations that cache layer needs to take. -pub struct CacheUpdateEntry { - /// cache_path is the path that we need to read or fill. - pub cache_path: String, - - /// update_method indicates that do we need to update the cache. - pub update_method: CacheUpdateMethod, -} +#[derive(Debug)] +pub struct DefaultCachePolicy; -/// CacheUpdateMethod specify the cache update method while inner files changed. -/// -/// # Notes -/// -/// We could add new method in the future. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum CacheUpdateMethod { - /// Don't do anything on cache. - /// - /// Level the cache AS-IS until they cleaned by service itself. - Skip, - /// Delete the cache path. - Delete, -} +impl CachePolicy for DefaultCachePolicy {} diff --git a/src/ops.rs b/src/ops.rs index 6e5a8a5f417..1aee0b58456 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -238,13 +238,13 @@ impl OpRead { /// Add total size hint for `OpRead` so that our layers can reuse already /// known metadata. - pub(crate) fn with_total_size_hint(mut self, total_size_hint: u64) -> Self { + pub fn with_total_size_hint(mut self, total_size_hint: u64) -> Self { self.total_size_hint = Some(total_size_hint); self } /// Get totoal size hint from op read. - pub(crate) fn total_size_hint(&self) -> Option { + pub fn total_size_hint(&self) -> Option { self.total_size_hint } }
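As a sketch of what this refactor enables: a user-defined policy only overrides the hooks it cares about and inherits the pass-through defaults for everything else. The code below is illustrative, not part of the patch: the policy name is made up, it is written as if it sat next to DefaultCachePolicy in src/layers/cache/policy.rs (a downstream crate would pull the same items from opendal's public exports instead), and the generic parameters (Arc<dyn Accessor>, CacheResult<RpWrite>, CacheResult<RpDelete>) are assumed to match the trait declaration. It drops the cached entry whenever the underlying object is written or deleted, and keeps the default behaviour for create and read.

    // Illustrative only: assumes the same module context and imports as
    // policy.rs above; all paths and the policy name are hypothetical.
    use std::sync::Arc;

    use crate::raw::*;
    use crate::*;

    /// Invalidate the cached entry whenever the underlying object changes;
    /// create and read keep the default pass-through behaviour.
    #[derive(Debug)]
    pub struct InvalidateOnChangePolicy;

    impl CachePolicy for InvalidateOnChangePolicy {
        fn on_write(
            &self,
            inner: Arc<dyn Accessor>,
            cache: Arc<dyn Accessor>,
            path: &str,
            args: OpWrite,
            r: BytesReader,
        ) -> CacheResult<RpWrite> {
            let path = path.to_string();
            Box::pin(async move {
                // Best-effort invalidation: a failure to drop the cache entry
                // is ignored, the inner accessor stays the source of truth.
                let _ = cache.delete(&path, OpDelete::default()).await;
                inner.write(&path, args, r).await
            })
        }

        fn on_delete(
            &self,
            inner: Arc<dyn Accessor>,
            cache: Arc<dyn Accessor>,
            path: &str,
            args: OpDelete,
        ) -> CacheResult<RpDelete> {
            let path = path.to_string();
            Box::pin(async move {
                let _ = cache.delete(&path, OpDelete::default()).await;
                inner.delete(&path, args).await
            })
        }
    }

Because every hook receives both the inner and the cache accessor along with the original Op* arguments, a policy can also implement read-through or asynchronous fill strategies entirely on its own, which is what replaces the CacheReadEntry/CacheUpdateEntry machinery removed above. CacheAccessor::new in the first hunk already takes the policy as a constructor argument, so presumably only the layer's builder needs to expose a way to supply a custom policy alongside the cache accessor.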