Skip to content

Commit

Permalink
Try to introduce zero cost reader
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Jan 30, 2023
1 parent 2d3a1dc commit 1f04990
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 20 deletions.
77 changes: 60 additions & 17 deletions benches/io/blocking_seek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,82 @@ use std::io::{Seek, SeekFrom};

use opendal::{
layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer},
Operator,
raw::output::BlockingRead,
services::fs,
Operator, OperatorNext,
};
use rand::prelude::*;

// pub fn bench(c: &mut Criterion) {
// let mut group = c.benchmark_group("blocking_seek");
// let size = 64 * 1024 * 1024;

// let op = Operator::from_iter(
// opendal::Scheme::Fs,
// vec![("root".to_string(), "/tmp".to_string())].into_iter(),
// )
// .unwrap();

// op.object("test_file")
// .blocking_write(vec![0; size])
// .unwrap();

// let op = op
// .layer(LoggingLayer::default())
// .layer(TracingLayer)
// .layer(MetricsLayer)
// .layer(RetryLayer::new(backon::ExponentialBackoff::default()));

// let mut rng = rand::thread_rng();

// let o = op.object("test_file");
// let mut r = o.blocking_reader().unwrap();

// group.bench_function("seek", |b| {
// b.iter(|| {
// let off = rng.gen_range(0..size as u64);
// r.seek(SeekFrom::Start(off)).unwrap();
// })
// });
// }

/// Criterion benchmark for `seek` on a blocking reader opened on `test_file`.
///
/// Registers a `seek` benchmark in the `blocking_seek` group. Each iteration
/// draws a random offset in `0..size` (`size` is `64 * 1024 * 1024`) and seeks
/// to it from the start of the reader.
///
/// NOTE(review): this listing appears to interleave the removed and the added
/// lines of a diff (`op` and `r` are each bound more than once) — confirm
/// against the actual file before relying on the exact statement sequence.
pub fn bench(c: &mut Criterion) {
let mut group = c.benchmark_group("blocking_seek");
let size = 64 * 1024 * 1024;

let op = Operator::from_iter(
opendal::Scheme::Fs,
vec![("root".to_string(), "/tmp".to_string())].into_iter(),
)
.unwrap();
let op = OperatorNext {
accessor: {
let mut builder = fs::Builder::default();
builder.root("/tmp");
builder.build_next().unwrap()
},
}
.layer(MetricsLayer);

// let op = Operator::from_iter(
// opendal::Scheme::Fs,
// vec![("root".to_string(), "/tmp".to_string())].into_iter(),
// )
// .unwrap();

op.object("test_file")
.blocking_write(vec![0; size])
.unwrap();
// op.object("test_file")
// .blocking_write(vec![0; size])
// .unwrap();

let op = op
.layer(LoggingLayer::default())
.layer(TracingLayer)
.layer(MetricsLayer)
.layer(RetryLayer::new(backon::ExponentialBackoff::default()));
// let op = op
// .layer(LoggingLayer::default())
// .layer(TracingLayer)
// .layer(MetricsLayer)
// .layer(RetryLayer::new(backon::ExponentialBackoff::default()));

let mut rng = rand::thread_rng();

let o = op.object("test_file");
let mut r = o.blocking_reader().unwrap();
let mut r = op.blocking_reader("test_file").unwrap();

group.bench_function("seek", |b| {
b.iter(|| {
let off = rng.gen_range(0..size as u64);
r.seek(SeekFrom::Start(off)).unwrap();
std::io::Seek::seek(&mut r, SeekFrom::Start(off)).unwrap();
})
});
}
42 changes: 42 additions & 0 deletions src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,3 +965,45 @@ impl<R> Drop for MetricReader<R> {
}
}
}

/// Lets `MetricsLayer` wrap any `AccessorNext` whose blocking reader type is `IR`.
///
/// The layered accessor is a `MetricsAccessorNext<A, IR>` and the readers it
/// hands out are `MetricReaderNext<IR>`.
impl<A, IR> LayerNext<A, IR> for MetricsLayer
where
    A: AccessorNext<OR = IR>,
    IR: output::BlockingRead,
{
    type OA = MetricsAccessorNext<A, IR>;
    type OR = MetricReaderNext<IR>;

    /// Consume the layer and wrap `inner` in a `MetricsAccessorNext`.
    fn layer(self, inner: A) -> Self::OA {
        MetricsAccessorNext { inner }
    }
}

