Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make body::Sender and Body::channel private #2970

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ enum Kind {
/// [`Body::channel()`]: struct.Body.html#method.channel
/// [`Sender::abort()`]: struct.Sender.html#method.abort
#[must_use = "Sender does nothing unless sent on"]
pub struct Sender {
pub(crate) struct Sender {
want_rx: watch::Receiver,
data_tx: BodySender,
trailers_tx: Option<TrailersSender>,
Expand All @@ -75,7 +75,8 @@ impl Recv {
///
/// Useful when wanting to stream chunks from another thread.
#[inline]
pub fn channel() -> (Sender, Recv) {
#[allow(unused)]
pub(crate) fn channel() -> (Sender, Recv) {
Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
}

Expand Down Expand Up @@ -289,7 +290,7 @@ impl fmt::Debug for Recv {

impl Sender {
/// Check to see if this `Sender` can send more data.
pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
pub(crate) fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
// Check if the receiver end has tried polling for the body yet
ready!(self.poll_want(cx)?);
self.data_tx
Expand All @@ -311,15 +312,17 @@ impl Sender {
}

/// Send data on data channel when it is ready.
pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
#[allow(unused)]
pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
self.ready().await?;
self.data_tx
.try_send(Ok(chunk))
.map_err(|_| crate::Error::new_closed())
}

/// Send trailers on trailers channel.
pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
#[allow(unused)]
pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
let tx = match self.trailers_tx.take() {
Some(tx) => tx,
None => return Err(crate::Error::new_closed()),
Expand All @@ -339,14 +342,15 @@ impl Sender {
/// This is mostly useful for when trying to send from some other thread
/// that doesn't have an async context. If in an async context, prefer
/// `send_data()` instead.
pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
self.data_tx
.try_send(Ok(chunk))
.map_err(|err| err.into_inner().expect("just sent Ok"))
}

/// Aborts the body in an abnormal fashion.
pub fn abort(self) {
#[allow(unused)]
pub(crate) fn abort(self) {
let _ = self
.data_tx
// clone so the send works even if buffer is full
Expand Down
3 changes: 2 additions & 1 deletion src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ pub use http_body::Body as HttpBody;
pub use http_body::SizeHint;

pub use self::aggregate::aggregate;
pub use self::body::{Recv, Sender};
pub use self::body::Recv;
pub(crate) use self::body::Sender;
pub(crate) use self::length::DecodedLength;
pub use self::to_bytes::to_bytes;

Expand Down
30 changes: 19 additions & 11 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,7 @@ test! {
}

mod conn {
use std::error::Error;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, TcpListener};
use std::pin::Pin;
Expand All @@ -1333,15 +1334,15 @@ mod conn {
use std::time::Duration;

use bytes::{Buf, Bytes};
use futures_channel::oneshot;
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt};
use http_body_util::Empty;
use hyper::upgrade::OnUpgrade;
use http_body_util::{Empty, StreamBody};
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _, ReadBuf};
use tokio::net::{TcpListener as TkTcpListener, TcpStream};

use hyper::body::HttpBody;
use hyper::client::conn;
use hyper::upgrade::OnUpgrade;
use hyper::{self, Method, Recv, Request, Response, StatusCode};

use super::{concat, s, support, tcp_connect, FutureHyperExt};
Expand Down Expand Up @@ -1524,17 +1525,23 @@ mod conn {

rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ()));

let (mut sender, body) = Recv::channel();
let (mut sender, recv) = mpsc::channel::<Result<Bytes, Box<dyn Error + Send + Sync>>>(0);

let sender = thread::spawn(move || {
sender.try_send_data("hello".into()).expect("try_send_data");
sender.try_send(Ok("hello".into())).expect("try_send_data");
support::runtime().block_on(rx).unwrap();
sender.abort();

// Aborts the body in an abnormal fashion.
let _ = sender.try_send(Err(Box::new(std::io::Error::new(
io::ErrorKind::Other,
"body write aborted",
))));
});

let req = Request::builder()
.method(Method::POST)
.uri("/")
.body(body)
.body(StreamBody::new(recv))
.unwrap();
let res = client.send_request(req);
rt.block_on(res).unwrap_err();
Expand Down Expand Up @@ -2111,7 +2118,7 @@ mod conn {
.http2_only(true)
.http2_keep_alive_interval(Duration::from_secs(1))
.http2_keep_alive_timeout(Duration::from_secs(1))
.handshake::<_, Recv>(io)
.handshake(io)
.await
.expect("http handshake");

Expand All @@ -2120,9 +2127,10 @@ mod conn {
});

// Use a channel to keep request stream open
let (_tx, body) = hyper::Recv::channel();
let req1 = http::Request::new(body);
let _resp = client.send_request(req1).await.expect("send_request");
let (_tx, recv) = mpsc::channel::<Result<Bytes, Box<dyn Error + Send + Sync>>>(0);
let req = http::Request::new(StreamBody::new(recv));

let _resp = client.send_request(req).await.expect("send_request");

// sleep longer than keepalive would trigger
tokio::time::sleep(Duration::from_secs(4)).await;
Expand Down