storage: enable chunk deduplication for file cache
Enable chunk deduplication for the file cache. It works as follows
(a simplified sketch is included after this list):
- When a chunk is not yet present in the blob cache file, query the CAS
  database to check whether another blob data file already contains the
  required chunk. If a duplicate is found, copy the chunk data into the
  current blob cache file with copy_file_range().
- After downloading a data chunk from the remote backend, record the
  file/offset/chunk-id tuple in the CAS database so the chunk can be
  reused later.
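
To make the flow concrete, the following is a minimal sketch of the mechanism described above. It is not the actual CasMgr implementation: the real manager persists chunk records in a CAS database keyed by blob and chunk identifiers, while this sketch keeps an in-memory map keyed by chunk digest, assumes a Linux target, and pulls in the libc crate for copy_file_range(). All names (SimpleCas, ChunkLocation, record_chunk, dedup_chunk) are illustrative.

    use std::collections::HashMap;
    use std::fs::File;
    use std::io::Result;
    use std::os::unix::io::AsRawFd;
    use std::sync::Mutex;

    /// Where a known chunk already lives in some local blob cache file.
    struct ChunkLocation {
        file_path: String,
        offset: u64,
        size: usize,
    }

    /// Simplified stand-in for the CAS manager: chunk digest -> known location.
    struct SimpleCas {
        index: Mutex<HashMap<Vec<u8>, ChunkLocation>>,
    }

    impl SimpleCas {
        fn new() -> Self {
            Self { index: Mutex::new(HashMap::new()) }
        }

        /// Record where a freshly downloaded chunk was persisted, so later
        /// blobs can reuse it instead of fetching it from the remote backend.
        fn record_chunk(&self, digest: &[u8], file_path: &str, offset: u64, size: usize) {
            self.index.lock().unwrap().insert(
                digest.to_vec(),
                ChunkLocation { file_path: file_path.to_string(), offset, size },
            );
        }

        /// Try to fill `cache_file` at `cache_offset` from an existing local copy.
        /// Returns Ok(true) if the chunk was deduplicated, Ok(false) if unknown.
        fn dedup_chunk(&self, digest: &[u8], cache_file: &File, cache_offset: u64) -> Result<bool> {
            let index = self.index.lock().unwrap();
            let loc = match index.get(digest) {
                Some(loc) => loc,
                None => return Ok(false),
            };
            let src = File::open(&loc.file_path)?;
            let mut off_in = loc.offset as i64;
            let mut off_out = cache_offset as i64;
            let mut remaining = loc.size;
            // copy_file_range() lets the kernel copy between the two cache
            // files without bouncing the chunk data through user space.
            while remaining > 0 {
                let copied = unsafe {
                    libc::copy_file_range(
                        src.as_raw_fd(),
                        &mut off_in,
                        cache_file.as_raw_fd(),
                        &mut off_out,
                        remaining,
                        0,
                    )
                };
                if copied < 0 {
                    return Err(std::io::Error::last_os_error());
                }
                if copied == 0 {
                    break; // source exhausted unexpectedly
                }
                remaining -= copied as usize;
            }
            Ok(remaining == 0)
        }
    }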

Signed-off-by: Jiang Liu <[email protected]>
jiangliu committed Dec 7, 2023
1 parent e188240 commit b56a24b
Showing 6 changed files with 83 additions and 14 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
@@ -86,6 +86,7 @@ default = [
"backend-s3",
"backend-http-proxy",
"backend-localdisk",
"dedup",
]
virtiofs = [
"nydus-service/virtiofs",
@@ -106,5 +107,7 @@ backend-oss = ["nydus-storage/backend-oss"]
backend-registry = ["nydus-storage/backend-registry"]
backend-s3 = ["nydus-storage/backend-s3"]

dedup = ["nydus-storage/dedup"]

[workspace]
members = ["api", "builder", "clib", "rafs", "storage", "service", "utils"]
members = ["api", "builder", "clib", "rafs", "storage", "service", "utils"]
23 changes: 21 additions & 2 deletions src/bin/nydusd/main.rs
@@ -26,6 +26,7 @@ use nydus_service::{
create_daemon, create_fuse_daemon, create_vfs_backend, validate_threads_configuration,
Error as NydusError, FsBackendMountCmd, FsBackendType, ServiceArgs,
};
use nydus_storage::cache::CasMgr;

use crate::api_server_glue::ApiServerController;

@@ -50,7 +51,7 @@ fn thread_validator(v: &str) -> std::result::Result<String, String> {
}

fn append_fs_options(app: Command) -> Command {
app.arg(
let mut app = app.arg(
Arg::new("bootstrap")
.long("bootstrap")
.short('B')
@@ -87,7 +88,18 @@ fn append_fs_options(app: Command) -> Command {
.help("Mountpoint within the FUSE/virtiofs device to mount the RAFS/passthroughfs filesystem")
.default_value("/")
.required(false),
)
);

#[cfg(feature = "dedup")]
{
app = app.arg(
Arg::new("dedup-db")
.long("dedup-db")
.help("Database file for chunk deduplication"),
);
}

app
}

fn append_fuse_options(app: Command) -> Command {
@@ -748,6 +760,13 @@ fn main() -> Result<()> {
dump_program_info();
handle_rlimit_nofile_option(&args, "rlimit-nofile")?;

#[cfg(feature = "dedup")]
if let Some(db) = args.get_one::<String>("dedup-db") {
let mgr = CasMgr::new(db).map_err(|e| eother!(format!("{}", e)))?;
info!("Enable chunk deduplication by using database at {}", db);
CasMgr::set_singleton(mgr);
}

match args.subcommand_name() {
Some("singleton") => {
// Safe to unwrap because the subcommand is `singleton`.
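
The CasMgr::set_singleton()/CasMgr::get_singleton() pair used above implies a process-wide global that the cache backends consult later. The diff does not show how the singleton is stored; one plausible shape for such an API, purely as an assumption and not a copy of the nydus-storage code, is a std::sync::OnceLock holding an Arc:

    use std::sync::{Arc, OnceLock};

    // Hypothetical stand-in for the real CasMgr type.
    pub struct CasMgr {
        _db_path: String,
    }

    static CAS_MGR: OnceLock<Arc<CasMgr>> = OnceLock::new();

    impl CasMgr {
        pub fn new(db_path: &str) -> std::io::Result<Self> {
            // A real implementation would open or create the CAS database here.
            Ok(CasMgr { _db_path: db_path.to_string() })
        }

        /// Install the global instance; subsequent calls are ignored once set.
        pub fn set_singleton(mgr: CasMgr) {
            let _ = CAS_MGR.set(Arc::new(mgr));
        }

        /// Fetch the global instance if chunk deduplication has been enabled.
        pub fn get_singleton() -> Option<Arc<CasMgr>> {
            CAS_MGR.get().cloned()
        }
    }
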
1 change: 0 additions & 1 deletion storage/Cargo.toml
@@ -49,7 +49,6 @@ regex = "1.7.0"
toml = "0.5"

[features]
default = ["dedup"]
backend-localdisk = []
backend-localdisk-gpt = ["gpt", "backend-localdisk"]
backend-localfs = []
24 changes: 22 additions & 2 deletions storage/src/cache/cachedfile.rs
@@ -13,6 +13,7 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::{ErrorKind, Read, Result};
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
@@ -29,7 +30,7 @@ use tokio::runtime::Runtime;
use crate::backend::BlobReader;
use crate::cache::state::ChunkMap;
use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, AsyncWorkerMgr};
use crate::cache::{BlobCache, BlobIoMergeState};
use crate::cache::{BlobCache, BlobIoMergeState, CasMgr};
use crate::device::{
BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoSegment, BlobIoTag, BlobIoVec,
BlobObject, BlobPrefetchRequest,
@@ -133,8 +134,10 @@ pub(crate) struct FileCacheEntry {
pub(crate) blob_info: Arc<BlobInfo>,
pub(crate) cache_cipher_object: Arc<Cipher>,
pub(crate) cache_cipher_context: Arc<CipherContext>,
pub(crate) cas_mgr: Option<Arc<CasMgr>>,
pub(crate) chunk_map: Arc<dyn ChunkMap>,
pub(crate) file: Arc<File>,
pub(crate) file_path: Arc<String>,
pub(crate) meta: Option<FileCacheMeta>,
pub(crate) metrics: Arc<BlobcacheMetrics>,
pub(crate) prefetch_state: Arc<AtomicU32>,
@@ -182,13 +185,16 @@ impl FileCacheEntry {
}

fn delay_persist_chunk_data(&self, chunk: Arc<dyn BlobChunkInfo>, buffer: Arc<DataBuffer>) {
let blob_info = self.blob_info.clone();
let delayed_chunk_map = self.chunk_map.clone();
let file = self.file.clone();
let file_path = self.file_path.clone();
let metrics = self.metrics.clone();
let is_raw_data = self.is_raw_data;
let is_cache_encrypted = self.is_cache_encrypted;
let cipher_object = self.cache_cipher_object.clone();
let cipher_context = self.cache_cipher_context.clone();
let cas_mgr = self.cas_mgr.clone();

metrics.buffered_backend_size.add(buffer.size() as u64);
self.runtime.spawn_blocking(move || {
@@ -240,6 +246,11 @@
};
let res = Self::persist_cached_data(&file, offset, buf);
Self::_update_chunk_pending_status(&delayed_chunk_map, chunk.as_ref(), res.is_ok());
if let Some(mgr) = cas_mgr {
if let Err(e) = mgr.record_chunk(&blob_info, chunk.deref(), file_path.as_ref()) {
warn!("failed to record chunk state for dedup, {}", e);
}
}
});
}

@@ -973,13 +984,22 @@

trace!("dispatch single io range {:?}", req);
for (i, chunk) in req.chunks.iter().enumerate() {
let is_ready = match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) {
let mut is_ready = match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) {
Ok(true) => true,
Ok(false) => false,
                Err(StorageError::Timeout) => false, // Retry if waiting for inflight IO times out
Err(e) => return Err(einval!(e)),
};

if !is_ready {
if let Some(mgr) = self.cas_mgr.as_ref() {
is_ready = mgr.dedup_chunk(&self.blob_info, chunk.deref(), &self.file);
if is_ready {
self.update_chunk_pending_status(chunk.deref(), true);
}
}
}

// Directly read chunk data from file cache into user buffer iff:
// - the chunk is ready in the file cache
// - data in the file cache is plaintext.
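
Taken together, the hunks above give the read path three outcomes for a chunk that is not yet ready in the cache: serve it directly (already cached), fill it by deduplication from another local blob file, or fetch it from the remote backend and record it for future reuse. A condensed, purely illustrative view of that decision, with simplified signatures rather than the actual FileCacheEntry methods:

    use std::io::Result;

    /// How a requested chunk ended up ready in the local blob cache file.
    enum ChunkState {
        Ready,        // already present in the blob cache file
        Deduplicated, // copied from another local blob file via the CAS database
        Downloaded,   // fetched from the remote backend, then recorded in CAS
    }

    fn ensure_chunk_ready(
        already_cached: bool,
        try_dedup: impl Fn() -> bool,
        fetch_from_backend: impl Fn() -> Result<()>,
    ) -> Result<ChunkState> {
        if already_cached {
            return Ok(ChunkState::Ready);
        }
        // Cheap local path first: ask the CAS database whether another blob
        // data file already holds this chunk and copy it in if so.
        if try_dedup() {
            return Ok(ChunkState::Deduplicated);
        }
        // Fall back to the network; the caller records the new chunk in the
        // CAS database afterwards so future blobs can reuse it.
        fetch_from_backend()?;
        Ok(ChunkState::Downloaded)
    }
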
22 changes: 18 additions & 4 deletions storage/src/cache/filecache/mod.rs
@@ -21,8 +21,9 @@ use crate::cache::state::{
BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap,
};
use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
use crate::cache::{BlobCache, BlobCacheMgr};
use crate::cache::{BlobCache, BlobCacheMgr, CasMgr};
use crate::device::{BlobFeatures, BlobInfo};
use crate::utils::get_path_from_file;

pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw";
pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data";
@@ -208,11 +209,18 @@ impl FileCacheEntry {
} else {
reader.clone()
};
        // Turn off chunk deduplication when cache data encryption is enabled or the blob is in tarfs mode.
let cas_mgr = if mgr.cache_encrypted || mgr.cache_raw_data || is_tarfs {
None
} else {
CasMgr::get_singleton()
};

let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
let blob_uncompressed_size = blob_info.uncompressed_size();
let is_legacy_stargz = blob_info.is_legacy_stargz();

let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
let (
file,
meta,
@@ -221,7 +229,6 @@
is_get_blob_object_supported,
need_validation,
) = if is_tarfs {
let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
let file = OpenOptions::new()
.create(false)
.write(false)
Expand All @@ -231,7 +238,6 @@ impl FileCacheEntry {
Arc::new(BlobStateMap::from(NoopChunkMap::new(true))) as Arc<dyn ChunkMap>;
(file, None, chunk_map, true, true, false)
} else {
let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
let (chunk_map, is_direct_chunkmap) =
Self::create_chunk_map(mgr, &blob_info, &blob_file_path)?;
// Validation is supported by RAFS v5 (which has no meta_ci) or v6 with chunk digest array.
@@ -266,14 +272,15 @@
);
return Err(einval!(msg));
}
let load_chunk_digest = need_validation || cas_mgr.is_some();
let meta = if blob_info.meta_ci_is_valid() {
let meta = FileCacheMeta::new(
blob_file_path,
blob_info.clone(),
Some(blob_meta_reader),
Some(runtime.clone()),
false,
need_validation,
load_chunk_digest,
)?;
Some(meta)
} else {
@@ -305,6 +312,11 @@
(Default::default(), Default::default())
};

let mut blob_data_file_path = String::new();
if cas_mgr.is_some() {
blob_data_file_path = get_path_from_file(&file);
}

trace!(
"filecache entry: is_raw_data {}, direct {}, legacy_stargz {}, separate_meta {}, tarfs {}, batch {}, zran {}",
mgr.cache_raw_data,
@@ -320,8 +332,10 @@
blob_info,
cache_cipher_object,
cache_cipher_context,
cas_mgr,
chunk_map,
file: Arc::new(file),
file_path: Arc::new(blob_data_file_path),
meta,
metrics: mgr.metrics.clone(),
prefetch_state: Arc::new(AtomicU32::new(0)),
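
The new get_path_from_file() helper recovers the on-disk path of the already-opened blob data file so that path can be stored alongside chunk records in the CAS database. Its body is not part of this diff; on Linux such a lookup is commonly done by reading the /proc/self/fd symlink, roughly as sketched below (an assumption about the approach, not a copy of the nydus-storage implementation):

    use std::fs::File;
    use std::os::unix::io::AsRawFd;

    /// Best-effort recovery of the filesystem path behind an open file.
    /// Returns an empty string if the path cannot be resolved.
    fn get_path_from_file(file: &File) -> String {
        let proc_link = format!("/proc/self/fd/{}", file.as_raw_fd());
        match std::fs::read_link(&proc_link) {
            Ok(path) => path.to_string_lossy().into_owned(),
            Err(e) => {
                eprintln!("failed to resolve path for fd {}: {}", file.as_raw_fd(), e);
                String::new()
            }
        }
    }
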
22 changes: 18 additions & 4 deletions storage/src/cache/fscache/mod.rs
@@ -15,13 +15,13 @@ use tokio::runtime::Runtime;

use crate::backend::BlobBackend;
use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap};
use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
use crate::cache::{BlobCache, BlobCacheMgr};
use crate::cache::{BlobCache, BlobCacheMgr, CasMgr};
use crate::device::{BlobFeatures, BlobInfo, BlobObject};
use crate::factory::BLOB_FACTORY;

use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
use crate::utils::get_path_from_file;

const FSCACHE_BLOBS_CHECK_NUM: u8 = 1;

@@ -240,9 +240,16 @@ impl FileCacheEntry {
};
let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;

        // Turn off chunk deduplication in tarfs mode.
let cas_mgr = if is_tarfs {
None
} else {
CasMgr::get_singleton()
};
let need_validation = mgr.need_validation
&& !blob_info.is_legacy_stargz()
&& blob_info.has_feature(BlobFeatures::INLINED_CHUNK_DIGEST);
let load_chunk_digest = need_validation || cas_mgr.is_some();
let blob_file_path = format!("{}/{}", mgr.work_dir, blob_meta_id);
let meta = if blob_info.meta_ci_is_valid() {
FileCacheMeta::new(
@@ -251,7 +258,7 @@ impl FileCacheEntry {
Some(blob_meta_reader),
None,
true,
need_validation,
load_chunk_digest,
)?
} else {
return Err(enosys!(
@@ -266,13 +273,20 @@ impl FileCacheEntry {
)?));
Self::restore_chunk_map(blob_info.clone(), file.clone(), &meta, &chunk_map);

let mut blob_data_file_path = String::new();
if cas_mgr.is_some() {
blob_data_file_path = get_path_from_file(&file);
}

Ok(FileCacheEntry {
blob_id,
blob_info: blob_info.clone(),
cache_cipher_object: Default::default(),
cache_cipher_context: Default::default(),
cas_mgr,
chunk_map,
file,
file_path: Arc::new(blob_data_file_path),
meta: Some(meta),
metrics: mgr.metrics.clone(),
prefetch_state: Arc::new(AtomicU32::new(0)),
