diff --git a/Cargo.toml b/Cargo.toml
index 69de7eae25d..d17bf270b5f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -95,6 +95,7 @@ default = [
     "backend-s3",
     "backend-http-proxy",
     "backend-localdisk",
+    "dedup",
 ]
 virtiofs = [
     "nydus-service/virtiofs",
@@ -116,6 +117,8 @@ backend-oss = ["nydus-storage/backend-oss"]
 backend-registry = ["nydus-storage/backend-registry"]
 backend-s3 = ["nydus-storage/backend-s3"]
 
+dedup = ["nydus-storage/dedup"]
+
 [workspace]
 members = [
     "api",
diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs
index e06693e7a55..914e2d8d18e 100644
--- a/src/bin/nydusd/main.rs
+++ b/src/bin/nydusd/main.rs
@@ -26,6 +26,7 @@ use nydus_service::{
     create_daemon, create_fuse_daemon, create_vfs_backend, validate_threads_configuration,
     Error as NydusError, FsBackendMountCmd, FsBackendType, ServiceArgs,
 };
+use nydus_storage::cache::CasMgr;
 
 use crate::api_server_glue::ApiServerController;
 
@@ -50,7 +51,7 @@ fn thread_validator(v: &str) -> std::result::Result<String, String> {
 }
 
 fn append_fs_options(app: Command) -> Command {
-    app.arg(
+    let mut app = app.arg(
         Arg::new("bootstrap")
             .long("bootstrap")
             .short('B')
@@ -87,7 +88,18 @@ fn append_fs_options(app: Command) -> Command {
             .help("Mountpoint within the FUSE/virtiofs device to mount the RAFS/passthroughfs filesystem")
             .default_value("/")
             .required(false),
-    )
+    );
+
+    #[cfg(feature = "dedup")]
+    {
+        app = app.arg(
+            Arg::new("dedup-db")
+                .long("dedup-db")
+                .help("Database file for chunk deduplication"),
+        );
+    }
+
+    app
 }
 
 fn append_fuse_options(app: Command) -> Command {
@@ -750,6 +762,13 @@ fn main() -> Result<()> {
     dump_program_info();
     handle_rlimit_nofile_option(&args, "rlimit-nofile")?;
 
+    #[cfg(feature = "dedup")]
+    if let Some(db) = args.get_one::<String>("dedup-db") {
+        let mgr = CasMgr::new(db).map_err(|e| eother!(format!("{}", e)))?;
+        info!("Enable chunk deduplication using database at {}", db);
+        CasMgr::set_singleton(mgr);
+    }
+
     match args.subcommand_name() {
         Some("singleton") => {
            // Safe to unwrap because the subcommand is `singleton`.
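Note: `CasMgr::set_singleton()`/`CasMgr::get_singleton()` above follow a process-wide singleton pattern, so the `--dedup-db` flag only has to be parsed once at startup. A minimal sketch of that pattern, assuming nothing beyond the standard library; the real `CasMgr` in `nydus-storage` wraps the deduplication database and is more involved:

```rust
// Hypothetical stand-in for nydus-storage's CasMgr, shown only to
// illustrate the set_singleton()/get_singleton() pattern used above.
use std::sync::{Arc, OnceLock};

pub struct CasMgr {
    db_path: String, // assumed field: path to the dedup database
}

static CAS_MGR: OnceLock<Arc<CasMgr>> = OnceLock::new();

impl CasMgr {
    pub fn set_singleton(mgr: CasMgr) {
        // First caller wins; later calls are silently ignored.
        let _ = CAS_MGR.set(Arc::new(mgr));
    }

    pub fn get_singleton() -> Option<Arc<CasMgr>> {
        CAS_MGR.get().cloned()
    }
}

fn main() {
    CasMgr::set_singleton(CasMgr {
        db_path: "/tmp/cas.db".to_string(),
    });
    // Any later component can cheaply grab a shared handle.
    println!("{}", CasMgr::get_singleton().unwrap().db_path);
}
```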
diff --git a/storage/Cargo.toml b/storage/Cargo.toml
index 84cfd50495f..81be68097a1 100644
--- a/storage/Cargo.toml
+++ b/storage/Cargo.toml
@@ -58,7 +58,6 @@ regex = "1.7.0"
 toml = "0.5"
 
 [features]
-default = ["dedup"]
 backend-localdisk = []
 backend-localdisk-gpt = ["gpt", "backend-localdisk"]
 backend-localfs = []
diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs
index a18f8004fd2..26c3cf8510e 100644
--- a/storage/src/cache/cachedfile.rs
+++ b/storage/src/cache/cachedfile.rs
@@ -13,6 +13,7 @@ use std::collections::HashSet;
 use std::fs::File;
 use std::io::{ErrorKind, Read, Result};
 use std::mem::ManuallyDrop;
+use std::ops::Deref;
 use std::os::unix::io::{AsRawFd, RawFd};
 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
 use std::sync::{Arc, Mutex};
@@ -29,7 +30,7 @@ use tokio::runtime::Runtime;
 use crate::backend::BlobReader;
 use crate::cache::state::ChunkMap;
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, AsyncWorkerMgr};
-use crate::cache::{BlobCache, BlobIoMergeState};
+use crate::cache::{BlobCache, BlobIoMergeState, CasMgr};
 use crate::device::{
     BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoSegment, BlobIoTag, BlobIoVec,
     BlobObject, BlobPrefetchRequest,
@@ -133,8 +134,10 @@ pub(crate) struct FileCacheEntry {
     pub(crate) blob_info: Arc<BlobInfo>,
     pub(crate) cache_cipher_object: Arc<Cipher>,
     pub(crate) cache_cipher_context: Arc<CipherContext>,
+    pub(crate) cas_mgr: Option<Arc<CasMgr>>,
     pub(crate) chunk_map: Arc<dyn ChunkMap>,
     pub(crate) file: Arc<File>,
+    pub(crate) file_path: Arc<String>,
     pub(crate) meta: Option<FileCacheMeta>,
     pub(crate) metrics: Arc<BlobcacheMetrics>,
     pub(crate) prefetch_state: Arc<AtomicU32>,
@@ -182,13 +185,16 @@ impl FileCacheEntry {
     }
 
     fn delay_persist_chunk_data(&self, chunk: Arc<dyn BlobChunkInfo>, buffer: Arc<DataBuffer>) {
+        let blob_info = self.blob_info.clone();
         let delayed_chunk_map = self.chunk_map.clone();
         let file = self.file.clone();
+        let file_path = self.file_path.clone();
         let metrics = self.metrics.clone();
         let is_raw_data = self.is_raw_data;
         let is_cache_encrypted = self.is_cache_encrypted;
         let cipher_object = self.cache_cipher_object.clone();
         let cipher_context = self.cache_cipher_context.clone();
+        let cas_mgr = self.cas_mgr.clone();
 
         metrics.buffered_backend_size.add(buffer.size() as u64);
         self.runtime.spawn_blocking(move || {
@@ -240,6 +246,11 @@ impl FileCacheEntry {
             };
             let res = Self::persist_cached_data(&file, offset, buf);
             Self::_update_chunk_pending_status(&delayed_chunk_map, chunk.as_ref(), res.is_ok());
+            if let Some(mgr) = cas_mgr {
+                if let Err(e) = mgr.record_chunk(&blob_info, chunk.deref(), file_path.as_ref()) {
+                    warn!("failed to record chunk state for dedup, {}", e);
+                }
+            }
         });
     }
 
@@ -973,13 +984,22 @@ impl FileCacheEntry {
         trace!("dispatch single io range {:?}", req);
 
         for (i, chunk) in req.chunks.iter().enumerate() {
-            let is_ready = match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) {
+            let mut is_ready = match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) {
                 Ok(true) => true,
                 Ok(false) => false,
                 Err(StorageError::Timeout) => false, // Retry if waiting for inflight IO timeouts
                 Err(e) => return Err(einval!(e)),
             };
+            if !is_ready {
+                if let Some(mgr) = self.cas_mgr.as_ref() {
+                    is_ready = mgr.dedup_chunk(&self.blob_info, chunk.deref(), &self.file);
+                    if is_ready {
+                        self.update_chunk_pending_status(chunk.deref(), true);
+                    }
+                }
+            }
+
             // Directly read chunk data from file cache into user buffer iff:
             // - the chunk is ready in the file cache
             // - data in the file cache is plaintext.
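The two hooks above define the dedup data flow: `record_chunk()` registers a chunk's digest and its location in the local cache file once the chunk has been persisted, while `dedup_chunk()` consults that index on a cache miss before falling back to the backend. A hedged sketch of the bookkeeping this implies; `ChunkIndex` and its in-memory map are assumptions standing in for the real database-backed `CasMgr`:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical in-memory stand-in for the dedup database: maps a chunk
/// digest to the cache file (and offset) that already holds the data.
struct ChunkIndex {
    entries: Mutex<HashMap<[u8; 32], (String, u64)>>,
}

impl ChunkIndex {
    /// Write path: called after a chunk has been persisted, mirroring the
    /// role of CasMgr::record_chunk() in the diff above.
    fn record_chunk(&self, digest: [u8; 32], file_path: &str, offset: u64) {
        self.entries
            .lock()
            .unwrap()
            .insert(digest, (file_path.to_string(), offset));
    }

    /// Read path: called on a cache miss, mirroring CasMgr::dedup_chunk().
    /// Returns the location to copy from if any blob already has
    /// identical data.
    fn dedup_chunk(&self, digest: &[u8; 32]) -> Option<(String, u64)> {
        self.entries.lock().unwrap().get(digest).cloned()
    }
}
```

This also explains why the read path calls `update_chunk_pending_status()` after a successful dedup: from the chunk map's perspective, a deduplicated chunk is indistinguishable from one fetched from the backend.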
diff --git a/storage/src/cache/dedup/mod.rs b/storage/src/cache/dedup/mod.rs
index 6f4a58eec70..0763c955b07 100644
--- a/storage/src/cache/dedup/mod.rs
+++ b/storage/src/cache/dedup/mod.rs
@@ -204,6 +204,7 @@ mod tests {
             BlobFeatures::empty(),
         );
         let mut chunk = MockChunkInfo::new();
+        chunk.block_id = RafsDigest { data: [3u8; 32] };
         chunk.uncompress_offset = 0;
         chunk.uncompress_size = 8192;
         let chunk = Arc::new(chunk) as Arc<dyn BlobChunkInfo>;
diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs
index d42ad388fcf..c0fa1af9537 100644
--- a/storage/src/cache/filecache/mod.rs
+++ b/storage/src/cache/filecache/mod.rs
@@ -21,8 +21,9 @@ use crate::cache::state::{
     BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap,
 };
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
-use crate::cache::{BlobCache, BlobCacheMgr};
+use crate::cache::{BlobCache, BlobCacheMgr, CasMgr};
 use crate::device::{BlobFeatures, BlobInfo};
+use crate::utils::get_path_from_file;
 
 pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw";
 pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data";
@@ -208,11 +209,18 @@ impl FileCacheEntry {
         } else {
             reader.clone()
         };
+        // Turn off chunk deduplication if cache data encryption is enabled or in tarfs mode.
+        let cas_mgr = if mgr.cache_encrypted || mgr.cache_raw_data || is_tarfs {
+            None
+        } else {
+            CasMgr::get_singleton()
+        };
 
         let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
         let blob_uncompressed_size = blob_info.uncompressed_size();
         let is_legacy_stargz = blob_info.is_legacy_stargz();
 
+        let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
         let (
             file,
             meta,
@@ -221,7 +229,6 @@ impl FileCacheEntry {
             is_get_blob_object_supported,
             need_validation,
         ) = if is_tarfs {
-            let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
             let file = OpenOptions::new()
                 .create(false)
                 .write(false)
@@ -231,7 +238,6 @@ impl FileCacheEntry {
                 Arc::new(BlobStateMap::from(NoopChunkMap::new(true))) as Arc<dyn ChunkMap>;
             (file, None, chunk_map, true, true, false)
         } else {
-            let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
             let (chunk_map, is_direct_chunkmap) =
                 Self::create_chunk_map(mgr, &blob_info, &blob_file_path)?;
             // Validation is supported by RAFS v5 (which has no meta_ci) or v6 with chunk digest array.
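Both cache backends call `get_path_from_file()` to recover the path of the already-open cache file, so `record_chunk()` can store a location that other blobs can later copy from. A sketch of how such a helper can work on Linux; it assumes the actual `crate::utils::get_path_from_file()` resolves the fd symlink under `/proc`, which the real implementation may do differently:

```rust
use std::fs::File;
use std::os::unix::io::AsRawFd;

/// Recover the filesystem path of an open file by resolving the
/// /proc/self/fd/<fd> symlink (Linux-specific).
fn get_path_from_file(file: &File) -> String {
    let proc_path = format!("/proc/self/fd/{}", file.as_raw_fd());
    std::fs::read_link(proc_path)
        .map(|p| p.to_string_lossy().into_owned())
        .unwrap_or_default()
}

fn main() -> std::io::Result<()> {
    let file = File::open("/etc/hostname")?;
    println!("{}", get_path_from_file(&file)); // prints "/etc/hostname"
    Ok(())
}
```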
@@ -266,6 +272,7 @@ impl FileCacheEntry {
             );
             return Err(einval!(msg));
         }
+        let load_chunk_digest = need_validation || cas_mgr.is_some();
         let meta = if blob_info.meta_ci_is_valid() {
             let meta = FileCacheMeta::new(
                 blob_file_path,
@@ -273,7 +280,7 @@ impl FileCacheEntry {
                 Some(blob_meta_reader),
                 Some(runtime.clone()),
                 false,
-                need_validation,
+                load_chunk_digest,
             )?;
             Some(meta)
         } else {
@@ -305,6 +312,11 @@ impl FileCacheEntry {
             (Default::default(), Default::default())
         };
 
+        let mut blob_data_file_path = String::new();
+        if cas_mgr.is_some() {
+            blob_data_file_path = get_path_from_file(&file);
+        }
+
         trace!(
             "filecache entry: is_raw_data {}, direct {}, legacy_stargz {}, separate_meta {}, tarfs {}, batch {}, zran {}",
             mgr.cache_raw_data,
@@ -320,8 +332,10 @@ impl FileCacheEntry {
             blob_info,
             cache_cipher_object,
             cache_cipher_context,
+            cas_mgr,
             chunk_map,
             file: Arc::new(file),
+            file_path: Arc::new(blob_data_file_path),
             meta,
             metrics: mgr.metrics.clone(),
             prefetch_state: Arc::new(AtomicU32::new(0)),
diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs
index 5b2285c9b0e..8418307cc5e 100644
--- a/storage/src/cache/fscache/mod.rs
+++ b/storage/src/cache/fscache/mod.rs
@@ -15,13 +15,13 @@ use tokio::runtime::Runtime;
 
 use crate::backend::BlobBackend;
 use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
+use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
 use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap};
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
-use crate::cache::{BlobCache, BlobCacheMgr};
+use crate::cache::{BlobCache, BlobCacheMgr, CasMgr};
 use crate::device::{BlobFeatures, BlobInfo, BlobObject};
 use crate::factory::BLOB_FACTORY;
-
-use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
+use crate::utils::get_path_from_file;
 
 const FSCACHE_BLOBS_CHECK_NUM: u8 = 1;
@@ -240,9 +240,16 @@ impl FileCacheEntry {
         };
 
         let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
+        // Turn off chunk deduplication in tarfs mode.
+        let cas_mgr = if is_tarfs {
+            None
+        } else {
+            CasMgr::get_singleton()
+        };
         let need_validation = mgr.need_validation
             && !blob_info.is_legacy_stargz()
             && blob_info.has_feature(BlobFeatures::INLINED_CHUNK_DIGEST);
+        let load_chunk_digest = need_validation || cas_mgr.is_some();
         let blob_file_path = format!("{}/{}", mgr.work_dir, blob_meta_id);
         let meta = if blob_info.meta_ci_is_valid() {
             FileCacheMeta::new(
@@ -251,7 +258,7 @@ impl FileCacheEntry {
                 Some(blob_meta_reader),
                 None,
                 true,
-                need_validation,
+                load_chunk_digest,
             )?
         } else {
             return Err(enosys!(
@@ -266,13 +273,20 @@ impl FileCacheEntry {
         )?));
         Self::restore_chunk_map(blob_info.clone(), file.clone(), &meta, &chunk_map);
 
+        let mut blob_data_file_path = String::new();
+        if cas_mgr.is_some() {
+            blob_data_file_path = get_path_from_file(&file);
+        }
+
         Ok(FileCacheEntry {
             blob_id,
             blob_info: blob_info.clone(),
             cache_cipher_object: Default::default(),
             cache_cipher_context: Default::default(),
+            cas_mgr,
             chunk_map,
             file,
+            file_path: Arc::new(blob_data_file_path),
             meta: Some(meta),
             metrics: mgr.metrics.clone(),
             prefetch_state: Arc::new(AtomicU32::new(0)),
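On a dedup hit, `dedup_chunk()` has to fill the current blob's cache file with bytes that already live in another blob's cache file. One plausible way to do that copy without bouncing through userspace is `copy_file_range(2)`; the sketch below, including the `copy_chunk` name and the use of the `libc` crate, is an assumption rather than the confirmed `CasMgr` internals:

```rust
use std::fs::File;
use std::os::unix::io::AsRawFd;

/// Copy `len` bytes from `src` at `src_off` into `dst` at `dst_off`
/// entirely inside the kernel via copy_file_range(2) (Linux-specific).
fn copy_chunk(src: &File, src_off: u64, dst: &File, dst_off: u64, len: usize) -> std::io::Result<()> {
    let mut src_off = src_off as i64;
    let mut dst_off = dst_off as i64;
    let mut remaining = len;
    while remaining > 0 {
        // The kernel advances both offsets through the pointers we pass in.
        let copied = unsafe {
            libc::copy_file_range(
                src.as_raw_fd(),
                &mut src_off,
                dst.as_raw_fd(),
                &mut dst_off,
                remaining,
                0,
            )
        };
        if copied < 0 {
            return Err(std::io::Error::last_os_error());
        }
        if copied == 0 {
            // Source ended early; the recorded chunk location was stale.
            return Err(std::io::ErrorKind::UnexpectedEof.into());
        }
        remaining -= copied as usize;
    }
    Ok(())
}
```

An in-kernel copy keeps the dedup fast path cheap relative to re-downloading the chunk from the registry backend, which is the whole point of consulting the CAS database before the backend.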