Skip to content

Commit

Permalink
feat(core): Add reader size check in complete reader (#4690)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Jun 5, 2024
1 parent 979ceb7 commit e948ec8
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 7 deletions.
63 changes: 58 additions & 5 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::cmp::Ordering;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
Expand Down Expand Up @@ -398,10 +399,12 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
if !capability.read {
return Err(self.new_unsupported_error(Operation::Read));
}

let size = args.range().size();
self.inner
.read(path, args)
.await
.map(|(rp, r)| (rp, CompleteReader(r)))
.map(|(rp, r)| (rp, CompleteReader::new(r, size)))
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Expand Down Expand Up @@ -514,9 +517,11 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
if !capability.read || !capability.blocking {
return Err(self.new_unsupported_error(Operation::Read));
}

let size = args.range().size();
self.inner
.blocking_read(path, args)
.map(|(rp, r)| (rp, CompleteReader(r)))
.map(|(rp, r)| (rp, CompleteReader::new(r, size)))
}

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
Expand Down Expand Up @@ -584,18 +589,66 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
pub type CompleteLister<A, P> =
FourWays<P, FlatLister<Arc<A>, P>, PrefixLister<P>, PrefixLister<FlatLister<Arc<A>, P>>>;

pub struct CompleteReader<R>(R);
pub struct CompleteReader<R> {
inner: R,
size: Option<u64>,
read: u64,
}

impl<R> CompleteReader<R> {
pub fn new(inner: R, size: Option<u64>) -> Self {
Self {
inner,
size,
read: 0,
}
}

pub fn check(&self) -> Result<()> {
let Some(size) = self.size else {
return Ok(());
};

match self.read.cmp(&size) {
Ordering::Equal => Ok(()),
Ordering::Less => Err(
Error::new(ErrorKind::Unexpected, "reader got too little data")
.with_context("expect", size)
.with_context("actual", self.read),
),
Ordering::Greater => Err(
Error::new(ErrorKind::Unexpected, "reader got too much data")
.with_context("expect", size)
.with_context("actual", self.read),
),
}
}
}

impl<R: oio::Read> oio::Read for CompleteReader<R> {
async fn read(&mut self) -> Result<Buffer> {
let buf = self.0.read().await?;
let buf = self.inner.read().await?;

if buf.is_empty() {
self.check()?;
} else {
self.read += buf.len() as u64;
}

Ok(buf)
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for CompleteReader<R> {
fn read(&mut self) -> Result<Buffer> {
let buf = self.0.read()?;
let buf = self.inner.read()?;

if buf.is_empty() {
self.check()?;
} else {
self.read += buf.len() as u64;
}

Ok(buf)
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/types/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ impl Error {
}

/// Add more context in error.
pub fn with_context(mut self, key: &'static str, value: impl Into<String>) -> Self {
self.context.push((key, value.into()));
pub fn with_context(mut self, key: &'static str, value: impl ToString) -> Self {
self.context.push((key, value.to_string()));
self
}

Expand Down

0 comments on commit e948ec8

Please sign in to comment.