Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core/oio): Make ConcurrentTasks cancel safe by only pop after ready #4707

Merged
merged 2 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions core/src/raw/futures_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,25 +171,27 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {

loop {
// Try poll once to see if there is any ready task.
if let Some(mut task) = self.tasks.pop_front() {
if let Poll::Ready((i, o)) = poll!(&mut task) {
if let Some(task) = self.tasks.front_mut() {
if let Poll::Ready((i, o)) = poll!(task) {
match o {
Ok(o) => self.results.push_back(o),
Ok(o) => {
let _ = self.tasks.pop_front();
self.results.push_back(o)
}
Err(err) => {
// Retry this task if the error is temporary
if err.is_temporary() {
self.tasks
.push_front(self.executor.execute((self.factory)(i)));
.front_mut()
.expect("tasks must have at least one task")
.replace(self.executor.execute((self.factory)(i)));
} else {
self.clear();
self.errored = true;
}
return Err(err);
}
}
} else {
// task is not ready, push it back.
self.tasks.push_front(task)
}
}

Expand All @@ -203,19 +205,22 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
// Wait for the next task to be ready.
let task = self
.tasks
.pop_front()
.front_mut()
.expect("tasks must have at least one task");
let (i, o) = task.await;
match o {
Ok(o) => {
let _ = self.tasks.pop_front();
self.results.push_back(o);
continue;
}
Err(err) => {
// Retry this task if the error is temporary
if err.is_temporary() {
self.tasks
.push_front(self.executor.execute((self.factory)(i)));
.front_mut()
.expect("tasks must have at least one task")
.replace(self.executor.execute((self.factory)(i)));
} else {
self.clear();
self.errored = true;
Expand All @@ -239,15 +244,20 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
return Some(Ok(result));
}

if let Some(task) = self.tasks.pop_front() {
if let Some(task) = self.tasks.front_mut() {
let (i, o) = task.await;
return match o {
Ok(o) => Some(Ok(o)),
Ok(o) => {
let _ = self.tasks.pop_front();
Some(Ok(o))
}
Err(err) => {
// Retry this task if the error is temporary
if err.is_temporary() {
self.tasks
.push_front(self.executor.execute((self.factory)(i)));
.front_mut()
.expect("tasks must have at least one task")
.replace(self.executor.execute((self.factory)(i)));
} else {
self.clear();
self.errored = true;
Expand Down
60 changes: 51 additions & 9 deletions core/src/raw/oio/write/multipart_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,15 @@ where
self.parts.push(result)
}

if self.parts.len() != self.next_part_number {
return Err(Error::new(
ErrorKind::Unexpected,
"multipart part numbers mismatch, please report bug to opendal",
)
.with_context("expected", self.next_part_number)
.with_context("actual", self.parts.len())
.with_context("upload_id", upload_id));
}
self.w.complete_part(&upload_id, &self.parts).await
}

Expand All @@ -300,7 +309,7 @@ mod tests {
use rand::Rng;
use rand::RngCore;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tokio::time::{sleep, timeout};

use super::*;
use crate::raw::oio::Write;
Expand Down Expand Up @@ -347,7 +356,7 @@ mod tests {
}

// Add an async sleep here to enforce some pending.
sleep(Duration::from_millis(50)).await;
sleep(Duration::from_nanos(50)).await;

// We will have 10% percent rate for write part to fail.
if thread_rng().gen_bool(1.0 / 10.0) {
Expand Down Expand Up @@ -385,11 +394,38 @@ mod tests {
}
}

struct TimeoutExecutor {
exec: Arc<dyn Execute>,
}

impl TimeoutExecutor {
pub fn new() -> Self {
Self {
exec: Executor::new().into_inner(),
}
}
}

impl Execute for TimeoutExecutor {
fn execute(&self, f: BoxedStaticFuture<()>) {
self.exec.execute(f)
}

fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
let time = thread_rng().gen_range(0..100);
Some(Box::pin(tokio::time::sleep(Duration::from_nanos(time))))
}
}

#[tokio::test]
async fn test_multipart_upload_writer_with_concurrent_errors() {
let mut rng = thread_rng();

let mut w = MultipartWriter::new(TestWrite::new(), Some(Executor::new()), 200);
let mut w = MultipartWriter::new(
TestWrite::new(),
Some(Executor::with(TimeoutExecutor::new())),
200,
);
let mut total_size = 0u64;

for _ in 0..1000 {
Expand All @@ -400,17 +436,23 @@ mod tests {
rng.fill_bytes(&mut bs);

loop {
match w.write(bs.clone().into()).await {
Ok(_) => break,
Err(_) => continue,
match timeout(Duration::from_nanos(10), w.write(bs.clone().into())).await {
Ok(Ok(_)) => break,
Ok(Err(_)) => continue,
Err(_) => {
continue;
}
}
}
}

loop {
match w.close().await {
Ok(_) => break,
Err(_) => continue,
match timeout(Duration::from_nanos(10), w.close()).await {
Ok(Ok(_)) => break,
Ok(Err(_)) => continue,
Err(_) => {
continue;
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions core/src/types/execute/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ impl<T: 'static> Task<T> {
pub fn new(handle: RemoteHandle<T>) -> Self {
Self { handle }
}

/// Replace the task with a new task.
///
/// The old task will be dropped directly.
#[inline]
pub fn replace(&mut self, new_task: Self) {
self.handle = new_task.handle;
}
}

impl<T: 'static> Future for Task<T> {
Expand Down
Loading