Skip to content

Commit

Permalink
rollback zero-copy implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
zephyrchien committed Nov 8, 2021
1 parent 2631155 commit d089e3d
Showing 1 changed file with 26 additions and 35 deletions.
61 changes: 26 additions & 35 deletions src/relay/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use cfg_if::cfg_if;
cfg_if! {
if #[cfg(feature = "tfo")] {
use tfo::TcpStream;
use tfo::{ReadHalf, WriteHalf};
pub use tfo::TcpListener;
} else {
use tokio::net::TcpStream;
Expand Down Expand Up @@ -111,8 +110,9 @@ mod normal_copy {
mod zero_copy {
use super::*;
use std::ops::Drop;
use std::os::unix::io::AsRawFd;
use std::io::{Error, ErrorKind};
use tokio::io::Interest;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

struct Pipe(pub i32, pub i32);

Expand Down Expand Up @@ -164,55 +164,51 @@ mod zero_copy {
errno == EWOULDBLOCK || errno == EAGAIN
}

#[inline]
fn clear_readiness(x: &TcpStream, interest: Interest) {
let _ = x.try_io(interest, || {
Err(Error::new(ErrorKind::WouldBlock, "")) as Result<()>
});
}

pub async fn copy(r: ReadHalf<'_>, mut w: WriteHalf<'_>) -> Result<()> {
use std::os::unix::io::AsRawFd;
use tokio::io::AsyncWriteExt;
// init pipe
pub async fn copy<X, Y, R, W>(mut r: R, mut w: W) -> Result<()>
where
X: AsRawFd,
Y: AsRawFd,
R: AsyncRead + AsRef<X> + Unpin,
W: AsyncWrite + AsRef<Y> + Unpin,
{
// create pipe
let pipe = Pipe::create()?;
let (rpipe, wpipe) = (pipe.0, pipe.1);
// rw ref
let rx = r.as_ref();
let wx = w.as_ref();
// rw raw fd
let rfd = rx.as_raw_fd();
let wfd = wx.as_raw_fd();
// ctrl
// get raw fd
let rfd = r.as_ref().as_raw_fd();
let wfd = w.as_ref().as_raw_fd();
let mut n: usize = 0;
let mut done = false;

'LOOP: loop {
// read until the socket buffer is empty
// or the pipe is filled
rx.readable().await?;
// clear readiness (EPOLLIN)
r.read(&mut [0u8; 0]).await?;
while n < BUFFER_SIZE {
match splice_n(rfd, wpipe, BUFFER_SIZE - n) {
x if x > 0 => n += x as usize,
// read EOF
// after this the read() syscall always returns 0
x if x == 0 => {
done = true;
break;
}
x if x < 0 && is_wouldblock() => {
clear_readiness(rx, Interest::READABLE);
break;
}
// the recv socket buffer is drained out
x if x < 0 && is_wouldblock() => break,
// error occurs
_ => break 'LOOP,
}
}
// write until the pipe is empty
while n > 0 {
wx.writable().await?;
// clear readiness (EPOLLOUT)
w.write(&[0u8; 0]).await?;
match splice_n(rpipe, wfd, n) {
x if x > 0 => n -= x as usize,
x if x < 0 && is_wouldblock() => {
clear_readiness(wx, Interest::WRITABLE)
}
// continue to write
x if x < 0 && is_wouldblock() => {}
// error occurs
_ => break 'LOOP,
}
}
Expand All @@ -222,12 +218,7 @@ mod zero_copy {
}
}

if done {
w.shutdown().await?;
Ok(())
} else {
Err(Error::new(ErrorKind::ConnectionReset, "connection reset"))
}
Ok(())
}
}

Expand Down

0 comments on commit d089e3d

Please sign in to comment.