From 2dbd1e746202e9a5c208b05a41096dbcdaf09e91 Mon Sep 17 00:00:00 2001
From: Wenhao Ren
Date: Mon, 6 Nov 2023 10:54:03 +0800
Subject: [PATCH] nydusd: add config support for `amplify_io`

Add support for `amplify_io` in the nydusd config file to configure
read amplification.

Signed-off-by: Wenhao Ren
---
 api/src/config.rs                  | 26 +++++++++++++++-----------
 docs/nydusd.md                     |  3 +++
 misc/configs/nydusd-config-v2.toml |  3 ++-
 service/src/fs_cache.rs            |  4 ++--
 storage/src/cache/cachedfile.rs    | 25 +++++++++++++------------
 storage/src/cache/filecache/mod.rs |  6 ++++--
 storage/src/cache/fscache/mod.rs   |  6 ++++--
 storage/src/cache/worker.rs        |  2 +-
 storage/src/factory.rs             | 15 +++++++++++++--
 9 files changed, 57 insertions(+), 33 deletions(-)

diff --git a/api/src/config.rs b/api/src/config.rs
index b9cc387b2eb..55106c35c04 100644
--- a/api/src/config.rs
+++ b/api/src/config.rs
@@ -819,8 +819,8 @@ pub struct RafsConfigV2 {
     /// Filesystem metadata cache mode.
     #[serde(default = "default_rafs_mode")]
     pub mode: String,
-    /// Batch size to read data from storage cache layer.
-    #[serde(rename = "batch_size", default = "default_batch_size")]
+    /// Amplified user IO request batch size to read data from remote storage backend / local cache.
+    #[serde(rename = "batch_size", default = "default_user_io_batch_size")]
     pub user_io_batch_size: usize,
     /// Whether to validate data digest.
     #[serde(default)]
@@ -874,8 +874,8 @@ pub struct PrefetchConfigV2 {
     /// Number of data prefetching working threads.
     #[serde(rename = "threads", default = "default_prefetch_threads_count")]
     pub threads_count: usize,
-    /// The batch size to prefetch data from backend.
-    #[serde(rename = "batch_size", default = "default_prefetch_batch_size")]
+    /// The amplified batch size used to prefetch data from the backend.
+    #[serde(rename = "batch_size", default = "default_prefetch_request_batch_size")]
     pub prefetch_request_batch_size: usize,
     /// Network bandwidth rate limit in unit of Bytes and Zero means no limit.
     #[serde(default)]
@@ -1194,11 +1194,11 @@ fn default_work_dir() -> String {
     ".".to_string()
 }
 
-pub fn default_batch_size() -> usize {
-    128 * 1024
+pub fn default_user_io_batch_size() -> usize {
+    1024 * 1024
 }
 
-fn default_prefetch_batch_size() -> usize {
+pub fn default_prefetch_request_batch_size() -> usize {
     1024 * 1024
 }
 
@@ -1363,8 +1363,9 @@ struct RafsConfig {
     /// Record file name if file access trace log.
     #[serde(default)]
     pub latest_read_files: bool,
+    // Amplified user IO request batch size to read data from remote storage backend / local cache.
     // ZERO value means, amplifying user io is not enabled.
-    #[serde(rename = "amplify_io", default = "default_batch_size")]
+    #[serde(rename = "amplify_io", default = "default_user_io_batch_size")]
     pub user_io_batch_size: usize,
 }
 
@@ -1410,8 +1411,11 @@ struct FsPrefetchControl {
     #[serde(default = "default_prefetch_threads_count")]
     pub threads_count: usize,
 
-    /// Window size in unit of bytes to merge request to backend.
-    #[serde(rename = "merging_size", default = "default_batch_size")]
+    /// The amplified batch size used to prefetch data from the backend.
+    #[serde(
+        rename = "merging_size",
+        default = "default_prefetch_request_batch_size"
+    )]
     pub prefetch_request_batch_size: usize,
 
     /// Network bandwidth limitation for prefetching.
@@ -1449,7 +1453,7 @@ struct BlobPrefetchConfig {
     pub enable: bool,
     /// Number of data prefetching working threads.
     pub threads_count: usize,
-    /// The maximum size of a merged IO request.
+    /// The amplified batch size used to prefetch data from the backend.
     #[serde(rename = "merging_size")]
     pub prefetch_request_batch_size: usize,
     /// Network bandwidth rate limit in unit of Bytes and Zero means no limit.
diff --git a/docs/nydusd.md b/docs/nydusd.md
index 6697bb326f0..8e35fc05e5a 100644
--- a/docs/nydusd.md
+++ b/docs/nydusd.md
@@ -130,6 +130,9 @@ We are working on enabling cloud-hypervisor support for nydus.
   "iostats_files": true,
   // Enable support of fs extended attributes
   "enable_xattr": false,
+  // Amplified user IO request batch size to read data from remote storage backend / local cache
+  // in unit of Bytes, valid values: 0-268435456, default: 1048576
+  "amplify_io": 1048576,
   "fs_prefetch": {
     // Enable blob prefetch
     "enable": false,
diff --git a/misc/configs/nydusd-config-v2.toml b/misc/configs/nydusd-config-v2.toml
index ed33ec77c4d..30e8bffca64 100644
--- a/misc/configs/nydusd-config-v2.toml
+++ b/misc/configs/nydusd-config-v2.toml
@@ -142,7 +142,8 @@ bandwidth_limit = 10000000
 [rafs]
 # Filesystem metadata cache mode, "direct" or "cached". "direct" is almost what you want.
 mode = "direct"
-# Batch size to read data from storage cache layer, valid values: 0-0x10000000
+# Amplified user IO request batch size to read data from remote storage backend / local cache,
+# valid values: 0-0x10000000
 batch_size = 1000000
 # Whether to validate data digest.
 validate = true
diff --git a/service/src/fs_cache.rs b/service/src/fs_cache.rs
index 34958c9ff73..bacab138d0a 100644
--- a/service/src/fs_cache.rs
+++ b/service/src/fs_cache.rs
@@ -522,8 +522,8 @@ impl FsCacheHandler {
             .prefetch_request_batch_size
             .checked_next_power_of_two()
         {
-            None => nydus_api::default_batch_size() as u64,
-            Some(1) => nydus_api::default_batch_size() as u64,
+            None => nydus_api::default_prefetch_request_batch_size() as u64,
+            Some(1) => nydus_api::default_prefetch_request_batch_size() as u64,
             Some(s) => s as u64,
         };
         let size = std::cmp::max(0x4_0000u64, size);
diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs
index d9c8d32b612..c39cb12cbe0 100644
--- a/storage/src/cache/cachedfile.rs
+++ b/storage/src/cache/cachedfile.rs
@@ -164,7 +164,8 @@ pub(crate) struct FileCacheEntry {
     pub(crate) dio_enabled: bool,
     // Data from the file cache should be validated before use.
     pub(crate) need_validation: bool,
-    pub(crate) batch_size: u64,
+    // Amplified user IO request batch size to read data from remote storage backend / local cache.
+    pub(crate) user_io_batch_size: u32,
     pub(crate) prefetch_config: Arc<AsyncPrefetchConfig>,
 }
 
@@ -300,7 +301,7 @@ impl FileCacheEntry {
         }
     }
 
-    fn prefetch_batch_size(&self) -> u64 {
+    fn prefetch_request_batch_size(&self) -> u64 {
         if self.prefetch_config.prefetch_request_batch_size < 0x2_0000 {
             0x2_0000
         } else {
@@ -308,11 +309,11 @@ impl FileCacheEntry {
         }
     }
 
-    fn ondemand_batch_size(&self) -> u64 {
-        if self.batch_size < 0x2_0000 {
+    fn user_io_batch_size(&self) -> u64 {
+        if self.user_io_batch_size < 0x2_0000 {
             0x2_0000
         } else {
-            self.batch_size
+            self.user_io_batch_size as u64
         }
     }
 
@@ -559,7 +560,7 @@ impl BlobCache for FileCacheEntry {
         }
 
         // Then handle fs prefetch
-        let max_comp_size = self.prefetch_batch_size();
+        let max_comp_size = self.prefetch_request_batch_size();
         let mut bios = bios.to_vec();
         bios.sort_unstable_by_key(|entry| entry.chunkinfo.compressed_offset());
         self.metrics.prefetch_unmerged_chunks.add(bios.len() as u64);
@@ -719,7 +720,7 @@ impl BlobObject for FileCacheEntry {
         let meta = self.meta.as_ref().ok_or_else(|| enoent!())?;
         let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
         let mut chunks =
-            meta.get_chunks_compressed(offset, size, self.prefetch_batch_size(), prefetch)?;
+            meta.get_chunks_compressed(offset, size, self.prefetch_request_batch_size(), prefetch)?;
         if !chunks.is_empty() {
             if let Some(meta) = self.get_blob_meta_info()? {
                 chunks = self.strip_ready_chunks(meta, None, chunks);
@@ -745,7 +746,7 @@ impl BlobObject for FileCacheEntry {
 
         let meta = self.meta.as_ref().ok_or_else(|| einval!())?;
         let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
-        let mut chunks = meta.get_chunks_uncompressed(offset, size, self.ondemand_batch_size())?;
+        let mut chunks = meta.get_chunks_uncompressed(offset, size, self.user_io_batch_size())?;
         if let Some(meta) = self.get_blob_meta_info()? {
             chunks = self.strip_ready_chunks(meta, None, chunks);
         }
@@ -764,7 +765,7 @@ impl BlobObject for FileCacheEntry {
 
         let chunks_extended;
         let mut chunks = &range.chunks;
-        if let Some(v) = self.extend_pending_chunks(chunks, self.prefetch_batch_size())? {
+        if let Some(v) = self.extend_pending_chunks(chunks, self.prefetch_request_batch_size())? {
             chunks_extended = v;
             chunks = &chunks_extended;
         }
@@ -934,7 +935,7 @@ impl FileCacheEntry {
     fn read_iter(&self, bios: &mut [BlobIoDesc], buffers: &[FileVolatileSlice]) -> Result<usize> {
         // Merge requests with continuous blob addresses.
         let requests = self
-            .merge_requests_for_user(bios, self.ondemand_batch_size())
+            .merge_requests_for_user(bios, self.user_io_batch_size())
             .ok_or_else(|| {
                 for bio in bios.iter() {
                     self.update_chunk_pending_status(&bio.chunkinfo, false);
@@ -1100,14 +1101,14 @@ impl FileCacheEntry {
                     + region.chunks[idx].compressed_size() as u64;
                 let start = region.chunks[idx + 1].compressed_offset();
                 assert!(end <= start);
-                assert!(start - end <= self.ondemand_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT);
+                assert!(start - end <= self.user_io_batch_size() >> RAFS_BATCH_SIZE_TO_GAP_SHIFT);
                 assert!(region.chunks[idx].id() < region.chunks[idx + 1].id());
             }
         }
 
         // Try to extend requests.
         let mut region_hold;
-        if let Some(v) = self.extend_pending_chunks(&region.chunks, self.ondemand_batch_size())? {
+        if let Some(v) = self.extend_pending_chunks(&region.chunks, self.user_io_batch_size())? {
            if v.len() > r.chunks.len() {
                 let mut tag_set = HashSet::new();
                 for (idx, chunk) in region.chunks.iter().enumerate() {
diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs
index 2b158ca09b1..d42ad388fcf 100644
--- a/storage/src/cache/filecache/mod.rs
+++ b/storage/src/cache/filecache/mod.rs
@@ -23,7 +23,6 @@ use crate::cache::state::{
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{BlobFeatures, BlobInfo};
-use crate::RAFS_DEFAULT_CHUNK_SIZE;
 
 pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw";
 pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data";
@@ -46,6 +45,7 @@ pub struct FileCacheMgr {
     cache_convergent_encryption: bool,
     cache_encryption_key: String,
     closed: Arc<AtomicBool>,
+    user_io_batch_size: u32,
 }
 
 impl FileCacheMgr {
@@ -55,6 +55,7 @@ impl FileCacheMgr {
         backend: Arc<dyn BlobBackend>,
         runtime: Arc<Runtime>,
         id: &str,
+        user_io_batch_size: u32,
     ) -> Result<FileCacheMgr> {
         let blob_cfg = config.get_filecache_config()?;
         let work_dir = blob_cfg.get_work_dir()?;
@@ -77,6 +78,7 @@ impl FileCacheMgr {
             cache_convergent_encryption: blob_cfg.enable_convergent_encryption,
             cache_encryption_key: blob_cfg.encryption_key.clone(),
             closed: Arc::new(AtomicBool::new(false)),
+            user_io_batch_size,
         })
     }
 
@@ -339,7 +341,7 @@ impl FileCacheEntry {
             is_zran,
             dio_enabled: false,
             need_validation,
-            batch_size: RAFS_DEFAULT_CHUNK_SIZE,
+            user_io_batch_size: mgr.user_io_batch_size,
             prefetch_config,
         })
     }
diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs
index cf624f4f427..e9309fa2ef8 100644
--- a/storage/src/cache/fscache/mod.rs
+++ b/storage/src/cache/fscache/mod.rs
@@ -20,7 +20,6 @@ use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{BlobFeatures, BlobInfo, BlobObject};
 use crate::factory::BLOB_FACTORY;
-use crate::RAFS_DEFAULT_CHUNK_SIZE;
 
 use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
 
@@ -40,6 +39,7 @@ pub struct FsCacheMgr {
     need_validation: bool,
     blobs_check_count: Arc<AtomicU8>,
     closed: Arc<AtomicBool>,
+    user_io_batch_size: u32,
 }
 
 impl FsCacheMgr {
@@ -49,6 +49,7 @@ impl FsCacheMgr {
         backend: Arc<dyn BlobBackend>,
         runtime: Arc<Runtime>,
         id: &str,
+        user_io_batch_size: u32,
     ) -> Result<FsCacheMgr> {
         if config.cache_compressed {
             return Err(enosys!("fscache doesn't support compressed cache mode"));
@@ -73,6 +74,7 @@ impl FsCacheMgr {
             need_validation: config.cache_validate,
             blobs_check_count: Arc::new(AtomicU8::new(0)),
             closed: Arc::new(AtomicBool::new(false)),
+            user_io_batch_size,
         })
     }
 
@@ -290,7 +292,7 @@ impl FileCacheEntry {
             is_zran,
             dio_enabled: true,
             need_validation,
-            batch_size: RAFS_DEFAULT_CHUNK_SIZE,
+            user_io_batch_size: mgr.user_io_batch_size,
             prefetch_config,
         })
     }
diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs
index 14bc6d5eb17..0a3cb241ca6 100644
--- a/storage/src/cache/worker.rs
+++ b/storage/src/cache/worker.rs
@@ -25,7 +25,7 @@ pub(crate) struct AsyncPrefetchConfig {
     pub enable: bool,
     /// Number of working threads.
     pub threads_count: usize,
-    /// Window size to merge/amplify requests.
+    /// The amplified batch size used to prefetch data from the backend.
     pub prefetch_request_batch_size: usize,
     /// Network bandwidth for prefetch, in unit of Bytes and Zero means no rate limit is set.
     #[allow(unused)]
diff --git a/storage/src/factory.rs b/storage/src/factory.rs
index cc37a4e913c..ef74a129b8d 100644
--- a/storage/src/factory.rs
+++ b/storage/src/factory.rs
@@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use lazy_static::lazy_static;
-use nydus_api::{BackendConfigV2, ConfigV2};
+use nydus_api::{default_user_io_batch_size, BackendConfigV2, ConfigV2};
 use tokio::runtime::{Builder, Runtime};
 use tokio::time;
 
@@ -117,6 +117,10 @@ impl BlobFactory {
     ) -> IOResult<Arc<dyn BlobCache>> {
         let backend_cfg = config.get_backend_config()?;
         let cache_cfg = config.get_cache_config()?;
+        let user_io_batch_size = config
+            .get_rafs_config()
+            .map_or_else(|_| default_user_io_batch_size(), |v| v.user_io_batch_size)
+            as u32;
         let key = BlobCacheMgrKey {
             config: config.clone(),
         };
@@ -128,16 +132,23 @@ impl BlobFactory {
         let backend = Self::new_backend(backend_cfg, &blob_info.blob_id())?;
         let mgr = match cache_cfg.cache_type.as_str() {
             "blobcache" | "filecache" => {
-                let mgr = FileCacheMgr::new(cache_cfg, backend, ASYNC_RUNTIME.clone(), &config.id)?;
+                let mgr = FileCacheMgr::new(
+                    cache_cfg,
+                    backend,
+                    ASYNC_RUNTIME.clone(),
+                    &config.id,
+                    user_io_batch_size,
+                )?;
                 mgr.init()?;
                 Arc::new(mgr) as Arc<dyn BlobCacheMgr>
             }
             "fscache" => {
                 let mgr = FsCacheMgr::new(
                     cache_cfg,
                     backend,
                     ASYNC_RUNTIME.clone(),
                     &config.id,
+                    user_io_batch_size,
                 )?;
                 mgr.init()?;
                 Arc::new(mgr) as Arc<dyn BlobCacheMgr>
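
Illustration: a minimal, standalone Rust sketch (not part of the patch) of how
a configured `amplify_io` value behaves, based on the hunks above. The factory
falls back to `default_user_io_batch_size()` (1 MiB) when no RAFS config is
available, and `FileCacheEntry::user_io_batch_size()` raises the stored `u32`
to a 128 KiB (0x2_0000) floor before merging user IO requests. The constant
and function names below are illustrative, not identifiers from the patch.

/// Default for `amplify_io`, mirroring `default_user_io_batch_size()`: 1 MiB.
const DEFAULT_USER_IO_BATCH_SIZE: u32 = 1024 * 1024;

/// Sketch of the clamping in `FileCacheEntry::user_io_batch_size()`: the
/// configured value is raised to at least 128 KiB (0x2_0000) before it is
/// used to merge and amplify user IO requests.
fn effective_user_io_batch_size(configured: u32) -> u64 {
    if configured < 0x2_0000 {
        0x2_0000
    } else {
        configured as u64
    }
}

fn main() {
    // With the default `amplify_io` of 1 MiB, user IO requests may be
    // amplified up to 1 MiB per read from the backend or local cache.
    assert_eq!(
        effective_user_io_batch_size(DEFAULT_USER_IO_BATCH_SIZE),
        1024 * 1024
    );
    // `amplify_io = 0` means amplification is not enabled in the config,
    // but the cache layer still enforces the 128 KiB floor when merging.
    assert_eq!(effective_user_io_batch_size(0), 0x2_0000);
}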