Skip to content

Commit

Permalink
feat: Reduce stat operation if we are reading all (apache#5146)
Browse files Browse the repository at this point in the history
* feat: Reduce the stat request if we are reading to the end

Signed-off-by: Xuanwo <[email protected]>

* Fix sftp read

Signed-off-by: Xuanwo <[email protected]>

* Try fix sftp read

Signed-off-by: Xuanwo <[email protected]>

* Fix typo

Signed-off-by: Xuanwo <[email protected]>

* Should be split off

Signed-off-by: Xuanwo <[email protected]>

* fix sftp read

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Sep 28, 2024
1 parent 43e4d7d commit 860f5b2
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 162 deletions.
2 changes: 1 addition & 1 deletion core/src/services/sftp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ impl Access for SftpBackend {

Ok((
RpRead::default(),
SftpReader::new(client, f, args.range().size().unwrap_or(u64::MAX) as _),
SftpReader::new(client, f, args.range().size()),
))
}

Expand Down
49 changes: 14 additions & 35 deletions core/src/services/sftp/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ pub struct SftpReader {

file: File,
chunk: usize,
size: usize,
size: Option<usize>,
read: usize,
buf: BytesMut,
}

impl SftpReader {
pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: usize) -> Self {
pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: Option<u64>) -> Self {
Self {
_conn: conn,
file,
size,
size: size.map(|v| v as usize),
chunk: 2 * 1024 * 1024,
read: 0,
buf: BytesMut::new(),
Expand All @@ -50,50 +50,29 @@ impl SftpReader {

impl oio::Read for SftpReader {
async fn read(&mut self) -> Result<Buffer> {
// let client = self.inner.connect().await?;
//
// let mut fs = client.fs();
// fs.set_cwd(&self.root);
//
// let path = fs
// .canonicalize(&self.path)
// .await
// .map_err(parse_sftp_error)?;
//
// let mut f = client
// .open(path.as_path())
// .await
// .map_err(parse_sftp_error)?;

// f.seek(SeekFrom::Start(offset))
// .await
// .map_err(new_std_io_error)?;

// let mut size = size;
// if size == 0 {
// return Ok(Buffer::new());
// }

if self.read >= self.size {
if self.read >= self.size.unwrap_or(usize::MAX) {
return Ok(Buffer::new());
}

let size = (self.size - self.read).min(self.chunk);
let size = if let Some(size) = self.size {
(size - self.read).min(self.chunk)
} else {
self.chunk
};
self.buf.reserve(size);

let Some(bytes) = self
.file
.read(size as u32, self.buf.split_off(size))
.read(size as u32, self.buf.split_off(0))
.await
.map_err(parse_sftp_error)?
else {
return Err(Error::new(
ErrorKind::RangeNotSatisfied,
"sftp read file reaching EoF",
));
return Ok(Buffer::new());
};

self.read += bytes.len();
Ok(Buffer::from(bytes.freeze()))
self.buf = bytes;
let bs = self.buf.split();
Ok(Buffer::from(bs.freeze()))
}
}
10 changes: 5 additions & 5 deletions core/src/types/blocking_read/buffer_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;
use std::ops::RangeBounds;
use std::sync::Arc;

use crate::raw::*;
Expand All @@ -30,8 +30,8 @@ struct IteratingReader {
impl IteratingReader {
/// Create a new iterating reader.
#[inline]
fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
let generator = ReadGenerator::new(ctx.clone(), range);
fn new(ctx: Arc<ReadContext>, range: BytesRange) -> Self {
let generator = ReadGenerator::new(ctx.clone(), range.offset(), range.size());
Self {
generator,
reader: None,
Expand Down Expand Up @@ -73,9 +73,9 @@ pub struct BufferIterator {
impl BufferIterator {
/// Create a new buffer iterator.
#[inline]
pub fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
pub fn new(ctx: Arc<ReadContext>, range: impl RangeBounds<u64>) -> Self {
Self {
inner: IteratingReader::new(ctx, range),
inner: IteratingReader::new(ctx, range.into()),
}
}
}
Expand Down
139 changes: 100 additions & 39 deletions core/src/types/context/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;
use std::ops::{Bound, Range, RangeBounds};
use std::sync::Arc;

use crate::raw::*;
Expand Down Expand Up @@ -68,6 +68,38 @@ impl ReadContext {
pub fn options(&self) -> &OpReader {
&self.options
}

/// Resolve arbitrary range bounds into a concrete `start..end` range.
///
/// An unbounded end means the caller wants to read to the end of the
/// object; only in that case is a `stat` call issued to learn the
/// content length, which is what lets bounded reads skip the extra
/// stat operation entirely.
pub(crate) async fn parse_into_range(
    &self,
    range: impl RangeBounds<u64>,
) -> Result<Range<u64>> {
    // Translate the lower bound into an inclusive starting offset.
    let begin = match range.start_bound() {
        Bound::Unbounded => 0,
        Bound::Included(offset) => *offset,
        Bound::Excluded(offset) => offset + 1,
    };

    // Translate the upper bound into an exclusive ending offset.
    let finish = match range.end_bound() {
        Bound::Excluded(offset) => *offset,
        Bound::Included(offset) => offset + 1,
        Bound::Unbounded => {
            // No explicit end: fall back to a stat call, forwarding the
            // version argument when one was supplied on the read.
            let mut stat_args = OpStat::new();
            if let Some(version) = self.args().version() {
                stat_args = stat_args.with_version(version);
            }

            self.accessor()
                .stat(self.path(), stat_args)
                .await?
                .into_metadata()
                .content_length()
        }
    };

    Ok(begin..finish)
}
}

/// ReadGenerator is used to generate new readers.
Expand All @@ -83,62 +115,65 @@ pub struct ReadGenerator {
ctx: Arc<ReadContext>,

offset: u64,
end: u64,
size: Option<u64>,
}

impl ReadGenerator {
/// Create a new ReadGenerator.
#[inline]
pub fn new(ctx: Arc<ReadContext>, range: Range<u64>) -> Self {
Self {
ctx,
offset: range.start,
end: range.end,
pub fn new(ctx: Arc<ReadContext>, offset: u64, size: Option<u64>) -> Self {
Self { ctx, offset, size }
}

/// Produce the byte range for the next read, or `None` once all
/// requested data has been handed out.
fn next_range(&mut self) -> Option<BytesRange> {
    // `Some(0)` is the sentinel meaning "nothing left to read".
    if self.size == Some(0) {
        return None;
    }

    let start = self.offset;
    let length = if let Some(remaining) = self.size {
        // Bounded read: cap each step by the configured chunk size
        // when one is present.
        let step = match self.ctx.options.chunk() {
            Some(chunk) => remaining.min(chunk as u64),
            None => remaining,
        };
        // Advance the cursor before handing the range out.
        self.offset += step;
        self.size = Some(remaining - step);
        Some(step)
    } else {
        // Unbounded read: emit a single range covering everything,
        // then mark the generator exhausted via the Some(0) sentinel.
        self.size = Some(0);
        None
    };

    Some(BytesRange::new(start, length))
}

/// Generate next reader.
pub async fn next_reader(&mut self) -> Result<Option<oio::Reader>> {
if self.offset >= self.end {
let Some(range) = self.next_range() else {
return Ok(None);
}
};

let offset = self.offset;
let mut size = (self.end - self.offset) as usize;
if let Some(chunk) = self.ctx.options.chunk() {
size = size.min(chunk)
}

// Update self.offset before building future.
self.offset += size as u64;
let args = self
.ctx
.args
.clone()
.with_range(BytesRange::new(offset, Some(size as u64)));
let args = self.ctx.args.clone().with_range(range);
let (_, r) = self.ctx.acc.read(&self.ctx.path, args).await?;
Ok(Some(r))
}

/// Generate next blocking reader.
pub fn next_blocking_reader(&mut self) -> Result<Option<oio::BlockingReader>> {
if self.offset >= self.end {
let Some(range) = self.next_range() else {
return Ok(None);
}
};

let offset = self.offset;
let mut size = (self.end - self.offset) as usize;
if let Some(chunk) = self.ctx.options.chunk() {
size = size.min(chunk)
}

// Update self.offset before building future.
self.offset += size as u64;
let args = self
.ctx
.args
.clone()
.with_range(BytesRange::new(offset, Some(size as u64)));
let args = self.ctx.args.clone().with_range(range);
let (_, r) = self.ctx.acc.blocking_read(&self.ctx.path, args)?;
Ok(Some(r))
}
Expand Down Expand Up @@ -167,7 +202,7 @@ mod tests {
OpRead::new(),
OpReader::new().with_chunk(3),
));
let mut generator = ReadGenerator::new(ctx, 0..10);
let mut generator = ReadGenerator::new(ctx, 0, Some(10));
let mut readers = vec![];
while let Some(r) = generator.next_reader().await? {
readers.push(r);
Expand All @@ -177,6 +212,32 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_next_reader_without_size() -> Result<()> {
    // Prepare a 10-byte object in the in-memory service.
    let op = Operator::via_iter(Scheme::Memory, [])?;
    op.write(
        "test",
        Buffer::from(vec![Bytes::from("Hello"), Bytes::from("World")]),
    )
    .await?;

    let ctx = Arc::new(ReadContext::new(
        op.into_inner(),
        "test".to_string(),
        OpRead::new(),
        OpReader::new().with_chunk(3),
    ));

    // With no known size the generator must emit exactly one reader
    // covering the whole object, regardless of the chunk setting.
    let mut generator = ReadGenerator::new(ctx, 0, None);
    let mut collected = vec![];
    while let Some(reader) = generator.next_reader().await? {
        collected.push(reader);
    }

    pretty_assertions::assert_eq!(collected.len(), 1);
    Ok(())
}

#[test]
fn test_next_blocking_reader() -> Result<()> {
let op = Operator::via_iter(Scheme::Memory, [])?;
Expand All @@ -192,7 +253,7 @@ mod tests {
OpRead::new(),
OpReader::new().with_chunk(3),
));
let mut generator = ReadGenerator::new(ctx, 0..10);
let mut generator = ReadGenerator::new(ctx, 0, Some(10));
let mut readers = vec![];
while let Some(r) = generator.next_blocking_reader()? {
readers.push(r);
Expand Down
Loading

0 comments on commit 860f5b2

Please sign in to comment.