From 90e4fe4672c162e7d01db2870113fd5dc89dc842 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 9 Feb 2023 15:00:37 +0800 Subject: [PATCH] Move object entry Signed-off-by: Xuanwo --- src/docs/upgrade.md | 2 +- src/layers/concurrent_limit.rs | 29 +++++++++++--------- src/layers/error_context.rs | 16 ++++++----- src/layers/immutable_index.rs | 24 ++++++++++------- src/layers/logging.rs | 28 ++++++++++--------- src/layers/metrics.rs | 8 ++++-- src/layers/retry.rs | 36 ++++++++++++++----------- src/layers/tracing.rs | 29 +++++++++++--------- src/lib.rs | 2 +- src/object/list.rs | 22 ++++++++++----- src/object/object.rs | 6 ++--- src/raw/accessor.rs | 24 ++++++++++++----- src/raw/io/output/mod.rs | 9 +++++++ src/raw/{ => io/output}/object_entry.rs | 0 src/raw/{ => io/output}/object_page.rs | 2 +- src/raw/io/walk.rs | 31 ++++++++++++--------- src/raw/layer.rs | 16 ++++++++--- src/raw/mod.rs | 9 ------- src/services/azblob/backend.rs | 4 +-- src/services/azblob/dir_stream.rs | 8 +++--- src/services/azdfs/backend.rs | 4 +-- src/services/azdfs/dir_stream.rs | 6 ++--- src/services/fs/backend.rs | 15 ++++++++--- src/services/fs/dir_stream.rs | 24 ++++++++--------- src/services/ftp/backend.rs | 5 ++-- src/services/ftp/dir_stream.rs | 12 ++++----- src/services/gcs/backend.rs | 5 ++-- src/services/gcs/dir_stream.rs | 8 +++--- src/services/hdfs/backend.rs | 15 ++++++++--- src/services/hdfs/dir_stream.rs | 24 ++++++++--------- src/services/ipfs/backend.rs | 10 +++---- src/services/ipmfs/backend.rs | 5 ++-- src/services/ipmfs/dir_stream.rs | 6 ++--- src/services/obs/backend.rs | 5 ++-- src/services/obs/dir_stream.rs | 8 +++--- src/services/oss/backend.rs | 5 ++-- src/services/oss/dir_stream.rs | 8 +++--- src/services/s3/backend.rs | 5 ++-- src/services/s3/dir_stream.rs | 8 +++--- src/services/webhdfs/backend.rs | 12 ++++++--- src/services/webhdfs/dir_stream.rs | 6 ++--- src/services/webhdfs/message.rs | 2 +- 42 files changed, 297 insertions(+), 206 deletions(-) rename src/raw/{ => io/output}/object_entry.rs (100%) rename src/raw/{ => io/output}/object_page.rs (98%) diff --git a/src/docs/upgrade.md b/src/docs/upgrade.md index 1113e6ca523..fb2d6623635 100644 --- a/src/docs/upgrade.md +++ b/src/docs/upgrade.md @@ -155,7 +155,7 @@ Since v0.21, `Accessor` will return `ObjectPager` for `List`: ```diff - async fn list(&self, path: &str, args: OpList) -> Result -+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> ++ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> ``` And `Object` will return an `ObjectLister` which is built upon `ObjectPage`: diff --git a/src/layers/concurrent_limit.rs b/src/layers/concurrent_limit.rs index fe9f1c67078..b37124233d9 100644 --- a/src/layers/concurrent_limit.rs +++ b/src/layers/concurrent_limit.rs @@ -135,7 +135,7 @@ impl LayeredAccessor for ConcurrentLimitAccessor { self.inner.delete(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { let permit = self .semaphore .clone() @@ -146,7 +146,7 @@ impl LayeredAccessor for ConcurrentLimitAccessor { self.inner.list(path, args).await.map(|(rp, s)| { ( rp, - Box::new(ConcurrentLimitPager::new(s, permit)) as ObjectPager, + Box::new(ConcurrentLimitPager::new(s, permit)) as output::ObjectPager, ) }) } @@ -261,7 +261,11 @@ impl LayeredAccessor for ConcurrentLimitAccessor { self.inner.blocking_delete(path, args) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { let permit = self .semaphore .clone() @@ -271,7 +275,8 @@ impl LayeredAccessor for ConcurrentLimitAccessor { self.inner.blocking_list(path, args).map(|(rp, it)| { ( rp, - Box::new(BlockingConcurrentLimitPager::new(it, permit)) as BlockingObjectPager, + Box::new(BlockingConcurrentLimitPager::new(it, permit)) + as output::BlockingObjectPager, ) }) } @@ -322,14 +327,14 @@ impl output::BlockingRead for ConcurrentLimitReader } struct ConcurrentLimitPager { - inner: ObjectPager, + inner: output::ObjectPager, // Hold on this permit until this streamer has been dropped. _permit: OwnedSemaphorePermit, } impl ConcurrentLimitPager { - fn new(inner: ObjectPager, permit: OwnedSemaphorePermit) -> Self { + fn new(inner: output::ObjectPager, permit: OwnedSemaphorePermit) -> Self { Self { inner, _permit: permit, @@ -338,21 +343,21 @@ impl ConcurrentLimitPager { } #[async_trait] -impl ObjectPage for ConcurrentLimitPager { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for ConcurrentLimitPager { + async fn next_page(&mut self) -> Result>> { self.inner.next_page().await } } struct BlockingConcurrentLimitPager { - inner: BlockingObjectPager, + inner: output::BlockingObjectPager, // Hold on this permit until this iterator has been dropped. _permit: OwnedSemaphorePermit, } impl BlockingConcurrentLimitPager { - fn new(inner: BlockingObjectPager, permit: OwnedSemaphorePermit) -> Self { + fn new(inner: output::BlockingObjectPager, permit: OwnedSemaphorePermit) -> Self { Self { inner, _permit: permit, @@ -360,8 +365,8 @@ impl BlockingConcurrentLimitPager { } } -impl BlockingObjectPage for BlockingConcurrentLimitPager { - fn next_page(&mut self) -> Result>> { +impl output::BlockingObjectPage for BlockingConcurrentLimitPager { + fn next_page(&mut self) -> Result>> { self.inner.next_page() } } diff --git a/src/layers/error_context.rs b/src/layers/error_context.rs index fe3917ea7e5..9d114e9a5f6 100644 --- a/src/layers/error_context.rs +++ b/src/layers/error_context.rs @@ -118,7 +118,7 @@ impl LayeredAccessor for ErrorContextAccessor { .await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { self.inner .list(path, args) .map_ok(|(rp, os)| { @@ -134,7 +134,7 @@ impl LayeredAccessor for ErrorContextAccessor { // // ref: https://github.com/rust-lang/rust/issues/80437 inner: os, - }) as ObjectPager, + }) as output::ObjectPager, ) }) .map_err(|err| { @@ -259,7 +259,11 @@ impl LayeredAccessor for ErrorContextAccessor { }) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { self.inner.blocking_list(path, args).map_err(|err| { err.with_operation(Operation::BlockingList.into_static()) .with_context("service", self.meta.scheme()) @@ -268,15 +272,15 @@ impl LayeredAccessor for ErrorContextAccessor { } } -struct ObjectStreamErrorContextWrapper { +struct ObjectStreamErrorContextWrapper { scheme: Scheme, path: String, inner: T, } #[async_trait::async_trait] -impl ObjectPage for ObjectStreamErrorContextWrapper { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for ObjectStreamErrorContextWrapper { + async fn next_page(&mut self) -> Result>> { self.inner.next_page().await.map_err(|err| { err.with_operation("ObjectPage::next_page") .with_context("service", self.scheme) diff --git a/src/layers/immutable_index.rs b/src/layers/immutable_index.rs index f98b0bbb3e2..771e54aa36b 100644 --- a/src/layers/immutable_index.rs +++ b/src/layers/immutable_index.rs @@ -146,7 +146,7 @@ impl LayeredAccessor for ImmutableIndexAccessor { self.inner.read(path, args).await } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { let mut path = path; if path == "/" { path = "" @@ -154,7 +154,7 @@ impl LayeredAccessor for ImmutableIndexAccessor { Ok(( RpList::default(), - Box::new(ImmutableDir::new(self.children(path))) as ObjectPager, + Box::new(ImmutableDir::new(self.children(path))) as output::ObjectPager, )) } @@ -162,7 +162,11 @@ impl LayeredAccessor for ImmutableIndexAccessor { self.inner.blocking_read(path, args) } - fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + _: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { let mut path = path; if path == "/" { path = "" @@ -170,7 +174,7 @@ impl LayeredAccessor for ImmutableIndexAccessor { Ok(( RpList::default(), - Box::new(ImmutableDir::new(self.children(path))) as BlockingObjectPager, + Box::new(ImmutableDir::new(self.children(path))) as output::BlockingObjectPager, )) } } @@ -184,7 +188,7 @@ impl ImmutableDir { Self { idx } } - fn inner_next_page(&mut self) -> Option> { + fn inner_next_page(&mut self) -> Option> { if self.idx.is_empty() { return None; } @@ -200,7 +204,7 @@ impl ImmutableDir { ObjectMode::FILE }; let meta = ObjectMetadata::new(mode); - ObjectEntry::with(v, meta) + output::ObjectEntry::with(v, meta) }) .collect(), ) @@ -208,14 +212,14 @@ impl ImmutableDir { } #[async_trait] -impl ObjectPage for ImmutableDir { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for ImmutableDir { + async fn next_page(&mut self) -> Result>> { Ok(self.inner_next_page()) } } -impl BlockingObjectPage for ImmutableDir { - fn next_page(&mut self) -> Result>> { +impl output::BlockingObjectPage for ImmutableDir { + fn next_page(&mut self) -> Result>> { Ok(self.inner_next_page()) } } diff --git a/src/layers/logging.rs b/src/layers/logging.rs index 04ad8fe3b18..dabfbc820d6 100644 --- a/src/layers/logging.rs +++ b/src/layers/logging.rs @@ -412,7 +412,7 @@ impl LayeredAccessor for LoggingAccessor { .await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { debug!( target: LOGGING_TARGET, "service={} operation={} path={} -> started", @@ -439,7 +439,7 @@ impl LayeredAccessor for LoggingAccessor { self.error_level, self.failure_level, ); - Ok((rp, Box::new(streamer) as ObjectPager)) + Ok((rp, Box::new(streamer) as output::ObjectPager)) } Err(err) => { if let Some(lvl) = self.err_level(&err) { @@ -896,7 +896,11 @@ impl LayeredAccessor for LoggingAccessor { }) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { debug!( target: LOGGING_TARGET, "service={} operation={} path={} -> started", @@ -922,7 +926,7 @@ impl LayeredAccessor for LoggingAccessor { self.error_level, self.failure_level, ); - (rp, Box::new(li) as BlockingObjectPager) + (rp, Box::new(li) as output::BlockingObjectPager) }) .map_err(|err| { if let Some(lvl) = self.err_level(&err) { @@ -1269,7 +1273,7 @@ struct LoggingPager { scheme: Scheme, path: String, finished: bool, - inner: ObjectPager, + inner: output::ObjectPager, error_level: Option, failure_level: Option, } @@ -1278,7 +1282,7 @@ impl LoggingPager { fn new( scheme: Scheme, path: &str, - inner: ObjectPager, + inner: output::ObjectPager, error_level: Option, failure_level: Option, ) -> Self { @@ -1336,8 +1340,8 @@ impl LoggingPager { } #[async_trait] -impl ObjectPage for LoggingPager { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for LoggingPager { + async fn next_page(&mut self) -> Result>> { let res = self.inner.next_page().await; match &res { @@ -1384,7 +1388,7 @@ struct BlockingLoggingPager { scheme: Scheme, path: String, finished: bool, - inner: BlockingObjectPager, + inner: output::BlockingObjectPager, error_level: Option, failure_level: Option, } @@ -1393,7 +1397,7 @@ impl BlockingLoggingPager { fn new( scheme: Scheme, path: &str, - inner: BlockingObjectPager, + inner: output::BlockingObjectPager, error_level: Option, failure_level: Option, ) -> Self { @@ -1450,8 +1454,8 @@ impl BlockingLoggingPager { } } -impl BlockingObjectPage for BlockingLoggingPager { - fn next_page(&mut self) -> Result>> { +impl output::BlockingObjectPage for BlockingLoggingPager { + fn next_page(&mut self) -> Result>> { let res = self.inner.next_page(); match &res { diff --git a/src/layers/metrics.rs b/src/layers/metrics.rs index 9e610db7812..fcad05ca323 100644 --- a/src/layers/metrics.rs +++ b/src/layers/metrics.rs @@ -593,7 +593,7 @@ impl LayeredAccessor for MetricsAccessor { .await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { self.handle.requests_total_list.increment(1); let start = Instant::now(); @@ -851,7 +851,11 @@ impl LayeredAccessor for MetricsAccessor { }) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { self.handle.requests_total_blocking_list.increment(1); let start = Instant::now(); diff --git a/src/layers/retry.rs b/src/layers/retry.rs index 400c2c5ffcc..672d21e8069 100644 --- a/src/layers/retry.rs +++ b/src/layers/retry.rs @@ -216,7 +216,7 @@ impl LayeredAccessor for RetryAccessor { .await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { { || self.inner.list(path, args.clone()) } .retry(&self.builder) .when(|e| e.is_temporary()) @@ -229,7 +229,7 @@ impl LayeredAccessor for RetryAccessor { .map(|v| { v.map(|(l, p)| { let pager = Box::new(RetryPager::new(p, path, self.builder.clone())) - as Box; + as Box; (l, pager) }) .map_err(|e| e.set_persistent()) @@ -367,7 +367,11 @@ impl LayeredAccessor for RetryAccessor { .map_err(|e| e.set_persistent()) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { { || self.inner.blocking_list(path, args.clone()) } .retry(&self.builder) .when(|e| e.is_temporary()) @@ -380,7 +384,7 @@ impl LayeredAccessor for RetryAccessor { .call() .map(|(rp, p)| { let p = RetryPager::new(p, path, self.builder.clone()); - let p = Box::new(p) as Box; + let p = Box::new(p) as Box; (rp, p) }) .map_err(|e| e.set_persistent()) @@ -687,8 +691,8 @@ impl

RetryPager

{ } #[async_trait] -impl ObjectPage for RetryPager

{ - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for RetryPager

{ + async fn next_page(&mut self) -> Result>> { if let Some(sleep) = self.sleep.take() { tokio::time::sleep(sleep).await; } @@ -735,8 +739,8 @@ impl ObjectPage for RetryPager

{ } } -impl BlockingObjectPage for RetryPager

{ - fn next_page(&mut self) -> Result>> { +impl output::BlockingObjectPage for RetryPager

{ + fn next_page(&mut self) -> Result>> { { || self.inner.next_page() } .retry(&self.policy) .when(|e| e.is_temporary()) @@ -794,9 +798,9 @@ mod tests { )) } - async fn list(&self, _: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, _: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { let pager = MockPager::default(); - let pager = Box::new(pager) as Box; + let pager = Box::new(pager) as Box; Ok((RpList::default(), pager)) } } @@ -861,8 +865,8 @@ mod tests { attempt: usize, } #[async_trait] - impl ObjectPage for MockPager { - async fn next_page(&mut self) -> Result>> { + impl output::ObjectPage for MockPager { + async fn next_page(&mut self) -> Result>> { self.attempt += 1; match self.attempt { 1 => Err(Error::new( @@ -872,8 +876,8 @@ mod tests { .set_temporary()), 2 => { let entries = vec![ - ObjectEntry::new("hello", ObjectMetadata::new(ObjectMode::FILE)), - ObjectEntry::new("world", ObjectMetadata::new(ObjectMode::FILE)), + output::ObjectEntry::new("hello", ObjectMetadata::new(ObjectMode::FILE)), + output::ObjectEntry::new("world", ObjectMetadata::new(ObjectMode::FILE)), ]; Ok(Some(entries)) } @@ -883,8 +887,8 @@ mod tests { ), 4 => { let entries = vec![ - ObjectEntry::new("2023/", ObjectMetadata::new(ObjectMode::DIR)), - ObjectEntry::new("0208/", ObjectMetadata::new(ObjectMode::DIR)), + output::ObjectEntry::new("2023/", ObjectMetadata::new(ObjectMode::DIR)), + output::ObjectEntry::new("0208/", ObjectMetadata::new(ObjectMode::DIR)), ]; Ok(Some(entries)) } diff --git a/src/layers/tracing.rs b/src/layers/tracing.rs index 0408dcca204..ee62224231f 100644 --- a/src/layers/tracing.rs +++ b/src/layers/tracing.rs @@ -176,14 +176,14 @@ impl LayeredAccessor for TracingAccessor { } #[tracing::instrument(level = "debug", skip(self))] - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { self.inner .list(path, args) .map(|v| { v.map(|(rp, s)| { ( rp, - Box::new(TracingPager::new(Span::current(), s)) as ObjectPager, + Box::new(TracingPager::new(Span::current(), s)) as output::ObjectPager, ) }) }) @@ -266,11 +266,16 @@ impl LayeredAccessor for TracingAccessor { } #[tracing::instrument(level = "debug", skip(self))] - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { self.inner.blocking_list(path, args).map(|(rp, it)| { ( rp, - Box::new(BlockingTracingPager::new(Span::current(), it)) as BlockingObjectPager, + Box::new(BlockingTracingPager::new(Span::current(), it)) + as output::BlockingObjectPager, ) }) } @@ -350,11 +355,11 @@ impl Read for TracingReader { struct TracingPager { span: Span, - inner: ObjectPager, + inner: output::ObjectPager, } impl TracingPager { - fn new(span: Span, streamer: ObjectPager) -> Self { + fn new(span: Span, streamer: output::ObjectPager) -> Self { Self { span, inner: streamer, @@ -363,27 +368,27 @@ impl TracingPager { } #[async_trait] -impl ObjectPage for TracingPager { +impl output::ObjectPage for TracingPager { #[tracing::instrument(parent = &self.span, level = "debug", skip_all)] - async fn next_page(&mut self) -> Result>> { + async fn next_page(&mut self) -> Result>> { self.inner.next_page().await } } struct BlockingTracingPager { span: Span, - inner: BlockingObjectPager, + inner: output::BlockingObjectPager, } impl BlockingTracingPager { - fn new(span: Span, inner: BlockingObjectPager) -> Self { + fn new(span: Span, inner: output::BlockingObjectPager) -> Self { Self { span, inner } } } -impl BlockingObjectPage for BlockingTracingPager { +impl output::BlockingObjectPage for BlockingTracingPager { #[tracing::instrument(parent = &self.span, level = "debug", skip_all)] - fn next_page(&mut self) -> Result>> { + fn next_page(&mut self) -> Result>> { self.inner.next_page() } } diff --git a/src/lib.rs b/src/lib.rs index 30c1f0a42fd..041ba442564 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -116,7 +116,7 @@ mod tests { assert_eq!(88, size_of::()); assert_eq!(16, size_of::()); assert_eq!(16, size_of::()); - assert_eq!(184, size_of::()); + assert_eq!(184, size_of::()); assert_eq!(48, size_of::()); assert_eq!(160, size_of::()); assert_eq!(1, size_of::()); diff --git a/src/object/list.rs b/src/object/list.rs index f475e4a773c..7c0e42bf9a9 100644 --- a/src/object/list.rs +++ b/src/object/list.rs @@ -32,19 +32,27 @@ use crate::*; /// call `next_page` directly. pub struct ObjectLister { acc: FusedAccessor, - pager: Option, + pager: Option, - buf: VecDeque, + buf: VecDeque, /// We will move `pager` inside future and return it back while future is ready. /// Thus, we should not allow calling other function while we already have /// a future. #[allow(clippy::type_complexity)] - fut: Option>>)>>, + fut: Option< + BoxFuture< + 'static, + ( + output::ObjectPager, + Result>>, + ), + >, + >, } impl ObjectLister { /// Create a new object lister. - pub fn new(op: Operator, pager: ObjectPager) -> Self { + pub fn new(op: Operator, pager: output::ObjectPager) -> Self { Self { acc: op.inner(), pager: Some(pager), @@ -135,13 +143,13 @@ impl Stream for ObjectLister { pub struct BlockingObjectLister { acc: FusedAccessor, - pager: BlockingObjectPager, - buf: VecDeque, + pager: output::BlockingObjectPager, + buf: VecDeque, } impl BlockingObjectLister { /// Create a new object lister. - pub fn new(acc: FusedAccessor, pager: BlockingObjectPager) -> Self { + pub fn new(acc: FusedAccessor, pager: output::BlockingObjectPager) -> Self { Self { acc, pager, diff --git a/src/object/object.rs b/src/object/object.rs index b3b5eeb98ab..2b463fabd4f 100644 --- a/src/object/object.rs +++ b/src/object/object.rs @@ -1183,7 +1183,7 @@ impl Object { /// `content_md5` is a prefetched metadata field in `ObjectEntry` /// /// It doesn't mean this metadata field of object doesn't exist if `content_md5` is `None`. - /// Then you have to call `ObjectEntry::metadata()` to get the metadata you want. + /// Then you have to call `output::ObjectEntry::metadata()` to get the metadata you want. pub async fn content_md5(&self) -> Result> { { let guard = self.meta.lock(); @@ -1205,7 +1205,7 @@ impl Object { /// `last_modified` is a prefetched metadata field in `ObjectEntry` /// /// It doesn't mean this metadata field of object doesn't exist if `last_modified` is `None`. - /// Then you have to call `ObjectEntry::metadata()` to get the metadata you want. + /// Then you have to call `output::ObjectEntry::metadata()` to get the metadata you want. pub async fn last_modified(&self) -> Result> { { let guard = self.meta.lock(); @@ -1227,7 +1227,7 @@ impl Object { /// `etag` is a prefetched metadata field in `ObjectEntry`. /// /// It doesn't mean this metadata field of object doesn't exist if `etag` is `None`. - /// Then you have to call `ObjectEntry::metadata()` to get the metadata you want. + /// Then you have to call `output::ObjectEntry::metadata()` to get the metadata you want. pub async fn etag(&self) -> Result> { { let guard = self.meta.lock(); diff --git a/src/raw/accessor.rs b/src/raw/accessor.rs index add5b543923..18b4a2da55a 100644 --- a/src/raw/accessor.rs +++ b/src/raw/accessor.rs @@ -160,7 +160,7 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// /// - Input path MUST be dir path, DON'T NEED to check object mode. /// - List non-exist dir should return Empty. - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { let (_, _) = (path, args); Err(Error::new( @@ -351,7 +351,11 @@ pub trait Accessor: Send + Sync + Debug + Unpin + 'static { /// /// - Require capability: `Blocking` /// - List non-exist dir should return Empty. - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { let (_, _) = (path, args); Err(Error::new( @@ -388,7 +392,7 @@ impl Accessor for Arc { async fn delete(&self, path: &str, args: OpDelete) -> Result { self.as_ref().delete(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { self.as_ref().list(path, args).await } @@ -446,7 +450,11 @@ impl Accessor for Arc { fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { self.as_ref().blocking_delete(path, args) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { self.as_ref().blocking_list(path, args) } } @@ -480,7 +488,7 @@ impl Accessor for FusedAccessor { async fn delete(&self, path: &str, args: OpDelete) -> Result { self.as_ref().delete(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { self.as_ref().list(path, args).await } @@ -538,7 +546,11 @@ impl Accessor for FusedAccessor { fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { self.as_ref().blocking_delete(path, args) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { self.as_ref().blocking_list(path, args) } } diff --git a/src/raw/io/output/mod.rs b/src/raw/io/output/mod.rs index 9b418352307..72ff1e88b5a 100644 --- a/src/raw/io/output/mod.rs +++ b/src/raw/io/output/mod.rs @@ -41,3 +41,12 @@ pub use cursor::Cursor; mod into_streamable; pub use into_streamable::into_streamable_reader; pub use into_streamable::IntoStreamableReader; + +mod object_entry; +pub use object_entry::ObjectEntry; + +mod object_page; +pub use object_page::BlockingObjectPage; +pub use object_page::BlockingObjectPager; +pub use object_page::ObjectPage; +pub use object_page::ObjectPager; diff --git a/src/raw/object_entry.rs b/src/raw/io/output/object_entry.rs similarity index 100% rename from src/raw/object_entry.rs rename to src/raw/io/output/object_entry.rs diff --git a/src/raw/object_page.rs b/src/raw/io/output/object_page.rs similarity index 98% rename from src/raw/object_page.rs rename to src/raw/io/output/object_page.rs index a59405b3b35..fd606f061bd 100644 --- a/src/raw/object_page.rs +++ b/src/raw/io/output/object_page.rs @@ -14,7 +14,7 @@ use async_trait::async_trait; -use crate::raw::*; +use super::ObjectEntry; use crate::*; /// ObjectPage trait is used by [`Accessor`] to implement `list` operation. diff --git a/src/raw/io/walk.rs b/src/raw/io/walk.rs index 3a716ce13e4..392ace32403 100644 --- a/src/raw/io/walk.rs +++ b/src/raw/io/walk.rs @@ -58,9 +58,9 @@ const WALK_BUFFER_SIZE: usize = 256; /// We only make sure the parent dirs will show up before nest dirs. pub struct TopDownWalker { acc: FusedAccessor, - dirs: VecDeque, - pagers: Vec<(ObjectPager, Vec)>, - res: Vec, + dirs: VecDeque, + pagers: Vec<(output::ObjectPager, Vec)>, + res: Vec, } impl TopDownWalker { @@ -69,7 +69,7 @@ impl TopDownWalker { let path = normalize_path(path); TopDownWalker { acc, - dirs: VecDeque::from([ObjectEntry::with( + dirs: VecDeque::from([output::ObjectEntry::with( path, ObjectMetadata::new(ObjectMode::DIR), )]), @@ -80,8 +80,8 @@ impl TopDownWalker { } #[async_trait] -impl ObjectPage for TopDownWalker { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for TopDownWalker { + async fn next_page(&mut self) -> Result>> { loop { if let Some(de) = self.dirs.pop_front() { let (_, op) = self.acc.list(de.path(), OpList::default()).await?; @@ -171,9 +171,13 @@ impl ObjectPage for TopDownWalker { /// always output directly while listing. pub struct BottomUpWalker { acc: FusedAccessor, - dirs: VecDeque, - pagers: Vec<(ObjectPager, ObjectEntry, Vec)>, - res: Vec, + dirs: VecDeque, + pagers: Vec<( + output::ObjectPager, + output::ObjectEntry, + Vec, + )>, + res: Vec, } impl BottomUpWalker { @@ -181,7 +185,10 @@ impl BottomUpWalker { pub fn new(acc: FusedAccessor, path: &str) -> Self { BottomUpWalker { acc, - dirs: VecDeque::from([ObjectEntry::new(path, ObjectMetadata::new(ObjectMode::DIR))]), + dirs: VecDeque::from([output::ObjectEntry::new( + path, + ObjectMetadata::new(ObjectMode::DIR), + )]), pagers: vec![], res: Vec::with_capacity(WALK_BUFFER_SIZE), } @@ -189,8 +196,8 @@ impl BottomUpWalker { } #[async_trait] -impl ObjectPage for BottomUpWalker { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for BottomUpWalker { + async fn next_page(&mut self) -> Result>> { loop { if let Some(de) = self.dirs.pop_back() { let (_, op) = self.acc.list(de.path(), OpList::default()).await?; diff --git a/src/raw/layer.rs b/src/raw/layer.rs index 81a55adb353..2e13e7a132d 100644 --- a/src/raw/layer.rs +++ b/src/raw/layer.rs @@ -130,7 +130,7 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { self.inner().delete(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { self.inner().list(path, args).await } @@ -194,7 +194,11 @@ pub trait LayeredAccessor: Send + Sync + Debug + Unpin + 'static { self.inner().blocking_delete(path, args) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { self.inner().blocking_list(path, args) } } @@ -228,7 +232,7 @@ impl Accessor for L { (self as &L).delete(path, args).await } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> { (self as &L).list(path, args).await } @@ -294,7 +298,11 @@ impl Accessor for L { (self as &L).blocking_delete(path, args) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { (self as &L).blocking_list(path, args) } } diff --git a/src/raw/mod.rs b/src/raw/mod.rs index bd30641dd14..48332a7c2a9 100644 --- a/src/raw/mod.rs +++ b/src/raw/mod.rs @@ -47,15 +47,6 @@ pub use path::normalize_path; pub use path::normalize_root; pub use path::validate_path; -mod object_entry; -pub use object_entry::ObjectEntry; - -mod object_page; -pub use object_page::BlockingObjectPage; -pub use object_page::BlockingObjectPager; -pub use object_page::ObjectPage; -pub use object_page::ObjectPager; - mod operation; pub use operation::Operation; diff --git a/src/services/azblob/backend.rs b/src/services/azblob/backend.rs index baf4f3a1e10..8fd62f0427c 100644 --- a/src/services/azblob/backend.rs +++ b/src/services/azblob/backend.rs @@ -515,14 +515,14 @@ impl Accessor for AzblobBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { let op = Box::new(DirStream::new( Arc::new(self.clone()), self.root.clone(), path.to_string(), )); - Ok((RpList::default(), op as ObjectPager)) + Ok((RpList::default(), op as output::ObjectPager)) } } diff --git a/src/services/azblob/dir_stream.rs b/src/services/azblob/dir_stream.rs index 316221986c0..e508e009536 100644 --- a/src/services/azblob/dir_stream.rs +++ b/src/services/azblob/dir_stream.rs @@ -48,8 +48,8 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); } @@ -81,7 +81,7 @@ impl ObjectPage for DirStream { let mut entries = Vec::with_capacity(prefixes.len() + output.blobs.blob.len()); for prefix in prefixes { - let de = ObjectEntry::new( + let de = output::ObjectEntry::new( &build_rel_path(&self.root, &prefix.name), ObjectMetadata::new(ObjectMode::DIR).with_complete(), ); @@ -115,7 +115,7 @@ impl ObjectPage for DirStream { ) .with_complete(); - let de = ObjectEntry::new(&build_rel_path(&self.root, &object.name), meta); + let de = output::ObjectEntry::new(&build_rel_path(&self.root, &object.name), meta); entries.push(de); } diff --git a/src/services/azdfs/backend.rs b/src/services/azdfs/backend.rs index b54b1b7ae11..bb275791fa1 100644 --- a/src/services/azdfs/backend.rs +++ b/src/services/azdfs/backend.rs @@ -420,14 +420,14 @@ impl Accessor for AzdfsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { let op = Box::new(DirStream::new( Arc::new(self.clone()), self.root.clone(), path.to_string(), )); - Ok((RpList::default(), op as ObjectPager)) + Ok((RpList::default(), op as output::ObjectPager)) } } diff --git a/src/services/azdfs/dir_stream.rs b/src/services/azdfs/dir_stream.rs index 4a668b63bb9..dbda4a04a8b 100644 --- a/src/services/azdfs/dir_stream.rs +++ b/src/services/azdfs/dir_stream.rs @@ -47,8 +47,8 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); } @@ -118,7 +118,7 @@ impl ObjectPage for DirStream { path += "/" }; - let de = ObjectEntry::new(&path, meta); + let de = output::ObjectEntry::new(&path, meta); entries.push(de); } diff --git a/src/services/fs/backend.rs b/src/services/fs/backend.rs index d1bf4b97540..3afd3bf9a23 100644 --- a/src/services/fs/backend.rs +++ b/src/services/fs/backend.rs @@ -508,14 +508,14 @@ impl Accessor for FsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { let p = self.root.join(path.trim_end_matches('/')); let f = match tokio::fs::read_dir(&p).await { Ok(rd) => rd, Err(e) => { return if e.kind() == io::ErrorKind::NotFound { - Ok((RpList::default(), Box::new(()) as ObjectPager)) + Ok((RpList::default(), Box::new(()) as output::ObjectPager)) } else { Err(parse_io_error(e)) }; @@ -712,14 +712,21 @@ impl Accessor for FsBackend { } } - fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + _: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { let p = self.root.join(path.trim_end_matches('/')); let f = match std::fs::read_dir(p) { Ok(rd) => rd, Err(e) => { return if e.kind() == io::ErrorKind::NotFound { - Ok((RpList::default(), Box::new(()) as BlockingObjectPager)) + Ok(( + RpList::default(), + Box::new(()) as output::BlockingObjectPager, + )) } else { Err(parse_io_error(e)) }; diff --git a/src/services/fs/dir_stream.rs b/src/services/fs/dir_stream.rs index 80d63c89ea4..a0b691087e4 100644 --- a/src/services/fs/dir_stream.rs +++ b/src/services/fs/dir_stream.rs @@ -42,9 +42,9 @@ impl DirPager { } #[async_trait] -impl ObjectPage for DirPager { - async fn next_page(&mut self) -> Result>> { - let mut oes: Vec = Vec::with_capacity(self.size); +impl output::ObjectPage for DirPager { + async fn next_page(&mut self) -> Result>> { + let mut oes: Vec = Vec::with_capacity(self.size); for _ in 0..self.size { let de = match self.rd.next_entry().await.map_err(parse_io_error)? { @@ -68,15 +68,15 @@ impl ObjectPage for DirPager { let file_type = de.file_type().await.map_err(parse_io_error)?; let d = if file_type.is_file() { - ObjectEntry::new(&rel_path, ObjectMetadata::new(ObjectMode::FILE)) + output::ObjectEntry::new(&rel_path, ObjectMetadata::new(ObjectMode::FILE)) } else if file_type.is_dir() { // Make sure we are returning the correct path. - ObjectEntry::new( + output::ObjectEntry::new( &format!("{rel_path}/"), ObjectMetadata::new(ObjectMode::DIR).with_complete(), ) } else { - ObjectEntry::new(&rel_path, ObjectMetadata::new(ObjectMode::Unknown)) + output::ObjectEntry::new(&rel_path, ObjectMetadata::new(ObjectMode::Unknown)) }; oes.push(d) @@ -104,9 +104,9 @@ impl BlockingDirPager { } } -impl BlockingObjectPage for BlockingDirPager { - fn next_page(&mut self) -> Result>> { - let mut oes: Vec = Vec::with_capacity(self.size); +impl output::BlockingObjectPage for BlockingDirPager { + fn next_page(&mut self) -> Result>> { + let mut oes: Vec = Vec::with_capacity(self.size); for _ in 0..self.size { let de = match self.rd.next() { @@ -130,15 +130,15 @@ impl BlockingObjectPage for BlockingDirPager { let file_type = de.file_type().map_err(parse_io_error)?; let d = if file_type.is_file() { - ObjectEntry::new(&rel_path, ObjectMetadata::new(ObjectMode::FILE)) + output::ObjectEntry::new(&rel_path, ObjectMetadata::new(ObjectMode::FILE)) } else if file_type.is_dir() { // Make sure we are returning the correct path. - ObjectEntry::new( + output::ObjectEntry::new( &format!("{rel_path}/"), ObjectMetadata::new(ObjectMode::DIR).with_complete(), ) } else { - ObjectEntry::new(&rel_path, ObjectMetadata::new(ObjectMode::Unknown)) + output::ObjectEntry::new(&rel_path, ObjectMetadata::new(ObjectMode::Unknown)) }; oes.push(d) diff --git a/src/services/ftp/backend.rs b/src/services/ftp/backend.rs index e3868f4aef4..e786809b389 100644 --- a/src/services/ftp/backend.rs +++ b/src/services/ftp/backend.rs @@ -447,7 +447,7 @@ impl Accessor for FtpBackend { Ok(RpDelete::default()) } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { let mut ftp_stream = self.ftp_connect(Operation::List).await?; let pathname = if path == "/" { None } else { Some(path) }; @@ -457,7 +457,8 @@ impl Accessor for FtpBackend { Ok(( RpList::default(), - Box::new(DirStream::new(if path == "/" { "" } else { path }, rd)) as ObjectPager, + Box::new(DirStream::new(if path == "/" { "" } else { path }, rd)) + as output::ObjectPager, )) } } diff --git a/src/services/ftp/dir_stream.rs b/src/services/ftp/dir_stream.rs index 959a7446ed2..01d2d8fb97b 100644 --- a/src/services/ftp/dir_stream.rs +++ b/src/services/ftp/dir_stream.rs @@ -70,9 +70,9 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { - let mut oes: Vec = Vec::with_capacity(self.size); +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { + let mut oes: Vec = Vec::with_capacity(self.size); for _ in 0..self.size { let de = match self.rd.next() { @@ -83,7 +83,7 @@ impl ObjectPage for DirStream { let path = self.path.to_string() + de.name(); let d = if de.is_file() { - ObjectEntry::new( + output::ObjectEntry::new( &path, ObjectMetadata::new(ObjectMode::FILE) .with_content_length(de.size() as u64) @@ -91,12 +91,12 @@ impl ObjectPage for DirStream { .with_complete(), ) } else if de.is_directory() { - ObjectEntry::new( + output::ObjectEntry::new( &format!("{}/", &path), ObjectMetadata::new(ObjectMode::DIR).with_complete(), ) } else { - ObjectEntry::new( + output::ObjectEntry::new( &path, ObjectMetadata::new(ObjectMode::Unknown).with_complete(), ) diff --git a/src/services/gcs/backend.rs b/src/services/gcs/backend.rs index a05fe4bc997..d276c15ce37 100644 --- a/src/services/gcs/backend.rs +++ b/src/services/gcs/backend.rs @@ -462,10 +462,11 @@ impl Accessor for GcsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) as ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) + as output::ObjectPager, )) } } diff --git a/src/services/gcs/dir_stream.rs b/src/services/gcs/dir_stream.rs index 02949dc8ad8..fff30a3f10f 100644 --- a/src/services/gcs/dir_stream.rs +++ b/src/services/gcs/dir_stream.rs @@ -52,8 +52,8 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); } @@ -80,7 +80,7 @@ impl ObjectPage for DirStream { let mut entries = Vec::with_capacity(output.prefixes.len() + output.items.len()); for prefix in output.prefixes { - let de = ObjectEntry::new( + let de = output::ObjectEntry::new( &build_rel_path(&self.root, &prefix), ObjectMetadata::new(ObjectMode::DIR).with_complete(), ); @@ -113,7 +113,7 @@ impl ObjectPage for DirStream { meta.set_last_modified(dt); meta.set_complete(); - let de = ObjectEntry::new(&build_rel_path(&self.root, &object.name), meta); + let de = output::ObjectEntry::new(&build_rel_path(&self.root, &object.name), meta); entries.push(de); } diff --git a/src/services/hdfs/backend.rs b/src/services/hdfs/backend.rs index 509dcd4e3f7..9142b6a94c1 100644 --- a/src/services/hdfs/backend.rs +++ b/src/services/hdfs/backend.rs @@ -390,14 +390,14 @@ impl Accessor for HdfsBackend { Ok(RpDelete::default()) } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { let p = build_rooted_abs_path(&self.root, path); let f = match self.client.read_dir(&p) { Ok(f) => f, Err(e) => { return if e.kind() == io::ErrorKind::NotFound { - Ok((RpList::default(), Box::new(()) as ObjectPager)) + Ok((RpList::default(), Box::new(()) as output::ObjectPager)) } else { Err(parse_io_error(e)) } @@ -564,14 +564,21 @@ impl Accessor for HdfsBackend { Ok(RpDelete::default()) } - fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, BlockingObjectPager)> { + fn blocking_list( + &self, + path: &str, + _: OpList, + ) -> Result<(RpList, output::BlockingObjectPager)> { let p = build_rooted_abs_path(&self.root, path); let f = match self.client.read_dir(&p) { Ok(f) => f, Err(e) => { return if e.kind() == io::ErrorKind::NotFound { - Ok((RpList::default(), Box::new(()) as BlockingObjectPager)) + Ok(( + RpList::default(), + Box::new(()) as output::BlockingObjectPager, + )) } else { Err(parse_io_error(e)) } diff --git a/src/services/hdfs/dir_stream.rs b/src/services/hdfs/dir_stream.rs index f345ffd7414..82274dd45c5 100644 --- a/src/services/hdfs/dir_stream.rs +++ b/src/services/hdfs/dir_stream.rs @@ -39,9 +39,9 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { - let mut oes: Vec = Vec::with_capacity(self.size); +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { + let mut oes: Vec = Vec::with_capacity(self.size); for _ in 0..self.size { let de = match self.rd.next() { @@ -55,15 +55,15 @@ impl ObjectPage for DirStream { let meta = ObjectMetadata::new(ObjectMode::FILE) .with_content_length(de.len()) .with_last_modified(time::OffsetDateTime::from(de.modified())); - ObjectEntry::new(&path, meta) + output::ObjectEntry::new(&path, meta) } else if de.is_dir() { // Make sure we are returning the correct path. - ObjectEntry::new( + output::ObjectEntry::new( &format!("{path}/"), ObjectMetadata::new(ObjectMode::DIR).with_complete(), ) } else { - ObjectEntry::new(&path, ObjectMetadata::new(ObjectMode::Unknown)) + output::ObjectEntry::new(&path, ObjectMetadata::new(ObjectMode::Unknown)) }; oes.push(d) @@ -73,9 +73,9 @@ impl ObjectPage for DirStream { } } -impl BlockingObjectPage for DirStream { - fn next_page(&mut self) -> Result>> { - let mut oes: Vec = Vec::with_capacity(self.size); +impl output::BlockingObjectPage for DirStream { + fn next_page(&mut self) -> Result>> { + let mut oes: Vec = Vec::with_capacity(self.size); for _ in 0..self.size { let de = match self.rd.next() { @@ -89,15 +89,15 @@ impl BlockingObjectPage for DirStream { let meta = ObjectMetadata::new(ObjectMode::FILE) .with_content_length(de.len()) .with_last_modified(time::OffsetDateTime::from(de.modified())); - ObjectEntry::new(&path, meta) + output::ObjectEntry::new(&path, meta) } else if de.is_dir() { // Make sure we are returning the correct path. - ObjectEntry::new( + output::ObjectEntry::new( &format!("{path}/"), ObjectMetadata::new(ObjectMode::DIR).with_complete(), ) } else { - ObjectEntry::new(&path, ObjectMetadata::new(ObjectMode::Unknown)) + output::ObjectEntry::new(&path, ObjectMetadata::new(ObjectMode::Unknown)) }; oes.push(d) diff --git a/src/services/ipfs/backend.rs b/src/services/ipfs/backend.rs index ce7b7c9a2c7..601fabe9797 100644 --- a/src/services/ipfs/backend.rs +++ b/src/services/ipfs/backend.rs @@ -381,10 +381,10 @@ impl Accessor for IpfsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), path)) as ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), path)) as output::ObjectPager, )) } } @@ -460,8 +460,8 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { if self.consumed { return Ok(None); } @@ -496,7 +496,7 @@ impl ObjectPage for DirStream { name += "/"; } - oes.push(ObjectEntry::new(&name, meta.with_complete())) + oes.push(output::ObjectEntry::new(&name, meta.with_complete())) } self.consumed = true; diff --git a/src/services/ipmfs/backend.rs b/src/services/ipmfs/backend.rs index 9349ada58b6..473f5fb8559 100644 --- a/src/services/ipmfs/backend.rs +++ b/src/services/ipmfs/backend.rs @@ -168,10 +168,11 @@ impl Accessor for IpmfsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) as ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) + as output::ObjectPager, )) } } diff --git a/src/services/ipmfs/dir_stream.rs b/src/services/ipmfs/dir_stream.rs index 750072e3073..0a6abd7c6e7 100644 --- a/src/services/ipmfs/dir_stream.rs +++ b/src/services/ipmfs/dir_stream.rs @@ -45,8 +45,8 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { if self.consumed { return Ok(None); } @@ -82,7 +82,7 @@ impl ObjectPage for DirStream { let path = build_rel_path(&self.root, &path); - ObjectEntry::new( + output::ObjectEntry::new( &path, ObjectMetadata::new(object.mode()) .with_content_length(object.size) diff --git a/src/services/obs/backend.rs b/src/services/obs/backend.rs index 58b7b2cc93c..13cbca6fa5c 100644 --- a/src/services/obs/backend.rs +++ b/src/services/obs/backend.rs @@ -406,10 +406,11 @@ impl Accessor for ObsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) as ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) + as output::ObjectPager, )) } } diff --git a/src/services/obs/dir_stream.rs b/src/services/obs/dir_stream.rs index 247c5fad153..a781f3e2464 100644 --- a/src/services/obs/dir_stream.rs +++ b/src/services/obs/dir_stream.rs @@ -50,8 +50,8 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); } @@ -83,7 +83,7 @@ impl ObjectPage for DirStream { let mut entries = Vec::with_capacity(common_prefixes.len() + output.contents.len()); for prefix in common_prefixes { - let de = ObjectEntry::new( + let de = output::ObjectEntry::new( &build_rel_path(&self.root, &prefix.prefix), ObjectMetadata::new(ObjectMode::DIR).with_complete(), ); @@ -98,7 +98,7 @@ impl ObjectPage for DirStream { let meta = ObjectMetadata::new(ObjectMode::FILE).with_content_length(object.size); - let de = ObjectEntry::new(&build_rel_path(&self.root, &object.key), meta); + let de = output::ObjectEntry::new(&build_rel_path(&self.root, &object.key), meta); entries.push(de); } diff --git a/src/services/oss/backend.rs b/src/services/oss/backend.rs index d0e22c03e9e..f9e7e4744e5 100644 --- a/src/services/oss/backend.rs +++ b/src/services/oss/backend.rs @@ -485,10 +485,11 @@ impl Accessor for OssBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) as ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) + as output::ObjectPager, )) } diff --git a/src/services/oss/dir_stream.rs b/src/services/oss/dir_stream.rs index b33f4103361..607013cb175 100644 --- a/src/services/oss/dir_stream.rs +++ b/src/services/oss/dir_stream.rs @@ -56,8 +56,8 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); } @@ -82,7 +82,7 @@ impl ObjectPage for DirStream { let mut entries = Vec::with_capacity(output.common_prefixes.len() + output.contents.len()); for prefix in output.common_prefixes { - let de = ObjectEntry::new( + let de = output::ObjectEntry::new( &build_rel_path(&self.root, &prefix.prefix), ObjectMetadata::new(ObjectMode::DIR).with_complete(), ); @@ -111,7 +111,7 @@ impl ObjectPage for DirStream { let rel = build_rel_path(&self.root, &object.key); let path = unescape(&rel) .map_err(|e| Error::new(ErrorKind::Unexpected, "excapse xml").set_source(e))?; - let de = ObjectEntry::new(&path, meta); + let de = output::ObjectEntry::new(&path, meta); entries.push(de); } diff --git a/src/services/s3/backend.rs b/src/services/s3/backend.rs index e19681d14de..3b3d9b8313b 100644 --- a/src/services/s3/backend.rs +++ b/src/services/s3/backend.rs @@ -1210,10 +1210,11 @@ impl Accessor for S3Backend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { Ok(( RpList::default(), - Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) as ObjectPager, + Box::new(DirStream::new(Arc::new(self.clone()), &self.root, path)) + as output::ObjectPager, )) } diff --git a/src/services/s3/dir_stream.rs b/src/services/s3/dir_stream.rs index 25415a9252a..7e578efb18e 100644 --- a/src/services/s3/dir_stream.rs +++ b/src/services/s3/dir_stream.rs @@ -54,8 +54,8 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { if self.done { return Ok(None); } @@ -90,7 +90,7 @@ impl ObjectPage for DirStream { let mut entries = Vec::with_capacity(output.common_prefixes.len() + output.contents.len()); for prefix in output.common_prefixes { - let de = ObjectEntry::new( + let de = output::ObjectEntry::new( &build_rel_path(&self.root, &prefix.prefix), ObjectMetadata::new(ObjectMode::DIR).with_complete(), ); @@ -128,7 +128,7 @@ impl ObjectPage for DirStream { })?; meta.set_last_modified(dt); - let de = ObjectEntry::new(&build_rel_path(&self.root, &object.key), meta); + let de = output::ObjectEntry::new(&build_rel_path(&self.root, &object.key), meta); entries.push(de); } diff --git a/src/services/webhdfs/backend.rs b/src/services/webhdfs/backend.rs index fbae6727c3b..816d51e77ab 100644 --- a/src/services/webhdfs/backend.rs +++ b/src/services/webhdfs/backend.rs @@ -672,7 +672,7 @@ impl Accessor for WebhdfsBackend { } } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> { let path = path.trim_end_matches('/'); let req = self.webhdfs_list_status_req(path)?; @@ -691,11 +691,17 @@ impl Accessor for WebhdfsBackend { .file_status; let objects = DirStream::new(path, file_statuses); - Ok((RpList::default(), Box::new(objects) as Box)) + Ok(( + RpList::default(), + Box::new(objects) as Box, + )) } StatusCode::NOT_FOUND => { let objects = DirStream::new(path, vec![]); - Ok((RpList::default(), Box::new(objects) as Box)) + Ok(( + RpList::default(), + Box::new(objects) as Box, + )) } _ => Err(parse_error(resp).await?), } diff --git a/src/services/webhdfs/dir_stream.rs b/src/services/webhdfs/dir_stream.rs index 82999810b27..38f4dbeb802 100644 --- a/src/services/webhdfs/dir_stream.rs +++ b/src/services/webhdfs/dir_stream.rs @@ -33,8 +33,8 @@ impl DirStream { } #[async_trait] -impl ObjectPage for DirStream { - async fn next_page(&mut self) -> Result>> { +impl output::ObjectPage for DirStream { + async fn next_page(&mut self) -> Result>> { if self.statuses.is_empty() { return Ok(None); } @@ -48,7 +48,7 @@ impl ObjectPage for DirStream { if meta.mode().is_dir() { path += "/" } - let entry = ObjectEntry::new(&path, meta); + let entry = output::ObjectEntry::new(&path, meta); entries.push(entry); } diff --git a/src/services/webhdfs/message.rs b/src/services/webhdfs/message.rs index d1f4891e3e7..559e3ac2799 100644 --- a/src/services/webhdfs/message.rs +++ b/src/services/webhdfs/message.rs @@ -85,7 +85,7 @@ pub(super) enum FileStatusType { #[cfg(test)] mod test { use super::*; - use crate::raw::ObjectPage; + use crate::raw::output::ObjectPage; use crate::services::webhdfs::dir_stream::DirStream; use crate::ObjectMode;