Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Cleanup pager related implementation #1439

Merged
merged 5 commits into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/docs/internals/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
//! use AccessorCapability::*;
//!
//! let mut am = AccessorMetadata::default();
//! am.set_capabilities(Read | Write | List | Scan | Presign | Multipart | Batch);
//! am.set_capabilities(Read | Write | List | Scan | Presign | Batch);
//!
//! am
//! }
Expand Down
16 changes: 8 additions & 8 deletions src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,13 +480,13 @@ where
A: Accessor<Pager = P>,
P: output::Page,
{
async fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
use CompletePager::*;

match self {
AlreadyComplete(p) => p.next_page().await,
NeedFlat(p) => p.next_page().await,
NeedHierarchy(p) => p.next_page().await,
AlreadyComplete(p) => p.next().await,
NeedFlat(p) => p.next().await,
NeedHierarchy(p) => p.next().await,
}
}
}
Expand All @@ -496,13 +496,13 @@ where
A: Accessor<BlockingPager = P>,
P: output::BlockingPage,
{
fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
fn next(&mut self) -> Result<Option<Vec<Entry>>> {
use CompletePager::*;

match self {
AlreadyComplete(p) => p.next_page(),
NeedFlat(p) => p.next_page(),
NeedHierarchy(p) => p.next_page(),
AlreadyComplete(p) => p.next(),
NeedFlat(p) => p.next(),
NeedHierarchy(p) => p.next(),
}
}
}
8 changes: 4 additions & 4 deletions src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,13 @@ impl<R: output::BlockingWrite> output::BlockingWrite for ConcurrentLimitWrapper<

#[async_trait]
impl<R: output::Page> output::Page for ConcurrentLimitWrapper<R> {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next_page().await
async fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next().await
}
}

