From 46af376d3c8d2f24514b1d221f6a096a0715ffdc Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Sat, 16 Dec 2023 11:19:26 -0800 Subject: [PATCH 1/4] io: add Join utility --- tokio/src/io/join.rs | 122 +++++++++++++++++++++++++++++++++++++++++ tokio/src/io/mod.rs | 2 + tokio/tests/io_join.rs | 83 ++++++++++++++++++++++++++++ 3 files changed, 207 insertions(+) create mode 100644 tokio/src/io/join.rs create mode 100644 tokio/tests/io_join.rs diff --git a/tokio/src/io/join.rs b/tokio/src/io/join.rs new file mode 100644 index 00000000000..84030948af8 --- /dev/null +++ b/tokio/src/io/join.rs @@ -0,0 +1,122 @@ +//! Join two values implementing `AsyncRead` and `AsyncWrite` into a single one. + +use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; + +use std::fmt; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +cfg_io_util! { + pin_project_lite::pin_project! { + /// Joins two values implementing `AsyncRead` and `AsyncWrite` into a + /// single handle. + pub struct Join { + #[pin] + reader: R, + #[pin] + writer: W, + } + } + + impl Join + where + R: AsyncRead + Unpin, + W: AsyncWrite + Unpin, + { + /// Join two values implementing `AsyncRead` and `AsyncWrite` into a + /// single handle. + pub fn new(reader: R, writer: W) -> Self { + Self { reader, writer } + } + + /// Splits this `Join` back into its `AsyncRead` and `AsyncWrite` + /// components. + pub fn split(self) -> (R, W) { + (self.reader, self.writer) + } + + /// Returns a reference to the inner reader. + pub fn reader(&self) -> &R { + &self.reader + } + + /// Returns a reference to the inner writer. + pub fn writer(&self) -> &W { + &self.writer + } + + /// Returns a mutable reference to the inner reader. + pub fn reader_mut(&mut self) -> &mut R { + &mut self.reader + } + + /// Returns a mutable reference to the inner writer. + pub fn writer_mut(&mut self) -> &mut W { + &mut self.writer + } + } + + impl AsyncRead for Join + where + R: AsyncRead + Unpin, + { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.project().reader.poll_read(cx, buf) + } + } + + impl AsyncWrite for Join + where + W: AsyncWrite + Unpin, + { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().writer.poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().writer.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().writer.poll_shutdown(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>] + ) -> Poll> { + self.project().writer.poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.writer.is_write_vectored() + } + } + + impl fmt::Debug for Join + where R: fmt::Debug, W: fmt::Debug + { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Join") + .field("writer", &self.writer) + .field("reader", &self.reader) + .finish() + } + } +} diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 0fd6cc2c5cb..11bdf75f4cf 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -265,6 +265,8 @@ cfg_io_std! { cfg_io_util! { mod split; pub use split::{split, ReadHalf, WriteHalf}; + mod join; + pub use join::Join; pub(crate) mod seek; pub(crate) mod util; diff --git a/tokio/tests/io_join.rs b/tokio/tests/io_join.rs new file mode 100644 index 00000000000..29f2b7745ca --- /dev/null +++ b/tokio/tests/io_join.rs @@ -0,0 +1,83 @@ +#![warn(rust_2018_idioms)] +#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support panic recovery + +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Join, ReadBuf}; + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct R; + +impl AsyncRead for R { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + buf.put_slice(&[b'z']); + Poll::Ready(Ok(())) + } +} + +struct W; + +impl AsyncWrite for W { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll> { + Poll::Ready(Ok(1)) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _bufs: &[io::IoSlice<'_>], + ) -> Poll> { + Poll::Ready(Ok(2)) + } + + fn is_write_vectored(&self) -> bool { + true + } +} + +#[test] +fn is_send_and_sync() { + fn assert_bound() {} + + assert_bound::>(); +} + +#[test] +fn method_delegation() { + let mut rw = Join::new(R, W); + let mut buf = [0; 1]; + + tokio_test::block_on(async move { + assert_eq!(1, rw.read(&mut buf).await.unwrap()); + assert_eq!(b'z', buf[0]); + + assert_eq!(1, rw.write(&[b'x']).await.unwrap()); + assert_eq!( + 2, + rw.write_vectored(&[io::IoSlice::new(&[b'x'])]) + .await + .unwrap() + ); + assert!(rw.is_write_vectored()); + + assert!(rw.flush().await.is_ok()); + assert!(rw.shutdown().await.is_ok()); + }); +} From 71543db5fc1b57d2fada0a33e274001a9f7d5ffc Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Sat, 16 Dec 2023 12:04:29 -0800 Subject: [PATCH 2/4] review --- tokio/src/io/join.rs | 214 +++++++++++++++++++++-------------------- tokio/src/io/mod.rs | 2 +- tokio/tests/io_join.rs | 6 +- 3 files changed, 115 insertions(+), 107 deletions(-) diff --git a/tokio/src/io/join.rs b/tokio/src/io/join.rs index 84030948af8..ea867b0a980 100644 --- a/tokio/src/io/join.rs +++ b/tokio/src/io/join.rs @@ -7,116 +7,124 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -cfg_io_util! { - pin_project_lite::pin_project! { - /// Joins two values implementing `AsyncRead` and `AsyncWrite` into a - /// single handle. - pub struct Join { - #[pin] - reader: R, - #[pin] - writer: W, - } +/// Join two values implementing `AsyncRead` and `AsyncWrite` into a +/// single handle. +pub fn join(reader: R, writer: W) -> Join +where + R: AsyncRead, + W: AsyncWrite, +{ + Join { reader, writer } +} + +pin_project_lite::pin_project! { + /// Joins two values implementing `AsyncRead` and `AsyncWrite` into a + /// single handle. + pub struct Join { + #[pin] + reader: R, + #[pin] + writer: W, + } +} + +impl Join +where + R: AsyncRead, + W: AsyncWrite, +{ + /// Splits this `Join` back into its `AsyncRead` and `AsyncWrite` + /// components. + pub fn into_inner(self) -> (R, W) { + (self.reader, self.writer) + } + + /// Returns a reference to the inner reader. + pub fn reader(&self) -> &R { + &self.reader + } + + /// Returns a reference to the inner writer. + pub fn writer(&self) -> &W { + &self.writer + } + + /// Returns a mutable reference to the inner reader. + pub fn reader_mut(&mut self) -> &mut R { + &mut self.reader + } + + /// Returns a mutable reference to the inner writer. + pub fn writer_mut(&mut self) -> &mut W { + &mut self.writer } - impl Join - where - R: AsyncRead + Unpin, - W: AsyncWrite + Unpin, - { - /// Join two values implementing `AsyncRead` and `AsyncWrite` into a - /// single handle. - pub fn new(reader: R, writer: W) -> Self { - Self { reader, writer } - } - - /// Splits this `Join` back into its `AsyncRead` and `AsyncWrite` - /// components. - pub fn split(self) -> (R, W) { - (self.reader, self.writer) - } - - /// Returns a reference to the inner reader. - pub fn reader(&self) -> &R { - &self.reader - } - - /// Returns a reference to the inner writer. - pub fn writer(&self) -> &W { - &self.writer - } - - /// Returns a mutable reference to the inner reader. - pub fn reader_mut(&mut self) -> &mut R { - &mut self.reader - } - - /// Returns a mutable reference to the inner writer. - pub fn writer_mut(&mut self) -> &mut W { - &mut self.writer - } + /// Returns a pinned mutable reference to the inner reader. + pub fn reader_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { + self.project().reader } - impl AsyncRead for Join - where - R: AsyncRead + Unpin, - { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - self.project().reader.poll_read(cx, buf) - } + /// Returns a pinned mutable reference to the inner writer. + pub fn writer_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + self.project().writer } +} + +impl AsyncRead for Join +where + R: AsyncRead + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.project().reader.poll_read(cx, buf) + } +} - impl AsyncWrite for Join - where - W: AsyncWrite + Unpin, - { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().writer.poll_write(cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.project().writer.poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.project().writer.poll_shutdown(cx) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[io::IoSlice<'_>] - ) -> Poll> { - self.project().writer.poll_write_vectored(cx, bufs) - } - - fn is_write_vectored(&self) -> bool { - self.writer.is_write_vectored() - } +impl AsyncWrite for Join +where + W: AsyncWrite + Unpin, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().writer.poll_write(cx, buf) } - impl fmt::Debug for Join - where R: fmt::Debug, W: fmt::Debug - { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Join") - .field("writer", &self.writer) - .field("reader", &self.reader) - .finish() - } + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().writer.poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().writer.poll_shutdown(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + self.project().writer.poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.writer.is_write_vectored() + } +} + +impl fmt::Debug for Join +where + R: fmt::Debug, + W: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Join") + .field("writer", &self.writer) + .field("reader", &self.reader) + .finish() } } diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 11bdf75f4cf..ff35a0e0f7e 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -266,7 +266,7 @@ cfg_io_util! { mod split; pub use split::{split, ReadHalf, WriteHalf}; mod join; - pub use join::Join; + pub use join::{join, Join}; pub(crate) mod seek; pub(crate) mod util; diff --git a/tokio/tests/io_join.rs b/tokio/tests/io_join.rs index 29f2b7745ca..69b09393311 100644 --- a/tokio/tests/io_join.rs +++ b/tokio/tests/io_join.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] -#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support panic recovery +#![cfg(feature = "full")] -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Join, ReadBuf}; +use tokio::io::{join, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Join, ReadBuf}; use std::io; use std::pin::Pin; @@ -61,7 +61,7 @@ fn is_send_and_sync() { #[test] fn method_delegation() { - let mut rw = Join::new(R, W); + let mut rw = join(R, W); let mut buf = [0; 1]; tokio_test::block_on(async move { From 4ddf877b026b7e7270e589044d60255871853250 Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Sat, 16 Dec 2023 12:35:59 -0800 Subject: [PATCH 3/4] review --- tokio/src/io/join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/join.rs b/tokio/src/io/join.rs index ea867b0a980..fb40b23c609 100644 --- a/tokio/src/io/join.rs +++ b/tokio/src/io/join.rs @@ -72,7 +72,7 @@ where impl AsyncRead for Join where - R: AsyncRead + Unpin, + R: AsyncRead, { fn poll_read( self: Pin<&mut Self>, @@ -85,7 +85,7 @@ where impl AsyncWrite for Join where - W: AsyncWrite + Unpin, + W: AsyncWrite, { fn poll_write( self: Pin<&mut Self>, From d32ea4d3a7ff525b8477411f9bf7d59e06dc6ede Mon Sep 17 00:00:00 2001 From: Gus Caplan Date: Sat, 16 Dec 2023 14:51:18 -0800 Subject: [PATCH 4/4] review --- tokio/src/io/join.rs | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/tokio/src/io/join.rs b/tokio/src/io/join.rs index fb40b23c609..dbc7043b67e 100644 --- a/tokio/src/io/join.rs +++ b/tokio/src/io/join.rs @@ -2,7 +2,6 @@ use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; -use std::fmt; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -20,6 +19,7 @@ where pin_project_lite::pin_project! { /// Joins two values implementing `AsyncRead` and `AsyncWrite` into a /// single handle. + #[derive(Debug)] pub struct Join { #[pin] reader: R, @@ -115,16 +115,3 @@ where self.writer.is_write_vectored() } } - -impl fmt::Debug for Join -where - R: fmt::Debug, - W: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Join") - .field("writer", &self.writer) - .field("reader", &self.reader) - .finish() - } -}