From b33b711733e500b26005f5421777c3cdcdb3464e Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Mar 2023 13:29:11 +0800 Subject: [PATCH 1/5] Remove not used operations Signed-off-by: Xuanwo --- src/raw/operation.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/raw/operation.rs b/src/raw/operation.rs index f73da224296..10aeb88176e 100644 --- a/src/raw/operation.rs +++ b/src/raw/operation.rs @@ -39,14 +39,6 @@ pub enum Operation { Batch, /// Operation for [`crate::raw::Accessor::presign`] Presign, - /// Operation for [`crate::raw::Accessor::create_multipart`] - CreateMultipart, - /// Operation for [`crate::raw::Accessor::write_multipart`] - WriteMultipart, - /// Operation for [`crate::raw::Accessor::complete_multipart`] - CompleteMultipart, - /// Operation for [`crate::raw::Accessor::abort_multipart`] - AbortMultipart, /// Operation for [`crate::raw::Accessor::blocking_create`] BlockingCreate, /// Operation for [`crate::raw::Accessor::blocking_read`] @@ -95,10 +87,6 @@ impl From for &'static str { Operation::Scan => "scan", Operation::Presign => "presign", Operation::Batch => "batch", - Operation::CreateMultipart => "create_multipart", - Operation::WriteMultipart => "write_multipart", - Operation::CompleteMultipart => "complete_multipart", - Operation::AbortMultipart => "abort_multipart", Operation::BlockingCreate => "blocking_create", Operation::BlockingRead => "blocking_read", Operation::BlockingWrite => "blocking_write", From 76d0f9a0375d18df57bf9586a8a5c7c455835f24 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Mar 2023 13:31:29 +0800 Subject: [PATCH 2/5] Cleanup code Signed-off-by: Xuanwo --- src/docs/internals/accessor.rs | 2 +- src/services/azblob/backend.rs | 1 - src/services/azdfs/backend.rs | 1 - src/services/dashmap/backend.rs | 1 - src/services/fs/backend.rs | 1 - src/services/ftp/backend.rs | 1 - src/services/gcs/backend.rs | 1 - src/services/ghac/backend.rs | 1 - src/services/hdfs/backend.rs | 1 - src/services/http/backend.rs | 1 - src/services/ipfs/backend.rs | 1 - src/services/ipmfs/builder.rs | 1 - src/services/memcached/backend.rs | 1 - src/services/memory/backend.rs | 1 - src/services/moka/backend.rs | 1 - src/services/obs/backend.rs | 1 - src/services/oss/backend.rs | 1 - src/services/redis/backend.rs | 1 - src/services/rocksdb/backend.rs | 1 - src/services/s3/backend.rs | 1 - src/services/sled/backend.rs | 1 - src/services/webdav/backend.rs | 1 - src/services/webhdfs/backend.rs | 1 - 23 files changed, 1 insertion(+), 23 deletions(-) diff --git a/src/docs/internals/accessor.rs b/src/docs/internals/accessor.rs index 388243477d1..fd495c8b162 100644 --- a/src/docs/internals/accessor.rs +++ b/src/docs/internals/accessor.rs @@ -124,7 +124,7 @@ //! use AccessorCapability::*; //! //! let mut am = AccessorMetadata::default(); -//! am.set_capabilities(Read | Write | List | Scan | Presign | Multipart | Batch); +//! am.set_capabilities(Read | Write | List | Scan | Presign | Batch); //! //! am //! } diff --git a/src/services/azblob/backend.rs b/src/services/azblob/backend.rs index 5b7e5297828..018cb6b5f81 100644 --- a/src/services/azblob/backend.rs +++ b/src/services/azblob/backend.rs @@ -50,7 +50,6 @@ const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; /// - [x] list /// - [x] scan /// - [ ] presign -/// - [ ] multipart /// - [ ] blocking /// /// # Configuration diff --git a/src/services/azdfs/backend.rs b/src/services/azdfs/backend.rs index a104683c36e..ff60cdc0ab2 100644 --- a/src/services/azdfs/backend.rs +++ b/src/services/azdfs/backend.rs @@ -52,7 +52,6 @@ use crate::*; /// - [x] list /// - [ ] ~~scan~~ /// - [ ] presign -/// - [ ] multipart /// - [ ] blocking /// /// # Configuration diff --git a/src/services/dashmap/backend.rs b/src/services/dashmap/backend.rs index dd921b10ddb..8c4c76fed9e 100644 --- a/src/services/dashmap/backend.rs +++ b/src/services/dashmap/backend.rs @@ -33,7 +33,6 @@ use crate::*; /// - [ ] ~~list~~ /// - [ ] ~~scan~~ /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [x] blocking #[derive(Default)] pub struct DashmapBuilder {} diff --git a/src/services/fs/backend.rs b/src/services/fs/backend.rs index 2ac9b647e12..e5b905acc14 100644 --- a/src/services/fs/backend.rs +++ b/src/services/fs/backend.rs @@ -46,7 +46,6 @@ use crate::*; /// - [x] list /// - [ ] ~~scan~~ /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [x] blocking /// /// # Configuration diff --git a/src/services/ftp/backend.rs b/src/services/ftp/backend.rs index 7722a912462..6e68e2236af 100644 --- a/src/services/ftp/backend.rs +++ b/src/services/ftp/backend.rs @@ -54,7 +54,6 @@ use crate::*; /// - [x] list /// - [ ] ~~scan~~ /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [ ] blocking /// /// # Configuration diff --git a/src/services/gcs/backend.rs b/src/services/gcs/backend.rs index b14dbe72c10..1f9736f6e4f 100644 --- a/src/services/gcs/backend.rs +++ b/src/services/gcs/backend.rs @@ -53,7 +53,6 @@ const DEFAULT_GCS_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.read /// - [x] list /// - [x] scan /// - [ ] presign -/// - [ ] multipart /// - [ ] blocking /// /// # Configuration diff --git a/src/services/ghac/backend.rs b/src/services/ghac/backend.rs index 8b47ee063f8..55adb53c51f 100644 --- a/src/services/ghac/backend.rs +++ b/src/services/ghac/backend.rs @@ -69,7 +69,6 @@ const GITHUB_API_VERSION: &str = "2022-11-28"; /// - [ ] list /// - [ ] ~~scan~~ /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [ ] blocking /// /// # Notes diff --git a/src/services/hdfs/backend.rs b/src/services/hdfs/backend.rs index df9fe049654..631efde7d57 100644 --- a/src/services/hdfs/backend.rs +++ b/src/services/hdfs/backend.rs @@ -44,7 +44,6 @@ use crate::*; /// - [x] list /// - [ ] ~~scan~~ /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [x] blocking /// /// # Differences with webhdfs diff --git a/src/services/http/backend.rs b/src/services/http/backend.rs index 55fa0741450..305e40fb736 100644 --- a/src/services/http/backend.rs +++ b/src/services/http/backend.rs @@ -39,7 +39,6 @@ use crate::*; /// - [ ] ~~list~~ /// - [ ] ~~scan~~ /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [ ] blocking /// /// # Notes diff --git a/src/services/ipfs/backend.rs b/src/services/ipfs/backend.rs index 83d44d39abb..8e7f031d831 100644 --- a/src/services/ipfs/backend.rs +++ b/src/services/ipfs/backend.rs @@ -41,7 +41,6 @@ use crate::*; /// - [x] list /// - [ ] ~~scan~~ /// - [ ] presign -/// - [ ] ~~multipart~~ /// - [ ] blocking /// /// # Configuration diff --git a/src/services/ipmfs/builder.rs b/src/services/ipmfs/builder.rs index bc423b269d8..d8cc64caf90 100644 --- a/src/services/ipmfs/builder.rs +++ b/src/services/ipmfs/builder.rs @@ -31,7 +31,6 @@ use crate::*; /// - [x] list /// - [ ] ~~scan~~ /// - [ ] presign -/// - [ ] ~~multipart~~ /// - [ ] blocking /// /// # Configuration diff --git a/src/services/memcached/backend.rs b/src/services/memcached/backend.rs index 04f70d98c43..2a5afb30c6f 100644 --- a/src/services/memcached/backend.rs +++ b/src/services/memcached/backend.rs @@ -36,7 +36,6 @@ use crate::*; /// - [ ] ~~list~~ /// - [ ] scan /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [x] blocking /// /// # Configuration diff --git a/src/services/memory/backend.rs b/src/services/memory/backend.rs index 976a01cd6da..9857008ca59 100644 --- a/src/services/memory/backend.rs +++ b/src/services/memory/backend.rs @@ -34,7 +34,6 @@ use crate::*; /// - [ ] ~~list~~ /// - [x] scan /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [x] blocking #[derive(Default)] pub struct MemoryBuilder {} diff --git a/src/services/moka/backend.rs b/src/services/moka/backend.rs index e6beb19de88..3883596b2a0 100644 --- a/src/services/moka/backend.rs +++ b/src/services/moka/backend.rs @@ -36,7 +36,6 @@ use crate::*; /// - [ ] ~~list~~ /// - [ ] ~~scan~~ /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [x] blocking #[derive(Default, Debug)] pub struct MokaBuilder { diff --git a/src/services/obs/backend.rs b/src/services/obs/backend.rs index 40913da3642..fa6aadf1b7d 100644 --- a/src/services/obs/backend.rs +++ b/src/services/obs/backend.rs @@ -44,7 +44,6 @@ use crate::*; /// - [x] list /// - [x] scan /// - [ ] presign -/// - [ ] multipart /// - [ ] blocking /// /// # Configuration diff --git a/src/services/oss/backend.rs b/src/services/oss/backend.rs index 9df3e1d95a0..aaeaf7c557a 100644 --- a/src/services/oss/backend.rs +++ b/src/services/oss/backend.rs @@ -53,7 +53,6 @@ use crate::*; /// - [x] list /// - [x] scan /// - [ ] presign -/// - [ ] multipart /// - [ ] blocking /// /// # Configuration diff --git a/src/services/redis/backend.rs b/src/services/redis/backend.rs index 72b45c7e6ae..975a1921167 100644 --- a/src/services/redis/backend.rs +++ b/src/services/redis/backend.rs @@ -47,7 +47,6 @@ const DEFAULT_REDIS_PORT: u16 = 6379; /// - [ ] ~~list~~ /// - [ ] scan /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [ ] blocking /// /// # Configuration diff --git a/src/services/rocksdb/backend.rs b/src/services/rocksdb/backend.rs index 7eaded53159..856c20097db 100644 --- a/src/services/rocksdb/backend.rs +++ b/src/services/rocksdb/backend.rs @@ -36,7 +36,6 @@ use crate::*; /// - [ ] ~~list~~ /// - [ ] scan /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [x] blocking /// /// # Note diff --git a/src/services/s3/backend.rs b/src/services/s3/backend.rs index 89db304c141..2b98eb1e578 100644 --- a/src/services/s3/backend.rs +++ b/src/services/s3/backend.rs @@ -86,7 +86,6 @@ mod constants { /// - [x] list /// - [x] scan /// - [x] presign -/// - [x] multipart /// - [ ] blocking /// /// # Configuration diff --git a/src/services/sled/backend.rs b/src/services/sled/backend.rs index ec1806ae013..238698462af 100644 --- a/src/services/sled/backend.rs +++ b/src/services/sled/backend.rs @@ -37,7 +37,6 @@ use crate::*; /// - [ ] ~~list~~ /// - [ ] scan /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [x] blocking /// /// # Note diff --git a/src/services/webdav/backend.rs b/src/services/webdav/backend.rs index 0f556df2a31..8b74ba53e4e 100644 --- a/src/services/webdav/backend.rs +++ b/src/services/webdav/backend.rs @@ -43,7 +43,6 @@ use crate::*; /// - [x] list /// - [ ] ~~scan~~ /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [ ] blocking /// /// # Notes diff --git a/src/services/webhdfs/backend.rs b/src/services/webhdfs/backend.rs index 5b58b10733c..39ef215bebb 100644 --- a/src/services/webhdfs/backend.rs +++ b/src/services/webhdfs/backend.rs @@ -56,7 +56,6 @@ const WEBHDFS_DEFAULT_ENDPOINT: &str = "http://127.0.0.1:9870"; /// - [x] list /// - [ ] ~~scan~~ /// - [ ] ~~presign~~ -/// - [ ] ~~multipart~~ /// - [ ] blocking /// /// # Differences with hdfs From 1fc74a0801ae10f097a672fcca8d24c4cfd8dc12 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Mar 2023 13:40:13 +0800 Subject: [PATCH 3/5] Refactor pager API Signed-off-by: Xuanwo --- src/layers/complete.rs | 16 +++++++------- src/layers/concurrent_limit.rs | 8 +++---- src/layers/error_context.rs | 8 +++---- src/layers/immutable_index.rs | 4 ++-- src/layers/logging.rs | 8 +++---- src/layers/retry.rs | 12 +++++------ src/layers/tracing.rs | 8 +++---- src/object/list.rs | 8 +++---- src/raw/adapters/kv/backend.rs | 4 ++-- src/raw/io/output/page.rs | 28 ++++++++++++------------- src/raw/io/output/to_flat_pager.rs | 12 +++++------ src/raw/io/output/to_hierarchy_pager.rs | 12 +++++------ src/services/azblob/dir_stream.rs | 2 +- src/services/azdfs/dir_stream.rs | 2 +- src/services/fs/dir_stream.rs | 4 ++-- src/services/ftp/dir_stream.rs | 2 +- src/services/gcs/dir_stream.rs | 2 +- src/services/hdfs/dir_stream.rs | 4 ++-- src/services/ipfs/backend.rs | 2 +- src/services/ipmfs/dir_stream.rs | 2 +- src/services/obs/dir_stream.rs | 2 +- src/services/oss/dir_stream.rs | 2 +- src/services/s3/dir_stream.rs | 2 +- src/services/webdav/dir_stream.rs | 2 +- src/services/webhdfs/dir_stream.rs | 2 +- src/services/webhdfs/message.rs | 2 +- 26 files changed, 80 insertions(+), 80 deletions(-) diff --git a/src/layers/complete.rs b/src/layers/complete.rs index 0bec5aad558..dbbd9661e91 100644 --- a/src/layers/complete.rs +++ b/src/layers/complete.rs @@ -480,13 +480,13 @@ where A: Accessor, P: output::Page, { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { use CompletePager::*; match self { - AlreadyComplete(p) => p.next_page().await, - NeedFlat(p) => p.next_page().await, - NeedHierarchy(p) => p.next_page().await, + AlreadyComplete(p) => p.next().await, + NeedFlat(p) => p.next().await, + NeedHierarchy(p) => p.next().await, } } } @@ -496,13 +496,13 @@ where A: Accessor, P: output::BlockingPage, { - fn next_page(&mut self) -> Result>> { + fn next(&mut self) -> Result>> { use CompletePager::*; match self { - AlreadyComplete(p) => p.next_page(), - NeedFlat(p) => p.next_page(), - NeedHierarchy(p) => p.next_page(), + AlreadyComplete(p) => p.next(), + NeedFlat(p) => p.next(), + NeedHierarchy(p) => p.next(), } } } diff --git a/src/layers/concurrent_limit.rs b/src/layers/concurrent_limit.rs index 9af48612658..fa202a442fc 100644 --- a/src/layers/concurrent_limit.rs +++ b/src/layers/concurrent_limit.rs @@ -337,13 +337,13 @@ impl output::BlockingWrite for ConcurrentLimitWrapper< #[async_trait] impl output::Page for ConcurrentLimitWrapper { - async fn next_page(&mut self) -> Result>> { - self.inner.next_page().await + async fn next(&mut self) -> Result>> { + self.inner.next().await } } impl output::BlockingPage for ConcurrentLimitWrapper { - fn next_page(&mut self) -> Result>> { - self.inner.next_page() + fn next(&mut self) -> Result>> { + self.inner.next() } } diff --git a/src/layers/error_context.rs b/src/layers/error_context.rs index f7619193852..45bf7b782ca 100644 --- a/src/layers/error_context.rs +++ b/src/layers/error_context.rs @@ -296,8 +296,8 @@ pub struct ErrorContextWrapper { #[async_trait::async_trait] impl output::Page for ErrorContextWrapper { - async fn next_page(&mut self) -> Result>> { - self.inner.next_page().await.map_err(|err| { + async fn next(&mut self) -> Result>> { + self.inner.next().await.map_err(|err| { err.with_operation("Page::next_page") .with_context("service", self.scheme) .with_context("path", &self.path) @@ -306,8 +306,8 @@ impl output::Page for ErrorContextWrapper { } impl output::BlockingPage for ErrorContextWrapper { - fn next_page(&mut self) -> Result>> { - self.inner.next_page().map_err(|err| { + fn next(&mut self) -> Result>> { + self.inner.next().map_err(|err| { err.with_operation("Page::next_page") .with_context("service", self.scheme) .with_context("path", &self.path) diff --git a/src/layers/immutable_index.rs b/src/layers/immutable_index.rs index 5ee8f8164b3..472efee23bf 100644 --- a/src/layers/immutable_index.rs +++ b/src/layers/immutable_index.rs @@ -254,13 +254,13 @@ impl ImmutableDir { #[async_trait] impl output::Page for ImmutableDir { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { Ok(self.inner_next_page()) } } impl output::BlockingPage for ImmutableDir { - fn next_page(&mut self) -> Result>> { + fn next(&mut self) -> Result>> { Ok(self.inner_next_page()) } } diff --git a/src/layers/logging.rs b/src/layers/logging.rs index c7d661cd66d..8b046ca5f54 100644 --- a/src/layers/logging.rs +++ b/src/layers/logging.rs @@ -1244,8 +1244,8 @@ impl

LoggingPager

{ #[async_trait] impl output::Page for LoggingPager

{ - async fn next_page(&mut self) -> Result>> { - let res = self.inner.next_page().await; + async fn next(&mut self) -> Result>> { + let res = self.inner.next().await; match &res { Ok(Some(des)) => { @@ -1288,8 +1288,8 @@ impl output::Page for LoggingPager

{ } impl output::BlockingPage for LoggingPager

{ - fn next_page(&mut self) -> Result>> { - let res = self.inner.next_page(); + fn next(&mut self) -> Result>> { + let res = self.inner.next(); match &res { Ok(Some(des)) => { diff --git a/src/layers/retry.rs b/src/layers/retry.rs index 24eedba6069..e5c7a98ef16 100644 --- a/src/layers/retry.rs +++ b/src/layers/retry.rs @@ -681,11 +681,11 @@ impl

RetryPager

{ #[async_trait] impl output::Page for RetryPager

{ - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if let Some(sleep) = self.sleep.take() { tokio::time::sleep(sleep).await; } - match self.inner.next_page().await { + match self.inner.next().await { Ok(v) => { // request successful, reset backoff self.reset_backoff(); @@ -720,7 +720,7 @@ impl output::Page for RetryPager

{ "operation={} path={} -> pager retry after {}s: error={:?}", Operation::List, self.path, dur.as_secs_f64(), e); self.sleep = Some(dur); - self.next_page().await + self.next().await } } } @@ -729,8 +729,8 @@ impl output::Page for RetryPager

{ } impl output::BlockingPage for RetryPager

{ - fn next_page(&mut self) -> Result>> { - { || self.inner.next_page() } + fn next(&mut self) -> Result>> { + { || self.inner.next() } .retry(&self.policy) .when(|e| e.is_temporary()) .notify(move |err, dur| { @@ -859,7 +859,7 @@ mod tests { } #[async_trait] impl output::Page for MockPager { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { self.attempt += 1; match self.attempt { 1 => Err(Error::new( diff --git a/src/layers/tracing.rs b/src/layers/tracing.rs index b1712b17370..3ab09cea989 100644 --- a/src/layers/tracing.rs +++ b/src/layers/tracing.rs @@ -322,14 +322,14 @@ impl Read for TracingWrapper { #[async_trait] impl output::Page for TracingWrapper { #[tracing::instrument(parent = &self.span, level = "debug", skip_all)] - async fn next_page(&mut self) -> Result>> { - self.inner.next_page().await + async fn next(&mut self) -> Result>> { + self.inner.next().await } } impl output::BlockingPage for TracingWrapper { #[tracing::instrument(parent = &self.span, level = "debug", skip_all)] - fn next_page(&mut self) -> Result>> { - self.inner.next_page() + fn next(&mut self) -> Result>> { + self.inner.next() } } diff --git a/src/object/list.rs b/src/object/list.rs index e1e8b213a6c..86e40d86350 100644 --- a/src/object/list.rs +++ b/src/object/list.rs @@ -77,7 +77,7 @@ impl ObjectLister { .pager .as_mut() .expect("pager must be valid") - .next_page() + .next() .await? { // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation. @@ -124,7 +124,7 @@ impl Stream for ObjectLister { let mut pager = self.pager.take().expect("pager must be valid"); let fut = async move { - let res = pager.next_page().await; + let res = pager.next().await; (pager, res) }; @@ -159,7 +159,7 @@ impl BlockingObjectLister { let entries = if !self.buf.is_empty() { mem::take(&mut self.buf) } else { - match self.pager.next_page()? { + match self.pager.next()? { // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation. // // However, this could be changed as described in [impl From> for VecDeque](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E) @@ -186,7 +186,7 @@ impl Iterator for BlockingObjectLister { return Some(Ok(oe.into_object(self.operator()))); } - self.buf = match self.pager.next_page() { + self.buf = match self.pager.next() { // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation. // // However, this could be changed as described in [impl From> for VecDeque](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E) diff --git a/src/raw/adapters/kv/backend.rs b/src/raw/adapters/kv/backend.rs index c81a9510c79..1db5861bb17 100644 --- a/src/raw/adapters/kv/backend.rs +++ b/src/raw/adapters/kv/backend.rs @@ -260,13 +260,13 @@ impl KvPager { #[async_trait] impl output::Page for KvPager { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { Ok(self.inner_next_page()) } } impl output::BlockingPage for KvPager { - fn next_page(&mut self) -> Result>> { + fn next(&mut self) -> Result>> { Ok(self.inner_next_page()) } } diff --git a/src/raw/io/output/page.rs b/src/raw/io/output/page.rs index 8878edac83b..b3fd7f497c7 100644 --- a/src/raw/io/output/page.rs +++ b/src/raw/io/output/page.rs @@ -24,8 +24,8 @@ pub trait Page: Send + Sync + 'static { /// Fetch a new page of [`Entry`] /// /// `Ok(None)` means all object pages have been returned. Any following call - /// to `next_page` will always get the same result. - async fn next_page(&mut self) -> Result>>; + /// to `next` will always get the same result. + async fn next(&mut self) -> Result>>; } /// The boxed version of [`Page`] @@ -33,23 +33,23 @@ pub type Pager = Box; #[async_trait] impl Page for Pager { - async fn next_page(&mut self) -> Result>> { - self.as_mut().next_page().await + async fn next(&mut self) -> Result>> { + self.as_mut().next().await } } #[async_trait] impl Page for () { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { Ok(None) } } #[async_trait] impl Page for Option

{ - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { match self { - Some(p) => p.next_page().await, + Some(p) => p.next().await, None => Ok(None), } } @@ -60,30 +60,30 @@ pub trait BlockingPage: 'static { /// Fetch a new page of [`Entry`] /// /// `Ok(None)` means all object pages have been returned. Any following call - /// to `next_page` will always get the same result. - fn next_page(&mut self) -> Result>>; + /// to `next` will always get the same result. + fn next(&mut self) -> Result>>; } /// BlockingPager is a boxed [`BlockingPage`] pub type BlockingPager = Box; impl BlockingPage for BlockingPager { - fn next_page(&mut self) -> Result>> { - self.as_mut().next_page() + fn next(&mut self) -> Result>> { + self.as_mut().next() } } impl BlockingPage for () { - fn next_page(&mut self) -> Result>> { + fn next(&mut self) -> Result>> { Ok(None) } } #[async_trait] impl BlockingPage for Option

{ - fn next_page(&mut self) -> Result>> { + fn next(&mut self) -> Result>> { match self { - Some(p) => p.next_page(), + Some(p) => p.next(), None => Ok(None), } } diff --git a/src/raw/io/output/to_flat_pager.rs b/src/raw/io/output/to_flat_pager.rs index f184fd69957..3894306367c 100644 --- a/src/raw/io/output/to_flat_pager.rs +++ b/src/raw/io/output/to_flat_pager.rs @@ -98,7 +98,7 @@ where A: Accessor, P: output::Page, { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { loop { if let Some(de) = self.dirs.pop_back() { let (_, op) = self.acc.list(de.path(), OpList::new()).await?; @@ -116,7 +116,7 @@ where }; if buf.is_empty() { - match pager.next_page().await? { + match pager.next().await? { Some(v) => { buf = v; } @@ -155,7 +155,7 @@ where A: Accessor, P: output::BlockingPage, { - fn next_page(&mut self) -> Result>> { + fn next(&mut self) -> Result>> { loop { if let Some(de) = self.dirs.pop_back() { let (_, op) = self.acc.blocking_list(de.path(), OpList::new())?; @@ -173,7 +173,7 @@ where }; if buf.is_empty() { - match pager.next_page()? { + match pager.next()? { Some(v) => { buf = v; } @@ -267,7 +267,7 @@ mod tests { } impl BlockingPage for MockPager { - fn next_page(&mut self) -> Result>> { + fn next(&mut self) -> Result>> { if self.done { return Ok(None); } @@ -298,7 +298,7 @@ mod tests { let mut entries = Vec::default(); - while let Some(e) = pager.next_page()? { + while let Some(e) = pager.next()? { entries.extend_from_slice(&e) } diff --git a/src/raw/io/output/to_hierarchy_pager.rs b/src/raw/io/output/to_hierarchy_pager.rs index 670422f2408..0d3d374593a 100644 --- a/src/raw/io/output/to_hierarchy_pager.rs +++ b/src/raw/io/output/to_hierarchy_pager.rs @@ -115,8 +115,8 @@ impl

ToHierarchyPager

{ #[async_trait] impl output::Page for ToHierarchyPager

{ - async fn next_page(&mut self) -> Result>> { - let page = self.pager.next_page().await?; + async fn next(&mut self) -> Result>> { + let page = self.pager.next().await?; let entries = if let Some(entries) = page { entries @@ -131,8 +131,8 @@ impl output::Page for ToHierarchyPager

{ } impl output::BlockingPage for ToHierarchyPager

{ - fn next_page(&mut self) -> Result>> { - let page = self.pager.next_page()?; + fn next(&mut self) -> Result>> { + let page = self.pager.next()?; let entries = if let Some(entries) = page { entries @@ -171,7 +171,7 @@ mod tests { } impl BlockingPage for MockPager { - fn next_page(&mut self) -> Result>> { + fn next(&mut self) -> Result>> { if self.done { return Ok(None); } @@ -203,7 +203,7 @@ mod tests { let mut entries = Vec::default(); let mut set = HashSet::new(); - while let Some(e) = pager.next_page()? { + while let Some(e) = pager.next()? { for i in &e { debug!("got path {}", i.path()); assert!( diff --git a/src/services/azblob/dir_stream.rs b/src/services/azblob/dir_stream.rs index b07d981f99d..967b98d113a 100644 --- a/src/services/azblob/dir_stream.rs +++ b/src/services/azblob/dir_stream.rs @@ -60,7 +60,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if self.done { return Ok(None); } diff --git a/src/services/azdfs/dir_stream.rs b/src/services/azdfs/dir_stream.rs index 570dce6f2e5..4a83ed8add3 100644 --- a/src/services/azdfs/dir_stream.rs +++ b/src/services/azdfs/dir_stream.rs @@ -56,7 +56,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if self.done { return Ok(None); } diff --git a/src/services/fs/dir_stream.rs b/src/services/fs/dir_stream.rs index cc3a8cfa1b9..a09a60aa9c5 100644 --- a/src/services/fs/dir_stream.rs +++ b/src/services/fs/dir_stream.rs @@ -42,7 +42,7 @@ impl DirPager { #[async_trait] impl output::Page for DirPager { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); for _ in 0..self.size { @@ -103,7 +103,7 @@ impl BlockingDirPager { } impl output::BlockingPage for BlockingDirPager { - fn next_page(&mut self) -> Result>> { + fn next(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); for _ in 0..self.size { diff --git a/src/services/ftp/dir_stream.rs b/src/services/ftp/dir_stream.rs index 692ad611497..49f76e19b55 100644 --- a/src/services/ftp/dir_stream.rs +++ b/src/services/ftp/dir_stream.rs @@ -70,7 +70,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); for _ in 0..self.size { diff --git a/src/services/gcs/dir_stream.rs b/src/services/gcs/dir_stream.rs index f17a4231d5b..f98a08c3ff6 100644 --- a/src/services/gcs/dir_stream.rs +++ b/src/services/gcs/dir_stream.rs @@ -62,7 +62,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if self.done { return Ok(None); } diff --git a/src/services/hdfs/dir_stream.rs b/src/services/hdfs/dir_stream.rs index e0254f4b21f..1441afee7b0 100644 --- a/src/services/hdfs/dir_stream.rs +++ b/src/services/hdfs/dir_stream.rs @@ -39,7 +39,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); for _ in 0..self.size { @@ -70,7 +70,7 @@ impl output::Page for DirStream { } impl output::BlockingPage for DirStream { - fn next_page(&mut self) -> Result>> { + fn next(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); for _ in 0..self.size { diff --git a/src/services/ipfs/backend.rs b/src/services/ipfs/backend.rs index 8e7f031d831..debe9059400 100644 --- a/src/services/ipfs/backend.rs +++ b/src/services/ipfs/backend.rs @@ -469,7 +469,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if self.consumed { return Ok(None); } diff --git a/src/services/ipmfs/dir_stream.rs b/src/services/ipmfs/dir_stream.rs index c204c406898..a9f4404f3f4 100644 --- a/src/services/ipmfs/dir_stream.rs +++ b/src/services/ipmfs/dir_stream.rs @@ -45,7 +45,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if self.consumed { return Ok(None); } diff --git a/src/services/obs/dir_stream.rs b/src/services/obs/dir_stream.rs index 8dcf8da46e8..3ec786c6fac 100644 --- a/src/services/obs/dir_stream.rs +++ b/src/services/obs/dir_stream.rs @@ -62,7 +62,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if self.done { return Ok(None); } diff --git a/src/services/oss/dir_stream.rs b/src/services/oss/dir_stream.rs index e6ab3f05b8b..31bea3fb9c0 100644 --- a/src/services/oss/dir_stream.rs +++ b/src/services/oss/dir_stream.rs @@ -66,7 +66,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if self.done { return Ok(None); } diff --git a/src/services/s3/dir_stream.rs b/src/services/s3/dir_stream.rs index 31605977558..775a426c914 100644 --- a/src/services/s3/dir_stream.rs +++ b/src/services/s3/dir_stream.rs @@ -64,7 +64,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if self.done { return Ok(None); } diff --git a/src/services/webdav/dir_stream.rs b/src/services/webdav/dir_stream.rs index b85a6954454..f801ab707f3 100644 --- a/src/services/webdav/dir_stream.rs +++ b/src/services/webdav/dir_stream.rs @@ -41,7 +41,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if self.multistates.response.is_empty() { return Ok(None); }; diff --git a/src/services/webhdfs/dir_stream.rs b/src/services/webhdfs/dir_stream.rs index 77b91674e65..ce2f266b094 100644 --- a/src/services/webhdfs/dir_stream.rs +++ b/src/services/webhdfs/dir_stream.rs @@ -34,7 +34,7 @@ impl DirStream { #[async_trait] impl output::Page for DirStream { - async fn next_page(&mut self) -> Result>> { + async fn next(&mut self) -> Result>> { if self.statuses.is_empty() { return Ok(None); } diff --git a/src/services/webhdfs/message.rs b/src/services/webhdfs/message.rs index d1efd95a638..0bf98933680 100644 --- a/src/services/webhdfs/message.rs +++ b/src/services/webhdfs/message.rs @@ -167,7 +167,7 @@ mod test { let mut pager = DirStream::new("listing/directory", file_statuses); let mut entries = vec![]; - while let Some(oes) = pager.next_page().await.expect("must success") { + while let Some(oes) = pager.next().await.expect("must success") { entries.extend(oes); } From fe2e745ceb425aa056978032f00685b487360ee8 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Mar 2023 14:05:01 +0800 Subject: [PATCH 4/5] Rename to pager Signed-off-by: Xuanwo --- src/services/azblob/backend.rs | 8 ++--- src/services/azblob/mod.rs | 2 +- .../azblob/{dir_stream.rs => pager.rs} | 6 ++-- src/services/azdfs/backend.rs | 6 ++-- src/services/azdfs/mod.rs | 2 +- .../azdfs/{dir_stream.rs => pager.rs} | 6 ++-- src/services/fs/backend.rs | 11 ++++--- src/services/fs/mod.rs | 2 +- src/services/fs/{dir_stream.rs => pager.rs} | 29 ++++--------------- src/services/ftp/backend.rs | 8 ++--- src/services/ftp/mod.rs | 2 +- src/services/ftp/{dir_stream.rs => pager.rs} | 9 ++++-- src/services/gcs/backend.rs | 8 ++--- src/services/gcs/mod.rs | 2 +- src/services/gcs/{dir_stream.rs => pager.rs} | 8 ++--- src/services/hdfs/backend.rs | 10 +++---- src/services/hdfs/mod.rs | 2 +- src/services/hdfs/{dir_stream.rs => pager.rs} | 8 ++--- src/services/ipmfs/backend.rs | 6 ++-- src/services/ipmfs/mod.rs | 2 +- .../ipmfs/{dir_stream.rs => pager.rs} | 6 ++-- src/services/obs/backend.rs | 8 ++--- src/services/obs/mod.rs | 2 +- src/services/obs/{dir_stream.rs => pager.rs} | 6 ++-- src/services/oss/backend.rs | 8 ++--- src/services/oss/mod.rs | 2 +- src/services/oss/{dir_stream.rs => pager.rs} | 6 ++-- src/services/s3/backend.rs | 8 ++--- src/services/s3/mod.rs | 2 +- src/services/s3/{dir_stream.rs => pager.rs} | 6 ++-- src/services/webdav/backend.rs | 11 ++++--- src/services/webdav/mod.rs | 2 +- .../webdav/{dir_stream.rs => pager.rs} | 6 ++-- src/services/webhdfs/backend.rs | 8 ++--- src/services/webhdfs/message.rs | 4 +-- src/services/webhdfs/mod.rs | 2 +- .../webhdfs/{dir_stream.rs => pager.rs} | 6 ++-- 37 files changed, 109 insertions(+), 121 deletions(-) rename src/services/azblob/{dir_stream.rs => pager.rs} (99%) rename src/services/azdfs/{dir_stream.rs => pager.rs} (98%) rename src/services/fs/{dir_stream.rs => pager.rs} (88%) rename src/services/ftp/{dir_stream.rs => pager.rs} (93%) rename src/services/gcs/{dir_stream.rs => pager.rs} (98%) rename src/services/hdfs/{dir_stream.rs => pager.rs} (96%) rename src/services/ipmfs/{dir_stream.rs => pager.rs} (97%) rename src/services/obs/{dir_stream.rs => pager.rs} (98%) rename src/services/oss/{dir_stream.rs => pager.rs} (99%) rename src/services/s3/{dir_stream.rs => pager.rs} (99%) rename src/services/webdav/{dir_stream.rs => pager.rs} (96%) rename src/services/webhdfs/{dir_stream.rs => pager.rs} (94%) diff --git a/src/services/azblob/backend.rs b/src/services/azblob/backend.rs index 018cb6b5f81..42e22e23ad3 100644 --- a/src/services/azblob/backend.rs +++ b/src/services/azblob/backend.rs @@ -29,8 +29,8 @@ use http::StatusCode; use log::debug; use reqsign::AzureStorageSigner; -use super::dir_stream::DirStream; use super::error::parse_error; +use super::pager::AzblobPager; use super::writer::AzblobWriter; use crate::object::ObjectMetadata; use crate::ops::*; @@ -419,7 +419,7 @@ impl Accessor for AzblobBackend { type BlockingReader = (); type Writer = AzblobWriter; type BlockingWriter = (); - type Pager = DirStream; + type Pager = AzblobPager; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { @@ -507,7 +507,7 @@ impl Accessor for AzblobBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - let op = DirStream::new( + let op = AzblobPager::new( Arc::new(self.clone()), self.root.clone(), path.to_string(), @@ -519,7 +519,7 @@ impl Accessor for AzblobBackend { } async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { - let op = DirStream::new( + let op = AzblobPager::new( Arc::new(self.clone()), self.root.clone(), path.to_string(), diff --git a/src/services/azblob/mod.rs b/src/services/azblob/mod.rs index 546beb7589f..4d4237b0e29 100644 --- a/src/services/azblob/mod.rs +++ b/src/services/azblob/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::AzblobBuilder as Azblob; -mod dir_stream; mod error; +mod pager; mod writer; diff --git a/src/services/azblob/dir_stream.rs b/src/services/azblob/pager.rs similarity index 99% rename from src/services/azblob/dir_stream.rs rename to src/services/azblob/pager.rs index 967b98d113a..91c03da5fd5 100644 --- a/src/services/azblob/dir_stream.rs +++ b/src/services/azblob/pager.rs @@ -26,7 +26,7 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -pub struct DirStream { +pub struct AzblobPager { backend: Arc, root: String, path: String, @@ -37,7 +37,7 @@ pub struct DirStream { done: bool, } -impl DirStream { +impl AzblobPager { pub fn new( backend: Arc, root: String, @@ -59,7 +59,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for AzblobPager { async fn next(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/azdfs/backend.rs b/src/services/azdfs/backend.rs index ff60cdc0ab2..96fcf3190e8 100644 --- a/src/services/azdfs/backend.rs +++ b/src/services/azdfs/backend.rs @@ -29,8 +29,8 @@ use http::StatusCode; use log::debug; use reqsign::AzureStorageSigner; -use super::dir_stream::DirStream; use super::error::parse_error; +use super::pager::AzdfsPager; use super::writer::AzdfsWriter; use crate::object::ObjectMetadata; use crate::ops::*; @@ -305,7 +305,7 @@ impl Accessor for AzdfsBackend { type BlockingReader = (); type Writer = AzdfsWriter; type BlockingWriter = (); - type Pager = DirStream; + type Pager = AzdfsPager; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { @@ -397,7 +397,7 @@ impl Accessor for AzdfsBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - let op = DirStream::new( + let op = AzdfsPager::new( Arc::new(self.clone()), self.root.clone(), path.to_string(), diff --git a/src/services/azdfs/mod.rs b/src/services/azdfs/mod.rs index 573f97292ca..5dbbbe0234b 100644 --- a/src/services/azdfs/mod.rs +++ b/src/services/azdfs/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::AzdfsBuilder as Azdfs; -mod dir_stream; +mod pager; mod error; mod writer; diff --git a/src/services/azdfs/dir_stream.rs b/src/services/azdfs/pager.rs similarity index 98% rename from src/services/azdfs/dir_stream.rs rename to src/services/azdfs/pager.rs index 4a83ed8add3..7ad0ef3cd34 100644 --- a/src/services/azdfs/dir_stream.rs +++ b/src/services/azdfs/pager.rs @@ -25,7 +25,7 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -pub struct DirStream { +pub struct AzdfsPager { backend: Arc, root: String, path: String, @@ -35,7 +35,7 @@ pub struct DirStream { done: bool, } -impl DirStream { +impl AzdfsPager { pub fn new( backend: Arc, root: String, @@ -55,7 +55,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for AzdfsPager { async fn next(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/fs/backend.rs b/src/services/fs/backend.rs index e5b905acc14..6c45e78014d 100644 --- a/src/services/fs/backend.rs +++ b/src/services/fs/backend.rs @@ -26,9 +26,8 @@ use time::OffsetDateTime; use tokio::fs; use uuid::Uuid; -use super::dir_stream::BlockingDirPager; -use super::dir_stream::DirPager; use super::error::parse_io_error; +use super::pager::FsPager; use super::writer::FsWriter; use crate::object::*; use crate::ops::*; @@ -294,8 +293,8 @@ impl Accessor for FsBackend { type BlockingReader = output::into_blocking_reader::FdReader; type Writer = FsWriter; type BlockingWriter = FsWriter; - type Pager = Option; - type BlockingPager = Option; + type Pager = Option>; + type BlockingPager = Option>; fn metadata(&self) -> AccessorMetadata { let mut am = AccessorMetadata::default(); @@ -508,7 +507,7 @@ impl Accessor for FsBackend { } }; - let rd = DirPager::new(&self.root, f, args.limit()); + let rd = FsPager::new(&self.root, f, args.limit()); Ok((RpList::default(), Some(rd))) } @@ -694,7 +693,7 @@ impl Accessor for FsBackend { } }; - let rd = BlockingDirPager::new(&self.root, f, args.limit()); + let rd = FsPager::new(&self.root, f, args.limit()); Ok((RpList::default(), Some(rd))) } diff --git a/src/services/fs/mod.rs b/src/services/fs/mod.rs index 040c414fb23..0dad67e185b 100644 --- a/src/services/fs/mod.rs +++ b/src/services/fs/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::FsBuilder as Fs; -mod dir_stream; mod error; +mod pager; mod writer; diff --git a/src/services/fs/dir_stream.rs b/src/services/fs/pager.rs similarity index 88% rename from src/services/fs/dir_stream.rs rename to src/services/fs/pager.rs index a09a60aa9c5..8757bfe72e2 100644 --- a/src/services/fs/dir_stream.rs +++ b/src/services/fs/pager.rs @@ -23,15 +23,15 @@ use crate::ObjectMetadata; use crate::ObjectMode; use crate::Result; -pub struct DirPager { +pub struct FsPager

{ root: PathBuf, size: usize, - rd: tokio::fs::ReadDir, + rd: P, } -impl DirPager { - pub fn new(root: &Path, rd: tokio::fs::ReadDir, limit: Option) -> Self { +impl

FsPager

{ + pub fn new(root: &Path, rd: P, limit: Option) -> Self { Self { root: root.to_owned(), size: limit.unwrap_or(1000), @@ -41,7 +41,7 @@ impl DirPager { } #[async_trait] -impl output::Page for DirPager { +impl output::Page for FsPager { async fn next(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); @@ -85,24 +85,7 @@ impl output::Page for DirPager { } } -pub struct BlockingDirPager { - root: PathBuf, - - size: usize, - rd: std::fs::ReadDir, -} - -impl BlockingDirPager { - pub fn new(root: &Path, rd: std::fs::ReadDir, limit: Option) -> Self { - Self { - root: root.to_owned(), - size: limit.unwrap_or(1000), - rd, - } - } -} - -impl output::BlockingPage for BlockingDirPager { +impl output::BlockingPage for FsPager { fn next(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); diff --git a/src/services/ftp/backend.rs b/src/services/ftp/backend.rs index 6e68e2236af..547f2b23923 100644 --- a/src/services/ftp/backend.rs +++ b/src/services/ftp/backend.rs @@ -35,8 +35,8 @@ use suppaftp::Status; use time::OffsetDateTime; use tokio::sync::OnceCell; -use super::dir_stream::DirStream; -use super::dir_stream::ReadDir; +use super::pager::FtpPager; +use super::pager::ReadDir; use super::util::FtpReader; use super::writer::FtpWriter; use crate::ops::*; @@ -314,7 +314,7 @@ impl Accessor for FtpBackend { type BlockingReader = (); type Writer = FtpWriter; type BlockingWriter = (); - type Pager = DirStream; + type Pager = FtpPager; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { @@ -454,7 +454,7 @@ impl Accessor for FtpBackend { Ok(( RpList::default(), - DirStream::new(if path == "/" { "" } else { path }, rd, args.limit()), + FtpPager::new(if path == "/" { "" } else { path }, rd, args.limit()), )) } } diff --git a/src/services/ftp/mod.rs b/src/services/ftp/mod.rs index 58f8b626a16..821d77238af 100644 --- a/src/services/ftp/mod.rs +++ b/src/services/ftp/mod.rs @@ -15,7 +15,7 @@ mod backend; pub use backend::FtpBuilder as Ftp; -mod dir_stream; mod err; +mod pager; mod util; mod writer; diff --git a/src/services/ftp/dir_stream.rs b/src/services/ftp/pager.rs similarity index 93% rename from src/services/ftp/dir_stream.rs rename to src/services/ftp/pager.rs index 49f76e19b55..67e8e3d745b 100644 --- a/src/services/ftp/dir_stream.rs +++ b/src/services/ftp/pager.rs @@ -22,6 +22,9 @@ use time::OffsetDateTime; use crate::raw::*; use crate::*; +/// TODO: refactor me, we don't need an extra ReadDir here. +/// +/// ref: pub struct ReadDir { files: Vec, index: usize, @@ -52,13 +55,13 @@ impl ReadDir { } } -pub struct DirStream { +pub struct FtpPager { path: String, size: usize, rd: ReadDir, } -impl DirStream { +impl FtpPager { pub fn new(path: &str, rd: ReadDir, limit: Option) -> Self { Self { path: path.to_string(), @@ -69,7 +72,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for FtpPager { async fn next(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); diff --git a/src/services/gcs/backend.rs b/src/services/gcs/backend.rs index 1f9736f6e4f..708550c5d31 100644 --- a/src/services/gcs/backend.rs +++ b/src/services/gcs/backend.rs @@ -31,8 +31,8 @@ use serde_json; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use super::dir_stream::DirStream; use super::error::parse_error; +use super::pager::GcsPager; use super::uri::percent_encode_path; use super::writer::GcsWriter; use crate::ops::*; @@ -348,7 +348,7 @@ impl Accessor for GcsBackend { type BlockingReader = (); type Writer = GcsWriter; type BlockingWriter = (); - type Pager = DirStream; + type Pager = GcsPager; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { @@ -457,14 +457,14 @@ impl Accessor for GcsBackend { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), - DirStream::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + GcsPager::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), )) } async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { Ok(( RpScan::default(), - DirStream::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), + GcsPager::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), )) } } diff --git a/src/services/gcs/mod.rs b/src/services/gcs/mod.rs index 08850b157c5..49cc1a3b188 100644 --- a/src/services/gcs/mod.rs +++ b/src/services/gcs/mod.rs @@ -15,7 +15,7 @@ mod backend; pub use backend::GcsBuilder as Gcs; -mod dir_stream; mod error; +mod pager; mod uri; mod writer; diff --git a/src/services/gcs/dir_stream.rs b/src/services/gcs/pager.rs similarity index 98% rename from src/services/gcs/dir_stream.rs rename to src/services/gcs/pager.rs index f98a08c3ff6..7869099696b 100644 --- a/src/services/gcs/dir_stream.rs +++ b/src/services/gcs/pager.rs @@ -25,9 +25,9 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -/// DirStream takes over task of listing objects and +/// GcsPager takes over task of listing objects and /// helps walking directory -pub struct DirStream { +pub struct GcsPager { backend: Arc, root: String, path: String, @@ -38,7 +38,7 @@ pub struct DirStream { done: bool, } -impl DirStream { +impl GcsPager { /// Generate a new directory walker pub fn new( backend: Arc, @@ -61,7 +61,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for GcsPager { async fn next(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/hdfs/backend.rs b/src/services/hdfs/backend.rs index 631efde7d57..ee85c3e691a 100644 --- a/src/services/hdfs/backend.rs +++ b/src/services/hdfs/backend.rs @@ -24,8 +24,8 @@ use async_trait::async_trait; use log::debug; use time::OffsetDateTime; -use super::dir_stream::DirStream; use super::error::parse_io_error; +use super::pager::HdfsPager; use super::writer::HdfsWriter; use crate::ops::*; use crate::raw::*; @@ -225,8 +225,8 @@ impl Accessor for HdfsBackend { type BlockingReader = output::into_blocking_reader::FdReader; type Writer = HdfsWriter; type BlockingWriter = HdfsWriter; - type Pager = Option; - type BlockingPager = Option; + type Pager = Option; + type BlockingPager = Option; fn metadata(&self) -> AccessorMetadata { let mut am = AccessorMetadata::default(); @@ -407,7 +407,7 @@ impl Accessor for HdfsBackend { } }; - let rd = DirStream::new(&self.root, f, args.limit()); + let rd = HdfsPager::new(&self.root, f, args.limit()); Ok((RpList::default(), Some(rd))) } @@ -574,7 +574,7 @@ impl Accessor for HdfsBackend { } }; - let rd = DirStream::new(&self.root, f, args.limit()); + let rd = HdfsPager::new(&self.root, f, args.limit()); Ok((RpList::default(), Some(rd))) } diff --git a/src/services/hdfs/mod.rs b/src/services/hdfs/mod.rs index 73079835389..2695d576d4c 100644 --- a/src/services/hdfs/mod.rs +++ b/src/services/hdfs/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::HdfsBuilder as Hdfs; -mod dir_stream; mod error; +mod pager; mod writer; diff --git a/src/services/hdfs/dir_stream.rs b/src/services/hdfs/pager.rs similarity index 96% rename from src/services/hdfs/dir_stream.rs rename to src/services/hdfs/pager.rs index 1441afee7b0..09873f7a3a4 100644 --- a/src/services/hdfs/dir_stream.rs +++ b/src/services/hdfs/pager.rs @@ -19,14 +19,14 @@ use crate::ObjectMetadata; use crate::ObjectMode; use crate::Result; -pub struct DirStream { +pub struct HdfsPager { root: String, size: usize, rd: hdrs::Readdir, } -impl DirStream { +impl HdfsPager { pub fn new(root: &str, rd: hdrs::Readdir, limit: Option) -> Self { Self { root: root.to_string(), @@ -38,7 +38,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for HdfsPager { async fn next(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); @@ -69,7 +69,7 @@ impl output::Page for DirStream { } } -impl output::BlockingPage for DirStream { +impl output::BlockingPage for HdfsPager { fn next(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); diff --git a/src/services/ipmfs/backend.rs b/src/services/ipmfs/backend.rs index 2fb05412ed5..d1538491458 100644 --- a/src/services/ipmfs/backend.rs +++ b/src/services/ipmfs/backend.rs @@ -23,8 +23,8 @@ use http::Response; use http::StatusCode; use serde::Deserialize; -use super::dir_stream::DirStream; use super::error::parse_error; +use super::pager::IpmfsPager; use super::writer::IpmfsWriter; use crate::ops::*; use crate::raw::*; @@ -63,7 +63,7 @@ impl Accessor for IpmfsBackend { type BlockingReader = (); type Writer = IpmfsWriter; type BlockingWriter = (); - type Pager = DirStream; + type Pager = IpmfsPager; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { @@ -166,7 +166,7 @@ impl Accessor for IpmfsBackend { async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), - DirStream::new(Arc::new(self.clone()), &self.root, path), + IpmfsPager::new(Arc::new(self.clone()), &self.root, path), )) } } diff --git a/src/services/ipmfs/mod.rs b/src/services/ipmfs/mod.rs index 3ffd874f3e7..25cab4d8dbc 100644 --- a/src/services/ipmfs/mod.rs +++ b/src/services/ipmfs/mod.rs @@ -16,6 +16,6 @@ mod backend; mod builder; pub use builder::IpmfsBuilder as Ipmfs; -mod dir_stream; +mod pager; mod error; mod writer; diff --git a/src/services/ipmfs/dir_stream.rs b/src/services/ipmfs/pager.rs similarity index 97% rename from src/services/ipmfs/dir_stream.rs rename to src/services/ipmfs/pager.rs index a9f4404f3f4..89342d8ed8b 100644 --- a/src/services/ipmfs/dir_stream.rs +++ b/src/services/ipmfs/pager.rs @@ -25,14 +25,14 @@ use crate::ObjectMetadata; use crate::ObjectMode; use crate::Result; -pub struct DirStream { +pub struct IpmfsPager { backend: Arc, root: String, path: String, consumed: bool, } -impl DirStream { +impl IpmfsPager { pub fn new(backend: Arc, root: &str, path: &str) -> Self { Self { backend, @@ -44,7 +44,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for IpmfsPager { async fn next(&mut self) -> Result>> { if self.consumed { return Ok(None); diff --git a/src/services/obs/backend.rs b/src/services/obs/backend.rs index fa6aadf1b7d..419fd4e0be7 100644 --- a/src/services/obs/backend.rs +++ b/src/services/obs/backend.rs @@ -26,8 +26,8 @@ use http::Uri; use log::debug; use reqsign::HuaweicloudObsSigner; -use super::dir_stream::DirStream; use super::error::parse_error; +use super::pager::ObsPager; use super::writer::ObsWriter; use crate::ops::*; use crate::raw::*; @@ -306,7 +306,7 @@ impl Accessor for ObsBackend { type BlockingReader = (); type Writer = ObsWriter; type BlockingWriter = (); - type Pager = DirStream; + type Pager = ObsPager; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { @@ -398,14 +398,14 @@ impl Accessor for ObsBackend { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), - DirStream::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + ObsPager::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), )) } async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { Ok(( RpScan::default(), - DirStream::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), + ObsPager::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), )) } } diff --git a/src/services/obs/mod.rs b/src/services/obs/mod.rs index 266cfc283ac..4c2e4cc80f7 100644 --- a/src/services/obs/mod.rs +++ b/src/services/obs/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::ObsBuilder as Obs; -mod dir_stream; +mod pager; mod error; mod writer; diff --git a/src/services/obs/dir_stream.rs b/src/services/obs/pager.rs similarity index 98% rename from src/services/obs/dir_stream.rs rename to src/services/obs/pager.rs index 3ec786c6fac..b757713e431 100644 --- a/src/services/obs/dir_stream.rs +++ b/src/services/obs/pager.rs @@ -28,7 +28,7 @@ use crate::ObjectMetadata; use crate::ObjectMode; use crate::Result; -pub struct DirStream { +pub struct ObsPager { backend: Arc, root: String, path: String, @@ -39,7 +39,7 @@ pub struct DirStream { done: bool, } -impl DirStream { +impl ObsPager { pub fn new( backend: Arc, root: &str, @@ -61,7 +61,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for ObsPager { async fn next(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/oss/backend.rs b/src/services/oss/backend.rs index aaeaf7c557a..df93396b07f 100644 --- a/src/services/oss/backend.rs +++ b/src/services/oss/backend.rs @@ -35,8 +35,8 @@ use reqsign::AliyunOssSigner; use serde::Deserialize; use serde::Serialize; -use super::dir_stream::DirStream; use super::error::parse_error; +use super::pager::OssPager; use super::writer::OssWriter; use crate::ops::*; use crate::raw::*; @@ -408,7 +408,7 @@ impl Accessor for OssBackend { type BlockingReader = (); type Writer = OssWriter; type BlockingWriter = (); - type Pager = DirStream; + type Pager = OssPager; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { @@ -497,14 +497,14 @@ impl Accessor for OssBackend { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), - DirStream::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + OssPager::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), )) } async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { Ok(( RpScan::default(), - DirStream::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), + OssPager::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), )) } diff --git a/src/services/oss/mod.rs b/src/services/oss/mod.rs index 58acd56d826..e5b82178977 100644 --- a/src/services/oss/mod.rs +++ b/src/services/oss/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::OssBuilder as Oss; -mod dir_stream; +mod pager; mod error; mod writer; diff --git a/src/services/oss/dir_stream.rs b/src/services/oss/pager.rs similarity index 99% rename from src/services/oss/dir_stream.rs rename to src/services/oss/pager.rs index 31bea3fb9c0..065b64af9aa 100644 --- a/src/services/oss/dir_stream.rs +++ b/src/services/oss/pager.rs @@ -31,7 +31,7 @@ use crate::ObjectMetadata; use crate::ObjectMode; use crate::Result; -pub struct DirStream { +pub struct OssPager { backend: Arc, root: String, path: String, @@ -42,7 +42,7 @@ pub struct DirStream { done: bool, } -impl DirStream { +impl OssPager { pub fn new( backend: Arc, root: &str, @@ -65,7 +65,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for OssPager { async fn next(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/s3/backend.rs b/src/services/s3/backend.rs index 2b98eb1e578..7c4d34bcf1b 100644 --- a/src/services/s3/backend.rs +++ b/src/services/s3/backend.rs @@ -44,8 +44,8 @@ use reqsign::AwsV4Signer; use serde::Deserialize; use serde::Serialize; -use super::dir_stream::DirStream; use super::error::parse_error; +use super::pager::S3Pager; use super::writer::S3Writer; use crate::ops::*; use crate::raw::*; @@ -1110,7 +1110,7 @@ impl Accessor for S3Backend { type BlockingReader = (); type Writer = S3Writer; type BlockingWriter = (); - type Pager = DirStream; + type Pager = S3Pager; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { @@ -1220,14 +1220,14 @@ impl Accessor for S3Backend { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), - DirStream::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + S3Pager::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), )) } async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { Ok(( RpScan::default(), - DirStream::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), + S3Pager::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), )) } diff --git a/src/services/s3/mod.rs b/src/services/s3/mod.rs index b64009665b7..b4f87b21a6a 100644 --- a/src/services/s3/mod.rs +++ b/src/services/s3/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::S3Builder as S3; -mod dir_stream; +mod pager; mod error; mod writer; diff --git a/src/services/s3/dir_stream.rs b/src/services/s3/pager.rs similarity index 99% rename from src/services/s3/dir_stream.rs rename to src/services/s3/pager.rs index 775a426c914..1d2fbd4c1d3 100644 --- a/src/services/s3/dir_stream.rs +++ b/src/services/s3/pager.rs @@ -30,7 +30,7 @@ use crate::ObjectMetadata; use crate::ObjectMode; use crate::Result; -pub struct DirStream { +pub struct S3Pager { backend: Arc, root: String, path: String, @@ -41,7 +41,7 @@ pub struct DirStream { done: bool, } -impl DirStream { +impl S3Pager { pub fn new( backend: Arc, root: &str, @@ -63,7 +63,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for S3Pager { async fn next(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/webdav/backend.rs b/src/services/webdav/backend.rs index 8b74ba53e4e..d5d696dd751 100644 --- a/src/services/webdav/backend.rs +++ b/src/services/webdav/backend.rs @@ -24,9 +24,9 @@ use http::Response; use http::StatusCode; use log::debug; -use super::dir_stream::DirStream; use super::error::parse_error; use super::list_response::Multistatus; +use super::pager::WebdavPager; use super::writer::WebdavWriter; use crate::ops::*; use crate::raw::*; @@ -256,7 +256,7 @@ impl Accessor for WebdavBackend { type BlockingReader = (); type Writer = WebdavWriter; type BlockingWriter = (); - type Pager = DirStream; + type Pager = WebdavPager; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { @@ -362,11 +362,14 @@ impl Accessor for WebdavBackend { let result: Multistatus = quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; - Ok((RpList::default(), DirStream::new(&self.root, path, result))) + Ok(( + RpList::default(), + WebdavPager::new(&self.root, path, result), + )) } StatusCode::NOT_FOUND if path.ends_with('/') => Ok(( RpList::default(), - DirStream::new( + WebdavPager::new( &self.root, path, Multistatus { diff --git a/src/services/webdav/mod.rs b/src/services/webdav/mod.rs index 092ee8e2299..763a9d8d81e 100644 --- a/src/services/webdav/mod.rs +++ b/src/services/webdav/mod.rs @@ -15,7 +15,7 @@ mod backend; pub use backend::WebdavBuilder as Webdav; -mod dir_stream; mod error; mod list_response; +mod pager; mod writer; diff --git a/src/services/webdav/dir_stream.rs b/src/services/webdav/pager.rs similarity index 96% rename from src/services/webdav/dir_stream.rs rename to src/services/webdav/pager.rs index f801ab707f3..da4ca7a41c6 100644 --- a/src/services/webdav/dir_stream.rs +++ b/src/services/webdav/pager.rs @@ -23,13 +23,13 @@ use crate::ObjectMetadata; use crate::ObjectMode; use crate::Result; -pub struct DirStream { +pub struct WebdavPager { root: String, path: String, multistates: Multistatus, } -impl DirStream { +impl WebdavPager { pub fn new(root: &str, path: &str, multistates: Multistatus) -> Self { Self { root: root.into(), @@ -40,7 +40,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for WebdavPager { async fn next(&mut self) -> Result>> { if self.multistates.response.is_empty() { return Ok(None); diff --git a/src/services/webhdfs/backend.rs b/src/services/webhdfs/backend.rs index 39ef215bebb..a2404f5215d 100644 --- a/src/services/webhdfs/backend.rs +++ b/src/services/webhdfs/backend.rs @@ -27,12 +27,12 @@ use log::debug; use log::error; use tokio::sync::OnceCell; -use super::dir_stream::DirStream; use super::error::parse_error; use super::message::BooleanResp; use super::message::FileStatusType; use super::message::FileStatusWrapper; use super::message::FileStatusesWrapper; +use super::pager::WebhdfsPager; use super::writer::WebhdfsWriter; use crate::ops::*; use crate::raw::*; @@ -538,7 +538,7 @@ impl Accessor for WebhdfsBackend { type BlockingReader = (); type Writer = WebhdfsWriter; type BlockingWriter = (); - type Pager = DirStream; + type Pager = WebhdfsPager; type BlockingPager = (); fn metadata(&self) -> AccessorMetadata { @@ -677,11 +677,11 @@ impl Accessor for WebhdfsBackend { .file_statuses .file_status; - let objects = DirStream::new(path, file_statuses); + let objects = WebhdfsPager::new(path, file_statuses); Ok((RpList::default(), objects)) } StatusCode::NOT_FOUND => { - let objects = DirStream::new(path, vec![]); + let objects = WebhdfsPager::new(path, vec![]); Ok((RpList::default(), objects)) } _ => Err(parse_error(resp).await?), diff --git a/src/services/webhdfs/message.rs b/src/services/webhdfs/message.rs index 0bf98933680..eda0df84885 100644 --- a/src/services/webhdfs/message.rs +++ b/src/services/webhdfs/message.rs @@ -80,7 +80,7 @@ pub enum FileStatusType { mod test { use super::*; use crate::raw::output::Page; - use crate::services::webhdfs::dir_stream::DirStream; + use crate::services::webhdfs::pager::WebhdfsPager; use crate::ObjectMode; #[test] @@ -165,7 +165,7 @@ mod test { .file_statuses .file_status; - let mut pager = DirStream::new("listing/directory", file_statuses); + let mut pager = WebhdfsPager::new("listing/directory", file_statuses); let mut entries = vec![]; while let Some(oes) = pager.next().await.expect("must success") { entries.extend(oes); diff --git a/src/services/webhdfs/mod.rs b/src/services/webhdfs/mod.rs index fec2ac43e1d..27fac217f2c 100644 --- a/src/services/webhdfs/mod.rs +++ b/src/services/webhdfs/mod.rs @@ -15,7 +15,7 @@ mod backend; pub use backend::WebhdfsBuilder as Webhdfs; -mod dir_stream; +mod pager; mod error; mod message; mod writer; diff --git a/src/services/webhdfs/dir_stream.rs b/src/services/webhdfs/pager.rs similarity index 94% rename from src/services/webhdfs/dir_stream.rs rename to src/services/webhdfs/pager.rs index ce2f266b094..6da7922bf7d 100644 --- a/src/services/webhdfs/dir_stream.rs +++ b/src/services/webhdfs/pager.rs @@ -18,12 +18,12 @@ use super::message::FileStatus; use crate::raw::*; use crate::*; -pub struct DirStream { +pub struct WebhdfsPager { path: String, statuses: Vec, } -impl DirStream { +impl WebhdfsPager { pub fn new(path: &str, statuses: Vec) -> Self { Self { path: path.to_string(), @@ -33,7 +33,7 @@ impl DirStream { } #[async_trait] -impl output::Page for DirStream { +impl output::Page for WebhdfsPager { async fn next(&mut self) -> Result>> { if self.statuses.is_empty() { return Ok(None); From a07d2875659858d46a38ff6ded79a05c3c0dba7d Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Wed, 1 Mar 2023 14:11:30 +0800 Subject: [PATCH 5/5] Format code Signed-off-by: Xuanwo --- src/object/writer.rs | 6 ++++-- src/raw/io/output/write.rs | 4 ++-- src/services/azdfs/mod.rs | 2 +- src/services/fs/writer.rs | 3 +-- src/services/hdfs/writer.rs | 3 +-- src/services/ipmfs/mod.rs | 2 +- src/services/obs/mod.rs | 2 +- src/services/oss/mod.rs | 2 +- src/services/s3/mod.rs | 2 +- src/services/s3/writer.rs | 3 ++- src/services/webhdfs/mod.rs | 2 +- 11 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/object/writer.rs b/src/object/writer.rs index 323ce6c5760..6b0c8ea9d5b 100644 --- a/src/object/writer.rs +++ b/src/object/writer.rs @@ -15,12 +15,14 @@ use std::fmt::Display; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::Context; +use std::task::Poll; use bytes::Bytes; use futures::future::BoxFuture; +use futures::ready; +use futures::AsyncWrite; use futures::FutureExt; -use futures::{ready, AsyncWrite}; use crate::ops::OpWrite; use crate::raw::*; diff --git a/src/raw/io/output/write.rs b/src/raw/io/output/write.rs index d75ce41a123..b56b7fd4e59 100644 --- a/src/raw/io/output/write.rs +++ b/src/raw/io/output/write.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::*; - use async_trait::async_trait; use bytes::Bytes; +use crate::*; + /// Writer is a type erased [`Write`] pub type Writer = Box; diff --git a/src/services/azdfs/mod.rs b/src/services/azdfs/mod.rs index 5dbbbe0234b..404833dea81 100644 --- a/src/services/azdfs/mod.rs +++ b/src/services/azdfs/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::AzdfsBuilder as Azdfs; -mod pager; mod error; +mod pager; mod writer; diff --git a/src/services/fs/writer.rs b/src/services/fs/writer.rs index a2d7bdfde24..de24aaff4a9 100644 --- a/src/services/fs/writer.rs +++ b/src/services/fs/writer.rs @@ -22,11 +22,10 @@ use bytes::Bytes; use tokio::io::AsyncSeekExt; use tokio::io::AsyncWriteExt; +use super::error::parse_io_error; use crate::raw::*; use crate::*; -use super::error::parse_io_error; - pub struct FsWriter { target_path: PathBuf, tmp_path: Option, diff --git a/src/services/hdfs/writer.rs b/src/services/hdfs/writer.rs index eed4ad274d9..f7d88d0ad4c 100644 --- a/src/services/hdfs/writer.rs +++ b/src/services/hdfs/writer.rs @@ -21,11 +21,10 @@ use bytes::Bytes; use futures::AsyncSeekExt; use futures::AsyncWriteExt; +use super::error::parse_io_error; use crate::raw::*; use crate::*; -use super::error::parse_io_error; - pub struct HdfsWriter { f: F, pos: u64, diff --git a/src/services/ipmfs/mod.rs b/src/services/ipmfs/mod.rs index 25cab4d8dbc..c5ca5479ab6 100644 --- a/src/services/ipmfs/mod.rs +++ b/src/services/ipmfs/mod.rs @@ -16,6 +16,6 @@ mod backend; mod builder; pub use builder::IpmfsBuilder as Ipmfs; -mod pager; mod error; +mod pager; mod writer; diff --git a/src/services/obs/mod.rs b/src/services/obs/mod.rs index 4c2e4cc80f7..1001d802d8b 100644 --- a/src/services/obs/mod.rs +++ b/src/services/obs/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::ObsBuilder as Obs; -mod pager; mod error; +mod pager; mod writer; diff --git a/src/services/oss/mod.rs b/src/services/oss/mod.rs index e5b82178977..c29c7054dc2 100644 --- a/src/services/oss/mod.rs +++ b/src/services/oss/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::OssBuilder as Oss; -mod pager; mod error; +mod pager; mod writer; diff --git a/src/services/s3/mod.rs b/src/services/s3/mod.rs index b4f87b21a6a..fd999a7c82e 100644 --- a/src/services/s3/mod.rs +++ b/src/services/s3/mod.rs @@ -15,6 +15,6 @@ mod backend; pub use backend::S3Builder as S3; -mod pager; mod error; +mod pager; mod writer; diff --git a/src/services/s3/writer.rs b/src/services/s3/writer.rs index 2acb90d9fc0..8e7264728dc 100644 --- a/src/services/s3/writer.rs +++ b/src/services/s3/writer.rs @@ -16,7 +16,8 @@ use async_trait::async_trait; use bytes::Bytes; use http::StatusCode; -use super::backend::{CompleteMultipartUploadRequestPart, S3Backend}; +use super::backend::CompleteMultipartUploadRequestPart; +use super::backend::S3Backend; use super::error::parse_error; use crate::ops::OpWrite; use crate::raw::*; diff --git a/src/services/webhdfs/mod.rs b/src/services/webhdfs/mod.rs index 27fac217f2c..d0218affb08 100644 --- a/src/services/webhdfs/mod.rs +++ b/src/services/webhdfs/mod.rs @@ -15,7 +15,7 @@ mod backend; pub use backend::WebhdfsBuilder as Webhdfs; -mod pager; mod error; mod message; +mod pager; mod writer;