From e7bed08d23d6dc57cfbcabf3a3f24584b779447b Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Fri, 1 Oct 2021 15:29:28 -0700 Subject: [PATCH] remove support for AWS_PREFIX (#474) The AWS_PREFIX code has fallen into disrepair, and the concept is not as useful as it once was, now that we have per-instance buckets for testing, `zpool import`, `zpool export`, and (soon) a working `zpool destroy`. This PR removes support for the AWS_PREFIX environment variable. Additionally, I updated the ObjectAccess code and its callers to follow the Rust API design guideline "Caller decides where to copy and place data": https://rust-lang.github.io/api-guidelines/flexibility.html#caller-decides-where-to-copy-and-place-data-c-caller-control Functions should take owned values (in this case, `String` instead of `&str`) when they need ownership of them. This makes the cost of allocation/copying clear to the callers, and in some cases this cost can be avoided. For example, if the caller has a String that it doesn't need later, today it creates a reference and passes it, and then the callee has to allocate a new String, copying the &str into it. With this change, the caller gives its String to the callee and no copy is needed. Although the performance impact of this change is negligible, I wanted to make the code a better example of the recommended design principles. Note: I restructured DataObjectPhys::get() to not use ::get_from_key() to eliminate a copy of the key String. For some workloads, every malloc() is noticeable on the FlameGraph. Note: I replaced `.context()` calls with `.with_context()`, which avoids allocating a String and formatting the arguments when the Result is Ok. 
--- cmd/zfs_object_agent/client/src/main.rs | 44 ++---- cmd/zfs_object_agent/object_perf/src/main.rs | 4 +- .../object_perf/src/s3perf.rs | 40 ++---- .../zettacache/src/block_based_log.rs | 2 +- .../zettaobject/src/data_object.rs | 36 +++-- .../zettaobject/src/heartbeat.rs | 9 +- .../zettaobject/src/object_access.rs | 130 +++++++----------- .../zettaobject/src/object_based_log.rs | 23 ++-- cmd/zfs_object_agent/zettaobject/src/pool.rs | 52 ++++--- .../zettaobject/src/pool_destroy.rs | 4 +- .../zettaobject/src/public_connection.rs | 2 +- 11 files changed, 150 insertions(+), 196 deletions(-) diff --git a/cmd/zfs_object_agent/client/src/main.rs b/cmd/zfs_object_agent/client/src/main.rs index 58b1745a4a57..a22b69200ead 100644 --- a/cmd/zfs_object_agent/client/src/main.rs +++ b/cmd/zfs_object_agent/client/src/main.rs @@ -4,7 +4,6 @@ use clap::Arg; use clap::SubCommand; use client::Client; use futures::stream::StreamExt; -use lazy_static::lazy_static; use nvpair::*; use rand::prelude::*; use rusoto_core::ByteStream; @@ -15,7 +14,6 @@ use rusoto_credential::ProfileProvider; use rusoto_credential::ProvideAwsCredentials; use rusoto_s3::*; use std::collections::BTreeSet; -use std::env; use std::error::Error; use std::fs; use std::fs::File; @@ -35,13 +33,6 @@ const BUCKET_NAME: &str = "cloudburst-data-2"; const POOL_NAME: &str = "testpool"; const POOL_GUID: u64 = 1234; -lazy_static! { - static ref AWS_PREFIX: String = match env::var("AWS_PREFIX") { - Ok(val) => format!("{}/", val), - Err(_) => "".to_string(), - }; -} - async fn do_rusoto_provider

(credentials_provider: P, file: &str) where P: ProvideAwsCredentials + Send + Sync + 'static, @@ -323,34 +314,21 @@ async fn print_super( ); } Err(_e) => { - /* - * XXX Pool::get_config() only works for pools under the AWS_PREFIX because it assumes the - * path to the "super" object. - */ - if AWS_PREFIX.len() == 0 && !pool_key.starts_with("zfs/") { - println!("\t(pool inside an alt AWS_PREFIX)"); - } else { - println!("\t-unknown format-"); - }; + println!("\t-unknown format-"); } } } } -fn strip_prefix(prefix: &str) -> &str { - if prefix.starts_with(AWS_PREFIX.as_str()) { - &prefix[AWS_PREFIX.len()..] - } else { - prefix - } -} - async fn find_old_pools(object_access: &ObjectAccess, min_age: Duration) -> Vec { - let pool_keys: Vec = object_access.list_prefixes("zfs/").collect().await; + let pool_keys: Vec = object_access + .list_prefixes("zfs/".to_string()) + .collect() + .await; let mut vec = Vec::new(); for pool_key in pool_keys { match object_access - .head_object(strip_prefix(&format!("{}super", pool_key))) + .head_object(format!("{}super", pool_key)) .await { Some(output) => { @@ -358,7 +336,7 @@ async fn find_old_pools(object_access: &ObjectAccess, min_age: Duration) -> Vec< DateTime::parse_from_rfc2822(output.last_modified.as_ref().unwrap()).unwrap(); print_super(object_access, &pool_key, &mod_time).await; if has_expired(&mod_time, min_age) { - vec.push(strip_prefix(&pool_key).to_string()); + vec.push(pool_key); } else { println!( "Skipping pool as it is not {} days old.", @@ -382,7 +360,7 @@ async fn do_list_pools( // Lookup all objects in the pool. 
if list_all_objects { object_access - .list_objects(&pool_key, None, false) + .list_objects(pool_key, None, false) .for_each(|object| async move { println!(" {}", object) }) .await; } @@ -396,7 +374,7 @@ async fn do_destroy_old_pools( ) -> Result<(), Box> { for pool_keys in find_old_pools(object_access, min_age).await { object_access - .delete_objects(object_access.list_objects(&pool_keys, None, false)) + .delete_objects(object_access.list_objects(pool_keys, None, false)) .await; } Ok(()) @@ -430,8 +408,8 @@ async fn do_test_connectivity(object_access: &ObjectAccess) { let file = format!("test/test_connectivity_{}", num); let content = "test connectivity to S3".as_bytes().to_vec(); - object_access.put_object(&file, content).await; - object_access.delete_object(&file).await; + object_access.put_object(file.clone(), content).await; + object_access.delete_object(file).await; } async fn test_connectivity(object_access: &ObjectAccess) -> Result<(), Box> { diff --git a/cmd/zfs_object_agent/object_perf/src/main.rs b/cmd/zfs_object_agent/object_perf/src/main.rs index c719d9444aa5..be53ac3772e0 100644 --- a/cmd/zfs_object_agent/object_perf/src/main.rs +++ b/cmd/zfs_object_agent/object_perf/src/main.rs @@ -136,12 +136,12 @@ async fn main() { println!("Using prefix: '{}'", key_prefix); match matches.subcommand() { ("write", Some(_matches)) => { - s3perf::write_test(&object_access, &key_prefix, objsize_bytes, qdepth, duration) + s3perf::write_test(&object_access, key_prefix, objsize_bytes, qdepth, duration) .await .unwrap(); } ("read", Some(_matches)) => { - s3perf::read_test(&object_access, &key_prefix, objsize_bytes, qdepth, duration) + s3perf::read_test(&object_access, key_prefix, objsize_bytes, qdepth, duration) .await .unwrap(); } diff --git a/cmd/zfs_object_agent/object_perf/src/s3perf.rs b/cmd/zfs_object_agent/object_perf/src/s3perf.rs index a071d8d18d15..f550d178b441 100644 --- a/cmd/zfs_object_agent/object_perf/src/s3perf.rs +++ 
b/cmd/zfs_object_agent/object_perf/src/s3perf.rs @@ -29,16 +29,16 @@ impl Perf { #[measure(InFlight)] #[measure(Throughput)] #[measure(HitCount)] - async fn put(&self, object_access: &ObjectAccess, key: &str, data: Vec) { - object_access.put_object(&key.to_string(), data).await; + async fn put(&self, object_access: &ObjectAccess, key: String, data: Vec) { + object_access.put_object(key, data).await; } #[measure(type = ResponseTime)] #[measure(InFlight)] #[measure(Throughput)] #[measure(HitCount)] - async fn get(&self, object_access: &ObjectAccess, key: &str) { - object_access.get_object(&key.to_string()).await.unwrap(); + async fn get(&self, object_access: &ObjectAccess, key: String) { + object_access.get_object(key).await.unwrap(); } fn log_metrics(&self, duration: Duration) { @@ -60,7 +60,7 @@ impl Perf { duration: Duration, ) { let num_objects = object_access - .list_objects(&key_prefix, None, true) + .list_objects(key_prefix.clone(), None, true) .fold(0, |count, _key| async move { count + 1 }) .await; let mut key_id = 0; @@ -74,7 +74,7 @@ impl Perf { my_perf .get( &my_object_access, - &format!("{}{}", my_key_prefix, key_id % num_objects + 1), + format!("{}{}", my_key_prefix, key_id % num_objects + 1), ) .await; }) @@ -106,7 +106,7 @@ impl Perf { my_perf .put( &my_object_access, - &format!("{}{}", my_key_prefix, key_id), + format!("{}{}", my_key_prefix, key_id), my_data, ) .await @@ -135,7 +135,7 @@ impl Perf { pub async fn write_test( object_access: &ObjectAccess, - key_prefix: &str, + key_prefix: String, objsize: u64, qdepth: u64, duration: Duration, @@ -144,14 +144,8 @@ pub async fn write_test( let bounds = WriteTestBounds::Time(duration); perf.log_metrics(Duration::from_secs(1)); - perf.write_objects( - object_access, - key_prefix.to_string(), - objsize, - qdepth, - bounds, - ) - .await; + perf.write_objects(object_access, key_prefix.clone(), objsize, qdepth, bounds) + .await; println!("{:#?}", perf.metrics.put); @@ -164,7 +158,7 @@ pub async fn write_test( 
pub async fn read_test( object_access: &ObjectAccess, - key_prefix: &str, + key_prefix: String, objsize: u64, qdepth: u64, duration: Duration, @@ -173,16 +167,10 @@ pub async fn read_test( let bounds = WriteTestBounds::Objects(max(qdepth * 10, 200)); perf.log_metrics(Duration::from_secs(1)); - perf.write_objects( - object_access, - key_prefix.to_string(), - objsize, - qdepth, - bounds, - ) - .await; + perf.write_objects(object_access, key_prefix.clone(), objsize, qdepth, bounds) + .await; - perf.read_objects(object_access, key_prefix.to_string(), qdepth, duration) + perf.read_objects(object_access, key_prefix.clone(), qdepth, duration) .await; println!("{:#?}", perf.metrics.get); diff --git a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs index e86ffee9e9b6..6a394799a918 100644 --- a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs +++ b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs @@ -277,7 +277,7 @@ impl BlockBasedLog { // XXX handle checksum error here let (chunk, consumed): (BlockBasedLogChunk, usize) = block_access .chunk_from_raw(&extent_bytes[total_consumed..]) - .context(format!("{:?} at {:?}", chunk_id, chunk_location)) + .with_context(|| format!("{:?} at {:?}", chunk_id, chunk_location)) .unwrap(); assert_eq!(chunk.id, chunk_id); for entry in chunk.entries { diff --git a/cmd/zfs_object_agent/zettaobject/src/data_object.rs b/cmd/zfs_object_agent/zettaobject/src/data_object.rs index bad93b428496..56341153c3a8 100644 --- a/cmd/zfs_object_agent/zettaobject/src/data_object.rs +++ b/cmd/zfs_object_agent/zettaobject/src/data_object.rs @@ -99,27 +99,45 @@ impl DataObjectPhys { pub async fn get( object_access: &ObjectAccess, guid: PoolGuid, - obj: ObjectId, + object: ObjectId, bypass_cache: bool, ) -> Result { - let this = Self::get_from_key(object_access, &Self::key(guid, obj), bypass_cache).await?; + let buf = match bypass_cache { + true => { + object_access + 
.get_object_uncached(Self::key(guid, object)) + .await? + } + false => object_access.get_object(Self::key(guid, object)).await?, + }; + let begin = Instant::now(); + let this: DataObjectPhys = bincode::deserialize(&buf) + .with_context(|| format!("Failed to decode contents of {}", Self::key(guid, object)))?; + trace!( + "{:?}: deserialized {} blocks from {} bytes in {}ms", + this.object, + this.blocks.len(), + buf.len(), + begin.elapsed().as_millis() + ); assert_eq!(this.guid, guid); - assert_eq!(this.object, obj); + assert_eq!(this.object, object); + this.verify(); Ok(this) } pub async fn get_from_key( object_access: &ObjectAccess, - key: &str, + key: String, bypass_cache: bool, ) -> Result { let buf = match bypass_cache { - true => object_access.get_object_uncached(key).await?, - false => object_access.get_object(key).await?, + true => object_access.get_object_uncached(key.clone()).await?, + false => object_access.get_object(key.clone()).await?, }; let begin = Instant::now(); - let this: DataObjectPhys = - bincode::deserialize(&buf).context(format!("Failed to decode contents of {}", key))?; + let this: DataObjectPhys = bincode::deserialize(&buf) + .with_context(|| format!("Failed to decode contents of {}", key))?; trace!( "{:?}: deserialized {} blocks from {} bytes in {}ms", this.object, @@ -143,7 +161,7 @@ impl DataObjectPhys { ); self.verify(); object_access - .put_object(&Self::key(self.guid, self.object), contents) + .put_object(Self::key(self.guid, self.object), contents) .await; } diff --git a/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs b/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs index 44db39374cca..2b6f7a8fd3e5 100644 --- a/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs +++ b/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs @@ -35,10 +35,9 @@ impl HeartbeatPhys { } pub async fn get(object_access: &ObjectAccess, id: Uuid) -> anyhow::Result { - let key = Self::key(id); - let buf = object_access.get_object_impl(&key, None).await?; + let buf 
= object_access.get_object_impl(Self::key(id), None).await?; let this: Self = serde_json::from_slice(&buf) - .context(format!("Failed to decode contents of {}", key))?; + .with_context(|| format!("Failed to decode contents of {}", Self::key(id)))?; debug!("got {:#?}", this); assert_eq!(this.id, id); Ok(this) @@ -53,12 +52,12 @@ impl HeartbeatPhys { debug!("putting {:#?}", self); let buf = serde_json::to_vec(&self).unwrap(); object_access - .put_object_timed(&Self::key(self.id), buf, timeout) + .put_object_timed(Self::key(self.id), buf, timeout) .await } pub async fn delete(object_access: &ObjectAccess, id: Uuid) { - object_access.delete_object(&Self::key(id)).await; + object_access.delete_object(Self::key(id)).await; } } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access.rs b/cmd/zfs_object_agent/zettaobject/src/object_access.rs index 0f39c5de99fe..1b8a0f689ac0 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_access.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_access.rs @@ -15,11 +15,11 @@ use rusoto_core::{ByteStream, RusotoError}; use rusoto_credential::{AutoRefreshingProvider, ChainProvider, ProfileProvider}; use rusoto_s3::*; use std::convert::TryFrom; +use std::error::Error; use std::iter; use std::sync::Arc; use std::time::Instant; use std::{collections::HashMap, fmt::Display}; -use std::{env, error::Error}; use tokio::{sync::watch, time::error::Elapsed}; use zettacache::get_tunable; @@ -34,10 +34,6 @@ lazy_static! { cache: LruCache::new(100), reading: HashMap::new(), }); - static ref PREFIX: String = match env::var("AWS_PREFIX") { - Ok(val) => format!("{}/", val), - Err(_) => "".to_string(), - }; static ref NON_RETRYABLE_ERRORS: Vec = vec![ StatusCode::BAD_REQUEST, StatusCode::FORBIDDEN, @@ -62,20 +58,6 @@ pub struct ObjectAccess { credentials_profile: Option, } -/* - * For testing, prefix all object keys with this string. 
In cases where objects are returned from - * a call like list_objects and then fetched with get, we could end up doubling the prefix. We - * could either strip the prefix from the beginning of every object we return, or we can only - * prefix an object if it isn't already prefixed. We do the latter here, for conciseness, but in - * the future we may want to revisit this decision. - */ -fn prefixed(key: &str) -> String { - match key.starts_with(format!("{}zfs", *PREFIX).as_str()) { - true => key.to_string(), - false => format!("{}{}", *PREFIX, key), - } -} - #[derive(Debug)] #[allow(clippy::upper_case_acronyms)] pub enum OAError { @@ -259,12 +241,12 @@ impl ObjectAccess { self.client } - pub async fn get_object_impl(&self, key: &str, timeout: Option) -> Result> { - let msg = format!("get {}", prefixed(key)); + pub async fn get_object_impl(&self, key: String, timeout: Option) -> Result> { + let msg = format!("get {}", key); let v = retry(&msg, timeout, || async { let req = GetObjectRequest { bucket: self.bucket_str.clone(), - key: prefixed(key), + key: key.clone(), ..Default::default() }; let output = self.client.get_object(req).await?; @@ -304,8 +286,8 @@ impl ObjectAccess { Ok(v) } - pub async fn get_object_uncached(&self, key: &str) -> Result>> { - let vec = self.get_object_impl(key, None).await?; + pub async fn get_object_uncached(&self, key: String) -> Result>> { + let vec = self.get_object_impl(key.clone(), None).await?; // Note: we *should* have the same data from S3 (in the `vec`) and in // the cache, so this invalidation is normally not necessary. 
However, // in case a bug (or undetected RAM error) resulted in incorrect cached @@ -315,26 +297,25 @@ impl ObjectAccess { Ok(Arc::new(vec)) } - pub async fn get_object(&self, key: &str) -> Result>> { + pub async fn get_object(&self, key: String) -> Result>> { let either = { // need this block separate so that we can drop the mutex before the .await let mut c = CACHE.lock().unwrap(); - let mykey = key.to_string(); - match c.cache.get(&mykey) { + match c.cache.get(&key) { Some(v) => { debug!("found {} in cache", key); return Ok(v.clone()); } - None => match c.reading.get(key) { + None => match c.reading.get(&key) { None => { let (tx, rx) = watch::channel::>>>(None); - c.reading.insert(mykey, rx); + c.reading.insert(key.clone(), rx); Either::Left(async move { - let v = Arc::new(self.get_object_impl(key, None).await?); + let v = Arc::new(self.get_object_impl(key.clone(), None).await?); let mut myc = CACHE.lock().unwrap(); tx.send(Some(v.clone())).unwrap(); myc.cache.put(key.to_string(), v.clone()); - myc.reading.remove(key); + myc.reading.remove(&key); Ok(v) }) } @@ -372,13 +353,11 @@ impl ObjectAccess { fn list_impl( &self, - prefix: &str, + prefix: String, start_after: Option, use_delimiter: bool, list_prefixes: bool, ) -> impl Stream { - let full_prefix = prefixed(prefix); - let full_start_after = start_after.map(|sa| prefixed(&sa)); let mut continuation_token = None; // XXX ObjectAccess should really be refcounted (behind Arc) let client = self.client.clone(); @@ -390,7 +369,7 @@ impl ObjectAccess { stream! 
{ loop { let output = retry( - &format!("list {} (after {:?})", full_prefix, full_start_after), + &format!("list {} (after {:?})", prefix, start_after), None, || async { let req = ListObjectsV2Request { @@ -398,8 +377,8 @@ impl ObjectAccess { continuation_token: continuation_token.clone(), delimiter: delimiter.clone(), fetch_owner: Some(false), - prefix: Some(full_prefix.clone()), - start_after: full_start_after.clone(), + prefix: Some(prefix.clone()), + start_after: start_after.clone(), ..Default::default() }; // Note: Ok(...?) converts the RusotoError to an OAError for us @@ -432,26 +411,30 @@ impl ObjectAccess { pub fn list_objects( &self, - prefix: &str, + prefix: String, start_after: Option, use_delimiter: bool, ) -> impl Stream { self.list_impl(prefix, start_after, use_delimiter, false) } - pub fn list_prefixes(&self, prefix: &str) -> impl Stream { + pub fn list_prefixes(&self, prefix: String) -> impl Stream { self.list_impl(prefix, None, true, true) } - pub async fn collect_objects(&self, prefix: &str, start_after: Option) -> Vec { + pub async fn collect_objects( + &self, + prefix: String, + start_after: Option, + ) -> Vec { self.list_objects(prefix, start_after, true).collect().await } - pub async fn head_object(&self, key: &str) -> Option { - let res = retry(&format!("head {}", prefixed(key)), None, || async { + pub async fn head_object(&self, key: String) -> Option { + let res = retry(&format!("head {}", key), None, || async { let req = HeadObjectRequest { bucket: self.bucket_str.clone(), - key: prefixed(key), + key: key.clone(), ..Default::default() }; // Note: Ok(...?) 
converts the RusotoError to an OAError for us @@ -461,72 +444,66 @@ impl ObjectAccess { res.ok() } - pub async fn object_exists(&self, key: &str) -> bool { + pub async fn object_exists(&self, key: String) -> bool { self.head_object(key).await.is_some() } async fn put_object_impl( &self, - key: &str, + key: String, data: Vec, timeout: Option, ) -> Result> { let len = data.len(); let bytes = Bytes::from(data); assert!(!self.readonly); - retry( - &format!("put {} ({} bytes)", prefixed(key), len), - timeout, - || async { - let my_bytes = bytes.clone(); - let stream = ByteStream::new_with_size(stream! { yield Ok(my_bytes)}, len); - - let req = PutObjectRequest { - bucket: self.bucket_str.clone(), - key: prefixed(key), - body: Some(stream), - ..Default::default() - }; - // Note: Ok(...?) converts the RusotoError to an OAError for us - Ok(self.client.put_object(req).await?) - }, - ) + retry(&format!("put {} ({} bytes)", key, len), timeout, || async { + let my_bytes = bytes.clone(); + let stream = ByteStream::new_with_size(stream! { yield Ok(my_bytes)}, len); + + let req = PutObjectRequest { + bucket: self.bucket_str.clone(), + key: key.clone(), + body: Some(stream), + ..Default::default() + }; + // Note: Ok(...?) converts the RusotoError to an OAError for us + Ok(self.client.put_object(req).await?) + }) .await } - fn invalidate_cache(key: &str, data: &[u8]) { + fn invalidate_cache(key: String, data: &[u8]) { let mut c = CACHE.lock().unwrap(); - let mykey = key.to_string(); - if c.cache.contains(&mykey) { + if c.cache.contains(&key) { debug!("found {} in cache when putting - invalidating", key); // XXX unfortuate to be copying; this happens every time when // freeing (we get/modify/put the object). Maybe when freeing, // the get() should not add to the cache since it's probably // just polluting. 
- c.cache.put(mykey, Arc::new(data.to_vec())); + c.cache.put(key, Arc::new(data.to_vec())); } } - pub async fn put_object(&self, key: &str, data: Vec) { - Self::invalidate_cache(key, &data); + pub async fn put_object(&self, key: String, data: Vec) { + Self::invalidate_cache(key.clone(), &data); self.put_object_impl(key, data, None).await.unwrap(); } pub async fn put_object_timed( &self, - key: &str, + key: String, data: Vec, timeout: Option, ) -> Result> { - Self::invalidate_cache(key, &data); + Self::invalidate_cache(key.clone(), &data); self.put_object_impl(key, data, timeout).await } - pub async fn delete_object(&self, key: &str) { - self.delete_objects(stream::iter(iter::once(key.to_string()))) - .await; + pub async fn delete_object(&self, key: String) { + self.delete_objects(stream::iter(iter::once(key))).await; } // Note: Stream is of raw keys (with prefix) @@ -588,11 +565,4 @@ impl ObjectAccess { pub fn readonly(&self) -> bool { self.readonly } - - pub fn strip_prefix(key: &str) -> &str { - match key.strip_prefix(PREFIX.as_str()) { - Some(stripped_key) => stripped_key, - None => key, - } - } } diff --git a/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs b/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs index a7cd5f96e65e..8f1fb8e8689b 100644 --- a/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs +++ b/cmd/zfs_object_agent/zettaobject/src/object_based_log.rs @@ -45,7 +45,7 @@ impl ObjectBasedLogPhys { pub async fn cleanup_older_generations(&self, object_access: &ObjectAccess) { // Stream of generation prefixes let generations = object_access - .list_prefixes(&format!("{}/", self.key)) + .list_prefixes(format!("{}/", self.key)) .filter(|key| { future::ready( key.rsplit('/').collect::>()[1] @@ -60,7 +60,7 @@ impl ObjectBasedLogPhys { .delete_objects( generations .flat_map(|generation| { - Box::pin(object_access.list_objects(&generation, None, false)) + Box::pin(object_access.list_objects(generation, None, false)) }) .inspect(|key| 
trace!("cleanup: old generation chunk {}", key)), ) @@ -90,11 +90,16 @@ impl ObjectBasedLogChunk { generation: u64, chunk: u64, ) -> Result { - let key = Self::key(name, generation, chunk); - let buf = object_access.get_object(&key).await?; + let buf = object_access + .get_object(Self::key(name, generation, chunk)) + .await?; let begin = Instant::now(); - let this: Self = serde_json::from_slice(&buf) - .context(format!("Failed to decode contents of {}", key))?; + let this: Self = serde_json::from_slice(&buf).with_context(|| { + format!( + "Failed to decode contents of {}", + Self::key(name, generation, chunk) + ) + })?; debug!( "deserialized {} log entries in {}ms", this.entries.len(), @@ -114,7 +119,7 @@ impl ObjectBasedLogChunk { begin.elapsed().as_millis() ); object_access - .put_object(&Self::key(name, self.generation, self.chunk), buf) + .put_object(Self::key(name, self.generation, self.chunk), buf) .await; } } @@ -201,7 +206,7 @@ impl ObjectBasedLog { .delete_objects( shared_state .object_access - .list_objects(&last_generation_key, start_after, true) + .list_objects(last_generation_key, start_after, true) .inspect(|key| { info!( "cleanup: deleting future chunk of current generation: {}", @@ -221,7 +226,7 @@ impl ObjectBasedLog { .delete_objects( shared_state .object_access - .list_objects(&next_generation_key, None, true) + .list_objects(next_generation_key, None, true) .inspect(|key| { info!("cleanup: deleting chunk of future generation: {}", key) }), diff --git a/cmd/zfs_object_agent/zettaobject/src/pool.rs b/cmd/zfs_object_agent/zettaobject/src/pool.rs index c5a3a5854481..608b073e202b 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool.rs @@ -118,10 +118,9 @@ impl PoolOwnerPhys { } async fn get(object_access: &ObjectAccess, id: PoolGuid) -> anyhow::Result { - let key = Self::key(id); - let buf = object_access.get_object_impl(&key, None).await?; + let buf = object_access.get_object_impl(Self::key(id), 
None).await?; let this: Self = serde_json::from_slice(&buf) - .context(format!("Failed to decode contents of {}", key))?; + .with_context(|| format!("Failed to decode contents of {}", Self::key(id)))?; debug!("got {:#?}", this); assert_eq!(this.id, id); Ok(this) @@ -136,12 +135,12 @@ impl PoolOwnerPhys { debug!("putting {:#?}", self); let buf = serde_json::to_vec(&self).unwrap(); object_access - .put_object_timed(&Self::key(self.id), buf, timeout) + .put_object_timed(Self::key(self.id), buf, timeout) .await } async fn delete(object_access: &ObjectAccess, id: PoolGuid) { - object_access.delete_object(&Self::key(id)).await; + object_access.delete_object(Self::key(id)).await; } } #[derive(Debug)] @@ -278,14 +277,13 @@ impl PoolPhys { } pub async fn exists(object_access: &ObjectAccess, guid: PoolGuid) -> bool { - object_access.object_exists(&Self::key(guid)).await + object_access.object_exists(Self::key(guid)).await } pub async fn get(object_access: &ObjectAccess, guid: PoolGuid) -> Result { - let key = Self::key(guid); - let buf = object_access.get_object(&key).await?; + let buf = object_access.get_object(Self::key(guid)).await?; let this: Self = serde_json::from_slice(&buf) - .context(format!("Failed to decode contents of {}", key))?; + .with_context(|| format!("Failed to decode contents of {}", Self::key(guid)))?; debug!("got {:#?}", this); assert_eq!(this.guid, guid); Ok(this) @@ -295,7 +293,7 @@ impl PoolPhys { maybe_die_with(|| format!("before putting {:#?}", self)); debug!("putting {:#?}", self); let buf = serde_json::to_vec(&self).unwrap(); - object_access.put_object(&Self::key(self.guid), buf).await; + object_access.put_object(Self::key(self.guid), buf).await; } } @@ -318,10 +316,9 @@ impl UberblockPhys { } async fn get(object_access: &ObjectAccess, guid: PoolGuid, txg: Txg) -> Result { - let key = Self::key(guid, txg); - let buf = object_access.get_object(&key).await?; + let buf = object_access.get_object(Self::key(guid, txg)).await?; let this: Self = 
serde_json::from_slice(&buf) - .context(format!("Failed to decode contents of {}", key))?; + .with_context(|| format!("Failed to decode contents of {}", Self::key(guid, txg)))?; debug!("got {:#?}", this); assert_eq!(this.guid, guid); assert_eq!(this.txg, txg); @@ -333,7 +330,7 @@ impl UberblockPhys { debug!("putting {:#?}", self); let buf = serde_json::to_vec(&self).unwrap(); object_access - .put_object(&Self::key(self.guid, self.txg), buf) + .put_object(Self::key(self.guid, self.txg), buf) .await; } @@ -347,7 +344,7 @@ impl UberblockPhys { async fn cleanup_older_uberblocks(object_access: &ObjectAccess, ub: UberblockPhys) { let mut txgs: Vec = object_access - .collect_objects(&format!("zfs/{}/txg/", ub.guid), None) + .collect_objects(format!("zfs/{}/txg/", ub.guid), None) .await .iter() .map(|prefix| { @@ -679,21 +676,22 @@ impl PoolState { .delete_objects( shared_state .object_access - .list_objects(&txg_key, start_after, true) + .list_objects(txg_key, start_after, true) .inspect(|key| info!("cleanup: deleting future uberblock: {}", key)), ) .await; } /// Remove log objects from log at prefix starting at next_id - async fn cleanup_orphaned_logs(&self, prefix: &str, next_id: ReclaimLogId) { + async fn cleanup_orphaned_logs(&self, prefix: String, next_id: ReclaimLogId) { let shared_state = &self.shared_state.clone(); + let start_after = Some(format!("{}/{}", prefix, next_id)); shared_state .object_access .delete_objects( shared_state .object_access - .list_objects(prefix, Some(format!("{}/{}", prefix, next_id)), false) + .list_objects(prefix, start_after, false) .inspect(|key| info!("cleanup: deleting orphaned log object: {}", key)), ) .await; @@ -733,8 +731,8 @@ impl PoolState { syncing_state.storage_object_log.cleanup(), frees_log_stream.for_each(|_| future::ready(())), size_log_stream.for_each(|_| future::ready(())), - self.cleanup_orphaned_logs(&pending_frees_log_prefix, next_log_id), - self.cleanup_orphaned_logs(&object_size_log_prefix, next_log_id), + 
self.cleanup_orphaned_logs(pending_frees_log_prefix, next_log_id), + self.cleanup_orphaned_logs(object_size_log_prefix, next_log_id), ) .await; assert!(self.syncing_state.lock().unwrap().is_none()); @@ -758,13 +756,10 @@ impl PoolState { oa.delete_objects( select_all( DataObjectPhys::prefixes(shared_state.guid) - .iter() + .into_iter() .map(|prefix| { - Box::pin(oa.list_objects( - prefix, - Some(format!("{}{}", prefix, last_obj)), - true, - )) + let start_after = Some(format!("{}{}", prefix, last_obj)); + Box::pin(oa.list_objects(prefix, start_after, true)) }), ) .inspect(|_| count += 1), @@ -1054,9 +1049,10 @@ impl Pool { for prefix in DataObjectPhys::prefixes(shared_state.guid) { let shared_state = shared_state.clone(); list_stream.push(async move { + let start_after = Some(format!("{}{}", prefix, last_obj)); shared_state .object_access - .collect_objects(&prefix, Some(format!("{}{}", prefix, last_obj))) + .collect_objects(prefix, start_after) .await }); } @@ -1067,7 +1063,7 @@ impl Pool { for key in vec { let shared_state = shared_state.clone(); sub_stream.push(future::ready(async move { - DataObjectPhys::get_from_key(&shared_state.object_access, &key, false).await + DataObjectPhys::get_from_key(&shared_state.object_access, key, false).await })); } sub_stream diff --git a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs index 4a140ed2d0a8..03e8a66311aa 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool_destroy.rs @@ -333,7 +333,7 @@ fn delete_pool_objects( let oa: ObjectAccess = object_access.clone(); tokio::spawn(async move { - let prefix = &format!("zfs/{}/", guid); + let prefix = format!("zfs/{}/", guid); let super_object = PoolPhys::key(guid); let batch_size = *OBJECT_DELETION_BATCH_SIZE * *DESTROY_PROGRESS_FREQUENCY; let mut count = 0; @@ -395,7 +395,7 @@ async fn destroy_task(object_access: ObjectAccess, guid: PoolGuid) { } // The super 
object is destroyed last as it is used to keep track of the progress made. - object_access.delete_object(&PoolPhys::key(guid)).await; + object_access.delete_object(PoolPhys::key(guid)).await; let mut maybe_pool_destroyer = POOL_DESTROYER.lock().await; let pool_destroyer = maybe_pool_destroyer.as_mut().unwrap(); diff --git a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs index 8b920d32cd96..8ec682137d22 100644 --- a/cmd/zfs_object_agent/zettaobject/src/public_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/public_connection.rs @@ -120,7 +120,7 @@ impl PublicConnectionState { } object_access - .list_prefixes("zfs/") + .list_prefixes("zfs/".to_string()) .for_each_concurrent(*GET_POOLS_QUEUE_DEPTH, |prefix| { let my_object_access = object_access.clone(); let my_response = response.clone();