Skip to content

Commit

Permalink
local cas: solving the mismatch between nydus chunk amplification and…
Browse files Browse the repository at this point in the history
… dynamic dedup.

The mismatch between dynamic dedup and nydus' chunk amplification can
result in a larger cache size after dedup than without dedup. Because
chunk amplification can cause reused chunks to be pulled multiple
times, resulting in a larger cache size after dedup is enabled than when
dedup is not enabled.

To address this issue, a dedup_bitmap was introduced. When initializing
rafs, dedup_bitmap is generated based on the chunk information in blob.
The determination of whether a chunk in a blob is ready requires both
the chunk map and deduplication bitmap to make a joint decision.

Signed-off-by: xwb1136021767 <[email protected]>
  • Loading branch information
xwb1136021767 committed Aug 9, 2023
1 parent 091718a commit cd84507
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 122 deletions.
17 changes: 5 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ erased-serde = "0.3"
sha2 = "0.10.2"
tar = "0.4.40"
vmm-sys-util = "0.11.0"
xattr = "0.2.3"
xattr = "1.0.1"
toml = "0.5"
bitvec = { version="1", default-features = false, features = ["alloc",
"atomic",
Expand Down
6 changes: 5 additions & 1 deletion rafs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ lazy_static = "1.4.0"
libc = "0.2"
log = "0.4"
nix = "0.24"
serde = { version = "1.0.110", features = ["serde_derive", "rc"] }
serde = { version = "1.0.137", features = ["derive", "rc"] }
serde_json = "1.0.53"
vm-memory = "0.10"
fuse-backend-rs = "^0.10.3"
bitvec = { version="1", default-features = false, features = ["alloc",
"atomic",
"serde",
"std",]}

nydus-api = { version = "0.3", path = "../api" }
nydus-storage = { version = "0.6", path = "../storage", features = ["backend-localfs"] }
Expand Down
34 changes: 32 additions & 2 deletions rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ use std::any::Any;
use std::cmp;
use std::ffi::{CStr, OsStr, OsString};
use std::io::Result;
use std::mem::size_of;
use std::ops::Deref;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use bitvec::prelude::*;
use fuse_backend_rs::abi::fuse_abi::Attr;
use fuse_backend_rs::abi::fuse_abi::{stat64, statvfs64};
use fuse_backend_rs::api::filesystem::*;
Expand All @@ -38,6 +40,7 @@ use nydus_utils::{
metrics::{self, FopRecorder, StatsFop::*},
};

use crate::metadata::layout::v5::RafsV5ChunkInfo;
use crate::metadata::{
Inode, RafsInode, RafsInodeWalkAction, RafsSuper, RafsSuperMeta, DOT, DOTDOT,
};
Expand Down Expand Up @@ -81,12 +84,39 @@ impl Rafs {
pub fn new(cfg: &Arc<ConfigV2>, id: &str, path: &Path) -> RafsResult<(Self, RafsIoReader)> {
// Assume all meta/data blobs are accessible, otherwise it will always cause IO errors.
cfg.internal.set_blob_accessible(true);

let cache_cfg = cfg.get_cache_config().map_err(RafsError::LoadConfig)?;
let rafs_cfg = cfg.get_rafs_config().map_err(RafsError::LoadConfig)?;
let (sb, reader) = RafsSuper::load_from_file(path, cfg.clone(), false)
.map_err(RafsError::FillSuperblock)?;
let blob_infos = sb.superblock.get_blob_infos();
let mut blob_infos = sb.superblock.get_blob_infos();

// if enable chunk dedup, modify blob's dedup_bitmap.
let is_dedup = cfg.dedup.is_some() && cfg.dedup.as_ref().unwrap().enable;
if is_dedup {
let mut dedup_blobs = vec![];
for blob in blob_infos.iter() {
let mut blob = blob.deref().clone();
let chunk_count = blob.chunk_count() as usize;
let mut dedup_bitmap = bitvec!(0; chunk_count);
let size = sb.meta.chunk_table_size as usize;
let unit_size = size_of::<RafsV5ChunkInfo>();
if size % unit_size != 0 {
return Err(RafsError::InvalidImageData);
}

for idx in 0..(size / unit_size) {
let chunk = sb.superblock.get_chunk_info(idx as usize).unwrap();
let chunk_idx = chunk.id() as usize;
if chunk_idx < chunk_count {
dedup_bitmap.set(chunk_idx, chunk.is_deduped());
}
}
blob.set_dedup_bitmap(Some(dedup_bitmap));
dedup_blobs.push(Arc::new(blob));
}
blob_infos = dedup_blobs;
}

let device = BlobDevice::new(cfg, &blob_infos).map_err(RafsError::CreateDevice)?;

if cfg.is_chunk_validation_enabled() && sb.meta.has_inlined_chunk_digest() {
Expand Down
6 changes: 5 additions & 1 deletion storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ libc = "0.2"
log = "0.4.8"
nix = "0.24"
reqwest = { version = "0.11.14", features = ["blocking", "json"], optional = true }
serde = { version = "1.0.110", features = ["serde_derive", "rc"] }
serde = { version = "1.0.137", features = ["derive", "rc"] }
serde_json = "1.0.53"
sha1 = { version = "0.10.5", optional = true }
sha2 = { version = "0.10.2", optional = true }
Expand All @@ -35,6 +35,10 @@ url = { version = "2.1.1", optional = true }
vm-memory = "0.10"
fuse-backend-rs = "^0.10.3"
gpt = { version = "3.0.0", optional = true }
bitvec = { version="1", default-features = false, features = ["alloc",
"atomic",
"serde",
"std",]}

nydus-api = { version = "0.3", path = "../api" }
nydus-utils = { version = "0.4", path = "../utils", features = ["encryption", "zran"] }
Expand Down
25 changes: 18 additions & 7 deletions storage/src/cache/cachedfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,15 @@ impl FileCacheEntry {
extended_chunks[start..=end].to_vec()
}
} else {
while !extended_chunks.is_empty() {
let chunk = &extended_chunks[extended_chunks.len() - 1];
if matches!(self.chunk_map.is_ready(chunk.as_ref()), Ok(true)) {
extended_chunks.pop();
} else {
// check from start to end for dedup
for idx in 0..extended_chunks.len() {
let chunk = &extended_chunks[idx];
if matches!(self.chunk_map.is_ready(chunk.as_ref()), Ok(true))
|| self
.blob_info
.get_dedup_by_chunk_idx(chunk.as_ref().id() as usize)
{
extended_chunks.truncate(idx);
break;
}
}
Expand Down Expand Up @@ -591,6 +595,10 @@ impl BlobCache for FileCacheEntry {
continue;
}

if self.blob_info.get_dedup_by_chunk_idx(c.id() as usize) {
continue;
}

// For digested chunk map, we must check whether the cached data is valid because
// the digested chunk map cannot persist readiness state.
let d_size = c.uncompressed_size() as usize;
Expand All @@ -606,9 +614,12 @@ impl BlobCache for FileCacheEntry {
if let Ok(true) = self.chunk_map.check_ready_and_mark_pending(c.as_ref()) {
// The chunk is ready, so skip it.
continue;
} else {
pending.push(c.clone());
}
if self.blob_info.get_dedup_by_chunk_idx(c.id() as usize) {
continue;
}

pending.push(c.clone());
}
}

Expand Down
6 changes: 6 additions & 0 deletions storage/src/cache/dummycache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ impl BlobCache for DummyCache {
if !bios[0].user_io {
return Ok(0);
}
if self
.blob_info
.get_dedup_by_chunk_idx(bios[0].chunkinfo.id() as usize)
{
return Ok(0);
}
let buf = unsafe { std::slice::from_raw_parts_mut(bufs[0].as_ptr(), d_size) };
self.read_chunk_from_backend(&bios[0].chunkinfo, buf)?;
return Ok(buf.len());
Expand Down
49 changes: 46 additions & 3 deletions storage/src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::path::Path;
use std::sync::{Arc, Mutex};

use arc_swap::ArcSwap;
use bitvec::prelude::*;
use fuse_backend_rs::api::filesystem::ZeroCopyWriter;
use fuse_backend_rs::file_buf::FileVolatileSlice;
use fuse_backend_rs::file_traits::FileReadWriteVolatile;
Expand Down Expand Up @@ -180,6 +181,9 @@ pub struct BlobInfo {
cipher_object: Arc<Cipher>,
/// Cipher context for encryption.
cipher_ctx: Option<CipherContext>,

/// Bitvector to indicate which chunks are deduplicated.
dedup_bitmap: Option<BitVec>,
}

fn serialize_cipher_object<S>(cipher_object: &Arc<Cipher>, serializer: S) -> Result<S::Ok, S::Error>
Expand Down Expand Up @@ -270,6 +274,7 @@ impl BlobInfo {
meta_path: Arc::new(Mutex::new(String::new())),
cipher_object: Default::default(),
cipher_ctx: None,
dedup_bitmap: None,
};

blob_info.compute_features();
Expand Down Expand Up @@ -622,6 +627,38 @@ impl BlobInfo {
self.cipher_ctx.clone(),
)
}

pub fn set_dedup_bitmap(&mut self, dedup_bitmap: Option<BitVec>) {
self.dedup_bitmap = dedup_bitmap;
}

pub fn set_dedup_by_chunk_idx(&mut self, index: usize, value: bool) {
if index >= self.chunk_count as usize || self.dedup_bitmap.is_none() {
return;
}

if let Some(bitmap) = &mut self.dedup_bitmap {
bitmap.set(index, value);
}
}

pub fn get_dedup_by_chunk_idx(&self, index: usize) -> bool {
if self.dedup_bitmap.is_none() {
return false;
}
// if chunk index > blob.chunk_count, means this chunk is from other blob.
if index >= self.chunk_count as usize {
return true;
}
if let Some(bitmap) = &self.dedup_bitmap {
match bitmap.get(index).as_deref() {
Some(v) => *v,
None => false,
}
} else {
false
}
}
}

bitflags! {
Expand Down Expand Up @@ -1144,11 +1181,17 @@ impl BlobDevice {
/// Create new blob device instance.
pub fn new(config: &Arc<ConfigV2>, blob_infos: &[Arc<BlobInfo>]) -> io::Result<BlobDevice> {
let mut blobs = Vec::with_capacity(blob_infos.len());
let dedup_config = config.get_dedup_config()?;
let mut is_dedup = dedup_config.get_enable();
let dedup_config = match config.get_dedup_config() {
Ok(config) => Some(config),
Err(_) => None,
};
let mut is_dedup = match dedup_config {
Some(config) => config.get_enable(),
None => false,
};
let cas_mgr = match is_dedup {
true => {
let db_path = dedup_config.get_work_dir()?;
let db_path = dedup_config.unwrap().get_work_dir()?;
match CasMgr::new(db_path) {
Ok(cas_mgr) => Some(cas_mgr),
Err(_) => {
Expand Down
Loading

0 comments on commit cd84507

Please sign in to comment.