Skip to content

Commit

Permalink
fs: propagate flush for stdout / stderr. (#1528)
Browse files Browse the repository at this point in the history
  • Loading branch information
jothan authored and carllerche committed Sep 13, 2019
1 parent c0a64d6 commit 5b8fc19
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions tokio-fs/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use self::State::*;
pub(crate) struct Blocking<T> {
inner: Option<T>,
state: State<T>,
/// true if the lower IO layer needs flushing
need_flush: bool,
}

#[derive(Debug)]
Expand All @@ -39,6 +41,7 @@ impl<T> Blocking<T> {
Blocking {
inner: Some(inner),
state: State::Idle(Some(Buf::with_capacity(0))),
need_flush: false,
}
}
}
Expand Down Expand Up @@ -119,6 +122,7 @@ where

(res, buf, inner)
}));
self.need_flush = true;

return Ready(Ok(n));
}
Expand All @@ -135,16 +139,35 @@ where
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let (res, buf, inner) = match self.state {
Idle(_) => return Ready(Ok(())),
Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx)),
};

// The buffer is not used here
self.state = Idle(Some(buf));
self.inner = Some(inner);
loop {
let need_flush = self.need_flush;
match self.state {
// The buffer is not used here
Idle(ref mut buf_cell) => {
if need_flush {
let buf = buf_cell.take().unwrap();
let mut inner = self.inner.take().unwrap();

self.state = Busy(sys::run(move || {
let res = inner.flush().map(|_| 0);
(res, buf, inner)
}));

self.need_flush = false;
} else {
return Ready(Ok(()));
}
}
Busy(ref mut rx) => {
let (res, buf, inner) = ready!(Pin::new(rx).poll(cx));
self.state = Idle(Some(buf));
self.inner = Some(inner);

Ready(res.map(|_| ()))
// If error, return
res?;
}
}
}
}

fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Expand Down

0 comments on commit 5b8fc19

Please sign in to comment.