storage: enable chunk deduplication for file cache
Enable chunk deduplication for the file cache. It works as follows:
- When a chunk is not yet present in the blob cache file, query the CAS
  database to check whether another blob data file already holds the
  required chunk. If a duplicate chunk is found, copy its data into the
  current blob cache file with copy_file_range() (see the sketch below).
- After downloading a data chunk from the remote backend, record the
  file/offset/chunk-id tuple in the CAS database so the chunk can be
  reused later.
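Conceptually, the dedup path resembles the sketch below. This is a minimal
illustration, not the actual CasMgr API: the CasIndex and ChunkLocation names,
the in-memory map standing in for the CAS database, and the explicit offset
bookkeeping are assumptions made for this example; the one real system call is
copy_file_range(2), reached here through the libc crate.

use std::collections::HashMap;
use std::fs::File;
use std::os::unix::io::AsRawFd;

/// Location of an already persisted chunk: (blob data file path, offset).
type ChunkLocation = (String, u64);

/// Stand-in for the CAS database: chunk digest -> where the data already lives.
#[derive(Default)]
struct CasIndex {
    chunks: HashMap<String, ChunkLocation>,
}

impl CasIndex {
    /// Second step above: after downloading a chunk, remember which
    /// file/offset now holds it so later blobs can reuse it.
    fn record_chunk(&mut self, chunk_id: &str, path: &str, offset: u64) {
        self.chunks.insert(chunk_id.to_string(), (path.to_string(), offset));
    }

    /// First step above: on a cache miss, try to clone the chunk from another
    /// local blob cache file instead of downloading it again.
    fn dedup_chunk(&self, chunk_id: &str, dst: &File, dst_off: u64, len: usize) -> bool {
        let Some((src_path, src_off)) = self.chunks.get(chunk_id) else {
            return false;
        };
        let src = match File::open(src_path) {
            Ok(f) => f,
            Err(_) => return false,
        };
        let mut off_in = *src_off as i64;
        let mut off_out = dst_off as i64;
        let mut copied = 0;
        while copied < len {
            // copy_file_range() copies between the two files inside the kernel,
            // so the chunk data never passes through user space.
            let ret = unsafe {
                libc::copy_file_range(
                    src.as_raw_fd(),
                    &mut off_in,
                    dst.as_raw_fd(),
                    &mut off_out,
                    len - copied,
                    0,
                )
            };
            if ret <= 0 {
                return false; // error or short copy: fall back to a real download
            }
            copied += ret as usize;
        }
        true
    }
}

On the real read path (see the cachedfile.rs changes below) a successful dedup
marks the chunk as ready so the backend download is skipped, and a freshly
downloaded chunk is recorded right after it has been persisted to the cache file.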

Co-authored-by: Jiang Liu <[email protected]>
Co-authored-by: Yading Ding <[email protected]>
Signed-off-by: Yadong Ding <[email protected]>
Desiki-high and jiangliu committed Sep 27, 2024
1 parent 3cac5ec commit 1b7bd3e
Showing 6 changed files with 105 additions and 13 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -95,6 +95,7 @@ default = [
"backend-s3",
"backend-http-proxy",
"backend-localdisk",
"dedup",
]
virtiofs = [
"nydus-service/virtiofs",
@@ -116,6 +117,8 @@ backend-oss = ["nydus-storage/backend-oss"]
backend-registry = ["nydus-storage/backend-registry"]
backend-s3 = ["nydus-storage/backend-s3"]

dedup = ["nydus-storage/dedup"]

[workspace]
members = [
"api",
23 changes: 21 additions & 2 deletions src/bin/nydusd/main.rs
@@ -26,6 +26,7 @@ use nydus_service::{
create_daemon, create_fuse_daemon, create_vfs_backend, validate_threads_configuration,
Error as NydusError, FsBackendMountCmd, FsBackendType, ServiceArgs,
};
use nydus_storage::cache::CasMgr;

use crate::api_server_glue::ApiServerController;

@@ -50,7 +51,7 @@ fn thread_validator(v: &str) -> std::result::Result<String, String> {
}

fn append_fs_options(app: Command) -> Command {
app.arg(
let mut app = app.arg(
Arg::new("bootstrap")
.long("bootstrap")
.short('B')
@@ -87,7 +88,18 @@ fn append_fs_options(app: Command) -> Command {
.help("Mountpoint within the FUSE/virtiofs device to mount the RAFS/passthroughfs filesystem")
.default_value("/")
.required(false),
)
);

#[cfg(feature = "dedup")]
{
app = app.arg(
Arg::new("dedup-db")
.long("dedup-db")
.help("Database file for chunk deduplication"),
);
}

app
}

fn append_fuse_options(app: Command) -> Command {
@@ -750,6 +762,13 @@ fn main() -> Result<()> {
dump_program_info();
handle_rlimit_nofile_option(&args, "rlimit-nofile")?;

#[cfg(feature = "dedup")]
if let Some(db) = args.get_one::<String>("dedup-db") {
let mgr = CasMgr::new(db).map_err(|e| eother!(format!("{}", e)))?;
info!("Enable chunk deduplication by using database at {}", db);
CasMgr::set_singleton(mgr);
}

match args.subcommand_name() {
Some("singleton") => {
// Safe to unwrap because the subcommand is `singleton`.
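A side note on the wiring above: deduplication is opt-in twice, through the
dedup Cargo feature at build time and the --dedup-db flag at run time, and the
cache code treats the CasMgr singleton as optional everywhere. The set-once /
get-maybe pattern can be sketched as follows; CasHandle, install and get are
illustrative names for this example, not the real CasMgr interface.

use std::sync::OnceLock;

/// Illustrative stand-in for the global deduplication manager.
struct CasHandle {
    db_path: String, // path given via --dedup-db
}

static CAS: OnceLock<CasHandle> = OnceLock::new();

/// Install the handle exactly once; later calls are silently ignored.
fn install(db_path: &str) {
    let _ = CAS.set(CasHandle {
        db_path: db_path.to_string(),
    });
}

/// Consumers ask for the handle and simply skip deduplication when it is unset.
fn get() -> Option<&'static CasHandle> {
    CAS.get()
}

fn main() {
    // Stand-in for the clap parsing above: treat argv[1] as the --dedup-db value.
    if let Some(db) = std::env::args().nth(1) {
        install(&db);
    }
    match get() {
        Some(h) => println!("chunk deduplication enabled, database at {}", h.db_path),
        None => println!("chunk deduplication disabled"),
    }
}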
1 change: 0 additions & 1 deletion storage/Cargo.toml
@@ -58,7 +58,6 @@ regex = "1.7.0"
toml = "0.5"

[features]
default = ["dedup"]
backend-localdisk = []
backend-localdisk-gpt = ["gpt", "backend-localdisk"]
backend-localfs = []
33 changes: 31 additions & 2 deletions storage/src/cache/cachedfile.rs
@@ -13,6 +13,7 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::{ErrorKind, Read, Result};
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
@@ -29,7 +30,7 @@ use tokio::runtime::Runtime;
use crate::backend::BlobReader;
use crate::cache::state::ChunkMap;
use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, AsyncWorkerMgr};
use crate::cache::{BlobCache, BlobIoMergeState};
use crate::cache::{BlobCache, BlobIoMergeState, CasMgr};
use crate::device::{
BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoSegment, BlobIoTag, BlobIoVec,
BlobObject, BlobPrefetchRequest,
@@ -184,8 +185,10 @@ pub(crate) struct FileCacheEntry {
pub(crate) blob_info: Arc<BlobInfo>,
pub(crate) cache_cipher_object: Arc<Cipher>,
pub(crate) cache_cipher_context: Arc<CipherContext>,
pub(crate) cas_mgr: Option<Arc<CasMgr>>,
pub(crate) chunk_map: Arc<dyn ChunkMap>,
pub(crate) file: Arc<File>,
pub(crate) file_path: Arc<String>,
pub(crate) meta: Option<FileCacheMeta>,
pub(crate) metrics: Arc<BlobcacheMetrics>,
pub(crate) prefetch_state: Arc<AtomicU32>,
@@ -233,13 +236,16 @@ impl FileCacheEntry {
}

fn delay_persist_chunk_data(&self, chunk: Arc<dyn BlobChunkInfo>, buffer: Arc<DataBuffer>) {
let blob_info = self.blob_info.clone();
let delayed_chunk_map = self.chunk_map.clone();
let file = self.file.clone();
let file_path = self.file_path.clone();
let metrics = self.metrics.clone();
let is_raw_data = self.is_raw_data;
let is_cache_encrypted = self.is_cache_encrypted;
let cipher_object = self.cache_cipher_object.clone();
let cipher_context = self.cache_cipher_context.clone();
let cas_mgr = self.cas_mgr.clone();

metrics.buffered_backend_size.add(buffer.size() as u64);
self.runtime.spawn_blocking(move || {
@@ -291,6 +297,11 @@ impl FileCacheEntry {
};
let res = Self::persist_cached_data(&file, offset, buf);
Self::_update_chunk_pending_status(&delayed_chunk_map, chunk.as_ref(), res.is_ok());
if let Some(mgr) = cas_mgr {
if let Err(e) = mgr.record_chunk(&blob_info, chunk.deref(), file_path.as_ref()) {
warn!("failed to record chunk state for dedup, {}", e);
}
}
});
}

@@ -1051,13 +1062,21 @@ impl FileCacheEntry {
trace!("dispatch single io range {:?}", req);
let mut blob_cci = BlobCCI::new();
for (i, chunk) in req.chunks.iter().enumerate() {
let is_ready = match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) {
let mut is_ready = match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) {
Ok(true) => true,
Ok(false) => false,
Err(StorageError::Timeout) => false, // Retry if waiting for inflight IO timeouts
Err(e) => return Err(einval!(e)),
};

if !is_ready {
if let Some(mgr) = self.cas_mgr.as_ref() {
is_ready = mgr.dedup_chunk(&self.blob_info, chunk.deref(), &self.file);
if is_ready {
self.update_chunk_pending_status(chunk.deref(), true);
}
}
}
// Directly read chunk data from file cache into user buffer iff:
// - the chunk is ready in the file cache
// - data in the file cache is plaintext.
@@ -1454,6 +1473,16 @@ impl FileCacheEntry {
}
}

impl Drop for FileCacheEntry {
fn drop(&mut self) {
if let Some(cas_mgr) = &self.cas_mgr {
if let Err(e) = cas_mgr.gc() {
warn!("cas_mgr gc failed: {}", e);
}
}
}
}

/// An enum to reuse existing buffers for IO operations, and CoW on demand.
#[allow(dead_code)]
enum DataBuffer {
29 changes: 25 additions & 4 deletions storage/src/cache/filecache/mod.rs
@@ -21,8 +21,9 @@ use crate::cache::state::{
BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap,
};
use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
use crate::cache::{BlobCache, BlobCacheMgr};
use crate::cache::{BlobCache, BlobCacheMgr, CasMgr};
use crate::device::{BlobFeatures, BlobInfo};
use crate::utils::get_path_from_file;

pub const BLOB_RAW_FILE_SUFFIX: &str = ".blob.raw";
pub const BLOB_DATA_FILE_SUFFIX: &str = ".blob.data";
@@ -209,10 +210,19 @@ impl FileCacheEntry {
reader.clone()
};

// Turn off chunk deduplication when cache data encryption or raw-data caching is enabled, or the blob is in tarfs mode.
let cas_mgr = if mgr.cache_encrypted || mgr.cache_raw_data || is_tarfs {
warn!("chunk deduplication turned off");
None
} else {
CasMgr::get_singleton()
};

let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;
let blob_uncompressed_size = blob_info.uncompressed_size();
let is_legacy_stargz = blob_info.is_legacy_stargz();

let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
let (
file,
meta,
@@ -221,7 +231,6 @@
is_get_blob_object_supported,
need_validation,
) = if is_tarfs {
let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
let file = OpenOptions::new()
.create(false)
.write(false)
@@ -231,7 +240,6 @@
Arc::new(BlobStateMap::from(NoopChunkMap::new(true))) as Arc<dyn ChunkMap>;
(file, None, chunk_map, true, true, false)
} else {
let blob_file_path = format!("{}/{}", mgr.work_dir, blob_id);
let (chunk_map, is_direct_chunkmap) =
Self::create_chunk_map(mgr, &blob_info, &blob_file_path)?;
// Validation is supported by RAFS v5 (which has no meta_ci) or v6 with chunk digest array.
@@ -266,6 +274,7 @@
);
return Err(einval!(msg));
}
let load_chunk_digest = need_validation || cas_mgr.is_some();
let meta = if blob_info.meta_ci_is_valid()
|| blob_info.has_feature(BlobFeatures::IS_CHUNKDICT_GENERATED)
{
@@ -275,7 +284,7 @@
Some(blob_meta_reader),
Some(runtime.clone()),
false,
need_validation,
load_chunk_digest,
)?;
Some(meta)
} else {
@@ -307,6 +316,16 @@
(Default::default(), Default::default())
};

let mut blob_data_file_path = String::new();
if cas_mgr.is_some() {
blob_data_file_path = if let Some(path) = get_path_from_file(&file) {
path
} else {
warn!("can't get path from file");
"".to_string()
}
}

trace!(
"filecache entry: is_raw_data {}, direct {}, legacy_stargz {}, separate_meta {}, tarfs {}, batch {}, zran {}",
mgr.cache_raw_data,
Expand All @@ -322,8 +341,10 @@ impl FileCacheEntry {
blob_info,
cache_cipher_object,
cache_cipher_context,
cas_mgr,
chunk_map,
file: Arc::new(file),
file_path: Arc::new(blob_data_file_path),
meta,
metrics: mgr.metrics.clone(),
prefetch_state: Arc::new(AtomicU32::new(0)),
Expand Down
29 changes: 25 additions & 4 deletions storage/src/cache/fscache/mod.rs
@@ -15,13 +15,13 @@ use tokio::runtime::Runtime;

use crate::backend::BlobBackend;
use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap};
use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
use crate::cache::{BlobCache, BlobCacheMgr};
use crate::cache::{BlobCache, BlobCacheMgr, CasMgr};
use crate::device::{BlobFeatures, BlobInfo, BlobObject};
use crate::factory::BLOB_FACTORY;

use crate::cache::filecache::BLOB_DATA_FILE_SUFFIX;
use crate::utils::get_path_from_file;

const FSCACHE_BLOBS_CHECK_NUM: u8 = 1;

@@ -240,9 +240,18 @@ impl FileCacheEntry {
};
let blob_compressed_size = Self::get_blob_size(&reader, &blob_info)?;

// Turn off chunk deduplication when the blob is in tarfs mode.
let cas_mgr = if is_tarfs {
warn!("chunk deduplication turned off");
None
} else {
CasMgr::get_singleton()
};

let need_validation = mgr.need_validation
&& !blob_info.is_legacy_stargz()
&& blob_info.has_feature(BlobFeatures::INLINED_CHUNK_DIGEST);
let load_chunk_digest = need_validation || cas_mgr.is_some();
let blob_file_path = format!("{}/{}", mgr.work_dir, blob_meta_id);
let meta = if blob_info.meta_ci_is_valid() {
FileCacheMeta::new(
@@ -251,7 +260,7 @@
Some(blob_meta_reader),
None,
true,
need_validation,
load_chunk_digest,
)?
} else {
return Err(enosys!(
@@ -266,13 +275,25 @@
)?));
Self::restore_chunk_map(blob_info.clone(), file.clone(), &meta, &chunk_map);

let mut blob_data_file_path = String::new();
if cas_mgr.is_some() {
blob_data_file_path = if let Some(path) = get_path_from_file(&file) {
path
} else {
warn!("can't get path from file");
"".to_string()
}
}

Ok(FileCacheEntry {
blob_id,
blob_info: blob_info.clone(),
cache_cipher_object: Default::default(),
cache_cipher_context: Default::default(),
cas_mgr,
chunk_map,
file,
file_path: Arc::new(blob_data_file_path),
meta: Some(meta),
metrics: mgr.metrics.clone(),
prefetch_state: Arc::new(AtomicU32::new(0)),
