Skip to content

Commit

Permalink
Rename Pager
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Feb 9, 2023
1 parent 4d8f277 commit ef8daf4
Show file tree
Hide file tree
Showing 39 changed files with 138 additions and 208 deletions.
8 changes: 4 additions & 4 deletions src/docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,16 @@ In v0.21, we refactor the whole `Accessor`'s API:

Since v0.21, we will return a reply struct for different operations called `RpWrite` instead of an exact type. We can split OpenDAL's public API and raw API with this change.

## ObjectList and ObjectPage
## ObjectList and Page

Since v0.21, `Accessor` will return `ObjectPager` for `List`:
Since v0.21, `Accessor` will return `Pager` for `List`:

```diff
- async fn list(&self, path: &str, args: OpList) -> Result<ObjectStreamer>
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)>
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)>
```

And `Object` will return an `ObjectLister` which is built upon `ObjectPage`:
And `Object` will return an `ObjectLister` which is built upon `Page`:

```rust
pub async fn list(&self) -> Result<ObjectLister> { ... }
Expand Down
25 changes: 10 additions & 15 deletions src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
self.inner.delete(path, args).await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> {
let permit = self
.semaphore
.clone()
Expand All @@ -146,7 +146,7 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
self.inner.list(path, args).await.map(|(rp, s)| {
(
rp,
Box::new(ConcurrentLimitPager::new(s, permit)) as output::ObjectPager,
Box::new(ConcurrentLimitPager::new(s, permit)) as output::Pager,
)
})
}
Expand Down Expand Up @@ -261,11 +261,7 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
self.inner.blocking_delete(path, args)
}

fn blocking_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, output::BlockingObjectPager)> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> {
let permit = self
.semaphore
.clone()
Expand All @@ -275,8 +271,7 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
self.inner.blocking_list(path, args).map(|(rp, it)| {
(
rp,
Box::new(BlockingConcurrentLimitPager::new(it, permit))
as output::BlockingObjectPager,
Box::new(BlockingConcurrentLimitPager::new(it, permit)) as output::BlockingPager,
)
})
}
Expand Down Expand Up @@ -327,14 +322,14 @@ impl<R: output::BlockingRead> output::BlockingRead for ConcurrentLimitReader<R>
}

struct ConcurrentLimitPager {
inner: output::ObjectPager,
inner: output::Pager,

// Hold on this permit until this streamer has been dropped.
_permit: OwnedSemaphorePermit,
}

impl ConcurrentLimitPager {
fn new(inner: output::ObjectPager, permit: OwnedSemaphorePermit) -> Self {
fn new(inner: output::Pager, permit: OwnedSemaphorePermit) -> Self {
Self {
inner,
_permit: permit,
Expand All @@ -343,29 +338,29 @@ impl ConcurrentLimitPager {
}

#[async_trait]
impl output::ObjectPage for ConcurrentLimitPager {
impl output::Page for ConcurrentLimitPager {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next_page().await
}
}

struct BlockingConcurrentLimitPager {
inner: output::BlockingObjectPager,
inner: output::BlockingPager,

// Hold on this permit until this iterator has been dropped.
_permit: OwnedSemaphorePermit,
}

impl BlockingConcurrentLimitPager {
fn new(inner: output::BlockingObjectPager, permit: OwnedSemaphorePermit) -> Self {
fn new(inner: output::BlockingPager, permit: OwnedSemaphorePermit) -> Self {
Self {
inner,
_permit: permit,
}
}
}

impl output::BlockingObjectPage for BlockingConcurrentLimitPager {
impl output::BlockingPage for BlockingConcurrentLimitPager {
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next_page()
}
Expand Down
16 changes: 6 additions & 10 deletions src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
.await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> {
self.inner
.list(path, args)
.map_ok(|(rp, os)| {
Expand All @@ -134,7 +134,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
//
// ref: https://github.com/rust-lang/rust/issues/80437
inner: os,
}) as output::ObjectPager,
}) as output::Pager,
)
})
.map_err(|err| {
Expand Down Expand Up @@ -259,11 +259,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
})
}

fn blocking_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, output::BlockingObjectPager)> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> {
self.inner.blocking_list(path, args).map_err(|err| {
err.with_operation(Operation::BlockingList.into_static())
.with_context("service", self.meta.scheme())
Expand All @@ -272,17 +268,17 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
}
}

struct ObjectStreamErrorContextWrapper<T: output::ObjectPage> {
struct ObjectStreamErrorContextWrapper<T: output::Page> {
scheme: Scheme,
path: String,
inner: T,
}

#[async_trait::async_trait]
impl<T: output::ObjectPage> output::ObjectPage for ObjectStreamErrorContextWrapper<T> {
impl<T: output::Page> output::Page for ObjectStreamErrorContextWrapper<T> {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next_page().await.map_err(|err| {
err.with_operation("ObjectPage::next_page")
err.with_operation("Page::next_page")
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
Expand Down
16 changes: 6 additions & 10 deletions src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,35 +146,31 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
self.inner.read(path, args).await
}

async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::ObjectPager)> {
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, output::Pager)> {
let mut path = path;
if path == "/" {
path = ""
}

Ok((
RpList::default(),
Box::new(ImmutableDir::new(self.children(path))) as output::ObjectPager,
Box::new(ImmutableDir::new(self.children(path))) as output::Pager,
))
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner.blocking_read(path, args)
}

fn blocking_list(
&self,
path: &str,
_: OpList,
) -> Result<(RpList, output::BlockingObjectPager)> {
fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, output::BlockingPager)> {
let mut path = path;
if path == "/" {
path = ""
}

Ok((
RpList::default(),
Box::new(ImmutableDir::new(self.children(path))) as output::BlockingObjectPager,
Box::new(ImmutableDir::new(self.children(path))) as output::BlockingPager,
))
}
}
Expand Down Expand Up @@ -212,13 +208,13 @@ impl ImmutableDir {
}

#[async_trait]
impl output::ObjectPage for ImmutableDir {
impl output::Page for ImmutableDir {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
Ok(self.inner_next_page())
}
}

impl output::BlockingObjectPage for ImmutableDir {
impl output::BlockingPage for ImmutableDir {
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
Ok(self.inner_next_page())
}
Expand Down
24 changes: 10 additions & 14 deletions src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
.await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} -> started",
Expand All @@ -439,7 +439,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
self.error_level,
self.failure_level,
);
Ok((rp, Box::new(streamer) as output::ObjectPager))
Ok((rp, Box::new(streamer) as output::Pager))
}
Err(err) => {
if let Some(lvl) = self.err_level(&err) {
Expand Down Expand Up @@ -896,11 +896,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
})
}

fn blocking_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, output::BlockingObjectPager)> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} -> started",
Expand All @@ -926,7 +922,7 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
self.error_level,
self.failure_level,
);
(rp, Box::new(li) as output::BlockingObjectPager)
(rp, Box::new(li) as output::BlockingPager)
})
.map_err(|err| {
if let Some(lvl) = self.err_level(&err) {
Expand Down Expand Up @@ -1273,7 +1269,7 @@ struct LoggingPager {
scheme: Scheme,
path: String,
finished: bool,
inner: output::ObjectPager,
inner: output::Pager,
error_level: Option<Level>,
failure_level: Option<Level>,
}
Expand All @@ -1282,7 +1278,7 @@ impl LoggingPager {
fn new(
scheme: Scheme,
path: &str,
inner: output::ObjectPager,
inner: output::Pager,
error_level: Option<Level>,
failure_level: Option<Level>,
) -> Self {
Expand Down Expand Up @@ -1340,7 +1336,7 @@ impl LoggingPager {
}

#[async_trait]
impl output::ObjectPage for LoggingPager {
impl output::Page for LoggingPager {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
let res = self.inner.next_page().await;

Expand Down Expand Up @@ -1388,7 +1384,7 @@ struct BlockingLoggingPager {
scheme: Scheme,
path: String,
finished: bool,
inner: output::BlockingObjectPager,
inner: output::BlockingPager,
error_level: Option<Level>,
failure_level: Option<Level>,
}
Expand All @@ -1397,7 +1393,7 @@ impl BlockingLoggingPager {
fn new(
scheme: Scheme,
path: &str,
inner: output::BlockingObjectPager,
inner: output::BlockingPager,
error_level: Option<Level>,
failure_level: Option<Level>,
) -> Self {
Expand Down Expand Up @@ -1454,7 +1450,7 @@ impl BlockingLoggingPager {
}
}

impl output::BlockingObjectPage for BlockingLoggingPager {
impl output::BlockingPage for BlockingLoggingPager {
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
let res = self.inner.next_page();

Expand Down
8 changes: 2 additions & 6 deletions src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ impl<A: Accessor> LayeredAccessor for MetricsAccessor<A> {
.await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::ObjectPager)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, output::Pager)> {
self.handle.requests_total_list.increment(1);

let start = Instant::now();
Expand Down Expand Up @@ -851,11 +851,7 @@ impl<A: Accessor> LayeredAccessor for MetricsAccessor<A> {
})
}

fn blocking_list(
&self,
path: &str,
args: OpList,
) -> Result<(RpList, output::BlockingObjectPager)> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, output::BlockingPager)> {
self.handle.requests_total_blocking_list.increment(1);

let start = Instant::now();
Expand Down
Loading

1 comment on commit ef8daf4

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-9wc411ioy-databend.vercel.app
https://opendal-git-strong-type-page.vercel.app

Built with commit ef8daf4.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.