Skip to content

Commit

Permalink
Move object entry
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Feb 9, 2023
1 parent 6c0edf6 commit 90e4fe4
Show file tree
Hide file tree
Showing 42 changed files with 297 additions and 206 deletions.
2 changes: 1 addition & 1 deletion src/docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ Since v0.21, `Accessor` will return `ObjectPager` for `List`:

```diff
- async fn list(&self, path: &str, args: OpList) -> Result<ObjectStreamer>
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)>
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)>
```

And `Object` will return an `ObjectLister` which is built upon `ObjectPage`:
Expand Down
29 changes: 17 additions & 12 deletions src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
self.inner.delete(path, args).await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> {
let permit = self
.semaphore
.clone()
Expand All @@ -146,7 +146,7 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
self.inner.list(path, args).await.map(|(rp, s)| {
(
rp,
Box::new(ConcurrentLimitPager::new(s, permit)) as ObjectPager,
Box::new(ConcurrentLimitPager::new(s, permit)) as output::ObjectPager,
)
})
}
Expand Down Expand Up @@ -261,7 +261,11 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
self.inner.blocking_delete(path, args)
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> {
fn blocking_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, output::BlockingObjectPager)> {
let permit = self
.semaphore
.clone()
Expand All @@ -271,7 +275,8 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
self.inner.blocking_list(path, args).map(|(rp, it)| {
(
rp,
Box::new(BlockingConcurrentLimitPager::new(it, permit)) as BlockingObjectPager,
Box::new(BlockingConcurrentLimitPager::new(it, permit))
as output::BlockingObjectPager,
)
})
}
Expand Down Expand Up @@ -322,14 +327,14 @@ impl<R: output::BlockingRead> output::BlockingRead for ConcurrentLimitReader<R>
}

struct ConcurrentLimitPager {
inner: ObjectPager,
inner: output::ObjectPager,

// Hold on this permit until this streamer has been dropped.
_permit: OwnedSemaphorePermit,
}

