From 49b76242a16bb24b203b91d953186a0eea5ee4da Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 23 Nov 2023 18:16:04 +0800 Subject: [PATCH] refactor: Polish concurrent list Signed-off-by: Xuanwo --- core/src/types/list.rs | 101 ++++++++++++++++++++++++++++------------- 1 file changed, 70 insertions(+), 31 deletions(-) diff --git a/core/src/types/list.rs b/core/src/types/list.rs index 980f0db0e74..7a1699db959 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -44,16 +44,58 @@ pub struct Lister { /// required_metakey is the metakey required by users. required_metakey: FlagSet<Metakey>, - /// task_queue is used to store tasks that are run in concurrent. - task_queue: VecDeque<StatTask>, + /// tasks is used to store tasks that are run concurrently. + tasks: VecDeque<StatTask>, errored: bool, } +/// StatTask is used to store the task that is run concurrently. +/// +/// # Note for clippy +/// +/// Clippy will raise an error for this enum like the following: +/// +/// ```shell +/// error: large size difference between variants +/// --> core/src/types/list.rs:64:1 +/// | +/// 64 | / enum StatTask { +/// 65 | | /// Handle is used to store the join handle of spawned task. +/// 66 | | Handle(JoinHandle<(String, Result<RpStat>)>), +/// | | -------------------------------------------- the second-largest variant contains at least 0 bytes +/// 67 | | /// KnownEntry is used to store the entry that already contains the required metakey. 
+/// 68 | | KnownEntry(Option<Entry>),
+/// | | ------------------------- the largest variant contains at least 264 bytes +/// 69 | | } +/// | |_^ the entire enum is at least 0 bytes +/// | +/// = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#large_enum_variant +/// = note: `-D clippy::large-enum-variant` implied by `-D warnings` +/// = help: to override `-D warnings` add `#[allow(clippy::large_enum_variant)]` +/// help: consider boxing the large fields to reduce the total size of the enum +/// | +/// 68 | KnownEntry(Box<Option<Entry>>), +/// | ~~~~~~~~~~~~~~~~~~ +/// ``` +/// But this lint is wrong since it doesn't take the generic param JoinHandle into account. In fact, they have exactly +/// the same size: +/// +/// ```rust +/// use std::mem::size_of; +/// use opendal::Result; +/// use opendal::Entry; +/// +/// assert_eq!(264, size_of::<(String, Result<opendal::raw::RpStat>)>()); +/// assert_eq!(264, size_of::<Option<Entry>>()); +/// ``` +/// +/// So let's ignore this lint: +#[allow(clippy::large_enum_variant)] enum StatTask { - /// Handle is used to store the join handle of spawned task. - Handle(JoinHandle<(String, Result<RpStat>)>), - /// KnownEntry is used to store the entry that already contains the required metakey. - KnownEntry(Box<Option<(String, Metadata)>>), + /// Stating is used to store the join handle of spawned task. + Stating(JoinHandle<(String, Result<RpStat>)>), + /// Known is used to store the entry that already contains the required metakey. + Known(Option<Entry>), } /// # Safety @@ -74,7 +116,7 @@ impl Lister { lister: Some(lister), required_metakey, - task_queue: VecDeque::with_capacity(concurrent), + tasks: VecDeque::with_capacity(concurrent), errored: false, }) } @@ -89,31 +131,23 @@ impl Stream for Lister { return Poll::Ready(None); } - let task_queue_len = self.task_queue.len(); - let task_queue_cap = self.task_queue.capacity(); - - if let Some(lister) = self.lister.as_mut() { - if task_queue_len < task_queue_cap { + // Trying to pull more tasks if there is more space. 
+ if self.tasks.len() < self.tasks.capacity() { + if let Some(lister) = self.lister.as_mut() { match lister.poll_next(cx) { - Poll::Pending => { - if task_queue_len == 0 { - return Poll::Pending; - } - } + Poll::Pending => {} Poll::Ready(Ok(Some(oe))) => { let (path, metadata) = oe.into_entry().into_parts(); - // TODO: we can optimize this by checking the provided metakey provided by services. if metadata.contains_metakey(self.required_metakey) { - self.task_queue - .push_back(StatTask::KnownEntry(Box::new(Some((path, metadata))))); + self.tasks + .push_back(StatTask::Known(Some(Entry::new(path, metadata)))); } else { let acc = self.acc.clone(); let fut = async move { let res = acc.stat(&path, OpStat::default()).await; (path, res) }; - self.task_queue - .push_back(StatTask::Handle(tokio::spawn(fut))); + self.tasks.push_back(StatTask::Stating(tokio::spawn(fut))); } } Poll::Ready(Ok(None)) => { @@ -127,14 +161,16 @@ impl Stream for Lister { } } - if let Some(handle) = self.task_queue.front_mut() { + if let Some(handle) = self.tasks.front_mut() { return match handle { - StatTask::Handle(handle) => { + StatTask::Stating(handle) => { let (path, rp) = ready!(handle.poll_unpin(cx)).map_err(new_task_join_error)?; + // Make sure this task has been popped after it's ready. 
+ self.tasks.pop_front(); + match rp { Ok(rp) => { - self.task_queue.pop_front(); let metadata = rp.into_metadata(); Poll::Ready(Some(Ok(Entry::new(path, metadata)))) } @@ -144,15 +180,19 @@ impl Stream for Lister { } } } - StatTask::KnownEntry(entry) => { - let (path, metadata) = entry.take().expect("entry must be valid"); - self.task_queue.pop_front(); - Poll::Ready(Some(Ok(Entry::new(path, metadata)))) + StatTask::Known(entry) => { + let entry = entry.take().expect("entry must be valid"); + self.tasks.pop_front(); + Poll::Ready(Some(Ok(entry))) } }; } - Poll::Ready(None) + if self.lister.is_none() { + Poll::Ready(None) + } else { + Poll::Pending + } } } @@ -213,7 +253,6 @@ impl Iterator for BlockingLister { }; let (path, metadata) = entry.into_entry().into_parts(); - // TODO: we can optimize this by checking the provided metakey provided by services. if metadata.contains_metakey(self.required_metakey) { return Some(Ok(Entry::new(path, metadata))); }