From e18824081abb10bb6ac31e56732c0821660d3063 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Wed, 29 Nov 2023 16:43:07 +0800 Subject: [PATCH] storage: implement CasManager to support chunk dedup at runtime Implement CasManager to support chunk dedup at runtime. The manager provides to major interfaces: - add chunk data to the CAS database - check whether a chunk exists in CAS database and copy it to blob file by copy_file_range() if the chunk exists. Signed-off-by: Jiang Liu --- storage/src/cache/dedup/db.rs | 5 +- storage/src/cache/dedup/mod.rs | 173 +++++++++++++++++++++++++++++++++ storage/src/cache/mod.rs | 6 ++ 3 files changed, 182 insertions(+), 2 deletions(-) diff --git a/storage/src/cache/dedup/db.rs b/storage/src/cache/dedup/db.rs index 6daff37c70b..c505cde8480 100644 --- a/storage/src/cache/dedup/db.rs +++ b/storage/src/cache/dedup/db.rs @@ -8,7 +8,7 @@ use std::path::Path; use r2d2::{Pool, PooledConnection}; use r2d2_sqlite::SqliteConnectionManager; -use rusqlite::{Connection, DropBehavior, OptionalExtension, Transaction}; +use rusqlite::{Connection, DropBehavior, OpenFlags, OptionalExtension, Transaction}; use super::Result; @@ -24,7 +24,8 @@ impl CasDb { } pub fn from_file(db_path: impl AsRef) -> Result { - let mgr = SqliteConnectionManager::file(db_path); + let mgr = SqliteConnectionManager::file(db_path) + .with_flags(OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE); let pool = r2d2::Pool::new(mgr)?; let conn = pool.get()?; diff --git a/storage/src/cache/dedup/mod.rs b/storage/src/cache/dedup/mod.rs index f52a8fcc1de..6f4a58eec70 100644 --- a/storage/src/cache/dedup/mod.rs +++ b/storage/src/cache/dedup/mod.rs @@ -2,11 +2,26 @@ // // SPDX-License-Identifier: Apache-2.0 +use std::collections::hash_map::Entry; +use std::collections::HashMap; use std::fmt::{self, Display, Formatter}; +use std::fs::{File, OpenOptions}; use std::io::Error; +use std::path::Path; +use std::sync::{Arc, Mutex, RwLock}; + +use nydus_utils::digest::RafsDigest; + +use crate::cache::dedup::db::CasDb; +use crate::device::{BlobChunkInfo, BlobInfo}; +use crate::utils::copy_file_range; mod db; +lazy_static::lazy_static!( + static ref CAS_MGR: Mutex>> = Mutex::new(None); +); + /// Error codes related to local cas. #[derive(Debug)] pub enum CasError { @@ -47,3 +62,161 @@ impl From for CasError { /// Specialized `Result` for local cas. type Result = std::result::Result; + +pub struct CasMgr { + db: CasDb, + fds: RwLock>>, +} + +impl CasMgr { + pub fn new(db_path: impl AsRef) -> Result { + let db = CasDb::from_file(db_path.as_ref())?; + + Ok(CasMgr { + db, + fds: RwLock::new(HashMap::new()), + }) + } + + pub fn set_singleton(mgr: CasMgr) { + *CAS_MGR.lock().unwrap() = Some(Arc::new(mgr)); + } + + pub fn get_singleton() -> Option> { + CAS_MGR.lock().unwrap().clone() + } + + /// Deduplicate chunk data from existing data files. + /// + /// If any error happens, just pretend there's no source data available for dedup. + pub fn dedup_chunk( + &self, + blob: &BlobInfo, + chunk: &dyn BlobChunkInfo, + cache_file: &File, + ) -> bool { + let key = Self::chunk_key(blob, chunk); + if key.is_empty() { + return false; + } + + if let Ok(Some((path, offset))) = self.db.get_chunk_info(&key) { + let guard = self.fds.read().unwrap(); + let mut d_file = guard.get(&path).cloned(); + drop(guard); + + // Open the source file for dedup on demand. + if d_file.is_none() { + match OpenOptions::new().read(true).open(&path) { + Err(e) => warn!("failed to open dedup source file {}, {}", path, e), + Ok(f) => { + let mut guard = self.fds.write().unwrap(); + match guard.entry(path) { + Entry::Vacant(e) => { + let f = Arc::new(f); + e.insert(f.clone()); + d_file = Some(f); + } + Entry::Occupied(f) => { + // Somebody else has inserted the file, use it + d_file = Some(f.get().clone()); + } + } + } + } + } + + if let Some(f) = d_file { + match copy_file_range( + f, + offset, + cache_file, + chunk.uncompressed_offset(), + chunk.uncompressed_size() as usize, + ) { + Ok(_) => { + return true; + } + Err(e) => warn!("{e}"), + } + } + } + + false + } + + /// Add an available chunk data into the CAS database. + pub fn record_chunk( + &self, + blob: &BlobInfo, + chunk: &dyn BlobChunkInfo, + path: impl AsRef, + ) -> Result<()> { + let key = Self::chunk_key(blob, chunk); + if key.is_empty() { + return Ok(()); + } + + let path = path.as_ref().canonicalize()?; + let path = path.display().to_string(); + self.record_chunk_raw(&key, &path, chunk.uncompressed_offset()) + } + + pub fn record_chunk_raw(&self, chunk_id: &str, path: &str, offset: u64) -> Result<()> { + self.db.add_blob(path)?; + self.db.add_chunk(chunk_id, offset, path)?; + Ok(()) + } + + fn chunk_key(blob: &BlobInfo, chunk: &dyn BlobChunkInfo) -> String { + let id = chunk.chunk_id(); + if *id == RafsDigest::default() { + String::new() + } else { + blob.digester().to_string() + ":" + &chunk.chunk_id().to_string() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::device::BlobFeatures; + use crate::test::MockChunkInfo; + use crate::RAFS_DEFAULT_CHUNK_SIZE; + use std::io::{Read, Write}; + use vmm_sys_util::tempfile::TempFile; + + #[test] + fn test_cas_chunk_op() { + let dbfile = TempFile::new().unwrap(); + let tmpfile2 = TempFile::new().unwrap(); + let src_path = tmpfile2.as_path().display().to_string(); + let mgr = CasMgr::new(dbfile.as_path()).unwrap(); + + let blob = BlobInfo::new( + 1, + src_path.clone(), + 8192, + 8192, + RAFS_DEFAULT_CHUNK_SIZE as u32, + 1, + BlobFeatures::empty(), + ); + let mut chunk = MockChunkInfo::new(); + chunk.uncompress_offset = 0; + chunk.uncompress_size = 8192; + let chunk = Arc::new(chunk) as Arc; + + let buf = vec![0x9u8; 8192]; + let mut src_file = tmpfile2.as_file().try_clone().unwrap(); + src_file.write_all(&buf).unwrap(); + mgr.record_chunk(&blob, chunk.as_ref(), &src_path).unwrap(); + + let mut tmpfile3 = TempFile::new().unwrap().into_file(); + assert!(mgr.dedup_chunk(&blob, chunk.as_ref(), &tmpfile3)); + let mut buf2 = vec![0x0u8; 8192]; + tmpfile3.read_exact(&mut buf2).unwrap(); + assert_eq!(buf, buf2); + } +} diff --git a/storage/src/cache/mod.rs b/storage/src/cache/mod.rs index 1ae6dda497e..fd0f77c0c4a 100644 --- a/storage/src/cache/mod.rs +++ b/storage/src/cache/mod.rs @@ -660,6 +660,12 @@ pub(crate) trait BlobCacheMgr: Send + Sync { fn check_stat(&self); } +#[cfg(feature = "dedup")] +pub use dedup::CasMgr; + +#[cfg(not(feature = "dedup"))] +pub struct CasMgr {} + #[cfg(test)] mod tests { use crate::device::{BlobChunkFlags, BlobFeatures};