From ef8daf4e85d78acd7da30090d4aa37e166dc7a15 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 9 Feb 2023 15:04:40 +0800 Subject: [PATCH] Rename Pager Signed-off-by: Xuanwo --- src/docs/upgrade.md | 8 ++++---- src/layers/concurrent_limit.rs | 25 ++++++++++--------------- src/layers/error_context.rs | 16 ++++++---------- src/layers/immutable_index.rs | 16 ++++++---------- src/layers/logging.rs | 24 ++++++++++-------------- src/layers/metrics.rs | 8 ++------ src/layers/retry.rs | 22 +++++++++------------- src/layers/tracing.rs | 25 ++++++++++--------------- src/object/list.rs | 10 +++++----- src/raw/accessor.rs | 24 ++++++------------------ src/raw/io/output/entry.rs | 2 +- src/raw/io/output/mod.rs | 8 ++++---- src/raw/io/output/page.rs | 26 +++++++++++++------------- src/raw/io/walk.rs | 8 ++++---- src/raw/layer.rs | 16 ++++------------ src/services/azblob/backend.rs | 4 ++-- src/services/azblob/dir_stream.rs | 2 +- src/services/azdfs/backend.rs | 4 ++-- src/services/azdfs/dir_stream.rs | 2 +- src/services/fs/backend.rs | 15 ++++----------- src/services/fs/dir_stream.rs | 4 ++-- src/services/ftp/backend.rs | 5 ++--- src/services/ftp/dir_stream.rs | 2 +- src/services/gcs/backend.rs | 5 ++--- src/services/gcs/dir_stream.rs | 2 +- src/services/hdfs/backend.rs | 15 ++++----------- src/services/hdfs/dir_stream.rs | 4 ++-- src/services/ipfs/backend.rs | 6 +++--- src/services/ipmfs/backend.rs | 5 ++--- src/services/ipmfs/dir_stream.rs | 2 +- src/services/obs/backend.rs | 5 ++--- src/services/obs/dir_stream.rs | 2 +- src/services/oss/backend.rs | 5 ++--- src/services/oss/dir_stream.rs | 2 +- src/services/s3/backend.rs | 5 ++--- src/services/s3/dir_stream.rs | 2 +- src/services/webhdfs/backend.rs | 6 +++--- src/services/webhdfs/dir_stream.rs | 2 +- src/services/webhdfs/message.rs | 2 +- 39 files changed, 138 insertions(+), 208 deletions(-) diff --git a/src/docs/upgrade.md b/src/docs/upgrade.md index b3ae19e1d3b..e5133efa7c9 100644 --- a/src/docs/upgrade.md +++ b/src/docs/upgrade.md @@ -149,16 +149,16 @@ In v0.21, we refactor the whole `Accessor`'s API: Since v0.21, we will return a reply struct for different operations called `RpWrite` instead of an exact type. We can split OpenDAL's public API and raw API with this change. -## ObjectList and ObjectPage +## ObjectList and Page -Since v0.21, `Accessor` will return `ObjectPager` for `List`: +Since v0.21, `Accessor` will return `Pager` for `List`: ```diff - async fn list(&self, path: &str, args: OpList) -> Result -+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> ++ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> ``` -And `Object` will return an `ObjectLister` which is built upon `ObjectPage`: +And `Object` will return an `ObjectLister` which is built upon `Page`: ```rust pub async fn list(&self) -> Result { ... } diff --git a/src/layers/concurrent_limit.rs b/src/layers/concurrent_limit.rs index c008575da73..2cda44b14f6 100644 --- a/src/layers/concurrent_limit.rs +++ b/src/layers/concurrent_limit.rs @@ -135,7 +135,7 @@ impl LayeredAccessor for ConcurrentLimitAccessor { self.inner.delete(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { let permit = self .semaphore .clone() @@ -146,7 +146,7 @@ impl LayeredAccessor for ConcurrentLimitAccessor { self.inner.list(path, args).await.map(|(rp, s)| { ( rp, - Box::new(ConcurrentLimitPager::new(s, permit)) as output::ObjectPager, + Box::new(ConcurrentLimitPager::new(s, permit)) as output::Pager, ) }) } @@ -261,11 +261,7 @@ impl LayeredAccessor for ConcurrentLimitAccessor { self.inner.blocking_delete(path, args) } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { let permit = self .semaphore .clone() @@ -275,8 +271,7 @@ impl LayeredAccessor for ConcurrentLimitAccessor { self.inner.blocking_list(path, args).map(|(rp, it)| { ( rp, - Box::new(BlockingConcurrentLimitPager::new(it, permit)) - as output::BlockingObjectPager, + Box::new(BlockingConcurrentLimitPager::new(it, permit)) as output::BlockingPager, ) }) } @@ -327,14 +322,14 @@ impl output::BlockingRead for ConcurrentLimitReader } struct ConcurrentLimitPager { - inner: output::ObjectPager, + inner: output::Pager, // Hold on this permit until this streamer has been dropped. _permit: OwnedSemaphorePermit, } impl ConcurrentLimitPager { - fn new(inner: output::ObjectPager, permit: OwnedSemaphorePermit) -> Self { + fn new(inner: output::Pager, permit: OwnedSemaphorePermit) -> Self { Self { inner, _permit: permit, @@ -343,21 +338,21 @@ impl ConcurrentLimitPager { } #[async_trait] -impl output::ObjectPage for ConcurrentLimitPager { +impl output::Page for ConcurrentLimitPager { async fn next_page(&mut self) -> Result>> { self.inner.next_page().await } } struct BlockingConcurrentLimitPager { - inner: output::BlockingObjectPager, + inner: output::BlockingPager, // Hold on this permit until this iterator has been dropped. _permit: OwnedSemaphorePermit, } impl BlockingConcurrentLimitPager { - fn new(inner: output::BlockingObjectPager, permit: OwnedSemaphorePermit) -> Self { + fn new(inner: output::BlockingPager, permit: OwnedSemaphorePermit) -> Self { Self { inner, _permit: permit, @@ -365,7 +360,7 @@ impl BlockingConcurrentLimitPager { } } -impl output::BlockingObjectPage for BlockingConcurrentLimitPager { +impl output::BlockingPage for BlockingConcurrentLimitPager { fn next_page(&mut self) -> Result>> { self.inner.next_page() } diff --git a/src/layers/error_context.rs b/src/layers/error_context.rs index 437283624d6..00ce2270a44 100644 --- a/src/layers/error_context.rs +++ b/src/layers/error_context.rs @@ -118,7 +118,7 @@ impl LayeredAccessor for ErrorContextAccessor { .await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { self.inner .list(path, args) .map_ok(|(rp, os)| { @@ -134,7 +134,7 @@ impl LayeredAccessor for ErrorContextAccessor { // // ref: https://github.com/rust-lang/rust/issues/80437 inner: os, - }) as output::ObjectPager, + }) as output::Pager, ) }) .map_err(|err| { @@ -259,11 +259,7 @@ impl LayeredAccessor for ErrorContextAccessor { }) } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { self.inner.blocking_list(path, args).map_err(|err| { err.with_operation(Operation::BlockingList.into_static()) .with_context("service", self.meta.scheme()) @@ -272,17 +268,17 @@ impl LayeredAccessor for ErrorContextAccessor { } } -struct ObjectStreamErrorContextWrapper { +struct ObjectStreamErrorContextWrapper { scheme: Scheme, path: String, inner: T, } #[async_trait::async_trait] -impl output::ObjectPage for ObjectStreamErrorContextWrapper { +impl output::Page for ObjectStreamErrorContextWrapper { async fn next_page(&mut self) -> Result>> { self.inner.next_page().await.map_err(|err| { - err.with_operation("ObjectPage::next_page") + err.with_operation("Page::next_page") .with_context("service", self.scheme) .with_context("path", &self.path) }) diff --git a/src/layers/immutable_index.rs b/src/layers/immutable_index.rs index bd75608f3fa..cf03f523467 100644 --- a/src/layers/immutable_index.rs +++ b/src/layers/immutable_index.rs @@ -146,7 +146,7 @@ impl LayeredAccessor for ImmutableIndexAccessor { self.inner.read(path, args).await } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { let mut path = path; if path == "/" { path = "" @@ -154,7 +154,7 @@ impl LayeredAccessor for ImmutableIndexAccessor { Ok(( RpList::default(), - Box::new(ImmutableDir::new(self.children(path))) as output::ObjectPager, + Box::new(ImmutableDir::new(self.children(path))) as output::Pager, )) } @@ -162,11 +162,7 @@ impl LayeredAccessor for ImmutableIndexAccessor { self.inner.blocking_read(path, args) } - fn blocking_list( - &self, - path: &str, - _: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, output::BlockingPager)> { let mut path = path; if path == "/" { path = "" @@ -174,7 +170,7 @@ impl LayeredAccessor for ImmutableIndexAccessor { Ok(( RpList::default(), - Box::new(ImmutableDir::new(self.children(path))) as output::BlockingObjectPager, + Box::new(ImmutableDir::new(self.children(path))) as output::BlockingPager, )) } } @@ -212,13 +208,13 @@ impl ImmutableDir { } #[async_trait] -impl output::ObjectPage for ImmutableDir { +impl output::Page for ImmutableDir { async fn next_page(&mut self) -> Result>> { Ok(self.inner_next_page()) } } -impl output::BlockingObjectPage for ImmutableDir { +impl output::BlockingPage for ImmutableDir { fn next_page(&mut self) -> Result>> { Ok(self.inner_next_page()) } diff --git a/src/layers/logging.rs b/src/layers/logging.rs index eb06860975d..e3af10ab9dd 100644 --- a/src/layers/logging.rs +++ b/src/layers/logging.rs @@ -412,7 +412,7 @@ impl LayeredAccessor for LoggingAccessor { .await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { debug!( target: LOGGING_TARGET, "service={} operation={} path={} -> started", @@ -439,7 +439,7 @@ impl LayeredAccessor for LoggingAccessor { self.error_level, self.failure_level, ); - Ok((rp, Box::new(streamer) as output::ObjectPager)) + Ok((rp, Box::new(streamer) as output::Pager)) } Err(err) => { if let Some(lvl) = self.err_level(&err) { @@ -896,11 +896,7 @@ impl LayeredAccessor for LoggingAccessor { }) } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { debug!( target: LOGGING_TARGET, "service={} operation={} path={} -> started", @@ -926,7 +922,7 @@ impl LayeredAccessor for LoggingAccessor { self.error_level, self.failure_level, ); - (rp, Box::new(li) as output::BlockingObjectPager) + (rp, Box::new(li) as output::BlockingPager) }) .map_err(|err| { if let Some(lvl) = self.err_level(&err) { @@ -1273,7 +1269,7 @@ struct LoggingPager { scheme: Scheme, path: String, finished: bool, - inner: output::ObjectPager, + inner: output::Pager, error_level: Option, failure_level: Option, } @@ -1282,7 +1278,7 @@ impl LoggingPager { fn new( scheme: Scheme, path: &str, - inner: output::ObjectPager, + inner: output::Pager, error_level: Option, failure_level: Option, ) -> Self { @@ -1340,7 +1336,7 @@ impl LoggingPager { } #[async_trait] -impl output::ObjectPage for LoggingPager { +impl output::Page for LoggingPager { async fn next_page(&mut self) -> Result>> { let res = self.inner.next_page().await; @@ -1388,7 +1384,7 @@ struct BlockingLoggingPager { scheme: Scheme, path: String, finished: bool, - inner: output::BlockingObjectPager, + inner: output::BlockingPager, error_level: Option, failure_level: Option, } @@ -1397,7 +1393,7 @@ impl BlockingLoggingPager { fn new( scheme: Scheme, path: &str, - inner: output::BlockingObjectPager, + inner: output::BlockingPager, error_level: Option, failure_level: Option, ) -> Self { @@ -1454,7 +1450,7 @@ impl BlockingLoggingPager { } } -impl output::BlockingObjectPage for BlockingLoggingPager { +impl output::BlockingPage for BlockingLoggingPager { fn next_page(&mut self) -> Result>> { let res = self.inner.next_page(); diff --git a/src/layers/metrics.rs b/src/layers/metrics.rs index fcad05ca323..74909e39c43 100644 --- a/src/layers/metrics.rs +++ b/src/layers/metrics.rs @@ -593,7 +593,7 @@ impl LayeredAccessor for MetricsAccessor { .await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { self.handle.requests_total_list.increment(1); let start = Instant::now(); @@ -851,11 +851,7 @@ impl LayeredAccessor for MetricsAccessor { }) } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { self.handle.requests_total_blocking_list.increment(1); let start = Instant::now(); diff --git a/src/layers/retry.rs b/src/layers/retry.rs index 9b4ee4662bd..51f13c2c322 100644 --- a/src/layers/retry.rs +++ b/src/layers/retry.rs @@ -216,7 +216,7 @@ impl LayeredAccessor for RetryAccessor { .await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { { || self.inner.list(path, args.clone()) } .retry(&self.builder) .when(|e| e.is_temporary()) @@ -229,7 +229,7 @@ impl LayeredAccessor for RetryAccessor { .map(|v| { v.map(|(l, p)| { let pager = Box::new(RetryPager::new(p, path, self.builder.clone())) - as Box; + as Box; (l, pager) }) .map_err(|e| e.set_persistent()) @@ -367,11 +367,7 @@ impl LayeredAccessor for RetryAccessor { .map_err(|e| e.set_persistent()) } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { { || self.inner.blocking_list(path, args.clone()) } .retry(&self.builder) .when(|e| e.is_temporary()) @@ -384,7 +380,7 @@ impl LayeredAccessor for RetryAccessor { .call() .map(|(rp, p)| { let p = RetryPager::new(p, path, self.builder.clone()); - let p = Box::new(p) as Box; + let p = Box::new(p) as Box; (rp, p) }) .map_err(|e| e.set_persistent()) @@ -691,7 +687,7 @@ impl

