From d397e77c9039edf241ae1fb438bdb205f2bb2dfb Mon Sep 17 00:00:00 2001 From: Piepmatz <3590829+cptpiepmatz@users.noreply.github.com> Date: Thu, 21 Apr 2022 09:50:12 +0200 Subject: [PATCH] net: add `try_read_buf` for named pipes (#4626) --- tokio/src/net/windows/named_pipe.rs | 164 ++++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 550fd4df2bc..695b8eb3d39 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -12,6 +12,10 @@ use std::task::{Context, Poll}; use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle}; +cfg_io_util! { + use bytes::BufMut; +} + // Hide imports which are not used when generating documentation. #[cfg(not(docsrs))] mod doc { @@ -528,6 +532,86 @@ impl NamedPipeServer { .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) } + cfg_io_util! { + /// Tries to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: NamedPipeServer::readable() + /// [`ready()`]: NamedPipeServer::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// server.readable().await?; + /// + /// let mut buf = Vec::with_capacity(4096); + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match server.try_read_buf(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_buf(&self, buf: &mut B) -> io::Result { + self.io.registration().try_io(Interest::READABLE, || { + use std::io::Read; + + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit] as *mut [u8]) }; + + // Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the + // buffer. + let n = (&*self.io).read(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok(n) + }) + } + } + /// Waits for the pipe to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually @@ -1186,6 +1270,86 @@ impl NamedPipeClient { .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) } + cfg_io_util! { + /// Tries to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: NamedPipeClient::readable() + /// [`ready()`]: NamedPipeClient::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::windows::named_pipe; + /// use std::error::Error; + /// use std::io; + /// + /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable"; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// client.readable().await?; + /// + /// let mut buf = Vec::with_capacity(4096); + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match client.try_read_buf(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_buf(&self, buf: &mut B) -> io::Result { + self.io.registration().try_io(Interest::READABLE, || { + use std::io::Read; + + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit] as *mut [u8]) }; + + // Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the + // buffer. + let n = (&*self.io).read(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok(n) + }) + } + } + /// Waits for the pipe to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually