From 07518c8a1b4a8eee8a2757484bc73ce851438fbf Mon Sep 17 00:00:00 2001 From: J Robert Ray Date: Thu, 23 Mar 2023 11:59:59 -0700 Subject: [PATCH 1/8] stream: extend timeout doc test Include a test for timeout behavior of `timeout()`, demonstrating and verifying that it will not repeat timeouts. --- tokio-stream/src/stream_ext.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 52d32024ff3..56ff7f04b1d 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -941,7 +941,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream}; /// use std::time::Duration; /// # let int_stream = stream::iter(1..=3); /// @@ -969,6 +969,17 @@ pub trait StreamExt: Stream { /// /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // Once a timeout error is received, no further events will be received + /// // unless the wrapped stream yields a value (timeouts do not repeat). + /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23))); + /// let timeout_stream = interval_stream.timeout(Duration::from_millis(9)); + /// tokio::pin!(timeout_stream); + /// + /// // Only one timeout will be received between values in the source stream. + /// assert!(timeout_stream.try_next().await.is_ok()); + /// assert!(timeout_stream.try_next().await.is_err()); + /// assert!(timeout_stream.try_next().await.is_ok()); /// # } /// ``` #[cfg(all(feature = "time"))] From 7c97156737ff85b8a92932dc6fe5eb4274da798f Mon Sep 17 00:00:00 2001 From: J Robert Ray Date: Thu, 23 Mar 2023 12:14:20 -0700 Subject: [PATCH 2/8] stream: add timeout_repeating Extend `Timeout` with a new `TimeoutMode` that controls the behavior of how timeouts are produced. A new extension method `timeout_repeating` makes use of a newly added behavior where the stream continuously produces timeouts at the specified interval rather than stalling after a single timeout. --- tokio-stream/src/stream_ext.rs | 87 +++++++++++++++++++++++++- tokio-stream/src/stream_ext/timeout.rs | 61 +++++++++++++----- 2 files changed, 129 insertions(+), 19 deletions(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 56ff7f04b1d..5670318183d 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -57,7 +57,7 @@ use try_next::TryNext; cfg_time! { pub(crate) mod timeout; - use timeout::Timeout; + use timeout::{Timeout, TimeoutMode}; use tokio::time::Duration; mod throttle; use throttle::{throttle, Throttle}; @@ -924,7 +924,9 @@ pub trait StreamExt: Stream { /// If the wrapped stream yields a value before the deadline is reached, the /// value is returned. Otherwise, an error is returned. The caller may decide /// to continue consuming the stream and will eventually get the next source - /// stream value once it becomes available. + /// stream value once it becomes available. See + /// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative + /// where the timeouts will repeat. /// /// # Notes /// @@ -988,7 +990,86 @@ pub trait StreamExt: Stream { where Self: Sized, { - Timeout::new(self, duration) + Timeout::new(self, duration, TimeoutMode::Once) + } + + /// Applies a per-item timeout to the passed stream. + /// + /// `timeout_repeating()` takes a `Duration` that represents the maximum amount of + /// time each element of the stream has to complete before timing out. + /// + /// If the wrapped stream yields a value before the deadline is reached, the + /// value is returned. Otherwise, an error is returned. The caller may decide + /// to continue consuming the stream and will eventually get the next source + /// stream value once it becomes available. Unlike `timeout()`, if no value + /// becomes available before the deadline is reached, additional errors are + /// returned at the specified interval. See [`timeout`](StreamExt::timeout) + /// for an alternative where the timeouts do not repeat. + /// + /// # Notes + /// + /// This function consumes the stream passed into it and returns a + /// wrapped version of it. + /// + /// Polling the returned stream will continue to poll the inner stream even + /// if one or more items time out. + /// + /// # Examples + /// + /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream}; + /// use std::time::Duration; + /// # let int_stream = stream::iter(1..=3); + /// + /// let int_stream = int_stream.timeout_repeating(Duration::from_secs(1)); + /// tokio::pin!(int_stream); + /// + /// // When no items time out, we get the 3 elements in succession: + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If the second item times out, we get an error and continue polling the stream: + /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert!(int_stream.try_next().await.is_err()); + /// assert_eq!(int_stream.try_next().await, Ok(Some(2))); + /// assert_eq!(int_stream.try_next().await, Ok(Some(3))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // If we want to stop consuming the source stream the first time an + /// // element times out, we can use the `take_while` operator: + /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]); + /// let mut int_stream = int_stream.take_while(Result::is_ok); + /// + /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); + /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// + /// // Timeout errors will be continuously produced at the specified + /// // interval until the wrapped stream yields a value. + /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23))); + /// let timeout_stream = interval_stream.timeout_repeating(Duration::from_millis(9)); + /// tokio::pin!(timeout_stream); + /// + /// // Multiple timeouts will be received between values in the source stream. + /// assert!(timeout_stream.try_next().await.is_ok()); + /// assert!(timeout_stream.try_next().await.is_err()); + /// assert!(timeout_stream.try_next().await.is_err()); + /// assert!(timeout_stream.try_next().await.is_ok()); + /// # } + /// ``` + #[cfg(all(feature = "time"))] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + fn timeout_repeating(self, duration: Duration) -> Timeout + where + Self: Sized, + { + Timeout::new(self, duration, TimeoutMode::Repeating) } /// Slows down a stream by enforcing a delay between items. diff --git a/tokio-stream/src/stream_ext/timeout.rs b/tokio-stream/src/stream_ext/timeout.rs index a440d203ec4..e66d1c3738d 100644 --- a/tokio-stream/src/stream_ext/timeout.rs +++ b/tokio-stream/src/stream_ext/timeout.rs @@ -9,6 +9,15 @@ use pin_project_lite::pin_project; use std::fmt; use std::time::Duration; +/// Behavior of timeouts. +#[derive(Debug)] +pub(super) enum TimeoutMode { + /// A timeout will only fire once. + Once, + /// A timeout will fire repeatedly. + Repeating, +} + pin_project! { /// Stream returned by the [`timeout`](super::StreamExt::timeout) method. #[must_use = "streams do nothing unless polled"] @@ -20,6 +29,7 @@ pin_project! { deadline: Sleep, duration: Duration, poll_deadline: bool, + timeout_mode: TimeoutMode, } } @@ -28,7 +38,7 @@ pin_project! { pub struct Elapsed(()); impl Timeout { - pub(super) fn new(stream: S, duration: Duration) -> Self { + pub(super) fn new(stream: S, duration: Duration, timeout_mode: TimeoutMode) -> Self { let next = Instant::now() + duration; let deadline = tokio::time::sleep_until(next); @@ -37,6 +47,7 @@ impl Timeout { deadline, duration, poll_deadline: true, + timeout_mode, } } } @@ -45,7 +56,7 @@ impl Stream for Timeout { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = self.project(); + let mut me = self.project(); match me.stream.poll_next(cx) { Poll::Ready(v) => { @@ -59,28 +70,46 @@ impl Stream for Timeout { Poll::Pending => {} }; - if *me.poll_deadline { - ready!(me.deadline.poll(cx)); - *me.poll_deadline = false; - return Poll::Ready(Some(Err(Elapsed::new()))); - } + match me.timeout_mode { + TimeoutMode::Once => { + if *me.poll_deadline { + ready!(me.deadline.poll(cx)); + *me.poll_deadline = false; + return Poll::Ready(Some(Err(Elapsed::new()))); + } - Poll::Pending + Poll::Pending + } + TimeoutMode::Repeating => { + ready!(me.deadline.as_mut().poll(cx)); + let next = Instant::now() + *me.duration; + me.deadline.reset(next); + Poll::Ready(Some(Err(Elapsed::new()))) + } + } } fn size_hint(&self) -> (usize, Option) { let (lower, upper) = self.stream.size_hint(); - // The timeout stream may insert an error before and after each message - // from the underlying stream, but no more than one error between each - // message. Hence the upper bound is computed as 2x+1. + match self.timeout_mode { + TimeoutMode::Once => { + // The timeout stream may insert an error before and after each message + // from the underlying stream, but no more than one error between each + // message. Hence the upper bound is computed as 2x+1. - // Using a helper function to enable use of question mark operator. - fn twice_plus_one(value: Option) -> Option { - value?.checked_mul(2)?.checked_add(1) - } + // Using a helper function to enable use of question mark operator. + fn twice_plus_one(value: Option) -> Option { + value?.checked_mul(2)?.checked_add(1) + } - (lower, twice_plus_one(upper)) + (lower, twice_plus_one(upper)) + } + TimeoutMode::Repeating => { + // The timeout stream may insert an error an infinite number of times. + (lower, None) + } + } } } From 63c445b14874c347c9725a85fd306d89b8d1a703 Mon Sep 17 00:00:00 2001 From: J Robert Ray Date: Thu, 23 Mar 2023 15:24:41 -0700 Subject: [PATCH 3/8] stream: change timeout_repeating doctest Use a strategy for testing the repeating timeout behavior that is less sensitive to timing to avoid spurious failures. --- tokio-stream/src/stream_ext.rs | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 5670318183d..22a42f8f8ef 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -974,14 +974,14 @@ pub trait StreamExt: Stream { /// /// // Once a timeout error is received, no further events will be received /// // unless the wrapped stream yields a value (timeouts do not repeat). - /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23))); - /// let timeout_stream = interval_stream.timeout(Duration::from_millis(9)); + /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100))); + /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10)); /// tokio::pin!(timeout_stream); /// /// // Only one timeout will be received between values in the source stream. /// assert!(timeout_stream.try_next().await.is_ok()); - /// assert!(timeout_stream.try_next().await.is_err()); - /// assert!(timeout_stream.try_next().await.is_ok()); + /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout"); + /// assert!(timeout_stream.try_next().await.is_ok(), "expected no more timeouts"); /// # } /// ``` #[cfg(all(feature = "time"))] @@ -1052,15 +1052,25 @@ pub trait StreamExt: Stream { /// /// // Timeout errors will be continuously produced at the specified /// // interval until the wrapped stream yields a value. - /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23))); - /// let timeout_stream = interval_stream.timeout_repeating(Duration::from_millis(9)); + /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100))); + /// let timeout_stream = interval_stream.timeout_repeating(Duration::from_millis(10)); /// tokio::pin!(timeout_stream); /// /// // Multiple timeouts will be received between values in the source stream. /// assert!(timeout_stream.try_next().await.is_ok()); - /// assert!(timeout_stream.try_next().await.is_err()); - /// assert!(timeout_stream.try_next().await.is_err()); - /// assert!(timeout_stream.try_next().await.is_ok()); + /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout"); + /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout"); + /// // Will eventually receive another value from the source stream... + /// # let mut count = 0; + /// loop { + /// if timeout_stream.try_next().await.is_ok() { + /// break; + /// } + /// # count += 1; + /// # if count > 20 { + /// # panic!("did not receive another event from the wrapped stream") + /// # } + /// } /// # } /// ``` #[cfg(all(feature = "time"))] From e98018c84ceaf5525974ce3fe0d0405200be1b96 Mon Sep 17 00:00:00 2001 From: J Robert Ray Date: Fri, 24 Mar 2023 11:14:21 -0700 Subject: [PATCH 4/8] stream: use mocked time for timeout tests Use `start_paused = true` so the interval and timeouts in the stream timeout doc tests happen in a consistent order. --- tokio-stream/src/stream_ext.rs | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 22a42f8f8ef..8ae9935341a 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -941,7 +941,7 @@ pub trait StreamExt: Stream { /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): /// /// ``` - /// # #[tokio::main] + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream}; /// use std::time::Duration; @@ -1019,7 +1019,7 @@ pub trait StreamExt: Stream { /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): /// /// ``` - /// # #[tokio::main] + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] /// # async fn main() { /// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream}; /// use std::time::Duration; @@ -1052,8 +1052,8 @@ pub trait StreamExt: Stream { /// /// // Timeout errors will be continuously produced at the specified /// // interval until the wrapped stream yields a value. - /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100))); - /// let timeout_stream = interval_stream.timeout_repeating(Duration::from_millis(10)); + /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23))); + /// let timeout_stream = interval_stream.timeout_repeating(Duration::from_millis(9)); /// tokio::pin!(timeout_stream); /// /// // Multiple timeouts will be received between values in the source stream. @@ -1061,16 +1061,7 @@ pub trait StreamExt: Stream { /// assert!(timeout_stream.try_next().await.is_err(), "expected one timeout"); /// assert!(timeout_stream.try_next().await.is_err(), "expected a second timeout"); /// // Will eventually receive another value from the source stream... - /// # let mut count = 0; - /// loop { - /// if timeout_stream.try_next().await.is_ok() { - /// break; - /// } - /// # count += 1; - /// # if count > 20 { - /// # panic!("did not receive another event from the wrapped stream") - /// # } - /// } + /// assert!(timeout_stream.try_next().await.is_ok(), "expected non-timeout"); /// # } /// ``` #[cfg(all(feature = "time"))] From 71e1c8b1b14d557b47704e78d8cf15637cbc4d60 Mon Sep 17 00:00:00 2001 From: J Robert Ray Date: Sun, 16 Apr 2023 01:21:10 -0700 Subject: [PATCH 5/8] stream: implement timeout_repeating with separate struct Revert adding conditional behavior to the `Timeout` struct and implement the new behavior in a new struct, while reusing the `Elapsed` struct to represent timeouts. --- tokio-stream/src/stream_ext.rs | 10 +-- tokio-stream/src/stream_ext/timeout.rs | 63 +++++------------- .../src/stream_ext/timeout_repeating.rs | 66 +++++++++++++++++++ 3 files changed, 89 insertions(+), 50 deletions(-) create mode 100644 tokio-stream/src/stream_ext/timeout_repeating.rs diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 8ae9935341a..9e7a30df8c7 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -57,7 +57,9 @@ use try_next::TryNext; cfg_time! { pub(crate) mod timeout; - use timeout::{Timeout, TimeoutMode}; + pub(crate) mod timeout_repeating; + use timeout::Timeout; + use timeout_repeating::TimeoutRepeating; use tokio::time::Duration; mod throttle; use throttle::{throttle, Throttle}; @@ -990,7 +992,7 @@ pub trait StreamExt: Stream { where Self: Sized, { - Timeout::new(self, duration, TimeoutMode::Once) + Timeout::new(self, duration) } /// Applies a per-item timeout to the passed stream. @@ -1066,11 +1068,11 @@ pub trait StreamExt: Stream { /// ``` #[cfg(all(feature = "time"))] #[cfg_attr(docsrs, doc(cfg(feature = "time")))] - fn timeout_repeating(self, duration: Duration) -> Timeout + fn timeout_repeating(self, duration: Duration) -> TimeoutRepeating where Self: Sized, { - Timeout::new(self, duration, TimeoutMode::Repeating) + TimeoutRepeating::new(self, duration) } /// Slows down a stream by enforcing a delay between items. diff --git a/tokio-stream/src/stream_ext/timeout.rs b/tokio-stream/src/stream_ext/timeout.rs index e66d1c3738d..17d1349022e 100644 --- a/tokio-stream/src/stream_ext/timeout.rs +++ b/tokio-stream/src/stream_ext/timeout.rs @@ -9,15 +9,6 @@ use pin_project_lite::pin_project; use std::fmt; use std::time::Duration; -/// Behavior of timeouts. -#[derive(Debug)] -pub(super) enum TimeoutMode { - /// A timeout will only fire once. - Once, - /// A timeout will fire repeatedly. - Repeating, -} - pin_project! { /// Stream returned by the [`timeout`](super::StreamExt::timeout) method. #[must_use = "streams do nothing unless polled"] @@ -29,16 +20,15 @@ pin_project! { deadline: Sleep, duration: Duration, poll_deadline: bool, - timeout_mode: TimeoutMode, } } -/// Error returned by `Timeout`. +/// Error returned by `Timeout` and `TimeoutRepeating`. #[derive(Debug, PartialEq, Eq)] pub struct Elapsed(()); impl Timeout { - pub(super) fn new(stream: S, duration: Duration, timeout_mode: TimeoutMode) -> Self { + pub(super) fn new(stream: S, duration: Duration) -> Self { let next = Instant::now() + duration; let deadline = tokio::time::sleep_until(next); @@ -47,7 +37,6 @@ impl Timeout { deadline, duration, poll_deadline: true, - timeout_mode, } } } @@ -56,7 +45,7 @@ impl Stream for Timeout { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut me = self.project(); + let me = self.project(); match me.stream.poll_next(cx) { Poll::Ready(v) => { @@ -70,46 +59,28 @@ impl Stream for Timeout { Poll::Pending => {} }; - match me.timeout_mode { - TimeoutMode::Once => { - if *me.poll_deadline { - ready!(me.deadline.poll(cx)); - *me.poll_deadline = false; - return Poll::Ready(Some(Err(Elapsed::new()))); - } - - Poll::Pending - } - TimeoutMode::Repeating => { - ready!(me.deadline.as_mut().poll(cx)); - let next = Instant::now() + *me.duration; - me.deadline.reset(next); - Poll::Ready(Some(Err(Elapsed::new()))) - } + if *me.poll_deadline { + ready!(me.deadline.poll(cx)); + *me.poll_deadline = false; + return Poll::Ready(Some(Err(Elapsed::new()))); } + + Poll::Pending } fn size_hint(&self) -> (usize, Option) { let (lower, upper) = self.stream.size_hint(); - match self.timeout_mode { - TimeoutMode::Once => { - // The timeout stream may insert an error before and after each message - // from the underlying stream, but no more than one error between each - // message. Hence the upper bound is computed as 2x+1. - - // Using a helper function to enable use of question mark operator. - fn twice_plus_one(value: Option) -> Option { - value?.checked_mul(2)?.checked_add(1) - } + // The timeout stream may insert an error before and after each message + // from the underlying stream, but no more than one error between each + // message. Hence the upper bound is computed as 2x+1. - (lower, twice_plus_one(upper)) - } - TimeoutMode::Repeating => { - // The timeout stream may insert an error an infinite number of times. - (lower, None) - } + // Using a helper function to enable use of question mark operator. + fn twice_plus_one(value: Option) -> Option { + value?.checked_mul(2)?.checked_add(1) } + + (lower, twice_plus_one(upper)) } } diff --git a/tokio-stream/src/stream_ext/timeout_repeating.rs b/tokio-stream/src/stream_ext/timeout_repeating.rs new file mode 100644 index 00000000000..509be8eb25a --- /dev/null +++ b/tokio-stream/src/stream_ext/timeout_repeating.rs @@ -0,0 +1,66 @@ +use crate::stream_ext::Fuse; +use crate::{Elapsed, Stream}; +use tokio::time::{Instant, Sleep}; + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::time::Duration; + +pin_project! { + /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub struct TimeoutRepeating { + #[pin] + stream: Fuse, + #[pin] + deadline: Sleep, + duration: Duration, + } +} + +impl TimeoutRepeating { + pub(super) fn new(stream: S, duration: Duration) -> Self { + let next = Instant::now() + duration; + let deadline = tokio::time::sleep_until(next); + + TimeoutRepeating { + stream: Fuse::new(stream), + deadline, + duration, + } + } +} + +impl Stream for TimeoutRepeating { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.project(); + + match me.stream.poll_next(cx) { + Poll::Ready(v) => { + if v.is_some() { + let next = Instant::now() + *me.duration; + me.deadline.reset(next); + } + return Poll::Ready(v.map(Ok)); + } + Poll::Pending => {} + }; + + ready!(me.deadline.as_mut().poll(cx)); + let next = Instant::now() + *me.duration; + me.deadline.reset(next); + Poll::Ready(Some(Err(Elapsed::new()))) + } + + fn size_hint(&self) -> (usize, Option) { + let (lower, _) = self.stream.size_hint(); + + // The timeout stream may insert an error an infinite number of times. + (lower, None) + } +} From 9e644ae65901beec9b12babca8d3ae0b63b01cd2 Mon Sep 17 00:00:00 2001 From: J Robert Ray Date: Sun, 16 Apr 2023 13:22:08 -0700 Subject: [PATCH 6/8] stream: use an Interval in TimeoutRepeating Avoid stretching the time between timeouts if the stream is not polled regularly. --- .../src/stream_ext/timeout_repeating.rs | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/tokio-stream/src/stream_ext/timeout_repeating.rs b/tokio-stream/src/stream_ext/timeout_repeating.rs index 509be8eb25a..7a4573f1976 100644 --- a/tokio-stream/src/stream_ext/timeout_repeating.rs +++ b/tokio-stream/src/stream_ext/timeout_repeating.rs @@ -1,8 +1,7 @@ use crate::stream_ext::Fuse; use crate::{Elapsed, Stream}; -use tokio::time::{Instant, Sleep}; +use tokio::time::{self, Interval, MissedTickBehavior}; -use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -16,20 +15,20 @@ pin_project! { #[pin] stream: Fuse, #[pin] - deadline: Sleep, - duration: Duration, + interval: Interval, } } impl TimeoutRepeating { pub(super) fn new(stream: S, duration: Duration) -> Self { - let next = Instant::now() + duration; - let deadline = tokio::time::sleep_until(next); + let mut interval = time::interval(duration); + // Use Delay behavior so timeouts are always emitted at least + // `duration` apart. + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); TimeoutRepeating { stream: Fuse::new(stream), - deadline, - duration, + interval, } } } @@ -43,17 +42,14 @@ impl Stream for TimeoutRepeating { match me.stream.poll_next(cx) { Poll::Ready(v) => { if v.is_some() { - let next = Instant::now() + *me.duration; - me.deadline.reset(next); + me.interval.reset(); } return Poll::Ready(v.map(Ok)); } Poll::Pending => {} }; - ready!(me.deadline.as_mut().poll(cx)); - let next = Instant::now() + *me.duration; - me.deadline.reset(next); + ready!(me.interval.poll_tick(cx)); Poll::Ready(Some(Err(Elapsed::new()))) } From be9c683f36a9a08e1026d4c40c1aca0bc687ef90 Mon Sep 17 00:00:00 2001 From: J Robert Ray Date: Sun, 16 Apr 2023 17:21:06 -0700 Subject: [PATCH 7/8] stream: change timeout_repeating to take an Interval To give the caller control over the MissedTickBehavior of the Interval used internally, take an Interval argument instead of a Duration and MissedTickBehavior argument. This simplifies usage for users that do not care about which MissedTickBehavior is used. --- tokio-stream/src/stream_ext.rs | 17 +++++++++-------- .../src/stream_ext/timeout_repeating.rs | 10 ++-------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 9e7a30df8c7..302871b6ec3 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -60,7 +60,7 @@ cfg_time! { pub(crate) mod timeout_repeating; use timeout::Timeout; use timeout_repeating::TimeoutRepeating; - use tokio::time::Duration; + use tokio::time::{Duration, Interval}; mod throttle; use throttle::{throttle, Throttle}; mod chunks_timeout; @@ -997,15 +997,16 @@ pub trait StreamExt: Stream { /// Applies a per-item timeout to the passed stream. /// - /// `timeout_repeating()` takes a `Duration` that represents the maximum amount of - /// time each element of the stream has to complete before timing out. + /// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that + /// controls the time each element of the stream has to complete before + /// timing out. /// /// If the wrapped stream yields a value before the deadline is reached, the /// value is returned. Otherwise, an error is returned. The caller may decide /// to continue consuming the stream and will eventually get the next source /// stream value once it becomes available. Unlike `timeout()`, if no value /// becomes available before the deadline is reached, additional errors are - /// returned at the specified interval. See [`timeout`](StreamExt::timeout) + /// returned at the specified interval. See [`timeout`](StreamExt::timeout) /// for an alternative where the timeouts do not repeat. /// /// # Notes @@ -1027,7 +1028,7 @@ pub trait StreamExt: Stream { /// use std::time::Duration; /// # let int_stream = stream::iter(1..=3); /// - /// let int_stream = int_stream.timeout_repeating(Duration::from_secs(1)); + /// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1))); /// tokio::pin!(int_stream); /// /// // When no items time out, we get the 3 elements in succession: @@ -1055,7 +1056,7 @@ pub trait StreamExt: Stream { /// // Timeout errors will be continuously produced at the specified /// // interval until the wrapped stream yields a value. /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23))); - /// let timeout_stream = interval_stream.timeout_repeating(Duration::from_millis(9)); + /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9))); /// tokio::pin!(timeout_stream); /// /// // Multiple timeouts will be received between values in the source stream. @@ -1068,11 +1069,11 @@ pub trait StreamExt: Stream { /// ``` #[cfg(all(feature = "time"))] #[cfg_attr(docsrs, doc(cfg(feature = "time")))] - fn timeout_repeating(self, duration: Duration) -> TimeoutRepeating + fn timeout_repeating(self, interval: Interval) -> TimeoutRepeating where Self: Sized, { - TimeoutRepeating::new(self, duration) + TimeoutRepeating::new(self, interval) } /// Slows down a stream by enforcing a delay between items. diff --git a/tokio-stream/src/stream_ext/timeout_repeating.rs b/tokio-stream/src/stream_ext/timeout_repeating.rs index 7a4573f1976..253d2fd677e 100644 --- a/tokio-stream/src/stream_ext/timeout_repeating.rs +++ b/tokio-stream/src/stream_ext/timeout_repeating.rs @@ -1,11 +1,10 @@ use crate::stream_ext::Fuse; use crate::{Elapsed, Stream}; -use tokio::time::{self, Interval, MissedTickBehavior}; +use tokio::time::Interval; use core::pin::Pin; use core::task::{Context, Poll}; use pin_project_lite::pin_project; -use std::time::Duration; pin_project! { /// Stream returned by the [`timeout_repeating`](super::StreamExt::timeout_repeating) method. @@ -20,12 +19,7 @@ pin_project! { } impl TimeoutRepeating { - pub(super) fn new(stream: S, duration: Duration) -> Self { - let mut interval = time::interval(duration); - // Use Delay behavior so timeouts are always emitted at least - // `duration` apart. - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - + pub(super) fn new(stream: S, interval: Interval) -> Self { TimeoutRepeating { stream: Fuse::new(stream), interval, From cdd0b47ab1001ce1ea6951d77c95dd1ce264e85e Mon Sep 17 00:00:00 2001 From: J Robert Ray Date: Sun, 23 Apr 2023 17:26:06 -0700 Subject: [PATCH 8/8] stream: split doc tests into multiple examples Split out the repeating vs. non-repeating example as a separate code unit. --- tokio-stream/src/stream_ext.rs | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index 302871b6ec3..a4ab8a03676 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -943,9 +943,9 @@ pub trait StreamExt: Stream { /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): /// /// ``` - /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// # #[tokio::main] /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream}; + /// use tokio_stream::{self as stream, StreamExt}; /// use std::time::Duration; /// # let int_stream = stream::iter(1..=3); /// @@ -973,9 +973,17 @@ pub trait StreamExt: Stream { /// /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// # } + /// ``` /// - /// // Once a timeout error is received, no further events will be received - /// // unless the wrapped stream yields a value (timeouts do not repeat). + /// Once a timeout error is received, no further events will be received + /// unless the wrapped stream yields a value (timeouts do not repeat). + /// + /// ``` + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// # async fn main() { + /// use tokio_stream::{StreamExt, wrappers::IntervalStream}; + /// use std::time::Duration; /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100))); /// let timeout_stream = interval_stream.timeout(Duration::from_millis(10)); /// tokio::pin!(timeout_stream); @@ -1022,9 +1030,9 @@ pub trait StreamExt: Stream { /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3): /// /// ``` - /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// # #[tokio::main] /// # async fn main() { - /// use tokio_stream::{self as stream, StreamExt, wrappers::IntervalStream}; + /// use tokio_stream::{self as stream, StreamExt}; /// use std::time::Duration; /// # let int_stream = stream::iter(1..=3); /// @@ -1052,9 +1060,17 @@ pub trait StreamExt: Stream { /// /// assert_eq!(int_stream.try_next().await, Ok(Some(1))); /// assert_eq!(int_stream.try_next().await, Ok(None)); + /// # } + /// ``` /// - /// // Timeout errors will be continuously produced at the specified - /// // interval until the wrapped stream yields a value. + /// Timeout errors will be continuously produced at the specified interval + /// until the wrapped stream yields a value. + /// + /// ``` + /// # #[tokio::main(flavor = "current_thread", start_paused = true)] + /// # async fn main() { + /// use tokio_stream::{StreamExt, wrappers::IntervalStream}; + /// use std::time::Duration; /// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23))); /// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9))); /// tokio::pin!(timeout_stream);