Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add content range support for RpRead #3777

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 62 additions & 30 deletions core/src/raw/oio/read/range_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ where
}
}

/// Fill current reader's range by total_size.
fn fill_range(&mut self, total_size: u64) -> Result<()> {
/// Ensure current reader's offset is valid via total_size.
fn ensure_offset(&mut self, total_size: u64) -> Result<()> {
(self.offset, self.size) = match (self.offset, self.size) {
(None, Some(size)) => {
if size > total_size {
// If returns an error, we should reset
// state to Idle so that we can retry it.
self.state = State::Idle;
return Err(Error::new(
ErrorKind::InvalidInput,
"read to a negative or overflowing position is invalid",
Expand All @@ -125,6 +128,48 @@ where
Ok(())
}

/// Reconcile the reader's size with the hints carried by `RpRead`.
///
/// - `total_size` is the size of the whole file, which callers derive from the
///   content range returned by `RpRead` (`range.size()`).
/// - `content_size` is the size of the body returned by `RpRead`.
///
/// `total_size` is consulted first: if it initializes or shrinks `self.size`,
/// the function returns without looking at `content_size`. Otherwise
/// `content_size` (when present and non-zero) is used as a fallback.
/// In both cases `self.size` is only ever set when it was unknown, or lowered
/// when it was larger than the newly calculated value.
///
/// # Panics
///
/// Panics if `total_size` is `Some` while `self.offset` is `None`.
fn ensure_size(&mut self, total_size: Option<u64>, content_size: Option<u64>) {
    if let Some(total_size) = total_size {
        // It's valid for a reader to seek to a position beyond the content length.
        // Saturate to `0` instead of failing so that reads return `Ok(0)`,
        // which aligns with fs behavior.
        let remaining =
            total_size.saturating_sub(self.offset.expect("reader offset must be valid"));

        // Update the size when:
        //
        // - reader's size is unknown.
        // - reader's size is larger than what the file can provide.
        if self.size.map_or(true, |known| remaining < known) {
            self.size = Some(remaining);
            return;
        }
    }

    match content_size {
        // Nothing to do without a content size. A content size of 0 is also
        // skipped since it could be invalid.
        //
        // For example, users seek to `u64::MAX` and then call read.
        None | Some(0) => {}
        Some(content_size) => {
            // `content_size` comes from the service's response, so saturate
            // rather than risk a `u64` overflow (panic in debug builds,
            // silent wrap-around to a tiny size in release builds).
            let calculated_size = content_size.saturating_add(self.cur);

            // Update the size when:
            //
            // - reader's size is unknown.
            // - reader's size is larger than the calculated size.
            if self.size.map_or(true, |known| calculated_size < known) {
                self.size = Some(calculated_size);
            }
        }
    }
}

/// Calculate the current range, maybe sent as next read request.
///
/// # Panics
Expand Down Expand Up @@ -256,12 +301,7 @@ where
})?;

let length = rp.into_metadata().content_length();
self.fill_range(length).map_err(|err| {
// If stat future returns an error, we should reset
// state to Idle so that we can retry it.
self.state = State::Idle;
err
})?;
self.ensure_offset(length)?;

self.state = State::Idle;
self.poll_read(cx, buf)
Expand All @@ -274,12 +314,8 @@ where
err
})?;

// Set size if read returns size hint.
if let Some(size) = rp.size() {
if size != 0 && self.size.is_none() {
self.size = Some(size + self.cur);
}
}
self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());

self.state = State::Read(r);
self.poll_read(cx, buf)
}
Expand Down Expand Up @@ -339,7 +375,7 @@ where
})?;

let length = rp.into_metadata().content_length();
self.fill_range(length)?;
self.ensure_offset(length)?;

self.state = State::Idle;
self.poll_seek(cx, pos)
Expand Down Expand Up @@ -391,7 +427,7 @@ where
})?;

let length = rp.into_metadata().content_length();
self.fill_range(length)?;
self.ensure_offset(length)?;

self.state = State::Idle;
self.poll_next(cx)
Expand All @@ -405,11 +441,8 @@ where
})?;

// Set size if read returns size hint.
if let Some(size) = rp.size() {
if size != 0 && self.size.is_none() {
self.size = Some(size + self.cur);
}
}
self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());