impl ConcurrentLimitPager {
fn new(inner: ObjectPager, permit: OwnedSemaphorePermit) -> Self {
fn new(inner: output::ObjectPager, permit: OwnedSemaphorePermit) -> Self {
Self {
inner,
_permit: permit,
Expand All @@ -338,30 +343,30 @@ impl ConcurrentLimitPager {
}

#[async_trait]
impl ObjectPage for ConcurrentLimitPager {
async fn next_page(&mut self) -> Result<Option<Vec<ObjectEntry>>> {
impl output::ObjectPage for ConcurrentLimitPager {
async fn next_page(&mut self) -> Result<Option<Vec<output::ObjectEntry>>> {
self.inner.next_page().await
}
}

struct BlockingConcurrentLimitPager {
inner: BlockingObjectPager,
inner: output::BlockingObjectPager,

// Hold on this permit until this iterator has been dropped.
_permit: OwnedSemaphorePermit,
}

impl BlockingConcurrentLimitPager {
fn new(inner: BlockingObjectPager, permit: OwnedSemaphorePermit) -> Self {
fn new(inner: output::BlockingObjectPager, permit: OwnedSemaphorePermit) -> Self {
Self {
inner,
_permit: permit,
}
}
}

impl BlockingObjectPage for BlockingConcurrentLimitPager {
fn next_page(&mut self) -> Result<Option<Vec<ObjectEntry>>> {
impl output::BlockingObjectPage for BlockingConcurrentLimitPager {
fn next_page(&mut self) -> Result<Option<Vec<output::ObjectEntry>>> {
self.inner.next_page()
}
}
16 changes: 10 additions & 6 deletions src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
.await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> {
self.inner
.list(path, args)
.map_ok(|(rp, os)| {
Expand All @@ -134,7 +134,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
//
// ref: https://github.com/rust-lang/rust/issues/80437
inner: os,
}) as ObjectPager,
}) as output::ObjectPager,
)
})
.map_err(|err| {
Expand Down Expand Up @@ -259,7 +259,11 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
})
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> {
fn blocking_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, output::BlockingObjectPager)> {
self.inner.blocking_list(path, args).map_err(|err| {
err.with_operation(Operation::BlockingList.into_static())
.with_context("service", self.meta.scheme())
Expand All @@ -268,15 +272,15 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
}
}

struct ObjectStreamErrorContextWrapper<T: ObjectPage> {
struct ObjectStreamErrorContextWrapper<T: output::ObjectPage> {
scheme: Scheme,
path: String,
inner: T,
}

#[async_trait::async_trait]
impl<T: ObjectPage> ObjectPage for ObjectStreamErrorContextWrapper<T> {
async fn next_page(&mut self) -> Result<Option<Vec<ObjectEntry>>> {
impl<T: output::ObjectPage> output::ObjectPage for ObjectStreamErrorContextWrapper<T> {
async fn next_page(&mut self) -> Result<Option<Vec<output::ObjectEntry>>> {
self.inner.next_page().await.map_err(|err| {
err.with_operation("ObjectPage::next_page")
.with_context("service", self.scheme)
Expand Down
24 changes: 14 additions & 10 deletions src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,31 +146,35 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
self.inner.read(path, args).await
}

async fn list(&self, path: &str, _: OpList) -> Result<(RpList, ObjectPager)> {
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> {
let mut path = path;
if path == "/" {
path = ""
}

Ok((
RpList::default(),
Box::new(ImmutableDir::new(self.children(path))) as ObjectPager,
Box::new(ImmutableDir::new(self.children(path))) as output::ObjectPager,
))
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner.blocking_read(path, args)
}

fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, BlockingObjectPager)> {
fn blocking_list(
&self,
path: &str,
_: OpList,
) -> Result<(RpList, output::BlockingObjectPager)> {
let mut path = path;
if path == "/" {
path = ""
}

Ok((
RpList::default(),
Box::new(ImmutableDir::new(self.children(path))) as BlockingObjectPager,
Box::new(ImmutableDir::new(self.children(path))) as output::BlockingObjectPager,
))
}
}
Expand All @@ -184,7 +188,7 @@ impl ImmutableDir {
Self { idx }
}

fn inner_next_page(&mut self) -> Option<Vec<ObjectEntry>> {
fn inner_next_page(&mut self) -> Option<Vec<output::ObjectEntry>> {
if self.idx.is_empty() {
return None;
}
Expand All @@ -200,22 +204,22 @@ impl ImmutableDir {
ObjectMode::FILE
};
let meta = ObjectMetadata::new(mode);
ObjectEntry::with(v, meta)
output::ObjectEntry::with(v, meta)
})
.collect(),
)
}
}

#[async_trait]
impl ObjectPage for ImmutableDir {
async fn next_page(&mut self) -> Result<Option<Vec<ObjectEntry>>> {
impl output::ObjectPage for ImmutableDir {
async fn next_page(&mut self) -> Result<Option<Vec<output::ObjectEntry>>> {
Ok(self.inner_next_page())
}
}

impl BlockingObjectPage for ImmutableDir {
fn next_page(&mut self) -> Result<Option<Vec<ObjectEntry>>> {
impl output::BlockingObjectPage for ImmutableDir {
fn next_page(&mut self) -> Result<Option<Vec<output::ObjectEntry>>> {
Ok(self.inner_next_page())
}
}
Expand Down
28 changes: 16 additions & 12 deletions src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
.await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} -> started",
Expand All @@ -439,7 +439,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
self.error_level,
self.failure_level,
);
Ok((rp, Box::new(streamer) as ObjectPager))
Ok((rp, Box::new(streamer) as output::ObjectPager))
}
Err(err) => {
if let Some(lvl) = self.err_level(&err) {
Expand Down Expand Up @@ -896,7 +896,11 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
})
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> {
fn blocking_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, output::BlockingObjectPager)> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} -> started",
Expand All @@ -922,7 +926,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
self.error_level,
self.failure_level,
);
(rp, Box::new(li) as BlockingObjectPager)
(rp, Box::new(li) as output::BlockingObjectPager)
})
.map_err(|err| {
if let Some(lvl) = self.err_level(&err) {
Expand Down Expand Up @@ -1269,7 +1273,7 @@ struct LoggingPager {
scheme: Scheme,
path: String,
finished: bool,
inner: ObjectPager,
inner: output::ObjectPager,
error_level: Option<Level>,
failure_level: Option<Level>,
}
Expand All @@ -1278,7 +1282,7 @@ impl LoggingPager {
fn new(
scheme: Scheme,
path: &str,
inner: ObjectPager,
inner: output::ObjectPager,
error_level: Option<Level>,
failure_level: Option<Level>,
) -> Self {
Expand Down Expand Up @@ -1336,8 +1340,8 @@ impl LoggingPager {
}

#[async_trait]
impl ObjectPage for LoggingPager {
async fn next_page(&mut self) -> Result<Option<Vec<ObjectEntry>>> {
impl output::ObjectPage for LoggingPager {
async fn next_page(&mut self) -> Result<Option<Vec<output::ObjectEntry>>> {
let res = self.inner.next_page().await;

match &res {
Expand Down Expand Up @@ -1384,7 +1388,7 @@ struct BlockingLoggingPager {
scheme: Scheme,
path: String,
finished: bool,
inner: BlockingObjectPager,
inner: output::BlockingObjectPager,
error_level: Option<Level>,
failure_level: Option<Level>,
}
Expand All @@ -1393,7 +1397,7 @@ impl BlockingLoggingPager {
fn new(
scheme: Scheme,
path: &str,
inner: BlockingObjectPager,
inner: output::BlockingObjectPager,
error_level: Option<Level>,
failure_level: Option<Level>,
) -> Self {
Expand Down Expand Up @@ -1450,8 +1454,8 @@ impl BlockingLoggingPager {
}
}

impl BlockingObjectPage for BlockingLoggingPager {
fn next_page(&mut self) -> Result<Option<Vec<ObjectEntry>>> {
impl output::BlockingObjectPage for BlockingLoggingPager {
fn next_page(&mut self) -> Result<Option<Vec<output::ObjectEntry>>> {
let res = self.inner.next_page();

match &res {
Expand Down
8 changes: 6 additions & 2 deletions src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ impl<A: Accessor> LayeredAccessor for MetricsAccessor<A> {
.await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, ObjectPager)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> {
self.handle.requests_total_list.increment(1);

let start = Instant::now();
Expand Down Expand Up @@ -851,7 +851,11 @@ impl<A: Accessor> LayeredAccessor for MetricsAccessor<A> {
})
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, BlockingObjectPager)> {
fn blocking_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, output::BlockingObjectPager)> {
self.handle.requests_total_blocking_list.increment(1);

let start = Instant::now();
Expand Down
Loading

0 comments on commit 90e4fe4

Please sign in to comment.