Skip to content

Commit

Permalink
storage: support manually add blob object to localdisk backend driver
Browse files Browse the repository at this point in the history
Enhance the localdisk storage backend, so we can manually add blob
objects in the disk, in addition to discovering blob objects by
scanning GPT partition table.

Signed-off-by: Jiang Liu <[email protected]>
  • Loading branch information
jiangliu committed Jul 18, 2023
1 parent 92113d0 commit c1b1c7c
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 18 deletions.
3 changes: 3 additions & 0 deletions api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ pub struct LocalDiskConfig {
/// Mounted block device path or original localdisk image file path.
#[serde(default)]
pub device_path: String,
/// Disable discover blob objects by scanning GPT table.
#[serde(default)]
pub disable_gpt: bool,
}

/// Configuration information for localfs storage backend.
Expand Down
123 changes: 106 additions & 17 deletions storage/src/backend/localdisk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ impl BlobReader for LocalDiskBlob {
"localdisk: invalid offset 0x{:x}, base 0x{:x}, length 0x{:x}",
offset, self.blob_offset, self.blob_length
);
if offset > self.blob_length {
return Err(LocalDiskError::ReadBlob(msg).into());
if offset >= self.blob_length {
return Ok(0);
}
let actual_offset = self
.blob_offset
.checked_add(offset)
.ok_or(LocalDiskError::ReadBlob(msg))?;
let sz = std::cmp::min(self.blob_length - offset, buf.len() as u64) as usize;
let len = std::cmp::min(self.blob_length - offset, buf.len() as u64) as usize;

uio::pread(
self.device_file.as_raw_fd(),
&mut buf[..sz],
&mut buf[..len],
actual_offset as i64,
)
.map_err(|e| {
Expand All @@ -101,13 +101,13 @@ impl BlobReader for LocalDiskBlob {
"localdisk: invalid offset 0x{:x}, base 0x{:x}, length 0x{:x}",
offset, self.blob_offset, self.blob_length
);
if offset > self.blob_length {
return Err(LocalDiskError::ReadBlob(msg).into());
if offset >= self.blob_length {
return Ok(0);
}
let actual_offset = self
.blob_offset
.checked_add(offset)
.ok_or(LocalDiskError::ReadBlob(msg))?;
.ok_or(LocalDiskError::ReadBlob(msg.clone()))?;

let mut c = MemSliceCursor::new(bufs);
let mut iovec = c.consume(max_size);
Expand All @@ -118,10 +118,6 @@ impl BlobReader for LocalDiskBlob {

// Guarantees that reads do not exceed the size of the blob
if offset.checked_add(len as u64).is_none() || offset + len as u64 > self.blob_length {
let msg = format!(
"localdisk: invalid offset 0x{:x}, base 0x{:x}, length 0x{:x}",
offset, self.blob_offset, self.blob_length
);
return Err(LocalDiskError::ReadBlob(msg).into());
}

Expand All @@ -145,6 +141,8 @@ pub struct LocalDisk {
device_file: File,
// The disk device path specified by the user
device_path: String,
// Size of the block device.
device_capacity: u64,
// Blobs are discovered by scanning GPT or not.
is_gpt_mode: bool,
// Metrics collector.
Expand All @@ -155,7 +153,7 @@ pub struct LocalDisk {

impl LocalDisk {
pub fn new(config: &LocalDiskConfig, id: Option<&str>) -> Result<LocalDisk> {
let id = id.ok_or_else(|| einval!("localDisk: argument `id` is empty"))?;
let id = id.ok_or_else(|| einval!("localdisk: argument `id` is empty"))?;
let path = &config.device_path;
let path_buf = Path::new(path).to_path_buf().canonicalize().map_err(|e| {
einval!(format!(
Expand All @@ -169,25 +167,71 @@ impl LocalDisk {
path, e
))
})?;
let md = device_file.metadata().map_err(|e| {
eio!(format!(
"localdisk: can not get file meta data about disk device {}, {}",
path, e
))
})?;
let mut local_disk = LocalDisk {
device_file,
device_path: path.clone(),
device_capacity: md.len(),
is_gpt_mode: false,
metrics: BackendMetrics::new(id, "localdisk"),
entries: RwLock::new(HashMap::new()),
};

local_disk.scan_blobs_by_gpt()?;
if !config.disable_gpt {
local_disk.scan_blobs_by_gpt()?;
}

Ok(local_disk)
}

pub fn add_blob(&self, blob_id: &str, offset: u64, length: u64) -> LocalDiskResult<()> {
if self.is_gpt_mode {
let msg = format!(
"localdisk: device {} is in legacy gpt mode",
self.device_path
);
return Err(LocalDiskError::BlobFile(msg));
}
if offset.checked_add(length).is_none() || offset + length > self.device_capacity {
let msg = format!(
"localdisk: add blob {} with invalid offset 0x{:x} and length 0x{:x}, device size 0x{:x}",
blob_id, offset, length, self.device_capacity
);
return Err(LocalDiskError::BlobFile(msg));
};

let device_file = self.device_file.try_clone().map_err(|e| {
LocalDiskError::BlobFile(format!("localdisk: can not duplicate file, {}", e))
})?;
let blob = Arc::new(LocalDiskBlob {
blob_id: blob_id.to_string(),
device_file,
blob_offset: offset,
blob_length: length,
metrics: self.metrics.clone(),
});

let mut table_guard = self.entries.write().unwrap();
if table_guard.contains_key(blob_id) {
let msg = format!("localdisk: blob {} already exists", blob_id);
return Err(LocalDiskError::BlobFile(msg));
}
table_guard.insert(blob_id.to_string(), blob);

Ok(())
}

fn get_blob(&self, blob_id: &str) -> LocalDiskResult<Arc<dyn BlobReader>> {
// Don't expect poisoned lock here.
if let Some(entry) = self.entries.read().unwrap().get(blob_id) {
Ok(entry.clone())
} else {
self.search_blob_from_gpt(blob_id)
self.get_blob_from_gpt(blob_id)
}
}
}
Expand All @@ -205,7 +249,7 @@ impl LocalDisk {
}
}

fn search_blob_from_gpt(&self, blob_id: &str) -> LocalDiskResult<Arc<dyn BlobReader>> {
fn get_blob_from_gpt(&self, blob_id: &str) -> LocalDiskResult<Arc<dyn BlobReader>> {
if self.is_gpt_mode {
if let Some(localdisk_blob_id) = LocalDisk::truncate_blob_id(blob_id) {
// Don't expect poisoned lock here.
Expand Down Expand Up @@ -236,7 +280,9 @@ impl LocalDisk {
for (k, v) in partitions {
let length = v.bytes_len(sector_size)?;
let base_offset = v.bytes_start(sector_size)?;
if base_offset.checked_add(length).is_none() {
if base_offset.checked_add(length).is_none()
|| base_offset + length > self.device_capacity
{
let msg = format!(
"localdisk: partition {} with invalid offset and length",
v.part_guid
Expand All @@ -258,6 +304,10 @@ impl LocalDisk {
let msg = format!("localdisk: partition {} has empty blob id", v.part_guid);
return Err(einval!(msg));
}
if table_guard.contains_key(&name) {
let msg = format!("localdisk: blob {} already exists", name);
return Err(einval!(msg));
}

let device_file = self.device_file.try_clone()?;
let partition = Arc::new(LocalDiskBlob {
Expand All @@ -284,7 +334,7 @@ impl LocalDisk {

#[cfg(not(feature = "backend-localdisk-gpt"))]
impl LocalDisk {
fn search_blob_from_gpt(&self, blob_id: &str) -> LocalDiskResult<Arc<dyn BlobReader>> {
fn get_blob_from_gpt(&self, blob_id: &str) -> LocalDiskResult<Arc<dyn BlobReader>> {
Err(LocalDiskError::ReadBlob(format!(
"can not find such blob: {}, this image might be corrupted",
blob_id
Expand Down Expand Up @@ -322,15 +372,54 @@ mod tests {
fn test_invalid_localdisk_new() {
let config = LocalDiskConfig {
device_path: "".to_string(),
disable_gpt: true,
};
assert!(LocalDisk::new(&config, Some("test")).is_err());

let config = LocalDiskConfig {
device_path: "/a/b/c".to_string(),
disable_gpt: true,
};
assert!(LocalDisk::new(&config, None).is_err());
}

#[test]
fn test_add_disk_blob() {
let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
let root_dir = Path::new(root_dir).join("../tests/texture/blobs/");

let config = LocalDiskConfig {
device_path: root_dir.join("nonexist_blob_file").display().to_string(),
disable_gpt: true,
};
assert!(LocalDisk::new(&config, Some("test")).is_err());

let blob_id = "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef";
let config = LocalDiskConfig {
device_path: root_dir.join(blob_id).display().to_string(),
disable_gpt: true,
};
let disk = LocalDisk::new(&config, Some("test")).unwrap();

assert!(disk.add_blob(blob_id, u64::MAX, 1).is_err());
assert!(disk.add_blob(blob_id, 14553, 2).is_err());
assert!(disk.add_blob(blob_id, 14554, 1).is_err());
assert!(disk.add_blob(blob_id, 0, 4096).is_ok());
assert!(disk.add_blob(blob_id, 0, 4096).is_err());
let blob = disk.get_blob(blob_id).unwrap();
assert_eq!(blob.blob_size().unwrap(), 4096);

let mut buf = vec![0u8; 4096];
let sz = blob.read(&mut buf, 0).unwrap();
assert_eq!(sz, 4096);
let sz = blob.read(&mut buf, 4095).unwrap();
assert_eq!(sz, 1);
let sz = blob.read(&mut buf, 4096).unwrap();
assert_eq!(sz, 0);
let sz = blob.read(&mut buf, 4097).unwrap();
assert_eq!(sz, 0);
}

#[cfg(feature = "backend-localdisk-gpt")]
#[test]
fn test_truncate_blob_id() {
Expand Down
2 changes: 1 addition & 1 deletion storage/src/cache/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub(crate) enum AsyncPrefetchMessage {
#[cfg_attr(not(test), allow(unused))]
/// Ping for test.
Ping,
#[cfg_attr(not(test), allow(unused))]
#[allow(unused)]
RateLimiter(u64),
}

Expand Down

0 comments on commit c1b1c7c

Please sign in to comment.