Skip to content

Commit

Permalink
feat: Add content range support for RpRead (#3777)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Dec 19, 2023
1 parent 7463f91 commit 535e120
Show file tree
Hide file tree
Showing 20 changed files with 230 additions and 63 deletions.
92 changes: 62 additions & 30 deletions core/src/raw/oio/read/range_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ where
}
}

/// Fill current reader's range by total_size.
fn fill_range(&mut self, total_size: u64) -> Result<()> {
/// Ensure current reader's offset is valid via total_size.
fn ensure_offset(&mut self, total_size: u64) -> Result<()> {
(self.offset, self.size) = match (self.offset, self.size) {
(None, Some(size)) => {
if size > total_size {
// If returns an error, we should reset
// state to Idle so that we can retry it.
self.state = State::Idle;
return Err(Error::new(
ErrorKind::InvalidInput,
"read to a negative or overflowing position is invalid",
Expand All @@ -125,6 +128,48 @@ where
Ok(())
}

/// Ensure size will use the information returned by RpRead to calculate the correct size for reader.
///
/// - If `RpRead` returns `range`, we can calculate the correct size by `range.size()`.
/// - If `RpRead` returns `size`, we can use it's as the returning body's size.
fn ensure_size(&mut self, total_size: Option<u64>, content_size: Option<u64>) {
if let Some(total_size) = total_size {
// It's valid for reader to seek to a position that out of the content length.
// We should return `Ok(0)` instead of an error at this case to align fs behavior.
let size = total_size
.checked_sub(self.offset.expect("reader offset must be valid"))
.unwrap_or_default();

// Ensure size when:
//
// - reader's size is unknown.
// - reader's size is larger than file's size.
if self.size.is_none() || Some(size) < self.size {
self.size = Some(size);
return;
}
}

if let Some(content_size) = content_size {
if content_size == 0 {
// Skip size set if content size is 0 since it could be invalid.
//
// For example, users seek to `u64::MAX` and calling read.
return;
}

let calculated_size = content_size + self.cur;

// Ensure size when:
//
// - reader's size is unknown.
// - reader's size is larger than file's size.
if self.size.is_none() || Some(calculated_size) < self.size {
self.size = Some(calculated_size);
}
}
}

/// Calculate the current range, maybe sent as next read request.
///
/// # Panics
Expand Down Expand Up @@ -256,12 +301,7 @@ where
})?;

let length = rp.into_metadata().content_length();
self.fill_range(length).map_err(|err| {
// If stat future returns an error, we should reset
// state to Idle so that we can retry it.
self.state = State::Idle;
err
})?;
self.ensure_offset(length)?;

self.state = State::Idle;
self.poll_read(cx, buf)
Expand All @@ -274,12 +314,8 @@ where
err
})?;

// Set size if read returns size hint.
if let Some(size) = rp.size() {
if size != 0 && self.size.is_none() {
self.size = Some(size + self.cur);
}
}
self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());

self.state = State::Read(r);
self.poll_read(cx, buf)
}
Expand Down Expand Up @@ -339,7 +375,7 @@ where
})?;

let length = rp.into_metadata().content_length();
self.fill_range(length)?;
self.ensure_offset(length)?;

self.state = State::Idle;
self.poll_seek(cx, pos)
Expand Down Expand Up @@ -391,7 +427,7 @@ where
})?;

let length = rp.into_metadata().content_length();
self.fill_range(length)?;
self.ensure_offset(length)?;

self.state = State::Idle;
self.poll_next(cx)
Expand All @@ -405,11 +441,8 @@ where
})?;

// Set size if read returns size hint.
if let Some(size) = rp.size() {
if size != 0 && self.size.is_none() {
self.size = Some(size + self.cur);
}
}
self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());

self.state = State::Read(r);
self.poll_next(cx)
}
Expand Down Expand Up @@ -450,17 +483,13 @@ where
let rp = self.stat_action()?;

let length = rp.into_metadata().content_length();
self.fill_range(length)?;
self.ensure_offset(length)?;
}

let (rp, r) = self.read_action()?;

// Set size if read returns size hint.
if let Some(size) = rp.size() {
if size != 0 && self.size.is_none() {
self.size = Some(size + self.cur);
}
}
self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());

self.state = State::Read(r);
self.read(buf)
Expand Down Expand Up @@ -502,7 +531,7 @@ where
} else {
let rp = self.stat_action()?;
let length = rp.into_metadata().content_length();
self.fill_range(length)?;
self.ensure_offset(length)?;

let size = self.size.expect("size must be valid after fill_range");
(size as i64, n)
Expand Down Expand Up @@ -561,13 +590,16 @@ where
};

let length = rp.into_metadata().content_length();
if let Err(err) = self.fill_range(length) {
if let Err(err) = self.ensure_offset(length) {
return Some(Err(err));
}
}

let r = match self.read_action() {
Ok((_, r)) => r,
Ok((rp, r)) => {
self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());
r
}
Err(err) => return Some(Err(err)),
};
self.state = State::Read(r);
Expand Down
24 changes: 23 additions & 1 deletion core/src/raw/rps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use http::Request;

use crate::raw::*;
use crate::*;

/// Reply for `create_dir` operation
Expand Down Expand Up @@ -107,6 +108,14 @@ pub struct RpRead {
/// It's ok to leave size as empty, but it's recommended to set size if possible. We will use
/// this size as hint to do some optimization like avoid an extra stat or read.
size: Option<u64>,
/// Range is the range of the reader returned by this read operation.
///
/// - `Some(range)` means the reader's content range inside the whole file.
/// - `None` means the reader's content range is unknown.
///
/// It's ok to leave range as empty, but it's recommended to set range if possible. We will use
/// this range as hint to do some optimization like avoid an extra stat or read.
range: Option<BytesContentRange>,
}

impl RpRead {
Expand All @@ -128,6 +137,20 @@ impl RpRead {
self.size = size;
self
}

/// Got the range of the reader returned by this read operation.
///
/// - `Some(range)` means the reader has content range inside the whole file.
/// - `None` means the reader has unknown size.
pub fn range(&self) -> Option<BytesContentRange> {
self.range
}

/// Set the range of the reader returned by this read operation.
pub fn with_range(mut self, range: Option<BytesContentRange>) -> Self {
self.range = range;
self
}
}

/// Reply for `batch` operation.
Expand Down Expand Up @@ -231,7 +254,6 @@ mod tests {
use http::Uri;

use super::*;
use crate::raw::*;

#[test]
fn test_presigned_request_convert() -> Result<()> {
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,16 @@ impl Accessor for AzblobBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/azdls/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,16 @@ impl Accessor for AzdlsBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/azfile/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,16 @@ impl Accessor for AzfileBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/b2/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,16 @@ impl Accessor for B2Backend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/cos/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,16 @@ impl Accessor for CosBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/services/dropbox/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ impl Accessor for DropboxBackend {
let status = resp.status();
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
_ => Err(parse_error(resp).await?),
}
}
Expand Down
6 changes: 5 additions & 1 deletion core/src/services/gdrive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ impl Accessor for GdriveBackend {
match status {
StatusCode::OK => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
_ => Err(parse_error(resp).await?),
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/ghac/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,16 @@ impl Accessor for GhacBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/http/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,16 @@ impl Accessor for HttpBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
Loading

0 comments on commit 535e120

Please sign in to comment.