diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs index 6fe4472d6eb..3abf645b1c5 100644 --- a/core/src/raw/futures_util.rs +++ b/core/src/raw/futures_util.rs @@ -171,15 +171,20 @@ impl ConcurrentTasks { loop { // Try poll once to see if there is any ready task. - if let Some(mut task) = self.tasks.pop_front() { - if let Poll::Ready((i, o)) = poll!(&mut task) { + if let Some(task) = self.tasks.front_mut() { + if let Poll::Ready((i, o)) = poll!(task) { match o { - Ok(o) => self.results.push_back(o), + Ok(o) => { + let _ = self.tasks.pop_front(); + self.results.push_back(o) + } Err(err) => { // Retry this task if the error is temporary if err.is_temporary() { self.tasks - .push_front(self.executor.execute((self.factory)(i))); + .front_mut() + .expect("tasks must have at least one task") + .replace(self.executor.execute((self.factory)(i))); } else { self.clear(); self.errored = true; @@ -187,9 +192,6 @@ impl ConcurrentTasks { return Err(err); } } - } else { - // task is not ready, push it back. - self.tasks.push_front(task) } } @@ -203,11 +205,12 @@ impl ConcurrentTasks { // Wait for the next task to be ready. let task = self .tasks - .pop_front() + .front_mut() .expect("tasks must have at least one task"); let (i, o) = task.await; match o { Ok(o) => { + let _ = self.tasks.pop_front(); self.results.push_back(o); continue; } @@ -215,7 +218,9 @@ impl ConcurrentTasks { // Retry this task if the error is temporary if err.is_temporary() { self.tasks - .push_front(self.executor.execute((self.factory)(i))); + .front_mut() + .expect("tasks must have at least one task") + .replace(self.executor.execute((self.factory)(i))); } else { self.clear(); self.errored = true; @@ -239,15 +244,20 @@ impl ConcurrentTasks { return Some(Ok(result)); } - if let Some(task) = self.tasks.pop_front() { + if let Some(task) = self.tasks.front_mut() { let (i, o) = task.await; return match o { - Ok(o) => Some(Ok(o)), + Ok(o) => { + let _ = self.tasks.pop_front(); + Some(Ok(o)) + } Err(err) => { // Retry this task if the error is temporary if err.is_temporary() { self.tasks - .push_front(self.executor.execute((self.factory)(i))); + .front_mut() + .expect("tasks must have at least one task") + .replace(self.executor.execute((self.factory)(i))); } else { self.clear(); self.errored = true; diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs index 03549e80e0a..0d893d7cb32 100644 --- a/core/src/raw/oio/write/multipart_write.rs +++ b/core/src/raw/oio/write/multipart_write.rs @@ -276,6 +276,15 @@ where self.parts.push(result) } + if self.parts.len() != self.next_part_number { + return Err(Error::new( + ErrorKind::Unexpected, + "multipart part numbers mismatch, please report bug to opendal", + ) + .with_context("expected", self.next_part_number) + .with_context("actual", self.parts.len()) + .with_context("upload_id", upload_id)); + } self.w.complete_part(&upload_id, &self.parts).await } @@ -300,7 +309,7 @@ mod tests { use rand::Rng; use rand::RngCore; use tokio::sync::Mutex; - use tokio::time::sleep; + use tokio::time::{sleep, timeout}; use super::*; use crate::raw::oio::Write; @@ -347,7 +356,7 @@ mod tests { } // Add an async sleep here to enforce some pending. - sleep(Duration::from_millis(50)).await; + sleep(Duration::from_nanos(50)).await; // We will have 10% percent rate for write part to fail. if thread_rng().gen_bool(1.0 / 10.0) { @@ -385,11 +394,38 @@ mod tests { } } + struct TimeoutExecutor { + exec: Arc, + } + + impl TimeoutExecutor { + pub fn new() -> Self { + Self { + exec: Executor::new().into_inner(), + } + } + } + + impl Execute for TimeoutExecutor { + fn execute(&self, f: BoxedStaticFuture<()>) { + self.exec.execute(f) + } + + fn timeout(&self) -> Option> { + let time = thread_rng().gen_range(0..100); + Some(Box::pin(tokio::time::sleep(Duration::from_nanos(time)))) + } + } + #[tokio::test] async fn test_multipart_upload_writer_with_concurrent_errors() { let mut rng = thread_rng(); - let mut w = MultipartWriter::new(TestWrite::new(), Some(Executor::new()), 200); + let mut w = MultipartWriter::new( + TestWrite::new(), + Some(Executor::with(TimeoutExecutor::new())), + 200, + ); let mut total_size = 0u64; for _ in 0..1000 { @@ -400,17 +436,23 @@ mod tests { rng.fill_bytes(&mut bs); loop { - match w.write(bs.clone().into()).await { - Ok(_) => break, - Err(_) => continue, + match timeout(Duration::from_nanos(10), w.write(bs.clone().into())).await { + Ok(Ok(_)) => break, + Ok(Err(_)) => continue, + Err(_) => { + continue; + } } } } loop { - match w.close().await { - Ok(_) => break, - Err(_) => continue, + match timeout(Duration::from_nanos(10), w.close()).await { + Ok(Ok(_)) => break, + Ok(Err(_)) => continue, + Err(_) => { + continue; + } } } diff --git a/core/src/types/execute/api.rs b/core/src/types/execute/api.rs index 2317434cb8d..53da1a749d7 100644 --- a/core/src/types/execute/api.rs +++ b/core/src/types/execute/api.rs @@ -91,6 +91,14 @@ impl Task { pub fn new(handle: RemoteHandle) -> Self { Self { handle } } + + /// Replace the task with a new task. + /// + /// The old task will be dropped directly. + #[inline] + pub fn replace(&mut self, new_task: Self) { + self.handle = new_task.handle; + } } impl Future for Task {