From 5c4c7bb6491789b413c30e58fe11eeaebbd1bb5d Mon Sep 17 00:00:00 2001 From: Wenhao Ren Date: Wed, 25 Oct 2023 11:15:08 +0800 Subject: [PATCH] nydusd: add the config support of `amplify_io` Add the support of `amplify_io` in the config file of nydusd to configure read amplification. Signed-off-by: Wenhao Ren --- api/src/config.rs | 16 ++++++++++------ service/src/fs_cache.rs | 4 ++-- storage/src/cache/cachedfile.rs | 18 +++++++++--------- storage/src/cache/filecache/mod.rs | 8 ++++++-- storage/src/cache/fscache/mod.rs | 8 ++++++-- storage/src/factory.rs | 13 ++++++++++++- 6 files changed, 45 insertions(+), 22 deletions(-) diff --git a/api/src/config.rs b/api/src/config.rs index e606770ddc9..86e6c3466ba 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -820,7 +820,7 @@ pub struct RafsConfigV2 { #[serde(default = "default_rafs_mode")] pub mode: String, /// Batch size to read data from storage cache layer. - #[serde(default = "default_batch_size")] + #[serde(default = "default_user_io_batch_size")] pub batch_size: usize, /// Whether to validate data digest. #[serde(default)] @@ -875,7 +875,7 @@ pub struct PrefetchConfigV2 { #[serde(default = "default_prefetch_threads")] pub threads: usize, /// The batch size to prefetch data from backend. - #[serde(default = "default_prefetch_batch_size")] + #[serde(default = "default_prefetch_request_batch_size")] pub batch_size: usize, /// Network bandwidth rate limit in unit of Bytes and Zero means no limit. #[serde(default)] @@ -1194,11 +1194,15 @@ fn default_work_dir() -> String { ".".to_string() } -pub fn default_batch_size() -> usize { +fn default_merging_size() -> usize { 128 * 1024 } -fn default_prefetch_batch_size() -> usize { +fn default_user_io_batch_size() -> usize { + 1024 * 1024 +} + +pub fn default_prefetch_request_batch_size() -> usize { 1024 * 1024 } @@ -1364,7 +1368,7 @@ struct RafsConfig { #[serde(default)] pub latest_read_files: bool, // ZERO value means, amplifying user io is not enabled. - #[serde(default = "default_batch_size")] + #[serde(default = "default_user_io_batch_size")] pub amplify_io: usize, } @@ -1411,7 +1415,7 @@ struct FsPrefetchControl { pub threads_count: usize, /// Window size in unit of bytes to merge request to backend. - #[serde(default = "default_batch_size")] + #[serde(default = "default_merging_size")] pub merging_size: usize, /// Network bandwidth limitation for prefetching. diff --git a/service/src/fs_cache.rs b/service/src/fs_cache.rs index a98fb1c5cf3..aa0f72c9233 100644 --- a/service/src/fs_cache.rs +++ b/service/src/fs_cache.rs @@ -518,8 +518,8 @@ impl FsCacheHandler { .map_err(|e| eother!(format!("failed to start prefetch worker, {}", e)))?; let size = match cache_cfg.prefetch.batch_size.checked_next_power_of_two() { - None => nydus_api::default_batch_size() as u64, - Some(1) => nydus_api::default_batch_size() as u64, + None => nydus_api::default_prefetch_request_batch_size() as u64, + Some(1) => nydus_api::default_prefetch_request_batch_size() as u64, Some(s) => s as u64, }; let size = std::cmp::max(0x4_0000u64, size); diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs index 0fc96c9cdc2..d9843afc152 100644 --- a/storage/src/cache/cachedfile.rs +++ b/storage/src/cache/cachedfile.rs @@ -300,7 +300,7 @@ impl FileCacheEntry { } } - fn prefetch_batch_size(&self) -> u64 { + fn prefetch_request_batch_size(&self) -> u64 { if self.prefetch_config.merging_size < 0x2_0000 { 0x2_0000 } else { @@ -308,7 +308,7 @@ impl FileCacheEntry { } } - fn ondemand_batch_size(&self) -> u64 { + fn user_io_batch_size(&self) -> u64 { if self.batch_size < 0x2_0000 { 0x2_0000 } else { @@ -559,7 +559,7 @@ impl BlobCache for FileCacheEntry { } // Then handle fs prefetch - let max_comp_size = self.prefetch_batch_size(); + let max_comp_size = self.prefetch_request_batch_size(); let mut bios = bios.to_vec(); bios.sort_unstable_by_key(|entry| entry.chunkinfo.compressed_offset()); self.metrics.prefetch_unmerged_chunks.add(bios.len() as u64); @@ -719,7 +719,7 @@ impl BlobObject for FileCacheEntry { let meta = self.meta.as_ref().ok_or_else(|| enoent!())?; let meta = meta.get_blob_meta().ok_or_else(|| einval!())?; let mut chunks = - meta.get_chunks_compressed(offset, size, self.prefetch_batch_size(), prefetch)?; + meta.get_chunks_compressed(offset, size, self.prefetch_request_batch_size(), prefetch)?; if !chunks.is_empty() { if let Some(meta) = self.get_blob_meta_info()? { chunks = self.strip_ready_chunks(meta, None, chunks); @@ -745,7 +745,7 @@ impl BlobObject for FileCacheEntry { let meta = self.meta.as_ref().ok_or_else(|| einval!())?; let meta = meta.get_blob_meta().ok_or_else(|| einval!())?; - let mut chunks = meta.get_chunks_uncompressed(offset, size, self.ondemand_batch_size())?; + let mut chunks = meta.get_chunks_uncompressed(offset, size, self.user_io_batch_size())?; if let Some(meta) = self.get_blob_meta_info()? { chunks = self.strip_ready_chunks(meta, None, chunks); } @@ -764,7 +764,7 @@ impl BlobObject for FileCacheEntry { let chunks_extended; let mut chunks = &range.chunks; - if let Some(v) = self.extend_pending_chunks(chunks, self.prefetch_batch_size())? { + if let Some(v) = self.extend_pending_chunks(chunks, self.prefetch_request_batch_size())? { chunks_extended = v; chunks = &chunks_extended; } @@ -934,7 +934,7 @@ impl FileCacheEntry { fn read_iter(&self, bios: &mut [BlobIoDesc], buffers: &[FileVolatileSlice]) -> Result { // Merge requests with continuous blob addresses. let requests = self - .merge_requests_for_user(bios, self.ondemand_batch_size()) + .merge_requests_for_user(bios, self.user_io_batch_size()) .ok_or_else(|| { for bio in bios.iter() { self.update_chunk_pending_status(&bio.chunkinfo, false); @@ -1100,14 +1100,14 @@ impl FileCacheEntry { + region.chunks[idx].compressed_size() as u64; let start = region.chunks[idx + 1].compressed_offset(); assert!(end <= start); - assert!(start - end <= self.ondemand_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT); + assert!(start - end <= self.user_io_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT); assert!(region.chunks[idx].id() < region.chunks[idx + 1].id()); } } // Try to extend requests. let mut region_hold; - if let Some(v) = self.extend_pending_chunks(®ion.chunks, self.ondemand_batch_size())? { + if let Some(v) = self.extend_pending_chunks(®ion.chunks, self.user_io_batch_size())? { if v.len() > r.chunks.len() { let mut tag_set = HashSet::new(); for (idx, chunk) in region.chunks.iter().enumerate() { diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs index 2b158ca09b1..5cc509e63ed 100644 --- a/storage/src/cache/filecache/mod.rs +++ b/storage/src/cache/filecache/mod.rs @@ -23,7 +23,6 @@ use crate::cache::state::{ use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr}; use crate::cache::{BlobCache, BlobCacheMgr}; use crate::device::{BlobFeatures, BlobInfo}; -use crate::RAFS_DEFAULT_CHUNK_SIZE; pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw"; pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data"; @@ -46,6 +45,7 @@ pub struct FileCacheMgr { cache_convergent_encryption: bool, cache_encryption_key: String, closed: Arc, + user_io_batch_size: Option, } impl FileCacheMgr { @@ -55,6 +55,7 @@ impl FileCacheMgr { backend: Arc, runtime: Arc, id: &str, + user_io_batch_size: Option, ) -> Result { let blob_cfg = config.get_filecache_config()?; let work_dir = blob_cfg.get_work_dir()?; @@ -77,6 +78,7 @@ impl FileCacheMgr { cache_convergent_encryption: blob_cfg.enable_convergent_encryption, cache_encryption_key: blob_cfg.encryption_key.clone(), closed: Arc::new(AtomicBool::new(false)), + user_io_batch_size, }) } @@ -339,7 +341,9 @@ impl FileCacheEntry { is_zran, dio_enabled: false, need_validation, - batch_size: RAFS_DEFAULT_CHUNK_SIZE, + // If none, use default 0 since it's not used. + // e.g., at build time. + batch_size: mgr.user_io_batch_size.unwrap_or(0), prefetch_config, }) } diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs index cf624f4f427..b344e1fa207 100644 --- a/storage/src/cache/fscache/mod.rs +++ b/storage/src/cache/fscache/mod.rs @@ -20,7 +20,6 @@ use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr}; use crate::cache::{BlobCache, BlobCacheMgr}; use crate::device::{BlobFeatures, BlobInfo, BlobObject}; use crate::factory::BLOB_FACTORY; -use crate::RAFS_DEFAULT_CHUNK_SIZE; use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX; @@ -40,6 +39,7 @@ pub struct FsCacheMgr { need_validation: bool, blobs_check_count: Arc, closed: Arc, + user_io_batch_size: Option, } impl FsCacheMgr { @@ -49,6 +49,7 @@ impl FsCacheMgr { backend: Arc, runtime: Arc, id: &str, + user_io_batch_size: Option, ) -> Result { if config.cache_compressed { return Err(enosys!("fscache doesn't support compressed cache mode")); @@ -73,6 +74,7 @@ impl FsCacheMgr { need_validation: config.cache_validate, blobs_check_count: Arc::new(AtomicU8::new(0)), closed: Arc::new(AtomicBool::new(false)), + user_io_batch_size, }) } @@ -290,7 +292,9 @@ impl FileCacheEntry { is_zran, dio_enabled: true, need_validation, - batch_size: RAFS_DEFAULT_CHUNK_SIZE, + // If none, use default 0 since it's not used. + // e.g., at build time. + batch_size: mgr.user_io_batch_size.unwrap_or(0), prefetch_config, }) } diff --git a/storage/src/factory.rs b/storage/src/factory.rs index cc37a4e913c..8197fb7b8fc 100644 --- a/storage/src/factory.rs +++ b/storage/src/factory.rs @@ -117,6 +117,10 @@ impl BlobFactory { ) -> IOResult> { let backend_cfg = config.get_backend_config()?; let cache_cfg = config.get_cache_config()?; + let user_io_batch_size = match config.get_rafs_config() { + Ok(v) => Some(v.batch_size as u64), + Err(_) => None, + }; let key = BlobCacheMgrKey { config: config.clone(), }; @@ -128,7 +132,13 @@ impl BlobFactory { let backend = Self::new_backend(backend_cfg, &blob_info.blob_id())?; let mgr = match cache_cfg.cache_type.as_str() { "blobcache" | "filecache" => { - let mgr = FileCacheMgr::new(cache_cfg, backend, ASYNC_RUNTIME.clone(), &config.id)?; + let mgr = FileCacheMgr::new( + cache_cfg, + backend, + ASYNC_RUNTIME.clone(), + &config.id, + user_io_batch_size, + )?; mgr.init()?; Arc::new(mgr) as Arc } @@ -139,6 +149,7 @@ impl BlobFactory { backend, ASYNC_RUNTIME.clone(), &config.id, + user_io_batch_size, )?; mgr.init()?; Arc::new(mgr) as Arc