feat: Improve the performance of s3 services (#3622)
Xuanwo authored Nov 20, 2023
1 parent 84dfbfb commit e3100c6
Showing 7 changed files with 74 additions and 68 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions core/benches/vs_s3/Cargo.toml
@@ -38,3 +38,8 @@ aws-sdk-s3 = "0.38"
 aws-config = { version = "0.101.0", features = ["behavior-version-latest"] }
 dotenvy = "0.15"
 aws-credential-types = { version = "0.101.0", features = ["hardcoded-credentials"] }
+tracing-subscriber = { version = "0.3", features = [
+    "env-filter",
+    "tracing-log",
+] }

47 changes: 28 additions & 19 deletions core/benches/vs_s3/README.md
@@ -4,43 +4,52 @@ This benchmark compares the performance of OpenDAL with the performance of the `aws_sdk_s3`.
 
 ## Goal
 
-We expect OpenDAL to match `aws_sdk_s3` in speed: the throughput of OpenDAL should be within a `10%` range of `aws_sdk_s3`.
+We expect OpenDAL to match `aws_sdk_s3` in speed: the throughput of OpenDAL should be within a `5%` range of `aws_sdk_s3`.
 
+## Notes
+
+Please run bench cases separately; otherwise the results will affect each other, since all cases share the same runtime.
+
 ## Usage
 
 For test: `cargo run`
 
 ```shell
-Testing read/aws_s3_sdk_collect
+> cargo run
+Testing read/opendal_s3_reader
 Success
 Testing read/aws_s3_sdk_into_async_read
 Success
-Testing read/aws_s3_sdk_into_async_read_with_size_known
+Testing read/opendal_s3_reader_with_capacity
 Success
-Testing read/opendal_s3
-Success
-Testing read/opendal_s3_with_range
+Testing read/aws_s3_sdk_into_async_read_with_capacity
 Success
 ```
 
+For bench case: `cargo run --release -- opendal_s3_reader --bench`
+
+```shell
+> cargo run --release -- opendal_s3_reader --bench
+read/opendal_s3_reader  time:   [12.773 ms 13.004 ms 13.232 ms]
+                        thrpt:  [1.1809 GiB/s 1.2016 GiB/s 1.2232 GiB/s]
+```
+
 For bench: `cargo run --release -- --bench`
 
 ```shell
-read/aws_s3_sdk_collect time:   [47.264 ms 47.378 ms 47.504 ms]
-                        thrpt:  [336.82 MiB/s 337.71 MiB/s 338.53 MiB/s]
+> cargo run --release -- --bench
+read/opendal_s3_reader  time:   [12.773 ms 13.004 ms 13.232 ms]
+                        thrpt:  [1.1809 GiB/s 1.2016 GiB/s 1.2232 GiB/s]
 
 read/aws_s3_sdk_into_async_read
-                        time:   [9.8422 ms 11.607 ms 13.703 ms]
-                        thrpt:  [1.1403 GiB/s 1.3462 GiB/s 1.5876 GiB/s]
-
-read/aws_s3_sdk_into_async_read_with_size_known
-                        time:   [7.9572 ms 8.1055 ms 8.2552 ms]
-                        thrpt:  [1.8927 GiB/s 1.9277 GiB/s 1.9636 GiB/s]
+                        time:   [12.527 ms 12.842 ms 13.158 ms]
+                        thrpt:  [1.1875 GiB/s 1.2168 GiB/s 1.2473 GiB/s]
 
-read/opendal_s3         time:   [8.9068 ms 9.2614 ms 9.6912 ms]
-                        thrpt:  [1.6123 GiB/s 1.6871 GiB/s 1.7543 GiB/s]
+read/opendal_s3_reader_with_capacity
+                        time:   [9.6098 ms 9.8133 ms 10.017 ms]
+                        thrpt:  [1.5599 GiB/s 1.5922 GiB/s 1.6259 GiB/s]
 
-read/opendal_s3_with_range
-                        time:   [8.5459 ms 8.7592 ms 8.9739 ms]
-                        thrpt:  [1.7412 GiB/s 1.7838 GiB/s 1.8284 GiB/s]
+read/aws_s3_sdk_into_async_read_with_capacity
+                        time:   [9.8970 ms 10.113 ms 10.329 ms]
+                        thrpt:  [1.5128 GiB/s 1.5451 GiB/s 1.5788 GiB/s]
 ```
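
All numbers above come from criterion runs over a 16 MiB object. As a rough sanity check outside criterion, the same read path that `opendal_s3_reader` measures can be timed by hand with a sketch like the one below (illustrative only; it assumes `op` is an S3-backed `Operator` configured the same way as the benchmark, and that `"file"` is the 16 MiB object written by `prepare`):

```rust
use std::time::Instant;

use opendal::Operator;
use tokio::io::AsyncReadExt;

// Rough, illustrative timing of the read path the `opendal_s3_reader` case measures.
// Assumes `op` is configured like the benchmark's operator and "file" is the 16 MiB
// object written by `prepare`.
async fn time_single_read(op: &Operator) {
    let start = Instant::now();
    let mut r = op.reader("file").await.unwrap();
    let mut bs = Vec::new();
    let n = r.read_to_end(&mut bs).await.unwrap();
    let secs = start.elapsed().as_secs_f64();
    let mib_per_s = n as f64 / (1024.0 * 1024.0) / secs;
    println!("read {n} bytes in {secs:.3}s ({mib_per_s:.2} MiB/s)");
}
```
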
59 changes: 23 additions & 36 deletions core/benches/vs_s3/src/main.rs
@@ -27,6 +27,11 @@ use tokio::io::AsyncReadExt;
 
 fn main() {
     let _ = dotenvy::dotenv();
+    let _ = tracing_subscriber::fmt()
+        .pretty()
+        .with_test_writer()
+        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+        .try_init();
 
     let endpoint = env::var("OPENDAL_S3_ENDPOINT").unwrap();
     let access_key = env::var("OPENDAL_S3_ACCESS_KEY_ID").unwrap();
@@ -62,30 +67,21 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu
     let mut group = c.benchmark_group("read");
     group.throughput(criterion::Throughput::Bytes(16 * 1024 * 1024));
 
-    let path = TEST_RUNTIME.block_on(prepare(op.clone()));
+    TEST_RUNTIME.block_on(prepare(op.clone()));
 
-    group.bench_function("aws_s3_sdk_collect", |b| {
+    group.bench_function("opendal_s3_reader", |b| {
         b.to_async(&*TEST_RUNTIME).iter(|| async {
-            let _: Vec<u8> = s3_client
-                .get_object()
-                .bucket(bucket)
-                .key(&path)
-                .send()
-                .await
-                .unwrap()
-                .body
-                .collect()
-                .await
-                .unwrap()
-                .to_vec();
+            let mut r = op.reader("file").await.unwrap();
+            let mut bs = Vec::new();
+            let _ = r.read_to_end(&mut bs).await.unwrap();
         });
     });
     group.bench_function("aws_s3_sdk_into_async_read", |b| {
         b.to_async(&*TEST_RUNTIME).iter(|| async {
             let mut r = s3_client
                 .get_object()
                 .bucket(bucket)
-                .key(&path)
+                .key("file")
                 .send()
                 .await
                 .unwrap()
@@ -95,12 +91,20 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu
             let _ = r.read_to_end(&mut bs).await.unwrap();
         });
     });
-    group.bench_function("aws_s3_sdk_into_async_read_with_size_known", |b| {
+
+    group.bench_function("opendal_s3_reader_with_capacity", |b| {
+        b.to_async(&*TEST_RUNTIME).iter(|| async {
+            let mut r = op.reader("file").await.unwrap();
+            let mut bs = Vec::with_capacity(16 * 1024 * 1024);
+            let _ = r.read_to_end(&mut bs).await.unwrap();
+        });
+    });
+    group.bench_function("aws_s3_sdk_into_async_read_with_capacity", |b| {
         b.to_async(&*TEST_RUNTIME).iter(|| async {
             let mut r = s3_client
                 .get_object()
                 .bucket(bucket)
-                .key(&path)
+                .key("file")
                 .send()
                 .await
                 .unwrap()
@@ -110,31 +114,14 @@ fn bench_read(c: &mut Criterion, op: Operator, s3_client: aws_sdk_s3::Client, bu
             let _ = r.read_to_end(&mut bs).await.unwrap();
         });
     });
-    group.bench_function("opendal_s3", |b| {
-        b.to_async(&*TEST_RUNTIME).iter(|| async {
-            let _: Vec<u8> = op.read(&path).await.unwrap();
-        });
-    });
-    group.bench_function("opendal_s3_with_range", |b| {
-        b.to_async(&*TEST_RUNTIME).iter(|| async {
-            let _: Vec<u8> = op
-                .read_with(&path)
-                .range(0..16 * 1024 * 1024)
-                .await
-                .unwrap();
-        });
-    });
 
     group.finish()
 }
 
-async fn prepare(op: Operator) -> String {
+async fn prepare(op: Operator) {
     let mut rng = thread_rng();
     let mut content = vec![0; 16 * 1024 * 1024];
     rng.fill_bytes(&mut content);
 
-    let name = uuid::Uuid::new_v4().to_string();
-    op.write(&name, content).await.unwrap();
-
-    name
+    op.write("file", content.clone()).await.unwrap();
 }
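
The part of `main.rs` that constructs the OpenDAL operator and the `aws_sdk_s3` client is collapsed in this diff; only the reads of `OPENDAL_S3_ENDPOINT` and `OPENDAL_S3_ACCESS_KEY_ID` are visible. A minimal sketch of how the OpenDAL side could be wired from such variables follows; the additional variable names (`OPENDAL_S3_SECRET_ACCESS_KEY`, `OPENDAL_S3_BUCKET`, `OPENDAL_S3_REGION`) are assumptions, not taken from the collapsed code.

```rust
use std::env;

use opendal::services::S3;
use opendal::Operator;

// Hedged sketch of the collapsed setup: build an S3-backed Operator from environment
// variables. Only OPENDAL_S3_ENDPOINT and OPENDAL_S3_ACCESS_KEY_ID appear in the
// visible diff; the other variable names here are assumptions.
fn init_operator() -> opendal::Result<Operator> {
    let mut builder = S3::default();
    builder.endpoint(&env::var("OPENDAL_S3_ENDPOINT").unwrap());
    builder.access_key_id(&env::var("OPENDAL_S3_ACCESS_KEY_ID").unwrap());
    builder.secret_access_key(&env::var("OPENDAL_S3_SECRET_ACCESS_KEY").unwrap());
    builder.bucket(&env::var("OPENDAL_S3_BUCKET").unwrap());
    builder.region(&env::var("OPENDAL_S3_REGION").unwrap());
    Ok(Operator::new(builder)?.finish())
}
```
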
1 change: 1 addition & 0 deletions core/src/layers/complete.rs
@@ -553,6 +553,7 @@
     A: Accessor<Reader = R>,
     R: oio::Read,
 {
+    #[inline]
     fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
         use CompleteReader::*;
 
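The only change to `core/src/layers/complete.rs` is an `#[inline]` hint on `poll_read`. `CompleteReader` wraps one of several concrete readers and forwards each poll to the one it holds, so the hint lets that thin dispatch layer be inlined into hot read loops. A simplified, standalone sketch of the pattern (plain `std::io::Read` instead of `oio::Read`, hypothetical type names):

```rust
use std::io::Read;

// Simplified sketch of the dispatch pattern the hint targets: a wrapper enum whose
// read method only matches on the variant and forwards the call. Marking it
// #[inline] allows the forwarding layer to disappear at the call site.
enum EitherReader<A, B> {
    Left(A),
    Right(B),
}

impl<A: Read, B: Read> Read for EitherReader<A, B> {
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        match self {
            EitherReader::Left(r) => r.read(buf),
            EitherReader::Right(r) => r.read(buf),
        }
    }
}
```
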
21 changes: 13 additions & 8 deletions core/src/raw/http_util/body.rs
@@ -25,12 +25,9 @@ use std::task::Poll;
 use bytes::Buf;
 use bytes::BufMut;
 use bytes::Bytes;
-use futures::StreamExt;
 
 use crate::raw::*;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Result;
+use crate::*;
 
 /// Body used in async HTTP requests.
 #[derive(Default)]
@@ -172,13 +169,21 @@ impl oio::Read for IncomingAsyncBody {
             chunk
         } else {
             loop {
-                match ready!(self.poll_next(cx)) {
+                match ready!(self.inner.poll_next(cx)) {
                     // It's possible for underlying stream to return empty bytes, we should continue
                     // to fetch next one.
                     Some(Ok(bs)) if bs.is_empty() => continue,
-                    Some(Ok(bs)) => break bs,
+                    Some(Ok(bs)) => {
+                        self.consumed += bs.len() as u64;
+                        break bs;
+                    }
                     Some(Err(err)) => return Poll::Ready(Err(err)),
-                    None => return Poll::Ready(Ok(0)),
+                    None => {
+                        if let Some(size) = self.size {
+                            Self::check(size, self.consumed)?;
+                        }
+                        return Poll::Ready(Ok(0));
+                    }
                 }
             }
         };
@@ -211,7 +216,7 @@ impl oio::Read for IncomingAsyncBody {
             return Poll::Ready(Some(Ok(bs)));
         }
 
-        let res = match ready!(self.inner.poll_next_unpin(cx)) {
+        let res = match ready!(self.inner.poll_next(cx)) {
             Some(Ok(bs)) => {
                 self.consumed += bs.len() as u64;
                 Some(Ok(bs))
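
In `core/src/raw/http_util/body.rs`, the read path no longer goes through `futures::StreamExt::poll_next_unpin` or its own `poll_next`, polling `self.inner` directly instead; it counts the bytes it hands out in `self.consumed` and, when the stream ends while an expected size is known, verifies the total via `Self::check(size, self.consumed)`. A self-contained sketch of the invariant such a check enforces is below; it uses `std::io::Error` for illustration, while the real helper in `body.rs` returns OpenDAL's own error type.

```rust
use std::io::{Error, ErrorKind};

// Hedged sketch: when the HTTP body stream returns `None`, the bytes consumed so far
// must match the content length the response advertised; otherwise the read is
// reported as failed instead of silently returning a short body.
fn check(expect: u64, actual: u64) -> Result<(), Error> {
    if expect == actual {
        return Ok(());
    }
    Err(Error::new(
        ErrorKind::UnexpectedEof,
        format!("content length mismatch: expected {expect} bytes, consumed {actual}"),
    ))
}
```
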
8 changes: 3 additions & 5 deletions core/src/types/reader.rs
@@ -116,11 +116,9 @@ impl tokio::io::AsyncRead for Reader {
         cx: &mut Context<'_>,
         buf: &mut tokio::io::ReadBuf<'_>,
     ) -> Poll<io::Result<()>> {
-        let b = buf.initialize_unfilled();
-        let n = ready!(self.inner.poll_read(cx, b))?;
-        unsafe {
-            buf.assume_init(n);
-        }
+        // Safety: We make sure that we will set filled correctly.
+        unsafe { buf.assume_init(buf.remaining()) }
+        let n = ready!(self.inner.poll_read(cx, buf.initialize_unfilled()))?;
         buf.advance(n);
         Poll::Ready(Ok(()))
     }
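
The rewritten `poll_read` in `core/src/types/reader.rs` marks the whole spare capacity of the `ReadBuf` as initialized once, up front, so `initialize_unfilled()` no longer has to zero the destination buffer before every read; the old code zeroed first and only called `assume_init` afterwards. Below is a standalone sketch of the same `ReadBuf` pattern with the safety condition spelled out; the function and its names are illustrative, not from the commit.

```rust
use std::mem::MaybeUninit;

use tokio::io::ReadBuf;

// Hedged sketch of the buffer-handling pattern: pre-mark the spare capacity of a
// ReadBuf as initialized so initialize_unfilled() skips its memset. This is sound
// only if the data source never reads from the slice it is handed and `advance` is
// called with exactly the number of bytes actually written.
fn copy_into(src: &[u8], storage: &mut [MaybeUninit<u8>]) -> usize {
    let mut buf = ReadBuf::uninit(storage);
    // SAFETY: the bytes are used as write-only scratch space and `filled` is only
    // advanced over bytes that `copy_from_slice` has just written.
    unsafe { buf.assume_init(buf.remaining()) };
    let dst = buf.initialize_unfilled(); // no zeroing happens here anymore
    let n = src.len().min(dst.len());
    dst[..n].copy_from_slice(&src[..n]);
    buf.advance(n);
    buf.filled().len()
}
```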
