Skip to content

Commit

Permalink
storage: add garbage collection in CasMgr
Browse files Browse the repository at this point in the history
- Changed `delete_blobs` method in `CasDb` to take an immutable reference (`&self`) instead of a mutable reference (`&mut self`).
- Updated `dedup_chunk` method in `CasMgr` to correctly handle the deletion of non-existent blob files from both the file descriptor cache and the database.
- Implemented the `gc` (garbage collection) method in `CasMgr` to identify and remove blobs that no longer exist on the filesystem, ensuring the database and cache remain consistent.

Signed-off-by: Yadong Ding <[email protected]>
  • Loading branch information
Desiki-high committed Sep 27, 2024
1 parent 21dec2b commit 3cac5ec
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 9 deletions.
2 changes: 1 addition & 1 deletion storage/src/cache/dedup/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl CasDb {
Ok(conn.last_insert_rowid() as u64)
}

pub fn delete_blobs(&mut self, blobs: &[String]) -> Result<()> {
pub fn delete_blobs(&self, blobs: &[String]) -> Result<()> {
let delete_blobs_sql = "DELETE FROM Blobs WHERE BlobId = (?1)";
let delete_chunks_sql = "DELETE FROM Chunks WHERE BlobId = (?1)";
let mut conn = self.get_connection()?;
Expand Down
100 changes: 100 additions & 0 deletions storage/src/cache/dedup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ impl CasMgr {
}
}
}
} else if d_file.as_ref().unwrap().metadata().is_err() {
// If the blob file no longer exists, delete if from fds and db.
let mut guard = self.fds.write().unwrap();
guard.remove(&path);
let blob_ids: &[String] = &[path];
if let Err(e) = self.db.delete_blobs(&blob_ids) {
warn!("failed to delete blobs: {}", e);
}
return false;
}

if let Some(f) = d_file {
Expand Down Expand Up @@ -176,6 +185,33 @@ impl CasMgr {
blob.digester().to_string() + ":" + &chunk.chunk_id().to_string()
}
}

/// Check if blobs in the database still exist on the filesystem and perform garbage collection.
pub fn gc(&self) -> Result<()> {
let all_blobs = self.db.get_all_blobs()?;
let mut blobs_not_exist = Vec::new();
for (_, file_path) in all_blobs {
if !std::path::Path::new(&file_path).exists() {
blobs_not_exist.push(file_path);
}
}

// If there are any non-existent blobs, delete them from the database.
if !blobs_not_exist.is_empty() {
self.db.delete_blobs(&blobs_not_exist).map_err(|e| {
warn!("failed to delete blobs: {}", e);
e
})?;
}

let mut guard = self.fds.write().unwrap();
for path in blobs_not_exist {
// Remove the non-existent blob paths from the cache.
guard.remove(&path);
}

Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -220,4 +256,68 @@ mod tests {
tmpfile3.read_exact(&mut buf2).unwrap();
assert_eq!(buf, buf2);
}

#[test]
fn test_cas_dedup_chunk_failed() {
let dbfile = TempFile::new().unwrap();
let mgr = CasMgr::new(dbfile.as_path()).unwrap();

let new_blob = BlobInfo::new(
1,
"test_blob".to_string(),
8192,
8192,
RAFS_DEFAULT_CHUNK_SIZE as u32,
1,
BlobFeatures::empty(),
);

let mut chunk = MockChunkInfo::new();
chunk.block_id = RafsDigest::default();
chunk.uncompress_offset = 0;
chunk.uncompress_size = 8192;
let chunk = Arc::new(chunk) as Arc<dyn BlobChunkInfo>;

let tmpfile = TempFile::new().unwrap().into_file();

assert!(!mgr.dedup_chunk(&new_blob, chunk.as_ref(), &tmpfile));
}

#[test]
fn test_cas_gc() {
let dbfile = TempFile::new().unwrap();
let mgr = CasMgr::new(dbfile.as_path()).unwrap();

let tmpfile = TempFile::new().unwrap();
let blob_path = tmpfile
.as_path()
.canonicalize()
.unwrap()
.display()
.to_string();
let blob = BlobInfo::new(
1,
blob_path.clone(),
8192,
8192,
RAFS_DEFAULT_CHUNK_SIZE as u32,
1,
BlobFeatures::empty(),
);
let mut chunk = MockChunkInfo::new();
chunk.block_id = RafsDigest { data: [3u8; 32] };
chunk.uncompress_offset = 0;
chunk.uncompress_size = 8192;
let chunk = Arc::new(chunk) as Arc<dyn BlobChunkInfo>;
mgr.record_chunk(&blob, chunk.as_ref(), &blob_path).unwrap();

let all_blobs_before_gc = mgr.db.get_all_blobs().unwrap();
assert_eq!(all_blobs_before_gc.len(), 1);

drop(tmpfile);
mgr.gc().unwrap();

let all_blobs_after_gc = mgr.db.get_all_blobs().unwrap();
assert_eq!(all_blobs_after_gc.len(), 0);
}
}
30 changes: 22 additions & 8 deletions storage/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ use std::cmp::{self, min};
use std::io::{ErrorKind, IoSliceMut, Result};
use std::os::fd::{AsFd, AsRawFd};
use std::os::unix::io::RawFd;
#[cfg(target_os = "linux")]
use std::path::PathBuf;
use std::slice::from_raw_parts_mut;
#[cfg(target_os = "macos")]
use std::{ffi::CStr, mem, os::raw::c_char};
use vm_memory::bytes::Bytes;

use crate::{StorageError, StorageResult};
Expand Down Expand Up @@ -173,6 +176,7 @@ pub fn copy_file_range(
Ok(())
}

#[cfg(target_os = "linux")]
pub fn get_path_from_file(file: &impl AsRawFd) -> Option<String> {
let path = PathBuf::from("/proc/self/fd").join(file.as_raw_fd().to_string());
match std::fs::read_link(&path) {
Expand All @@ -184,6 +188,22 @@ pub fn get_path_from_file(file: &impl AsRawFd) -> Option<String> {
}
}

#[cfg(target_os = "macos")]
pub fn get_path_from_file(file: &impl AsRawFd) -> Option<String> {
let fd = file.as_raw_fd();
let mut buf: [c_char; 1024] = unsafe { mem::zeroed() };

let result = unsafe { fcntl(fd, libc::F_GETPATH, buf.as_mut_ptr()) };

if result == -1 {
warn!("Failed to get path from file descriptor");
return None;
}

let cstr = unsafe { CStr::from_ptr(buf.as_ptr()) };
cstr.to_str().ok().map(|s| s.to_string())
}

/// An memory cursor to access an `FileVolatileSlice` array.
pub struct MemSliceCursor<'a> {
pub mem_slice: &'a [FileVolatileSlice<'a>],
Expand Down Expand Up @@ -481,20 +501,14 @@ mod tests {
assert!(copy_file_range(&empty_src, 0, dst.as_file(), 4096, 4096).is_err());
}

#[cfg(target_os = "linux")]
#[test]
fn test_get_path_from_file() {
let temp_file = TempFile::new().unwrap();
let file = temp_file.as_file();
let path = get_path_from_file(file).unwrap();
assert_eq!(path, temp_file.as_path().display().to_string());
}

#[cfg(not(target_os = "linux"))]
#[test]
fn test_get_path_from_file_non_linux() {
let temp_file = TempFile::new().unwrap();
let file = temp_file.as_file();
assert_eq!(get_path_from_file(file).is_none());
let invalid_fd: RawFd = -1;
assert!(get_path_from_file(&invalid_fd).is_none());
}
}

0 comments on commit 3cac5ec

Please sign in to comment.