Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(adapter/kv): support async iterating on scan results #5208

Merged
merged 7 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ services-s3 = [
services-seafile = []
services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"]
services-sled = ["dep:sled", "internal-tokio-rt"]
services-sqlite = ["dep:sqlx", "sqlx?/sqlite"]
services-sqlite = ["dep:sqlx", "sqlx?/sqlite", "dep:ouroboros"]
services-supabase = []
services-surrealdb = ["dep:surrealdb"]
services-swift = []
Expand Down Expand Up @@ -277,6 +277,9 @@ sqlx = { version = "0.8.0", features = [
# For http based services.
reqsign = { version = "0.16", default-features = false, optional = true }

# for self-referencing structs
ouroboros = { version = "0.18.4", optional = true }

# for services-atomic-server
atomic_lib = { version = "0.39.0", optional = true }
# for services-cacache
Expand Down
69 changes: 68 additions & 1 deletion core/src/raw/adapters/kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::fmt::Debug;
use std::future::ready;
use std::ops::DerefMut;

use futures::Future;

Expand All @@ -25,10 +26,76 @@ use crate::Capability;
use crate::Scheme;
use crate::*;

/// Scan is the async iterator returned by `Adapter::scan`.
pub trait Scan: Send + Sync + Unpin {
/// Fetch the next key in the current key prefix
///
/// `Ok(None)` means no further key will be returned
fn next(&mut self) -> impl Future<Output = Result<Option<String>>> + MaybeSend;
}

/// A noop implementation of Scan
impl Scan for () {
async fn next(&mut self) -> Result<Option<String>> {
Ok(None)
}
}

/// A Scan implementation for all trivial non-async iterators
pub struct ScanStdIter<I>(I);

#[cfg(any(
feature = "services-cloudflare-kv",
feature = "services-etcd",
feature = "services-nebula-graph",
feature = "services-rocksdb",
feature = "services-sled"
))]
impl<I> ScanStdIter<I>
where
I: Iterator<Item = Result<String>> + Unpin + Send + Sync,
{
/// Create a new ScanStdIter from an Iterator
pub(crate) fn new(inner: I) -> Self {
Self(inner)
}
}

impl<I> Scan for ScanStdIter<I>
where
I: Iterator<Item = Result<String>> + Unpin + Send + Sync,
{
async fn next(&mut self) -> Result<Option<String>> {
self.0.next().transpose()
}
}

/// A type-erased wrapper of Scan
pub type Scanner = Box<dyn ScanDyn>;

pub trait ScanDyn: Unpin + Send + Sync {
fn next_dyn(&mut self) -> BoxedFuture<Result<Option<String>>>;
}

impl<T: Scan + ?Sized> ScanDyn for T {
fn next_dyn(&mut self) -> BoxedFuture<Result<Option<String>>> {
Box::pin(self.next())
}
}

impl<T: ScanDyn + ?Sized> Scan for Box<T> {
async fn next(&mut self) -> Result<Option<String>> {
self.deref_mut().next_dyn().await
}
}

/// KvAdapter is the adapter to underlying kv services.
///
/// By implement this trait, any kv service can work as an OpenDAL Service.
pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
/// TODO: use default associate type `= ()` after stablized
type Scanner: Scan;

/// Return the metadata of this key value accessor.
fn metadata(&self) -> Metadata;

Expand Down Expand Up @@ -81,7 +148,7 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
}