/// Accessor wrapper that owns an inner `AccessorNext`.
///
/// `OR` is the blocking reader type of the wrapped accessor `A`.
#[derive(Clone)]
pub struct MetricsAccessorNext<A, OR>
where
    A: AccessorNext<OR = OR>,
    OR: output::BlockingRead,
{
    /// The accessor every operation is delegated to.
    inner: A,
}

/// Debug output is only the type name; the wrapped accessor is not printed.
impl<A, OR> Debug for MetricsAccessorNext<A, OR>
where
    A: AccessorNext<OR = OR>,
    OR: output::BlockingRead,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("MetricsAccessorNext")
    }
}

/// Delegates to the wrapped accessor and wraps the reader it returns.
impl<A, OR> AccessorNext for MetricsAccessorNext<A, OR>
where
    A: AccessorNext<OR = OR>,
    OR: output::BlockingRead,
{
    type OR = MetricReaderNext<OR>;

    /// Call `blocking_read` on the inner accessor and, on success, return its
    /// reply unchanged together with the reader wrapped in `MetricReaderNext`.
    /// Errors from the inner accessor are passed through.
    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::OR)> {
        self.inner
            .blocking_read(path, args)
            .map(|(reply, reader)| (reply, MetricReaderNext { inner: reader }))
    }
}

/// Reader wrapper handed out by `MetricsAccessorNext::blocking_read`.
pub struct MetricReaderNext<R> {
// The wrapped reader that calls are forwarded to.
inner: R,
}

/// Forwards blocking reads and seeks to the wrapped reader.
impl<R: output::BlockingRead> output::BlockingRead for MetricReaderNext<R> {
    /// Read into `buf` by delegating to the inner reader.
    ///
    /// The `BlockingRead` docs state that `read` is required to be implemented;
    /// without this forward the wrapper would not expose the inner reader's data.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf)
    }

    /// Seek by delegating to the inner reader.
    #[inline]
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        self.inner.seek(pos)
    }
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ mod operator;
pub use operator::BatchOperator;
pub use operator::Operator;
pub use operator::OperatorMetadata;
pub use raw::OperatorNext;

mod object;
pub use object::Object;
Expand Down
2 changes: 1 addition & 1 deletion src/object/blocking_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::OpRead;

/// BlockingObjectReader is the public API for users.
pub struct BlockingObjectReader {
inner: output::BlockingReader,
pub(crate) inner: output::BlockingReader,
}

impl BlockingObjectReader {
Expand Down
53 changes: 53 additions & 0 deletions src/raw/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
// limitations under the License.

use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use async_trait::async_trait;
use flagset::flags;
use flagset::FlagSet;

use crate::object::BlockingObjectReader;
use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -584,3 +586,54 @@ flags! {
ReadIsStreamable,
}
}

/// Operator that is generic over its accessor `A` and that accessor's blocking
/// reader type `OR`.
#[derive(Clone, Debug)]
pub struct OperatorNext<A, OR>
where
    A: AccessorNext<OR = OR>,
    OR: output::BlockingRead,
{
    /// The accessor all operations go through.
    pub accessor: A,
}

impl<A, OR> OperatorNext<A, OR>
where
    A: AccessorNext<OR = OR>,
    OR: output::BlockingRead,
{
    /// Wrap the current accessor with `layer` and return an operator over the
    /// layered accessor type `L::OA` and its reader type `L::OR`.
    pub fn layer<L>(self, layer: L) -> OperatorNext<L::OA, L::OR>
    where
        L: LayerNext<A, OR>,
    {
        let accessor = layer.layer(self.accessor);
        OperatorNext { accessor }
    }

    /// Open a blocking reader on `path` using the default `OpRead` arguments.
    ///
    /// The concrete reader is boxed into a `BlockingObjectReader`; the reply
    /// returned alongside it by the accessor is discarded. (An alternative that
    /// returns the concrete `OR` directly was previously sketched here.)
    pub fn blocking_reader(&self, path: &str) -> Result<BlockingObjectReader> {
        self.accessor
            .blocking_read(path, OpRead::default())
            .map(|(_, reader)| BlockingObjectReader {
                inner: Box::new(reader),
            })
    }
}