impl<R: output::BlockingPage> output::BlockingPage for ConcurrentLimitWrapper<R> {
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next_page()
fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next()
}
}
8 changes: 4 additions & 4 deletions src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ pub struct ErrorContextWrapper<T> {

#[async_trait::async_trait]
impl<T: output::Page> output::Page for ErrorContextWrapper<T> {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next_page().await.map_err(|err| {
async fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next().await.map_err(|err| {
err.with_operation("Page::next_page")
.with_context("service", self.scheme)
.with_context("path", &self.path)
Expand All @@ -306,8 +306,8 @@ impl<T: output::Page> output::Page for ErrorContextWrapper<T> {
}

impl<T: output::BlockingPage> output::BlockingPage for ErrorContextWrapper<T> {
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next_page().map_err(|err| {
fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next().map_err(|err| {
err.with_operation("Page::next_page")
.with_context("service", self.scheme)
.with_context("path", &self.path)
Expand Down
4 changes: 2 additions & 2 deletions src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,13 @@ impl ImmutableDir {

#[async_trait]
impl output::Page for ImmutableDir {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
async fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
Ok(self.inner_next_page())
}
}

impl output::BlockingPage for ImmutableDir {
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
Ok(self.inner_next_page())
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1244,8 +1244,8 @@ impl<P> LoggingPager<P> {

#[async_trait]
impl<P: output::Page> output::Page for LoggingPager<P> {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
let res = self.inner.next_page().await;
async fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
let res = self.inner.next().await;

match &res {
Ok(Some(des)) => {
Expand Down Expand Up @@ -1288,8 +1288,8 @@ impl<P: output::Page> output::Page for LoggingPager<P> {
}

impl<P: output::BlockingPage> output::BlockingPage for LoggingPager<P> {
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
let res = self.inner.next_page();
fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
let res = self.inner.next();

match &res {
Ok(Some(des)) => {
Expand Down
12 changes: 6 additions & 6 deletions src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,11 +681,11 @@ impl<P> RetryPager<P> {

#[async_trait]
impl<P: output::Page> output::Page for RetryPager<P> {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
async fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
if let Some(sleep) = self.sleep.take() {
tokio::time::sleep(sleep).await;
}
match self.inner.next_page().await {
match self.inner.next().await {
Ok(v) => {
// request successful, reset backoff
self.reset_backoff();
Expand Down Expand Up @@ -720,7 +720,7 @@ impl<P: output::Page> output::Page for RetryPager<P> {
"operation={} path={} -> pager retry after {}s: error={:?}",
Operation::List, self.path, dur.as_secs_f64(), e);
self.sleep = Some(dur);
self.next_page().await
self.next().await
}
}
}
Expand All @@ -729,8 +729,8 @@ impl<P: output::Page> output::Page for RetryPager<P> {
}

impl<P: output::BlockingPage> output::BlockingPage for RetryPager<P> {
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
{ || self.inner.next_page() }
fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
{ || self.inner.next() }
.retry(&self.policy)
.when(|e| e.is_temporary())
.notify(move |err, dur| {
Expand Down Expand Up @@ -859,7 +859,7 @@ mod tests {
}
#[async_trait]
impl output::Page for MockPager {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
async fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.attempt += 1;
match self.attempt {
1 => Err(Error::new(
Expand Down
8 changes: 4 additions & 4 deletions src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,14 @@ impl<R: input::BlockingRead> Read for TracingWrapper<R> {
#[async_trait]
impl<R: output::Page> output::Page for TracingWrapper<R> {
#[tracing::instrument(parent = &self.span, level = "debug", skip_all)]
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next_page().await
async fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next().await
}
}

impl<R: output::BlockingPage> output::BlockingPage for TracingWrapper<R> {
#[tracing::instrument(parent = &self.span, level = "debug", skip_all)]
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next_page()
fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
self.inner.next()
}
}
8 changes: 4 additions & 4 deletions src/object/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl ObjectLister {
.pager
.as_mut()
.expect("pager must be valid")
.next_page()
.next()
.await?
{
// Ideally, the convert from `Vec` to `VecDeque` will not do reallocation.
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Stream for ObjectLister {

let mut pager = self.pager.take().expect("pager must be valid");
let fut = async move {
let res = pager.next_page().await;
let res = pager.next().await;

(pager, res)
};
Expand Down Expand Up @@ -159,7 +159,7 @@ impl BlockingObjectLister {
let entries = if !self.buf.is_empty() {
mem::take(&mut self.buf)
} else {
match self.pager.next_page()? {
match self.pager.next()? {
// Ideally, the convert from `Vec` to `VecDeque` will not do reallocation.
//
// However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
Expand All @@ -186,7 +186,7 @@ impl Iterator for BlockingObjectLister {
return Some(Ok(oe.into_object(self.operator())));
}

self.buf = match self.pager.next_page() {
self.buf = match self.pager.next() {
// Ideally, the convert from `Vec` to `VecDeque` will not do reallocation.
//
// However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
Expand Down
6 changes: 4 additions & 2 deletions src/object/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
use std::fmt::Display;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::ready;
use futures::AsyncWrite;
use futures::FutureExt;
use futures::{ready, AsyncWrite};

use crate::ops::OpWrite;
use crate::raw::*;
Expand Down
4 changes: 2 additions & 2 deletions src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ impl KvPager {

#[async_trait]
impl output::Page for KvPager {
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
async fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
Ok(self.inner_next_page())
}
}

impl output::BlockingPage for KvPager {
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
Ok(self.inner_next_page())
}
}
Expand Down
28 changes: 14 additions & 14 deletions src/raw/io/output/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,32 @@ pub trait Page: Send + Sync + 'static {
/// Fetch a new page of [`Entry`]
///
/// `Ok(None)` means all object pages have been returned. Any following call
/// to `next_page` will always get the same result.
async fn next_page(&mut self) -> Result<Option<Vec<Entry>>>;
/// to `next` will always get the same result.
async fn next(&mut self) -> Result<Option<Vec<Entry>>>;
}

/// The boxed version of [`Page`]
pub type Pager = Box<dyn Page>;

#[async_trait]
impl Page for Pager {
async fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
self.as_mut().next_page().await
async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
self.as_mut().next().await
}
}

#[async_trait]
impl Page for () {
async fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
Ok(None)
}
}

#[async_trait]
impl<P: Page> Page for Option<P> {
async fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
match self {
Some(p) => p.next_page().await,
Some(p) => p.next().await,
None => Ok(None),
}
}
Expand All @@ -60,30 +60,30 @@ pub trait BlockingPage: 'static {
/// Fetch a new page of [`Entry`]
///
/// `Ok(None)` means all object pages have been returned. Any following call
/// to `next_page` will always get the same result.
fn next_page(&mut self) -> Result<Option<Vec<Entry>>>;
/// to `next` will always get the same result.
fn next(&mut self) -> Result<Option<Vec<Entry>>>;
}

/// BlockingPager is a boxed [`BlockingPage`]
pub type BlockingPager = Box<dyn BlockingPage>;

impl BlockingPage for BlockingPager {
fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
self.as_mut().next_page()
fn next(&mut self) -> Result<Option<Vec<Entry>>> {
self.as_mut().next()
}
}

impl BlockingPage for () {
fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
fn next(&mut self) -> Result<Option<Vec<Entry>>> {
Ok(None)
}
}

#[async_trait]
impl<P: BlockingPage> BlockingPage for Option<P> {
fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
fn next(&mut self) -> Result<Option<Vec<Entry>>> {
match self {
Some(p) => p.next_page(),
Some(p) => p.next(),
None => Ok(None),
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/raw/io/output/to_flat_pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
A: Accessor<Pager = P>,
P: output::Page,
{
async fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
async fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
loop {
if let Some(de) = self.dirs.pop_back() {
let (_, op) = self.acc.list(de.path(), OpList::new()).await?;
Expand All @@ -116,7 +116,7 @@ where
};

if buf.is_empty() {
match pager.next_page().await? {
match pager.next().await? {
Some(v) => {
buf = v;
}
Expand Down Expand Up @@ -155,7 +155,7 @@ where
A: Accessor<BlockingPager = P>,
P: output::BlockingPage,
{
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
loop {
if let Some(de) = self.dirs.pop_back() {
let (_, op) = self.acc.blocking_list(de.path(), OpList::new())?;
Expand All @@ -173,7 +173,7 @@ where
};

if buf.is_empty() {
match pager.next_page()? {
match pager.next()? {
Some(v) => {
buf = v;
}
Expand Down Expand Up @@ -267,7 +267,7 @@ mod tests {
}

impl BlockingPage for MockPager {
fn next_page(&mut self) -> Result<Option<Vec<output::Entry>>> {
fn next(&mut self) -> Result<Option<Vec<output::Entry>>> {
if self.done {
return Ok(None);
}
Expand Down Expand Up @@ -298,7 +298,7 @@ mod tests {

let mut entries = Vec::default();

while let Some(e) = pager.next_page()? {
while let Some(e) = pager.next()? {
entries.extend_from_slice(&e)
}

Expand Down
Loading