From cf3967352e1be79b9a3b9661bac09c90cc8d3622 Mon Sep 17 00:00:00 2001 From: Wenhao Ren Date: Mon, 6 Nov 2023 10:54:27 +0800 Subject: [PATCH] rafs: rename variable names about prefetch configuration Variable names about prefetch are confused currently. So we merge variable names that have the same meaning, while DO NOT affect the field names read from the configuration file. Signed-off-by: Wenhao Ren --- api/src/config.rs | 92 +++++++++++++++++---------------- rafs/src/fs.rs | 17 +++--- rafs/src/metadata/md_v5.rs | 2 +- service/src/fs_cache.rs | 6 ++- src/bin/nydusctl/commands.rs | 2 +- storage/src/cache/cachedfile.rs | 4 +- storage/src/cache/worker.rs | 20 +++---- 7 files changed, 75 insertions(+), 68 deletions(-) diff --git a/api/src/config.rs b/api/src/config.rs index 589e4519cf2..b9cc387b2eb 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -681,10 +681,10 @@ impl CacheConfigV2 { } if self.prefetch.enable { - if self.prefetch.batch_size > 0x10000000 { + if self.prefetch.prefetch_request_batch_size > 0x10000000 { return false; } - if self.prefetch.threads == 0 || self.prefetch.threads > 1024 { + if self.prefetch.threads_count == 0 || self.prefetch.threads_count > 1024 { return false; } } @@ -820,8 +820,8 @@ pub struct RafsConfigV2 { #[serde(default = "default_rafs_mode")] pub mode: String, /// Batch size to read data from storage cache layer. - #[serde(default = "default_batch_size")] - pub batch_size: usize, + #[serde(rename = "batch_size", default = "default_batch_size")] + pub user_io_batch_size: usize, /// Whether to validate data digest. #[serde(default)] pub validate: bool, @@ -850,14 +850,14 @@ impl RafsConfigV2 { if self.mode != "direct" && self.mode != "cached" { return false; } - if self.batch_size > 0x10000000 { + if self.user_io_batch_size > 0x10000000 { return false; } if self.prefetch.enable { - if self.prefetch.batch_size > 0x10000000 { + if self.prefetch.prefetch_request_batch_size > 0x10000000 { return false; } - if self.prefetch.threads == 0 || self.prefetch.threads > 1024 { + if self.prefetch.threads_count == 0 || self.prefetch.threads_count > 1024 { return false; } } @@ -872,11 +872,11 @@ pub struct PrefetchConfigV2 { /// Whether to enable blob data prefetching. pub enable: bool, /// Number of data prefetching working threads. - #[serde(default = "default_prefetch_threads")] - pub threads: usize, + #[serde(rename = "threads", default = "default_prefetch_threads_count")] + pub threads_count: usize, /// The batch size to prefetch data from backend. - #[serde(default = "default_prefetch_batch_size")] - pub batch_size: usize, + #[serde(rename = "batch_size", default = "default_prefetch_batch_size")] + pub prefetch_request_batch_size: usize, /// Network bandwidth rate limit in unit of Bytes and Zero means no limit. #[serde(default)] pub bandwidth_limit: u32, @@ -1202,7 +1202,7 @@ fn default_prefetch_batch_size() -> usize { 1024 * 1024 } -fn default_prefetch_threads() -> usize { +fn default_prefetch_threads_count() -> usize { 8 } @@ -1364,8 +1364,8 @@ struct RafsConfig { #[serde(default)] pub latest_read_files: bool, // ZERO value means, amplifying user io is not enabled. - #[serde(default = "default_batch_size")] - pub amplify_io: usize, + #[serde(rename = "amplify_io", default = "default_batch_size")] + pub user_io_batch_size: usize, } impl TryFrom for ConfigV2 { @@ -1376,7 +1376,7 @@ impl TryFrom for ConfigV2 { let mut cache: CacheConfigV2 = (&v.device.cache).try_into()?; let rafs = RafsConfigV2 { mode: v.mode, - batch_size: v.amplify_io, + user_io_batch_size: v.user_io_batch_size, validate: v.digest_validate, enable_xattr: v.enable_xattr, iostats_files: v.iostats_files, @@ -1407,23 +1407,23 @@ struct FsPrefetchControl { pub enable: bool, /// How many working threads to prefetch data. - #[serde(default = "default_prefetch_threads")] + #[serde(default = "default_prefetch_threads_count")] pub threads_count: usize, /// Window size in unit of bytes to merge request to backend. - #[serde(default = "default_batch_size")] - pub merging_size: usize, + #[serde(rename = "merging_size", default = "default_batch_size")] + pub prefetch_request_batch_size: usize, /// Network bandwidth limitation for prefetching. /// /// In unit of Bytes. It sets a limit to prefetch bandwidth usage in order to /// reduce congestion with normal user IO. - /// bandwidth_rate == 0 -- prefetch bandwidth ratelimit disabled - /// bandwidth_rate > 0 -- prefetch bandwidth ratelimit enabled. + /// bandwidth_limit == 0 -- prefetch bandwidth ratelimit disabled + /// bandwidth_limit > 0 -- prefetch bandwidth ratelimit enabled. /// Please note that if the value is less than Rafs chunk size, /// it will be raised to the chunk size. - #[serde(default)] - pub bandwidth_rate: u32, + #[serde(default, rename = "bandwidth_rate")] + pub bandwidth_limit: u32, /// Whether to prefetch all filesystem data. #[serde(default = "default_prefetch_all")] @@ -1434,9 +1434,9 @@ impl From for PrefetchConfigV2 { fn from(v: FsPrefetchControl) -> Self { PrefetchConfigV2 { enable: v.enable, - threads: v.threads_count, - batch_size: v.merging_size, - bandwidth_limit: v.bandwidth_rate, + threads_count: v.threads_count, + prefetch_request_batch_size: v.prefetch_request_batch_size, + bandwidth_limit: v.bandwidth_limit, prefetch_all: v.prefetch_all, } } @@ -1450,18 +1450,20 @@ struct BlobPrefetchConfig { /// Number of data prefetching working threads. pub threads_count: usize, /// The maximum size of a merged IO request. - pub merging_size: usize, + #[serde(rename = "merging_size")] + pub prefetch_request_batch_size: usize, /// Network bandwidth rate limit in unit of Bytes and Zero means no limit. - pub bandwidth_rate: u32, + #[serde(rename = "bandwidth_rate")] + pub bandwidth_limit: u32, } impl From<&BlobPrefetchConfig> for PrefetchConfigV2 { fn from(v: &BlobPrefetchConfig) -> Self { PrefetchConfigV2 { enable: v.enable, - threads: v.threads_count, - batch_size: v.merging_size, - bandwidth_limit: v.bandwidth_rate, + threads_count: v.threads_count, + prefetch_request_batch_size: v.prefetch_request_batch_size, + bandwidth_limit: v.bandwidth_limit, prefetch_all: true, } } @@ -1530,8 +1532,8 @@ mod tests { let config = BlobPrefetchConfig::default(); assert!(!config.enable); assert_eq!(config.threads_count, 0); - assert_eq!(config.merging_size, 0); - assert_eq!(config.bandwidth_rate, 0); + assert_eq!(config.prefetch_request_batch_size, 0); + assert_eq!(config.bandwidth_limit, 0); let content = r#"{ "enable": true, @@ -1542,13 +1544,13 @@ mod tests { let config: BlobPrefetchConfig = serde_json::from_str(content).unwrap(); assert!(config.enable); assert_eq!(config.threads_count, 2); - assert_eq!(config.merging_size, 4); - assert_eq!(config.bandwidth_rate, 5); + assert_eq!(config.prefetch_request_batch_size, 4); + assert_eq!(config.bandwidth_limit, 5); let config: PrefetchConfigV2 = (&config).into(); assert!(config.enable); - assert_eq!(config.threads, 2); - assert_eq!(config.batch_size, 4); + assert_eq!(config.threads_count, 2); + assert_eq!(config.prefetch_request_batch_size, 4); assert_eq!(config.bandwidth_limit, 5); assert!(config.prefetch_all); } @@ -1618,7 +1620,7 @@ mod tests { assert!(blob_config.cache_config.is_object()); assert!(blob_config.prefetch_config.enable); assert_eq!(blob_config.prefetch_config.threads_count, 2); - assert_eq!(blob_config.prefetch_config.merging_size, 4); + assert_eq!(blob_config.prefetch_config.prefetch_request_batch_size, 4); assert_eq!( blob_config.metadata_path.as_ref().unwrap().as_str(), "/tmp/metadata1" @@ -1630,8 +1632,8 @@ mod tests { assert_eq!(blob_config.cache.cache_type, "fscache"); assert!(blob_config.cache.fs_cache.is_some()); assert!(blob_config.cache.prefetch.enable); - assert_eq!(blob_config.cache.prefetch.threads, 2); - assert_eq!(blob_config.cache.prefetch.batch_size, 4); + assert_eq!(blob_config.cache.prefetch.threads_count, 2); + assert_eq!(blob_config.cache.prefetch.prefetch_request_batch_size, 4); assert_eq!( blob_config.metadata_path.as_ref().unwrap().as_str(), "/tmp/metadata1" @@ -1654,7 +1656,7 @@ mod tests { let blob_config = config.blob_config_legacy.as_ref().unwrap(); assert!(!blob_config.prefetch_config.enable); assert_eq!(blob_config.prefetch_config.threads_count, 0); - assert_eq!(blob_config.prefetch_config.merging_size, 0); + assert_eq!(blob_config.prefetch_config.prefetch_request_batch_size, 0); } #[test] @@ -1967,8 +1969,8 @@ mod tests { let prefetch = &cache.prefetch; assert!(prefetch.enable); - assert_eq!(prefetch.threads, 8); - assert_eq!(prefetch.batch_size, 1000000); + assert_eq!(prefetch.threads_count, 8); + assert_eq!(prefetch.prefetch_request_batch_size, 1000000); assert_eq!(prefetch.bandwidth_limit, 10000000); } @@ -1998,15 +2000,15 @@ mod tests { let rafs = config.rafs.as_ref().unwrap(); assert_eq!(&rafs.mode, "direct"); - assert_eq!(rafs.batch_size, 1000000); + assert_eq!(rafs.user_io_batch_size, 1000000); assert!(rafs.validate); assert!(rafs.enable_xattr); assert!(rafs.iostats_files); assert!(rafs.access_pattern); assert!(rafs.latest_read_files); assert!(rafs.prefetch.enable); - assert_eq!(rafs.prefetch.threads, 4); - assert_eq!(rafs.prefetch.batch_size, 1000000); + assert_eq!(rafs.prefetch.threads_count, 4); + assert_eq!(rafs.prefetch.prefetch_request_batch_size, 1000000); assert_eq!(rafs.prefetch.bandwidth_limit, 10000000); assert!(rafs.prefetch.prefetch_all) } diff --git a/rafs/src/fs.rs b/rafs/src/fs.rs index 1d1627c94f7..e32eb3eb1ab 100644 --- a/rafs/src/fs.rs +++ b/rafs/src/fs.rs @@ -68,7 +68,7 @@ pub struct Rafs { fs_prefetch: bool, prefetch_all: bool, xattr_enabled: bool, - amplify_io: u32, + user_io_batch_size: u32, // static inode attributes i_uid: u32, @@ -102,7 +102,7 @@ impl Rafs { initialized: false, digest_validate: rafs_cfg.validate, fs_prefetch: rafs_cfg.prefetch.enable, - amplify_io: rafs_cfg.batch_size as u32, + user_io_batch_size: rafs_cfg.user_io_batch_size as u32, prefetch_all: rafs_cfg.prefetch.prefetch_all, xattr_enabled: rafs_cfg.enable_xattr, @@ -621,20 +621,21 @@ impl FileSystem for Rafs { assert!(!io_vecs.is_empty() && !io_vecs[0].is_empty()); // Try to amplify user io for Rafs v5, to improve performance. - let amplify_io = cmp::min(self.amplify_io as usize, w.available_bytes()) as u32; - if self.sb.meta.is_v5() && size < amplify_io { + let user_io_batch_size = + cmp::min(self.user_io_batch_size as usize, w.available_bytes()) as u32; + if self.sb.meta.is_v5() && size < user_io_batch_size { let all_chunks_ready = self.device.all_chunks_ready(&io_vecs); if !all_chunks_ready { let chunk_mask = self.metadata().chunk_size as u64 - 1; let next_chunk_base = (offset + (size as u64) + chunk_mask) & !chunk_mask; let window_base = cmp::min(next_chunk_base, inode_size); let actual_size = window_base - (offset & !chunk_mask); - if actual_size < amplify_io as u64 { - let window_size = amplify_io as u64 - actual_size; + if actual_size < user_io_batch_size as u64 { + let window_size = user_io_batch_size as u64 - actual_size; let orig_cnt = io_vecs.iter().fold(0, |s, d| s + d.len()); - self.sb.amplify_io( + self.sb.amplify_user_io( &self.device, - amplify_io, + user_io_batch_size, &mut io_vecs, &inode, window_base, diff --git a/rafs/src/metadata/md_v5.rs b/rafs/src/metadata/md_v5.rs index 039dcd05811..4972317c664 100644 --- a/rafs/src/metadata/md_v5.rs +++ b/rafs/src/metadata/md_v5.rs @@ -128,7 +128,7 @@ impl RafsSuper { // not overlap user IO's chunk. // V5 rafs tries to amplify user IO by expanding more chunks to user IO and // expect that those chunks are likely to be continuous with user IO's chunks. - pub(crate) fn amplify_io( + pub(crate) fn amplify_user_io( &self, device: &BlobDevice, max_uncomp_size: u32, diff --git a/service/src/fs_cache.rs b/service/src/fs_cache.rs index 31812aa09ac..34958c9ff73 100644 --- a/service/src/fs_cache.rs +++ b/service/src/fs_cache.rs @@ -517,7 +517,11 @@ impl FsCacheHandler { blob.start_prefetch() .map_err(|e| eother!(format!("failed to start prefetch worker, {}", e)))?; - let size = match cache_cfg.prefetch.batch_size.checked_next_power_of_two() { + let size = match cache_cfg + .prefetch + .prefetch_request_batch_size + .checked_next_power_of_two() + { None => nydus_api::default_batch_size() as u64, Some(1) => nydus_api::default_batch_size() as u64, Some(s) => s as u64, diff --git a/src/bin/nydusctl/commands.rs b/src/bin/nydusctl/commands.rs index 036518540cc..5372f415e1a 100644 --- a/src/bin/nydusctl/commands.rs +++ b/src/bin/nydusctl/commands.rs @@ -429,7 +429,7 @@ Commit: {git_commit} println!("\tPrefetch: {}", cache_cfg.prefetch.enable); println!( "\tPrefetch Merging Size: {}", - cache_cfg.prefetch.batch_size + cache_cfg.prefetch.prefetch_request_batch_size ); println!(); } diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs index 0fc96c9cdc2..d9c8d32b612 100644 --- a/storage/src/cache/cachedfile.rs +++ b/storage/src/cache/cachedfile.rs @@ -301,10 +301,10 @@ impl FileCacheEntry { } fn prefetch_batch_size(&self) -> u64 { - if self.prefetch_config.merging_size < 0x2_0000 { + if self.prefetch_config.prefetch_request_batch_size < 0x2_0000 { 0x2_0000 } else { - self.prefetch_config.merging_size as u64 + self.prefetch_config.prefetch_request_batch_size as u64 } } diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index 295629546d4..14bc6d5eb17 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -26,19 +26,19 @@ pub(crate) struct AsyncPrefetchConfig { /// Number of working threads. pub threads_count: usize, /// Window size to merge/amplify requests. - pub merging_size: usize, + pub prefetch_request_batch_size: usize, /// Network bandwidth for prefetch, in unit of Bytes and Zero means no rate limit is set. #[allow(unused)] - pub bandwidth_rate: u32, + pub bandwidth_limit: u32, } impl From<&PrefetchConfigV2> for AsyncPrefetchConfig { fn from(p: &PrefetchConfigV2) -> Self { AsyncPrefetchConfig { enable: p.enable, - threads_count: p.threads, - merging_size: p.batch_size, - bandwidth_rate: p.bandwidth_limit, + threads_count: p.threads_count, + prefetch_request_batch_size: p.prefetch_request_batch_size, + bandwidth_limit: p.bandwidth_limit, } } } @@ -97,7 +97,7 @@ impl AsyncWorkerMgr { prefetch_config: Arc, ) -> Result { #[cfg(feature = "prefetch-rate-limit")] - let prefetch_limiter = match prefetch_config.bandwidth_rate { + let prefetch_limiter = match prefetch_config.bandwidth_limit { 0 => None, v => { // If the given value is less than maximum blob chunk size, it exceeds burst size of the @@ -437,8 +437,8 @@ mod tests { let config = Arc::new(AsyncPrefetchConfig { enable: true, threads_count: 2, - merging_size: 0x100000, - bandwidth_rate: 0x100000, + prefetch_request_batch_size: 0x100000, + bandwidth_limit: 0x100000, }); let mgr = Arc::new(AsyncWorkerMgr::new(metrics, config).unwrap()); @@ -477,8 +477,8 @@ mod tests { let config = Arc::new(AsyncPrefetchConfig { enable: true, threads_count: 4, - merging_size: 0x1000000, - bandwidth_rate: 0x1000000, + prefetch_request_batch_size: 0x1000000, + bandwidth_limit: 0x1000000, }); let mgr = Arc::new(AsyncWorkerMgr::new(metrics, config).unwrap());