Skip to content

Commit

Permalink
storage: implement CasManager to support chunk dedup at runtime
Browse files Browse the repository at this point in the history
Implement CasManager to support chunk dedup at runtime.
The manager provides two major interfaces:
- add chunk data to the CAS database
- check whether a chunk exists in CAS database and copy it to blob file
  by copy_file_range() if the chunk exists.

Signed-off-by: Jiang Liu <[email protected]>
  • Loading branch information
jiangliu committed Dec 7, 2023
1 parent f9396dc commit e188240
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 2 deletions.
5 changes: 3 additions & 2 deletions storage/src/cache/dedup/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::path::Path;

use r2d2::{Pool, PooledConnection};
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{Connection, DropBehavior, OptionalExtension, Transaction};
use rusqlite::{Connection, DropBehavior, OpenFlags, OptionalExtension, Transaction};

use super::Result;

Expand All @@ -24,7 +24,8 @@ impl CasDb {
}

pub fn from_file(db_path: impl AsRef<Path>) -> Result<CasDb> {
let mgr = SqliteConnectionManager::file(db_path);
let mgr = SqliteConnectionManager::file(db_path)
.with_flags(OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE);
let pool = r2d2::Pool::new(mgr)?;
let conn = pool.get()?;

Expand Down
173 changes: 173 additions & 0 deletions storage/src/cache/dedup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,26 @@
//
// SPDX-License-Identifier: Apache-2.0

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{self, Display, Formatter};
use std::fs::{File, OpenOptions};
use std::io::Error;
use std::path::Path;
use std::sync::{Arc, Mutex, RwLock};

use nydus_utils::digest::RafsDigest;

use crate::cache::dedup::db::CasDb;
use crate::device::{BlobChunkInfo, BlobInfo};
use crate::utils::copy_file_range;

mod db;

// Process-wide, lazily initialized slot for the shared `CasMgr`.
// It starts out as `None`; `CasMgr::set_singleton()` stores a manager into it and
// `CasMgr::get_singleton()` hands out clones of the stored `Arc`.
lazy_static::lazy_static!(
static ref CAS_MGR: Mutex<Option<Arc<CasMgr>>> = Mutex::new(None);
);

/// Error codes related to local cas.
#[derive(Debug)]
pub enum CasError {
Expand Down Expand Up @@ -47,3 +62,161 @@ impl From<Error> for CasError {

/// Specialized `Result` for local cas.
type Result<T> = std::result::Result<T, CasError>;

/// Manager for chunk deduplication backed by a local CAS database.
///
/// It records where chunk data is available (`record_chunk`) and copies already known
/// chunk data into a cache file (`dedup_chunk`).
pub struct CasMgr {
/// Database handle used to look up and record chunk locations.
db: CasDb,
/// Cache of opened dedup source files, keyed by the file path string stored in the database.
fds: RwLock<HashMap<String, Arc<File>>>,
}

impl CasMgr {
    /// Create a manager on top of the CAS database stored at `db_path`.
    ///
    /// The cache of opened source files starts out empty.
    pub fn new(db_path: impl AsRef<Path>) -> Result<Self> {
        CasDb::from_file(db_path.as_ref()).map(|db| CasMgr {
            db,
            fds: RwLock::new(HashMap::new()),
        })
    }

    /// Install `mgr` as the process-wide manager, replacing any previous one.
    pub fn set_singleton(mgr: CasMgr) {
        let shared = Arc::new(mgr);
        let mut slot = CAS_MGR.lock().unwrap();
        *slot = Some(shared);
    }

    /// Get the process-wide manager, or `None` if none has been installed.
    pub fn get_singleton() -> Option<Arc<CasMgr>> {
        let slot = CAS_MGR.lock().unwrap();
        slot.as_ref().map(Arc::clone)
    }

    /// Try to fill a chunk of `cache_file` from data that is already recorded in the CAS database.
    ///
    /// Returns `true` only when the chunk has a usable key, the database knows a
    /// `(path, offset)` location for it, the source file can be obtained and
    /// `copy_file_range()` succeeds. Every failure along the way is treated as
    /// "no source data available for dedup" and yields `false`.
    pub fn dedup_chunk(
        &self,
        blob: &BlobInfo,
        chunk: &dyn BlobChunkInfo,
        cache_file: &File,
    ) -> bool {
        let key = Self::chunk_key(blob, chunk);
        if key.is_empty() {
            return false;
        }

        // A database error and a plain miss are handled the same way: nothing to dedup from.
        let (path, offset) = match self.db.get_chunk_info(&key) {
            Ok(Some(location)) => location,
            _ => return false,
        };

        let source = match self.source_file(path) {
            Some(file) => file,
            None => return false,
        };

        match copy_file_range(
            source,
            offset,
            cache_file,
            chunk.uncompressed_offset(),
            chunk.uncompressed_size() as usize,
        ) {
            Ok(_) => true,
            Err(e) => {
                warn!("{e}");
                false
            }
        }
    }

    /// Get the shared handle of the dedup source file at `path`, opening it on demand.
    ///
    /// Returns `None` (after logging a warning) if the file can't be opened.
    fn source_file(&self, path: String) -> Option<Arc<File>> {
        // The read lock is only held for the duration of this lookup.
        let cached = self.fds.read().unwrap().get(&path).cloned();
        if cached.is_some() {
            return cached;
        }

        // The file is opened without holding any lock, so concurrent callers may race here.
        match OpenOptions::new().read(true).open(&path) {
            Err(e) => {
                warn!("failed to open dedup source file {}, {}", path, e);
                None
            }
            Ok(opened) => {
                let mut fds = self.fds.write().unwrap();
                let shared = match fds.entry(path) {
                    // Somebody else has inserted the file in the meantime, use theirs.
                    Entry::Occupied(slot) => Arc::clone(slot.get()),
                    Entry::Vacant(slot) => {
                        let shared = Arc::new(opened);
                        slot.insert(Arc::clone(&shared));
                        shared
                    }
                };
                Some(shared)
            }
        }
    }

    /// Record in the CAS database that the data of `chunk` is available in the file at `path`.
    ///
    /// The path is canonicalized before being stored, and the chunk's uncompressed offset
    /// is recorded as its position. Chunks without a usable key are silently skipped.
    pub fn record_chunk(
        &self,
        blob: &BlobInfo,
        chunk: &dyn BlobChunkInfo,
        path: impl AsRef<Path>,
    ) -> Result<()> {
        let key = Self::chunk_key(blob, chunk);
        if key.is_empty() {
            return Ok(());
        }

        let canonical = path.as_ref().canonicalize()?.display().to_string();
        self.record_chunk_raw(&key, &canonical, chunk.uncompressed_offset())
    }

    /// Record a chunk by raw key: register the blob `path` first, then the chunk entry
    /// `(chunk_id, offset)` referring to it.
    pub fn record_chunk_raw(&self, chunk_id: &str, path: &str, offset: u64) -> Result<()> {
        self.db.add_blob(path)?;
        self.db.add_chunk(chunk_id, offset, path)?;

        Ok(())
    }

    /// Build the database key of a chunk as `<blob digester>:<chunk id>`.
    ///
    /// An empty string is returned when the chunk id equals the default digest;
    /// callers treat that as "no key available".
    fn chunk_key(blob: &BlobInfo, chunk: &dyn BlobChunkInfo) -> String {
        let id = chunk.chunk_id();
        if *id == RafsDigest::default() {
            return String::new();
        }

        format!("{}:{}", blob.digester(), id)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::device::BlobFeatures;
    use crate::test::MockChunkInfo;
    use crate::RAFS_DEFAULT_CHUNK_SIZE;
    use std::io::{Read, Write};
    use vmm_sys_util::tempfile::TempFile;

    /// Record a chunk located in a source file, then dedup it into a fresh file and
    /// verify that the copied bytes match the source data.
    #[test]
    fn test_cas_chunk_op() {
        let db_file = TempFile::new().unwrap();
        let source = TempFile::new().unwrap();
        let source_path = source.as_path().display().to_string();
        let manager = CasMgr::new(db_file.as_path()).unwrap();

        // A single 8 KiB chunk located at the very beginning of the blob.
        let mut mock = MockChunkInfo::new();
        mock.uncompress_offset = 0;
        mock.uncompress_size = 8192;
        let chunk_info = Arc::new(mock) as Arc<dyn BlobChunkInfo>;
        let blob_info = BlobInfo::new(
            1,
            source_path.clone(),
            8192,
            8192,
            RAFS_DEFAULT_CHUNK_SIZE as u32,
            1,
            BlobFeatures::empty(),
        );

        // Fill the source file with a recognizable pattern and register the chunk.
        let payload = vec![0x9u8; 8192];
        let mut writer = source.as_file().try_clone().unwrap();
        writer.write_all(&payload).unwrap();
        manager
            .record_chunk(&blob_info, chunk_info.as_ref(), &source_path)
            .unwrap();

        // Dedup into a fresh file and compare what lands there with the payload.
        let mut target = TempFile::new().unwrap().into_file();
        assert!(manager.dedup_chunk(&blob_info, chunk_info.as_ref(), &target));
        let mut copied = vec![0x0u8; 8192];
        target.read_exact(&mut copied).unwrap();
        assert_eq!(payload, copied);
    }
}
6 changes: 6 additions & 0 deletions storage/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,12 @@ pub(crate) trait BlobCacheMgr: Send + Sync {
fn check_stat(&self);
}

// With the `dedup` feature enabled, expose the real manager implemented in the `dedup` module.
#[cfg(feature = "dedup")]
pub use dedup::CasMgr;

/// Empty placeholder exported under the same name when the `dedup` feature is disabled.
#[cfg(not(feature = "dedup"))]
pub struct CasMgr {}

#[cfg(test)]
mod tests {
use crate::device::{BlobChunkFlags, BlobFeatures};
Expand Down

0 comments on commit e188240

Please sign in to comment.