diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 8e265b2171e..97483f10b17 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::cmp::Ordering; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; @@ -398,10 +399,12 @@ impl LayeredAccess for CompleteAccessor { if !capability.read { return Err(self.new_unsupported_error(Operation::Read)); } + + let size = args.range().size(); self.inner .read(path, args) .await - .map(|(rp, r)| (rp, CompleteReader(r))) + .map(|(rp, r)| (rp, CompleteReader::new(r, size))) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -514,9 +517,11 @@ impl LayeredAccess for CompleteAccessor { if !capability.read || !capability.blocking { return Err(self.new_unsupported_error(Operation::Read)); } + + let size = args.range().size(); self.inner .blocking_read(path, args) - .map(|(rp, r)| (rp, CompleteReader(r))) + .map(|(rp, r)| (rp, CompleteReader::new(r, size))) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { @@ -584,18 +589,66 @@ impl LayeredAccess for CompleteAccessor { pub type CompleteLister = FourWays, P>, PrefixLister

, PrefixLister, P>>>; -pub struct CompleteReader(R); +pub struct CompleteReader { + inner: R, + size: Option, + read: u64, +} + +impl CompleteReader { + pub fn new(inner: R, size: Option) -> Self { + Self { + inner, + size, + read: 0, + } + } + + pub fn check(&self) -> Result<()> { + let Some(size) = self.size else { + return Ok(()); + }; + + match self.read.cmp(&size) { + Ordering::Equal => Ok(()), + Ordering::Less => Err( + Error::new(ErrorKind::Unexpected, "reader got too little data") + .with_context("expect", size) + .with_context("actual", self.read), + ), + Ordering::Greater => Err( + Error::new(ErrorKind::Unexpected, "reader got too much data") + .with_context("expect", size) + .with_context("actual", self.read), + ), + } + } +} impl oio::Read for CompleteReader { async fn read(&mut self) -> Result { - let buf = self.0.read().await?; + let buf = self.inner.read().await?; + + if buf.is_empty() { + self.check()?; + } else { + self.read += buf.len() as u64; + } + Ok(buf) } } impl oio::BlockingRead for CompleteReader { fn read(&mut self) -> Result { - let buf = self.0.read()?; + let buf = self.inner.read()?; + + if buf.is_empty() { + self.check()?; + } else { + self.read += buf.len() as u64; + } + Ok(buf) } } diff --git a/core/src/types/error.rs b/core/src/types/error.rs index f2c258dc61a..6576f3dbfc4 100644 --- a/core/src/types/error.rs +++ b/core/src/types/error.rs @@ -334,8 +334,8 @@ impl Error { } /// Add more context in error. - pub fn with_context(mut self, key: &'static str, value: impl Into) -> Self { - self.context.push((key, value.into())); + pub fn with_context(mut self, key: &'static str, value: impl ToString) -> Self { + self.context.push((key, value.to_string())); self }