Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamExt::buffer_unordered_adaptable #1956

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions futures-util/src/stream/stream/buffer_unordered_adaptable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use crate::stream::{Fuse, FuturesUnordered, StreamExt};
use futures_core::future::Future;
use futures_core::stream::{Stream, FusedStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use core::fmt;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use alloc::sync::Arc;

/// Stream for the [`buffer_unordered_adaptable`](super::StreamExt::buffer_unordered_adaptable)
/// method.
#[must_use = "streams do nothing unless polled"]
pub struct BufferUnorderedAdaptable<St>
where
St: Stream,
{
stream: Fuse<St>,
in_progress_queue: FuturesUnordered<St::Item>,
max: Arc<AtomicUsize>,
}

impl<St> Unpin for BufferUnorderedAdaptable<St>
where
St: Stream + Unpin,
{}

impl<St> fmt::Debug for BufferUnorderedAdaptable<St>
where
St: Stream + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufferUnorderedAdaptable")
.field("stream", &self.stream)
.field("in_progress_queue", &self.in_progress_queue)
.field("max", &self.max.load(Ordering::Relaxed))
.finish()
}
}

impl<St> BufferUnorderedAdaptable<St>
where
St: Stream,
St::Item: Future,
{
unsafe_pinned!(stream: Fuse<St>);
unsafe_unpinned!(in_progress_queue: FuturesUnordered<St::Item>);

pub(super) fn new(stream: St, n: Arc<AtomicUsize>) -> BufferUnorderedAdaptable<St>
where
St: Stream,
St::Item: Future,
{
BufferUnorderedAdaptable {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesUnordered::new(),
max: n,
}
}

/// Acquires a reference to the underlying stream that this combinator is
/// pulling from.
pub fn get_ref(&self) -> &St {
self.stream.get_ref()
}

/// Acquires a mutable reference to the underlying stream that this
/// combinator is pulling from.
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_mut(&mut self) -> &mut St {
self.stream.get_mut()
}

/// Acquires a pinned mutable reference to the underlying stream that this
/// combinator is pulling from.
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
self.stream().get_pin_mut()
}

/// Consumes this combinator, returning the underlying stream.
///
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> St {
self.stream.into_inner()
}
}

impl<St> Stream for BufferUnorderedAdaptable<St>
where
St: Stream,
St::Item: Future,
{
type Item = <St::Item as Future>::Output;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
// First up, try to spawn off as many futures as possible by filling up
// our queue of futures.
while self.in_progress_queue.len() < self.max.load(Ordering::Relaxed) {
match self.as_mut().stream().poll_next(cx) {
Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}

// Attempt to pull the next value from the in_progress_queue
match self.as_mut().in_progress_queue().poll_next_unpin(cx) {
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
Poll::Ready(None) => {}
}

// If more values are still coming from the stream, we're not done yet
if self.stream.is_done() {
Poll::Ready(None)
} else {
Poll::Pending
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let queue_len = self.in_progress_queue.len();
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(queue_len);
let upper = match upper {
Some(x) => x.checked_add(queue_len),
None => None,
};
(lower, upper)
}
}

impl<St> FusedStream for BufferUnorderedAdaptable<St>
where
St: Stream,
St::Item: Future,
{
fn is_terminated(&self) -> bool {
self.in_progress_queue.is_terminated() && self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for BufferUnorderedAdaptable<S>
where
S: Stream + Sink<Item>,
S::Item: Future,
{
type Error = S::Error;

delegate_sink!(stream, Item);
}
62 changes: 62 additions & 0 deletions futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ cfg_target_has_atomic! {
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buffer_unordered::BufferUnordered;

#[cfg(feature = "alloc")]
mod buffer_unordered_adaptable;
#[cfg(feature = "alloc")]
use core::sync::atomic::AtomicUsize;
#[cfg(feature = "alloc")]
use alloc::sync::Arc;
#[cfg(feature = "alloc")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::buffer_unordered_adaptable::BufferUnorderedAdaptable;

#[cfg(feature = "alloc")]
mod buffered;
#[cfg(feature = "alloc")]
Expand Down Expand Up @@ -1003,6 +1013,58 @@ pub trait StreamExt: Stream {
BufferUnordered::new(self, n)
}

/// An adaptor for creating a dynamically adaptable buffered list of pending
/// futures (unordered).
///
/// If this stream's item can be converted into a future, then this adaptor
/// will buffer up to `n` futures and then return the outputs in the order
/// in which they complete. No more than `n` futures will be buffered at
/// any point in time, and less than `n` may also be buffered depending on
/// the state of each future. `n` can be mutated to alter buffering
/// behavior.
///
/// The returned stream will be a stream of each future's output.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::channel::oneshot;
/// use futures::stream::{self, StreamExt};
/// use core::sync::atomic::{AtomicUsize, Ordering};
/// use std::sync::Arc;
///
/// let (send_one, recv_one) = oneshot::channel();
/// let (send_two, recv_two) = oneshot::channel();
///
/// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
/// let atomic_n = Arc::new(AtomicUsize::new(10));
/// let mut buffered = stream_of_futures.buffer_unordered_adaptable(atomic_n.clone());
///
/// send_two.send(2i32)?;
/// assert_eq!(buffered.next().await, Some(Ok(2i32)));
///
/// atomic_n.store(20, Ordering::Relaxed);
///
/// send_one.send(1i32)?;
/// assert_eq!(buffered.next().await, Some(Ok(1i32)));
///
/// assert_eq!(buffered.next().await, None);
/// # Ok::<(), i32>(()) }).unwrap();
/// ```
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn buffer_unordered_adaptable(self, n: Arc<AtomicUsize>) -> BufferUnorderedAdaptable<Self>
where
Self::Item: Future,
Self: Sized,
{
BufferUnorderedAdaptable::new(self, n)
}

/// An adapter for zipping two streams together.
///
/// The zipped stream waits for both streams to produce an item, and then
Expand Down