Skip to content

Commit

Permalink
Fix lio_listio resubmission with Tokio 1.13.0 and later
Browse files Browse the repository at this point in the history
According to @Darksonn there have never been a guarantee that a future
will get poll()'d a second time if you do not call poll_ready() a second
time after it previously returned Poll::Ready.  But due to a bug in
Tokio 1.12.0 and earlier we got away with it.  Fix it the right way, and
update the Tokio dependency now that the PollAio stuff is merged.

Fixes #27
  • Loading branch information
asomers committed Nov 22, 2021
1 parent 0fb990b commit b632a1c
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 68 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ futures = "0.3.0"
mio = "0.7.7"
nix = "0.22.0"
mio-aio = { version = "0.6.0", features = ["tokio"] }
tokio = { git = "https://github.com/tokio-rs/tokio", rev = "957ed3e", features = [ "net" ] }
tokio = { version = "1.12.0", features = [ "net" ] }

[dev-dependencies]
rstest = "0.11.0"
getopts = "0.2.18"
sysctl = "0.1"
tempfile = "3.0"
tokio = { git = "https://github.com/tokio-rs/tokio", rev = "957ed3e", features = [ "net", "rt" ] }
tokio = { version = "1.12.0", features = [ "net", "rt" ] }

[[test]]
name = "functional"
Expand Down
134 changes: 69 additions & 65 deletions src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ macro_rules! lio_resubmit {
match result {
Ok(()) => {
$self.state = AioState::InProgress;
Poll::Pending
None
},
Err(LioError::EINCOMPLETE) => Poll::Pending,
Err(LioError::EAGAIN) =>
Poll::Ready(Err(Errno::EAGAIN)),
Err(LioError::EIO(_)) =>
Poll::Ready(Err(Errno::EIO)),
Err(LioError::EINCOMPLETE) => None,
Err(LioError::EAGAIN) => Some(Err(Errno::EAGAIN)),
Err(LioError::EIO(_)) => Some(Err(Errno::EIO)),
}
}
}
Expand Down Expand Up @@ -167,12 +165,7 @@ impl<'a> Future for ReadvAt<'a> {
type Output = Result<usize, nix::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let poll_result = self.op
.as_mut()
.unwrap()
.poll_ready(cx);
if let AioState::Allocated = self.state {
assert!(poll_result.is_pending());
let result = (*self.op.as_mut().unwrap()).0.submit();
match result {
Ok(()) => self.state = AioState::InProgress,
Expand All @@ -188,42 +181,50 @@ impl<'a> Future for ReadvAt<'a> {
},
}
}
match poll_result {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => conv_poll_err(e),
Poll::Ready(Ok(ev)) => {
if AioState::Incomplete == self.state {
lio_resubmit!(self, ev)
} else {
let r = self.op.take()
.unwrap()
.into_inner()
.0
.into_results(|mut iter|
iter.try_fold(0, |total, lr|
lr.result.map(|r| total + r as usize)
)
);
if let Ok(v) = r {
if let Some((accum, ob)) = &mut self.bufsav {
// Copy results back into the individual buffers
let mut i = 0;
let mut j = 0;
let mut total = 0;
while total < v {
let z = (v - total).min(ob[i].len() - j);
ob[i][j..j + z]
.copy_from_slice(&accum[total..total + z]);
j += z;
total += z;
if j == ob[i].len() {
j = 0;
i += 1;
loop {
let poll_result = self.op
.as_mut()
.unwrap()
.poll_ready(cx);
match poll_result {
Poll::Pending => break Poll::Pending,
Poll::Ready(Err(e)) => break conv_poll_err(e),
Poll::Ready(Ok(ev)) => {
if AioState::Incomplete == self.state {
if let Some(r) = lio_resubmit!(self, ev) {
break Poll::Ready(r);
}
} else {
let r = self.op.take()
.unwrap()
.into_inner()
.0
.into_results(|mut iter|
iter.try_fold(0, |total, lr|
lr.result.map(|r| total + r as usize)
)
);
if let Ok(v) = r {
if let Some((accum, ob)) = &mut self.bufsav {
// Copy results back into the individual buffers
let mut i = 0;
let mut j = 0;
let mut tot = 0;
while tot < v {
let z = (v - tot).min(ob[i].len() - j);
ob[i][j..j + z]
.copy_from_slice(&accum[tot..tot + z]);
j += z;
tot += z;
if j == ob[i].len() {
j = 0;
i += 1;
}
}
}
}
break Poll::Ready(r)
}
Poll::Ready(r)
}
}
}
Expand All @@ -234,12 +235,7 @@ impl<'a> Future for WritevAt<'a> {
type Output = Result<usize, nix::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let poll_result = self.op
.as_mut()
.unwrap()
.poll_ready(cx);
if let AioState::Allocated = self.state {
assert!(poll_result.is_pending());
let result = (*self.op.as_mut().unwrap()).0.submit();
match result {
Ok(()) => self.state = AioState::InProgress,
Expand All @@ -255,23 +251,31 @@ impl<'a> Future for WritevAt<'a> {
},
}
}
match poll_result {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => conv_poll_err(e),
Poll::Ready(Ok(ev)) => {
if AioState::Incomplete == self.state {
lio_resubmit!(self, ev)
} else {
let r = self.op.take()
.unwrap()
.into_inner()
.0
.into_results(|mut iter|
iter.try_fold(0, |total, lr|
lr.result.map(|r| total + r as usize)
)
);
Poll::Ready(r)
loop {
let poll_result = self.op
.as_mut()
.unwrap()
.poll_ready(cx);
match poll_result {
Poll::Pending => break Poll::Pending,
Poll::Ready(Err(e)) => break conv_poll_err(e),
Poll::Ready(Ok(ev)) => {
if AioState::Incomplete == self.state {
if let Some(r) = lio_resubmit!(self, ev) {
break Poll::Ready(r);
}
} else {
let r = self.op.take()
.unwrap()
.into_inner()
.0
.into_results(|mut iter|
iter.try_fold(0, |total, lr|
lr.result.map(|r| total + r as usize)
)
);
break Poll::Ready(r);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/lio_listio_incomplete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ macro_rules! t {
// A writev_at call fails because lio_listio(2) returns EIO. That means that
// some of the AioCbs may have been initiated, but not all.
// This test must run in its own process since it intentionally uses all of the
// system's AIO resources.
// process's AIO resources.
#[test]
fn writev_at_eio() {
let alm = sysconf(SysconfVar::AIO_LISTIO_MAX).expect("sysconf").unwrap();
Expand Down

0 comments on commit b632a1c

Please sign in to comment.