Skip to content

Commit

Permalink
feat: Add benchmark for buf write (#2922)
Browse files Browse the repository at this point in the history
* Add benchmark

Signed-off-by: Xuanwo <[email protected]>

* Improve

Signed-off-by: Xuanwo <[email protected]>

* Fix build

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Aug 24, 2023
1 parent cbac9b8 commit 1bb977b
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 21 deletions.
4 changes: 4 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ bench = false
harness = false
name = "ops"

[[bench]]
harness = false
name = "oio"

[[test]]
harness = false
name = "behavior"
Expand Down
29 changes: 29 additions & 0 deletions core/benches/oio/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod utils;
mod write;

use criterion::criterion_group;
use criterion::criterion_main;

criterion_group!(
benches,
write::bench_at_least_buf_write,
write::bench_exact_buf_write,
);
criterion_main!(benches);
52 changes: 52 additions & 0 deletions core/benches/oio/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use async_trait::async_trait;
use bytes::Bytes;
use opendal::raw::oio;
use opendal::raw::oio::Streamer;
use rand::prelude::ThreadRng;
use rand::RngCore;

/// BlackHoleWriter will discard all data written to it so we can measure the buffer's cost.
pub struct BlackHoleWriter;

#[async_trait]
impl oio::Write for BlackHoleWriter {
async fn write(&mut self, _: Bytes) -> opendal::Result<()> {
Ok(())
}

async fn sink(&mut self, _: u64, _: Streamer) -> opendal::Result<()> {
Ok(())
}

async fn abort(&mut self) -> opendal::Result<()> {
Ok(())
}

async fn close(&mut self) -> opendal::Result<()> {
Ok(())
}
}

pub fn gen_bytes(rng: &mut ThreadRng, size: usize) -> Bytes {
let mut content = vec![0; size];
rng.fill_bytes(&mut content);

content.into()
}
79 changes: 79 additions & 0 deletions core/benches/oio/write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::Criterion;
use once_cell::sync::Lazy;
use opendal::raw::oio::{AtLeastBufWriter, ExactBufWriter, Write};
use rand::thread_rng;
use size::Size;

use super::utils::*;

pub static TOKIO: Lazy<tokio::runtime::Runtime> =
Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime"));

pub fn bench_at_least_buf_write(c: &mut Criterion) {
let mut group = c.benchmark_group("at_least_buf_write");

let mut rng = thread_rng();

for size in [
Size::from_kibibytes(4),
Size::from_kibibytes(256),
Size::from_mebibytes(4),
Size::from_mebibytes(16),
] {
let content = gen_bytes(&mut rng, size.bytes() as usize);

group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
group.bench_with_input(size.to_string(), &content, |b, content| {
b.to_async(&*TOKIO).iter(|| async {
let mut w = AtLeastBufWriter::new(BlackHoleWriter, 256 * 1024);
w.write(content.clone()).await.unwrap();
w.close().await.unwrap();
})
});
}

group.finish()
}

pub fn bench_exact_buf_write(c: &mut Criterion) {
let mut group = c.benchmark_group("exact_buf_write");

let mut rng = thread_rng();

for size in [
Size::from_kibibytes(4),
Size::from_kibibytes(256),
Size::from_mebibytes(4),
Size::from_mebibytes(16),
] {
let content = gen_bytes(&mut rng, size.bytes() as usize);

group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
group.bench_with_input(size.to_string(), &content, |b, content| {
b.to_async(&*TOKIO).iter(|| async {
let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024);
w.write(content.clone()).await.unwrap();
w.close().await.unwrap();
})
});
}

group.finish()
}
2 changes: 1 addition & 1 deletion core/src/raw/oio/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl ChunkedCursor {
#[cfg(test)]
fn concat(&self) -> Bytes {
let mut bs = BytesMut::new();
for v in &self.inner {
for v in self.inner.iter().skip(self.idx) {
bs.extend_from_slice(v);
}
bs.freeze()
Expand Down
54 changes: 34 additions & 20 deletions core/src/raw/oio/write/exact_buf_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
/// # TODO
///
/// We know every stream size, we can collect them into a buffer without chain them every time.
async fn sink(&mut self, size: u64, mut s: Streamer) -> Result<()> {
// Collect the stream into buffer directly if the buffet is not full.
if self.buffer_stream.is_none()
&& self.buffer.len() as u64 + size <= self.buffer_size as u64
{
self.buffer.push(s.collect().await?);
return Ok(());
}

async fn sink(&mut self, _: u64, mut s: Streamer) -> Result<()> {
if self.buffer.len() >= self.buffer_size {
let mut buf = self.buffer.clone();
let to_write = buf.split_to(self.buffer_size);
Expand Down Expand Up @@ -151,17 +143,14 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
}

async fn close(&mut self) -> Result<()> {
loop {
if let Some(stream) = self.buffer_stream.as_mut() {
let bs = stream.next().await.transpose()?;
match bs {
None => {
self.buffer_stream = None;
break;
}
Some(bs) => {
self.buffer.push(bs);
}
while let Some(stream) = self.buffer_stream.as_mut() {
let bs = stream.next().await.transpose()?;
match bs {
None => {
self.buffer_stream = None;
}
Some(bs) => {
self.buffer.push(bs);
}
}

Expand Down Expand Up @@ -235,6 +224,31 @@ mod tests {
}
}

#[tokio::test]
async fn test_exact_buf_writer_short_write() -> Result<()> {
let _ = tracing_subscriber::fmt()
.pretty()
.with_test_writer()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init();

let mut rng = thread_rng();
let mut expected = vec![0; 5];
rng.fill_bytes(&mut expected);

let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10);

w.write(Bytes::from(expected.clone())).await?;
w.close().await?;

assert_eq!(w.inner.buf.len(), expected.len());
assert_eq!(
format!("{:x}", Sha256::digest(&w.inner.buf)),
format!("{:x}", Sha256::digest(&expected))
);
Ok(())
}

#[tokio::test]
async fn test_fuzz_exact_buf_writer() -> Result<()> {
let _ = tracing_subscriber::fmt()
Expand Down

0 comments on commit 1bb977b

Please sign in to comment.