Skip to content

Commit

Permalink
remove support for AWS_PREFIX (openzfs#474)
Browse files Browse the repository at this point in the history
The AWS_PREFIX code has fallen into disrepair, and the concept is not as
useful as it once was, now that we have per-instance buckets for
testing, `zpool import`, `zpool export`, and (soon) a working `zpool
destroy`.

This PR removes support for the AWS_PREFIX environment variable.

Additionally, I updated the ObjectAccess code and its callers to follow
the Rust API design guideline "Caller decides where to copy and place
data":
https://rust-lang.github.io/api-guidelines/flexibility.html#caller-decides-where-to-copy-and-place-data-c-caller-control

Functions should take owned values (in this case, `String` instead of
`&str`) when they need ownership of them.  This makes the cost of
allocation/copying clear to the callers, and in some cases this cost can
be avoided.

For example, if the caller has a String that it doesn't need later,
today it creates a reference and passes it, and the callee then has to
allocate a new String and copy the &str into it.  With this change, the
caller gives its String to the callee and no copy is needed.

Although the performance impact of this change is negligible, I wanted
to make the code a better example of the recommended design principles.

Note: I restructured DataObjectPhys::get() so that it no longer calls
::get_from_key(), which eliminates a copy of the key String.  For some
workloads, every malloc() is noticeable on the FlameGraph.

Note: I replaced `.context()` calls with `.with_context()`, which takes a
closure and therefore avoids formatting the arguments and allocating a
String when the Result is Ok.
  • Loading branch information
ahrens authored Oct 1, 2021
1 parent ca58e1a commit e7bed08
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 196 deletions.
44 changes: 11 additions & 33 deletions cmd/zfs_object_agent/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use clap::Arg;
use clap::SubCommand;
use client::Client;
use futures::stream::StreamExt;
use lazy_static::lazy_static;
use nvpair::*;
use rand::prelude::*;
use rusoto_core::ByteStream;
Expand All @@ -15,7 +14,6 @@ use rusoto_credential::ProfileProvider;
use rusoto_credential::ProvideAwsCredentials;
use rusoto_s3::*;
use std::collections::BTreeSet;
use std::env;
use std::error::Error;
use std::fs;
use std::fs::File;
Expand All @@ -35,13 +33,6 @@ const BUCKET_NAME: &str = "cloudburst-data-2";
const POOL_NAME: &str = "testpool";
const POOL_GUID: u64 = 1234;

lazy_static! {
static ref AWS_PREFIX: String = match env::var("AWS_PREFIX") {
Ok(val) => format!("{}/", val),
Err(_) => "".to_string(),
};
}

async fn do_rusoto_provider<P>(credentials_provider: P, file: &str)
where
P: ProvideAwsCredentials + Send + Sync + 'static,
Expand Down Expand Up @@ -323,42 +314,29 @@ async fn print_super(
);
}
Err(_e) => {
/*
* XXX Pool::get_config() only works for pools under the AWS_PREFIX because it assumes the
* path to the "super" object.
*/
if AWS_PREFIX.len() == 0 && !pool_key.starts_with("zfs/") {
println!("\t(pool inside an alt AWS_PREFIX)");
} else {
println!("\t-unknown format-");
};
println!("\t-unknown format-");
}
}
}
}

fn strip_prefix(prefix: &str) -> &str {
if prefix.starts_with(AWS_PREFIX.as_str()) {
&prefix[AWS_PREFIX.len()..]
} else {
prefix
}
}

