Skip to content

Commit

Permalink
net: support non-blocking vectored I/O (#3761)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kestrer authored May 14, 2021
1 parent d846bf2 commit 0b93bd5
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 4 deletions.
140 changes: 140 additions & 0 deletions tokio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,84 @@ impl TcpStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}

/// Try to read data from the stream into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
/// written to possibly being only partially filled. This method behaves
/// equivalently to a single call to [`try_read()`] with concatenated
/// buffers.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_vectored()` is non-blocking, the buffer does not have to be
/// stored by the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`try_read()`]: TcpStream::try_read()
/// [`readable()`]: TcpStream::readable()
/// [`ready()`]: TcpStream::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::error::Error;
/// use std::io::{self, IoSliceMut};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// loop {
/// // Wait for the socket to be readable
/// stream.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf_a = [0; 512];
/// let mut buf_b = [0; 1024];
/// let mut bufs = [
/// IoSliceMut::new(&mut buf_a),
/// IoSliceMut::new(&mut buf_b),
/// ];
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_read_vectored(&mut bufs) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
use std::io::Read;

self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}

cfg_io_util! {
/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
Expand Down Expand Up @@ -775,6 +853,68 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}

/// Try to write several buffers to the stream, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
/// from possible being only partially consumed. This method behaves
/// equivalently to a single call to [`try_write()`] with concatenated
/// buffers.
///
/// This function is usually paired with `writable()`.
///
/// [`try_write()`]: TcpStream::try_write()
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the stream is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use std::error::Error;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let stream = TcpStream::connect("127.0.0.1:8080").await?;
///
/// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
///
/// loop {
/// // Wait for the socket to be writable
/// stream.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_write_vectored(&bufs) {
/// Ok(n) => {
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
use std::io::Write;

self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
}

/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
Expand Down
140 changes: 140 additions & 0 deletions tokio/src/net/unix/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,84 @@ impl UnixStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}

/// Try to read data from the stream into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
/// written to possibly being only partially filled. This method behaves
/// equivalently to a single call to [`try_read()`] with concatenated
/// buffers.
///
/// Receives any pending data from the socket but does not wait for new data
/// to arrive. On success, returns the number of bytes read. Because
/// `try_read_vectored()` is non-blocking, the buffer does not have to be
/// stored by the async task and can exist entirely on the stack.
///
/// Usually, [`readable()`] or [`ready()`] is used with this function.
///
/// [`try_read()`]: UnixStream::try_read()
/// [`readable()`]: UnixStream::readable()
/// [`ready()`]: UnixStream::ready()
///
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixStream;
/// use std::error::Error;
/// use std::io::{self, IoSliceMut};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let dir = tempfile::tempdir().unwrap();
/// let bind_path = dir.path().join("bind_path");
/// let stream = UnixStream::connect(bind_path).await?;
///
/// loop {
/// // Wait for the socket to be readable
/// stream.readable().await?;
///
/// // Creating the buffer **after** the `await` prevents it from
/// // being stored in the async task.
/// let mut buf_a = [0; 512];
/// let mut buf_b = [0; 1024];
/// let mut bufs = [
/// IoSliceMut::new(&mut buf_a),
/// IoSliceMut::new(&mut buf_b),
/// ];
///
/// // Try to read data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_read_vectored(&mut bufs) {
/// Ok(0) => break,
/// Ok(n) => {
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
}

cfg_io_util! {
/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
Expand Down Expand Up @@ -487,6 +565,68 @@ impl UnixStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}

/// Try to write several buffers to the stream, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
/// from possible being only partially consumed. This method behaves
/// equivalently to a single call to [`try_write()`] with concatenated
/// buffers.
///
/// This function is usually paired with `writable()`.
///
/// [`try_write()`]: UnixStream::try_write()
///
/// # Return
///
/// If data is successfully written, `Ok(n)` is returned, where `n` is the
/// number of bytes written. If the stream is not ready to write data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixStream;
/// use std::error::Error;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// // Connect to a peer
/// let dir = tempfile::tempdir().unwrap();
/// let bind_path = dir.path().join("bind_path");
/// let stream = UnixStream::connect(bind_path).await?;
///
/// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
///
/// loop {
/// // Wait for the socket to be writable
/// stream.writable().await?;
///
/// // Try to write data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match stream.try_write_vectored(&bufs) {
/// Ok(n) => {
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e.into());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}

/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
///
/// This function is intended to be used to wrap a UnixStream from the
Expand Down
49 changes: 47 additions & 2 deletions tokio/tests/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn try_read_write() {
tokio::task::yield_now().await;
}

// Fill the write buffer
// Fill the write buffer using non-vectored I/O
loop {
// Still ready
let mut writable = task::spawn(client.writable());
Expand All @@ -75,7 +75,7 @@ async fn try_read_write() {
let mut writable = task::spawn(client.writable());
assert_pending!(writable.poll());

// Drain the socket from the server end
// Drain the socket from the server end using non-vectored I/O
let mut read = vec![0; written.len()];
let mut i = 0;

Expand All @@ -92,6 +92,51 @@ async fn try_read_write() {
assert_eq!(read, written);
}

written.clear();
client.writable().await.unwrap();

// Fill the write buffer using vectored I/O
let data_bufs: Vec<_> = DATA.chunks(10).map(io::IoSlice::new).collect();
loop {
// Still ready
let mut writable = task::spawn(client.writable());
assert_ready_ok!(writable.poll());

match client.try_write_vectored(&data_bufs) {
Ok(n) => written.extend(&DATA[..n]),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
break;
}
Err(e) => panic!("error = {:?}", e),
}
}

{
// Write buffer full
let mut writable = task::spawn(client.writable());
assert_pending!(writable.poll());

// Drain the socket from the server end using vectored I/O
let mut read = vec![0; written.len()];
let mut i = 0;

while i < read.len() {
server.readable().await.unwrap();

let mut bufs: Vec<_> = read[i..]
.chunks_mut(0x10000)
.map(io::IoSliceMut::new)
.collect();
match server.try_read_vectored(&mut bufs) {
Ok(n) => i += n,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("error = {:?}", e),
}
}

assert_eq!(read, written);
}

// Now, we listen for shutdown
drop(client);

Expand Down
Loading

0 comments on commit 0b93bd5

Please sign in to comment.