/// Accessor trait whose blocking reader is exposed as the associated type `OR`.
#[async_trait]
pub trait AccessorNext: Send + Sync + Debug + 'static {
/// Reader type returned by `blocking_read`.
type OR: output::BlockingRead;

/// Open `path` for blocking reads with the given `args`, returning the read
/// reply together with the reader.
///
/// # Panics
///
/// The default implementation always panics via `unimplemented!()`;
/// implementors are expected to override it.
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::OR)> {
unimplemented!()
}

/// Box this accessor as a trait object that keeps the same reader type `OR`.
fn boxed(self) -> Box<dyn AccessorNext<OR = Self::OR>>
where
Self: Sized,
{
Box::new(self)
}
}

/// A layer that turns an accessor `A` (with blocking reader `IR`) into another
/// accessor with its own reader type.
pub trait LayerNext<A, IR>
where
    A: AccessorNext<OR = IR>,
    IR: output::BlockingRead,
{
    /// Reader type exposed by the layered accessor.
    type OR: output::BlockingRead;
    /// Accessor type produced by this layer.
    type OA: AccessorNext<OR = Self::OR>;

    /// Intercept the operations on the underlying storage.
    fn layer(self, inner: A) -> Self::OA;
}
2 changes: 1 addition & 1 deletion src/raw/io/output/blocking_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub type BlockingReader = Box<dyn BlockingRead>;
///
/// `Read` is required to be implemented, `Seek` and `Iterator`
/// are optional. We use `Read` to make users' lives easier.
pub trait BlockingRead: Send + Sync {
pub trait BlockingRead: Send + Sync + 'static {
/// Return the inner output bytes reader if there is one.
#[inline]
fn inner(&mut self) -> Option<&mut BlockingReader> {
Expand Down
2 changes: 1 addition & 1 deletion src/raw/io/output/into_blocking_reader/from_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where

impl<R> output::BlockingRead for FdReader<R>
where
R: Read + Seek + Send + Sync,
R: Read + Seek + Send + Sync + 'static,
{
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
if self.current_size() <= 0 {
Expand Down
1 change: 1 addition & 0 deletions src/raw/io/output/into_blocking_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ pub use as_iterable::as_iterable;

mod from_fd;
pub use from_fd::from_fd;
pub use from_fd::FdReader;
3 changes: 3 additions & 0 deletions src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub use accessor::Accessor;
pub use accessor::AccessorCapability;
pub use accessor::AccessorHint;
pub use accessor::AccessorMetadata;
pub use accessor::AccessorNext;
pub use accessor::LayerNext;
pub use accessor::OperatorNext;

mod io;
pub use io::*;
Expand Down
105 changes: 105 additions & 0 deletions src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,46 @@ impl Builder {
self
}

/// Build a concrete fs `Backend` from this builder.
///
/// Takes the configured root out of the builder (falling back to the default
/// when unset), normalizes it, and makes sure the root directory and, when
/// configured, the atomic write directory exist.
///
/// # Errors
///
/// Returns an `ErrorKind::Unexpected` error when a missing directory cannot
/// be created.
pub fn build_next(&mut self) -> Result<Backend> {
    debug!("backend build started: {:?}", &self);

    let root = normalize_root(&self.root.take().unwrap_or_default());
    let atomic_write_dir = self.atomic_write_dir.as_deref().map(normalize_root);

    // If the root dir does not exist, we must create it. Other `metadata`
    // failures are not reported here.
    if let Err(e) = std::fs::metadata(&root) {
        if e.kind() == io::ErrorKind::NotFound {
            std::fs::create_dir_all(&root).map_err(|e| {
                Error::new(ErrorKind::Unexpected, "create root dir failed")
                    // Name this method so failures are attributed to the right builder entry point.
                    .with_operation("Builder::build_next")
                    .with_context("root", &root)
                    .set_source(e)
            })?;
        }
    }

    // If the atomic write dir does not exist, we must create it.
    if let Some(d) = &atomic_write_dir {
        if let Err(e) = std::fs::metadata(d) {
            if e.kind() == io::ErrorKind::NotFound {
                std::fs::create_dir_all(d).map_err(|e| {
                    Error::new(ErrorKind::Unexpected, "create atomic write dir failed")
                        .with_operation("Builder::build_next")
                        .with_context("atomic_write_dir", d)
                        .set_source(e)
                })?;
            }
        }
    }

    debug!("backend build finished: {:?}", &self);
    Ok(Backend {
        root,
        atomic_write_dir,
        enable_path_check: self.enable_path_check,
    })
}

/// Consume current builder to build a fs backend.
pub fn build(&mut self) -> Result<impl Accessor> {
debug!("backend build started: {:?}", &self);
Expand Down Expand Up @@ -645,6 +685,71 @@ impl Accessor for Backend {
}
}

/// Blocking read support for the fs `Backend` with a concrete, unboxed reader.
#[async_trait]
impl AccessorNext for Backend {
    type OR = output::into_blocking_reader::FdReader<std::fs::File>;

    /// Open the file at `path` (resolved against the backend root) and return a
    /// reader limited to the byte range requested in `args`.
    ///
    /// The range is resolved against the file length:
    /// - offset and size: `[offset, min(offset + size, len))`
    /// - offset only: `[offset, len)`
    /// - size only: the last `size` bytes
    /// - neither: the whole file
    ///
    /// The reported length in `RpRead` is `end - start`, saturating at zero so a
    /// requested offset past the end of the file yields a zero-length read
    /// instead of an integer underflow.
    ///
    /// # Errors
    ///
    /// Returns an error when the file cannot be opened, inspected or seeked.
    /// With path checking enabled, also returns `ObjectNotFound` when the
    /// trailing slash of `path` disagrees with the file type and
    /// `ObjectIsADirectory` when the path is a directory.
    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::OR)> {
        use output::BlockingRead;

        let p = build_rooted_abs_path(&self.root, path);

        let mut f = std::fs::OpenOptions::new()
            .read(true)
            .open(p)
            .map_err(parse_io_error)?;

        let total_length = if self.enable_path_check {
            // Get fs metadata of the file at the given path, ensuring it is not
            // a false positive due to slash normalization.
            let meta = f.metadata().map_err(parse_io_error)?;
            if meta.is_dir() != path.ends_with('/') {
                return Err(Error::new(
                    ErrorKind::ObjectNotFound,
                    "file mode is not match with its path",
                ));
            }
            if meta.is_dir() {
                return Err(Error::new(
                    ErrorKind::ObjectIsADirectory,
                    "given path is a directoty",
                ));
            }

            meta.len()
        } else {
            use std::io::Seek;

            // Seeking to the end returns the file length without a metadata call.
            f.seek(SeekFrom::End(0)).map_err(parse_io_error)?
        };

        let br = args.range();
        let (start, end) = match (br.offset(), br.size()) {
            // Read a specific range; saturate so a huge size cannot overflow.
            (Some(offset), Some(size)) => (offset, min(offset.saturating_add(size), total_length)),
            // Read from offset.
            (Some(offset), None) => (offset, total_length),
            // Read the last size bytes.
            (None, Some(size)) => (total_length.saturating_sub(size), total_length),
            // Read the whole file.
            (None, None) => (0, total_length),
        };

        let mut r = output::into_blocking_reader::from_fd(f, start, end);

        // Rewind to make sure we are on the correct offset.
        r.seek(SeekFrom::Start(0)).map_err(parse_io_error)?;

        // `start` may exceed `end` when the requested offset is past EOF.
        Ok((RpRead::new(end.saturating_sub(start)), r))
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

1 comment on commit 1f04990

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-lf46sb8dx-databend.vercel.app
https://opendal-git-type-exercise.vercel.app

Built with commit 1f04990.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.