diff --git a/api/src/config.rs b/api/src/config.rs index 32c7541b117..c14387ae88d 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -684,7 +684,7 @@ impl CacheConfigV2 { if self.prefetch.batch_size > 0x10000000 { return false; } - if self.prefetch.threads == 0 || self.prefetch.threads > 1024 { + if self.prefetch.threads_count == 0 || self.prefetch.threads_count > 1024 { return false; } } @@ -820,8 +820,8 @@ pub struct RafsConfigV2 { #[serde(default = "default_rafs_mode")] pub mode: String, /// Batch size to read data from storage cache layer. - #[serde(default = "default_batch_size")] - pub batch_size: usize, + #[serde(rename = "batch_size", default = "default_batch_size")] + pub user_io_batch_size: usize, /// Whether to validate data digest. #[serde(default)] pub validate: bool, @@ -850,14 +850,14 @@ impl RafsConfigV2 { if self.mode != "direct" && self.mode != "cached" { return false; } - if self.batch_size > 0x10000000 { + if self.user_io_batch_size > 0x10000000 { return false; } if self.prefetch.enable { if self.prefetch.batch_size > 0x10000000 { return false; } - if self.prefetch.threads == 0 || self.prefetch.threads > 1024 { + if self.prefetch.threads_count == 0 || self.prefetch.threads_count > 1024 { return false; } } @@ -872,8 +872,8 @@ pub struct PrefetchConfigV2 { /// Whether to enable blob data prefetching. pub enable: bool, /// Number of data prefetching working threads. - #[serde(default = "default_prefetch_threads")] - pub threads: usize, + #[serde(rename = "threads", default = "default_prefetch_threads_count")] + pub threads_count: usize, /// The batch size to prefetch data from backend. #[serde(default = "default_prefetch_batch_size")] pub batch_size: usize, @@ -1202,7 +1202,7 @@ fn default_prefetch_batch_size() -> usize { 1024 * 1024 } -fn default_prefetch_threads() -> usize { +fn default_prefetch_threads_count() -> usize { 8 } @@ -1364,8 +1364,8 @@ struct RafsConfig { #[serde(default)] pub latest_read_files: bool, // ZERO value means, amplifying user io is not enabled. - #[serde(default = "default_batch_size")] - pub amplify_io: usize, + #[serde(rename = "amplify_io", default = "default_batch_size")] + pub user_io_batch_size: usize, } impl TryFrom for ConfigV2 { @@ -1376,7 +1376,7 @@ impl TryFrom for ConfigV2 { let mut cache: CacheConfigV2 = (&v.device.cache).try_into()?; let rafs = RafsConfigV2 { mode: v.mode, - batch_size: v.amplify_io, + user_io_batch_size: v.user_io_batch_size, validate: v.digest_validate, enable_xattr: v.enable_xattr, iostats_files: v.iostats_files, @@ -1407,23 +1407,23 @@ struct FsPrefetchControl { pub enable: bool, /// How many working threads to prefetch data. - #[serde(default = "default_prefetch_threads")] + #[serde(default = "default_prefetch_threads_count")] pub threads_count: usize, /// Window size in unit of bytes to merge request to backend. - #[serde(default = "default_batch_size")] - pub merging_size: usize, + #[serde(rename = "merging_size", default = "default_batch_size")] + pub batch_size: usize, /// Network bandwidth limitation for prefetching. /// /// In unit of Bytes. It sets a limit to prefetch bandwidth usage in order to /// reduce congestion with normal user IO. - /// bandwidth_rate == 0 -- prefetch bandwidth ratelimit disabled - /// bandwidth_rate > 0 -- prefetch bandwidth ratelimit enabled. + /// bandwidth_limit == 0 -- prefetch bandwidth ratelimit disabled + /// bandwidth_limit > 0 -- prefetch bandwidth ratelimit enabled. /// Please note that if the value is less than Rafs chunk size, /// it will be raised to the chunk size. - #[serde(default)] - pub bandwidth_rate: u32, + #[serde(default, rename = "bandwidth_rate")] + pub bandwidth_limit: u32, /// Whether to prefetch all filesystem data. #[serde(default = "default_prefetch_all")] @@ -1434,9 +1434,9 @@ impl From for PrefetchConfigV2 { fn from(v: FsPrefetchControl) -> Self { PrefetchConfigV2 { enable: v.enable, - threads: v.threads_count, - batch_size: v.merging_size, - bandwidth_limit: v.bandwidth_rate, + threads_count: v.threads_count, + batch_size: v.batch_size, + bandwidth_limit: v.bandwidth_limit, prefetch_all: v.prefetch_all, } } @@ -1450,18 +1450,20 @@ struct BlobPrefetchConfig { /// Number of data prefetching working threads. pub threads_count: usize, /// The maximum size of a merged IO request. - pub merging_size: usize, + #[serde(rename = "merging_size")] + pub batch_size: usize, /// Network bandwidth rate limit in unit of Bytes and Zero means no limit. - pub bandwidth_rate: u32, + #[serde(rename = "bandwidth_rate")] + pub bandwidth_limit: u32, } impl From<&BlobPrefetchConfig> for PrefetchConfigV2 { fn from(v: &BlobPrefetchConfig) -> Self { PrefetchConfigV2 { enable: v.enable, - threads: v.threads_count, - batch_size: v.merging_size, - bandwidth_limit: v.bandwidth_rate, + threads_count: v.threads_count, + batch_size: v.batch_size, + bandwidth_limit: v.bandwidth_limit, prefetch_all: true, } } @@ -1530,8 +1532,8 @@ mod tests { let config = BlobPrefetchConfig::default(); assert!(!config.enable); assert_eq!(config.threads_count, 0); - assert_eq!(config.merging_size, 0); - assert_eq!(config.bandwidth_rate, 0); + assert_eq!(config.batch_size, 0); + assert_eq!(config.bandwidth_limit, 0); let content = r#"{ "enable": true, @@ -1542,12 +1544,12 @@ mod tests { let config: BlobPrefetchConfig = serde_json::from_str(content).unwrap(); assert!(config.enable); assert_eq!(config.threads_count, 2); - assert_eq!(config.merging_size, 4); - assert_eq!(config.bandwidth_rate, 5); + assert_eq!(config.batch_size, 4); + assert_eq!(config.bandwidth_limit, 5); let config: PrefetchConfigV2 = (&config).into(); assert!(config.enable); - assert_eq!(config.threads, 2); + assert_eq!(config.threads_count, 2); assert_eq!(config.batch_size, 4); assert_eq!(config.bandwidth_limit, 5); assert!(config.prefetch_all); @@ -1618,7 +1620,7 @@ mod tests { assert!(blob_config.cache_config.is_object()); assert!(blob_config.prefetch_config.enable); assert_eq!(blob_config.prefetch_config.threads_count, 2); - assert_eq!(blob_config.prefetch_config.merging_size, 4); + assert_eq!(blob_config.prefetch_config.batch_size, 4); assert_eq!( blob_config.metadata_path.as_ref().unwrap().as_str(), "/tmp/metadata1" @@ -1630,7 +1632,7 @@ mod tests { assert_eq!(blob_config.cache.cache_type, "fscache"); assert!(blob_config.cache.fs_cache.is_some()); assert!(blob_config.cache.prefetch.enable); - assert_eq!(blob_config.cache.prefetch.threads, 2); + assert_eq!(blob_config.cache.prefetch.threads_count, 2); assert_eq!(blob_config.cache.prefetch.batch_size, 4); assert_eq!( blob_config.metadata_path.as_ref().unwrap().as_str(), @@ -1654,7 +1656,7 @@ mod tests { let blob_config = config.blob_config_legacy.as_ref().unwrap(); assert!(!blob_config.prefetch_config.enable); assert_eq!(blob_config.prefetch_config.threads_count, 0); - assert_eq!(blob_config.prefetch_config.merging_size, 0); + assert_eq!(blob_config.prefetch_config.batch_size, 0); } #[test] @@ -1967,7 +1969,7 @@ mod tests { let prefetch = &cache.prefetch; assert!(prefetch.enable); - assert_eq!(prefetch.threads, 8); + assert_eq!(prefetch.threads_count, 8); assert_eq!(prefetch.batch_size, 1000000); assert_eq!(prefetch.bandwidth_limit, 10000000); } @@ -1998,14 +2000,14 @@ mod tests { let rafs = config.rafs.as_ref().unwrap(); assert_eq!(&rafs.mode, "direct"); - assert_eq!(rafs.batch_size, 1000000); + assert_eq!(rafs.user_io_batch_size, 1000000); assert!(rafs.validate); assert!(rafs.enable_xattr); assert!(rafs.iostats_files); assert!(rafs.access_pattern); assert!(rafs.latest_read_files); assert!(rafs.prefetch.enable); - assert_eq!(rafs.prefetch.threads, 4); + assert_eq!(rafs.prefetch.threads_count, 4); assert_eq!(rafs.prefetch.batch_size, 1000000); assert_eq!(rafs.prefetch.bandwidth_limit, 10000000); assert!(rafs.prefetch.prefetch_all) @@ -2557,7 +2559,7 @@ mod tests { assert!(default_true()); assert_eq!(default_failure_limit(), 5); assert_eq!(default_prefetch_batch_size(), 1024 * 1024); - assert_eq!(default_prefetch_threads(), 8); + assert_eq!(default_prefetch_threads_count(), 8); } #[test] diff --git a/rafs/src/fs.rs b/rafs/src/fs.rs index 23f435733b6..c053d2e0c81 100644 --- a/rafs/src/fs.rs +++ b/rafs/src/fs.rs @@ -68,7 +68,7 @@ pub struct Rafs { fs_prefetch: bool, prefetch_all: bool, xattr_enabled: bool, - amplify_io: u32, + user_io_batch_size: u32, // static inode attributes i_uid: u32, @@ -102,7 +102,7 @@ impl Rafs { initialized: false, digest_validate: rafs_cfg.validate, fs_prefetch: rafs_cfg.prefetch.enable, - amplify_io: rafs_cfg.batch_size as u32, + user_io_batch_size: rafs_cfg.user_io_batch_size as u32, prefetch_all: rafs_cfg.prefetch.prefetch_all, xattr_enabled: rafs_cfg.enable_xattr, @@ -621,20 +621,21 @@ impl FileSystem for Rafs { assert!(!io_vecs.is_empty() && !io_vecs[0].is_empty()); // Try to amplify user io for Rafs v5, to improve performance. - let amplify_io = cmp::min(self.amplify_io as usize, w.available_bytes()) as u32; - if self.sb.meta.is_v5() && size < amplify_io { + let user_io_batch_size = + cmp::min(self.user_io_batch_size as usize, w.available_bytes()) as u32; + if self.sb.meta.is_v5() && size < user_io_batch_size { let all_chunks_ready = self.device.all_chunks_ready(&io_vecs); if !all_chunks_ready { let chunk_mask = self.metadata().chunk_size as u64 - 1; let next_chunk_base = (offset + (size as u64) + chunk_mask) & !chunk_mask; let window_base = cmp::min(next_chunk_base, inode_size); let actual_size = window_base - (offset & !chunk_mask); - if actual_size < amplify_io as u64 { - let window_size = amplify_io as u64 - actual_size; + if actual_size < user_io_batch_size as u64 { + let window_size = user_io_batch_size as u64 - actual_size; let orig_cnt = io_vecs.iter().fold(0, |s, d| s + d.len()); - self.sb.amplify_io( + self.sb.amplify_user_io( &self.device, - amplify_io, + user_io_batch_size, &mut io_vecs, &inode, window_base, @@ -1021,7 +1022,7 @@ mod tests { fs_prefetch: false, prefetch_all: false, xattr_enabled: false, - amplify_io: 0, + user_io_batch_size: 0, i_uid: 0, i_gid: 0, i_time: 0, diff --git a/rafs/src/metadata/md_v5.rs b/rafs/src/metadata/md_v5.rs index fd1362b2731..e0582808fea 100644 --- a/rafs/src/metadata/md_v5.rs +++ b/rafs/src/metadata/md_v5.rs @@ -128,7 +128,7 @@ impl RafsSuper { // not overlap user IO's chunk. // V5 rafs tries to amplify user IO by expanding more chunks to user IO and // expect that those chunks are likely to be continuous with user IO's chunks. - pub(crate) fn amplify_io( + pub(crate) fn amplify_user_io( &self, device: &BlobDevice, max_uncomp_size: u32, diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs index 0fc96c9cdc2..76e1f98a6ed 100644 --- a/storage/src/cache/cachedfile.rs +++ b/storage/src/cache/cachedfile.rs @@ -301,10 +301,10 @@ impl FileCacheEntry { } fn prefetch_batch_size(&self) -> u64 { - if self.prefetch_config.merging_size < 0x2_0000 { + if self.prefetch_config.batch_size < 0x2_0000 { 0x2_0000 } else { - self.prefetch_config.merging_size as u64 + self.prefetch_config.batch_size as u64 } } diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index 295629546d4..98b68724391 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -26,19 +26,19 @@ pub(crate) struct AsyncPrefetchConfig { /// Number of working threads. pub threads_count: usize, /// Window size to merge/amplify requests. - pub merging_size: usize, + pub batch_size: usize, /// Network bandwidth for prefetch, in unit of Bytes and Zero means no rate limit is set. #[allow(unused)] - pub bandwidth_rate: u32, + pub bandwidth_limit: u32, } impl From<&PrefetchConfigV2> for AsyncPrefetchConfig { fn from(p: &PrefetchConfigV2) -> Self { AsyncPrefetchConfig { enable: p.enable, - threads_count: p.threads, - merging_size: p.batch_size, - bandwidth_rate: p.bandwidth_limit, + threads_count: p.threads_count, + batch_size: p.batch_size, + bandwidth_limit: p.bandwidth_limit, } } } @@ -97,7 +97,7 @@ impl AsyncWorkerMgr { prefetch_config: Arc, ) -> Result { #[cfg(feature = "prefetch-rate-limit")] - let prefetch_limiter = match prefetch_config.bandwidth_rate { + let prefetch_limiter = match prefetch_config.bandwidth_limit { 0 => None, v => { // If the given value is less than maximum blob chunk size, it exceeds burst size of the @@ -437,8 +437,8 @@ mod tests { let config = Arc::new(AsyncPrefetchConfig { enable: true, threads_count: 2, - merging_size: 0x100000, - bandwidth_rate: 0x100000, + batch_size: 0x100000, + bandwidth_limit: 0x100000, }); let mgr = Arc::new(AsyncWorkerMgr::new(metrics, config).unwrap()); @@ -477,8 +477,8 @@ mod tests { let config = Arc::new(AsyncPrefetchConfig { enable: true, threads_count: 4, - merging_size: 0x1000000, - bandwidth_rate: 0x1000000, + batch_size: 0x1000000, + bandwidth_limit: 0x1000000, }); let mgr = Arc::new(AsyncWorkerMgr::new(metrics, config).unwrap());