/// Scan a key prefix to get all keys that start with this key.
fn scan(&self, path: &str) -> impl Future<Output = Result<Vec<String>>> + MaybeSend {
fn scan(&self, path: &str) -> impl Future<Output = Result<Self::Scanner>> + MaybeSend {
let _ = path;

ready(Err(Error::new(
Expand Down
61 changes: 48 additions & 13 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::sync::Arc;
use std::vec::IntoIter;

use super::Adapter;
use super::{Adapter, Scan};
use crate::raw::oio::HierarchyLister;
use crate::raw::oio::QueueBuf;
use crate::raw::*;
Expand Down Expand Up @@ -68,8 +68,8 @@ impl<S: Adapter> Access for Backend<S> {
type BlockingReader = Buffer;
type Writer = KvWriter<S>;
type BlockingWriter = KvWriter<S>;
type Lister = HierarchyLister<KvLister>;
type BlockingLister = HierarchyLister<KvLister>;
type Lister = HierarchyLister<KvLister<S::Scanner>>;
type BlockingLister = HierarchyLister<BlockingKvLister>;

fn info(&self) -> Arc<AccessorInfo> {
let mut am: AccessorInfo = self.kv.metadata().into();
Expand Down Expand Up @@ -182,19 +182,60 @@ impl<S: Adapter> Access for Backend<S> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
let p = build_abs_path(&self.root, path);
let res = self.kv.blocking_scan(&p)?;
let lister = KvLister::new(&self.root, res);
let lister = BlockingKvLister::new(&self.root, res);
let lister = HierarchyLister::new(lister, path, args.recursive());

Ok((RpList::default(), lister))
}
}

pub struct KvLister {
pub struct KvLister<Iter> {
root: String,
inner: Iter,
}

impl<Iter> KvLister<Iter>
where
Iter: Scan,
{
fn new(root: &str, inner: Iter) -> Self {
Self {
root: root.to_string(),
inner,
}
}

async fn inner_next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner.next().await?.map(|v| {
let mode = if v.ends_with('/') {
EntryMode::DIR
} else {
EntryMode::FILE
};
let mut path = build_rel_path(&self.root, &v);
if path.is_empty() {
path = "/".to_string();
}
oio::Entry::new(&path, Metadata::new(mode))
}))
}
}

impl<Iter> oio::List for KvLister<Iter>
where
Iter: Scan,
{
async fn next(&mut self) -> Result<Option<oio::Entry>> {
self.inner_next().await
}
}

pub struct BlockingKvLister {
root: String,
inner: IntoIter<String>,
}

impl KvLister {
impl BlockingKvLister {
fn new(root: &str, inner: Vec<String>) -> Self {
Self {
root: root.to_string(),
Expand All @@ -218,13 +259,7 @@ impl KvLister {
}
}

impl oio::List for KvLister {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner_next())
}
}

impl oio::BlockingList for KvLister {
impl oio::BlockingList for BlockingKvLister {
fn next(&mut self) -> Result<Option<oio::Entry>> {
Ok(self.inner_next())
}
Expand Down
10 changes: 10 additions & 0 deletions core/src/raw/adapters/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@
mod api;
pub use api::Adapter;
pub use api::Metadata;
pub use api::Scan;
#[cfg(any(
feature = "services-cloudflare-kv",
feature = "services-etcd",
feature = "services-nebula-graph",
feature = "services-rocksdb",
feature = "services-sled"
))]
pub(crate) use api::ScanStdIter;
pub use api::Scanner;

mod backend;
pub use backend::Backend;
2 changes: 2 additions & 0 deletions core/src/services/atomicserver/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Atomicserver,
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/cacache/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ impl Debug for Adapter {
}

impl kv::Adapter for Adapter {
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Cacache,
Expand Down
8 changes: 6 additions & 2 deletions core/src/services/cloudflare_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type Scanner = kv::Scanner;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::CloudflareKv,
Expand Down Expand Up @@ -240,7 +242,7 @@ impl kv::Adapter for Adapter {
}
}

async fn scan(&self, path: &str) -> Result<Vec<String>> {
async fn scan(&self, path: &str) -> Result<Self::Scanner> {
let mut url = format!("{}/keys", self.url_prefix);
if !path.is_empty() {
url = format!("{}?prefix={}", url, path);
Expand All @@ -261,7 +263,9 @@ impl kv::Adapter for Adapter {
format!("failed to parse error response: {}", e),
)
})?;
Ok(response.result.into_iter().map(|r| r.name).collect())
Ok(Box::new(kv::ScanStdIter::new(
response.result.into_iter().map(|r| Ok(r.name)),
)))
}
_ => Err(parse_error(resp)),
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/d1/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type Scanner = ();

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::D1,
Expand Down
9 changes: 6 additions & 3 deletions core/src/services/etcd/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::fmt::Debug;
use std::fmt::Formatter;
use std::vec;

use bb8::PooledConnection;
use bb8::RunError;
Expand Down Expand Up @@ -271,6 +272,8 @@ impl Adapter {
}

impl kv::Adapter for Adapter {
type Scanner = kv::ScanStdIter<vec::IntoIter<Result<String>>>;

fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
Scheme::Etcd,
Expand Down Expand Up @@ -310,7 +313,7 @@ impl kv::Adapter for Adapter {
Ok(())
}

async fn scan(&self, path: &str) -> Result<Vec<String>> {
async fn scan(&self, path: &str) -> Result<Self::Scanner> {
let mut client = self.conn().await?;
let get_options = Some(GetOptions::new().with_prefix().with_keys_only());
let resp = client
Expand All @@ -323,10 +326,10 @@ impl kv::Adapter for Adapter {
Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
.set_source(err)
})?;
res.push(v);
res.push(Ok(v));
}

Ok(res)
Ok(kv::ScanStdIter::new(res.into_iter()))
}
}

Expand Down
Loading
Loading