From 2f7b378865f4d3517089a4f56eddb8d8a679115f Mon Sep 17 00:00:00 2001 From: Wenhao Ren Date: Thu, 9 Nov 2023 12:15:34 +0800 Subject: [PATCH 1/2] rafs: rename variable names about prefetch configuration Variable names about prefetch are confused currently. So we merge variable names that have the same meaning, while DO NOT affect the field names read from the configuration file. Signed-off-by: Wenhao Ren --- api/src/config.rs | 78 +++++++++++++++++---------------- rafs/src/fs.rs | 19 ++++---- rafs/src/metadata/md_v5.rs | 2 +- storage/src/cache/cachedfile.rs | 4 +- storage/src/cache/worker.rs | 20 ++++----- 5 files changed, 63 insertions(+), 60 deletions(-) diff --git a/api/src/config.rs b/api/src/config.rs index 32c7541b117..c14387ae88d 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -684,7 +684,7 @@ impl CacheConfigV2 { if self.prefetch.batch_size > 0x10000000 { return false; } - if self.prefetch.threads == 0 || self.prefetch.threads > 1024 { + if self.prefetch.threads_count == 0 || self.prefetch.threads_count > 1024 { return false; } } @@ -820,8 +820,8 @@ pub struct RafsConfigV2 { #[serde(default = "default_rafs_mode")] pub mode: String, /// Batch size to read data from storage cache layer. - #[serde(default = "default_batch_size")] - pub batch_size: usize, + #[serde(rename = "batch_size", default = "default_batch_size")] + pub user_io_batch_size: usize, /// Whether to validate data digest. #[serde(default)] pub validate: bool, @@ -850,14 +850,14 @@ impl RafsConfigV2 { if self.mode != "direct" && self.mode != "cached" { return false; } - if self.batch_size > 0x10000000 { + if self.user_io_batch_size > 0x10000000 { return false; } if self.prefetch.enable { if self.prefetch.batch_size > 0x10000000 { return false; } - if self.prefetch.threads == 0 || self.prefetch.threads > 1024 { + if self.prefetch.threads_count == 0 || self.prefetch.threads_count > 1024 { return false; } } @@ -872,8 +872,8 @@ pub struct PrefetchConfigV2 { /// Whether to enable blob data prefetching. pub enable: bool, /// Number of data prefetching working threads. - #[serde(default = "default_prefetch_threads")] - pub threads: usize, + #[serde(rename = "threads", default = "default_prefetch_threads_count")] + pub threads_count: usize, /// The batch size to prefetch data from backend. #[serde(default = "default_prefetch_batch_size")] pub batch_size: usize, @@ -1202,7 +1202,7 @@ fn default_prefetch_batch_size() -> usize { 1024 * 1024 } -fn default_prefetch_threads() -> usize { +fn default_prefetch_threads_count() -> usize { 8 } @@ -1364,8 +1364,8 @@ struct RafsConfig { #[serde(default)] pub latest_read_files: bool, // ZERO value means, amplifying user io is not enabled. - #[serde(default = "default_batch_size")] - pub amplify_io: usize, + #[serde(rename = "amplify_io", default = "default_batch_size")] + pub user_io_batch_size: usize, } impl TryFrom for ConfigV2 { @@ -1376,7 +1376,7 @@ impl TryFrom for ConfigV2 { let mut cache: CacheConfigV2 = (&v.device.cache).try_into()?; let rafs = RafsConfigV2 { mode: v.mode, - batch_size: v.amplify_io, + user_io_batch_size: v.user_io_batch_size, validate: v.digest_validate, enable_xattr: v.enable_xattr, iostats_files: v.iostats_files, @@ -1407,23 +1407,23 @@ struct FsPrefetchControl { pub enable: bool, /// How many working threads to prefetch data. - #[serde(default = "default_prefetch_threads")] + #[serde(default = "default_prefetch_threads_count")] pub threads_count: usize, /// Window size in unit of bytes to merge request to backend. - #[serde(default = "default_batch_size")] - pub merging_size: usize, + #[serde(rename = "merging_size", default = "default_batch_size")] + pub batch_size: usize, /// Network bandwidth limitation for prefetching. /// /// In unit of Bytes. It sets a limit to prefetch bandwidth usage in order to /// reduce congestion with normal user IO. - /// bandwidth_rate == 0 -- prefetch bandwidth ratelimit disabled - /// bandwidth_rate > 0 -- prefetch bandwidth ratelimit enabled. + /// bandwidth_limit == 0 -- prefetch bandwidth ratelimit disabled + /// bandwidth_limit > 0 -- prefetch bandwidth ratelimit enabled. /// Please note that if the value is less than Rafs chunk size, /// it will be raised to the chunk size. - #[serde(default)] - pub bandwidth_rate: u32, + #[serde(default, rename = "bandwidth_rate")] + pub bandwidth_limit: u32, /// Whether to prefetch all filesystem data. #[serde(default = "default_prefetch_all")] @@ -1434,9 +1434,9 @@ impl From for PrefetchConfigV2 { fn from(v: FsPrefetchControl) -> Self { PrefetchConfigV2 { enable: v.enable, - threads: v.threads_count, - batch_size: v.merging_size, - bandwidth_limit: v.bandwidth_rate, + threads_count: v.threads_count, + batch_size: v.batch_size, + bandwidth_limit: v.bandwidth_limit, prefetch_all: v.prefetch_all, } } @@ -1450,18 +1450,20 @@ struct BlobPrefetchConfig { /// Number of data prefetching working threads. pub threads_count: usize, /// The maximum size of a merged IO request. - pub merging_size: usize, + #[serde(rename = "merging_size")] + pub batch_size: usize, /// Network bandwidth rate limit in unit of Bytes and Zero means no limit. - pub bandwidth_rate: u32, + #[serde(rename = "bandwidth_rate")] + pub bandwidth_limit: u32, } impl From<&BlobPrefetchConfig> for PrefetchConfigV2 { fn from(v: &BlobPrefetchConfig) -> Self { PrefetchConfigV2 { enable: v.enable, - threads: v.threads_count, - batch_size: v.merging_size, - bandwidth_limit: v.bandwidth_rate, + threads_count: v.threads_count, + batch_size: v.batch_size, + bandwidth_limit: v.bandwidth_limit, prefetch_all: true, } } @@ -1530,8 +1532,8 @@ mod tests { let config = BlobPrefetchConfig::default(); assert!(!config.enable); assert_eq!(config.threads_count, 0); - assert_eq!(config.merging_size, 0); - assert_eq!(config.bandwidth_rate, 0); + assert_eq!(config.batch_size, 0); + assert_eq!(config.bandwidth_limit, 0); let content = r#"{ "enable": true, @@ -1542,12 +1544,12 @@ mod tests { let config: BlobPrefetchConfig = serde_json::from_str(content).unwrap(); assert!(config.enable); assert_eq!(config.threads_count, 2); - assert_eq!(config.merging_size, 4); - assert_eq!(config.bandwidth_rate, 5); + assert_eq!(config.batch_size, 4); + assert_eq!(config.bandwidth_limit, 5); let config: PrefetchConfigV2 = (&config).into(); assert!(config.enable); - assert_eq!(config.threads, 2); + assert_eq!(config.threads_count, 2); assert_eq!(config.batch_size, 4); assert_eq!(config.bandwidth_limit, 5); assert!(config.prefetch_all); @@ -1618,7 +1620,7 @@ mod tests { assert!(blob_config.cache_config.is_object()); assert!(blob_config.prefetch_config.enable); assert_eq!(blob_config.prefetch_config.threads_count, 2); - assert_eq!(blob_config.prefetch_config.merging_size, 4); + assert_eq!(blob_config.prefetch_config.batch_size, 4); assert_eq!( blob_config.metadata_path.as_ref().unwrap().as_str(), "/tmp/metadata1" @@ -1630,7 +1632,7 @@ mod tests { assert_eq!(blob_config.cache.cache_type, "fscache"); assert!(blob_config.cache.fs_cache.is_some()); assert!(blob_config.cache.prefetch.enable); - assert_eq!(blob_config.cache.prefetch.threads, 2); + assert_eq!(blob_config.cache.prefetch.threads_count, 2); assert_eq!(blob_config.cache.prefetch.batch_size, 4); assert_eq!( blob_config.metadata_path.as_ref().unwrap().as_str(), @@ -1654,7 +1656,7 @@ mod tests { let blob_config = config.blob_config_legacy.as_ref().unwrap(); assert!(!blob_config.prefetch_config.enable); assert_eq!(blob_config.prefetch_config.threads_count, 0); - assert_eq!(blob_config.prefetch_config.merging_size, 0); + assert_eq!(blob_config.prefetch_config.batch_size, 0); } #[test] @@ -1967,7 +1969,7 @@ mod tests { let prefetch = &cache.prefetch; assert!(prefetch.enable); - assert_eq!(prefetch.threads, 8); + assert_eq!(prefetch.threads_count, 8); assert_eq!(prefetch.batch_size, 1000000); assert_eq!(prefetch.bandwidth_limit, 10000000); } @@ -1998,14 +2000,14 @@ mod tests { let rafs = config.rafs.as_ref().unwrap(); assert_eq!(&rafs.mode, "direct"); - assert_eq!(rafs.batch_size, 1000000); + assert_eq!(rafs.user_io_batch_size, 1000000); assert!(rafs.validate); assert!(rafs.enable_xattr); assert!(rafs.iostats_files); assert!(rafs.access_pattern); assert!(rafs.latest_read_files); assert!(rafs.prefetch.enable); - assert_eq!(rafs.prefetch.threads, 4); + assert_eq!(rafs.prefetch.threads_count, 4); assert_eq!(rafs.prefetch.batch_size, 1000000); assert_eq!(rafs.prefetch.bandwidth_limit, 10000000); assert!(rafs.prefetch.prefetch_all) @@ -2557,7 +2559,7 @@ mod tests { assert!(default_true()); assert_eq!(default_failure_limit(), 5); assert_eq!(default_prefetch_batch_size(), 1024 * 1024); - assert_eq!(default_prefetch_threads(), 8); + assert_eq!(default_prefetch_threads_count(), 8); } #[test] diff --git a/rafs/src/fs.rs b/rafs/src/fs.rs index 23f435733b6..c053d2e0c81 100644 --- a/rafs/src/fs.rs +++ b/rafs/src/fs.rs @@ -68,7 +68,7 @@ pub struct Rafs { fs_prefetch: bool, prefetch_all: bool, xattr_enabled: bool, - amplify_io: u32, + user_io_batch_size: u32, // static inode attributes i_uid: u32, @@ -102,7 +102,7 @@ impl Rafs { initialized: false, digest_validate: rafs_cfg.validate, fs_prefetch: rafs_cfg.prefetch.enable, - amplify_io: rafs_cfg.batch_size as u32, + user_io_batch_size: rafs_cfg.user_io_batch_size as u32, prefetch_all: rafs_cfg.prefetch.prefetch_all, xattr_enabled: rafs_cfg.enable_xattr, @@ -621,20 +621,21 @@ impl FileSystem for Rafs { assert!(!io_vecs.is_empty() && !io_vecs[0].is_empty()); // Try to amplify user io for Rafs v5, to improve performance. - let amplify_io = cmp::min(self.amplify_io as usize, w.available_bytes()) as u32; - if self.sb.meta.is_v5() && size < amplify_io { + let user_io_batch_size = + cmp::min(self.user_io_batch_size as usize, w.available_bytes()) as u32; + if self.sb.meta.is_v5() && size < user_io_batch_size { let all_chunks_ready = self.device.all_chunks_ready(&io_vecs); if !all_chunks_ready { let chunk_mask = self.metadata().chunk_size as u64 - 1; let next_chunk_base = (offset + (size as u64) + chunk_mask) & !chunk_mask; let window_base = cmp::min(next_chunk_base, inode_size); let actual_size = window_base - (offset & !chunk_mask); - if actual_size < amplify_io as u64 { - let window_size = amplify_io as u64 - actual_size; + if actual_size < user_io_batch_size as u64 { + let window_size = user_io_batch_size as u64 - actual_size; let orig_cnt = io_vecs.iter().fold(0, |s, d| s + d.len()); - self.sb.amplify_io( + self.sb.amplify_user_io( &self.device, - amplify_io, + user_io_batch_size, &mut io_vecs, &inode, window_base, @@ -1021,7 +1022,7 @@ mod tests { fs_prefetch: false, prefetch_all: false, xattr_enabled: false, - amplify_io: 0, + user_io_batch_size: 0, i_uid: 0, i_gid: 0, i_time: 0, diff --git a/rafs/src/metadata/md_v5.rs b/rafs/src/metadata/md_v5.rs index fd1362b2731..e0582808fea 100644 --- a/rafs/src/metadata/md_v5.rs +++ b/rafs/src/metadata/md_v5.rs @@ -128,7 +128,7 @@ impl RafsSuper { // not overlap user IO's chunk. // V5 rafs tries to amplify user IO by expanding more chunks to user IO and // expect that those chunks are likely to be continuous with user IO's chunks. - pub(crate) fn amplify_io( + pub(crate) fn amplify_user_io( &self, device: &BlobDevice, max_uncomp_size: u32, diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs index 0fc96c9cdc2..76e1f98a6ed 100644 --- a/storage/src/cache/cachedfile.rs +++ b/storage/src/cache/cachedfile.rs @@ -301,10 +301,10 @@ impl FileCacheEntry { } fn prefetch_batch_size(&self) -> u64 { - if self.prefetch_config.merging_size < 0x2_0000 { + if self.prefetch_config.batch_size < 0x2_0000 { 0x2_0000 } else { - self.prefetch_config.merging_size as u64 + self.prefetch_config.batch_size as u64 } } diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index 295629546d4..98b68724391 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -26,19 +26,19 @@ pub(crate) struct AsyncPrefetchConfig { /// Number of working threads. pub threads_count: usize, /// Window size to merge/amplify requests. - pub merging_size: usize, + pub batch_size: usize, /// Network bandwidth for prefetch, in unit of Bytes and Zero means no rate limit is set. #[allow(unused)] - pub bandwidth_rate: u32, + pub bandwidth_limit: u32, } impl From<&PrefetchConfigV2> for AsyncPrefetchConfig { fn from(p: &PrefetchConfigV2) -> Self { AsyncPrefetchConfig { enable: p.enable, - threads_count: p.threads, - merging_size: p.batch_size, - bandwidth_rate: p.bandwidth_limit, + threads_count: p.threads_count, + batch_size: p.batch_size, + bandwidth_limit: p.bandwidth_limit, } } } @@ -97,7 +97,7 @@ impl AsyncWorkerMgr { prefetch_config: Arc, ) -> Result { #[cfg(feature = "prefetch-rate-limit")] - let prefetch_limiter = match prefetch_config.bandwidth_rate { + let prefetch_limiter = match prefetch_config.bandwidth_limit { 0 => None, v => { // If the given value is less than maximum blob chunk size, it exceeds burst size of the @@ -437,8 +437,8 @@ mod tests { let config = Arc::new(AsyncPrefetchConfig { enable: true, threads_count: 2, - merging_size: 0x100000, - bandwidth_rate: 0x100000, + batch_size: 0x100000, + bandwidth_limit: 0x100000, }); let mgr = Arc::new(AsyncWorkerMgr::new(metrics, config).unwrap()); @@ -477,8 +477,8 @@ mod tests { let config = Arc::new(AsyncPrefetchConfig { enable: true, threads_count: 4, - merging_size: 0x1000000, - bandwidth_rate: 0x1000000, + batch_size: 0x1000000, + bandwidth_limit: 0x1000000, }); let mgr = Arc::new(AsyncWorkerMgr::new(metrics, config).unwrap()); From 0063e89b65585c4da846f865057cd2da5b5aa062 Mon Sep 17 00:00:00 2001 From: Wenhao Ren Date: Thu, 9 Nov 2023 12:15:34 +0800 Subject: [PATCH 2/2] nydusd: add the config support of `amplify_io` Add the support of `amplify_io` in the config file of nydusd to configure read amplification. Signed-off-by: Wenhao Ren --- api/src/config.rs | 21 +++++++++++---------- docs/nydusd.md | 3 +++ misc/configs/nydusd-config-v2.toml | 3 ++- service/src/fs_cache.rs | 4 ++-- storage/src/cache/cachedfile.rs | 17 +++++++++-------- storage/src/cache/filecache/mod.rs | 6 ++++-- storage/src/cache/fscache/mod.rs | 9 ++++++--- storage/src/cache/worker.rs | 2 +- storage/src/factory.rs | 15 +++++++++++++-- 9 files changed, 51 insertions(+), 29 deletions(-) diff --git a/api/src/config.rs b/api/src/config.rs index c14387ae88d..5f8ce55c183 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -819,8 +819,8 @@ pub struct RafsConfigV2 { /// Filesystem metadata cache mode. #[serde(default = "default_rafs_mode")] pub mode: String, - /// Batch size to read data from storage cache layer. - #[serde(rename = "batch_size", default = "default_batch_size")] + /// Amplified user IO request batch size to read data from remote storage backend / local cache. + #[serde(rename = "batch_size", default = "default_user_io_batch_size")] pub user_io_batch_size: usize, /// Whether to validate data digest. #[serde(default)] @@ -874,7 +874,7 @@ pub struct PrefetchConfigV2 { /// Number of data prefetching working threads. #[serde(rename = "threads", default = "default_prefetch_threads_count")] pub threads_count: usize, - /// The batch size to prefetch data from backend. + /// The amplify batch size to prefetch data from backend. #[serde(default = "default_prefetch_batch_size")] pub batch_size: usize, /// Network bandwidth rate limit in unit of Bytes and Zero means no limit. @@ -1194,11 +1194,11 @@ fn default_work_dir() -> String { ".".to_string() } -pub fn default_batch_size() -> usize { - 128 * 1024 +pub fn default_user_io_batch_size() -> usize { + 1024 * 1024 } -fn default_prefetch_batch_size() -> usize { +pub fn default_prefetch_batch_size() -> usize { 1024 * 1024 } @@ -1363,8 +1363,9 @@ struct RafsConfig { /// Record file name if file access trace log. #[serde(default)] pub latest_read_files: bool, + // Amplified user IO request batch size to read data from remote storage backend / local cache. // ZERO value means, amplifying user io is not enabled. - #[serde(rename = "amplify_io", default = "default_batch_size")] + #[serde(rename = "amplify_io", default = "default_user_io_batch_size")] pub user_io_batch_size: usize, } @@ -1410,8 +1411,8 @@ struct FsPrefetchControl { #[serde(default = "default_prefetch_threads_count")] pub threads_count: usize, - /// Window size in unit of bytes to merge request to backend. - #[serde(rename = "merging_size", default = "default_batch_size")] + /// The amplify batch size to prefetch data from backend. + #[serde(rename = "merging_size", default = "default_prefetch_batch_size")] pub batch_size: usize, /// Network bandwidth limitation for prefetching. @@ -1449,7 +1450,7 @@ struct BlobPrefetchConfig { pub enable: bool, /// Number of data prefetching working threads. pub threads_count: usize, - /// The maximum size of a merged IO request. + /// The amplify batch size to prefetch data from backend. #[serde(rename = "merging_size")] pub batch_size: usize, /// Network bandwidth rate limit in unit of Bytes and Zero means no limit. diff --git a/docs/nydusd.md b/docs/nydusd.md index 20aa319449c..a536e4ea62b 100644 --- a/docs/nydusd.md +++ b/docs/nydusd.md @@ -130,6 +130,9 @@ We are working on enabling cloud-hypervisor support for nydus. "iostats_files": true, // Enable support of fs extended attributes "enable_xattr": false, + // Amplified user IO request batch size to read data from remote storage backend / local cache + // in unit of Bytes, valid values: 0-268435456, default: 1048576 + "amplify_io": 1048576, "fs_prefetch": { // Enable blob prefetch "enable": false, diff --git a/misc/configs/nydusd-config-v2.toml b/misc/configs/nydusd-config-v2.toml index ed33ec77c4d..30e8bffca64 100644 --- a/misc/configs/nydusd-config-v2.toml +++ b/misc/configs/nydusd-config-v2.toml @@ -142,7 +142,8 @@ bandwidth_limit = 10000000 [rafs] # Filesystem metadata cache mode, "direct" or "cached". "direct" is almost what you want. mode = "direct" -# Batch size to read data from storage cache layer, valid values: 0-0x10000000 +# Amplified user IO request batch size to read data from remote storage backend / local cache, +# valid values: 0-0x10000000 batch_size = 1000000 # Whether to validate data digest. validate = true diff --git a/service/src/fs_cache.rs b/service/src/fs_cache.rs index 31812aa09ac..f750c1a649a 100644 --- a/service/src/fs_cache.rs +++ b/service/src/fs_cache.rs @@ -518,8 +518,8 @@ impl FsCacheHandler { .map_err(|e| eother!(format!("failed to start prefetch worker, {}", e)))?; let size = match cache_cfg.prefetch.batch_size.checked_next_power_of_two() { - None => nydus_api::default_batch_size() as u64, - Some(1) => nydus_api::default_batch_size() as u64, + None => nydus_api::default_prefetch_batch_size() as u64, + Some(1) => nydus_api::default_prefetch_batch_size() as u64, Some(s) => s as u64, }; let size = std::cmp::max(0x4_0000u64, size); diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs index 76e1f98a6ed..a18f8004fd2 100644 --- a/storage/src/cache/cachedfile.rs +++ b/storage/src/cache/cachedfile.rs @@ -164,7 +164,8 @@ pub(crate) struct FileCacheEntry { pub(crate) dio_enabled: bool, // Data from the file cache should be validated before use. pub(crate) need_validation: bool, - pub(crate) batch_size: u64, + // Amplified user IO request batch size to read data from remote storage backend / local cache. + pub(crate) user_io_batch_size: u32, pub(crate) prefetch_config: Arc, } @@ -308,11 +309,11 @@ impl FileCacheEntry { } } - fn ondemand_batch_size(&self) -> u64 { - if self.batch_size < 0x2_0000 { + fn user_io_batch_size(&self) -> u64 { + if self.user_io_batch_size < 0x2_0000 { 0x2_0000 } else { - self.batch_size + self.user_io_batch_size as u64 } } @@ -745,7 +746,7 @@ impl BlobObject for FileCacheEntry { let meta = self.meta.as_ref().ok_or_else(|| einval!())?; let meta = meta.get_blob_meta().ok_or_else(|| einval!())?; - let mut chunks = meta.get_chunks_uncompressed(offset, size, self.ondemand_batch_size())?; + let mut chunks = meta.get_chunks_uncompressed(offset, size, self.user_io_batch_size())?; if let Some(meta) = self.get_blob_meta_info()? { chunks = self.strip_ready_chunks(meta, None, chunks); } @@ -934,7 +935,7 @@ impl FileCacheEntry { fn read_iter(&self, bios: &mut [BlobIoDesc], buffers: &[FileVolatileSlice]) -> Result { // Merge requests with continuous blob addresses. let requests = self - .merge_requests_for_user(bios, self.ondemand_batch_size()) + .merge_requests_for_user(bios, self.user_io_batch_size()) .ok_or_else(|| { for bio in bios.iter() { self.update_chunk_pending_status(&bio.chunkinfo, false); @@ -1100,14 +1101,14 @@ impl FileCacheEntry { + region.chunks[idx].compressed_size() as u64; let start = region.chunks[idx + 1].compressed_offset(); assert!(end <= start); - assert!(start - end <= self.ondemand_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT); + assert!(start - end <= self.user_io_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT); assert!(region.chunks[idx].id() < region.chunks[idx + 1].id()); } } // Try to extend requests. let mut region_hold; - if let Some(v) = self.extend_pending_chunks(®ion.chunks, self.ondemand_batch_size())? { + if let Some(v) = self.extend_pending_chunks(®ion.chunks, self.user_io_batch_size())? { if v.len() > r.chunks.len() { let mut tag_set = HashSet::new(); for (idx, chunk) in region.chunks.iter().enumerate() { diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs index 2b158ca09b1..d42ad388fcf 100644 --- a/storage/src/cache/filecache/mod.rs +++ b/storage/src/cache/filecache/mod.rs @@ -23,7 +23,6 @@ use crate::cache::state::{ use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr}; use crate::cache::{BlobCache, BlobCacheMgr}; use crate::device::{BlobFeatures, BlobInfo}; -use crate::RAFS_DEFAULT_CHUNK_SIZE; pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw"; pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data"; @@ -46,6 +45,7 @@ pub struct FileCacheMgr { cache_convergent_encryption: bool, cache_encryption_key: String, closed: Arc, + user_io_batch_size: u32, } impl FileCacheMgr { @@ -55,6 +55,7 @@ impl FileCacheMgr { backend: Arc, runtime: Arc, id: &str, + user_io_batch_size: u32, ) -> Result { let blob_cfg = config.get_filecache_config()?; let work_dir = blob_cfg.get_work_dir()?; @@ -77,6 +78,7 @@ impl FileCacheMgr { cache_convergent_encryption: blob_cfg.enable_convergent_encryption, cache_encryption_key: blob_cfg.encryption_key.clone(), closed: Arc::new(AtomicBool::new(false)), + user_io_batch_size, }) } @@ -339,7 +341,7 @@ impl FileCacheEntry { is_zran, dio_enabled: false, need_validation, - batch_size: RAFS_DEFAULT_CHUNK_SIZE, + user_io_batch_size: mgr.user_io_batch_size, prefetch_config, }) } diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs index 95aef041cac..5b2285c9b0e 100644 --- a/storage/src/cache/fscache/mod.rs +++ b/storage/src/cache/fscache/mod.rs @@ -20,7 +20,6 @@ use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr}; use crate::cache::{BlobCache, BlobCacheMgr}; use crate::device::{BlobFeatures, BlobInfo, BlobObject}; use crate::factory::BLOB_FACTORY; -use crate::RAFS_DEFAULT_CHUNK_SIZE; use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX; @@ -40,6 +39,7 @@ pub struct FsCacheMgr { need_validation: bool, blobs_check_count: Arc, closed: Arc, + user_io_batch_size: u32, } impl FsCacheMgr { @@ -49,6 +49,7 @@ impl FsCacheMgr { backend: Arc, runtime: Arc, id: &str, + user_io_batch_size: u32, ) -> Result { if config.cache_compressed { return Err(enosys!("fscache doesn't support compressed cache mode")); @@ -73,6 +74,7 @@ impl FsCacheMgr { need_validation: config.cache_validate, blobs_check_count: Arc::new(AtomicU8::new(0)), closed: Arc::new(AtomicBool::new(false)), + user_io_batch_size, }) } @@ -290,7 +292,7 @@ impl FileCacheEntry { is_zran, dio_enabled: true, need_validation, - batch_size: RAFS_DEFAULT_CHUNK_SIZE, + user_io_batch_size: mgr.user_io_batch_size, prefetch_config, }) } @@ -374,7 +376,7 @@ mod tests { use nydus_api::ConfigV2; use nydus_utils::{compress, metrics::BackendMetrics}; - use crate::{factory::ASYNC_RUNTIME, test::MockBackend}; + use crate::{factory::ASYNC_RUNTIME, test::MockBackend, RAFS_DEFAULT_CHUNK_SIZE}; use super::*; @@ -407,6 +409,7 @@ mod tests { Arc::new(backend), ASYNC_RUNTIME.clone(), &cfg.id, + 0, ) .unwrap(); assert!(mgr.init().is_ok()); diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index 98b68724391..d76f4fe251d 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -25,7 +25,7 @@ pub(crate) struct AsyncPrefetchConfig { pub enable: bool, /// Number of working threads. pub threads_count: usize, - /// Window size to merge/amplify requests. + /// The amplify batch size to prefetch data from backend. pub batch_size: usize, /// Network bandwidth for prefetch, in unit of Bytes and Zero means no rate limit is set. #[allow(unused)] diff --git a/storage/src/factory.rs b/storage/src/factory.rs index cc37a4e913c..ef74a129b8d 100644 --- a/storage/src/factory.rs +++ b/storage/src/factory.rs @@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use lazy_static::lazy_static; -use nydus_api::{BackendConfigV2, ConfigV2}; +use nydus_api::{default_user_io_batch_size, BackendConfigV2, ConfigV2}; use tokio::runtime::{Builder, Runtime}; use tokio::time; @@ -117,6 +117,10 @@ impl BlobFactory { ) -> IOResult> { let backend_cfg = config.get_backend_config()?; let cache_cfg = config.get_cache_config()?; + let user_io_batch_size = config + .get_rafs_config() + .map_or_else(|_| default_user_io_batch_size(), |v| v.user_io_batch_size) + as u32; let key = BlobCacheMgrKey { config: config.clone(), }; @@ -128,7 +132,13 @@ impl BlobFactory { let backend = Self::new_backend(backend_cfg, &blob_info.blob_id())?; let mgr = match cache_cfg.cache_type.as_str() { "blobcache" | "filecache" => { - let mgr = FileCacheMgr::new(cache_cfg, backend, ASYNC_RUNTIME.clone(), &config.id)?; + let mgr = FileCacheMgr::new( + cache_cfg, + backend, + ASYNC_RUNTIME.clone(), + &config.id, + user_io_batch_size, + )?; mgr.init()?; Arc::new(mgr) as Arc } @@ -139,6 +149,7 @@ impl BlobFactory { backend, ASYNC_RUNTIME.clone(), &config.id, + user_io_batch_size, )?; mgr.init()?; Arc::new(mgr) as Arc