Skip to content

Commit

Permalink
feat: Add into_seekable_read_by_range support for blocking read (#2799)
Browse files Browse the repository at this point in the history
* polish

Signed-off-by: Xuanwo <[email protected]>

* feat: Add into_seekable_read_by_range support for blocking read

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Aug 7, 2023
1 parent d8563da commit fc19dad
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 30 deletions.
49 changes: 40 additions & 9 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,18 +218,46 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
let seekable = capability.read_can_seek;
let streamable = capability.read_can_next;

let range = args.range();
let (rp, r) = self.inner.blocking_read(path, args)?;
let content_length = rp.metadata().content_length();

match (seekable, streamable) {
(true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))),
(true, false) => {
let r = oio::into_streamable_read(r, 256 * 1024);
Ok((rp, CompleteReader::NeedStreamable(r)))
}
(false, _) => Err(Error::new(
ErrorKind::Unsupported,
"non seekable blocking reader is not supported",
)),
_ => {
let (offset, size) = match (range.offset(), range.size()) {
(Some(offset), _) => (offset, content_length),
(None, None) => (0, content_length),
(None, Some(size)) => {
// TODO: we can read content range to calculate
// the total content length.
let om = self
.inner
.blocking_stat(path, OpStat::new())?
.into_metadata();
let total_size = om.content_length();
let (offset, size) = if size > total_size {
(0, total_size)
} else {
(total_size - size, size)
};

(offset, size)
}
};
let r = oio::into_seekable_read_by_range(self.inner.clone(), path, r, offset, size);

if streamable {
Ok((rp, CompleteReader::NeedSeekable(r)))
} else {
let r = oio::into_streamable_read(r, 256 * 1024);
Ok((rp, CompleteReader::NeedBoth(r)))
}
}
}
}

Expand Down Expand Up @@ -532,9 +560,9 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {

pub enum CompleteReader<A: Accessor, R> {
AlreadyComplete(R),
NeedSeekable(ByRangeSeekableReader<A>),
NeedSeekable(ByRangeSeekableReader<A, R>),
NeedStreamable(StreamableReader<R>),
NeedBoth(StreamableReader<ByRangeSeekableReader<A>>),
NeedBoth(StreamableReader<ByRangeSeekableReader<A, R>>),
}

impl<A, R> oio::Read for CompleteReader<A, R>
Expand Down Expand Up @@ -586,8 +614,9 @@ where

match self {
AlreadyComplete(r) => r.read(buf),
NeedSeekable(r) => r.read(buf),
NeedStreamable(r) => r.read(buf),
_ => unreachable!("not supported types of complete reader"),
NeedBoth(r) => r.read(buf),
}
}

Expand All @@ -596,8 +625,9 @@ where

match self {
AlreadyComplete(r) => r.seek(pos),
NeedSeekable(r) => r.seek(pos),
NeedStreamable(r) => r.seek(pos),
_ => unreachable!("not supported types of complete reader"),
NeedBoth(r) => r.seek(pos),
}
}

Expand All @@ -606,8 +636,9 @@ where

