diff --git a/benches/io/blocking_seek.rs b/benches/io/blocking_seek.rs index b8ec9b4b8a9..6c31e317658 100644 --- a/benches/io/blocking_seek.rs +++ b/benches/io/blocking_seek.rs @@ -17,39 +17,82 @@ use std::io::{Seek, SeekFrom}; use opendal::{ layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer}, - Operator, + raw::output::BlockingRead, + services::fs, + Operator, OperatorNext, }; use rand::prelude::*; +// pub fn bench(c: &mut Criterion) { +// let mut group = c.benchmark_group("blocking_seek"); +// let size = 64 * 1024 * 1024; + +// let op = Operator::from_iter( +// opendal::Scheme::Fs, +// vec![("root".to_string(), "/tmp".to_string())].into_iter(), +// ) +// .unwrap(); + +// op.object("test_file") +// .blocking_write(vec![0; size]) +// .unwrap(); + +// let op = op +// .layer(LoggingLayer::default()) +// .layer(TracingLayer) +// .layer(MetricsLayer) +// .layer(RetryLayer::new(backon::ExponentialBackoff::default())); + +// let mut rng = rand::thread_rng(); + +// let o = op.object("test_file"); +// let mut r = o.blocking_reader().unwrap(); + +// group.bench_function("seek", |b| { +// b.iter(|| { +// let off = rng.gen_range(0..size as u64); +// r.seek(SeekFrom::Start(off)).unwrap(); +// }) +// }); +// } + pub fn bench(c: &mut Criterion) { let mut group = c.benchmark_group("blocking_seek"); let size = 64 * 1024 * 1024; - let op = Operator::from_iter( - opendal::Scheme::Fs, - vec![("root".to_string(), "/tmp".to_string())].into_iter(), - ) - .unwrap(); + let op = OperatorNext { + accessor: { + let mut builder = fs::Builder::default(); + builder.root("/tmp"); + builder.build_next().unwrap() + }, + } + .layer(MetricsLayer); + + // let op = Operator::from_iter( + // opendal::Scheme::Fs, + // vec![("root".to_string(), "/tmp".to_string())].into_iter(), + // ) + // .unwrap(); - op.object("test_file") - .blocking_write(vec![0; size]) - .unwrap(); + // op.object("test_file") + // .blocking_write(vec![0; size]) + // .unwrap(); - let op = op - .layer(LoggingLayer::default()) - .layer(TracingLayer) - .layer(MetricsLayer) - .layer(RetryLayer::new(backon::ExponentialBackoff::default())); + // let op = op + // .layer(LoggingLayer::default()) + // .layer(TracingLayer) + // .layer(MetricsLayer) + // .layer(RetryLayer::new(backon::ExponentialBackoff::default())); let mut rng = rand::thread_rng(); - let o = op.object("test_file"); - let mut r = o.blocking_reader().unwrap(); + let mut r = op.blocking_reader("test_file").unwrap(); group.bench_function("seek", |b| { b.iter(|| { let off = rng.gen_range(0..size as u64); - r.seek(SeekFrom::Start(off)).unwrap(); + std::io::Seek::seek(&mut r, SeekFrom::Start(off)).unwrap(); }) }); } diff --git a/src/layers/metrics.rs b/src/layers/metrics.rs index 31d898f742f..033553ab130 100644 --- a/src/layers/metrics.rs +++ b/src/layers/metrics.rs @@ -965,3 +965,45 @@ impl Drop for MetricReader { } } } + +impl, IR: output::BlockingRead> LayerNext for MetricsLayer { + type OR = MetricReaderNext; + type OA = MetricsAccessorNext; + + fn layer(self, inner: A) -> Self::OA { + MetricsAccessorNext { inner } + } +} + +#[derive(Clone)] +pub struct MetricsAccessorNext, OR: output::BlockingRead> { + inner: A, +} + +impl, OR: output::BlockingRead> Debug for MetricsAccessorNext { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MetricsAccessorNext") + } +} + +impl, OR: output::BlockingRead> AccessorNext + for MetricsAccessorNext +{ + type OR = MetricReaderNext; + + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::OR)> { + let (rp, r) = self.inner.blocking_read(path, args)?; + Ok((rp, MetricReaderNext { inner: r })) + } +} + +pub struct MetricReaderNext { + inner: R, +} + +impl output::BlockingRead for MetricReaderNext { + #[inline] + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + self.inner.seek(pos) + } +} diff --git a/src/lib.rs b/src/lib.rs index 6a023abe59c..589756e6662 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -153,6 +153,7 @@ mod operator; pub use operator::BatchOperator; pub use operator::Operator; pub use operator::OperatorMetadata; +pub use raw::OperatorNext; mod object; pub use object::Object; diff --git a/src/object/blocking_reader.rs b/src/object/blocking_reader.rs index c55363e62c5..a434f3d600e 100644 --- a/src/object/blocking_reader.rs +++ b/src/object/blocking_reader.rs @@ -28,7 +28,7 @@ use crate::OpRead; /// BlockingObjectReader is the public API for users. pub struct BlockingObjectReader { - inner: output::BlockingReader, + pub(crate) inner: output::BlockingReader, } impl BlockingObjectReader { diff --git a/src/raw/accessor.rs b/src/raw/accessor.rs index a876db556b8..2626bccf40b 100644 --- a/src/raw/accessor.rs +++ b/src/raw/accessor.rs @@ -13,12 +13,14 @@ // limitations under the License. use std::fmt::Debug; +use std::marker::PhantomData; use std::sync::Arc; use async_trait::async_trait; use flagset::flags; use flagset::FlagSet; +use crate::object::BlockingObjectReader; use crate::raw::*; use crate::*; @@ -584,3 +586,54 @@ flags! { ReadIsStreamable, } } + +#[derive(Clone, Debug)] +pub struct OperatorNext, OR: output::BlockingRead> { + pub accessor: A, +} + +impl OperatorNext +where + A: AccessorNext, + OR: output::BlockingRead, +{ + pub fn layer>(self, layer: L) -> OperatorNext { + OperatorNext { + accessor: layer.layer(self.accessor), + } + } + + // pub fn blocking_reader(&self, path: &str) -> Result { + // let (_, r) = self.accessor.blocking_read(path, OpRead::default())?; + // Ok(r) + // } + + pub fn blocking_reader(&self, path: &str) -> Result { + let (_, r) = self.accessor.blocking_read(path, OpRead::default())?; + Ok(BlockingObjectReader { inner: Box::new(r) }) + } +} + +#[async_trait] +pub trait AccessorNext: Send + Sync + Debug + 'static { + type OR: output::BlockingRead; + + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::OR)> { + unimplemented!() + } + + fn boxed(self) -> Box> + where + Self: Sized, + { + Box::new(self) + } +} + +pub trait LayerNext, IR: output::BlockingRead> { + type OR: output::BlockingRead; + type OA: AccessorNext; + + /// Intercept the operations on the underlying storage. + fn layer(self, inner: A) -> Self::OA; +} diff --git a/src/raw/io/output/blocking_read.rs b/src/raw/io/output/blocking_read.rs index 34e81c9440a..c48cba9b1c9 100644 --- a/src/raw/io/output/blocking_read.rs +++ b/src/raw/io/output/blocking_read.rs @@ -32,7 +32,7 @@ pub type BlockingReader = Box; /// /// `Read` is required to be implemented, `Seek` and `Iterator` /// is optional. We use `Read` to make users life easier. -pub trait BlockingRead: Send + Sync { +pub trait BlockingRead: Send + Sync + 'static { /// Return the inner output bytes reader if there is one. #[inline] fn inner(&mut self) -> Option<&mut BlockingReader> { diff --git a/src/raw/io/output/into_blocking_reader/from_fd.rs b/src/raw/io/output/into_blocking_reader/from_fd.rs index 6a7e07cee26..5ef53dc2a1c 100644 --- a/src/raw/io/output/into_blocking_reader/from_fd.rs +++ b/src/raw/io/output/into_blocking_reader/from_fd.rs @@ -57,7 +57,7 @@ where impl output::BlockingRead for FdReader where - R: Read + Seek + Send + Sync, + R: Read + Seek + Send + Sync + 'static, { fn read(&mut self, buf: &mut [u8]) -> Result { if self.current_size() <= 0 { diff --git a/src/raw/io/output/into_blocking_reader/mod.rs b/src/raw/io/output/into_blocking_reader/mod.rs index 25cefeea99f..e1b27b93a03 100644 --- a/src/raw/io/output/into_blocking_reader/mod.rs +++ b/src/raw/io/output/into_blocking_reader/mod.rs @@ -20,3 +20,4 @@ pub use as_iterable::as_iterable; mod from_fd; pub use from_fd::from_fd; +pub use from_fd::FdReader; diff --git a/src/raw/mod.rs b/src/raw/mod.rs index 6e21b8e13e4..a5913a95e2d 100644 --- a/src/raw/mod.rs +++ b/src/raw/mod.rs @@ -28,6 +28,9 @@ pub use accessor::Accessor; pub use accessor::AccessorCapability; pub use accessor::AccessorHint; pub use accessor::AccessorMetadata; +pub use accessor::AccessorNext; +pub use accessor::LayerNext; +pub use accessor::OperatorNext; mod io; pub use io::*; diff --git a/src/services/fs/backend.rs b/src/services/fs/backend.rs index cd3154f7c48..54e88a80a09 100644 --- a/src/services/fs/backend.rs +++ b/src/services/fs/backend.rs @@ -89,6 +89,46 @@ impl Builder { self } + pub fn build_next(&mut self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root(&self.root.take().unwrap_or_default()); + let atomic_write_dir = self.atomic_write_dir.as_deref().map(normalize_root); + + // If root dir is not exist, we must create it. + if let Err(e) = std::fs::metadata(&root) { + if e.kind() == io::ErrorKind::NotFound { + std::fs::create_dir_all(&root).map_err(|e| { + Error::new(ErrorKind::Unexpected, "create root dir failed") + .with_operation("Builder::build") + .with_context("root", &root) + .set_source(e) + })?; + } + } + + // If atomic write dir is not exist, we must create it. + if let Some(d) = &atomic_write_dir { + if let Err(e) = std::fs::metadata(d) { + if e.kind() == io::ErrorKind::NotFound { + std::fs::create_dir_all(d).map_err(|e| { + Error::new(ErrorKind::Unexpected, "create atomic write dir failed") + .with_operation("Builder::build") + .with_context("atomic_write_dir", d) + .set_source(e) + })?; + } + } + } + + debug!("backend build finished: {:?}", &self); + Ok(Backend { + root, + atomic_write_dir, + enable_path_check: self.enable_path_check, + }) + } + /// Consume current builder to build a fs backend. pub fn build(&mut self) -> Result { debug!("backend build started: {:?}", &self); @@ -645,6 +685,71 @@ impl Accessor for Backend { } } +#[async_trait] +impl AccessorNext for Backend { + type OR = output::into_blocking_reader::FdReader; + + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::OR)> { + use output::BlockingRead; + + let p = build_rooted_abs_path(&self.root, path); + + let mut f = std::fs::OpenOptions::new() + .read(true) + .open(p) + .map_err(parse_io_error)?; + + let total_length = if self.enable_path_check { + // Get fs metadata of file at given path, ensuring it is not a false-positive due to slash normalization. + let meta = f.metadata().map_err(parse_io_error)?; + if meta.is_dir() != path.ends_with('/') { + return Err(Error::new( + ErrorKind::ObjectNotFound, + "file mode is not match with its path", + )); + } + if meta.is_dir() { + return Err(Error::new( + ErrorKind::ObjectIsADirectory, + "given path is a directoty", + )); + } + + meta.len() + } else { + use std::io::Seek; + + f.seek(SeekFrom::End(0)).map_err(parse_io_error)? + }; + + let br = args.range(); + let (start, end) = match (br.offset(), br.size()) { + // Read a specific range. + (Some(offset), Some(size)) => (offset, min(offset + size, total_length)), + // Read from offset. + (Some(offset), None) => (offset, total_length), + // Read the last size bytes. + (None, Some(size)) => ( + if total_length > size { + total_length - size + } else { + 0 + }, + total_length, + ), + // Read the whole file. + (None, None) => (0, total_length), + }; + + let mut r = output::into_blocking_reader::from_fd(f, start, end); + + // Rewind to make sure we are on the correct offset. + r.seek(SeekFrom::Start(0)).map_err(parse_io_error)?; + + Ok((RpRead::new(end - start), r)) + } +} + #[cfg(test)] mod tests { use super::*;