Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make client able to use non-Send executor #3184

Merged
merged 28 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d88ec30
create example
Ruben2424 Mar 9, 2023
44587d6
make anonymous futures structs
Ruben2424 May 21, 2023
55753a9
remove exec and fix warnings
Ruben2424 Jun 6, 2023
ad57f1c
remove bound
Ruben2424 Jun 6, 2023
4132aa6
fmt
Ruben2424 Jun 6, 2023
4138669
use right future
Ruben2424 Jun 6, 2023
ce4619e
fix features ci
Ruben2424 Jun 6, 2023
4ac835a
fix ffi
Ruben2424 Jun 8, 2023
27eb1e2
Merge branch 'master' into issue-3017
Ruben2424 Jun 8, 2023
ed95666
move client example to single_threaded example
Ruben2424 Jun 8, 2023
beb3ee4
Merge branch 'issue-3017' of https://github.com/Ruben2424/hyper into …
Ruben2424 Jun 8, 2023
c4a51b3
remove from cargo toml
Ruben2424 Jun 8, 2023
7cbfc72
use pin_project_lite
Ruben2424 Jun 9, 2023
d3821a9
deny warnings
Ruben2424 Jun 9, 2023
2394949
error bounds
Ruben2424 Jun 9, 2023
8e0f907
fix test
Ruben2424 Jun 9, 2023
6d44c29
sealed ExecutorClient
Ruben2424 Jun 9, 2023
c7d59fe
better error message
Ruben2424 Jun 9, 2023
0269396
make it work also for io types
Ruben2424 Jun 9, 2023
3a78a57
improve example
Ruben2424 Jun 12, 2023
be5c654
fmt
Ruben2424 Jun 12, 2023
b39d5d4
Merge branch 'master' into issue-3017
Ruben2424 Jun 16, 2023
15be46f
fix merge fail
Ruben2424 Jun 16, 2023
8391604
fix error bounds
Ruben2424 Jun 16, 2023
a276efc
Merge branch 'master' into issue-3017
Ruben2424 Jun 19, 2023
3dd0579
Merge branch 'master' into issue-3017
Ruben2424 Jun 29, 2023
98b0599
Merge branch 'issue-3017' of https://github.com/Ruben2424/hyper into …
Ruben2424 Jun 29, 2023
25ff95c
Merge branch 'master' into issue-3017
Ruben2424 Jun 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 157 additions & 14 deletions examples/single_threaded.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
#![deny(warnings)]

use http_body_util::BodyExt;
use hyper::server::conn::http2;
use std::cell::Cell;
use std::net::SocketAddr;
use std::rc::Rc;
use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpListener;

use hyper::body::{Body as HttpBody, Bytes, Frame};
use hyper::service::service_fn;
use hyper::Request;
use hyper::{Error, Response};
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use tokio::net::TcpStream;

struct Body {
// Our Body type is !Send and !Sync:
Expand Down Expand Up @@ -40,28 +45,57 @@ impl HttpBody for Body {
}
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
fn main() {
pretty_env_logger::init();

// Configure a runtime that runs everything on the current thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");

// Combine it with a `LocalSet, which means it can spawn !Send futures...
let local = tokio::task::LocalSet::new();
local.block_on(&rt, run())
let server = thread::spawn(move || {
// Configure a runtime for the server that runs everything on the current thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");

// Combine it with a `LocalSet, which means it can spawn !Send futures...
let local = tokio::task::LocalSet::new();
local.block_on(&rt, server()).unwrap();
});

let client = thread::spawn(move || {
// Configure a runtime for the client that runs everything on the current thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");

// Combine it with a `LocalSet, which means it can spawn !Send futures...
let local = tokio::task::LocalSet::new();
local
.block_on(
&rt,
client("http://localhost:3000".parse::<hyper::Uri>().unwrap()),
)
.unwrap();
});

server.join().unwrap();
client.join().unwrap();
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
async fn server() -> Result<(), Box<dyn std::error::Error>> {
let mut stdout = io::stdout();

let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
// Using a !Send request counter is fine on 1 thread...
let counter = Rc::new(Cell::new(0));

let listener = TcpListener::bind(addr).await?;
println!("Listening on http://{}", addr);

stdout
.write_all(format!("Listening on http://{}", addr).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();

loop {
let (stream, _) = listener.accept().await?;

Expand All @@ -80,12 +114,121 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
.serve_connection(stream, service)
.await
{
println!("Error serving connection: {:?}", err);
let mut stdout = io::stdout();
stdout
.write_all(format!("Error serving connection: {:?}", err).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();
}
});
}
}

struct IOTypeNotSend {
_marker: PhantomData<*const ()>,
stream: TcpStream,
}

impl IOTypeNotSend {
fn new(stream: TcpStream) -> Self {
Self {
_marker: PhantomData,
stream,
}
}
}

impl AsyncWrite for IOTypeNotSend {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.stream).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.stream).poll_shutdown(cx)
}
}

impl AsyncRead for IOTypeNotSend {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.stream).poll_read(cx, buf)
}
}