match self {
AlreadyComplete(r) => r.next(),
NeedSeekable(r) => r.next(),
NeedStreamable(r) => r.next(),
_ => unreachable!("not supported types of complete reader"),
NeedBoth(r) => r.next(),
}
}
}
Expand Down
170 changes: 149 additions & 21 deletions core/src/raw/oio/read/into_seekable_read_by_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use bytes::Bytes;
use std::future::Future;
use std::io::SeekFrom;
use std::pin::Pin;
Expand Down Expand Up @@ -42,13 +43,13 @@ use crate::*;
///
/// This operation is not zero cost. If the accessor already returns a
/// seekable reader, please don't use this.
pub fn into_seekable_read_by_range<A: Accessor>(
pub fn into_seekable_read_by_range<A: Accessor, R>(
acc: Arc<A>,
path: &str,
reader: A::Reader,
reader: R,
offset: u64,
size: u64,
) -> ByRangeSeekableReader<A> {
) -> ByRangeSeekableReader<A, R> {
ByRangeSeekableReader {
acc,
path: path.to_string(),
Expand All @@ -61,14 +62,14 @@ pub fn into_seekable_read_by_range<A: Accessor>(
}

/// ByRangeReader that can do seek on non-seekable reader.
pub struct ByRangeSeekableReader<A: Accessor> {
pub struct ByRangeSeekableReader<A: Accessor, R> {
acc: Arc<A>,
path: String,

offset: u64,
size: u64,
cur: u64,
state: State<A::Reader>,
state: State<R>,

/// A seek operation could return Pending, which may cause the same
/// `SeekFrom::Current(off)` to be passed in multiple times.
Expand All @@ -78,27 +79,19 @@ pub struct ByRangeSeekableReader<A: Accessor> {
last_seek_pos: Option<u64>,
}

enum State<R: oio::Read> {
enum State<R> {
Idle,
Sending(BoxFuture<'static, Result<(RpRead, R)>>),
Reading(R),
}

/// Safety: State will only be accessed under &mut.
unsafe impl<R: oio::Read> Sync for State<R> {}

impl<A: Accessor> ByRangeSeekableReader<A> {
fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, A::Reader)>> {
let acc = self.acc.clone();
let path = self.path.clone();
let op = OpRead::default().with_range(BytesRange::new(
Some(self.offset + self.cur),
Some(self.size - self.cur),
));

Box::pin(async move { acc.read(&path, op).await })
}
unsafe impl<R> Sync for State<R> {}

impl<A, R> ByRangeSeekableReader<A, R>
where
A: Accessor,
{
/// calculate the seek position.
///
/// This operation will not update the `self.cur`.
Expand Down Expand Up @@ -126,7 +119,45 @@ impl<A: Accessor> ByRangeSeekableReader<A> {
}
}

impl<A: Accessor> oio::Read for ByRangeSeekableReader<A> {
impl<A, R> ByRangeSeekableReader<A, R>
where
A: Accessor<Reader = R>,
R: oio::Read,
{
fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
let acc = self.acc.clone();
let path = self.path.clone();
let op = OpRead::default().with_range(BytesRange::new(
Some(self.offset + self.cur),
Some(self.size - self.cur),
));

Box::pin(async move { acc.read(&path, op).await })
}
}

impl<A, R> ByRangeSeekableReader<A, R>
where
    A: Accessor<BlockingReader = R>,
    R: oio::BlockingRead,
{
    /// Open a new blocking reader for the part of the range that has not
    /// been consumed yet.
    ///
    /// The request starts at `offset + cur` and asks for `size - cur` bytes.
    ///
    /// # Errors
    ///
    /// Returns whatever error the accessor's `blocking_read` reports.
    fn read_action(&self) -> Result<(RpRead, R)> {
        let op = OpRead::default().with_range(BytesRange::new(
            Some(self.offset + self.cur),
            Some(self.size - self.cur),
        ));

        // The call completes before this function returns, so the accessor
        // and path can simply be borrowed from `self`; no `Arc` clone or
        // path allocation is needed here (unlike the `'static` async future).
        self.acc.blocking_read(&self.path, op)
    }
}

impl<A, R> oio::Read for ByRangeSeekableReader<A, R>
where
A: Accessor<Reader = R>,
R: oio::Read,
{
fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
match &mut self.state {
State::Idle => {
Expand Down Expand Up @@ -202,7 +233,7 @@ impl<A: Accessor> oio::Read for ByRangeSeekableReader<A> {
}
}

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<bytes::Bytes>>> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
match &mut self.state {
State::Idle => {
if self.cur >= self.size {
Expand Down Expand Up @@ -244,6 +275,103 @@ impl<A: Accessor> oio::Read for ByRangeSeekableReader<A> {
}
}

impl<A, R> oio::BlockingRead for ByRangeSeekableReader<A, R>
where
    A: Accessor<BlockingReader = R>,
    R: oio::BlockingRead,
{
    /// Read into `buf`, opening a new ranged reader first when none is active.
    ///
    /// Returns `Ok(0)` when the cursor has reached the end of the range, or
    /// when the active reader reports that it has no more data.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        match &mut self.state {
            State::Idle => {
                if self.cur >= self.size {
                    return Ok(0);
                }

                let (_, reader) = self.read_action()?;
                self.state = State::Reading(reader);
                // Retry now that a reader is in place.
                self.read(buf)
            }
            State::Reading(reader) => {
                let outcome = reader.read(buf);
                match outcome {
                    // Progress was made: only the cursor moves.
                    Ok(n) if n > 0 => self.cur += n as u64,
                    // Either all data has been consumed or the read failed;
                    // in both cases drop the reader and go back to Idle.
                    _ => self.state = State::Idle,
                }
                outcome
            }
            State::Sending(_) => {
                unreachable!("It's invalid to go into State::Sending for BlockingRead, please report this bug")
            }
        }
    }

    /// Move the cursor to `pos` and return the new position.
    ///
    /// No request is sent here. An active reader is dropped only when the
    /// target position differs from the current one.
    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
        let target = self.seek_pos(pos)?;

        if matches!(self.state, State::Sending(_)) {
            unreachable!("It's invalid to go into State::Sending for BlockingRead, please report this bug")
        }

        // A reader positioned anywhere else is of no use after the seek.
        if matches!(self.state, State::Reading(_)) && target != self.cur {
            self.state = State::Idle;
        }

        self.cur = target;
        Ok(self.cur)
    }

    /// Return the next chunk of bytes, opening a new ranged reader first
    /// when none is active.
    ///
    /// Returns `None` when the cursor has reached the end of the range, or
    /// when the active reader is exhausted.
    fn next(&mut self) -> Option<Result<Bytes>> {
        match &mut self.state {
            State::Idle => {
                if self.cur >= self.size {
                    return None;
                }

                match self.read_action() {
                    Ok((_, reader)) => self.state = State::Reading(reader),
                    Err(err) => return Some(Err(err)),
                }
                // Retry now that a reader is in place.
                self.next()
            }
            State::Reading(reader) => {
                let item = reader.next();
                match &item {
                    // A chunk arrived: advance the cursor by its length.
                    Some(Ok(bs)) => self.cur += bs.len() as u64,
                    // Error or end of stream: drop the reader.
                    _ => self.state = State::Idle,
                }
                item
            }
            State::Sending(_) => {
                unreachable!("It's invalid to go into State::Sending for BlockingRead, please report this bug")
            }
        }
    }
}

#[cfg(test)]
mod tests {
use std::io::SeekFrom;
Expand Down

0 comments on commit fc19dad

Please sign in to comment.