From c1b1c7c43ddc23c6c98aab413b8a9dc80d6b4af5 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Sun, 18 Jun 2023 21:56:41 +0800 Subject: [PATCH] storage: support manually add blob object to localdisk backend driver Enhance the localdisk storage backend, so we can manually add blob objects in the disk, in addition to discovering blob objects by scanning GPT partition table. Signed-off-by: Jiang Liu --- api/src/config.rs | 3 + storage/src/backend/localdisk.rs | 123 ++++++++++++++++++++++++++----- storage/src/cache/worker.rs | 2 +- 3 files changed, 110 insertions(+), 18 deletions(-) diff --git a/api/src/config.rs b/api/src/config.rs index c6fb5ace227..c626afa1dbd 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -443,6 +443,9 @@ pub struct LocalDiskConfig { /// Mounted block device path or original localdisk image file path. #[serde(default)] pub device_path: String, + /// Disable discover blob objects by scanning GPT table. + #[serde(default)] + pub disable_gpt: bool, } /// Configuration information for localfs storage backend. diff --git a/storage/src/backend/localdisk.rs b/storage/src/backend/localdisk.rs index 64715ae25c6..329af02c35c 100644 --- a/storage/src/backend/localdisk.rs +++ b/storage/src/backend/localdisk.rs @@ -68,18 +68,18 @@ impl BlobReader for LocalDiskBlob { "localdisk: invalid offset 0x{:x}, base 0x{:x}, length 0x{:x}", offset, self.blob_offset, self.blob_length ); - if offset > self.blob_length { - return Err(LocalDiskError::ReadBlob(msg).into()); + if offset >= self.blob_length { + return Ok(0); } let actual_offset = self .blob_offset .checked_add(offset) .ok_or(LocalDiskError::ReadBlob(msg))?; - let sz = std::cmp::min(self.blob_length - offset, buf.len() as u64) as usize; + let len = std::cmp::min(self.blob_length - offset, buf.len() as u64) as usize; uio::pread( self.device_file.as_raw_fd(), - &mut buf[..sz], + &mut buf[..len], actual_offset as i64, ) .map_err(|e| { @@ -101,13 +101,13 @@ impl BlobReader for LocalDiskBlob { "localdisk: invalid offset 0x{:x}, base 0x{:x}, length 0x{:x}", offset, self.blob_offset, self.blob_length ); - if offset > self.blob_length { - return Err(LocalDiskError::ReadBlob(msg).into()); + if offset >= self.blob_length { + return Ok(0); } let actual_offset = self .blob_offset .checked_add(offset) - .ok_or(LocalDiskError::ReadBlob(msg))?; + .ok_or(LocalDiskError::ReadBlob(msg.clone()))?; let mut c = MemSliceCursor::new(bufs); let mut iovec = c.consume(max_size); @@ -118,10 +118,6 @@ impl BlobReader for LocalDiskBlob { // Guarantees that reads do not exceed the size of the blob if offset.checked_add(len as u64).is_none() || offset + len as u64 > self.blob_length { - let msg = format!( - "localdisk: invalid offset 0x{:x}, base 0x{:x}, length 0x{:x}", - offset, self.blob_offset, self.blob_length - ); return Err(LocalDiskError::ReadBlob(msg).into()); } @@ -145,6 +141,8 @@ pub struct LocalDisk { device_file: File, // The disk device path specified by the user device_path: String, + // Size of the block device. + device_capacity: u64, // Blobs are discovered by scanning GPT or not. is_gpt_mode: bool, // Metrics collector. @@ -155,7 +153,7 @@ pub struct LocalDisk { impl LocalDisk { pub fn new(config: &LocalDiskConfig, id: Option<&str>) -> Result { - let id = id.ok_or_else(|| einval!("localDisk: argument `id` is empty"))?; + let id = id.ok_or_else(|| einval!("localdisk: argument `id` is empty"))?; let path = &config.device_path; let path_buf = Path::new(path).to_path_buf().canonicalize().map_err(|e| { einval!(format!( @@ -169,25 +167,71 @@ impl LocalDisk { path, e )) })?; + let md = device_file.metadata().map_err(|e| { + eio!(format!( + "localdisk: can not get file meta data about disk device {}, {}", + path, e + )) + })?; let mut local_disk = LocalDisk { device_file, device_path: path.clone(), + device_capacity: md.len(), is_gpt_mode: false, metrics: BackendMetrics::new(id, "localdisk"), entries: RwLock::new(HashMap::new()), }; - local_disk.scan_blobs_by_gpt()?; + if !config.disable_gpt { + local_disk.scan_blobs_by_gpt()?; + } Ok(local_disk) } + pub fn add_blob(&self, blob_id: &str, offset: u64, length: u64) -> LocalDiskResult<()> { + if self.is_gpt_mode { + let msg = format!( + "localdisk: device {} is in legacy gpt mode", + self.device_path + ); + return Err(LocalDiskError::BlobFile(msg)); + } + if offset.checked_add(length).is_none() || offset + length > self.device_capacity { + let msg = format!( + "localdisk: add blob {} with invalid offset 0x{:x} and length 0x{:x}, device size 0x{:x}", + blob_id, offset, length, self.device_capacity + ); + return Err(LocalDiskError::BlobFile(msg)); + }; + + let device_file = self.device_file.try_clone().map_err(|e| { + LocalDiskError::BlobFile(format!("localdisk: can not duplicate file, {}", e)) + })?; + let blob = Arc::new(LocalDiskBlob { + blob_id: blob_id.to_string(), + device_file, + blob_offset: offset, + blob_length: length, + metrics: self.metrics.clone(), + }); + + let mut table_guard = self.entries.write().unwrap(); + if table_guard.contains_key(blob_id) { + let msg = format!("localdisk: blob {} already exists", blob_id); + return Err(LocalDiskError::BlobFile(msg)); + } + table_guard.insert(blob_id.to_string(), blob); + + Ok(()) + } + fn get_blob(&self, blob_id: &str) -> LocalDiskResult> { // Don't expect poisoned lock here. if let Some(entry) = self.entries.read().unwrap().get(blob_id) { Ok(entry.clone()) } else { - self.search_blob_from_gpt(blob_id) + self.get_blob_from_gpt(blob_id) } } } @@ -205,7 +249,7 @@ impl LocalDisk { } } - fn search_blob_from_gpt(&self, blob_id: &str) -> LocalDiskResult> { + fn get_blob_from_gpt(&self, blob_id: &str) -> LocalDiskResult> { if self.is_gpt_mode { if let Some(localdisk_blob_id) = LocalDisk::truncate_blob_id(blob_id) { // Don't expect poisoned lock here. @@ -236,7 +280,9 @@ impl LocalDisk { for (k, v) in partitions { let length = v.bytes_len(sector_size)?; let base_offset = v.bytes_start(sector_size)?; - if base_offset.checked_add(length).is_none() { + if base_offset.checked_add(length).is_none() + || base_offset + length > self.device_capacity + { let msg = format!( "localdisk: partition {} with invalid offset and length", v.part_guid @@ -258,6 +304,10 @@ impl LocalDisk { let msg = format!("localdisk: partition {} has empty blob id", v.part_guid); return Err(einval!(msg)); } + if table_guard.contains_key(&name) { + let msg = format!("localdisk: blob {} already exists", name); + return Err(einval!(msg)); + } let device_file = self.device_file.try_clone()?; let partition = Arc::new(LocalDiskBlob { @@ -284,7 +334,7 @@ impl LocalDisk { #[cfg(not(feature = "backend-localdisk-gpt"))] impl LocalDisk { - fn search_blob_from_gpt(&self, blob_id: &str) -> LocalDiskResult> { + fn get_blob_from_gpt(&self, blob_id: &str) -> LocalDiskResult> { Err(LocalDiskError::ReadBlob(format!( "can not find such blob: {}, this image might be corrupted", blob_id @@ -322,15 +372,54 @@ mod tests { fn test_invalid_localdisk_new() { let config = LocalDiskConfig { device_path: "".to_string(), + disable_gpt: true, }; assert!(LocalDisk::new(&config, Some("test")).is_err()); let config = LocalDiskConfig { device_path: "/a/b/c".to_string(), + disable_gpt: true, }; assert!(LocalDisk::new(&config, None).is_err()); } + #[test] + fn test_add_disk_blob() { + let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR"); + let root_dir = Path::new(root_dir).join("../tests/texture/blobs/"); + + let config = LocalDiskConfig { + device_path: root_dir.join("nonexist_blob_file").display().to_string(), + disable_gpt: true, + }; + assert!(LocalDisk::new(&config, Some("test")).is_err()); + + let blob_id = "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef"; + let config = LocalDiskConfig { + device_path: root_dir.join(blob_id).display().to_string(), + disable_gpt: true, + }; + let disk = LocalDisk::new(&config, Some("test")).unwrap(); + + assert!(disk.add_blob(blob_id, u64::MAX, 1).is_err()); + assert!(disk.add_blob(blob_id, 14553, 2).is_err()); + assert!(disk.add_blob(blob_id, 14554, 1).is_err()); + assert!(disk.add_blob(blob_id, 0, 4096).is_ok()); + assert!(disk.add_blob(blob_id, 0, 4096).is_err()); + let blob = disk.get_blob(blob_id).unwrap(); + assert_eq!(blob.blob_size().unwrap(), 4096); + + let mut buf = vec![0u8; 4096]; + let sz = blob.read(&mut buf, 0).unwrap(); + assert_eq!(sz, 4096); + let sz = blob.read(&mut buf, 4095).unwrap(); + assert_eq!(sz, 1); + let sz = blob.read(&mut buf, 4096).unwrap(); + assert_eq!(sz, 0); + let sz = blob.read(&mut buf, 4097).unwrap(); + assert_eq!(sz, 0); + } + #[cfg(feature = "backend-localdisk-gpt")] #[test] fn test_truncate_blob_id() { diff --git a/storage/src/cache/worker.rs b/storage/src/cache/worker.rs index 965a09ea0fc..295629546d4 100644 --- a/storage/src/cache/worker.rs +++ b/storage/src/cache/worker.rs @@ -52,7 +52,7 @@ pub(crate) enum AsyncPrefetchMessage { #[cfg_attr(not(test), allow(unused))] /// Ping for test. Ping, - #[cfg_attr(not(test), allow(unused))] + #[allow(unused)] RateLimiter(u64), }