async fn client(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>> {
let host = url.host().expect("uri has no host");
let port = url.port_u16().unwrap_or(80);
let addr = format!("{}:{}", host, port);
let stream = TcpStream::connect(addr).await?;

let stream = IOTypeNotSend::new(stream);

let (mut sender, conn) = hyper::client::conn::http2::handshake(LocalExec, stream).await?;

tokio::task::spawn_local(async move {
if let Err(err) = conn.await {
let mut stdout = io::stdout();
stdout
.write_all(format!("Connection failed: {:?}", err).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();
}
});

let authority = url.authority().unwrap().clone();

// Make 4 requests
for _ in 0..4 {
let req = Request::builder()
.uri(url.clone())
.header(hyper::header::HOST, authority.as_str())
.body(Body::from("test".to_string()))?;

let mut res = sender.send_request(req).await?;

let mut stdout = io::stdout();
stdout
.write_all(format!("Response: {}\n", res.status()).as_bytes())
.await
.unwrap();
stdout
.write_all(format!("Headers: {:#?}\n", res.headers()).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();

// Print the response body
while let Some(next) = res.frame().await {
let frame = next?;
if let Some(chunk) = frame.data_ref() {
stdout.write_all(&chunk).await.unwrap();
}
}
stdout.write_all(b"\n-----------------\n").await.unwrap();
stdout.flush().await.unwrap();
}
Ok(())
}

// NOTE: This part is only needed for HTTP/2. HTTP/1 doesn't need an executor.
//
// Since the Server needs to spawn some background tasks, we needed
Expand Down
77 changes: 43 additions & 34 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! HTTP/2 client connections

use std::error::Error as StdError;
use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
Expand All @@ -12,12 +12,10 @@ use tokio::io::{AsyncRead, AsyncWrite};
use super::super::dispatch;
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::time::Time;
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::common::{task, Future, Pin, Poll};
use crate::proto;
use crate::rt::{Executor, Timer};
use crate::rt::bounds::ExecutorClient;
use crate::rt::Timer;

/// The sender side of an established connection.
pub struct SendRequest<B> {
Expand All @@ -37,20 +35,22 @@ impl<B> Clone for SendRequest<B> {
/// In most cases, this should just be spawned into an executor, so that it
/// can process incoming and outgoing messages, notice hangups, and the like.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
pub struct Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Send + 'static,
T: AsyncRead + AsyncWrite + 'static + Unpin,
B: Body + 'static,
E: ExecutorClient<B, T> + Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
{
inner: (PhantomData<T>, proto::h2::ClientTask<B>),
inner: (PhantomData<T>, proto::h2::ClientTask<B, E, T>),
}

/// A builder to configure an HTTP connection.
///
/// After setting options, the builder is used to create a handshake future.
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
pub struct Builder<Ex> {
pub(super) exec: Ex,
pub(super) timer: Time,
h2_builder: proto::h2::client::Config,
}
Expand All @@ -59,13 +59,16 @@ pub struct Builder {
///
/// This is a shortcut for `Builder::new().handshake(io)`.
/// See [`client::conn`](crate::client::conn) for more.
pub async fn handshake<E, T, B>(exec: E, io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
pub async fn handshake<E, T, B>(
exec: E,
io: T,
) -> crate::Result<(SendRequest<B>, Connection<T, B, E>)>
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + Unpin + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: ExecutorClient<B, T> + Unpin + Clone,
{
Builder::new(exec).handshake(io).await
}
Expand Down Expand Up @@ -188,12 +191,13 @@ impl<B> fmt::Debug for SendRequest<B> {

// ===== impl Connection

impl<T, B> Connection<T, B>
impl<T, B, E> Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + Unpin + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: ExecutorClient<B, T> + Unpin,
{
/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
///
Expand All @@ -209,22 +213,26 @@ where
}
}

impl<T, B> fmt::Debug for Connection<T, B>
impl<T, B, E> fmt::Debug for Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
T: AsyncRead + AsyncWrite + fmt::Debug + 'static + Unpin,
B: Body + 'static,
E: ExecutorClient<B, T> + Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection").finish()
}
}

impl<T, B> Future for Connection<T, B>
impl<T, B, E> Future for Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Body + Send + 'static,
T: AsyncRead + AsyncWrite + Unpin + 'static,
B: Body + 'static + Unpin,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: ExecutorClient<B, T> + 'static + Send + Sync + Unpin,
{
type Output = crate::Result<()>;

Expand All @@ -239,22 +247,22 @@ where

// ===== impl Builder

impl Builder {
impl<Ex> Builder<Ex>
where
Ex: Clone,
{
/// Creates a new connection builder.
#[inline]
pub fn new<E>(exec: E) -> Builder
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
{
pub fn new(exec: Ex) -> Builder<Ex> {
Builder {
exec: Exec::new(exec),
exec,
timer: Time::Empty,
h2_builder: Default::default(),
}
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
pub fn timer<M>(&mut self, timer: M) -> &mut Builder<Ex>
where
M: Timer + Send + Sync + 'static,
{
Expand Down Expand Up @@ -388,12 +396,13 @@ impl Builder {
pub fn handshake<T, B>(
&self,
io: T,
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B, Ex>)>>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + Unpin + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
B::Error: Into<Box<dyn Error + Send + Sync>>,
Ex: ExecutorClient<B, T> + Unpin,
{
let opts = self.clone();

Expand Down
Loading