Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple benchmarks #75

Merged
merged 7 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,17 @@ cargo test -p hdfs-native --features token,kerberos,integration-test
```

### Python tests
See the [Python README](./python/README.md)
See the [Python README](./python/README.md)

## Running benchmarks
Some of the benchmarks compare performance to the JVM-based client through libhdfs via the fs-hdfs3 crate. Because of that, some extra setup is required to run the benchmarks:

```bash
export HADOOP_CONF_DIR=$(pwd)/crates/hdfs-native/target/test
export CLASSPATH=$(hadoop classpath)
```

then you can run the benchmarks with
```bash
cargo bench -p hdfs-native --features benchmark,integration-test
```
15 changes: 12 additions & 3 deletions crates/hdfs-native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ prost-types = "0.12"
roxmltree = "0.18"
socket2 = "0.5"
thiserror = "1"
tokio = { workspace = true, features = ["rt", "net", "io-util", "macros", "sync", "time"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net", "io-util", "macros", "sync", "time"] }
url = "2"
users = { version = "0.11", default-features = false }
uuid = { version = "1", features = ["v4"] }
Expand All @@ -37,8 +37,9 @@ prost-build = { version = "0.12", optional = true }
protobuf-src = { version = "1.1", optional = true }

[dev-dependencies]
criterion = "0.5"
criterion = { version = "0.5", features = ["async_tokio", "async_futures"] }
env_logger = "0.10"
fs-hdfs3 = "0.1.12"
serial_test = "2.0.0"
tempfile = "3"
which = "4"
Expand All @@ -49,8 +50,16 @@ token = ["gsasl-sys"]

generate-protobuf = ["prost-build", "protobuf-src"]
integration-test = ["which"]
benchmark = []
benchmark = ["which"]

[[bench]]
name = "ec"
harness = false

[[bench]]
name = "io"
harness = false

[[bench]]
name = "rpc"
harness = false
110 changes: 110 additions & 0 deletions crates/hdfs-native/benches/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use std::collections::HashSet;

use bytes::{Buf, BufMut, BytesMut};
use criterion::*;
use hdfs::hdfs::get_hdfs;
use hdfs_native::{minidfs::MiniDfs, Client, WriteOptions};

/// Creates `/bench` on the cluster and fills it with `ints` sequential
/// u32 values (`ints * 4` bytes total), then closes the writer.
async fn write_file(client: &Client, ints: usize) {
    let mut writer = client
        .create("/bench", WriteOptions::default())
        .await
        .unwrap();

    // Build the whole payload up front so a single write call pushes it all.
    let mut payload = BytesMut::with_capacity(ints * 4);
    (0..ints).for_each(|i| payload.put_u32(i as u32));

    writer.write(payload.freeze()).await.unwrap();
    writer.close().await.unwrap();
}

/// Benchmarks 512 MiB sequential reads and writes against a MiniDfs
/// cluster, comparing this crate's native client to the JVM client
/// reached through libhdfs (fs-hdfs3).
fn bench(c: &mut Criterion) {
    let _ = env_logger::builder().is_test(true).try_init();

    // The MiniDfs handle must stay alive for the duration of the
    // benchmarks; dropping it tears down the cluster.
    let _dfs = MiniDfs::with_features(&HashSet::new());
    let client = Client::default();

    let ints_to_write: usize = 128 * 1024 * 1024; // 128 Mi u32s => 512 MiB file

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async { write_file(&client, ints_to_write).await });

    // Acquire the libhdfs handle once, outside the measured closures, so
    // the benchmarks measure I/O rather than filesystem handle acquisition.
    let fs = get_hdfs().unwrap();

    let mut group = c.benchmark_group("read");
    group.throughput(Throughput::Bytes((ints_to_write * 4) as u64));
    group.sample_size(10);

    // Open the native reader once and reuse it across iterations.
    let reader = rt.block_on(client.read("/bench")).unwrap();
    group.bench_function("read-native", |b| {
        b.to_async(&rt)
            .iter(|| async { reader.read_range(0, reader.file_length()).await.unwrap() })
    });
    group.bench_function("read-libhdfs", |b| {
        b.iter(|| {
            let mut buf = BytesMut::zeroed(ints_to_write * 4);
            let mut bytes_read = 0;
            let reader = fs.open("/bench").unwrap();

            // libhdfs reads may return short counts, so loop until the
            // whole file has been consumed.
            while bytes_read < ints_to_write * 4 {
                bytes_read += reader
                    .read(&mut buf[bytes_read..ints_to_write * 4])
                    .unwrap() as usize;
            }
            reader.close().unwrap();
            buf
        })
    });

    let mut data_to_write = BytesMut::with_capacity(ints_to_write * 4);
    for i in 0..ints_to_write {
        data_to_write.put_i32(i as i32);
    }

    let buf = data_to_write.freeze();

    // Finish the read group before starting the write group.
    drop(group);

    let mut group = c.benchmark_group("write");
    group.throughput(Throughput::Bytes((ints_to_write * 4) as u64));
    group.sample_size(10);

    group.bench_function("write-native", |b| {
        b.to_async(&rt).iter(|| async {
            let mut writer = client
                .create("/bench-write", WriteOptions::default().overwrite(true))
                .await
                .unwrap();

            // `buf` is a frozen Bytes, so clone is a cheap refcount bump.
            writer.write(buf.clone()).await.unwrap();
            writer.close().await.unwrap();
        })
    });

    group.bench_function("write-libhdfs", |b| {
        b.iter(|| {
            let mut buf = buf.clone();
            let writer = fs.create_with_overwrite("/bench-write", true).unwrap();

            // write may accept fewer bytes than offered; advance and retry
            // until the whole buffer has been pushed.
            while buf.remaining() > 0 {
                let written = writer.write(&buf[..]).unwrap();
                buf.advance(written as usize);
            }
            writer.close().unwrap();
        })
    });
}

// Register the benchmark entry point with criterion's generated main.
criterion_group!(benches, bench);
criterion_main!(benches);
41 changes: 41 additions & 0 deletions crates/hdfs-native/benches/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::collections::HashSet;

use criterion::*;
use hdfs::hdfs::get_hdfs;
use hdfs_native::{minidfs::MiniDfs, Client, WriteOptions};

/// Benchmarks a lightweight metadata RPC (getFileInfo) against a MiniDfs
/// cluster for both the native client and the libhdfs-backed JVM client.
fn bench(c: &mut Criterion) {
    let _ = env_logger::builder().is_test(true).try_init();

    // Keep the MiniDfs handle alive; dropping it shuts the cluster down.
    let _dfs = MiniDfs::with_features(&HashSet::new());
    let client = Client::default();

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    // Create an empty file for the RPCs to query.
    rt.block_on(async {
        let mut writer = client
            .create("/bench", WriteOptions::default())
            .await
            .unwrap();
        writer.close().await.unwrap();
    });

    let fs = get_hdfs().unwrap();

    let mut group = c.benchmark_group("rpc");
    group.bench_function("getFileInfo-native", |b| {
        b.to_async(&rt)
            .iter(|| async { client.get_file_info("/bench").await.unwrap() })
    });
    group.bench_function("getFileInfo-libhdfs", |b| {
        b.iter(|| fs.get_file_status("/bench").unwrap())
    });
}

// Register the benchmark entry point with criterion's generated main.
criterion_group!(benches, bench);
criterion_main!(benches);
9 changes: 6 additions & 3 deletions crates/hdfs-native/src/hdfs/block_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,16 @@ impl ReplicatedBlockStream {
}

async fn next_packet(&mut self) -> Result<Option<Bytes>> {
if self.len == 0 {
return Ok(None);
}
if self.connection.is_none() {
self.select_next_datanode().await?;
}
let conn = self.connection.as_mut().unwrap();

if self.len == 0 {
conn.send_read_success().await?;
return Ok(None);
}

let packet = conn.read_packet().await?;

let packet_offset = if self.offset > packet.header.offset_in_block as usize {
Expand Down
14 changes: 14 additions & 0 deletions crates/hdfs-native/src/hdfs/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,20 @@ impl DatanodeConnection {
Ok(Packet::new(header, checksum, data))
}

/// Tells the datanode that the read completed and every checksum
/// verified (status = ChecksumOk), then flushes and shuts down the
/// stream's write side to end the exchange.
pub(crate) async fn send_read_success(&mut self) -> Result<()> {
let client_read_status = hdfs::ClientReadStatusProto {
status: hdfs::Status::ChecksumOk as i32,
};

// The status message is sent with length-delimited protobuf framing.
self.stream
.write_all(&client_read_status.encode_length_delimited_to_vec())
.await?;
self.stream.flush().await?;
// NOTE(review): shutdown closes the write half, so this assumes the
// connection is not reused for further requests — confirm against any
// connection pooling/reuse logic.
self.stream.shutdown().await?;

Ok(())
}

pub(crate) fn split(self) -> (DatanodeReader, DatanodeWriter) {
let (reader, writer) = self.stream.into_inner().into_split();
let reader = DatanodeReader {
Expand Down
2 changes: 1 addition & 1 deletion crates/hdfs-native/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub(crate) mod ec;
pub(crate) mod error;
pub mod file;
pub(crate) mod hdfs;
#[cfg(feature = "integration-test")]
#[cfg(any(feature = "integration-test", feature = "benchmark"))]
pub mod minidfs;
pub(crate) mod proto;
pub(crate) mod security;
Expand Down
1 change: 0 additions & 1 deletion crates/hdfs-native/src/minidfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ impl MiniDfs {
};

env::set_var("HADOOP_CONF_DIR", "target/test");

MiniDfs {
process: child,
url: url.to_string(),
Expand Down
Loading