Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nydusd: add the config support of amplify_io #1452

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 48 additions & 45 deletions api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ impl CacheConfigV2 {
if self.prefetch.batch_size > 0x10000000 {
return false;
}
if self.prefetch.threads == 0 || self.prefetch.threads > 1024 {
if self.prefetch.threads_count == 0 || self.prefetch.threads_count > 1024 {
return false;
}
}
Expand Down Expand Up @@ -819,9 +819,9 @@ pub struct RafsConfigV2 {
/// Filesystem metadata cache mode.
#[serde(default = "default_rafs_mode")]
pub mode: String,
/// Batch size to read data from storage cache layer.
#[serde(default = "default_batch_size")]
pub batch_size: usize,
/// Amplified user IO request batch size to read data from remote storage backend / local cache.
#[serde(rename = "batch_size", default = "default_user_io_batch_size")]
imeoer marked this conversation as resolved.
Show resolved Hide resolved
pub user_io_batch_size: usize,
/// Whether to validate data digest.
#[serde(default)]
pub validate: bool,
Expand Down Expand Up @@ -850,14 +850,14 @@ impl RafsConfigV2 {
if self.mode != "direct" && self.mode != "cached" {
return false;
}
if self.batch_size > 0x10000000 {
if self.user_io_batch_size > 0x10000000 {
return false;
}
if self.prefetch.enable {
if self.prefetch.batch_size > 0x10000000 {
return false;
}
if self.prefetch.threads == 0 || self.prefetch.threads > 1024 {
if self.prefetch.threads_count == 0 || self.prefetch.threads_count > 1024 {
return false;
}
}
Expand All @@ -872,9 +872,9 @@ pub struct PrefetchConfigV2 {
/// Whether to enable blob data prefetching.
pub enable: bool,
/// Number of data prefetching working threads.
#[serde(default = "default_prefetch_threads")]
pub threads: usize,
/// The batch size to prefetch data from backend.
#[serde(rename = "threads", default = "default_prefetch_threads_count")]
pub threads_count: usize,
/// The amplify batch size to prefetch data from backend.
#[serde(default = "default_prefetch_batch_size")]
pub batch_size: usize,
/// Network bandwidth rate limit in unit of Bytes and Zero means no limit.
Expand Down Expand Up @@ -1194,15 +1194,15 @@ fn default_work_dir() -> String {
".".to_string()
}

pub fn default_batch_size() -> usize {
128 * 1024
pub fn default_user_io_batch_size() -> usize {
1024 * 1024
ccx1024cc marked this conversation as resolved.
Show resolved Hide resolved
}

fn default_prefetch_batch_size() -> usize {
pub fn default_prefetch_batch_size() -> usize {
1024 * 1024
}

fn default_prefetch_threads() -> usize {
fn default_prefetch_threads_count() -> usize {
8
}

Expand Down Expand Up @@ -1363,9 +1363,10 @@ struct RafsConfig {
/// Record the names of accessed files when file access tracing is enabled.
#[serde(default)]
pub latest_read_files: bool,
// Amplified user IO request batch size to read data from remote storage backend / local cache.
// ZERO value means, amplifying user io is not enabled.
#[serde(default = "default_batch_size")]
pub amplify_io: usize,
#[serde(rename = "amplify_io", default = "default_user_io_batch_size")]
pub user_io_batch_size: usize,
}

impl TryFrom<RafsConfig> for ConfigV2 {
Expand All @@ -1376,7 +1377,7 @@ impl TryFrom<RafsConfig> for ConfigV2 {
let mut cache: CacheConfigV2 = (&v.device.cache).try_into()?;
let rafs = RafsConfigV2 {
mode: v.mode,
batch_size: v.amplify_io,
user_io_batch_size: v.user_io_batch_size,
validate: v.digest_validate,
enable_xattr: v.enable_xattr,
iostats_files: v.iostats_files,
Expand Down Expand Up @@ -1407,23 +1408,23 @@ struct FsPrefetchControl {
pub enable: bool,

/// How many working threads to prefetch data.
#[serde(default = "default_prefetch_threads")]
#[serde(default = "default_prefetch_threads_count")]
pub threads_count: usize,

/// Window size in unit of bytes to merge request to backend.
#[serde(default = "default_batch_size")]
pub merging_size: usize,
/// The amplify batch size to prefetch data from backend.
#[serde(rename = "merging_size", default = "default_prefetch_batch_size")]
pub batch_size: usize,

/// Network bandwidth limitation for prefetching.
///
/// In unit of Bytes. It sets a limit to prefetch bandwidth usage in order to
/// reduce congestion with normal user IO.
/// bandwidth_rate == 0 -- prefetch bandwidth ratelimit disabled
/// bandwidth_rate > 0 -- prefetch bandwidth ratelimit enabled.
/// bandwidth_limit == 0 -- prefetch bandwidth ratelimit disabled
/// bandwidth_limit > 0 -- prefetch bandwidth ratelimit enabled.
/// Please note that if the value is less than Rafs chunk size,
/// it will be raised to the chunk size.
#[serde(default)]
pub bandwidth_rate: u32,
#[serde(default, rename = "bandwidth_rate")]
pub bandwidth_limit: u32,

/// Whether to prefetch all filesystem data.
#[serde(default = "default_prefetch_all")]
Expand All @@ -1434,9 +1435,9 @@ impl From<FsPrefetchControl> for PrefetchConfigV2 {
fn from(v: FsPrefetchControl) -> Self {
PrefetchConfigV2 {
enable: v.enable,
threads: v.threads_count,
batch_size: v.merging_size,
bandwidth_limit: v.bandwidth_rate,
threads_count: v.threads_count,
batch_size: v.batch_size,
bandwidth_limit: v.bandwidth_limit,
prefetch_all: v.prefetch_all,
}
}
Expand All @@ -1449,19 +1450,21 @@ struct BlobPrefetchConfig {
pub enable: bool,
/// Number of data prefetching working threads.
pub threads_count: usize,
/// The maximum size of a merged IO request.
pub merging_size: usize,
/// The amplify batch size to prefetch data from backend.
#[serde(rename = "merging_size")]
pub batch_size: usize,
/// Network bandwidth rate limit in unit of Bytes and Zero means no limit.
pub bandwidth_rate: u32,
#[serde(rename = "bandwidth_rate")]
pub bandwidth_limit: u32,
}

impl From<&BlobPrefetchConfig> for PrefetchConfigV2 {
fn from(v: &BlobPrefetchConfig) -> Self {
PrefetchConfigV2 {
enable: v.enable,
threads: v.threads_count,
batch_size: v.merging_size,
bandwidth_limit: v.bandwidth_rate,
threads_count: v.threads_count,
batch_size: v.batch_size,
bandwidth_limit: v.bandwidth_limit,
prefetch_all: true,
}
}
Expand Down Expand Up @@ -1530,8 +1533,8 @@ mod tests {
let config = BlobPrefetchConfig::default();
assert!(!config.enable);
assert_eq!(config.threads_count, 0);
assert_eq!(config.merging_size, 0);
assert_eq!(config.bandwidth_rate, 0);
assert_eq!(config.batch_size, 0);
assert_eq!(config.bandwidth_limit, 0);

let content = r#"{
"enable": true,
Expand All @@ -1542,12 +1545,12 @@ mod tests {
let config: BlobPrefetchConfig = serde_json::from_str(content).unwrap();
assert!(config.enable);
assert_eq!(config.threads_count, 2);
assert_eq!(config.merging_size, 4);
assert_eq!(config.bandwidth_rate, 5);
assert_eq!(config.batch_size, 4);
assert_eq!(config.bandwidth_limit, 5);

let config: PrefetchConfigV2 = (&config).into();
assert!(config.enable);
assert_eq!(config.threads, 2);
assert_eq!(config.threads_count, 2);
assert_eq!(config.batch_size, 4);
assert_eq!(config.bandwidth_limit, 5);
assert!(config.prefetch_all);
Expand Down Expand Up @@ -1618,7 +1621,7 @@ mod tests {
assert!(blob_config.cache_config.is_object());
assert!(blob_config.prefetch_config.enable);
assert_eq!(blob_config.prefetch_config.threads_count, 2);
assert_eq!(blob_config.prefetch_config.merging_size, 4);
assert_eq!(blob_config.prefetch_config.batch_size, 4);
assert_eq!(
blob_config.metadata_path.as_ref().unwrap().as_str(),
"/tmp/metadata1"
Expand All @@ -1630,7 +1633,7 @@ mod tests {
assert_eq!(blob_config.cache.cache_type, "fscache");
assert!(blob_config.cache.fs_cache.is_some());
assert!(blob_config.cache.prefetch.enable);
assert_eq!(blob_config.cache.prefetch.threads, 2);
assert_eq!(blob_config.cache.prefetch.threads_count, 2);
assert_eq!(blob_config.cache.prefetch.batch_size, 4);
assert_eq!(
blob_config.metadata_path.as_ref().unwrap().as_str(),
Expand All @@ -1654,7 +1657,7 @@ mod tests {
let blob_config = config.blob_config_legacy.as_ref().unwrap();
assert!(!blob_config.prefetch_config.enable);
assert_eq!(blob_config.prefetch_config.threads_count, 0);
assert_eq!(blob_config.prefetch_config.merging_size, 0);
assert_eq!(blob_config.prefetch_config.batch_size, 0);
}

#[test]
Expand Down Expand Up @@ -1967,7 +1970,7 @@ mod tests {

let prefetch = &cache.prefetch;
assert!(prefetch.enable);
assert_eq!(prefetch.threads, 8);
assert_eq!(prefetch.threads_count, 8);
assert_eq!(prefetch.batch_size, 1000000);
assert_eq!(prefetch.bandwidth_limit, 10000000);
}
Expand Down Expand Up @@ -1998,14 +2001,14 @@ mod tests {

let rafs = config.rafs.as_ref().unwrap();
assert_eq!(&rafs.mode, "direct");
assert_eq!(rafs.batch_size, 1000000);
assert_eq!(rafs.user_io_batch_size, 1000000);
assert!(rafs.validate);
assert!(rafs.enable_xattr);
assert!(rafs.iostats_files);
assert!(rafs.access_pattern);
assert!(rafs.latest_read_files);
assert!(rafs.prefetch.enable);
assert_eq!(rafs.prefetch.threads, 4);
assert_eq!(rafs.prefetch.threads_count, 4);
assert_eq!(rafs.prefetch.batch_size, 1000000);
assert_eq!(rafs.prefetch.bandwidth_limit, 10000000);
assert!(rafs.prefetch.prefetch_all)
Expand Down Expand Up @@ -2557,7 +2560,7 @@ mod tests {
assert!(default_true());
assert_eq!(default_failure_limit(), 5);
assert_eq!(default_prefetch_batch_size(), 1024 * 1024);
assert_eq!(default_prefetch_threads(), 8);
assert_eq!(default_prefetch_threads_count(), 8);
}

#[test]
Expand Down
3 changes: 3 additions & 0 deletions docs/nydusd.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ We are working on enabling cloud-hypervisor support for nydus.
"iostats_files": true,
// Enable support of fs extended attributes
"enable_xattr": false,
// Amplified user IO request batch size to read data from remote storage backend / local cache
// in unit of Bytes, valid values: 0-268435456, default: 1048576
"amplify_io": 1048576,
"fs_prefetch": {
// Enable blob prefetch
"enable": false,
Expand Down
3 changes: 2 additions & 1 deletion misc/configs/nydusd-config-v2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ bandwidth_limit = 10000000
[rafs]
# Filesystem metadata cache mode, "direct" or "cached". "direct" is almost what you want.
mode = "direct"
# Batch size to read data from storage cache layer, valid values: 0-0x10000000
# Amplified user IO request batch size to read data from remote storage backend / local cache,
# valid values: 0-0x10000000
batch_size = 1000000
# Whether to validate data digest.
validate = true
Expand Down
19 changes: 10 additions & 9 deletions rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct Rafs {
fs_prefetch: bool,
prefetch_all: bool,
xattr_enabled: bool,
amplify_io: u32,
user_io_batch_size: u32,

// static inode attributes
i_uid: u32,
Expand Down Expand Up @@ -102,7 +102,7 @@ impl Rafs {
initialized: false,
digest_validate: rafs_cfg.validate,
fs_prefetch: rafs_cfg.prefetch.enable,
amplify_io: rafs_cfg.batch_size as u32,
user_io_batch_size: rafs_cfg.user_io_batch_size as u32,
prefetch_all: rafs_cfg.prefetch.prefetch_all,
xattr_enabled: rafs_cfg.enable_xattr,

Expand Down Expand Up @@ -621,20 +621,21 @@ impl FileSystem for Rafs {
assert!(!io_vecs.is_empty() && !io_vecs[0].is_empty());

// Try to amplify user io for Rafs v5, to improve performance.
let amplify_io = cmp::min(self.amplify_io as usize, w.available_bytes()) as u32;
if self.sb.meta.is_v5() && size < amplify_io {
let user_io_batch_size =
cmp::min(self.user_io_batch_size as usize, w.available_bytes()) as u32;
if self.sb.meta.is_v5() && size < user_io_batch_size {
let all_chunks_ready = self.device.all_chunks_ready(&io_vecs);
if !all_chunks_ready {
let chunk_mask = self.metadata().chunk_size as u64 - 1;
let next_chunk_base = (offset + (size as u64) + chunk_mask) & !chunk_mask;
let window_base = cmp::min(next_chunk_base, inode_size);
let actual_size = window_base - (offset & !chunk_mask);
if actual_size < amplify_io as u64 {
let window_size = amplify_io as u64 - actual_size;
if actual_size < user_io_batch_size as u64 {
let window_size = user_io_batch_size as u64 - actual_size;
let orig_cnt = io_vecs.iter().fold(0, |s, d| s + d.len());
self.sb.amplify_io(
self.sb.amplify_user_io(
&self.device,
amplify_io,
user_io_batch_size,
&mut io_vecs,
&inode,
window_base,
Expand Down Expand Up @@ -1021,7 +1022,7 @@ mod tests {
fs_prefetch: false,
prefetch_all: false,
xattr_enabled: false,
amplify_io: 0,
user_io_batch_size: 0,
i_uid: 0,
i_gid: 0,
i_time: 0,
Expand Down
2 changes: 1 addition & 1 deletion rafs/src/metadata/md_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl RafsSuper {
// not overlap user IO's chunk.
// V5 rafs tries to amplify user IO by expanding more chunks to user IO and
// expect that those chunks are likely to be continuous with user IO's chunks.
pub(crate) fn amplify_io(
pub(crate) fn amplify_user_io(
&self,
device: &BlobDevice,
max_uncomp_size: u32,
Expand Down
4 changes: 2 additions & 2 deletions service/src/fs_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ impl FsCacheHandler {
.map_err(|e| eother!(format!("failed to start prefetch worker, {}", e)))?;

let size = match cache_cfg.prefetch.batch_size.checked_next_power_of_two() {
None => nydus_api::default_batch_size() as u64,
Some(1) => nydus_api::default_batch_size() as u64,
None => nydus_api::default_prefetch_batch_size() as u64,
Some(1) => nydus_api::default_prefetch_batch_size() as u64,
Some(s) => s as u64,
};
let size = std::cmp::max(0x4_0000u64, size);
Expand Down
Loading
Loading