Skip to content

Commit

Permalink
storage: support manually add blob object to localdisk backend driver
Browse files Browse the repository at this point in the history
Enhance the localdisk storage backend, so we can manually add blob
objects in the disk, in addition to discovering blob objects by
scanning GPT partition table.

Signed-off-by: Jiang Liu <[email protected]>
  • Loading branch information
jiangliu committed Jun 21, 2023
1 parent 89666be commit 5dc39e9
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 3 deletions.
3 changes: 3 additions & 0 deletions api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ pub struct LocalDiskConfig {
/// Mounted block device path or original localdisk image file path.
#[serde(default)]
pub device_path: String,
/// Disable discover blob objects by scanning GPT table.
#[serde(default)]
pub disable_gpt: bool,
}

/// Configuration information for localfs storage backend.
Expand Down
91 changes: 89 additions & 2 deletions storage/src/backend/localdisk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ pub struct LocalDisk {
device_file: File,
// The disk device path specified by the user
device_path: String,
// Size of the block device.
device_capacity: u64,
// Blobs are discovered by scanning GPT or not.
is_gpt_mode: bool,
// Metrics collector.
Expand All @@ -169,19 +171,65 @@ impl LocalDisk {
path, e
))
})?;
let md = device_file.metadata().map_err(|e| {
eio!(format!(
"localdisk: can not get file meta data about disk device {}, {}",
path, e
))
})?;
let mut local_disk = LocalDisk {
device_file,
device_path: path.clone(),
device_capacity: md.len(),
is_gpt_mode: false,
metrics: BackendMetrics::new(id, "localdisk"),
entries: RwLock::new(HashMap::new()),
};

local_disk.scan_blobs_by_gpt()?;
if !config.disable_gpt {
local_disk.scan_blobs_by_gpt()?;
}

Ok(local_disk)
}

pub fn add_blob(&self, blob_id: &str, offset: u64, length: u64) -> LocalDiskResult<()> {
if self.is_gpt_mode {
let msg = format!(
"localdisk: device {} is in legacy gpt mode",
self.device_path
);
return Err(LocalDiskError::BlobFile(msg));
}
if offset.checked_add(length).is_none() || offset + length > self.device_capacity {
let msg = format!(
"localdisk: add blob {} with invalid offset 0x{:x} and length 0x{:x}, device size 0x{:x}",
blob_id, offset, length, self.device_capacity
);
return Err(LocalDiskError::BlobFile(msg));
};

let device_file = self.device_file.try_clone().map_err(|e| {
LocalDiskError::BlobFile(format!("localdisk: can not duplicate file, {}", e))
})?;
let blob = Arc::new(LocalDiskBlob {
blob_id: blob_id.to_string(),
device_file,
blob_offset: offset,
blob_length: length,
metrics: self.metrics.clone(),
});

let mut table_guard = self.entries.write().unwrap();
if table_guard.contains_key(blob_id) {
let msg = format!("localdisk: blob {} already exists", blob_id);
return Err(LocalDiskError::BlobFile(msg));
}
table_guard.insert(blob_id.to_string(), blob);

Ok(())
}

fn get_blob(&self, blob_id: &str) -> LocalDiskResult<Arc<dyn BlobReader>> {
// Don't expect poisoned lock here.
if let Some(entry) = self.entries.read().unwrap().get(blob_id) {
Expand Down Expand Up @@ -236,7 +284,9 @@ impl LocalDisk {
for (k, v) in partitions {
let length = v.bytes_len(sector_size)?;
let base_offset = v.bytes_start(sector_size)?;
if base_offset.checked_add(length).is_none() {
if base_offset.checked_add(length).is_none()
|| base_offset + length > self.device_capacity
{
let msg = format!(
"localdisk: partition {} with invalid offset and length",
v.part_guid
Expand All @@ -258,6 +308,10 @@ impl LocalDisk {
let msg = format!("localdisk: partition {} has empty blob id", v.part_guid);
return Err(einval!(msg));
}
if table_guard.contains_key(&name) {
let msg = format!("localdisk: blob {} already exists", name);
return Err(einval!(msg));
}

let device_file = self.device_file.try_clone()?;
let partition = Arc::new(LocalDiskBlob {
Expand Down Expand Up @@ -322,15 +376,48 @@ mod tests {
fn test_invalid_localdisk_new() {
let config = LocalDiskConfig {
device_path: "".to_string(),
disable_gpt: true,
};
assert!(LocalDisk::new(&config, Some("test")).is_err());

let config = LocalDiskConfig {
device_path: "/a/b/c".to_string(),
disable_gpt: true,
};
assert!(LocalDisk::new(&config, None).is_err());
}

#[test]
fn test_add_disk_blob() {
let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
let root_dir = Path::new(root_dir).join("../tests/texture/blobs/");

let config = LocalDiskConfig {
device_path: root_dir.join("nonexist_blob_file").display().to_string(),
disable_gpt: true,
};
assert!(LocalDisk::new(&config, Some("test")).is_err());

let blob_id = "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef";
let config = LocalDiskConfig {
device_path: root_dir.join(blob_id).display().to_string(),
disable_gpt: true,
};
let disk = LocalDisk::new(&config, Some("test")).unwrap();

assert!(disk.add_blob(blob_id, u64::MAX, 1).is_err());
assert!(disk.add_blob(blob_id, 14553, 2).is_err());
assert!(disk.add_blob(blob_id, 14554, 1).is_err());
assert!(disk.add_blob(blob_id, 0, 4096).is_ok());
assert!(disk.add_blob(blob_id, 0, 4096).is_err());
let blob = disk.get_blob(blob_id).unwrap();
assert_eq!(blob.blob_size().unwrap(), 4096);

let mut buf = vec![0u8; 4096];
let sz = blob.read(&mut buf, 0).unwrap();
assert_eq!(sz, 4096);
}

#[cfg(feature = "backend-localdisk-gpt")]
#[test]
fn test_truncate_blob_id() {
Expand Down
2 changes: 1 addition & 1 deletion storage/src/cache/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub(crate) enum AsyncPrefetchMessage {
#[cfg_attr(not(test), allow(unused))]
/// Ping for test.
Ping,
#[cfg_attr(not(test), allow(unused))]
#[allow(unused)]
RateLimiter(u64),
}

Expand Down

0 comments on commit 5dc39e9

Please sign in to comment.