From 90ec37720c67c3734bb27d40b3ffcae6d6ba4abb Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sun, 21 Jan 2024 12:23:18 +0800 Subject: [PATCH 01/10] set tcp keepalive for client Signed-off-by: Xuanwo --- core/src/raw/http_util/client.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index e4d47d88e86..4bd3921fb2d 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -35,6 +35,7 @@ use crate::ErrorKind; use crate::Result; const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(60); +const DEFAULT_TCP_KEEPALIVE: Duration = Duration::from_secs(60); /// HttpClient that used across opendal. #[derive(Clone)] @@ -64,6 +65,8 @@ impl HttpClient { builder = builder.no_brotli(); // Make sure we don't enable auto deflate decompress. builder = builder.no_deflate(); + // Make sure we keep tcp alive for 60s to avoid waiting dead connection. + builder = builder.tcp_keepalive(DEFAULT_TCP_KEEPALIVE); // Make sure we don't wait a connection establishment forever. builder = builder.connect_timeout(DEFAULT_CONNECT_TIMEOUT); From 91e31db0a4fc50f26710aa2e1fbba56efc96e282 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 22 Jan 2024 00:58:41 +0800 Subject: [PATCH 02/10] Save work Signed-off-by: Xuanwo --- core/src/layers/timeout.rs | 156 ++++++++++++++++++++++--------------- 1 file changed, 92 insertions(+), 64 deletions(-) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index cae6ee91843..6f50177f086 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::future::Future; use std::io::SeekFrom; use std::task::Context; use std::task::Poll; @@ -30,76 +31,96 @@ use crate::raw::oio::WriteOperation; use crate::raw::*; use crate::*; -/// Add timeout for every operations. +/// Add timeout for every operations to avoid slow or unexpected hang operations. /// -/// # Notes +/// For example, a dead connection could hang a databases sql query. TimeoutLayer +/// will break this connection and returns an error so users can handle it by +/// retrying or print to users. /// -/// - For IO operations like `read`, `write`, we will set a timeout -/// for each single IO operation. -/// - For other operations like `stat`, and `delete`, the timeout is for the whole -/// operation. +/// # Notes /// -/// Besides, we will also set a slow speed for each IO operation. If the IO -/// operation's speed is lower than the slow speed, we will return a timeout error -/// instead of kept waiting for it. +/// `TimeoutLayer` treats all operations in two kinds: /// -/// For examples, if we set timeout to 60 seconds and speed to 1MiB/s, then: +/// - Non IO Operation like `stat`, `delete` they operate on a single file. We control +/// them by setting `timeout`. +/// - IO Operation like `read`, `Reader::read` and `Writer::write`, they operate on data directly, we +/// control them by setting `io_timeout`. /// -/// - If `stat` didn't return in 60 seconds, we will return a timeout error. -/// - If `Reader::read` didn't return in 60 seconds, we will return a timeout error. -/// - For `Writer::write(vec![1024*1024*1024])` -/// - didn't return in 60s, it's ok, we will keep waiting. -/// - didn't return in 1024s (1GiB/1MiB), we will return a timeout error. +/// It happens that a connection could be slow but not dead, so we have a `max_io_timeouts` to +/// control how many consecutive IO timeouts we can tolerate. If `max_io_timeouts` is not reached, +/// we will print a warning and keep waiting this io operation instead. /// /// # Default /// /// - timeout: 60 seconds -/// - speed: 1024 bytes per second, aka, 1KiB/s. +/// - io_timeout: 10 seconds +/// - max_io_timeouts: 3 times /// /// # Examples /// +/// The following examples will create a timeout layer with 10 seconds timeout for all non-io +/// operations, 3 seconds timeout for all io operations and 2 consecutive io timeouts are allowed. +/// /// ``` /// use anyhow::Result; /// use opendal::layers::TimeoutLayer; /// use opendal::services; /// use opendal::Operator; /// use opendal::Scheme; +/// use std::time::Duration; /// /// let _ = Operator::new(services::Memory::default()) /// .expect("must init") -/// .layer(TimeoutLayer::default()) +/// .layer(TimeoutLayer::default().with_timeout(Duration::from_secs(10)).with_io_timeout(3).with_max_io_timeouts(2)) /// .finish(); /// ``` #[derive(Clone)] pub struct TimeoutLayer { timeout: Duration, - speed: u64, + io_timeout: Duration, + max_io_timeouts: usize, } impl Default for TimeoutLayer { fn default() -> Self { Self { timeout: Duration::from_secs(60), - speed: 1024, + io_timeout: Duration::from_secs(10), + max_io_timeouts: 3, } } } impl TimeoutLayer { /// Create a new `TimeoutLayer` with default settings. - /// - /// - timeout: 60 seconds - /// - speed: 1024 bytes per second, aka, 1KiB/s. pub fn new() -> Self { Self::default() } /// Set timeout for TimeoutLayer with given value. + /// + /// This timeout is for all non-io operations like `stat`, `delete`. pub fn with_timeout(mut self, timeout: Duration) -> Self { self.timeout = timeout; self } + /// Set io timeout for TimeoutLayer with given value. + /// + /// This timeout is for all io operations like `read`, `Reader::read` and `Writer::write`. + pub fn with_io_timeout(mut self, timeout: Duration) -> Self { + self.io_timeout = timeout; + self + } + + /// Set max io timeouts for TimeoutLayer with given value. + /// + /// This value is used to control how many consecutive io timeouts we can tolerate. + pub fn with_max_io_timeouts(mut self, v: usize) -> Self { + self.max_io_timeouts = v; + self + } + /// Set speed for TimeoutLayer with given value. /// /// # Notes @@ -110,10 +131,8 @@ impl TimeoutLayer { /// # Panics /// /// This function will panic if speed is 0. - pub fn with_speed(mut self, speed: u64) -> Self { - assert_ne!(speed, 0, "TimeoutLayer speed must not be 0"); - - self.speed = speed; + #[deprecated(note = "with speed is not supported anymore, please use with_io_timeout instead")] + pub fn with_speed(mut self, _: u64) -> Self { self } } @@ -126,7 +145,8 @@ impl Layer for TimeoutLayer { inner, timeout: self.timeout, - speed: self.speed, + io_timeout: self.io_timeout, + max_io_timeouts: self.max_io_timeouts, } } } @@ -136,7 +156,25 @@ pub struct TimeoutAccessor { inner: A, timeout: Duration, - speed: u64, + io_timeout: Duration, + max_io_timeouts: usize, +} + +impl TimeoutAccessor { + async fn io_timeout>, T>( + &self, + op: Operation, + fut: F, + ) -> Result { + tokio::time::timeout(self.io_timeout, fut) + .await + .map_err(|_| { + Error::new(ErrorKind::Unexpected, "io operation timeout reached") + .with_operation(op) + .with_context("io_timeout", self.io_timeout.as_secs_f64().to_string()) + .set_temporary() + })? + } } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] @@ -155,39 +193,36 @@ impl LayeredAccessor for TimeoutAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - tokio::time::timeout(self.timeout, self.inner.read(path, args)) + self.io_timeout(Operation::Read, self.inner.read(path, args)) .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(Operation::Read) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? - .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) + .map(|(rp, r)| { + ( + rp, + TimeoutWrapper::new(r, self.io_timeout, self.max_io_timeouts), + ) + }) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - tokio::time::timeout(self.timeout, self.inner.write(path, args)) + self.io_timeout(Operation::Write, self.inner.write(path, args)) .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(Operation::Write) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? - .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) + .map(|(rp, r)| { + ( + rp, + TimeoutWrapper::new(r, self.io_timeout, self.max_io_timeouts), + ) + }) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - tokio::time::timeout(self.timeout, self.inner.list(path, args)) + self.io_timeout(Operation::List, self.inner.list(path, args)) .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") - .with_operation(Operation::List) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? - .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.timeout, self.speed))) + .map(|(rp, r)| { + ( + rp, + TimeoutWrapper::new(r, self.io_timeout, self.max_io_timeouts), + ) + }) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { @@ -207,28 +242,21 @@ pub struct TimeoutWrapper { inner: R, timeout: Duration, - #[allow(dead_code)] - speed: u64, + max_timeouts: usize, - start: Option, + current_timeouts: usize, + futures: Option<(BoxedFuture)>, } impl TimeoutWrapper { - fn new(inner: R, timeout: Duration, speed: u64) -> Self { + fn new(inner: R, timeout: Duration, max_timeouts: usize) -> Self { Self { inner, timeout, - speed, + max_timeouts, start: None, } } - - #[allow(dead_code)] - fn io_timeout(&self, size: u64) -> Duration { - let timeout = Duration::from_millis(size * 1000 / self.speed + 1); - - timeout.max(self.timeout) - } } impl oio::Read for TimeoutWrapper { From 02320a866b921159abad51612cac49279a5356c9 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 23 Jan 2024 22:46:20 +0800 Subject: [PATCH 03/10] Implement timeout layer correctly Signed-off-by: Xuanwo --- core/src/layers/timeout.rs | 333 ++++++++++++------------------------- 1 file changed, 104 insertions(+), 229 deletions(-) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 6f50177f086..77e55c60122 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -17,13 +17,14 @@ use std::future::Future; use std::io::SeekFrom; -use std::task::Context; +use std::pin::Pin; use std::task::Poll; +use std::task::{ready, Context}; use std::time::Duration; -use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; +use tokio::time::Sleep; use crate::raw::oio::ListOperation; use crate::raw::oio::ReadOperation; @@ -46,15 +47,10 @@ use crate::*; /// - IO Operation like `read`, `Reader::read` and `Writer::write`, they operate on data directly, we /// control them by setting `io_timeout`. /// -/// It happens that a connection could be slow but not dead, so we have a `max_io_timeouts` to -/// control how many consecutive IO timeouts we can tolerate. If `max_io_timeouts` is not reached, -/// we will print a warning and keep waiting this io operation instead. -/// /// # Default /// /// - timeout: 60 seconds /// - io_timeout: 10 seconds -/// - max_io_timeouts: 3 times /// /// # Examples /// @@ -71,14 +67,13 @@ use crate::*; /// /// let _ = Operator::new(services::Memory::default()) /// .expect("must init") -/// .layer(TimeoutLayer::default().with_timeout(Duration::from_secs(10)).with_io_timeout(3).with_max_io_timeouts(2)) +/// .layer(TimeoutLayer::default().with_timeout(Duration::from_secs(10)).with_io_timeout(3)) /// .finish(); /// ``` #[derive(Clone)] pub struct TimeoutLayer { timeout: Duration, io_timeout: Duration, - max_io_timeouts: usize, } impl Default for TimeoutLayer { @@ -86,7 +81,6 @@ impl Default for TimeoutLayer { Self { timeout: Duration::from_secs(60), io_timeout: Duration::from_secs(10), - max_io_timeouts: 3, } } } @@ -113,14 +107,6 @@ impl TimeoutLayer { self } - /// Set max io timeouts for TimeoutLayer with given value. - /// - /// This value is used to control how many consecutive io timeouts we can tolerate. - pub fn with_max_io_timeouts(mut self, v: usize) -> Self { - self.max_io_timeouts = v; - self - } - /// Set speed for TimeoutLayer with given value. /// /// # Notes @@ -132,7 +118,7 @@ impl TimeoutLayer { /// /// This function will panic if speed is 0. #[deprecated(note = "with speed is not supported anymore, please use with_io_timeout instead")] - pub fn with_speed(mut self, _: u64) -> Self { + pub fn with_speed(self, _: u64) -> Self { self } } @@ -146,7 +132,6 @@ impl Layer for TimeoutLayer { timeout: self.timeout, io_timeout: self.io_timeout, - max_io_timeouts: self.max_io_timeouts, } } } @@ -157,10 +142,18 @@ pub struct TimeoutAccessor { timeout: Duration, io_timeout: Duration, - max_io_timeouts: usize, } impl TimeoutAccessor { + async fn timeout>, T>(&self, op: Operation, fut: F) -> Result { + tokio::time::timeout(self.timeout, fut).await.map_err(|_| { + Error::new(ErrorKind::Unexpected, "operation timeout reached") + .with_operation(op) + .with_context("timeout", self.timeout.as_secs_f64().to_string()) + .set_temporary() + })? + } + async fn io_timeout>, T>( &self, op: Operation, @@ -169,9 +162,9 @@ impl TimeoutAccessor { tokio::time::timeout(self.io_timeout, fut) .await .map_err(|_| { - Error::new(ErrorKind::Unexpected, "io operation timeout reached") + Error::new(ErrorKind::Unexpected, "io timeout reached") .with_operation(op) - .with_context("io_timeout", self.io_timeout.as_secs_f64().to_string()) + .with_context("timeout", self.io_timeout.as_secs_f64().to_string()) .set_temporary() })? } @@ -192,37 +185,56 @@ impl LayeredAccessor for TimeoutAccessor { &self.inner } + async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { + self.timeout(Operation::CreateDir, self.inner.create_dir(path, args)) + .await + } + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { self.io_timeout(Operation::Read, self.inner.read(path, args)) .await - .map(|(rp, r)| { - ( - rp, - TimeoutWrapper::new(r, self.io_timeout, self.max_io_timeouts), - ) - }) + .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout))) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { self.io_timeout(Operation::Write, self.inner.write(path, args)) .await - .map(|(rp, r)| { - ( - rp, - TimeoutWrapper::new(r, self.io_timeout, self.max_io_timeouts), - ) - }) + .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout))) + } + + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { + self.timeout(Operation::Copy, self.inner.copy(from, to, args)) + .await + } + + async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result { + self.timeout(Operation::Rename, self.inner.rename(from, to, args)) + .await + } + + async fn stat(&self, path: &str, args: OpStat) -> Result { + self.timeout(Operation::Stat, self.inner.stat(path, args)) + .await + } + + async fn delete(&self, path: &str, args: OpDelete) -> Result { + self.timeout(Operation::Delete, self.inner.delete(path, args)) + .await } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { self.io_timeout(Operation::List, self.inner.list(path, args)) .await - .map(|(rp, r)| { - ( - rp, - TimeoutWrapper::new(r, self.io_timeout, self.max_io_timeouts), - ) - }) + .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout))) + } + + async fn batch(&self, args: OpBatch) -> Result { + self.timeout(Operation::Batch, self.inner.batch(args)).await + } + + async fn presign(&self, path: &str, args: OpPresign) -> Result { + self.timeout(Operation::Presign, self.inner.presign(path, args)) + .await } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { @@ -242,235 +254,98 @@ pub struct TimeoutWrapper { inner: R, timeout: Duration, - max_timeouts: usize, - - current_timeouts: usize, - futures: Option<(BoxedFuture)>, + sleep: Option>>, } impl TimeoutWrapper { - fn new(inner: R, timeout: Duration, max_timeouts: usize) -> Self { + fn new(inner: R, timeout: Duration) -> Self { Self { inner, timeout, - max_timeouts, - start: None, + sleep: None, } } -} -impl oio::Read for TimeoutWrapper { - fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", + #[inline] + fn poll_timeout(&mut self, cx: &mut Context<'_>, op: &'static str) -> Result<()> { + if let Some(sleep) = self.sleep.as_mut() { + match sleep.as_mut().poll(cx) { + Poll::Pending => Ok(()), + Poll::Ready(_) => { + self.sleep = None; + Err( + Error::new(ErrorKind::Unexpected, "io operation timeout reached") + .with_operation(op) + .with_context("io_timeout", self.timeout.as_secs_f64().to_string()) + .set_temporary(), ) - .with_operation(ReadOperation::Read) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); } } - None => { - self.start = Some(Instant::now()); - } + } else { + self.sleep = Some(Box::pin(tokio::time::sleep(self.timeout))); + Ok(()) } + } +} - match self.inner.poll_read(cx, buf) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } +impl oio::Read for TimeoutWrapper { + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + self.poll_timeout(cx, ReadOperation::Read.into_static())?; + + let v = ready!(self.inner.poll_read(cx, buf)); + self.sleep = None; + Poll::Ready(v) } fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(ReadOperation::Seek) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } - } + self.poll_timeout(cx, ReadOperation::Seek.into_static())?; - match self.inner.poll_seek(cx, pos) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_seek(cx, pos)); + self.sleep = None; + Poll::Ready(v) } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Some(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(ReadOperation::Next) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary()))); - } - } - None => { - self.start = Some(Instant::now()); - } - } + self.poll_timeout(cx, ReadOperation::Next.into_static())?; - match self.inner.poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_next(cx)); + self.sleep = None; + Poll::Ready(v) } } impl oio::Write for TimeoutWrapper { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(WriteOperation::Write) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } - } + self.poll_timeout(cx, WriteOperation::Write.into_static())?; - match self.inner.poll_write(cx, bs) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_write(cx, bs)); + self.sleep = None; + Poll::Ready(v) } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(WriteOperation::Abort) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } - } + self.poll_timeout(cx, WriteOperation::Abort.into_static())?; - match self.inner.poll_abort(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_abort(cx)); + self.sleep = None; + Poll::Ready(v) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(WriteOperation::Close) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } - } + self.poll_timeout(cx, WriteOperation::Close.into_static())?; - match self.inner.poll_close(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_close(cx)); + self.sleep = None; + Poll::Ready(v) } } impl oio::List for TimeoutWrapper { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self.start { - Some(start) => { - if start.elapsed() > self.timeout { - // Clean up the start time before return ready. - self.start = None; - - return Poll::Ready(Err(Error::new( - ErrorKind::Unexpected, - "operation timeout", - ) - .with_operation(ListOperation::Next) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary())); - } - } - None => { - self.start = Some(Instant::now()); - } - } + self.poll_timeout(cx, ListOperation::Next.into_static())?; - match self.inner.poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(v) => { - self.start = None; - Poll::Ready(v) - } - } + let v = ready!(self.inner.poll_next(cx)); + self.sleep = None; + Poll::Ready(v) } } From e5eb1271bba79b440116c89f2dc4c5933065abda Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 23 Jan 2024 22:46:37 +0800 Subject: [PATCH 04/10] Format code Signed-off-by: Xuanwo --- core/src/layers/timeout.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 77e55c60122..6450726f76a 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -323,18 +323,18 @@ impl oio::Write for TimeoutWrapper { Poll::Ready(v) } - fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { - self.poll_timeout(cx, WriteOperation::Abort.into_static())?; + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + self.poll_timeout(cx, WriteOperation::Close.into_static())?; - let v = ready!(self.inner.poll_abort(cx)); + let v = ready!(self.inner.poll_close(cx)); self.sleep = None; Poll::Ready(v) } - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { - self.poll_timeout(cx, WriteOperation::Close.into_static())?; + fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll> { + self.poll_timeout(cx, WriteOperation::Abort.into_static())?; - let v = ready!(self.inner.poll_close(cx)); + let v = ready!(self.inner.poll_abort(cx)); self.sleep = None; Poll::Ready(v) } From 38097c5493bfae9b1e86dadcb9871c78c21b5a26 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 23 Jan 2024 22:47:46 +0800 Subject: [PATCH 05/10] Format code Signed-off-by: Xuanwo --- core/src/layers/timeout.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 6450726f76a..a8957bca773 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -55,7 +55,7 @@ use crate::*; /// # Examples /// /// The following examples will create a timeout layer with 10 seconds timeout for all non-io -/// operations, 3 seconds timeout for all io operations and 2 consecutive io timeouts are allowed. +/// operations, 3 seconds timeout for all io operations. /// /// ``` /// use anyhow::Result; @@ -67,7 +67,9 @@ use crate::*; /// /// let _ = Operator::new(services::Memory::default()) /// .expect("must init") -/// .layer(TimeoutLayer::default().with_timeout(Duration::from_secs(10)).with_io_timeout(3)) +/// .layer(TimeoutLayer::default() +/// .with_timeout(Duration::from_secs(10)) +/// .with_io_timeout(Duration::from_secs(3))) /// .finish(); /// ``` #[derive(Clone)] From 06aee752657a3056135b3ef90d65dc1af7c17375 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 23 Jan 2024 23:04:58 +0800 Subject: [PATCH 06/10] Add test for timeout Signed-off-by: Xuanwo --- core/src/layers/timeout.rs | 109 +++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index a8957bca773..cd227283d5b 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -351,3 +351,112 @@ impl oio::List for TimeoutWrapper { Poll::Ready(v) } } + +#[cfg(test)] +mod tests { + use crate::layers::{TimeoutLayer, TypeEraseLayer}; + use crate::raw::oio::ReadExt; + use crate::raw::*; + use crate::*; + use async_trait::async_trait; + use bytes::Bytes; + use std::io::SeekFrom; + use std::sync::Arc; + use std::task::{Context, Poll}; + use std::time::Duration; + use tokio::time::{sleep, timeout}; + + #[derive(Debug, Clone, Default)] + struct MockService; + + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] + impl Accessor for MockService { + type Reader = MockReader; + type Writer = (); + type Lister = (); + type BlockingReader = (); + type BlockingWriter = (); + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_native_capability(Capability { + read: true, + delete: true, + ..Default::default() + }); + + am + } + + /// This function will build a reader that always return pending. + async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { + Ok((RpRead::new(), MockReader)) + } + + /// This function will never return. + async fn delete(&self, _: &str, _: OpDelete) -> Result { + sleep(Duration::from_secs(u64::MAX)).await; + + Ok(RpDelete::default()) + } + } + + #[derive(Debug, Clone, Default)] + struct MockReader; + + impl oio::Read for MockReader { + fn poll_read(&mut self, _: &mut Context<'_>, _: &mut [u8]) -> Poll> { + Poll::Pending + } + + fn poll_seek(&mut self, _: &mut Context<'_>, _: SeekFrom) -> Poll> { + Poll::Pending + } + + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + Poll::Pending + } + } + + #[tokio::test] + async fn test_operation_timeout() { + let acc = Arc::new(TypeEraseLayer.layer(MockService)) as FusedAccessor; + let op = Operator::from_inner(acc) + .layer(TimeoutLayer::new().with_timeout(Duration::from_secs(1))); + + let fut = async { + let res = op.delete("test").await; + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::Unexpected); + assert!(err.to_string().contains("timeout")) + }; + + timeout(Duration::from_secs(2), fut) + .await + .expect("this test should not exceed 2 seconds") + } + + #[tokio::test] + async fn test_io_timeout() { + let acc = Arc::new(TypeEraseLayer.layer(MockService)) as FusedAccessor; + let op = Operator::from_inner(acc) + .layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(1))); + + let fut = async { + let mut reader = op.reader("test").await.unwrap(); + + let res = reader.read(&mut [0; 4]).await; + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::Unexpected); + assert!(err.to_string().contains("timeout")) + }; + + timeout(Duration::from_secs(2), fut) + .await + .expect("this test should not exceed 2 seconds") + } +} From 6d9af4f8b8114d6966da98e90f72d3d446af4211 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 23 Jan 2024 23:06:32 +0800 Subject: [PATCH 07/10] Revert "set tcp keepalive for client" This reverts commit 90ec37720c67c3734bb27d40b3ffcae6d6ba4abb. --- core/src/raw/http_util/client.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 4bd3921fb2d..e4d47d88e86 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -35,7 +35,6 @@ use crate::ErrorKind; use crate::Result; const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(60); -const DEFAULT_TCP_KEEPALIVE: Duration = Duration::from_secs(60); /// HttpClient that used across opendal. #[derive(Clone)] @@ -65,8 +64,6 @@ impl HttpClient { builder = builder.no_brotli(); // Make sure we don't enable auto deflate decompress. builder = builder.no_deflate(); - // Make sure we keep tcp alive for 60s to avoid waiting dead connection. - builder = builder.tcp_keepalive(DEFAULT_TCP_KEEPALIVE); // Make sure we don't wait a connection establishment forever. builder = builder.connect_timeout(DEFAULT_CONNECT_TIMEOUT); From 92d7f407f5fc3f888340db0257070f5184ec3303 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 23 Jan 2024 23:17:03 +0800 Subject: [PATCH 08/10] Add notes Signed-off-by: Xuanwo --- core/src/layers/timeout.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index cd227283d5b..c4a0d5773f9 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -24,7 +24,6 @@ use std::time::Duration; use async_trait::async_trait; use bytes::Bytes; -use tokio::time::Sleep; use crate::raw::oio::ListOperation; use crate::raw::oio::ReadOperation; @@ -72,6 +71,21 @@ use crate::*; /// .with_io_timeout(Duration::from_secs(3))) /// .finish(); /// ``` +/// +/// # Implementation Notes +/// +/// TimeoutLayer is using [`tokio::time::timeout`] to implement timeout for operations. And IO +/// Operations insides `reader`, `writer` will use `Pin>` to track the +/// timeout. +/// +/// This might introduce a bit overhead for IO operations, but it's the only way to implement +/// timeout correctly. We used to implement tiemout layer in zero cost way that only stores +/// a [`std::time::Instant`] and check the timeout by comparing the instant with current time. +/// However, it doesn't works for all cases. +/// +/// For examples, users TCP connection could be in [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be omit. The runtime +/// will never poll our future again. From the application side, this future is hanging forever +/// until this TCP connection is closed for reaching the linux [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html) times. #[derive(Clone)] pub struct TimeoutLayer { timeout: Duration, @@ -256,7 +270,7 @@ pub struct TimeoutWrapper { inner: R, timeout: Duration, - sleep: Option>>, + sleep: Option>>, } impl TimeoutWrapper { From 0628a0989765b4f473885cca384b800e5114e232 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 23 Jan 2024 23:18:58 +0800 Subject: [PATCH 09/10] FIx Signed-off-by: Xuanwo --- core/src/layers/timeout.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index c4a0d5773f9..8ba93a200af 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -79,7 +79,7 @@ use crate::*; /// timeout. /// /// This might introduce a bit overhead for IO operations, but it's the only way to implement -/// timeout correctly. We used to implement tiemout layer in zero cost way that only stores +/// timeout correctly. We used to implement timeout layer in zero cost way that only stores /// a [`std::time::Instant`] and check the timeout by comparing the instant with current time. /// However, it doesn't works for all cases. /// From adb6bf8b3e2554bdb6bb30ed4a7d7ebd3975fa9d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 23 Jan 2024 23:20:56 +0800 Subject: [PATCH 10/10] Fix typo Signed-off-by: Xuanwo --- core/src/layers/timeout.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 8ba93a200af..c05d211f90b 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -83,7 +83,7 @@ use crate::*; /// a [`std::time::Instant`] and check the timeout by comparing the instant with current time. /// However, it doesn't works for all cases. /// -/// For examples, users TCP connection could be in [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be omit. The runtime +/// For examples, users TCP connection could be in [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be emit. The runtime /// will never poll our future again. From the application side, this future is hanging forever /// until this TCP connection is closed for reaching the linux [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html) times. #[derive(Clone)]