-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(body): add
body::aggregate
and body::to_bytes
functions
Adds utility functions to `hyper::body` to help asynchronously collecting all the buffers of some `HttpBody` into one. - `aggregate` will collect all into an `impl Buf` without copying the contents. This is ideal if you don't need a contiguous buffer. - `to_bytes` will copy all the data into a single contiguous `Bytes` buffer.
- Loading branch information
1 parent
5a59875
commit 8ba9a8d
Showing
15 changed files
with
282 additions
and
128 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
#![feature(test)] | ||
#![deny(warnings)] | ||
|
||
extern crate test; | ||
|
||
use bytes::Buf; | ||
use futures_util::stream; | ||
use futures_util::StreamExt; | ||
use hyper::body::Body; | ||
|
||
macro_rules! bench_stream { | ||
($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{ | ||
let mut rt = tokio::runtime::Builder::new() | ||
.basic_scheduler() | ||
.build() | ||
.expect("rt build"); | ||
|
||
let $total_ident: usize = $bytes * $count; | ||
$bencher.bytes = $total_ident as u64; | ||
let __s: &'static [&'static [u8]] = &[&[b'x'; $bytes] as &[u8]; $count] as _; | ||
|
||
$bencher.iter(|| { | ||
rt.block_on(async { | ||
let $body_pat = Body::wrap_stream( | ||
stream::iter(__s.iter()).map(|&s| Ok::<_, std::convert::Infallible>(s)), | ||
); | ||
$block; | ||
}); | ||
}); | ||
}}; | ||
} | ||
|
||
macro_rules! benches { | ||
($($name:ident, $bytes:expr, $count:expr;)+) => ( | ||
mod aggregate { | ||
use super::*; | ||
|
||
$( | ||
#[bench] | ||
fn $name(b: &mut test::Bencher) { | ||
bench_stream!(b, bytes: $bytes, count: $count, total, body, { | ||
let buf = hyper::body::aggregate(body).await.unwrap(); | ||
assert_eq!(buf.remaining(), total); | ||
}); | ||
} | ||
)+ | ||
} | ||
|
||
mod manual_into_vec { | ||
use super::*; | ||
|
||
$( | ||
#[bench] | ||
fn $name(b: &mut test::Bencher) { | ||
bench_stream!(b, bytes: $bytes, count: $count, total, mut body, { | ||
let mut vec = Vec::new(); | ||
while let Some(chunk) = body.next().await { | ||
vec.extend_from_slice(&chunk.unwrap()); | ||
} | ||
assert_eq!(vec.len(), total); | ||
}); | ||
} | ||
)+ | ||
} | ||
|
||
mod to_bytes { | ||
use super::*; | ||
|
||
$( | ||
#[bench] | ||
fn $name(b: &mut test::Bencher) { | ||
bench_stream!(b, bytes: $bytes, count: $count, total, body, { | ||
let bytes = hyper::body::to_bytes(body).await.unwrap(); | ||
assert_eq!(bytes.len(), total); | ||
}); | ||
} | ||
)+ | ||
} | ||
) | ||
} | ||
|
||
// ===== Actual Benchmarks ===== | ||
|
||
benches! { | ||
bytes_1_000_count_2, 1_000, 2; | ||
bytes_1_000_count_10, 1_000, 10; | ||
bytes_10_000_count_1, 10_000, 1; | ||
bytes_10_000_count_10, 10_000, 10; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
use bytes::Buf; | ||
|
||
use super::HttpBody; | ||
use crate::common::buf::BufList; | ||
|
||
/// Aggregate the data buffers from a body asynchronously. | ||
/// | ||
/// The returned `impl Buf` groups the `Buf`s from the `HttpBody` without | ||
/// copying them. This is ideal if you don't require a contiguous buffer. | ||
pub async fn aggregate<T>(body: T) -> Result<impl Buf, T::Error> | ||
where | ||
T: HttpBody, | ||
{ | ||
let mut bufs = BufList::new(); | ||
|
||
futures_util::pin_mut!(body); | ||
while let Some(buf) = body.data().await { | ||
let buf = buf?; | ||
if buf.has_remaining() { | ||
bufs.push(buf); | ||
} | ||
} | ||
|
||
Ok(bufs) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
use bytes::{Buf, BufMut, Bytes}; | ||
|
||
use super::HttpBody; | ||
|
||
/// dox | ||
pub async fn to_bytes<T>(body: T) -> Result<Bytes, T::Error> | ||
where | ||
T: HttpBody, | ||
{ | ||
futures_util::pin_mut!(body); | ||
|
||
// If there's only 1 chunk, we can just return Buf::to_bytes() | ||
let mut first = if let Some(buf) = body.data().await { | ||
buf? | ||
} else { | ||
return Ok(Bytes::new()); | ||
}; | ||
|
||
let second = if let Some(buf) = body.data().await { | ||
buf? | ||
} else { | ||
return Ok(first.to_bytes()); | ||
}; | ||
|
||
// With more than 1 buf, we gotta flatten into a Vec first. | ||
let cap = first.remaining() + second.remaining() + body.size_hint().lower() as usize; | ||
let mut vec = Vec::with_capacity(cap); | ||
vec.put(first); | ||
vec.put(second); | ||
|
||
while let Some(buf) = body.data().await { | ||
vec.put(buf?); | ||
} | ||
|
||
Ok(vec.into()) | ||
} |
Oops, something went wrong.