RetryPager

{ } #[async_trait] -impl output::ObjectPage for RetryPager

{ +impl output::Page for RetryPager

{ async fn next_page(&mut self) -> Result>> { if let Some(sleep) = self.sleep.take() { tokio::time::sleep(sleep).await; @@ -739,7 +735,7 @@ impl output::ObjectPage for RetryPager

{ } } -impl output::BlockingObjectPage for RetryPager

{ +impl output::BlockingPage for RetryPager

{ fn next_page(&mut self) -> Result>> { { || self.inner.next_page() } .retry(&self.policy) @@ -798,9 +794,9 @@ mod tests { )) } - async fn list(&self, _: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, _: &str, _: OpList) -> Result<(RpList, output::Pager)> { let pager = MockPager::default(); - let pager = Box::new(pager) as Box; + let pager = Box::new(pager) as Box; Ok((RpList::default(), pager)) } } @@ -865,7 +861,7 @@ mod tests { attempt: usize, } #[async_trait] - impl output::ObjectPage for MockPager { + impl output::Page for MockPager { async fn next_page(&mut self) -> Result>> { self.attempt += 1; match self.attempt { diff --git a/src/layers/tracing.rs b/src/layers/tracing.rs index 48856e0b34f..468543fd4ab 100644 --- a/src/layers/tracing.rs +++ b/src/layers/tracing.rs @@ -176,14 +176,14 @@ impl LayeredAccessor for TracingAccessor { } #[tracing::instrument(level = "debug", skip(self))] - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { self.inner .list(path, args) .map(|v| { v.map(|(rp, s)| { ( rp, - Box::new(TracingPager::new(Span::current(), s)) as output::ObjectPager, + Box::new(TracingPager::new(Span::current(), s)) as output::Pager, ) }) }) @@ -266,16 +266,11 @@ impl LayeredAccessor for TracingAccessor { } #[tracing::instrument(level = "debug", skip(self))] - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { self.inner.blocking_list(path, args).map(|(rp, it)| { ( rp, - Box::new(BlockingTracingPager::new(Span::current(), it)) - as output::BlockingObjectPager, + Box::new(BlockingTracingPager::new(Span::current(), it)) as output::BlockingPager, ) }) } @@ -355,11 +350,11 @@ impl Read for TracingReader { struct TracingPager { span: Span, - inner: output::ObjectPager, + inner: output::Pager, } impl TracingPager { - fn new(span: Span, streamer: output::ObjectPager) -> Self { + fn new(span: Span, streamer: output::Pager) -> Self { Self { span, inner: streamer, @@ -368,7 +363,7 @@ impl TracingPager { } #[async_trait] -impl output::ObjectPage for TracingPager { +impl output::Page for TracingPager { #[tracing::instrument(parent = &self.span, level = "debug", skip_all)] async fn next_page(&mut self) -> Result>> { self.inner.next_page().await @@ -377,16 +372,16 @@ impl output::ObjectPage for TracingPager { struct BlockingTracingPager { span: Span, - inner: output::BlockingObjectPager, + inner: output::BlockingPager, } impl BlockingTracingPager { - fn new(span: Span, inner: output::BlockingObjectPager) -> Self { + fn new(span: Span, inner: output::BlockingPager) -> Self { Self { span, inner } } } -impl output::BlockingObjectPage for BlockingTracingPager { +impl output::BlockingPage for BlockingTracingPager { #[tracing::instrument(parent = &self.span, level = "debug", skip_all)] fn next_page(&mut self) -> Result>> { self.inner.next_page() diff --git a/src/object/list.rs b/src/object/list.rs index bec01bf9e06..dd0327a3b85 100644 --- a/src/object/list.rs +++ b/src/object/list.rs @@ -32,19 +32,19 @@ use crate::*; /// call `next_page` directly. pub struct ObjectLister { acc: FusedAccessor, - pager: Option, + pager: Option, buf: VecDeque, /// We will move `pager` inside future and return it back while future is ready. /// Thus, we should not allow calling other function while we already have /// a future. #[allow(clippy::type_complexity)] - fut: Option>>)>>, + fut: Option>>)>>, } impl ObjectLister { /// Create a new object lister. - pub fn new(op: Operator, pager: output::ObjectPager) -> Self { + pub fn new(op: Operator, pager: output::Pager) -> Self { Self { acc: op.inner(), pager: Some(pager), @@ -135,13 +135,13 @@ impl Stream for ObjectLister { pub struct BlockingObjectLister { acc: FusedAccessor, - pager: output::BlockingObjectPager, + pager: output::BlockingPager, buf: VecDeque, } impl BlockingObjectLister { /// Create a new object lister. - pub fn new(acc: FusedAccessor, pager: output::BlockingObjectPager) -> Self { + pub fn new(acc: FusedAccessor, pager: output::BlockingPager) -> Self { Self { acc, pager, diff --git a/src/raw/accessor.rs b/src/raw/accessor.rs index 18b4a2da55a..8d90b6b1f26 100644 --- a/src/raw/accessor.rs +++ b/src/raw/accessor.rs @@ -160,7 +160,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// /// - Input path MUST be dir path, DON'T NEED to check object mode. /// - List non-exist dir should return Empty. - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { let (_, _) = (path, args); Err(Error::new( @@ -351,11 +351,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// /// - Require capability: `Blocking` /// - List non-exist dir should return Empty. - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { let (_, _) = (path, args); Err(Error::new( @@ -392,7 +388,7 @@ impl Accessor for Arc { async fn delete(&self, path: &str, args: OpDelete) -> Result { self.as_ref().delete(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { self.as_ref().list(path, args).await } @@ -450,11 +446,7 @@ impl Accessor for Arc { fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { self.as_ref().blocking_delete(path, args) } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { self.as_ref().blocking_list(path, args) } } @@ -488,7 +480,7 @@ impl Accessor for FusedAccessor { async fn delete(&self, path: &str, args: OpDelete) -> Result { self.as_ref().delete(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { self.as_ref().list(path, args).await } @@ -546,11 +538,7 @@ impl Accessor for FusedAccessor { fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { self.as_ref().blocking_delete(path, args) } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { self.as_ref().blocking_list(path, args) } } diff --git a/src/raw/io/output/entry.rs b/src/raw/io/output/entry.rs index 0f59f98c685..ab14b7335af 100644 --- a/src/raw/io/output/entry.rs +++ b/src/raw/io/output/entry.rs @@ -17,7 +17,7 @@ use crate::ObjectMetadata; use crate::ObjectMode; use crate::Operator; -/// Entry is returned by `ObjectPage` or `BlockingObjectPage` +/// Entry is returned by `Page` or `BlockingPage` /// during list operations. #[derive(Debug, Clone)] pub struct Entry { diff --git a/src/raw/io/output/mod.rs b/src/raw/io/output/mod.rs index 1729dfe73b7..d3f47f9aba2 100644 --- a/src/raw/io/output/mod.rs +++ b/src/raw/io/output/mod.rs @@ -46,7 +46,7 @@ mod entry; pub use entry::Entry; mod page; -pub use page::BlockingObjectPage; -pub use page::BlockingObjectPager; -pub use page::ObjectPage; -pub use page::ObjectPager; +pub use page::BlockingPage; +pub use page::BlockingPager; +pub use page::Page; +pub use page::Pager; diff --git a/src/raw/io/output/page.rs b/src/raw/io/output/page.rs index 122a003e9c0..87e61bcb3f5 100644 --- a/src/raw/io/output/page.rs +++ b/src/raw/io/output/page.rs @@ -17,12 +17,12 @@ use async_trait::async_trait; use super::Entry; use crate::*; -/// ObjectPage trait is used by [`Accessor`] to implement `list` operation. +/// Page trait is used by [`Accessor`] to implement `list` operation. /// -/// `list` will return a boxed `ObjectPage` which allow users to call `next_page` +/// `list` will return a boxed `Page` which allow users to call `next_page` /// to fecth a new page of [`Entry`]. #[async_trait] -pub trait ObjectPage: Send + Sync + 'static { +pub trait Page: Send + Sync + 'static { /// Fetch a new page of [`Entry`] /// /// `Ok(None)` means all object pages have been returned. Any following call @@ -30,25 +30,25 @@ pub trait ObjectPage: Send + Sync + 'static { async fn next_page(&mut self) -> Result>>; } -/// The boxed version of [`ObjectPage`] -pub type ObjectPager = Box; +/// The boxed version of [`Page`] +pub type Pager = Box; #[async_trait] -impl ObjectPage for ObjectPager { +impl Page for Pager { async fn next_page(&mut self) -> Result>> { self.as_mut().next_page().await } } #[async_trait] -impl ObjectPage for () { +impl Page for () { async fn next_page(&mut self) -> Result>> { Ok(None) } } -/// BlockingObjectPage is the blocking version of [`ObjectPage`]. -pub trait BlockingObjectPage: 'static { +/// BlockingPage is the blocking version of [`Page`]. +pub trait BlockingPage: 'static { /// Fetch a new page of [`Entry`] /// /// `Ok(None)` means all object pages have been returned. Any following call @@ -56,16 +56,16 @@ pub trait BlockingObjectPage: 'static { fn next_page(&mut self) -> Result>>; } -/// BlockingObjectPager is a boxed [`BlockingObjectPage`] -pub type BlockingObjectPager = Box; +/// BlockingPager is a boxed [`BlockingPage`] +pub type BlockingPager = Box; -impl BlockingObjectPage for BlockingObjectPager { +impl BlockingPage for BlockingPager { fn next_page(&mut self) -> Result>> { self.as_mut().next_page() } } -impl BlockingObjectPage for () { +impl BlockingPage for () { fn next_page(&mut self) -> Result>> { Ok(None) } diff --git a/src/raw/io/walk.rs b/src/raw/io/walk.rs index 91d1d8a8bef..ebee1751653 100644 --- a/src/raw/io/walk.rs +++ b/src/raw/io/walk.rs @@ -59,7 +59,7 @@ const WALK_BUFFER_SIZE: usize = 256; pub struct TopDownWalker { acc: FusedAccessor, dirs: VecDeque, - pagers: Vec<(output::ObjectPager, Vec)>, + pagers: Vec<(output::Pager, Vec)>, res: Vec, } @@ -80,7 +80,7 @@ impl TopDownWalker { } #[async_trait] -impl output::ObjectPage for TopDownWalker { +impl output::Page for TopDownWalker { async fn next_page(&mut self) -> Result>> { loop { if let Some(de) = self.dirs.pop_front() { @@ -172,7 +172,7 @@ impl output::ObjectPage for TopDownWalker { pub struct BottomUpWalker { acc: FusedAccessor, dirs: VecDeque, - pagers: Vec<(output::ObjectPager, output::Entry, Vec)>, + pagers: Vec<(output::Pager, output::Entry, Vec)>, res: Vec, } @@ -192,7 +192,7 @@ impl BottomUpWalker { } #[async_trait] -impl output::ObjectPage for BottomUpWalker { +impl output::Page for BottomUpWalker { async fn next_page(&mut self) -> Result>> { loop { if let Some(de) = self.dirs.pop_back() { diff --git a/src/raw/layer.rs b/src/raw/layer.rs index 2e13e7a132d..60224d81b4d 100644 --- a/src/raw/layer.rs +++ b/src/raw/layer.rs @@ -130,7 +130,7 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { self.inner().delete(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { self.inner().list(path, args).await } @@ -194,11 +194,7 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { self.inner().blocking_delete(path, args) } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { self.inner().blocking_list(path, args) } } @@ -232,7 +228,7 @@ impl Accessor for L { (self as &L).delete(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> { (self as &L).list(path, args).await } @@ -298,11 +294,7 @@ impl Accessor for L { (self as &L).blocking_delete(path, args) } - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> { (self as &L).blocking_list(path, args) } } diff --git a/src/services/azblob/backend.rs b/src/services/azblob/backend.rs index 8fd62f0427c..11d4840c289 100644 --- a/src/services/azblob/backend.rs +++ b/src/services/azblob/backend.rs @@ -515,14 +515,14 @@ impl Accessor for AzblobBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { let op = Box::new(DirStream::new( Arc::new(self.clone()), self.root.clone(), path.to_string(), )); - Ok((RpList::default(), op as output::ObjectPager)) + Ok((RpList::default(), op as output::Pager)) } } diff --git a/src/services/azblob/dir_stream.rs b/src/services/azblob/dir_stream.rs index 6c63161af7b..e87e2cf82ce 100644 --- a/src/services/azblob/dir_stream.rs +++ b/src/services/azblob/dir_stream.rs @@ -48,7 +48,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/azdfs/backend.rs b/src/services/azdfs/backend.rs index bb275791fa1..5d7d50dd280 100644 --- a/src/services/azdfs/backend.rs +++ b/src/services/azdfs/backend.rs @@ -420,14 +420,14 @@ impl Accessor for AzdfsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { let op = Box::new(DirStream::new( Arc::new(self.clone()), self.root.clone(), path.to_string(), )); - Ok((RpList::default(), op as output::ObjectPager)) + Ok((RpList::default(), op as output::Pager)) } } diff --git a/src/services/azdfs/dir_stream.rs b/src/services/azdfs/dir_stream.rs index 4e6d43a3c2d..5b9bfb76065 100644 --- a/src/services/azdfs/dir_stream.rs +++ b/src/services/azdfs/dir_stream.rs @@ -47,7 +47,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/fs/backend.rs b/src/services/fs/backend.rs index 3afd3bf9a23..a151af6d1bd 100644 --- a/src/services/fs/backend.rs +++ b/src/services/fs/backend.rs @@ -508,14 +508,14 @@ impl Accessor for FsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { let p = self.root.join(path.trim_end_matches('/')); let f = match tokio::fs::read_dir(&p).await { Ok(rd) => rd, Err(e) => { return if e.kind() == io::ErrorKind::NotFound { - Ok((RpList::default(), Box::new(()) as output::ObjectPager)) + Ok((RpList::default(), Box::new(()) as output::Pager)) } else { Err(parse_io_error(e)) }; @@ -712,21 +712,14 @@ impl Accessor for FsBackend { } } - fn blocking_list( - &self, - path: &str, - _: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, output::BlockingPager)> { let p = self.root.join(path.trim_end_matches('/')); let f = match std::fs::read_dir(p) { Ok(rd) => rd, Err(e) => { return if e.kind() == io::ErrorKind::NotFound { - Ok(( - RpList::default(), - Box::new(()) as output::BlockingObjectPager, - )) + Ok((RpList::default(), Box::new(()) as output::BlockingPager)) } else { Err(parse_io_error(e)) }; diff --git a/src/services/fs/dir_stream.rs b/src/services/fs/dir_stream.rs index c7bfc461fc7..d780db5f2be 100644 --- a/src/services/fs/dir_stream.rs +++ b/src/services/fs/dir_stream.rs @@ -42,7 +42,7 @@ impl DirPager { } #[async_trait] -impl output::ObjectPage for DirPager { +impl output::Page for DirPager { async fn next_page(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); @@ -104,7 +104,7 @@ impl BlockingDirPager { } } -impl output::BlockingObjectPage for BlockingDirPager { +impl output::BlockingPage for BlockingDirPager { fn next_page(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); diff --git a/src/services/ftp/backend.rs b/src/services/ftp/backend.rs index e786809b389..cd50eaf1317 100644 --- a/src/services/ftp/backend.rs +++ b/src/services/ftp/backend.rs @@ -447,7 +447,7 @@ impl Accessor for FtpBackend { Ok(RpDelete::default()) } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { let mut ftp_stream = self.ftp_connect(Operation::List).await?; let pathname = if path == "/" { None } else { Some(path) }; @@ -457,8 +457,7 @@ impl Accessor for FtpBackend { Ok(( RpList::default(), - Box::new(DirStream::new(if path == "/" { "" } else { path }, rd)) - as output::ObjectPager, + Box::new(DirStream::new(if path == "/" { "" } else { path }, rd)) as output::Pager, )) } } diff --git a/src/services/ftp/dir_stream.rs b/src/services/ftp/dir_stream.rs index 448f1f09788..438f056e417 100644 --- a/src/services/ftp/dir_stream.rs +++ b/src/services/ftp/dir_stream.rs @@ -70,7 +70,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); diff --git a/src/services/gcs/backend.rs b/src/services/gcs/backend.rs index d276c15ce37..79446a79050 100644 --- a/src/services/gcs/backend.rs +++ b/src/services/gcs/backend.rs @@ -462,11 +462,10 @@ impl Accessor for GcsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) - as output::ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) as output::Pager, )) } } diff --git a/src/services/gcs/dir_stream.rs b/src/services/gcs/dir_stream.rs index 26cc5e08ce6..5c346ee165e 100644 --- a/src/services/gcs/dir_stream.rs +++ b/src/services/gcs/dir_stream.rs @@ -52,7 +52,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/hdfs/backend.rs b/src/services/hdfs/backend.rs index 9142b6a94c1..0aa542e6004 100644 --- a/src/services/hdfs/backend.rs +++ b/src/services/hdfs/backend.rs @@ -390,14 +390,14 @@ impl Accessor for HdfsBackend { Ok(RpDelete::default()) } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { let p = build_rooted_abs_path(&self.root, path); let f = match self.client.read_dir(&p) { Ok(f) => f, Err(e) => { return if e.kind() == io::ErrorKind::NotFound { - Ok((RpList::default(), Box::new(()) as output::ObjectPager)) + Ok((RpList::default(), Box::new(()) as output::Pager)) } else { Err(parse_io_error(e)) } @@ -564,21 +564,14 @@ impl Accessor for HdfsBackend { Ok(RpDelete::default()) } - fn blocking_list( - &self, - path: &str, - _: OpList, - ) -> Result<(RpList, output::BlockingObjectPager)> { + fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, output::BlockingPager)> { let p = build_rooted_abs_path(&self.root, path); let f = match self.client.read_dir(&p) { Ok(f) => f, Err(e) => { return if e.kind() == io::ErrorKind::NotFound { - Ok(( - RpList::default(), - Box::new(()) as output::BlockingObjectPager, - )) + Ok((RpList::default(), Box::new(()) as output::BlockingPager)) } else { Err(parse_io_error(e)) } diff --git a/src/services/hdfs/dir_stream.rs b/src/services/hdfs/dir_stream.rs index d7049958781..00accd1d3f7 100644 --- a/src/services/hdfs/dir_stream.rs +++ b/src/services/hdfs/dir_stream.rs @@ -39,7 +39,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); @@ -73,7 +73,7 @@ impl output::ObjectPage for DirStream { } } -impl output::BlockingObjectPage for DirStream { +impl output::BlockingPage for DirStream { fn next_page(&mut self) -> Result>> { let mut oes: Vec = Vec::with_capacity(self.size); diff --git a/src/services/ipfs/backend.rs b/src/services/ipfs/backend.rs index 6e00440d8dc..250c9489621 100644 --- a/src/services/ipfs/backend.rs +++ b/src/services/ipfs/backend.rs @@ -381,10 +381,10 @@ impl Accessor for IpfsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), path)) as output::ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), path)) as output::Pager, )) } } @@ -460,7 +460,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { if self.consumed { return Ok(None); diff --git a/src/services/ipmfs/backend.rs b/src/services/ipmfs/backend.rs index 473f5fb8559..133cf536781 100644 --- a/src/services/ipmfs/backend.rs +++ b/src/services/ipmfs/backend.rs @@ -168,11 +168,10 @@ impl Accessor for IpmfsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) - as output::ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) as output::Pager, )) } } diff --git a/src/services/ipmfs/dir_stream.rs b/src/services/ipmfs/dir_stream.rs index 7ce0f905826..fbd26ac22a2 100644 --- a/src/services/ipmfs/dir_stream.rs +++ b/src/services/ipmfs/dir_stream.rs @@ -45,7 +45,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { if self.consumed { return Ok(None); diff --git a/src/services/obs/backend.rs b/src/services/obs/backend.rs index 13cbca6fa5c..5258f4677e0 100644 --- a/src/services/obs/backend.rs +++ b/src/services/obs/backend.rs @@ -406,11 +406,10 @@ impl Accessor for ObsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) - as output::ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) as output::Pager, )) } } diff --git a/src/services/obs/dir_stream.rs b/src/services/obs/dir_stream.rs index d93dca00b47..cceb38b0a1a 100644 --- a/src/services/obs/dir_stream.rs +++ b/src/services/obs/dir_stream.rs @@ -50,7 +50,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/oss/backend.rs b/src/services/oss/backend.rs index f9e7e4744e5..e9740bc38ca 100644 --- a/src/services/oss/backend.rs +++ b/src/services/oss/backend.rs @@ -485,11 +485,10 @@ impl Accessor for OssBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) - as output::ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) as output::Pager, )) } diff --git a/src/services/oss/dir_stream.rs b/src/services/oss/dir_stream.rs index dcefb545782..2c483722c1a 100644 --- a/src/services/oss/dir_stream.rs +++ b/src/services/oss/dir_stream.rs @@ -56,7 +56,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/s3/backend.rs b/src/services/s3/backend.rs index 3b3d9b8313b..5905021a824 100644 --- a/src/services/s3/backend.rs +++ b/src/services/s3/backend.rs @@ -1210,11 +1210,10 @@ impl Accessor for S3Backend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) - as output::ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) as output::Pager, )) } diff --git a/src/services/s3/dir_stream.rs b/src/services/s3/dir_stream.rs index 2dccf7504bb..d640a54cabd 100644 --- a/src/services/s3/dir_stream.rs +++ b/src/services/s3/dir_stream.rs @@ -54,7 +54,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); diff --git a/src/services/webhdfs/backend.rs b/src/services/webhdfs/backend.rs index 816d51e77ab..cbd83551b76 100644 --- a/src/services/webhdfs/backend.rs +++ b/src/services/webhdfs/backend.rs @@ -672,7 +672,7 @@ impl Accessor for WebhdfsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> { let path = path.trim_end_matches('/'); let req = self.webhdfs_list_status_req(path)?; @@ -693,14 +693,14 @@ impl Accessor for WebhdfsBackend { let objects = DirStream::new(path, file_statuses); Ok(( RpList::default(), - Box::new(objects) as Box, + Box::new(objects) as Box, )) } StatusCode::NOT_FOUND => { let objects = DirStream::new(path, vec![]); Ok(( RpList::default(), - Box::new(objects) as Box, + Box::new(objects) as Box, )) } _ => Err(parse_error(resp).await?), diff --git a/src/services/webhdfs/dir_stream.rs b/src/services/webhdfs/dir_stream.rs index 359f6a516e1..3496582cdbb 100644 --- a/src/services/webhdfs/dir_stream.rs +++ b/src/services/webhdfs/dir_stream.rs @@ -33,7 +33,7 @@ impl DirStream { } #[async_trait] -impl output::ObjectPage for DirStream { +impl output::Page for DirStream { async fn next_page(&mut self) -> Result>> { if self.statuses.is_empty() { return Ok(None); diff --git a/src/services/webhdfs/message.rs b/src/services/webhdfs/message.rs index 559e3ac2799..a60467b3809 100644 --- a/src/services/webhdfs/message.rs +++ b/src/services/webhdfs/message.rs @@ -85,7 +85,7 @@ pub(super) enum FileStatusType { #[cfg(test)] mod test { use super::*; - use crate::raw::output::ObjectPage; + use crate::raw::output::Page; use crate::services::webhdfs::dir_stream::DirStream; use crate::ObjectMode;