diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index cae6ee91843..c05d211f90b 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. +use std::future::Future; use std::io::SeekFrom; -use std::task::Context; +use std::pin::Pin; use std::task::Poll; +use std::task::{ready, Context}; use std::time::Duration; -use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; @@ -30,76 +31,98 @@ use crate::raw::oio::WriteOperation; use crate::raw::*; use crate::*; -/// Add timeout for every operations. +/// Add timeout for every operation to avoid slow or unexpectedly hanging operations. /// -/// # Notes -/// -/// - For IO operations like `read`, `write`, we will set a timeout -/// for each single IO operation. -/// - For other operations like `stat`, and `delete`, the timeout is for the whole -/// operation. +/// For example, a dead connection could hang a database's SQL query. TimeoutLayer +/// will break this connection and return an error so users can handle it by +/// retrying or reporting it to their own users. /// -/// Besides, we will also set a slow speed for each IO operation. If the IO -/// operation's speed is lower than the slow speed, we will return a timeout error -/// instead of kept waiting for it. +/// # Notes /// -/// For examples, if we set timeout to 60 seconds and speed to 1MiB/s, then: +/// `TimeoutLayer` divides all operations into two kinds: /// -/// - If `stat` didn't return in 60 seconds, we will return a timeout error. -/// - If `Reader::read` didn't return in 60 seconds, we will return a timeout error. -/// - For `Writer::write(vec![1024*1024*1024])` -/// - didn't return in 60s, it's ok, we will keep waiting. -/// - didn't return in 1024s (1GiB/1MiB), we will return a timeout error. +/// - Non-IO operations like `stat` and `delete`, which operate on a single file. We control +/// them by setting `timeout`. 
+/// - IO operations like `read`, `Reader::read` and `Writer::write`, which operate on data directly. We +/// control them by setting `io_timeout`. /// /// # Default /// /// - timeout: 60 seconds -/// - speed: 1024 bytes per second, aka, 1KiB/s. +/// - io_timeout: 10 seconds /// /// # Examples /// +/// The following example creates a timeout layer with a 10 second timeout for all non-io +/// operations and a 3 second timeout for all io operations. +/// /// ``` /// use anyhow::Result; /// use opendal::layers::TimeoutLayer; /// use opendal::services; /// use opendal::Operator; /// use opendal::Scheme; +/// use std::time::Duration; /// /// let _ = Operator::new(services::Memory::default()) /// .expect("must init") -/// .layer(TimeoutLayer::default()) +/// .layer(TimeoutLayer::default() +/// .with_timeout(Duration::from_secs(10)) +/// .with_io_timeout(Duration::from_secs(3))) /// .finish(); /// ``` +/// +/// # Implementation Notes +/// +/// TimeoutLayer uses [`tokio::time::timeout`] to implement timeouts for operations. IO +/// operations inside `reader` and `writer` use `Pin<Box<tokio::time::Sleep>>` to track the +/// timeout. +/// +/// This might introduce a bit of overhead for IO operations, but it's the only way to implement +/// timeout correctly. We used to implement the timeout layer in a zero-cost way that only stored +/// a [`std::time::Instant`] and checked the timeout by comparing the instant with the current time. +/// However, that doesn't work in all cases. +/// +/// For example, a user's TCP connection could be in the [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be emitted. The runtime +/// will never poll our future again. From the application side, this future hangs forever +/// until the TCP connection is closed after reaching the Linux [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html) retry limit. 
#[derive(Clone)] pub struct TimeoutLayer { timeout: Duration, - speed: u64, + io_timeout: Duration, } impl Default for TimeoutLayer { fn default() -> Self { Self { timeout: Duration::from_secs(60), - speed: 1024, + io_timeout: Duration::from_secs(10), } } } impl TimeoutLayer { /// Create a new `TimeoutLayer` with default settings. - /// - /// - timeout: 60 seconds - /// - speed: 1024 bytes per second, aka, 1KiB/s. pub fn new() -> Self { Self::default() } /// Set timeout for TimeoutLayer with given value. + /// + /// This timeout is for all non-io operations like `stat`, `delete`. pub fn with_timeout(mut self, timeout: Duration) -> Self { self.timeout = timeout; self } + /// Set io timeout for TimeoutLayer with given value. + /// + /// This timeout is for all io operations like `read`, `Reader::read` and `Writer::write`. + pub fn with_io_timeout(mut self, timeout: Duration) -> Self { + self.io_timeout = timeout; + self + } + /// Set speed for TimeoutLayer with given value. /// /// # Notes @@ -110,10 +133,8 @@ impl TimeoutLayer { /// # Panics /// /// This function will panic if speed is 0. - pub fn with_speed(mut self, speed: u64) -> Self { - assert_ne!(speed, 0, "TimeoutLayer speed must not be 0"); - - self.speed = speed; + #[deprecated(note = "with speed is not supported anymore, please use with_io_timeout instead")] + pub fn with_speed(self, _: u64) -> Self { self } } @@ -126,7 +147,7 @@ impl Layer for TimeoutLayer { inner, timeout: self.timeout, - speed: self.speed, + io_timeout: self.io_timeout, } } } @@ -136,7 +157,33 @@ pub struct TimeoutAccessor { inner: A, timeout: Duration, - speed: u64, + io_timeout: Duration, +} + +impl TimeoutAccessor { + async fn timeout>, T>(&self, op: Operation, fut: F) -> Result { + tokio::time::timeout(self.timeout, fut).await.map_err(|_| { + Error::new(ErrorKind::Unexpected, "operation timeout reached") + .with_operation(op) + .with_context("timeout", self.timeout.as_secs_f64().to_string()) + .set_temporary() + })? 
+ } + + async fn io_timeout>, T>( + &self, + op: Operation, + fut: F, + ) -> Result { + tokio::time::timeout(self.io_timeout, fut) + .await + .map_err(|_| { + Error::new(ErrorKind::Unexpected, "io timeout reached") + .with_operation(op) + .with_context("timeout", self.io_timeout.as_secs_f64().to_string()) + .set_temporary() + })? + } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] @@ -154,40 +201,56 @@ impl LayeredAccessor for TimeoutAccessor { &self.inner } + async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { + self.timeout(Operation::CreateDir, self.inner.create_dir(path, args)) + .await + } + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - tokio::time::timeout(self.timeout, self.inner.read(path, args)) + self.io_timeout(Operation::Read, self.inner.read(path, args)) .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(Operation::Read) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? 
- .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) + .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout))) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - tokio::time::timeout(self.timeout, self.inner.write(path, args)) + self.io_timeout(Operation::Write, self.inner.write(path, args)) + .await + .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout))) + } + + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { + self.timeout(Operation::Copy, self.inner.copy(from, to, args)) + .await + } + + async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result { + self.timeout(Operation::Rename, self.inner.rename(from, to, args)) + .await + } + + async fn stat(&self, path: &str, args: OpStat) -> Result { + self.timeout(Operation::Stat, self.inner.stat(path, args)) + .await + } + + async fn delete(&self, path: &str, args: OpDelete) -> Result { + self.timeout(Operation::Delete, self.inner.delete(path, args)) .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(Operation::Write) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? 
- .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - tokio::time::timeout(self.timeout, self.inner.list(path, args)) + self.io_timeout(Operation::List, self.inner.list(path, args)) + .await + .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout))) + } + + async fn batch(&self, args: OpBatch) -> Result { + self.timeout(Operation::Batch, self.inner.batch(args)).await + } + + async fn presign(&self, path: &str, args: OpPresign) -> Result { + self.timeout(Operation::Presign, self.inner.presign(path, args)) .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(Operation::List) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? - .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { @@ -207,242 +270,207 @@ pub struct TimeoutWrapper { inner: R, timeout: Duration, - #[allow(dead_code)] - speed: u64, - - start: Option, + sleep: Option>>, } impl TimeoutWrapper { - fn new(inner: R, timeout: Duration, speed: u64) -> Self { + fn new(inner: R, timeout: Duration) -> Self { Self { inner, timeout, - speed, - start: None, + sleep: None, } } - #[allow(dead_code)] - fn io_timeout(&self, size: u64) -> Duration { - let timeout = Duration::from_millis(size * 1000 / self.speed + 1); - - timeout.max(self.timeout) + #[inline] + fn poll_timeout(&mut self, cx: &mut Context<'_>, op: &'static str) -> Result<()> { + if let Some(sleep) = self.sleep.as_mut() { + match sleep.as_mut().poll(cx) { + Poll::Pending => Ok(()), + Poll::Ready(_) => { + self.sleep = None; + Err( + Error::new(ErrorKind::Unexpected, "io operation timeout reached") + .with_operation(op) + .with_context("io_timeout", self.timeout.as_secs_f64().to_string()) + .set_temporary(), + ) + } + } + } else { + self.sleep = 
Some(Box::pin(tokio::time::sleep(self.timeout))); + Ok(()) + } } } impl oio::Read for TimeoutWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(ReadOperation::Read) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } - } + self.poll_timeout(cx, ReadOperation::Read.into_static())?; - match self.inner.poll_read(cx, buf) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_read(cx, buf)); + self.sleep = None; + Poll::Ready(v) } fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(ReadOperation::Seek) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } - } + self.poll_timeout(cx, ReadOperation::Seek.into_static())?; - match self.inner.poll_seek(cx, pos) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_seek(cx, pos)); + self.sleep = None; + Poll::Ready(v) } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. 
- self.start = None; - - return Poll::Ready(Some(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(ReadOperation::Next) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary()))); - } - } - None => { - self.start = Some(Instant::now()); - } - } + self.poll_timeout(cx, ReadOperation::Next.into_static())?; - match self.inner.poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_next(cx)); + self.sleep = None; + Poll::Ready(v) } } impl oio::Write for TimeoutWrapper { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(WriteOperation::Write) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } - } + self.poll_timeout(cx, WriteOperation::Write.into_static())?; - match self.inner.poll_write(cx, bs) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_write(cx, bs)); + self.sleep = None; + Poll::Ready(v) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. 
- self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(WriteOperation::Abort) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } - } + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.poll_timeout(cx, WriteOperation::Close.into_static())?; - match self.inner.poll_abort(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_close(cx)); + self.sleep = None; + Poll::Ready(v) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(WriteOperation::Close) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } - } + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { + self.poll_timeout(cx, WriteOperation::Abort.into_static())?; - match self.inner.poll_close(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_abort(cx)); + self.sleep = None; + Poll::Ready(v) } } impl oio::List for TimeoutWrapper { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. 
- self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(ListOperation::Next) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } + self.poll_timeout(cx, ListOperation::Next.into_static())?; + + let v = ready!(self.inner.poll_next(cx)); + self.sleep = None; + Poll::Ready(v) + } +} + +#[cfg(test)] +mod tests { + use crate::layers::{TimeoutLayer, TypeEraseLayer}; + use crate::raw::oio::ReadExt; + use crate::raw::*; + use crate::*; + use async_trait::async_trait; + use bytes::Bytes; + use std::io::SeekFrom; + use std::sync::Arc; + use std::task::{Context, Poll}; + use std::time::Duration; + use tokio::time::{sleep, timeout}; + + #[derive(Debug, Clone, Default)] + struct MockService; + + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] + impl Accessor for MockService { + type Reader = MockReader; + type Writer = (); + type Lister = (); + type BlockingReader = (); + type BlockingWriter = (); + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_native_capability(Capability { + read: true, + delete: true, + ..Default::default() + }); + + am } - match self.inner.poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } + /// This function will build a reader that always return pending. + async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { + Ok((RpRead::new(), MockReader)) } + + /// This function will never return. 
+ async fn delete(&self, _: &str, _: OpDelete) -> Result { + sleep(Duration::from_secs(u64::MAX)).await; + + Ok(RpDelete::default()) + } + } + + #[derive(Debug, Clone, Default)] + struct MockReader; + + impl oio::Read for MockReader { + fn poll_read(&mut self, _: &mut Context<'_>, _: &mut [u8]) -> Poll> { + Poll::Pending + } + + fn poll_seek(&mut self, _: &mut Context<'_>, _: SeekFrom) -> Poll> { + Poll::Pending + } + + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + Poll::Pending + } + } + + #[tokio::test] + async fn test_operation_timeout() { + let acc = Arc::new(TypeEraseLayer.layer(MockService)) as FusedAccessor; + let op = Operator::from_inner(acc) + .layer(TimeoutLayer::new().with_timeout(Duration::from_secs(1))); + + let fut = async { + let res = op.delete("test").await; + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::Unexpected); + assert!(err.to_string().contains("timeout")) + }; + + timeout(Duration::from_secs(2), fut) + .await + .expect("this test should not exceed 2 seconds") + } + + #[tokio::test] + async fn test_io_timeout() { + let acc = Arc::new(TypeEraseLayer.layer(MockService)) as FusedAccessor; + let op = Operator::from_inner(acc) + .layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(1))); + + let fut = async { + let mut reader = op.reader("test").await.unwrap(); + + let res = reader.read(&mut [0; 4]).await; + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::Unexpected); + assert!(err.to_string().contains("timeout")) + }; + + timeout(Duration::from_secs(2), fut) + .await + .expect("this test should not exceed 2 seconds") } }