Skip to content

Commit

Permalink
feat: Add retry for Writer::sink operation (#2896)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Aug 21, 2023
1 parent 6f74f55 commit 8649189
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 9 deletions.
3 changes: 2 additions & 1 deletion bindings/ocaml/src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ mod _type;
mod metadata;
mod reader;

use super::*;
use _type::*;

use super::*;

#[ocaml::func]
#[ocaml::sig("string -> (string * string) list -> (operator, string) Result.t ")]
pub fn operator(
Expand Down
4 changes: 2 additions & 2 deletions bindings/ocaml/src/operator/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

use std::io;

use super::*;

use opendal::raw::oio::BlockingRead;

use super::*;

#[ocaml::func]
#[ocaml::sig("reader -> bytes -> (int, string) Result.t ")]
pub fn reader_read(reader: &mut Reader, buf: &mut [u8]) -> Result<usize, String> {
Expand Down
60 changes: 58 additions & 2 deletions core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use backon::Retryable;
use bytes::Bytes;
use futures::FutureExt;
use log::warn;
use tokio::sync::Mutex;

use crate::raw::oio::PageOperation;
use crate::raw::oio::ReadOperation;
Expand Down Expand Up @@ -898,9 +899,64 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
}
}

/// Sink will move the input stream, so we can't retry it.
/// > Ooooooooooops, are you crazy!? Why we need to do `Arc<Mutex<S>>` here? Adding a lock has
/// a lot overhead!
///
/// Yes, you are right. But we have no choice. This is the only safe way for us to add retry
/// support for stream.
///
/// And the overhead is acceptable. Based on our benchmark, adding a lock
/// that has no conflicts will only cost 5ns.
///
/// ```shell
/// stream/without_arc_mutex
/// time: [10.715 ns 10.729 ns 10.744 ns]
/// thrpt: [ 90896 GiB/s 91019 GiB/s 91139 GiB/s]
/// stream/with_arc_mutex time: [14.891 ns 14.905 ns 14.928 ns]
/// thrpt: [ 65418 GiB/s 65517 GiB/s 65581 GiB/s]
/// ```
///
/// The overhead is constant, which means the overhead will not increase with the size of
/// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005%
/// which is acceptable.
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.inner.sink(size, s).await
let s = Arc::new(Mutex::new(s));

let mut backoff = self.builder.build();

loop {
match self.inner.sink(size, Box::new(s.clone())).await {
Ok(_) => return Ok(()),
Err(e) if !e.is_temporary() => return Err(e),
Err(e) => match backoff.next() {
None => return Err(e),
Some(dur) => {
{
use oio::StreamExt;

let mut stream = s.lock().await;
// Try to reset this stream.
//
// If error happened, we will return the sink error directly and stop retry.
if stream.reset().await.is_err() {
return Err(e);
}
}

self.notify.intercept(
&e,
dur,
&[
("operation", WriteOperation::Sink.into_static()),
("path", &self.path),
],
);
tokio::time::sleep(dur).await;
continue;
}
},
}
}
}

async fn abort(&mut self) -> Result<()> {
Expand Down
118 changes: 114 additions & 4 deletions core/src/raw/oio/stream/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use pin_project::pin_project;

use crate::*;

Expand All @@ -32,6 +36,9 @@ pub type Streamer = Box<dyn Stream>;
pub trait Stream: Unpin + Send + Sync {
/// Poll next item `Result<Bytes>` from the stream.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>>;

/// Reset this stream to the beginning.
fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>;
}

impl Stream for () {
Expand All @@ -40,6 +47,12 @@ impl Stream for () {

unimplemented!("poll_next is required to be implemented for oio::Stream")
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
let _ = cx;

unimplemented!("poll_reset is required to be implemented for oio::Stream")
}
}

/// `Box<dyn Stream>` won't implement `Stream` automatically.
Expand All @@ -48,17 +61,114 @@ impl<T: Stream + ?Sized> Stream for Box<T> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
(**self).poll_next(cx)
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
(**self).poll_reset(cx)
}
}

impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
match self.try_lock() {
Ok(mut this) => this.poll_next(cx),
Err(_) => Poll::Ready(Some(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
)))),
}
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.try_lock() {
Ok(mut this) => this.poll_reset(cx),
Err(_) => Poll::Ready(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
))),
}
}
}

impl<T: Stream + ?Sized> Stream for Arc<tokio::sync::Mutex<T>> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
match self.try_lock() {
Ok(mut this) => this.poll_next(cx),
Err(_) => Poll::Ready(Some(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
)))),
}
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.try_lock() {
Ok(mut this) => this.poll_reset(cx),
Err(_) => Poll::Ready(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
))),
}
}
}

impl futures::Stream for dyn Stream {
type Item = Result<Bytes>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this: &mut dyn Stream = &mut *self;

this.poll_next(cx)
}
}

/// Impl StreamExt for all T: Stream
impl<T: Stream> StreamExt for T {}

/// Extension of [`Stream`] to make it easier for use.
pub trait StreamExt: Stream {
/// Build a future for `poll_next`.
fn next(&mut self) -> NextFuture<'_, Self> {
NextFuture { inner: self }
}

/// Build a future for `poll_reset`.
fn reset(&mut self) -> ResetFuture<'_, Self> {
ResetFuture { inner: self }
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct NextFuture<'a, T: Stream + Unpin + ?Sized> {
inner: &'a mut T,
}

impl<T> Future for NextFuture<'_, T>
where
T: Stream + Unpin + ?Sized,
{
type Output = Option<Result<Bytes>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
let this = self.project();
Pin::new(this.inner).poll_next(cx)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct ResetFuture<'a, T: Stream + Unpin + ?Sized> {
inner: &'a mut T,
}

impl<T> Future for ResetFuture<'_, T>
where
T: Stream + Unpin + ?Sized,
{
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
Pin::new(this.inner).poll_reset(cx)
}
}
7 changes: 7 additions & 0 deletions core/src/raw/oio/stream/into_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ where
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
self.inner.try_poll_next_unpin(cx)
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"IntoStream doesn't support reset",
)))
}
}
7 changes: 7 additions & 0 deletions core/src/raw/oio/stream/into_stream_from_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,11 @@ where
.set_source(err)))),
}
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"FromReaderStream doesn't support reset",
)))
}
}
1 change: 1 addition & 0 deletions core/src/raw/oio/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

mod api;
pub use api::Stream;
pub use api::StreamExt;
pub use api::Streamer;

mod into_stream_from_reader;
Expand Down

0 comments on commit 8649189

Please sign in to comment.