nydusd: achieve dynamic dedup for nydusd.
The previous version of local CAS performed static dedup, which only
modified the chunk information in the bootstrap. This had a serious
problem: it could reuse chunks that the current image's backend cannot
fetch, leaving the container unable to load the corresponding chunk
data on demand at runtime.

To address this issue, this commit introduces dynamic dedup. When
nydusd initializes the blob cache, it reads the blob's backend
configuration from the CAS database, enabling the blob cache to read
chunk data from other backends.

Signed-off-by: xwb1136021767 <[email protected]>
xwb1136021767 committed Aug 9, 2023
1 parent 49cdc51 commit f79015d
Showing 12 changed files with 310 additions and 72 deletions.
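
Before the per-file diffs, a minimal sketch of the lookup the commit message describes: resolving a deduped blob's backend configuration from the CAS database while the blob cache is initialized. Every name below (BackendConfig, CasDb, backend_config_for) is a stand-in for illustration, not the API this commit actually adds.

// Illustrative sketch only; all types and methods here are assumed.
struct BackendConfig(String); // serialized backend config (registry host, auth, ...)

struct CasDb; // handle to the CAS database kept under the dedup work_dir

impl CasDb {
    // Backend config recorded for `blob_id` when the blob was deduped from
    // another image; None means the blob belongs to the current image.
    fn get_backend_config(&self, _blob_id: &str) -> Option<BackendConfig> {
        None
    }
}

// Dynamic dedup: read chunk data through the backend the blob actually
// lives in, falling back to the current image's backend otherwise.
fn backend_config_for(cas: &CasDb, blob_id: &str, current: BackendConfig) -> BackendConfig {
    cas.get_backend_config(blob_id).unwrap_or(current)
}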
17 changes: 5 additions & 12 deletions Cargo.lock


65 changes: 65 additions & 0 deletions api/src/config.rs
@@ -2287,4 +2287,69 @@ mod tests {
let auth = registry.auth.unwrap();
assert_eq!(auth, test_auth);
}

#[test]
fn test_dedup_config() {
let content = r#"{
"enable": true,
"work_dir": "/tmp/nydus-cas"
}"#;
let config: DeduplicationConfig = serde_json::from_str(content).unwrap();
assert!(config.enable);
assert_eq!(config.work_dir, "/tmp/nydus-cas");
}

#[test]
fn test_snapshotter_config_with_dedup() {
let content = r#"
{
"device": {
"backend": {
"type": "registry",
"config": {
"readahead": false,
"host": "localhost",
"repo": "vke/golang",
"auth": "",
"scheme": "https",
"proxy": {
"fallback": false
},
"timeout": 5,
"connect_timeout": 5,
"retry_limit": 2
}
},
"cache": {
"type": "blobcache",
"compressed": true,
"config": {
"work_dir": "/var/lib/containerd-nydus/cache",
"disable_indexed_map": false
}
},
"dedup": {
"work_dir": "/home/t4/containerd/io.containerd.snapshotter.v1.nydus"
}
},
"mode": "direct",
"digest_validate": false,
"enable_xattr": true,
"fs_prefetch": {
"enable": true,
"prefetch_all": true,
"threads_count": 8,
"merging_size": 1048576,
"bandwidth_rate": 0
}
}
"#;
let config = ConfigV2::from_str(content).unwrap();
assert_eq!(&config.id, "");
assert!(config.dedup.as_ref().unwrap().enable);
assert_eq!(
&config.dedup.unwrap().work_dir,
"/home/t4/containerd/io.containerd.snapshotter.v1.nydus"
);
}
}
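
For context, a plausible shape for the struct these tests deserialize into, inferred from the JSON above; the real definition earlier in api/src/config.rs may differ in attributes and defaults.

// Assumed shape of DeduplicationConfig, inferred from the tests above.
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct DeduplicationConfig {
    /// Whether chunk deduplication via the local CAS is enabled.
    #[serde(default)]
    pub enable: bool,
    /// Directory holding the CAS database.
    #[serde(default)]
    pub work_dir: String,
}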
2 changes: 1 addition & 1 deletion builder/Cargo.toml
@@ -22,7 +22,7 @@ erased-serde = "0.3"
sha2 = "0.10.2"
tar = "0.4.40"
vmm-sys-util = "0.11.0"
xattr = "0.2.3"
xattr = "1.0.1"
toml = "0.5"
bitvec = { version="1", default-features = false, features = ["alloc",
"atomic",
26 changes: 7 additions & 19 deletions builder/src/core/node.rs
@@ -118,7 +118,7 @@ impl NodeChunk {
}

/// Struct to host sharable fields of [Node].
-#[derive(Clone, Default)]
+#[derive(Clone, Default, Debug)]
pub struct NodeInfo {
/// Whether the explicit UID/GID feature is enabled or not.
pub explicit_uidgid: bool,
@@ -736,7 +736,7 @@ impl Node {
.with_context(|| format!("failed to get metadata of {}", self.path().display()))
}

-    pub fn get_chunk_ofs(&mut self, meta: &RafsSuperMeta) -> Result<(u64, u64)> {
+    fn get_chunk_ofs(&mut self, meta: &RafsSuperMeta) -> Result<(u64, u64)> {
if meta.version == RAFS_SUPER_VERSION_V6 {
self.get_chunk_ofs_v6(meta)
} else {
@@ -900,23 +900,6 @@ impl Node {
}
}

-pub fn update_inode_digest_for_bootstrap(&self, writer: &mut dyn RafsIoWrite) -> Result<()> {
-    // Dump inode info
-    if let InodeWrapper::V5(raw_inode) = &self.inode {
-        let name = self.name();
-        let inode = RafsV5InodeWrapper {
-            name,
-            symlink: self.info.symlink.as_deref(),
-            inode: raw_inode,
-        };
-        inode
-            .store(writer)
-            .context("failed to dump inode to bootstrap")?;
-    }
-
-    Ok(())
-}

fn dedup_bootstrap_v5(
&self,
build_ctx: &BuildContext,
@@ -1097,3 +1080,8 @@ impl Node {
self.info = Arc::new(info);
}
}

#[cfg(test)]
mod tests {
use super::*;
}
6 changes: 5 additions & 1 deletion rafs/Cargo.toml
@@ -16,10 +16,14 @@ lazy_static = "1.4.0"
libc = "0.2"
log = "0.4"
nix = "0.24"
serde = { version = "1.0.110", features = ["serde_derive", "rc"] }
serde = { version = "1.0.137", features = ["derive", "rc"] }
serde_json = "1.0.53"
vm-memory = "0.10"
fuse-backend-rs = "^0.10.3"
bitvec = { version="1", default-features = false, features = ["alloc",
"atomic",
"serde",
"std",]}

nydus-api = { version = "0.3", path = "../api" }
nydus-storage = { version = "0.6", path = "../storage", features = ["backend-localfs"] }
46 changes: 44 additions & 2 deletions rafs/src/fs.rs
@@ -18,12 +18,14 @@ use std::any::Any;
use std::cmp;
use std::ffi::{CStr, OsStr, OsString};
use std::io::Result;
use std::mem::size_of;
use std::ops::Deref;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use bitvec::prelude::*;
use fuse_backend_rs::abi::fuse_abi::Attr;
use fuse_backend_rs::abi::fuse_abi::{stat64, statvfs64};
use fuse_backend_rs::api::filesystem::*;
@@ -37,7 +39,9 @@ use nydus_utils::{
div_round_up,
metrics::{self, FopRecorder, StatsFop::*},
};
use storage::device::BlobInfo;

use crate::metadata::layout::v5::RafsV5ChunkInfo;
use crate::metadata::{
Inode, RafsInode, RafsInodeWalkAction, RafsSuper, RafsSuperMeta, DOT, DOTDOT,
};
@@ -81,12 +85,14 @@ impl Rafs {
pub fn new(cfg: &Arc<ConfigV2>, id: &str, path: &Path) -> RafsResult<(Self, RafsIoReader)> {
// Assume all meta/data blobs are accessible, otherwise it will always cause IO errors.
cfg.internal.set_blob_accessible(true);

let cache_cfg = cfg.get_cache_config().map_err(RafsError::LoadConfig)?;
let rafs_cfg = cfg.get_rafs_config().map_err(RafsError::LoadConfig)?;
let (sb, reader) = RafsSuper::load_from_file(path, cfg.clone(), false)
.map_err(RafsError::FillSuperblock)?;
-        let blob_infos = sb.superblock.get_blob_infos();
+        let mut blob_infos = sb.superblock.get_blob_infos();

// If chunk dedup is enabled, populate each blob's dedup_bitmap.
blob_infos = Self::generate_dedup_bitmap_by_chunk_info(cfg, &blob_infos, &sb)?;
let device = BlobDevice::new(cfg, &blob_infos).map_err(RafsError::CreateDevice)?;

if cfg.is_chunk_validation_enabled() && sb.meta.has_inlined_chunk_digest() {
@@ -324,6 +330,42 @@ impl Rafs {

entry
}

fn generate_dedup_bitmap_by_chunk_info(
cfg: &Arc<ConfigV2>,
blob_infos: &Vec<Arc<BlobInfo>>,
sb: &RafsSuper,
) -> RafsResult<Vec<Arc<BlobInfo>>> {
if let Some(dedup_config) = &cfg.dedup {
if dedup_config.enable {
let mut dedup_blobs = vec![];
for blob in blob_infos.iter() {
let mut blob = blob.deref().clone();
let chunk_count = blob.chunk_count() as usize;
let mut dedup_bitmap = bitvec!(0; chunk_count);
let size = sb.meta.chunk_table_size as usize;
let unit_size = size_of::<RafsV5ChunkInfo>();
if size % unit_size != 0 {
return Err(RafsError::InvalidImageData);
}

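// Walk the chunk table entry by entry and mark chunks recorded as deduped.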
for idx in 0..(size / unit_size) {
let chunk = sb.superblock.get_chunk_info(idx as usize).unwrap();
let chunk_idx = chunk.id() as usize;
if chunk_idx < chunk_count {
dedup_bitmap.set(chunk_idx, chunk.is_deduped());
}
}
blob.set_dedup_bitmap(Some(dedup_bitmap));
dedup_blobs.push(Arc::new(blob));
}

return Ok(dedup_blobs);
}
}

Ok(blob_infos.to_owned())
}
}
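
The set_dedup_bitmap and get_dedup_by_chunk_idx methods used above are added to BlobInfo elsewhere in this commit (the hunk is not rendered here). A rough sketch of how such accessors could look over a bitvec-backed field, offered as an assumption rather than the actual implementation:

use bitvec::prelude::BitVec;

// Illustrative stand-in for the real BlobInfo in nydus-storage.
pub struct BlobInfoSketch {
    dedup_bitmap: Option<BitVec>,
}

impl BlobInfoSketch {
    pub fn set_dedup_bitmap(&mut self, dedup_bitmap: Option<BitVec>) {
        self.dedup_bitmap = dedup_bitmap;
    }

    /// An absent bitmap or out-of-range index means "not deduped".
    pub fn get_dedup_by_chunk_idx(&self, chunk_idx: usize) -> bool {
        self.dedup_bitmap
            .as_ref()
            .map_or(false, |bitmap| bitmap.get(chunk_idx).map_or(false, |bit| *bit))
    }
}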

impl Rafs {
23 changes: 23 additions & 0 deletions rafs/src/metadata/chunk.rs
@@ -449,3 +449,26 @@ pub fn convert_ref_to_rafs_v5_chunk_info(cki: &dyn BlobChunkInfo) -> RafsV5Chunk
let chunk = to_rafs_v5_chunk_info(as_blob_v5_chunk_info(cki.deref()));
chunk
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_set_deduped_for_chunk_v5() {
let mut chunk = ChunkWrapper::new(RafsVersion::V5);
assert!(!chunk.is_deduped());

chunk.set_deduped(true);
assert!(chunk.is_deduped());
}

#[test]
fn test_set_deduped_for_chunk_v6() {
let mut chunk = ChunkWrapper::new(RafsVersion::V6);
assert!(!chunk.is_deduped());

chunk.set_deduped(true);
assert!(chunk.is_deduped());
}
}
6 changes: 5 additions & 1 deletion storage/Cargo.toml
@@ -24,7 +24,7 @@ libc = "0.2"
log = "0.4.8"
nix = "0.24"
reqwest = { version = "0.11.14", features = ["blocking", "json"], optional = true }
serde = { version = "1.0.110", features = ["serde_derive", "rc"] }
serde = { version = "1.0.137", features = ["derive", "rc"] }
serde_json = "1.0.53"
sha1 = { version = "0.10.5", optional = true }
sha2 = { version = "0.10.2", optional = true }
@@ -35,6 +35,10 @@ url = { version = "2.1.1", optional = true }
vm-memory = "0.10"
fuse-backend-rs = "^0.10.3"
gpt = { version = "3.0.0", optional = true }
bitvec = { version="1", default-features = false, features = ["alloc",
"atomic",
"serde",
"std",]}

nydus-api = { version = "0.3", path = "../api" }
nydus-utils = { version = "0.4", path = "../utils", features = ["encryption", "zran"] }
25 changes: 18 additions & 7 deletions storage/src/cache/cachedfile.rs
@@ -376,11 +376,15 @@ impl FileCacheEntry {
extended_chunks[start..=end].to_vec()
}
} else {
-while !extended_chunks.is_empty() {
-    let chunk = &extended_chunks[extended_chunks.len() - 1];
-    if matches!(self.chunk_map.is_ready(chunk.as_ref()), Ok(true)) {
-        extended_chunks.pop();
-    } else {
+// Scan from start to end and truncate at the first chunk that is already
+// cached or has been deduplicated into another blob.
+for idx in 0..extended_chunks.len() {
+    let chunk = &extended_chunks[idx];
+    if matches!(self.chunk_map.is_ready(chunk.as_ref()), Ok(true))
+        || self
+            .blob_info
+            .get_dedup_by_chunk_idx(chunk.as_ref().id() as usize)
+    {
+        extended_chunks.truncate(idx);
break;
}
}
@@ -591,6 +595,10 @@ impl BlobCache for FileCacheEntry {
continue;
}

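// Chunks deduplicated into another blob are served through that blob's
// backend; there is nothing to validate or cache for them here.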
if self.blob_info.get_dedup_by_chunk_idx(c.id() as usize) {
continue;
}

// For digested chunk map, we must check whether the cached data is valid because
// the digested chunk map cannot persist readiness state.
let d_size = c.uncompressed_size() as usize;
@@ -606,9 +614,12 @@
if let Ok(true) = self.chunk_map.check_ready_and_mark_pending(c.as_ref()) {
// The chunk is ready, so skip it.
continue;
-} else {
-    pending.push(c.clone());
}
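+// Deduped chunks are fetched through the owning blob's backend; skip prefetching here.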
+if self.blob_info.get_dedup_by_chunk_idx(c.id() as usize) {
+    continue;
+}
+
+pending.push(c.clone());
}
}

6 changes: 6 additions & 0 deletions storage/src/cache/dummycache.rs
@@ -136,6 +136,12 @@ impl BlobCache for DummyCache {
if !bios[0].user_io {
return Ok(0);
}
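// Chunks deduplicated into another blob are not read through this blob's backend.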
if self
.blob_info
.get_dedup_by_chunk_idx(bios[0].chunkinfo.id() as usize)
{
return Ok(0);
}
let buf = unsafe { std::slice::from_raw_parts_mut(bufs[0].as_ptr(), d_size) };
self.read_chunk_from_backend(&bios[0].chunkinfo, buf)?;
return Ok(buf.len());