self.state = State::Read(r);
self.poll_next(cx)
}
Expand Down Expand Up @@ -450,17 +483,13 @@ where
let rp = self.stat_action()?;

let length = rp.into_metadata().content_length();
self.fill_range(length)?;
self.ensure_offset(length)?;
}

let (rp, r) = self.read_action()?;

// Set size if read returns size hint.
if let Some(size) = rp.size() {
if size != 0 && self.size.is_none() {
self.size = Some(size + self.cur);
}
}
self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());

self.state = State::Read(r);
self.read(buf)
Expand Down Expand Up @@ -502,7 +531,7 @@ where
} else {
let rp = self.stat_action()?;
let length = rp.into_metadata().content_length();
self.fill_range(length)?;
self.ensure_offset(length)?;

let size = self.size.expect("size must be valid after fill_range");
(size as i64, n)
Expand Down Expand Up @@ -561,13 +590,16 @@ where
};

let length = rp.into_metadata().content_length();
if let Err(err) = self.fill_range(length) {
if let Err(err) = self.ensure_offset(length) {
return Some(Err(err));
}
}

let r = match self.read_action() {
Ok((_, r)) => r,
Ok((rp, r)) => {
self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());
r
}
Err(err) => return Some(Err(err)),
};
self.state = State::Read(r);
Expand Down
24 changes: 23 additions & 1 deletion core/src/raw/rps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use http::Request;

use crate::raw::*;
use crate::*;

/// Reply for `create_dir` operation
Expand Down Expand Up @@ -107,6 +108,14 @@ pub struct RpRead {
/// It's ok to leave size as empty, but it's recommended to set size if possible. We will use
/// this size as hint to do some optimization like avoid an extra stat or read.
size: Option<u64>,
/// Range is the range of the reader returned by this read operation.
///
/// - `Some(range)` means the reader's content range inside the whole file.
/// - `None` means the reader's content range is unknown.
///
/// It's ok to leave range empty, but it's recommended to set range if possible. We will use
/// this range as a hint for optimizations such as avoiding an extra stat or read.
range: Option<BytesContentRange>,
}

impl RpRead {
Expand All @@ -128,6 +137,20 @@ impl RpRead {
self.size = size;
self
}

/// Got the range of the reader returned by this read operation.
///
/// - `Some(range)` means the reader has content range inside the whole file.
/// - `None` means the reader has unknown size.
pub fn range(&self) -> Option<BytesContentRange> {
self.range
}

/// Set the range of the reader returned by this read operation.
pub fn with_range(mut self, range: Option<BytesContentRange>) -> Self {
self.range = range;
self
}
}

/// Reply for `batch` operation.
Expand Down Expand Up @@ -231,7 +254,6 @@ mod tests {
use http::Uri;

use super::*;
use crate::raw::*;

#[test]
fn test_presigned_request_convert() -> Result<()> {
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,16 @@ impl Accessor for AzblobBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/azdls/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,16 @@ impl Accessor for AzdlsBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/azfile/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,16 @@ impl Accessor for AzfileBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/b2/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,16 @@ impl Accessor for B2Backend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/cos/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,16 @@ impl Accessor for CosBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/services/dropbox/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ impl Accessor for DropboxBackend {
let status = resp.status();
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())),
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
_ => Err(parse_error(resp).await?),
}
}
Expand Down
6 changes: 5 additions & 1 deletion core/src/services/gdrive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ impl Accessor for GdriveBackend {
match status {
StatusCode::OK => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
_ => Err(parse_error(resp).await?),
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/ghac/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,16 @@ impl Accessor for GhacBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/services/http/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,16 @@ impl Accessor for HttpBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
Ok((RpRead::new().with_size(size), resp.into_body()))
let range = parse_content_range(resp.headers())?;
Ok((
RpRead::new().with_size(size).with_range(range),
resp.into_body(),
))
}
StatusCode::RANGE_NOT_SATISFIABLE => {
resp.into_body().consume().await?;
Ok((RpRead::new().with_size(Some(0)), IncomingAsyncBody::empty()))
}
StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
Expand Down
Loading
Loading