diff --git a/bin/ofs/Cargo.lock b/bin/ofs/Cargo.lock index 591f06532f1..a3d40c91b62 100644 --- a/bin/ofs/Cargo.lock +++ b/bin/ofs/Cargo.lock @@ -114,9 +114,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backon" -version = "0.4.1" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" +checksum = "c491fa80d69c03084223a4e73c378dd9f9a1e612eb54051213f88b2d5249b458" dependencies = [ "fastrand", "futures-core", @@ -364,12 +364,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "1.9.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "flagset" @@ -686,15 +683,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if", -] - [[package]] name = "ipnet" version = "2.9.0" diff --git a/bin/oli/Cargo.lock b/bin/oli/Cargo.lock index a8f048c0cd0..f28a894261a 100644 --- a/bin/oli/Cargo.lock +++ b/bin/oli/Cargo.lock @@ -17,6 +17,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "aho-corasick" version = "1.1.2" @@ -365,11 +376,11 @@ dependencies = [ [[package]] name = "backon" -version = "0.4.1" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" +checksum = "c491fa80d69c03084223a4e73c378dd9f9a1e612eb54051213f88b2d5249b458" dependencies = [ - "fastrand 1.9.0", + "fastrand 2.0.1", "futures-core", "pin-project", "tokio", @@ -457,6 +468,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "blocking" version = "1.5.1" @@ -550,6 +570,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.0.83" @@ -589,6 +618,16 @@ dependencies = [ "windows-targets 0.52.0", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clang-sys" version = "1.7.0" @@ -1423,6 +1462,16 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -1993,6 +2042,16 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "peeking_take_while" version = "0.1.2" @@ -2088,6 +2147,21 @@ dependencies = [ "spki", ] +[[package]] +name = "pkcs5" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6" +dependencies = [ + "aes", + "cbc", + "der", + "pbkdf2", + "scrypt", + "sha2", + "spki", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -2095,6 +2169,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ "der", + "pkcs5", + "rand_core", "spki", ] @@ -2446,9 +2522,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqsign" -version = "0.14.7" +version = "0.14.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed08ac3aa0676637644b1b892202f1ae789c28c15ebfa906128d111ae8086062" +checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5" dependencies = [ "anyhow", "async-trait", @@ -2556,6 +2632,7 @@ dependencies = [ "pkcs1", "pkcs8", "rand_core", + "sha2", "signature", "spki", "subtle", @@ -2675,6 +2752,15 @@ version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +[[package]] +name = "salsa20" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" +dependencies = [ + "cipher", +] + [[package]] name = "same-file" version = "1.0.6" @@ -2699,6 +2785,17 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scrypt" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" +dependencies = [ + "pbkdf2", + "salsa20", + "sha2", +] + [[package]] name = "sct" version = "0.7.1" diff --git a/bindings/c/src/reader.rs b/bindings/c/src/reader.rs index d0604aeebb8..8f233dd8e4d 100644 --- a/bindings/c/src/reader.rs +++ b/bindings/c/src/reader.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::io::Read; - use ::opendal as core; use super::*; @@ -51,12 +49,15 @@ impl opendal_reader { let buf = unsafe { std::slice::from_raw_parts_mut(buf, len) }; let inner = unsafe { &mut *(*reader).inner }; - let r = inner.read(buf); + let r = inner.read(buf.len()); match r { - Ok(n) => opendal_result_reader_read { - size: n, - error: std::ptr::null_mut(), - }, + Ok(bs) => { + buf[..bs.len()].copy_from_slice(&bs); + opendal_result_reader_read { + size: bs.len(), + error: std::ptr::null_mut(), + } + } Err(e) => opendal_result_reader_read { size: 0, error: opendal_error::new( diff --git a/bindings/cpp/src/reader.rs b/bindings/cpp/src/reader.rs index 6b56d1a4b52..6f9bfd8fe9e 100644 --- a/bindings/cpp/src/reader.rs +++ b/bindings/cpp/src/reader.rs @@ -16,7 +16,6 @@ // under the License. use anyhow::Result; -use od::raw::oio::BlockingRead; use opendal as od; use super::ffi; @@ -25,7 +24,9 @@ pub struct Reader(pub od::BlockingReader); impl Reader { pub fn read(&mut self, buf: &mut [u8]) -> Result { - Ok(self.0.read(buf)?) + let bs = self.0.read(buf.len())?; + buf[..bs.len()].copy_from_slice(&bs); + Ok(bs.len()) } pub fn seek(&mut self, offset: u64, dir: ffi::SeekFrom) -> Result { diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index 4a0a9e188d9..fec882b879f 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -26,7 +26,6 @@ use std::time::Duration; use futures::TryStreamExt; use napi::bindgen_prelude::*; -use opendal::raw::oio::BlockingRead; #[napi] pub struct Operator(opendal::Operator); @@ -648,7 +647,10 @@ pub struct BlockingReader(opendal::BlockingReader); impl BlockingReader { #[napi] pub fn read(&mut self, mut buf: Buffer) -> Result { - self.0.read(buf.as_mut()).map_err(format_napi_error) + let buf = buf.as_mut(); + let bs = self.0.read(buf.len()).map_err(format_napi_error)?; + buf[..bs.len()].copy_from_slice(&bs); + Ok(bs.len()) } } diff --git a/bindings/ocaml/src/operator/reader.rs b/bindings/ocaml/src/operator/reader.rs index b3bc7bc5873..8ae7b4583b9 100644 --- a/bindings/ocaml/src/operator/reader.rs +++ b/bindings/ocaml/src/operator/reader.rs @@ -17,14 +17,14 @@ use std::io; -use opendal::raw::oio::BlockingRead; - use super::*; #[ocaml::func] #[ocaml::sig("reader -> bytes -> (int, string) Result.t ")] pub fn reader_read(reader: &mut Reader, buf: &mut [u8]) -> Result { - map_res_error(reader.0.read(buf)) + let bs = map_res_error(reader.0.read(buf.len()))?; + buf[..bs.len()].copy_from_slice(&bs); + Ok(bs.len()) } #[ocaml::func] diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index 806306ca5a5..f8a98c8f51c 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -18,7 +18,6 @@ // Remove this `allow` after fixed. #![allow(clippy::unnecessary_fallible_conversions)] -use std::io::Read; use std::io::Seek; use std::io::SeekFrom; use std::io::Write; @@ -77,11 +76,10 @@ impl File { let buffer = match size { Some(size) => { - let mut buffer = vec![0; size]; - reader - .read_exact(&mut buffer) + let bs = reader + .read_exact(size) .map_err(|err| PyIOError::new_err(err.to_string()))?; - buffer + bs.to_vec() } None => { let mut buffer = Vec::new(); diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 3476dc6e90b..84a0c12292c 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use bytes; -use bytes::{BufMut, Bytes}; +use bytes::Bytes; use futures::future::poll_fn; use tokio::runtime::Handle; @@ -288,25 +288,13 @@ impl BlockingWrapper { } impl oio::BlockingRead for BlockingWrapper { - fn read(&mut self, mut buf: &mut [u8]) -> Result { - let bs = self.handle.block_on(self.inner.read(buf.len())); - let bs = bs?; - buf.put_slice(&bs); - Ok(bs.len()) + fn read(&mut self, limit: usize) -> Result { + self.handle.block_on(self.inner.read(limit)) } fn seek(&mut self, pos: std::io::SeekFrom) -> Result { self.handle.block_on(self.inner.seek(pos)) } - - fn next(&mut self) -> Option> { - let bs = self.handle.block_on(self.inner.read(4 * 1024 * 1024)); - match bs { - Ok(bs) if bs.is_empty() => None, - Ok(bs) => Some(Ok(bs)), - Err(err) => Some(Err(err)), - } - } } impl oio::BlockingWrite for BlockingWrapper { diff --git a/core/src/layers/chaos.rs b/core/src/layers/chaos.rs index 079500c4441..55a28a554ab 100644 --- a/core/src/layers/chaos.rs +++ b/core/src/layers/chaos.rs @@ -192,9 +192,9 @@ impl oio::Read for ChaosReader { } impl oio::BlockingRead for ChaosReader { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { if self.i_feel_lucky() { - self.inner.read(buf) + self.inner.read(limit) } else { Err(Self::unexpected_eof()) } @@ -207,12 +207,4 @@ impl oio::BlockingRead for ChaosReader { Err(Self::unexpected_eof()) } } - - fn next(&mut self) -> Option> { - if self.i_feel_lucky() { - self.inner.next() - } else { - Some(Err(Self::unexpected_eof())) - } - } } diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 36fbcb92e79..8a663cfc475 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -267,17 +267,13 @@ impl oio::Read for ConcurrentLimitWrapper { } impl oio::BlockingRead for ConcurrentLimitWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.inner.read(buf) + fn read(&mut self, limit: usize) -> Result { + self.inner.read(limit) } fn seek(&mut self, pos: SeekFrom) -> Result { self.inner.seek(pos) } - - fn next(&mut self) -> Option> { - self.inner.next() - } } impl oio::Write for ConcurrentLimitWrapper { diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs index 4e488dd4997..4c4af268a20 100644 --- a/core/src/layers/dtrace.rs +++ b/core/src/layers/dtrace.rs @@ -376,14 +376,14 @@ impl oio::Read for DtraceLayerWrapper { } impl oio::BlockingRead for DtraceLayerWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { let c_path = CString::new(self.path.clone()).unwrap(); probe_lazy!(opendal, blocking_reader_read_start, c_path.as_ptr()); self.inner - .read(buf) - .map(|n| { - probe_lazy!(opendal, blocking_reader_read_ok, c_path.as_ptr(), n); - n + .read(limit) + .map(|bs| { + probe_lazy!(opendal, blocking_reader_read_ok, c_path.as_ptr(), bs.len()); + bs }) .map_err(|e| { probe_lazy!(opendal, blocking_reader_read_error, c_path.as_ptr()); @@ -405,26 +405,6 @@ impl oio::BlockingRead for DtraceLayerWrapper { e }) } - - fn next(&mut self) -> Option> { - let c_path = CString::new(self.path.clone()).unwrap(); - probe_lazy!(opendal, blocking_reader_next_start, c_path.as_ptr()); - self.inner.next().map(|res| match res { - Ok(bytes) => { - probe_lazy!( - opendal, - blocking_reader_next_ok, - c_path.as_ptr(), - bytes.len() - ); - Ok(bytes) - } - Err(e) => { - probe_lazy!(opendal, blocking_reader_next_error, c_path.as_ptr()); - Err(e) - } - }) - } } impl oio::Write for DtraceLayerWrapper { diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 43b08ea540a..f8112951a1c 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -367,12 +367,12 @@ impl oio::Read for ErrorContextWrapper { } impl oio::BlockingRead for ErrorContextWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.inner.read(buf).map_err(|err| { + fn read(&mut self, limit: usize) -> Result { + self.inner.read(limit).map_err(|err| { err.with_operation(ReadOperation::BlockingRead) .with_context("service", self.scheme) .with_context("path", &self.path) - .with_context("read_buf", buf.len().to_string()) + .with_context("limit", limit.to_string()) }) } @@ -384,16 +384,6 @@ impl oio::BlockingRead for ErrorContextWrapper { .with_context("seek", format!("{pos:?}")) }) } - - fn next(&mut self) -> Option> { - self.inner.next().map(|v| { - v.map_err(|err| { - err.with_operation(ReadOperation::BlockingNext) - .with_context("service", self.scheme) - .with_context("path", &self.path) - }) - }) - } } #[async_trait::async_trait] diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 2f9fed02b15..02abb3f3dff 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1057,10 +1057,10 @@ impl oio::Read for LoggingReader { } impl oio::BlockingRead for LoggingReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - match self.inner.read(buf) { - Ok(n) => { - self.read += n as u64; + fn read(&mut self, limit: usize) -> Result { + match self.inner.read(limit) { + Ok(bs) => { + self.read += bs.len() as u64; trace!( target: LOGGING_TARGET, "service={} operation={} path={} read={} -> data read {}B", @@ -1068,9 +1068,9 @@ impl oio::BlockingRead for LoggingReader { ReadOperation::BlockingRead, self.path, self.read, - n + bs.len() ); - Ok(n) + Ok(bs) } Err(err) => { if let Some(lvl) = self.ctx.error_level(&err) { @@ -1121,40 +1121,6 @@ impl oio::BlockingRead for LoggingReader { } } } - - fn next(&mut self) -> Option> { - match self.inner.next() { - Some(Ok(bs)) => { - self.read += bs.len() as u64; - trace!( - target: LOGGING_TARGET, - "service={} operation={} path={} read={} -> data read {}B", - self.ctx.scheme, - ReadOperation::BlockingNext, - self.path, - self.read, - bs.len() - ); - Some(Ok(bs)) - } - Some(Err(err)) => { - if let Some(lvl) = self.ctx.error_level(&err) { - log!( - target: LOGGING_TARGET, - lvl, - "service={} operation={} path={} read={} -> data read failed: {}", - self.ctx.scheme, - ReadOperation::BlockingNext, - self.path, - self.read, - self.ctx.error_print(&err), - ) - } - Some(Err(err)) - } - None => None, - } - } } pub struct LoggingWriter { diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index c6001b86783..43b24f803da 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -798,12 +798,12 @@ impl oio::Read for MetricWrapper { } impl oio::BlockingRead for MetricWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { self.inner - .read(buf) - .map(|n| { - self.bytes += n as u64; - n + .read(limit) + .map(|bs| { + self.bytes += bs.len() as u64; + bs }) .map_err(|e| { self.handle.increment_errors_total(self.op, e.kind()); @@ -817,19 +817,6 @@ impl oio::BlockingRead for MetricWrapper { err }) } - - fn next(&mut self) -> Option> { - self.inner.next().map(|res| match res { - Ok(bytes) => { - self.bytes += bytes.len() as u64; - Ok(bytes) - } - Err(e) => { - self.handle.increment_errors_total(self.op, e.kind()); - Err(e) - } - }) - } } impl oio::Write for MetricWrapper { diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index fbf4b2f0641..fdc6a1dc929 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -309,10 +309,10 @@ impl oio::Read for MinitraceWrapper { } impl oio::BlockingRead for MinitraceWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { let _g = self.span.set_local_parent(); let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static()); - self.inner.read(buf) + self.inner.read(limit) } fn seek(&mut self, pos: io::SeekFrom) -> Result { @@ -320,12 +320,6 @@ impl oio::BlockingRead for MinitraceWrapper { let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingSeek.into_static()); self.inner.seek(pos) } - - fn next(&mut self) -> Option> { - let _g = self.span.set_local_parent(); - let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingNext.into_static()); - self.inner.next() - } } impl oio::Write for MinitraceWrapper { diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index f8f85be255a..90e8d65392a 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -288,17 +288,13 @@ impl oio::Read for OtelTraceWrapper { } impl oio::BlockingRead for OtelTraceWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.inner.read(buf) + fn read(&mut self, limit: usize) -> Result { + self.inner.read(limit) } fn seek(&mut self, pos: io::SeekFrom) -> Result { self.inner.seek(pos) } - - fn next(&mut self) -> Option> { - self.inner.next() - } } impl oio::Write for OtelTraceWrapper { diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 0037217d999..a5179908eb6 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -719,20 +719,20 @@ impl oio::Read for PrometheusMetricWrapper { } impl oio::BlockingRead for PrometheusMetricWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { let labels = self.stats.generate_metric_label( self.scheme.into_static(), Operation::BlockingRead.into_static(), &self.path, ); self.inner - .read(buf) - .map(|n| { + .read(limit) + .map(|bs| { self.stats .bytes_total .with_label_values(&labels) - .observe(n as f64); - n + .observe(bs.len() as f64); + bs }) .map_err(|e| { self.stats.increment_errors_total(self.op, e.kind()); @@ -746,27 +746,6 @@ impl oio::BlockingRead for PrometheusMetricWrapper { err }) } - - fn next(&mut self) -> Option> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingRead.into_static(), - &self.path, - ); - self.inner.next().map(|res| match res { - Ok(bytes) => { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(bytes.len() as f64); - Ok(bytes) - } - Err(e) => { - self.stats.increment_errors_total(self.op, e.kind()); - Err(e) - } - }) - } } impl oio::Write for PrometheusMetricWrapper { diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 7c12f3ecfd9..95e9bd16317 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -566,12 +566,12 @@ impl oio::Read for PrometheusMetricWrapper { } impl oio::BlockingRead for PrometheusMetricWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { self.inner - .read(buf) - .map(|n| { - self.bytes_total += n; - n + .read(limit) + .map(|bs| { + self.bytes_total += bs.len(); + bs }) .map_err(|e| { self.metrics @@ -587,20 +587,6 @@ impl oio::BlockingRead for PrometheusMetricWrapper { err }) } - - fn next(&mut self) -> Option> { - self.inner.next().map(|res| match res { - Ok(bytes) => { - self.bytes_total += bytes.len(); - Ok(bytes) - } - Err(e) => { - self.metrics - .increment_errors_total(self.scheme, self.op, e.kind()); - Err(e) - } - }) - } } impl oio::Write for PrometheusMetricWrapper { diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 5c4fce8dcd3..e6582114e84 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -737,8 +737,8 @@ impl oio::Read for RetryWrapper { } impl oio::BlockingRead for RetryWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { - { || self.inner.as_mut().unwrap().read(buf) } + fn read(&mut self, limit: usize) -> Result { + { || self.inner.as_mut().unwrap().read(limit) } .retry(&self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| { @@ -772,25 +772,6 @@ impl oio::BlockingRead for RetryWrapp .call() .map_err(|e| e.set_persistent()) } - - fn next(&mut self) -> Option> { - { || self.inner.as_mut().unwrap().next().transpose() } - .retry(&self.builder) - .when(|e| e.is_temporary()) - .notify(|err, dur| { - self.notify.intercept( - err, - dur, - &[ - ("operation", ReadOperation::BlockingNext.into_static()), - ("path", &self.path), - ], - ); - }) - .call() - .map_err(|e| e.set_persistent()) - .transpose() - } } impl oio::Write for RetryWrapper { diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index 3e42839f46d..0082a627358 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -196,18 +196,14 @@ impl oio::Read for ThrottleWrapper { } impl oio::BlockingRead for ThrottleWrapper { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { // TODO: How can we handle buffer reads with a limiter? - self.inner.read(buf) + self.inner.read(limit) } fn seek(&mut self, pos: SeekFrom) -> Result { self.inner.seek(pos) } - - fn next(&mut self) -> Option> { - self.inner.next() - } } impl oio::Write for ThrottleWrapper { diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 6dfd34138d3..017264cd329 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -290,8 +290,8 @@ impl oio::BlockingRead for TracingWrapper { parent = &self.span, level = "trace", skip_all)] - fn read(&mut self, buf: &mut [u8]) -> Result { - self.inner.read(buf) + fn read(&mut self, limit: usize) -> Result { + self.inner.read(limit) } #[tracing::instrument( @@ -301,14 +301,6 @@ impl oio::BlockingRead for TracingWrapper { fn seek(&mut self, pos: io::SeekFrom) -> Result { self.inner.seek(pos) } - - #[tracing::instrument( - parent = &self.span, - level = "trace", - skip_all)] - fn next(&mut self) -> Option> { - self.inner.next() - } } impl oio::Write for TracingWrapper { diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs index e2d2a10dff5..430e5137741 100644 --- a/core/src/raw/enum_utils.rs +++ b/core/src/raw/enum_utils.rs @@ -74,10 +74,10 @@ impl oio::Read for TwoWays { } impl oio::BlockingRead for TwoWays { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { match self { - Self::One(v) => v.read(buf), - Self::Two(v) => v.read(buf), + Self::One(v) => v.read(limit), + Self::Two(v) => v.read(limit), } } @@ -87,13 +87,6 @@ impl oio::BlockingRead for TwoWa Self::Two(v) => v.seek(pos), } } - - fn next(&mut self) -> Option> { - match self { - Self::One(v) => v.next(), - Self::Two(v) => v.next(), - } - } } impl oio::Write for TwoWays { @@ -152,11 +145,11 @@ impl oio::Read for ThreeWays oio::BlockingRead for ThreeWays { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { match self { - Self::One(v) => v.read(buf), - Self::Two(v) => v.read(buf), - Self::Three(v) => v.read(buf), + Self::One(v) => v.read(limit), + Self::Two(v) => v.read(limit), + Self::Three(v) => v.read(limit), } } @@ -167,14 +160,6 @@ impl o Self::Three(v) => v.seek(pos), } } - - fn next(&mut self) -> Option> { - match self { - Self::One(v) => v.next(), - Self::Two(v) => v.next(), - Self::Three(v) => v.next(), - } - } } impl oio::Write @@ -252,12 +237,12 @@ where THREE: oio::BlockingRead, FOUR: oio::BlockingRead, { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { match self { - Self::One(v) => v.read(buf), - Self::Two(v) => v.read(buf), - Self::Three(v) => v.read(buf), - Self::Four(v) => v.read(buf), + Self::One(v) => v.read(limit), + Self::Two(v) => v.read(limit), + Self::Three(v) => v.read(limit), + Self::Four(v) => v.read(limit), } } @@ -269,15 +254,6 @@ where Self::Four(v) => v.seek(pos), } } - - fn next(&mut self) -> Option> { - match self { - Self::One(v) => v.next(), - Self::Two(v) => v.next(), - Self::Three(v) => v.next(), - Self::Four(v) => v.next(), - } - } } impl oio::List for FourWays diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index a920652451a..75a034f1023 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -16,7 +16,6 @@ // under the License. use std::cmp::min; -use std::io::Read; use std::io::SeekFrom; use std::task::Context; use std::task::Poll; @@ -72,6 +71,18 @@ impl From> for Cursor { } impl oio::Read for Cursor { + async fn read(&mut self, limit: usize) -> Result { + if self.is_empty() { + Ok(Bytes::new()) + } else { + // The clone here is required as we don't want to change it. + let mut bs = self.inner.clone().split_off(self.pos as usize); + let bs = bs.split_to(min(bs.len(), limit)); + self.pos += bs.len() as u64; + Ok(bs) + } + } + async fn seek(&mut self, pos: SeekFrom) -> Result { let (base, amt) = match pos { SeekFrom::Start(n) => (0, n as i64), @@ -91,8 +102,10 @@ impl oio::Read for Cursor { self.pos = n; Ok(n) } +} - async fn read(&mut self, limit: usize) -> Result { +impl oio::BlockingRead for Cursor { + fn read(&mut self, limit: usize) -> Result { if self.is_empty() { Ok(Bytes::new()) } else { @@ -103,18 +116,6 @@ impl oio::Read for Cursor { Ok(bs) } } -} - -impl oio::BlockingRead for Cursor { - fn read(&mut self, buf: &mut [u8]) -> Result { - let n = Read::read(&mut self.remaining_slice(), buf).map_err(|err| { - Error::new(ErrorKind::Unexpected, "read data from Cursor") - .with_context("source", "Cursor") - .set_source(err) - })?; - self.pos += n as u64; - Ok(n) - } fn seek(&mut self, pos: SeekFrom) -> Result { let (base, amt) = match pos { @@ -135,17 +136,6 @@ impl oio::BlockingRead for Cursor { self.pos = n; Ok(n) } - - fn next(&mut self) -> Option> { - if self.is_empty() { - None - } else { - // The clone here is required as we don't want to change it. - let bs = self.inner.clone().split_off(self.pos as usize); - self.pos += bs.len() as u64; - Some(Ok(bs)) - } - } } impl oio::Stream for Cursor { diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs index 22865375abb..d8ed05c4ece 100644 --- a/core/src/raw/oio/read/api.rs +++ b/core/src/raw/oio/read/api.rs @@ -22,7 +22,6 @@ use std::ops::DerefMut; use bytes::Bytes; use futures::Future; -use tokio::io::ReadBuf; use crate::raw::BoxedFuture; use crate::*; @@ -177,65 +176,15 @@ pub type BlockingReader = Box; /// is optional. We use `Read` to make users life easier. pub trait BlockingRead: Send + Sync { /// Read synchronously. - fn read(&mut self, buf: &mut [u8]) -> Result; + fn read(&mut self, limit: usize) -> Result; /// Seek synchronously. fn seek(&mut self, pos: io::SeekFrom) -> Result; - - /// Iterating [`Bytes`] from underlying reader. - fn next(&mut self) -> Option>; - - /// Read all data of current reader to the end of buf. - fn read_to_end(&mut self, buf: &mut Vec) -> Result { - let start_len = buf.len(); - let start_cap = buf.capacity(); - - loop { - if buf.len() == buf.capacity() { - buf.reserve(32); // buf is full, need more space - } - - let spare = buf.spare_capacity_mut(); - let mut read_buf: ReadBuf = ReadBuf::uninit(spare); - - // SAFETY: These bytes were initialized but not filled in the previous loop - unsafe { - read_buf.assume_init(read_buf.capacity()); - } - - match self.read(read_buf.initialize_unfilled()) { - Ok(0) => return Ok(buf.len() - start_len), - Ok(n) => { - // SAFETY: Read API makes sure that returning `n` is correct. - unsafe { - buf.set_len(buf.len() + n); - } - } - Err(e) => return Err(e), - } - - // The buffer might be an exact fit. Let's read into a probe buffer - // and see if it returns `Ok(0)`. If so, we've avoided an - // unnecessary doubling of the capacity. But if not, append the - // probe buffer to the primary buffer and let its capacity grow. - if buf.len() == buf.capacity() && buf.capacity() == start_cap { - let mut probe = [0u8; 32]; - - match self.read(&mut probe) { - Ok(0) => return Ok(buf.len() - start_len), - Ok(n) => { - buf.extend_from_slice(&probe[..n]); - } - Err(e) => return Err(e), - } - } - } - } } impl BlockingRead for () { - fn read(&mut self, buf: &mut [u8]) -> Result { - let _ = buf; + fn read(&mut self, limit: usize) -> Result { + let _ = limit; unimplemented!("read is required to be implemented for oio::BlockingRead") } @@ -248,27 +197,16 @@ impl BlockingRead for () { "output blocking reader doesn't support seeking", )) } - - fn next(&mut self) -> Option> { - Some(Err(Error::new( - ErrorKind::Unsupported, - "output reader doesn't support iterating", - ))) - } } /// `Box` won't implement `BlockingRead` automatically. /// To make BlockingReader work as expected, we must add this impl. impl BlockingRead for Box { - fn read(&mut self, buf: &mut [u8]) -> Result { - (**self).read(buf) + fn read(&mut self, limit: usize) -> Result { + (**self).read(limit) } fn seek(&mut self, pos: io::SeekFrom) -> Result { (**self).seek(pos) } - - fn next(&mut self) -> Option> { - (**self).next() - } } diff --git a/core/src/raw/oio/read/buffer_reader.rs b/core/src/raw/oio/read/buffer_reader.rs index 33709b2355d..733878d1cce 100644 --- a/core/src/raw/oio/read/buffer_reader.rs +++ b/core/src/raw/oio/read/buffer_reader.rs @@ -18,7 +18,6 @@ use std::cmp::min; use std::io::SeekFrom; -use bytes::BufMut; use bytes::Bytes; use tokio::io::ReadBuf; @@ -136,30 +135,6 @@ impl oio::Read for BufferReader where R: oio::Read, { - async fn seek(&mut self, pos: SeekFrom) -> Result { - match pos { - SeekFrom::Start(new_pos) => { - // TODO(weny): Check the overflowing. - let Some(offset) = (new_pos as i64).checked_sub(self.cur as i64) else { - return self.inner_seek(pos).await; - }; - - match self.seek_relative(offset) { - Some(cur) => Ok(cur), - None => self.inner_seek(pos).await, - } - } - SeekFrom::Current(offset) => match self.seek_relative(offset) { - Some(cur) => Ok(cur), - None => { - self.inner_seek(SeekFrom::Current(offset - self.unconsumed_buffer_len())) - .await - } - }, - SeekFrom::End(_) => self.inner_seek(pos).await, - } - } - async fn read(&mut self, limit: usize) -> Result { if limit == 0 { return Ok(Bytes::new()); @@ -190,6 +165,30 @@ where self.consume(bytes.len()); Ok(bytes) } + + async fn seek(&mut self, pos: SeekFrom) -> Result { + match pos { + SeekFrom::Start(new_pos) => { + // TODO(weny): Check the overflowing. + let Some(offset) = (new_pos as i64).checked_sub(self.cur as i64) else { + return self.inner_seek(pos).await; + }; + + match self.seek_relative(offset) { + Some(cur) => Ok(cur), + None => self.inner_seek(pos).await, + } + } + SeekFrom::Current(offset) => match self.seek_relative(offset) { + Some(cur) => Ok(cur), + None => { + self.inner_seek(SeekFrom::Current(offset - self.unconsumed_buffer_len())) + .await + } + }, + SeekFrom::End(_) => self.inner_seek(pos).await, + } + } } impl BufferReader @@ -210,11 +209,12 @@ where let mut buf = ReadBuf::uninit(dst); unsafe { buf.assume_init(cap) }; - let n = self.r.read(buf.initialized_mut())?; - unsafe { self.buf.set_len(n) } + let bs = self.r.read(cap)?; + buf.put_slice(&bs); + unsafe { self.buf.set_len(bs.len()) } self.pos = 0; - self.filled = n; + self.filled = bs.len(); } Ok(&self.buf[self.pos..self.filled]) @@ -233,32 +233,35 @@ impl BlockingRead for BufferReader where R: BlockingRead, { - fn read(&mut self, mut dst: &mut [u8]) -> Result { - // Sanity check for normal cases. - if dst.is_empty() { - return Ok(0); + fn read(&mut self, limit: usize) -> Result { + if limit == 0 { + return Ok(Bytes::new()); } // If we don't have any buffered data and we're doing a massive read // (larger than our internal buffer), bypass our internal buffer // entirely. - if self.pos == self.filled && dst.len() >= self.capacity() { - let res = self.r.read(dst); + if self.pos == self.filled && limit >= self.capacity() { + let res = self.r.read(limit); self.discard_buffer(); return match res { - Ok(nread) => { - self.cur += nread as u64; - Ok(nread) + Ok(bs) => { + self.cur += bs.len() as u64; + Ok(bs) } Err(err) => Err(err), }; } - let rem = self.blocking_fill_buf()?; - let amt = min(rem.len(), dst.len()); - dst.put(&rem[..amt]); - self.consume(amt); - Ok(amt) + let bytes = self.blocking_fill_buf()?; + + if bytes.is_empty() { + return Ok(Bytes::new()); + } + let size = min(bytes.len(), limit); + let bytes = Bytes::copy_from_slice(&bytes[..size]); + self.consume(bytes.len()); + Ok(bytes) } fn seek(&mut self, pos: SeekFrom) -> Result { @@ -282,21 +285,6 @@ where SeekFrom::End(_) => self.blocking_inner_seek(pos), } } - - fn next(&mut self) -> Option> { - match self.blocking_fill_buf() { - Ok(bytes) => { - if bytes.is_empty() { - return None; - } - - let bytes = Bytes::copy_from_slice(bytes); - self.consume(bytes.len()); - Some(Ok(bytes)) - } - Err(err) => Some(Err(err)), - } - } } #[cfg(test)] @@ -397,8 +385,8 @@ mod tests { } impl BlockingRead for MockReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.inner.read(buf) + fn read(&mut self, limit: usize) -> Result { + self.inner.read(limit) } fn seek(&mut self, _pos: SeekFrom) -> Result { @@ -407,10 +395,6 @@ mod tests { "output reader doesn't support seeking", )) } - - fn next(&mut self) -> Option> { - self.inner.next() - } } #[tokio::test] @@ -634,20 +618,17 @@ mod tests { let r = Box::new(BufferReader::new(r, buf_cap)) as oio::BlockingReader; let mut r = BlockingReader::new(r); - let mut dst = [0u8; 5]; - let nread = r.read(&mut dst)?; - assert_eq!(nread, dst.len()); - assert_eq!(&dst, b"Hello"); + let buf = r.read(5)?; + assert_eq!(buf.len(), 5); + assert_eq!(buf.as_ref(), b"Hello"); - let mut dst = [0u8; 5]; - let nread = r.read(&mut dst)?; - assert_eq!(nread, dst.len()); - assert_eq!(&dst, b", Wor"); + let buf = r.read(5)?; + assert_eq!(buf.len(), 5); + assert_eq!(buf.as_ref(), b", Wor"); - let mut dst = [0u8; 3]; - let nread = r.read(&mut dst)?; - assert_eq!(nread, dst.len()); - assert_eq!(&dst, b"ld!"); + let buf = r.read(3)?; + assert_eq!(buf.len(), 3); + assert_eq!(buf.as_ref(), b"ld!"); Ok(()) } @@ -661,33 +642,29 @@ mod tests { let mut r = BlockingReader::new(r); // The underlying reader buffers the b"Hello, Wor". - let mut dst = [0u8; 5]; - let nread = r.read(&mut dst)?; - assert_eq!(nread, dst.len()); - assert_eq!(&dst, b"Hello"); + let buf = r.read(5)?; + assert_eq!(buf.len(), 5); + assert_eq!(buf.as_ref(), b"Hello"); let pos = r.seek(SeekFrom::Start(7))?; assert_eq!(pos, 7); - let mut dst = [0u8; 5]; - let nread = r.read(&mut dst)?; - assert_eq!(&dst[..nread], &bs[7..10]); - assert_eq!(nread, 3); + let buf = r.read(5)?; + assert_eq!(&buf[..], &bs[7..10]); + assert_eq!(buf.len(), 3); // Should perform a relative seek. let pos = r.seek(SeekFrom::Start(0))?; assert_eq!(pos, 0); - let mut dst = [0u8; 9]; - let nread = r.read(&mut dst)?; - assert_eq!(&dst[..nread], &bs[0..9]); - assert_eq!(nread, 9); + let buf = r.read(9)?; + assert_eq!(&buf[..], &bs[0..9]); + assert_eq!(buf.len(), 9); // Should perform a non-relative seek. let pos = r.seek(SeekFrom::Start(11))?; assert_eq!(pos, 11); - let mut dst = [0u8; 9]; - let nread = r.read(&mut dst)?; - assert_eq!(&dst[..nread], &bs[11..13]); - assert_eq!(nread, 2); + let buf = r.read(9)?; + assert_eq!(&buf[..], &bs[11..13]); + assert_eq!(buf.len(), 2); Ok(()) } @@ -734,9 +711,8 @@ mod tests { let mut cur = 0; for _ in 0..3 { - let mut dst = [0u8; 5]; - let nread = r.read(&mut dst)?; - assert_eq!(nread, 5); + let bs = r.read(5)?; + assert_eq!(bs.len(), 5); cur += 5; } @@ -757,9 +733,8 @@ mod tests { let mut cur = 0; for _ in 0..3 { - let mut dst = [0u8; 6]; - let nread = r.read(&mut dst)?; - assert_eq!(nread, 6); + let bs = r.read(6)?; + assert_eq!(bs.len(), 6); cur += 6; } @@ -771,8 +746,6 @@ mod tests { #[tokio::test] async fn test_blocking_read_part() -> anyhow::Result<()> { - use std::io::Read; - let (bs, _) = gen_bytes(); let acc = Arc::new(MockReadService::new(bs.clone())); let r = Box::new(RangeReader::new( @@ -784,7 +757,7 @@ mod tests { let mut r = BlockingReader::new(r); let mut buf = Vec::new(); - BlockingRead::read_to_end(&mut r, &mut buf)?; + r.read_to_end(&mut buf)?; assert_eq!(4096, buf.len(), "read size"); assert_eq!( format!("{:x}", Sha256::digest(&bs[4096..4096 + 4096])), @@ -796,7 +769,7 @@ mod tests { assert_eq!(n, 0, "seek position must be 0"); let mut buf = Vec::new(); - BlockingRead::read_to_end(&mut r, &mut buf)?; + r.read_to_end(&mut buf)?; assert_eq!(4096, buf.len(), "read twice size"); assert_eq!( format!("{:x}", Sha256::digest(&bs[4096..4096 + 4096])), @@ -807,8 +780,7 @@ mod tests { let n = r.seek(SeekFrom::Start(1024))?; assert_eq!(1024, n, "seek to 1024"); - let mut buf = vec![0; 1024]; - r.read_exact(&mut buf)?; + let buf = r.read_exact(1024)?; assert_eq!( format!("{:x}", Sha256::digest(&bs[4096 + 1024..4096 + 2048])), format!("{:x}", Sha256::digest(&buf)), @@ -818,8 +790,7 @@ mod tests { let n = r.seek(SeekFrom::Current(1024))?; assert_eq!(3072, n, "seek to 3072"); - let mut buf = vec![0; 1024]; - r.read_exact(&mut buf)?; + let buf = r.read_exact(1024)?; assert_eq!( format!("{:x}", Sha256::digest(&bs[4096 + 3072..4096 + 3072 + 1024])), format!("{:x}", Sha256::digest(&buf)), diff --git a/core/src/raw/oio/read/file_read.rs b/core/src/raw/oio/read/file_read.rs index 6a277336340..64596c754a0 100644 --- a/core/src/raw/oio/read/file_read.rs +++ b/core/src/raw/oio/read/file_read.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::cmp; use std::io::SeekFrom; use std::sync::Arc; @@ -40,7 +39,6 @@ pub struct FileReader { cur: u64, reader: Option, - buf: oio::AdaptiveBuf, /// Do we need to reset our cursor? seek_dirty: bool, } @@ -63,7 +61,6 @@ where offset: None, size: None, cur: 0, - buf: oio::AdaptiveBuf::default(), reader: None, seek_dirty: false, } @@ -213,6 +210,24 @@ where A: Accessor, R: oio::Read, { + async fn read(&mut self, limit: usize) -> Result { + if self.reader.is_none() { + // FileReader doesn't support range, we will always use full range to open a file. + let op = self.op.clone().with_range(BytesRange::from(..)); + let (_, r) = self.acc.read(&self.path, op).await?; + self.reader = Some(r); + } + + let r = self.reader.as_mut().expect("reader must be valid"); + + // We should know where to start read the data. + if self.offset.is_none() { + (self.offset, self.size) = Self::offset(r, self.op.range()).await?; + } + + r.read(limit).await + } + async fn seek(&mut self, pos: SeekFrom) -> Result { if self.reader.is_none() { // FileReader doesn't support range, we will always use full range to open a file. @@ -245,24 +260,6 @@ where self.cur = pos - self.offset.unwrap(); Ok(self.cur) } - - async fn read(&mut self, limit: usize) -> Result { - if self.reader.is_none() { - // FileReader doesn't support range, we will always use full range to open a file. - let op = self.op.clone().with_range(BytesRange::from(..)); - let (_, r) = self.acc.read(&self.path, op).await?; - self.reader = Some(r); - } - - let r = self.reader.as_mut().expect("reader must be valid"); - - // We should know where to start read the data. - if self.offset.is_none() { - (self.offset, self.size) = Self::offset(r, self.op.range()).await?; - } - - r.read(limit).await - } } impl oio::BlockingRead for FileReader @@ -270,7 +267,7 @@ where A: Accessor, R: oio::BlockingRead, { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, limit: usize) -> Result { if self.reader.is_none() { // FileReader doesn't support range, we will always use full range to open a file. let op = self.op.clone().with_range(BytesRange::from(..)); @@ -285,25 +282,7 @@ where (self.offset, self.size) = Self::calculate_offset(r, self.op.range())?; } - let size = if let Some(size) = self.size { - // Sanity check. - if self.cur >= size { - return Ok(0); - } - cmp::min(buf.len(), (size - self.cur) as usize) - } else { - buf.len() - }; - - match r.read(&mut buf[..size]) { - Ok(0) => Ok(0), - Ok(n) => { - self.cur += n as u64; - Ok(n) - } - // We don't need to reset state here since it's ok to poll the same reader. - Err(err) => Err(err), - } + r.read(limit) } fn seek(&mut self, pos: SeekFrom) -> Result { @@ -337,52 +316,4 @@ where self.cur = pos - self.offset.unwrap(); Ok(self.cur) } - - fn next(&mut self) -> Option> { - if self.reader.is_none() { - // FileReader doesn't support range, we will always use full range to open a file. - let op = self.op.clone().with_range(BytesRange::from(..)); - let (_, r) = match self.acc.blocking_read(&self.path, op) { - Ok(v) => v, - Err(err) => return Some(Err(err)), - }; - self.reader = Some(r); - } - - let r = self.reader.as_mut().expect("reader must be valid"); - - // We should know where to start read the data. - if self.offset.is_none() { - (self.offset, self.size) = match Self::calculate_offset(r, self.op.range()) { - Ok(v) => v, - Err(err) => return Some(Err(err)), - } - } - - self.buf.reserve(); - - let mut buf = self.buf.initialized_mut(); - let buf = buf.initialized_mut(); - - let size = if let Some(size) = self.size { - // Sanity check. - if self.cur >= size { - return None; - } - cmp::min(buf.len(), (size - self.cur) as usize) - } else { - buf.len() - }; - - match r.read(&mut buf[..size]) { - Ok(0) => None, - Ok(n) => { - self.cur += n as u64; - self.buf.record(n); - Some(Ok(self.buf.split(n))) - } - // We don't need to reset state here since it's ok to poll the same reader. - Err(err) => Some(Err(err)), - } - } } diff --git a/core/src/raw/oio/read/into_streamable_read.rs b/core/src/raw/oio/read/into_streamable_read.rs index e234eb654c6..6fe21a2c607 100644 --- a/core/src/raw/oio/read/into_streamable_read.rs +++ b/core/src/raw/oio/read/into_streamable_read.rs @@ -28,7 +28,6 @@ use crate::*; pub fn into_streamable_read(r: R, capacity: usize) -> StreamableReader { StreamableReader { r, - cap: capacity, buf: Vec::with_capacity(capacity), } } @@ -36,7 +35,6 @@ pub fn into_streamable_read(r: R, capacity: usize) -> StreamableReader { /// Make given read streamable. pub struct StreamableReader { r: R, - cap: usize, buf: Vec, } @@ -63,28 +61,13 @@ impl oio::Read for StreamableReader { } impl oio::BlockingRead for StreamableReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.r.read(buf) + fn read(&mut self, limit: usize) -> Result { + self.r.read(limit) } fn seek(&mut self, pos: SeekFrom) -> Result { self.r.seek(pos) } - - fn next(&mut self) -> Option> { - let dst = self.buf.spare_capacity_mut(); - let mut buf = ReadBuf::uninit(dst); - unsafe { buf.assume_init(self.cap) }; - - match self.r.read(buf.initialized_mut()) { - Err(err) => Some(Err(err)), - Ok(0) => None, - Ok(n) => { - buf.set_filled(n); - Some(Ok(Bytes::from(buf.filled().to_vec()))) - } - } - } } #[cfg(test)] @@ -135,10 +118,13 @@ mod tests { let r = oio::Cursor::from(content.clone()); let mut s = into_streamable_read(Box::new(r) as oio::BlockingReader, cap); - let mut bs = BytesMut::new(); - while let Some(b) = s.next() { - let b = b.expect("read must success"); - bs.put_slice(&b); + let mut bs = BytesMut::with_capacity(size); + loop { + let buf = s.read(size).expect("read must success"); + if buf.is_empty() { + break; + } + bs.put_slice(&buf) } assert_eq!(bs.freeze().to_vec(), content) } diff --git a/core/src/raw/oio/read/lazy_read.rs b/core/src/raw/oio/read/lazy_read.rs index 89be1a3b6f3..1fd1c71b59f 100644 --- a/core/src/raw/oio/read/lazy_read.rs +++ b/core/src/raw/oio/read/lazy_read.rs @@ -99,20 +99,11 @@ where A: Accessor, R: oio::BlockingRead, { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.blocking_reader()?.read(buf) + fn read(&mut self, limit: usize) -> Result { + self.blocking_reader()?.read(limit) } fn seek(&mut self, pos: SeekFrom) -> Result { self.blocking_reader()?.seek(pos) } - - fn next(&mut self) -> Option> { - let r = match self.blocking_reader() { - Ok(r) => r, - Err(err) => return Some(Err(err)), - }; - - r.next() - } } diff --git a/core/src/raw/oio/read/range_read.rs b/core/src/raw/oio/read/range_read.rs index d0551f86d24..56289d53412 100644 --- a/core/src/raw/oio/read/range_read.rs +++ b/core/src/raw/oio/read/range_read.rs @@ -252,6 +252,43 @@ where A: Accessor, R: oio::Read, { + async fn read(&mut self, limit: usize) -> Result { + // Sanity check for normal cases. + if self.cur >= self.size.unwrap_or(u64::MAX) { + return Ok(Bytes::new()); + } + + if self.offset.is_none() { + let rp = match self.stat_future().await { + Ok(v) => v, + Err(err) => return Err(err), + }; + let length = rp.into_metadata().content_length(); + self.ensure_offset(length)? + } + if self.reader.is_none() { + let (rp, r) = match self.read_future().await { + Ok((rp, r)) => (rp, r), + Err(err) => return Err(err), + }; + + self.ensure_size(rp.range().unwrap_or_default().size(), rp.size()); + self.reader = Some(r); + } + + let r = self.reader.as_mut().expect("reader must be valid"); + match r.read(limit).await { + Ok(bs) => { + self.cur += bs.len() as u64; + Ok(bs) + } + Err(err) => { + self.reader = None; + Err(err) + } + } + } + async fn seek(&mut self, pos: SeekFrom) -> Result { // There is an optimization here that we can calculate if users trying to seek // the same position, for example, `reader.seek(SeekFrom::Current(0))`. @@ -292,15 +329,21 @@ where self.cur = seek_pos; Ok(self.cur) } +} - async fn read(&mut self, limit: usize) -> Result { +impl oio::BlockingRead for RangeReader +where + A: Accessor, + R: oio::BlockingRead, +{ + fn read(&mut self, limit: usize) -> Result { // Sanity check for normal cases. if self.cur >= self.size.unwrap_or(u64::MAX) { return Ok(Bytes::new()); } if self.offset.is_none() { - let rp = match self.stat_future().await { + let rp = match self.stat_action() { Ok(v) => v, Err(err) => return Err(err), }; @@ -308,7 +351,7 @@ where self.ensure_offset(length)? } if self.reader.is_none() { - let (rp, r) = match self.read_future().await { + let (rp, r) = match self.read_action() { Ok((rp, r)) => (rp, r), Err(err) => return Err(err), }; @@ -318,7 +361,7 @@ where } let r = self.reader.as_mut().expect("reader must be valid"); - match r.read(limit).await { + match r.read(limit) { Ok(bs) => { self.cur += bs.len() as u64; Ok(bs) @@ -329,48 +372,6 @@ where } } } -} - -impl oio::BlockingRead for RangeReader -where - A: Accessor, - R: oio::BlockingRead, -{ - fn read(&mut self, buf: &mut [u8]) -> Result { - // Sanity check for normal cases. - if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) { - return Ok(0); - } - - if self.offset.is_none() { - let rp = self.stat_action()?; - let length = rp.into_metadata().content_length(); - self.ensure_offset(length)?; - } - if self.reader.is_none() { - let (rp, r) = self.read_action()?; - - self.ensure_size(rp.range().unwrap_or_default().size(), rp.size()); - self.reader = Some(r); - } - - let r = self.reader.as_mut().expect("reader must be valid"); - match r.read(buf) { - Ok(0) => { - // Reset state to Idle after all data has been consumed. - self.reader = None; - Ok(0) - } - Ok(n) => { - self.cur += n as u64; - Ok(n) - } - Err(e) => { - self.reader = None; - Err(e) - } - } - } fn seek(&mut self, pos: SeekFrom) -> Result { // There is an optimization here that we can calculate if users trying to seek @@ -412,49 +413,6 @@ where self.cur = seek_pos; Ok(self.cur) } - - fn next(&mut self) -> Option> { - // Sanity check for normal cases. - if self.cur >= self.size.unwrap_or(u64::MAX) { - return None; - } - - if self.offset.is_none() { - let rp = match self.stat_action() { - Ok(rp) => rp, - Err(err) => return Some(Err(err)), - }; - let length = rp.into_metadata().content_length(); - if let Err(err) = self.ensure_offset(length) { - return Some(Err(err)); - } - } - if self.reader.is_none() { - let (rp, r) = match self.read_action() { - Ok((rp, r)) => (rp, r), - Err(err) => return Some(Err(err)), - }; - - self.ensure_size(rp.range().unwrap_or_default().size(), rp.size()); - self.reader = Some(r); - } - - let r = self.reader.as_mut().expect("reader must be valid"); - match r.next() { - Some(Ok(bs)) => { - self.cur += bs.len() as u64; - Some(Ok(bs)) - } - Some(Err(err)) => { - self.reader = None; - Some(Err(err)) - } - None => { - self.reader = None; - None - } - } - } } #[cfg(test)] diff --git a/core/src/raw/oio/read/std_read.rs b/core/src/raw/oio/read/std_read.rs index d28f8d5ca34..8726ebf2499 100644 --- a/core/src/raw/oio/read/std_read.rs +++ b/core/src/raw/oio/read/std_read.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. +use bytes::Bytes; use std::io::Read; use std::io::Seek; use std::io::SeekFrom; - -use bytes::Bytes; +use tokio::io::ReadBuf; use crate::raw::*; use crate::*; @@ -27,12 +27,16 @@ use crate::*; /// FuturesReader implements [`oio::BlockingRead`] via [`Read`] + [`Seek`]. pub struct StdReader { inner: R, + buf: Vec, } impl StdReader { /// Create a new std reader. pub fn new(inner: R) -> Self { - Self { inner } + Self { + inner, + buf: Vec::with_capacity(64 * 1024), + } } } @@ -40,12 +44,27 @@ impl oio::BlockingRead for StdReader where R: Read + Seek + Send + Sync, { - fn read(&mut self, buf: &mut [u8]) -> Result { - self.inner.read(buf).map_err(|err| { + fn read(&mut self, limit: usize) -> Result { + // Make sure buf has enough space. + if self.buf.capacity() < limit { + self.buf.reserve(limit); + } + let buf = self.buf.spare_capacity_mut(); + let mut read_buf: ReadBuf = ReadBuf::uninit(buf); + + // SAFETY: Read at most `size` bytes into `read_buf`. + unsafe { + read_buf.assume_init(limit); + } + + let n = self.inner.read(read_buf.initialized_mut()).map_err(|err| { new_std_io_error(err) - .with_operation(oio::ReadOperation::BlockingRead) - .with_context("source", "StdReader") - }) + .with_operation(oio::ReadOperation::Read) + .with_context("source", "TokioReader") + })?; + read_buf.set_filled(n); + + Ok(Bytes::copy_from_slice(read_buf.filled())) } fn seek(&mut self, pos: SeekFrom) -> Result { @@ -55,11 +74,4 @@ where .with_context("source", "StdReader") }) } - - fn next(&mut self) -> Option> { - Some(Err(Error::new( - ErrorKind::Unsupported, - "StdReader doesn't support poll_next", - ))) - } } diff --git a/core/src/raw/oio/read/tokio_read.rs b/core/src/raw/oio/read/tokio_read.rs index 6dea5cec393..f89e2ec5f35 100644 --- a/core/src/raw/oio/read/tokio_read.rs +++ b/core/src/raw/oio/read/tokio_read.rs @@ -47,14 +47,6 @@ impl oio::Read for TokioReader where R: AsyncRead + AsyncSeek + Unpin + Send + Sync, { - async fn seek(&mut self, pos: SeekFrom) -> Result { - self.inner.seek(pos).await.map_err(|err| { - new_std_io_error(err) - .with_operation(oio::ReadOperation::Seek) - .with_context("source", "TokioReader") - }) - } - async fn read(&mut self, limit: usize) -> Result { // Make sure buf has enough space. if self.buf.capacity() < limit { @@ -81,4 +73,12 @@ where Ok(Bytes::copy_from_slice(read_buf.filled())) } + + async fn seek(&mut self, pos: SeekFrom) -> Result { + self.inner.seek(pos).await.map_err(|err| { + new_std_io_error(err) + .with_operation(oio::ReadOperation::Seek) + .with_context("source", "TokioReader") + }) + } } diff --git a/core/src/raw/tests/read.rs b/core/src/raw/tests/read.rs index 49b44b3682e..b2503901a32 100644 --- a/core/src/raw/tests/read.rs +++ b/core/src/raw/tests/read.rs @@ -179,16 +179,11 @@ impl ReadChecker { for action in actions { match action { ReadAction::Read(size) => { - use oio::BlockingRead; - - let mut buf = vec![0; *size]; - let n = r.read(&mut buf).expect("read must success"); - self.check_read(*size, &buf[..n]); + let bs = r.read(*size).expect("read must success"); + self.check_read(*size, &bs); } ReadAction::Seek(pos) => { - use oio::BlockingRead; - let res = r.seek(*pos); self.check_seek(*pos, res); } diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 9cb46efa30a..662f3d770a0 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -18,7 +18,6 @@ use bytes::Bytes; use super::operator_functions::*; -use crate::raw::oio::BlockingRead; use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; @@ -410,9 +409,10 @@ impl BlockingOperator { (range.size().unwrap(), range) }; - let (_, mut s) = inner.blocking_read(&path, args.with_range(range))?; + let (_, r) = inner.blocking_read(&path, args.with_range(range))?; + let mut r = BlockingReader::new(r); let mut buf = Vec::with_capacity(size_hint as usize); - s.read_to_end(&mut buf)?; + r.read_to_end(&mut buf)?; Ok(buf) }, diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs index 2af5ca51442..8db4e9f9f4a 100644 --- a/core/src/types/reader.rs +++ b/core/src/types/reader.rs @@ -26,6 +26,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use futures::Stream; use tokio::io::ReadBuf; +use crate::raw::oio::BlockingRead; use crate::raw::*; use crate::*; @@ -441,33 +442,107 @@ impl BlockingReader { } /// Create a new reader from an `oio::BlockingReader`. - #[cfg(test)] pub(crate) fn new(r: oio::BlockingReader) -> Self { BlockingReader { inner: r } } -} -impl oio::BlockingRead for BlockingReader { + /// Seek to the position of `pos` of reader. #[inline] - fn read(&mut self, buf: &mut [u8]) -> Result { - self.inner.read(buf) + pub fn seek(&mut self, pos: SeekFrom) -> Result { + self.inner.seek(pos) } + /// Read at most `size` bytes of data from reader. #[inline] - fn seek(&mut self, pos: io::SeekFrom) -> Result { - self.inner.seek(pos) + pub fn read(&mut self, limit: usize) -> Result { + self.inner.read(limit) } - #[inline] - fn next(&mut self) -> Option> { - oio::BlockingRead::next(&mut self.inner) + /// Read exact `size` bytes of data from reader. + pub fn read_exact(&mut self, size: usize) -> Result { + // Lucky path. + let bs1 = self.inner.read(size)?; + debug_assert!( + bs1.len() <= size, + "read should not return more bytes than expected" + ); + if bs1.len() == size { + return Ok(bs1); + } + if bs1.is_empty() { + return Err( + Error::new(ErrorKind::ContentIncomplete, "reader got too little data") + .with_context("expect", size.to_string()), + ); + } + + let mut bs = BytesMut::with_capacity(size); + bs.put_slice(&bs1); + + let mut remaining = size - bs.len(); + + loop { + let tmp = self.inner.read(remaining)?; + if tmp.is_empty() { + return Err( + Error::new(ErrorKind::ContentIncomplete, "reader got too little data") + .with_context("expect", size.to_string()) + .with_context("actual", bs.len().to_string()), + ); + } + bs.put_slice(&tmp); + debug_assert!( + tmp.len() <= remaining, + "read should not return more bytes than expected" + ); + + remaining -= tmp.len(); + if remaining == 0 { + break; + } + } + + Ok(bs.freeze()) + } + /// Reads all bytes until EOF in this source, placing them into buf. + pub fn read_to_end(&mut self, buf: &mut Vec) -> Result { + let start_len = buf.len(); + + loop { + if buf.len() == buf.capacity() { + buf.reserve(32); // buf is full, need more space + } + + let spare = buf.spare_capacity_mut(); + let mut read_buf: ReadBuf = ReadBuf::uninit(spare); + + // SAFETY: These bytes were initialized but not filled in the previous loop + unsafe { + read_buf.assume_init(read_buf.capacity()); + } + + match self.read(read_buf.initialized_mut().len()) { + Ok(bs) if bs.is_empty() => return Ok(buf.len() - start_len), + Ok(bs) => { + read_buf.initialized_mut()[..bs.len()].copy_from_slice(&bs); + + // SAFETY: Read API makes sure that returning `n` is correct. + unsafe { + buf.set_len(buf.len() + bs.len()); + } + } + Err(e) => return Err(e), + } + } } } impl io::Read for BlockingReader { #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.inner.read(buf).map_err(format_std_io_error) + let bs = self.inner.read(buf.len()).map_err(format_std_io_error)?; + buf[..bs.len()].copy_from_slice(&bs); + Ok(bs.len()) } } @@ -483,9 +558,15 @@ impl Iterator for BlockingReader { #[inline] fn next(&mut self) -> Option { - self.inner - .next() - .map(|v| v.map_err(|err| io::Error::new(io::ErrorKind::Interrupted, err))) + match self + .inner + .read(4 * 1024 * 1024) + .map_err(format_std_io_error) + { + Ok(bs) if bs.is_empty() => None, + Ok(bs) => Some(Ok(bs)), + Err(err) => Some(Err(err)), + } } }