Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Polish concurrent list #3658

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 70 additions & 31 deletions core/src/types/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,58 @@ pub struct Lister {
/// required_metakey is the metakey required by users.
required_metakey: FlagSet<Metakey>,

/// task_queue is used to store tasks that are run concurrently.
task_queue: VecDeque<StatTask>,
/// tasks is used to store tasks that are run concurrently.
tasks: VecDeque<StatTask>,
errored: bool,
}

/// StatTask is used to store a task that is run concurrently.
///
/// # Note for clippy
///
/// Clippy raises an error like the following for this enum:
///
/// ```shell
/// error: large size difference between variants
/// --> core/src/types/list.rs:64:1
/// |
/// 64 | / enum StatTask {
/// 65 | | /// Handle is used to store the join handle of spawned task.
/// 66 | | Handle(JoinHandle<(String, Result<RpStat>)>),
/// | | -------------------------------------------- the second-largest variant contains at least 0 bytes
/// 67 | | /// KnownEntry is used to store the entry that already contains the required metakey.
/// 68 | | KnownEntry(Option<Entry>),
/// | | ------------------------- the largest variant contains at least 264 bytes
/// 69 | | }
/// | |_^ the entire enum is at least 0 bytes
/// |
/// = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#large_enum_variant
/// = note: `-D clippy::large-enum-variant` implied by `-D warnings`
/// = help: to override `-D warnings` add `#[allow(clippy::large_enum_variant)]`
/// help: consider boxing the large fields to reduce the total size of the enum
/// |
/// 68 | KnownEntry(Box<Option<Entry>>),
/// | ~~~~~~~~~~~~~~~~~~
/// ```
/// But this lint is wrong since it doesn't take the generic param JoinHandle into account. In fact, they have exactly
/// the same size:
///
/// ```rust
/// use std::mem::size_of;
/// use opendal::Result;
/// use opendal::Entry;
///
/// assert_eq!(264, size_of::<(String, Result<opendal::raw::RpStat>)>());
/// assert_eq!(264, size_of::<Option<Entry>>());
/// ```
///
/// So let's ignore this lint:
#[allow(clippy::large_enum_variant)]
enum StatTask {
/// Handle is used to store the join handle of spawned task.
Handle(JoinHandle<(String, Result<RpStat>)>),
/// KnownEntry is used to store the entry that already contains the required metakey.
KnownEntry(Box<Option<(String, Metadata)>>),
/// Stating is used to store the join handle of spawned task.
Stating(JoinHandle<(String, Result<RpStat>)>),
/// Known is used to store the entry that already contains the required metakey.
Known(Option<Entry>),
}

/// # Safety
Expand All @@ -74,7 +116,7 @@ impl Lister {
lister: Some(lister),
required_metakey,

task_queue: VecDeque::with_capacity(concurrent),
tasks: VecDeque::with_capacity(concurrent),
errored: false,
})
}
Expand All @@ -89,31 +131,23 @@ impl Stream for Lister {
return Poll::Ready(None);
}

let task_queue_len = self.task_queue.len();
let task_queue_cap = self.task_queue.capacity();

if let Some(lister) = self.lister.as_mut() {
if task_queue_len < task_queue_cap {
// Try to pull more tasks if there is more space.
if self.tasks.len() < self.tasks.capacity() {
if let Some(lister) = self.lister.as_mut() {
match lister.poll_next(cx) {
Poll::Pending => {
if task_queue_len == 0 {
return Poll::Pending;
}
}
Poll::Pending => {}
Poll::Ready(Ok(Some(oe))) => {
let (path, metadata) = oe.into_entry().into_parts();
// TODO: we can optimize this by checking the metakey provided by services.
if metadata.contains_metakey(self.required_metakey) {
self.task_queue
.push_back(StatTask::KnownEntry(Box::new(Some((path, metadata)))));
self.tasks
.push_back(StatTask::Known(Some(Entry::new(path, metadata))));
} else {
let acc = self.acc.clone();
let fut = async move {
let res = acc.stat(&path, OpStat::default()).await;
(path, res)
};
self.task_queue
.push_back(StatTask::Handle(tokio::spawn(fut)));
self.tasks.push_back(StatTask::Stating(tokio::spawn(fut)));
}
}
Poll::Ready(Ok(None)) => {
Expand All @@ -127,14 +161,16 @@ impl Stream for Lister {
}
}

if let Some(handle) = self.task_queue.front_mut() {
if let Some(handle) = self.tasks.front_mut() {
return match handle {
StatTask::Handle(handle) => {
StatTask::Stating(handle) => {
let (path, rp) = ready!(handle.poll_unpin(cx)).map_err(new_task_join_error)?;

// Make sure this task has been popped after it's ready.
self.tasks.pop_front();

match rp {
Ok(rp) => {
self.task_queue.pop_front();
let metadata = rp.into_metadata();
Poll::Ready(Some(Ok(Entry::new(path, metadata))))
}
Expand All @@ -144,15 +180,19 @@ impl Stream for Lister {
}
}
}
StatTask::KnownEntry(entry) => {
let (path, metadata) = entry.take().expect("entry must be valid");
self.task_queue.pop_front();
Poll::Ready(Some(Ok(Entry::new(path, metadata))))
StatTask::Known(entry) => {
let entry = entry.take().expect("entry must be valid");
self.tasks.pop_front();
Poll::Ready(Some(Ok(entry)))
}
};
}

Poll::Ready(None)
if self.lister.is_none() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}

Expand Down Expand Up @@ -213,7 +253,6 @@ impl Iterator for BlockingLister {
};

let (path, metadata) = entry.into_entry().into_parts();
// TODO: we can optimize this by checking the metakey provided by services.
if metadata.contains_metakey(self.required_metakey) {
return Some(Ok(Entry::new(path, metadata)));
}
Expand Down
Loading