async fn find_old_pools(object_access: &ObjectAccess, min_age: Duration) -> Vec<String> {
let pool_keys: Vec<String> = object_access.list_prefixes("zfs/").collect().await;
let pool_keys: Vec<String> = object_access
.list_prefixes("zfs/".to_string())
.collect()
.await;
let mut vec = Vec::new();
for pool_key in pool_keys {
match object_access
.head_object(strip_prefix(&format!("{}super", pool_key)))
.head_object(format!("{}super", pool_key))
.await
{
Some(output) => {
let mod_time =
DateTime::parse_from_rfc2822(output.last_modified.as_ref().unwrap()).unwrap();
print_super(object_access, &pool_key, &mod_time).await;
if has_expired(&mod_time, min_age) {
vec.push(strip_prefix(&pool_key).to_string());
vec.push(pool_key);
} else {
println!(
"Skipping pool as it is not {} days old.",
Expand All @@ -382,7 +360,7 @@ async fn do_list_pools(
// Lookup all objects in the pool.
if list_all_objects {
object_access
.list_objects(&pool_key, None, false)
.list_objects(pool_key, None, false)
.for_each(|object| async move { println!(" {}", object) })
.await;
}
Expand All @@ -396,7 +374,7 @@ async fn do_destroy_old_pools(
) -> Result<(), Box<dyn Error>> {
for pool_keys in find_old_pools(object_access, min_age).await {
object_access
.delete_objects(object_access.list_objects(&pool_keys, None, false))
.delete_objects(object_access.list_objects(pool_keys, None, false))
.await;
}
Ok(())
Expand Down Expand Up @@ -430,8 +408,8 @@ async fn do_test_connectivity(object_access: &ObjectAccess) {
let file = format!("test/test_connectivity_{}", num);
let content = "test connectivity to S3".as_bytes().to_vec();

object_access.put_object(&file, content).await;
object_access.delete_object(&file).await;
object_access.put_object(file.clone(), content).await;
object_access.delete_object(file).await;
}

async fn test_connectivity(object_access: &ObjectAccess) -> Result<(), Box<dyn Error>> {
Expand Down
4 changes: 2 additions & 2 deletions cmd/zfs_object_agent/object_perf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ async fn main() {
println!("Using prefix: '{}'", key_prefix);
match matches.subcommand() {
("write", Some(_matches)) => {
s3perf::write_test(&object_access, &key_prefix, objsize_bytes, qdepth, duration)
s3perf::write_test(&object_access, key_prefix, objsize_bytes, qdepth, duration)
.await
.unwrap();
}
("read", Some(_matches)) => {
s3perf::read_test(&object_access, &key_prefix, objsize_bytes, qdepth, duration)
s3perf::read_test(&object_access, key_prefix, objsize_bytes, qdepth, duration)
.await
.unwrap();
}
Expand Down
40 changes: 14 additions & 26 deletions cmd/zfs_object_agent/object_perf/src/s3perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ impl Perf {
#[measure(InFlight)]
#[measure(Throughput)]
#[measure(HitCount)]
async fn put(&self, object_access: &ObjectAccess, key: &str, data: Vec<u8>) {
object_access.put_object(&key.to_string(), data).await;
async fn put(&self, object_access: &ObjectAccess, key: String, data: Vec<u8>) {
object_access.put_object(key, data).await;
}

#[measure(type = ResponseTime<AtomicHdrHistogram, StdInstantMicros>)]
#[measure(InFlight)]
#[measure(Throughput)]
#[measure(HitCount)]
async fn get(&self, object_access: &ObjectAccess, key: &str) {
object_access.get_object(&key.to_string()).await.unwrap();
async fn get(&self, object_access: &ObjectAccess, key: String) {
object_access.get_object(key).await.unwrap();
}

fn log_metrics(&self, duration: Duration) {
Expand All @@ -60,7 +60,7 @@ impl Perf {
duration: Duration,
) {
let num_objects = object_access
.list_objects(&key_prefix, None, true)
.list_objects(key_prefix.clone(), None, true)
.fold(0, |count, _key| async move { count + 1 })
.await;
let mut key_id = 0;
Expand All @@ -74,7 +74,7 @@ impl Perf {
my_perf
.get(
&my_object_access,
&format!("{}{}", my_key_prefix, key_id % num_objects + 1),
format!("{}{}", my_key_prefix, key_id % num_objects + 1),
)
.await;
})
Expand Down Expand Up @@ -106,7 +106,7 @@ impl Perf {
my_perf
.put(
&my_object_access,
&format!("{}{}", my_key_prefix, key_id),
format!("{}{}", my_key_prefix, key_id),
my_data,
)
.await
Expand Down Expand Up @@ -135,7 +135,7 @@ impl Perf {

pub async fn write_test(
object_access: &ObjectAccess,
key_prefix: &str,
key_prefix: String,
objsize: u64,
qdepth: u64,
duration: Duration,
Expand All @@ -144,14 +144,8 @@ pub async fn write_test(
let bounds = WriteTestBounds::Time(duration);
perf.log_metrics(Duration::from_secs(1));

perf.write_objects(
object_access,
key_prefix.to_string(),
objsize,
qdepth,
bounds,
)
.await;
perf.write_objects(object_access, key_prefix.clone(), objsize, qdepth, bounds)
.await;

println!("{:#?}", perf.metrics.put);

Expand All @@ -164,7 +158,7 @@ pub async fn write_test(

pub async fn read_test(
object_access: &ObjectAccess,
key_prefix: &str,
key_prefix: String,
objsize: u64,
qdepth: u64,
duration: Duration,
Expand All @@ -173,16 +167,10 @@ pub async fn read_test(
let bounds = WriteTestBounds::Objects(max(qdepth * 10, 200));
perf.log_metrics(Duration::from_secs(1));

perf.write_objects(
object_access,
key_prefix.to_string(),
objsize,
qdepth,
bounds,
)
.await;
perf.write_objects(object_access, key_prefix.clone(), objsize, qdepth, bounds)
.await;

perf.read_objects(object_access, key_prefix.to_string(), qdepth, duration)
perf.read_objects(object_access, key_prefix.clone(), qdepth, duration)
.await;

println!("{:#?}", perf.metrics.get);
Expand Down
2 changes: 1 addition & 1 deletion cmd/zfs_object_agent/zettacache/src/block_based_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl<T: BlockBasedLogEntry> BlockBasedLog<T> {
// XXX handle checksum error here
let (chunk, consumed): (BlockBasedLogChunk<T>, usize) = block_access
.chunk_from_raw(&extent_bytes[total_consumed..])
.context(format!("{:?} at {:?}", chunk_id, chunk_location))
.with_context(|| format!("{:?} at {:?}", chunk_id, chunk_location))
.unwrap();
assert_eq!(chunk.id, chunk_id);
for entry in chunk.entries {
Expand Down
36 changes: 27 additions & 9 deletions cmd/zfs_object_agent/zettaobject/src/data_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,27 +99,45 @@ impl DataObjectPhys {
pub async fn get(
object_access: &ObjectAccess,
guid: PoolGuid,
obj: ObjectId,
object: ObjectId,
bypass_cache: bool,
) -> Result<Self> {
let this = Self::get_from_key(object_access, &Self::key(guid, obj), bypass_cache).await?;
let buf = match bypass_cache {
true => {
object_access
.get_object_uncached(Self::key(guid, object))
.await?
}
false => object_access.get_object(Self::key(guid, object)).await?,
};
let begin = Instant::now();
let this: DataObjectPhys = bincode::deserialize(&buf)
.with_context(|| format!("Failed to decode contents of {}", Self::key(guid, object)))?;
trace!(
"{:?}: deserialized {} blocks from {} bytes in {}ms",
this.object,
this.blocks.len(),
buf.len(),
begin.elapsed().as_millis()
);
assert_eq!(this.guid, guid);
assert_eq!(this.object, obj);
assert_eq!(this.object, object);
this.verify();
Ok(this)
}

pub async fn get_from_key(
object_access: &ObjectAccess,
key: &str,
key: String,
bypass_cache: bool,
) -> Result<Self> {
let buf = match bypass_cache {
true => object_access.get_object_uncached(key).await?,
false => object_access.get_object(key).await?,
true => object_access.get_object_uncached(key.clone()).await?,
false => object_access.get_object(key.clone()).await?,
};
let begin = Instant::now();
let this: DataObjectPhys =
bincode::deserialize(&buf).context(format!("Failed to decode contents of {}", key))?;
let this: DataObjectPhys = bincode::deserialize(&buf)
.with_context(|| format!("Failed to decode contents of {}", key))?;
trace!(
"{:?}: deserialized {} blocks from {} bytes in {}ms",
this.object,
Expand All @@ -143,7 +161,7 @@ impl DataObjectPhys {
);
self.verify();
object_access
.put_object(&Self::key(self.guid, self.object), contents)
.put_object(Self::key(self.guid, self.object), contents)
.await;
}

Expand Down
9 changes: 4 additions & 5 deletions cmd/zfs_object_agent/zettaobject/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ impl HeartbeatPhys {
}

pub async fn get(object_access: &ObjectAccess, id: Uuid) -> anyhow::Result<Self> {
let key = Self::key(id);
let buf = object_access.get_object_impl(&key, None).await?;
let buf = object_access.get_object_impl(Self::key(id), None).await?;
let this: Self = serde_json::from_slice(&buf)
.context(format!("Failed to decode contents of {}", key))?;
.with_context(|| format!("Failed to decode contents of {}", Self::key(id)))?;
debug!("got {:#?}", this);
assert_eq!(this.id, id);
Ok(this)
Expand All @@ -53,12 +52,12 @@ impl HeartbeatPhys {
debug!("putting {:#?}", self);
let buf = serde_json::to_vec(&self).unwrap();
object_access
.put_object_timed(&Self::key(self.id), buf, timeout)
.put_object_timed(Self::key(self.id), buf, timeout)
.await
}

pub async fn delete(object_access: &ObjectAccess, id: Uuid) {
object_access.delete_object(&Self::key(id)).await;
object_access.delete_object(Self::key(id)).await;
}
}

Expand Down
Loading

0 comments on commit e7bed08

Please sign in to comment.