diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs new file mode 100644 index 00000000000..46bab650c45 --- /dev/null +++ b/core/src/raw/futures_util.rs @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use futures::stream::FuturesOrdered; +use futures::{FutureExt, StreamExt}; +use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// CONCURRENT_LARGE_THRESHOLD is the largest `concurrent` value served by the loop-polled +/// `VecDeque` queue; larger values use [`FuturesOrdered`] instead. +/// +/// The value of `8` was picked arbitrarily; no strict benchmark has been done. +/// Please raise an issue if you find the value is not good enough or you want to configure +/// this value at runtime. +const CONCURRENT_LARGE_THRESHOLD: usize = 8; + +/// ConcurrentFutures is a stream that holds a queue of concurrent futures. +/// +/// - results are yielded in the same order in which the futures were pushed. +/// - the number of concurrent futures is limited by `concurrent`. +/// - optimized for a small number of concurrent futures. +/// - zero cost for non-concurrent futures cases (concurrent == 1). +pub struct ConcurrentFutures { + tasks: Tasks, + concurrent: usize, +} + +/// Tasks is used to hold the entire task queue. 
+enum Tasks { + /// The special case for concurrent <= 1. + /// + /// It works exactly like an `Option` field in a struct. + Once(Option), + /// The special case for a small concurrent value. + /// + /// In this case, the cost of polling in a loop is lower than using `FuturesOrdered`. + /// + /// We will replace the future by `TaskResult::Ready` once it's ready to avoid polling it again. + Small(VecDeque>), + /// The general case for a large concurrent value. + /// + /// We use `FuturesOrdered` to avoid a huge number of polls on futures. + Large(FuturesOrdered), +} + +impl Unpin for Tasks {} + +enum TaskResult { + Polling(F), + Ready(F::Output), +} + +impl ConcurrentFutures +where + F: Future + Unpin + 'static, +{ + /// Create a new ConcurrentFutures with the given number of concurrent futures (`0` is treated as `1`). + pub fn new(concurrent: usize) -> Self { + if (0..2).contains(&concurrent) { + Self { + tasks: Tasks::Once(None), + concurrent: 1, + } + } else if (2..=CONCURRENT_LARGE_THRESHOLD).contains(&concurrent) { + Self { + tasks: Tasks::Small(VecDeque::with_capacity(concurrent)), + concurrent, + } + } else { + Self { + tasks: Tasks::Large(FuturesOrdered::new()), + concurrent, + } + } + } + + /// Return the length of current concurrent futures (both ongoing and ready). + pub fn len(&self) -> usize { + match &self.tasks { + Tasks::Once(fut) => fut.is_some() as usize, + Tasks::Small(v) => v.len(), + Tasks::Large(v) => v.len(), + } + } + + /// Return true if there are no futures in the queue. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Return the remaining space to push new futures, saturating at 0 if the queue is over-full. + pub fn remaining(&self) -> usize { + self.concurrent.saturating_sub(self.len()) + } + + /// Return true if there is remaining space to push new futures. + pub fn has_remaining(&self) -> bool { + self.remaining() > 0 + } + + /// Push new future into the queue. 
+ pub fn push(&mut self, f: F) { + debug_assert!( + self.has_remaining(), + "concurrent futures must have remaining space" + ); + + match &mut self.tasks { + Tasks::Once(fut) => { + *fut = Some(f); + } + Tasks::Small(v) => v.push_back(TaskResult::Polling(f)), + Tasks::Large(v) => v.push_back(f), + } + } +} + +impl futures::Stream for ConcurrentFutures +where + F: Future + Unpin + 'static, +{ + type Item = F::Output; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &mut self.get_mut().tasks { + Tasks::Once(fut) => match fut { + Some(x) => x.poll_unpin(cx).map(|v| { + *fut = None; + Some(v) + }), + None => Poll::Ready(None), + }, + Tasks::Small(v) => { + // Poll every unresolved task so that all of them make progress. + for task in v.iter_mut() { + if let TaskResult::Polling(f) = task { + match f.poll_unpin(cx) { + Poll::Pending => {} + Poll::Ready(res) => { + // Store the output in place so this future is never polled again. + *task = TaskResult::Ready(res); + } + } + } + } + + // Only the front task may be yielded, which keeps the push order. + match v.front_mut() { + // Return pending if the first one is still polling. + Some(TaskResult::Polling(_)) => Poll::Pending, + Some(TaskResult::Ready(_)) => { + let res = v.pop_front().unwrap(); + match res { + TaskResult::Polling(_) => unreachable!(), + TaskResult::Ready(res) => Poll::Ready(Some(res)), + } + } + None => Poll::Ready(None), + } + } + Tasks::Large(v) => v.poll_next_unpin(cx), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::future::BoxFuture; + use futures::Stream; + use rand::Rng; + use std::task::ready; + use std::time::Duration; + + struct Lister { + size: usize, + idx: usize, + concurrent: usize, + tasks: ConcurrentFutures>, + } + + impl Stream for Lister { + type Item = usize; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Randomly sleep for a while to simulate io operations that take less than 100 microseconds. 
+ let timeout = Duration::from_micros(rand::thread_rng().gen_range(0..100)); + let idx = self.idx; + if self.tasks.len() < self.concurrent && self.idx < self.size { + let fut = async move { + tokio::time::sleep(timeout).await; + idx + }; + self.idx += 1; + self.tasks.push(Box::pin(fut)); + } + + if let Some(v) = ready!(self.tasks.poll_next_unpin(cx)) { + Poll::Ready(Some(v)) + } else { + Poll::Ready(None) + } + } + } + + #[tokio::test] + async fn test_concurrent_futures() { + let cases = vec![ + ("once", 1), + ("small", CONCURRENT_LARGE_THRESHOLD - 1), + ("large", CONCURRENT_LARGE_THRESHOLD + 1), + ]; + + for (name, concurrent) in cases { + let lister = Lister { + size: 1000, + idx: 0, + concurrent, + tasks: ConcurrentFutures::new(concurrent), + }; + let expected: Vec = (0..1000).collect(); + let result: Vec = lister.collect().await; + + assert_eq!(expected, result, "concurrent futures failed: {}", name); + } + } +} diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index 3150791ec8b..7ce67834c7e 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -62,6 +62,9 @@ pub use tokio_util::*; mod std_io_util; pub use std_io_util::*; +mod futures_util; +pub use futures_util::ConcurrentFutures; + // Expose as a pub mod to avoid confusing. pub mod adapters; pub mod oio; diff --git a/core/src/types/list.rs b/core/src/types/list.rs index d9140c9dad6..5f1c3f60e09 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -16,16 +16,16 @@ // under the License. use std::cmp; -use std::collections::VecDeque; +use std::future::Future; use std::pin::Pin; use std::task::ready; use std::task::Context; use std::task::Poll; use flagset::FlagSet; -use futures::FutureExt; +use futures::future::BoxFuture; use futures::Stream; -use tokio::task::JoinHandle; +use futures::StreamExt; use crate::raw::oio::List; use crate::raw::*; @@ -47,7 +47,7 @@ pub struct Lister { required_metakey: FlagSet, /// tasks is used to store tasks that are run in concurrent. 
- tasks: VecDeque, + tasks: ConcurrentFutures, errored: bool, } @@ -62,8 +62,8 @@ pub struct Lister { /// --> core/src/types/list.rs:64:1 /// | /// 64 | / enum StatTask { -/// 65 | | /// Handle is used to store the join handle of spawned task. -/// 66 | | Handle(JoinHandle<(String, Result)>), +/// 65 | | /// BoxFuture is used to store the join handle of spawned task. +/// 66 | | Handle(BoxFuture<(String, Result)>), /// | | -------------------------------------------- the second-largest variant contains at least 0 bytes /// 67 | | /// KnownEntry is used to store the entry that already contains the required metakey. /// 68 | | KnownEntry(Option), @@ -96,9 +96,25 @@ pub struct Lister { #[allow(clippy::large_enum_variant)] enum StatTask { /// Stating is used to store the join handle of spawned task. - Stating(JoinHandle<(String, Result)>), + /// + /// TODO: Replace the boxed future with a static future type once Rust supports it. + Stating(BoxFuture<'static, (String, Result)>), /// Known is used to store the entry that already contains the required metakey. - Known(Option), + Known(Option<(String, Metadata)>), +} + +impl Future for StatTask { + type Output = (String, Result); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.get_mut() { + StatTask::Stating(fut) => Pin::new(fut).poll(cx), + StatTask::Known(entry) => { + let (path, metadata) = entry.take().expect("entry should not be None"); + Poll::Ready((path, Ok(metadata))) + } + } + } } /// # Safety @@ -119,7 +135,7 @@ impl Lister { lister: Some(lister), required_metakey, - tasks: VecDeque::with_capacity(concurrent), + tasks: ConcurrentFutures::new(concurrent), errored: false, }) } @@ -135,22 +151,21 @@ impl Stream for Lister { } // Trying to pull more tasks if there are more space. 
- if self.tasks.len() < self.tasks.capacity() { + if self.tasks.has_remaining() { if let Some(lister) = self.lister.as_mut() { match lister.poll_next(cx) { Poll::Pending => {} Poll::Ready(Ok(Some(oe))) => { let (path, metadata) = oe.into_entry().into_parts(); if metadata.contains_metakey(self.required_metakey) { - self.tasks - .push_back(StatTask::Known(Some(Entry::new(path, metadata)))); + self.tasks.push(StatTask::Known(Some((path, metadata)))); } else { let acc = self.acc.clone(); let fut = async move { let res = acc.stat(&path, OpStat::default()).await; - (path, res) + (path, res.map(|rp| rp.into_metadata())) }; - self.tasks.push_back(StatTask::Stating(tokio::spawn(fut))); + self.tasks.push(StatTask::Stating(Box::pin(fut))); } } Poll::Ready(Ok(None)) => { @@ -164,37 +179,16 @@ impl Stream for Lister { } } - if let Some(handle) = self.tasks.front_mut() { - return match handle { - StatTask::Stating(handle) => { - let (path, rp) = ready!(handle.poll_unpin(cx)).map_err(new_task_join_error)?; - - // Make sure this task has been popped after it's ready. 
- self.tasks.pop_front(); - - match rp { - Ok(rp) => { - let metadata = rp.into_metadata(); - Poll::Ready(Some(Ok(Entry::new(path, metadata)))) - } - Err(err) => { - self.errored = true; - Poll::Ready(Some(Err(err))) - } - } - } - StatTask::Known(entry) => { - let entry = entry.take().expect("entry must be valid"); - self.tasks.pop_front(); - Poll::Ready(Some(Ok(entry))) - } - }; + // Try to poll tasks + if let Some((path, rp)) = ready!(self.tasks.poll_next_unpin(cx)) { + let metadata = rp?; + return Poll::Ready(Some(Ok(Entry::new(path, metadata)))); } - if self.lister.is_none() { - Poll::Ready(None) - } else { + if self.lister.is_some() { Poll::Pending + } else { + Poll::Ready(None) } } } diff --git a/core/tests/behavior/list.rs b/core/tests/behavior/list.rs index d9da4909030..c29f0e2dc13 100644 --- a/core/tests/behavior/list.rs +++ b/core/tests/behavior/list.rs @@ -216,6 +216,23 @@ pub async fn test_list_rich_dir(op: Operator) -> Result<()> { assert_eq!(actual, expected); + // List concurrently. + let mut objects = op + .with_limit(10) + .lister_with("test_list_rich_dir/") + .concurrent(5) + .metakey(Metakey::Complete) + .await?; + let mut actual = vec![]; + while let Some(o) = objects.try_next().await? { + let path = o.path().to_string(); + actual.push(path) + } + expected.sort_unstable(); + actual.sort_unstable(); + + assert_eq!(actual, expected); + op.remove_all("test_list_rich_dir/").await?; Ok(()) }