Skip to content

Commit

Permalink
rafs: rename variable names about prefetch configuration
Browse files Browse the repository at this point in the history
Variable names about prefetch are confused currently.
So we merge variable names that have the same meaning,
while DO NOT affect the field names read from the configuration file.

Signed-off-by: Wenhao Ren <[email protected]>
  • Loading branch information
hangvane committed Nov 9, 2023
1 parent 1c24213 commit 991ee2e
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 60 deletions.
78 changes: 40 additions & 38 deletions api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ impl CacheConfigV2 {
if self.prefetch.batch_size > 0x10000000 {
return false;
}
if self.prefetch.threads == 0 || self.prefetch.threads > 1024 {
if self.prefetch.threads_count == 0 || self.prefetch.threads_count > 1024 {
return false;
}
}
Expand Down Expand Up @@ -820,8 +820,8 @@ pub struct RafsConfigV2 {
#[serde(default = "default_rafs_mode")]
pub mode: String,
/// Batch size to read data from storage cache layer.
#[serde(default = "default_batch_size")]
pub batch_size: usize,
#[serde(rename = "batch_size", default = "default_batch_size")]
pub user_io_batch_size: usize,
/// Whether to validate data digest.
#[serde(default)]
pub validate: bool,
Expand Down Expand Up @@ -850,14 +850,14 @@ impl RafsConfigV2 {
if self.mode != "direct" && self.mode != "cached" {
return false;
}
if self.batch_size > 0x10000000 {
if self.user_io_batch_size > 0x10000000 {
return false;
}
if self.prefetch.enable {
if self.prefetch.batch_size > 0x10000000 {
return false;
}
if self.prefetch.threads == 0 || self.prefetch.threads > 1024 {
if self.prefetch.threads_count == 0 || self.prefetch.threads_count > 1024 {
return false;
}
}
Expand All @@ -872,8 +872,8 @@ pub struct PrefetchConfigV2 {
/// Whether to enable blob data prefetching.
pub enable: bool,
/// Number of data prefetching working threads.
#[serde(default = "default_prefetch_threads")]
pub threads: usize,
#[serde(rename = "threads", default = "default_prefetch_threads_count")]
pub threads_count: usize,
/// The batch size to prefetch data from backend.
#[serde(default = "default_prefetch_batch_size")]
pub batch_size: usize,
Expand Down Expand Up @@ -1202,7 +1202,7 @@ fn default_prefetch_batch_size() -> usize {
1024 * 1024
}

fn default_prefetch_threads() -> usize {
fn default_prefetch_threads_count() -> usize {
8
}

Expand Down Expand Up @@ -1364,8 +1364,8 @@ struct RafsConfig {
#[serde(default)]
pub latest_read_files: bool,
// ZERO value means, amplifying user io is not enabled.
#[serde(default = "default_batch_size")]
pub amplify_io: usize,
#[serde(rename = "amplify_io", default = "default_batch_size")]
pub user_io_batch_size: usize,
}

impl TryFrom<RafsConfig> for ConfigV2 {
Expand All @@ -1376,7 +1376,7 @@ impl TryFrom<RafsConfig> for ConfigV2 {
let mut cache: CacheConfigV2 = (&v.device.cache).try_into()?;
let rafs = RafsConfigV2 {
mode: v.mode,
batch_size: v.amplify_io,
user_io_batch_size: v.user_io_batch_size,
validate: v.digest_validate,
enable_xattr: v.enable_xattr,
iostats_files: v.iostats_files,
Expand Down Expand Up @@ -1407,23 +1407,23 @@ struct FsPrefetchControl {
pub enable: bool,

/// How many working threads to prefetch data.
#[serde(default = "default_prefetch_threads")]
#[serde(default = "default_prefetch_threads_count")]
pub threads_count: usize,

/// Window size in unit of bytes to merge request to backend.
#[serde(default = "default_batch_size")]
pub merging_size: usize,
#[serde(rename = "merging_size", default = "default_batch_size")]
pub batch_size: usize,

/// Network bandwidth limitation for prefetching.
///
/// In unit of Bytes. It sets a limit to prefetch bandwidth usage in order to
/// reduce congestion with normal user IO.
/// bandwidth_rate == 0 -- prefetch bandwidth ratelimit disabled
/// bandwidth_rate > 0 -- prefetch bandwidth ratelimit enabled.
/// bandwidth_limit == 0 -- prefetch bandwidth ratelimit disabled
/// bandwidth_limit > 0 -- prefetch bandwidth ratelimit enabled.
/// Please note that if the value is less than Rafs chunk size,
/// it will be raised to the chunk size.
#[serde(default)]
pub bandwidth_rate: u32,
#[serde(default, rename = "bandwidth_rate")]
pub bandwidth_limit: u32,

/// Whether to prefetch all filesystem data.
#[serde(default = "default_prefetch_all")]
Expand All @@ -1434,9 +1434,9 @@ impl From<FsPrefetchControl> for PrefetchConfigV2 {
fn from(v: FsPrefetchControl) -> Self {
PrefetchConfigV2 {
enable: v.enable,
threads: v.threads_count,
batch_size: v.merging_size,
bandwidth_limit: v.bandwidth_rate,
threads_count: v.threads_count,
batch_size: v.batch_size,
bandwidth_limit: v.bandwidth_limit,
prefetch_all: v.prefetch_all,
}
}
Expand All @@ -1450,18 +1450,20 @@ struct BlobPrefetchConfig {
/// Number of data prefetching working threads.
pub threads_count: usize,
/// The maximum size of a merged IO request.
pub merging_size: usize,
#[serde(rename = "merging_size")]
pub batch_size: usize,
/// Network bandwidth rate limit in unit of Bytes and Zero means no limit.
pub bandwidth_rate: u32,
#[serde(rename = "bandwidth_rate")]
pub bandwidth_limit: u32,
}

impl From<&BlobPrefetchConfig> for PrefetchConfigV2 {
fn from(v: &BlobPrefetchConfig) -> Self {
PrefetchConfigV2 {
enable: v.enable,
threads: v.threads_count,
batch_size: v.merging_size,
bandwidth_limit: v.bandwidth_rate,
threads_count: v.threads_count,
batch_size: v.batch_size,
bandwidth_limit: v.bandwidth_limit,
prefetch_all: true,
}
}
Expand Down Expand Up @@ -1530,8 +1532,8 @@ mod tests {
let config = BlobPrefetchConfig::default();
assert!(!config.enable);
assert_eq!(config.threads_count, 0);
assert_eq!(config.merging_size, 0);
assert_eq!(config.bandwidth_rate, 0);
assert_eq!(config.batch_size, 0);
assert_eq!(config.bandwidth_limit, 0);

let content = r#"{
"enable": true,
Expand All @@ -1542,12 +1544,12 @@ mod tests {
let config: BlobPrefetchConfig = serde_json::from_str(content).unwrap();
assert!(config.enable);
assert_eq!(config.threads_count, 2);
assert_eq!(config.merging_size, 4);
assert_eq!(config.bandwidth_rate, 5);
assert_eq!(config.batch_size, 4);
assert_eq!(config.bandwidth_limit, 5);

let config: PrefetchConfigV2 = (&config).into();
assert!(config.enable);
assert_eq!(config.threads, 2);
assert_eq!(config.threads_count, 2);
assert_eq!(config.batch_size, 4);
assert_eq!(config.bandwidth_limit, 5);
assert!(config.prefetch_all);
Expand Down Expand Up @@ -1618,7 +1620,7 @@ mod tests {
assert!(blob_config.cache_config.is_object());
assert!(blob_config.prefetch_config.enable);
assert_eq!(blob_config.prefetch_config.threads_count, 2);
assert_eq!(blob_config.prefetch_config.merging_size, 4);
assert_eq!(blob_config.prefetch_config.batch_size, 4);
assert_eq!(
blob_config.metadata_path.as_ref().unwrap().as_str(),
"/tmp/metadata1"
Expand All @@ -1630,7 +1632,7 @@ mod tests {
assert_eq!(blob_config.cache.cache_type, "fscache");
assert!(blob_config.cache.fs_cache.is_some());
assert!(blob_config.cache.prefetch.enable);
assert_eq!(blob_config.cache.prefetch.threads, 2);
assert_eq!(blob_config.cache.prefetch.threads_count, 2);
assert_eq!(blob_config.cache.prefetch.batch_size, 4);
assert_eq!(
blob_config.metadata_path.as_ref().unwrap().as_str(),
Expand All @@ -1654,7 +1656,7 @@ mod tests {
let blob_config = config.blob_config_legacy.as_ref().unwrap();
assert!(!blob_config.prefetch_config.enable);
assert_eq!(blob_config.prefetch_config.threads_count, 0);
assert_eq!(blob_config.prefetch_config.merging_size, 0);
assert_eq!(blob_config.prefetch_config.batch_size, 0);
}

#[test]
Expand Down Expand Up @@ -1967,7 +1969,7 @@ mod tests {

let prefetch = &cache.prefetch;
assert!(prefetch.enable);
assert_eq!(prefetch.threads, 8);
assert_eq!(prefetch.threads_count, 8);
assert_eq!(prefetch.batch_size, 1000000);
assert_eq!(prefetch.bandwidth_limit, 10000000);
}
Expand Down Expand Up @@ -1998,14 +2000,14 @@ mod tests {

let rafs = config.rafs.as_ref().unwrap();
assert_eq!(&rafs.mode, "direct");
assert_eq!(rafs.batch_size, 1000000);
assert_eq!(rafs.user_io_batch_size, 1000000);
assert!(rafs.validate);
assert!(rafs.enable_xattr);
assert!(rafs.iostats_files);
assert!(rafs.access_pattern);
assert!(rafs.latest_read_files);
assert!(rafs.prefetch.enable);
assert_eq!(rafs.prefetch.threads, 4);
assert_eq!(rafs.prefetch.threads_count, 4);
assert_eq!(rafs.prefetch.batch_size, 1000000);
assert_eq!(rafs.prefetch.bandwidth_limit, 10000000);
assert!(rafs.prefetch.prefetch_all)
Expand Down Expand Up @@ -2557,7 +2559,7 @@ mod tests {
assert!(default_true());
assert_eq!(default_failure_limit(), 5);
assert_eq!(default_prefetch_batch_size(), 1024 * 1024);
assert_eq!(default_prefetch_threads(), 8);
assert_eq!(default_prefetch_threads_count(), 8);
}

#[test]
Expand Down
19 changes: 10 additions & 9 deletions rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct Rafs {
fs_prefetch: bool,
prefetch_all: bool,
xattr_enabled: bool,
amplify_io: u32,
user_io_batch_size: u32,

// static inode attributes
i_uid: u32,
Expand Down Expand Up @@ -102,7 +102,7 @@ impl Rafs {
initialized: false,
digest_validate: rafs_cfg.validate,
fs_prefetch: rafs_cfg.prefetch.enable,
amplify_io: rafs_cfg.batch_size as u32,
user_io_batch_size: rafs_cfg.user_io_batch_size as u32,
prefetch_all: rafs_cfg.prefetch.prefetch_all,
xattr_enabled: rafs_cfg.enable_xattr,

Expand Down Expand Up @@ -621,20 +621,21 @@ impl FileSystem for Rafs {
assert!(!io_vecs.is_empty() && !io_vecs[0].is_empty());

// Try to amplify user io for Rafs v5, to improve performance.
let amplify_io = cmp::min(self.amplify_io as usize, w.available_bytes()) as u32;
if self.sb.meta.is_v5() && size < amplify_io {
let user_io_batch_size =
cmp::min(self.user_io_batch_size as usize, w.available_bytes()) as u32;
if self.sb.meta.is_v5() && size < user_io_batch_size {
let all_chunks_ready = self.device.all_chunks_ready(&io_vecs);
if !all_chunks_ready {
let chunk_mask = self.metadata().chunk_size as u64 - 1;
let next_chunk_base = (offset + (size as u64) + chunk_mask) & !chunk_mask;
let window_base = cmp::min(next_chunk_base, inode_size);
let actual_size = window_base - (offset & !chunk_mask);
if actual_size < amplify_io as u64 {
let window_size = amplify_io as u64 - actual_size;
if actual_size < user_io_batch_size as u64 {
let window_size = user_io_batch_size as u64 - actual_size;
let orig_cnt = io_vecs.iter().fold(0, |s, d| s + d.len());
self.sb.amplify_io(
self.sb.amplify_user_io(
&self.device,
amplify_io,
user_io_batch_size,
&mut io_vecs,
&inode,
window_base,
Expand Down Expand Up @@ -1021,7 +1022,7 @@ mod tests {
fs_prefetch: false,
prefetch_all: false,
xattr_enabled: false,
amplify_io: 0,
user_io_batch_size: 0,
i_uid: 0,
i_gid: 0,
i_time: 0,
Expand Down
2 changes: 1 addition & 1 deletion rafs/src/metadata/md_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl RafsSuper {
// not overlap user IO's chunk.
// V5 rafs tries to amplify user IO by expanding more chunks to user IO and
// expect that those chunks are likely to be continuous with user IO's chunks.
pub(crate) fn amplify_io(
pub(crate) fn amplify_user_io(
&self,
device: &BlobDevice,
max_uncomp_size: u32,
Expand Down
4 changes: 2 additions & 2 deletions storage/src/cache/cachedfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,10 @@ impl FileCacheEntry {
}

fn prefetch_batch_size(&self) -> u64 {
if self.prefetch_config.merging_size < 0x2_0000 {
if self.prefetch_config.batch_size < 0x2_0000 {
0x2_0000
} else {
self.prefetch_config.merging_size as u64
self.prefetch_config.batch_size as u64
}
}

Expand Down
20 changes: 10 additions & 10 deletions storage/src/cache/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ pub(crate) struct AsyncPrefetchConfig {
/// Number of working threads.
pub threads_count: usize,
/// Window size to merge/amplify requests.
pub merging_size: usize,
pub batch_size: usize,
/// Network bandwidth for prefetch, in unit of Bytes and Zero means no rate limit is set.
#[allow(unused)]
pub bandwidth_rate: u32,
pub bandwidth_limit: u32,
}

impl From<&PrefetchConfigV2> for AsyncPrefetchConfig {
fn from(p: &PrefetchConfigV2) -> Self {
AsyncPrefetchConfig {
enable: p.enable,
threads_count: p.threads,
merging_size: p.batch_size,
bandwidth_rate: p.bandwidth_limit,
threads_count: p.threads_count,
batch_size: p.batch_size,
bandwidth_limit: p.bandwidth_limit,
}
}
}
Expand Down Expand Up @@ -97,7 +97,7 @@ impl AsyncWorkerMgr {
prefetch_config: Arc<AsyncPrefetchConfig>,
) -> Result<Self> {
#[cfg(feature = "prefetch-rate-limit")]
let prefetch_limiter = match prefetch_config.bandwidth_rate {
let prefetch_limiter = match prefetch_config.bandwidth_limit {
0 => None,
v => {
// If the given value is less than maximum blob chunk size, it exceeds burst size of the
Expand Down Expand Up @@ -437,8 +437,8 @@ mod tests {
let config = Arc::new(AsyncPrefetchConfig {
enable: true,
threads_count: 2,
merging_size: 0x100000,
bandwidth_rate: 0x100000,
batch_size: 0x100000,
bandwidth_limit: 0x100000,
});

let mgr = Arc::new(AsyncWorkerMgr::new(metrics, config).unwrap());
Expand Down Expand Up @@ -477,8 +477,8 @@ mod tests {
let config = Arc::new(AsyncPrefetchConfig {
enable: true,
threads_count: 4,
merging_size: 0x1000000,
bandwidth_rate: 0x1000000,
batch_size: 0x1000000,
bandwidth_limit: 0x1000000,
});

let mgr = Arc::new(AsyncWorkerMgr::new(metrics, config).unwrap());
Expand Down

0 comments on commit 991ee2e

Please sign in to comment.