Skip to content

Commit

Permalink
feat(chunk): implement Extend and IntoIterator for Chunk
Browse files Browse the repository at this point in the history
The real reason to provide these implementations is so that `concat` on
the `Body` can work to easily join all the chunks into 1.
  • Loading branch information
seanmonstar committed Apr 9, 2017
1 parent 5c1cfa2 commit 78512bd
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
15 changes: 15 additions & 0 deletions src/http/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,18 @@ fn _assert_send_sync() {
_assert_send::<Chunk>();
_assert_sync::<Chunk>();
}

#[test]
fn test_body_stream_concat() {
use futures::{Sink, Stream, Future};
let (tx, body) = Body::pair();

::std::thread::spawn(move || {
let tx = tx.send(Ok("hello ".into())).wait().unwrap();
tx.send(Ok("world".into())).wait().unwrap();
});

let total = body.concat().wait().unwrap();
assert_eq!(total.as_ref(), b"hello world");

}
56 changes: 55 additions & 1 deletion src/http/chunk.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,41 @@
use std::fmt;
use std::mem;

use bytes::Bytes;
use bytes::{Bytes, BytesMut, BufMut};

/// A piece of a message body.
pub struct Chunk(Inner);

enum Inner {
Mut(BytesMut),
Shared(Bytes),
Swapping,
}

impl Inner {
fn as_bytes_mut(&mut self, reserve: usize) -> &mut BytesMut {
match *self {
Inner::Mut(ref mut bytes) => return bytes,
_ => ()
}

let bytes = match mem::replace(self, Inner::Swapping) {
Inner::Shared(bytes) => bytes,
_ => unreachable!(),
};

let bytes_mut = bytes.try_mut().unwrap_or_else(|bytes| {
let mut bytes_mut = BytesMut::with_capacity(reserve + bytes.len());
bytes_mut.put_slice(bytes.as_ref());
bytes_mut
});

*self = Inner::Mut(bytes_mut);
match *self {
Inner::Mut(ref mut bytes) => bytes,
_ => unreachable!(),
}
}
}

impl From<Vec<u8>> for Chunk {
Expand Down Expand Up @@ -46,7 +75,9 @@ impl From<Bytes> for Chunk {
impl From<Chunk> for Bytes {
fn from(chunk: Chunk) -> Bytes {
match chunk.0 {
Inner::Mut(bytes_mut) => bytes_mut.freeze(),
Inner::Shared(bytes) => bytes,
Inner::Swapping => unreachable!(),
}
}
}
Expand All @@ -64,7 +95,9 @@ impl AsRef<[u8]> for Chunk {
#[inline]
fn as_ref(&self) -> &[u8] {
match self.0 {
Inner::Mut(ref slice) => slice,
Inner::Shared(ref slice) => slice,
Inner::Swapping => unreachable!(),
}
}
}
Expand All @@ -75,3 +108,24 @@ impl fmt::Debug for Chunk {
fmt::Debug::fmt(self.as_ref(), f)
}
}

impl IntoIterator for Chunk {
type Item = u8;
type IntoIter = <Bytes as IntoIterator>::IntoIter;

fn into_iter(self) -> Self::IntoIter {
match self.0 {
Inner::Mut(bytes) => bytes.freeze().into_iter(),
Inner::Shared(bytes) => bytes.into_iter(),
Inner::Swapping => unreachable!(),
}
}
}

impl Extend<u8> for Chunk {
fn extend<T>(&mut self, iter: T) where T: IntoIterator<Item=u8> {
let iter = iter.into_iter();

self.0.as_bytes_mut(iter.size_hint().0).extend(iter);
}
}

0 comments on commit 78512bd

Please sign in to comment.