From 43161a762e1679a59d544ac7d0a3fa230778f245 Mon Sep 17 00:00:00 2001 From: elijah Date: Mon, 9 Jan 2023 20:37:46 +0800 Subject: [PATCH 1/4] feat: impl tokio::io::{AsyncRead, AsyncSeek} for ObjectReader --- Cargo.toml | 2 +- src/object/reader.rs | 109 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 108 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ca77a6502e9..407361bc59d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -148,7 +148,7 @@ suppaftp = { version = "4.5", default-features = false, features = [ "async-rustls", ], optional = true } time = { version = "0.3", features = ["serde"] } -tokio = { version = "1.20", features = ["fs"] } +tokio = { version = "1.20", features = ["fs", "io-util"] } tracing = { version = "0.1", optional = true } ureq = { version = "2", default-features = false } uuid = { version = "1", features = ["serde", "v4"] } diff --git a/src/object/reader.rs b/src/object/reader.rs index 70b1f1ee5e1..eeeae7f2944 100644 --- a/src/object/reader.rs +++ b/src/object/reader.rs @@ -19,7 +19,7 @@ use std::task::Context; use std::task::Poll; use bytes::Bytes; -use futures::AsyncRead; +use futures::{AsyncRead, ready}; use futures::AsyncSeek; use futures::Stream; use parking_lot::Mutex; @@ -99,6 +99,7 @@ use crate::OpStat; /// In this way, we can reduce the extra cost of dropping reader. pub struct ObjectReader { inner: output::Reader, + seek_state: SeekState, } impl ObjectReader { @@ -148,7 +149,7 @@ impl ObjectReader { Box::new(output::into_reader::as_streamable(r, 256 * 1024)) }; - Ok(ObjectReader { inner: r }) + Ok(ObjectReader { inner: r , seek_state: SeekState::Init }) } } @@ -186,6 +187,56 @@ impl AsyncSeek for ObjectReader { } } +impl tokio::io::AsyncRead for ObjectReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let b = buf.initialize_unfilled(); + let n = ready!(futures::AsyncRead::poll_read(self.as_mut(), cx, b))?; + unsafe { + buf.assume_init(n); + } + buf.advance(n); + Poll::Ready(Ok(())) + } +} + +impl tokio::io::AsyncSeek for ObjectReader { + fn start_seek(self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> { + let this = self.get_mut(); + this.seek_state = SeekState::Start(pos); + Ok(()) + } + + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let state = self.seek_state.clone(); + match state { + SeekState::Init => { + // AsyncSeek recommends calling poll_complete before start_seek. + // We don't have to guarantee that the value returned by + // poll_complete called without start_seek is correct, + // so we'll return 0. + Poll::Ready(Ok(0)) + }, + SeekState::Start(pos) => { + let n = ready!(futures::AsyncSeek::poll_seek(self.as_mut(), cx, pos))?; + Poll::Ready(Ok(n)) + } + } + } +} + +#[derive(Debug, Clone, Copy)] +/// SeekState is used to track the tokio seek state of ObjectReader. +enum SeekState { + /// start_seek has not been called. + Init, + /// start_seek has been called, but poll_complete has not yet been called. + Start(io::SeekFrom), +} + impl Stream for ObjectReader { type Item = io::Result; @@ -209,3 +260,57 @@ async fn get_total_size( *(meta.lock()) = om; Ok(size) } + +#[cfg(test)] +mod tests { + use rand::{Rng, RngCore}; + use rand::rngs::ThreadRng; + use crate::{Operator, Scheme}; + use tokio::io::AsyncReadExt; + use tokio::io::AsyncSeekExt; + + fn gen_random_bytes() -> Vec { + let mut rng = ThreadRng::default(); + // Generate size between 1B..16MB. + let size = rng.gen_range(1..16*1024*1024); + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + content + } + + #[tokio::test] + async fn test_reader_async_read() { + let op = Operator::from_env(Scheme::Memory).unwrap(); + let obj = op.object("test_file"); + + let content = gen_random_bytes(); + obj.write(&*content).await.expect("writ to object must succeed"); + + let mut reader = obj.reader().await.unwrap(); + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.expect("read to end must succeed"); + + assert_eq!(buf, content); + } + + #[tokio::test] + async fn test_reader_async_seek() { + let op = Operator::from_env(Scheme::Memory).unwrap(); + let obj = op.object("test_file"); + + let content = gen_random_bytes(); + obj.write(&*content).await.expect("writ to object must succeed"); + + let mut reader = obj.reader().await.unwrap(); + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.expect("read to end must succeed"); + assert_eq!(buf, content); + + let n = reader.seek(tokio::io::SeekFrom::Start(0)).await.unwrap(); + assert_eq!(n, 0, "seekp osition must be 0"); + + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.expect("read to end must succeed"); + assert_eq!(buf, content); + } +} \ No newline at end of file From ca6f5bc62864a1af43fb8da8540e40091e090659 Mon Sep 17 00:00:00 2001 From: elijah Date: Mon, 9 Jan 2023 21:46:12 +0800 Subject: [PATCH 2/4] chore: fmt code --- src/object/reader.rs | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/src/object/reader.rs b/src/object/reader.rs index eeeae7f2944..b16f99264e6 100644 --- a/src/object/reader.rs +++ b/src/object/reader.rs @@ -19,9 +19,9 @@ use std::task::Context; use std::task::Poll; use bytes::Bytes; -use futures::{AsyncRead, ready}; use futures::AsyncSeek; use futures::Stream; +use futures::{ready, AsyncRead}; use parking_lot::Mutex; use crate::error::Result; @@ -149,7 +149,10 @@ impl ObjectReader { Box::new(output::into_reader::as_streamable(r, 256 * 1024)) }; - Ok(ObjectReader { inner: r , seek_state: SeekState::Init }) + Ok(ObjectReader { + inner: r, + seek_state: SeekState::Init, + }) } } @@ -219,7 +222,7 @@ impl tokio::io::AsyncSeek for ObjectReader { // poll_complete called without start_seek is correct, // so we'll return 0. Poll::Ready(Ok(0)) - }, + } SeekState::Start(pos) => { let n = ready!(futures::AsyncSeek::poll_seek(self.as_mut(), cx, pos))?; Poll::Ready(Ok(n)) @@ -263,16 +266,16 @@ async fn get_total_size( #[cfg(test)] mod tests { - use rand::{Rng, RngCore}; - use rand::rngs::ThreadRng; use crate::{Operator, Scheme}; + use rand::rngs::ThreadRng; + use rand::{Rng, RngCore}; use tokio::io::AsyncReadExt; use tokio::io::AsyncSeekExt; fn gen_random_bytes() -> Vec { let mut rng = ThreadRng::default(); // Generate size between 1B..16MB. - let size = rng.gen_range(1..16*1024*1024); + let size = rng.gen_range(1..16 * 1024 * 1024); let mut content = vec![0; size]; rng.fill_bytes(&mut content); content @@ -284,11 +287,16 @@ mod tests { let obj = op.object("test_file"); let content = gen_random_bytes(); - obj.write(&*content).await.expect("writ to object must succeed"); + obj.write(&*content) + .await + .expect("writ to object must succeed"); let mut reader = obj.reader().await.unwrap(); let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.expect("read to end must succeed"); + reader + .read_to_end(&mut buf) + .await + .expect("read to end must succeed"); assert_eq!(buf, content); } @@ -299,18 +307,26 @@ mod tests { let obj = op.object("test_file"); let content = gen_random_bytes(); - obj.write(&*content).await.expect("writ to object must succeed"); + obj.write(&*content) + .await + .expect("writ to object must succeed"); let mut reader = obj.reader().await.unwrap(); let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.expect("read to end must succeed"); + reader + .read_to_end(&mut buf) + .await + .expect("read to end must succeed"); assert_eq!(buf, content); let n = reader.seek(tokio::io::SeekFrom::Start(0)).await.unwrap(); assert_eq!(n, 0, "seekp osition must be 0"); let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.expect("read to end must succeed"); + reader + .read_to_end(&mut buf) + .await + .expect("read to end must succeed"); assert_eq!(buf, content); } -} \ No newline at end of file +} From 8a5ecd4df66ec806069722d9768976b7757cfe7d Mon Sep 17 00:00:00 2001 From: elijah Date: Mon, 9 Jan 2023 22:51:59 +0800 Subject: [PATCH 3/4] chore: fix clippy --- src/object/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/object/reader.rs b/src/object/reader.rs index b16f99264e6..beaeb74a911 100644 --- a/src/object/reader.rs +++ b/src/object/reader.rs @@ -214,7 +214,7 @@ impl tokio::io::AsyncSeek for ObjectReader { } fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let state = self.seek_state.clone(); + let state = self.seek_state; match state { SeekState::Init => { // AsyncSeek recommends calling poll_complete before start_seek. From 00e4b432a1a1af9e33c7eaf03abd657cecb56759 Mon Sep 17 00:00:00 2001 From: elijah Date: Mon, 9 Jan 2023 23:45:24 +0800 Subject: [PATCH 4/4] chore: improve the code --- Cargo.toml | 2 +- src/object/reader.rs | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 407361bc59d..ca77a6502e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -148,7 +148,7 @@ suppaftp = { version = "4.5", default-features = false, features = [ "async-rustls", ], optional = true } time = { version = "0.3", features = ["serde"] } -tokio = { version = "1.20", features = ["fs", "io-util"] } +tokio = { version = "1.20", features = ["fs"] } tracing = { version = "0.1", optional = true } ureq = { version = "2", default-features = false } uuid = { version = "1", features = ["serde", "v4"] } diff --git a/src/object/reader.rs b/src/object/reader.rs index beaeb74a911..f2ceb0ea337 100644 --- a/src/object/reader.rs +++ b/src/object/reader.rs @@ -197,7 +197,7 @@ impl tokio::io::AsyncRead for ObjectReader { buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { let b = buf.initialize_unfilled(); - let n = ready!(futures::AsyncRead::poll_read(self.as_mut(), cx, b))?; + let n = ready!(self.inner.poll_read(cx, b))?; unsafe { buf.assume_init(n); } @@ -209,6 +209,12 @@ impl tokio::io::AsyncRead for ObjectReader { impl tokio::io::AsyncSeek for ObjectReader { fn start_seek(self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> { let this = self.get_mut(); + if let SeekState::Start(_) = this.seek_state { + return Err(io::Error::new( + io::ErrorKind::Other, + "another search is in progress.", + )); + } this.seek_state = SeekState::Start(pos); Ok(()) } @@ -224,7 +230,7 @@ impl tokio::io::AsyncSeek for ObjectReader { Poll::Ready(Ok(0)) } SeekState::Start(pos) => { - let n = ready!(futures::AsyncSeek::poll_seek(self.as_mut(), cx, pos))?; + let n = ready!(self.inner.poll_seek(cx, pos))?; Poll::Ready(Ok(n)) } }