Skip to content

Commit

Permalink
stream: add StreamExt::peekable (#6095)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma authored Nov 1, 2023
1 parent 4c85801 commit 65f861f
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
28 changes: 28 additions & 0 deletions tokio-stream/src/stream_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ use then::Then;
mod try_next;
use try_next::TryNext;

mod peekable;
use peekable::Peekable;

cfg_time! {
pub(crate) mod timeout;
pub(crate) mod timeout_repeating;
Expand Down Expand Up @@ -1176,6 +1179,31 @@ pub trait StreamExt: Stream {
assert!(max_size > 0, "`max_size` must be non-zero.");
ChunksTimeout::new(self, max_size, duration)
}

/// Turns the stream into a peekable stream, whose first element can be peeked at without being
/// consumed.
/// ```rust
/// use tokio_stream::{self as stream, StreamExt};
///
/// #[tokio::main]
/// # async fn _unused() {}
/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
/// async fn main() {
/// let iter = vec![1, 2, 3, 4].into_iter();
/// let mut stream = stream::iter(iter).peekable();
///
/// assert_eq!(*stream.peek().await.unwrap(), 1);
/// assert_eq!(*stream.peek().await.unwrap(), 1);
/// assert_eq!(stream.next().await.unwrap(), 1);
/// assert_eq!(*stream.peek().await.unwrap(), 2);
/// }
/// ```
fn peekable(self) -> Peekable<Self>
where
Self: Sized,
{
Peekable::new(self)
}
}

impl<St: ?Sized> StreamExt for St where St: Stream {}
Expand Down
50 changes: 50 additions & 0 deletions tokio-stream/src/stream_ext/peekable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::Stream;
use pin_project_lite::pin_project;

use crate::stream_ext::Fuse;
use crate::StreamExt;

pin_project! {
/// Stream returned by the [`chain`](super::StreamExt::peekable) method.
pub struct Peekable<T: Stream> {
peek: Option<T::Item>,
#[pin]
stream: Fuse<T>,
}
}

impl<T: Stream> Peekable<T> {
pub(crate) fn new(stream: T) -> Self {
let stream = stream.fuse();
Self { peek: None, stream }
}

/// Peek at the next item in the stream.
pub async fn peek(&mut self) -> Option<&T::Item>
where
T: Unpin,
{
if let Some(ref it) = self.peek {
Some(it)
} else {
self.peek = self.next().await;
self.peek.as_ref()
}
}
}

impl<T: Stream> Stream for Peekable<T> {
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if let Some(it) = this.peek.take() {
Poll::Ready(Some(it))
} else {
this.stream.poll_next(cx)
}
}
}

0 comments on commit 65f861f

Please sign in to comment.