From b014ca60edf1e70de3482ce1312dbbb8851faa19 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Sun, 20 Oct 2024 15:22:07 +0800 Subject: [PATCH 1/4] feat(adapter/kv): support async iterating on scan results --- core/Cargo.lock | 45 ++++++++++++++++ core/Cargo.toml | 5 +- core/src/raw/adapters/kv/api.rs | 13 ++++- core/src/raw/adapters/kv/backend.rs | 62 +++++++++++++++++----- core/src/raw/adapters/kv/mod.rs | 1 + core/src/services/atomicserver/backend.rs | 2 + core/src/services/cacache/backend.rs | 2 + core/src/services/cloudflare_kv/backend.rs | 15 +++++- core/src/services/d1/backend.rs | 2 + core/src/services/etcd/backend.rs | 11 ++-- core/src/services/foundationdb/backend.rs | 2 + core/src/services/gridfs/backend.rs | 2 + core/src/services/libsql/backend.rs | 2 + core/src/services/memcached/backend.rs | 2 + core/src/services/mongodb/backend.rs | 2 + core/src/services/mysql/backend.rs | 2 + core/src/services/nebula_graph/backend.rs | 10 ++-- core/src/services/persy/backend.rs | 2 + core/src/services/postgresql/backend.rs | 2 + core/src/services/redb/backend.rs | 2 + core/src/services/redis/backend.rs | 2 + core/src/services/rocksdb/backend.rs | 12 +++-- core/src/services/sled/backend.rs | 12 +++-- core/src/services/sqlite/backend.rs | 55 +++++++++++++++---- core/src/services/surrealdb/backend.rs | 2 + core/src/services/tikv/backend.rs | 2 + 26 files changed, 232 insertions(+), 39 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 800575eb3dd..9ff51b5632b 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -80,6 +80,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" + [[package]] name = "aligned-array" version = "1.0.1" @@ -5036,6 +5042,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", + "ouroboros", "percent-encoding", "persy", "pretty_assertions", @@ -5368,6 +5375,31 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "ouroboros" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "944fa20996a25aded6b4795c6d63f10014a7a83f8be9828a11860b08c5fc4a67" +dependencies = [ + "aliasable", + "ouroboros_macro", + "static_assertions", +] + +[[package]] +name = "ouroboros_macro" +version = "0.18.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39b0deead1528fd0e5947a8546a9642a9777c25f6e1e26f34c97b204bbb465bd" +dependencies = [ + "heck 0.4.1", + "itertools 0.12.1", + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.79", +] + [[package]] name = "outref" version = "0.5.1" @@ -5908,6 +5940,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", + "version_check", + "yansi", +] + [[package]] name = "procfs" version = "0.16.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index bb01c7a7d1f..ae341a83370 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -194,7 +194,7 @@ services-s3 = [ services-seafile = [] services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"] services-sled = ["dep:sled", "internal-tokio-rt"] -services-sqlite = ["dep:sqlx", "sqlx?/sqlite"] +services-sqlite = ["dep:sqlx", "sqlx?/sqlite", "dep:ouroboros"] services-supabase = [] services-surrealdb = ["dep:surrealdb"] services-swift = [] @@ -277,6 +277,9 @@ sqlx = { version = "0.8.0", features = [ # For http based services. reqsign = { version = "0.16", default-features = false, optional = true } +# for self-referencing structs +ouroboros = { version = "0.18.4", optional = true } + # for services-atomic-server atomic_lib = { version = "0.39.0", optional = true } # for services-cacache diff --git a/core/src/raw/adapters/kv/api.rs b/core/src/raw/adapters/kv/api.rs index acf449d58bd..7d1adb390e1 100644 --- a/core/src/raw/adapters/kv/api.rs +++ b/core/src/raw/adapters/kv/api.rs @@ -18,17 +18,28 @@ use std::fmt::Debug; use std::future::ready; +use futures::stream::Empty; use futures::Future; +use futures::Stream; use crate::raw::*; use crate::Capability; use crate::Scheme; use crate::*; +/// A noop placeholder for Adapter::ScanIter +pub type EmptyScanIter = Empty>; + /// KvAdapter is the adapter to underlying kv services. /// /// By implement this trait, any kv service can work as an OpenDAL Service. pub trait Adapter: Send + Sync + Debug + Unpin + 'static { + /// async iterator type for Adapter::scan() + /// + /// TODO: consider to replace it with std::async_iter::AsyncIterator after stablized + /// TODO: use default associate type `= EmptyScanIter` after stablized + type ScanIter: Stream> + Send + Unpin; + /// Return the metadata of this key value accessor. fn metadata(&self) -> Metadata; @@ -81,7 +92,7 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static { } /// Scan a key prefix to get all keys that start with this key. - fn scan(&self, path: &str) -> impl Future>> + MaybeSend { + fn scan(&self, path: &str) -> impl Future> + MaybeSend { let _ = path; ready(Err(Error::new( diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 3a7ea3525d9..17c3eaf51a6 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -18,6 +18,9 @@ use std::sync::Arc; use std::vec::IntoIter; +use futures::lock::Mutex; +use futures::{Stream, StreamExt}; + use super::Adapter; use crate::raw::oio::HierarchyLister; use crate::raw::oio::QueueBuf; @@ -68,8 +71,8 @@ impl Access for Backend { type BlockingReader = Buffer; type Writer = KvWriter; type BlockingWriter = KvWriter; - type Lister = HierarchyLister; - type BlockingLister = HierarchyLister; + type Lister = HierarchyLister>; + type BlockingLister = HierarchyLister; fn info(&self) -> Arc { let mut am: AccessorInfo = self.kv.metadata().into(); @@ -182,19 +185,60 @@ impl Access for Backend { fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { let p = build_abs_path(&self.root, path); let res = self.kv.blocking_scan(&p)?; - let lister = KvLister::new(&self.root, res); + let lister = BlockingKvLister::new(&self.root, res); let lister = HierarchyLister::new(lister, path, args.recursive()); Ok((RpList::default(), lister)) } } -pub struct KvLister { +pub struct KvLister { + root: String, + inner: Mutex, +} + +impl KvLister +where + Iter: Stream> + Send + Unpin, +{ + fn new(root: &str, inner: Iter) -> Self { + Self { + root: root.to_string(), + inner: Mutex::new(inner), + } + } + + async fn inner_next(&mut self) -> Result> { + Ok(self.inner.lock().await.next().await.transpose()?.map(|v| { + let mode = if v.ends_with('/') { + EntryMode::DIR + } else { + EntryMode::FILE + }; + let mut path = build_rel_path(&self.root, &v); + if path.is_empty() { + path = "/".to_string(); + } + oio::Entry::new(&path, Metadata::new(mode)) + })) + } +} + +impl oio::List for KvLister +where + Iter: Stream> + Send + Unpin, +{ + async fn next(&mut self) -> Result> { + self.inner_next().await + } +} + +pub struct BlockingKvLister { root: String, inner: IntoIter, } -impl KvLister { +impl BlockingKvLister { fn new(root: &str, inner: Vec) -> Self { Self { root: root.to_string(), @@ -218,13 +262,7 @@ impl KvLister { } } -impl oio::List for KvLister { - async fn next(&mut self) -> Result> { - Ok(self.inner_next()) - } -} - -impl oio::BlockingList for KvLister { +impl oio::BlockingList for BlockingKvLister { fn next(&mut self) -> Result> { Ok(self.inner_next()) } diff --git a/core/src/raw/adapters/kv/mod.rs b/core/src/raw/adapters/kv/mod.rs index facb6efe1c5..dbc59520bfa 100644 --- a/core/src/raw/adapters/kv/mod.rs +++ b/core/src/raw/adapters/kv/mod.rs @@ -21,6 +21,7 @@ mod api; pub use api::Adapter; +pub use api::EmptyScanIter; pub use api::Metadata; mod backend; diff --git a/core/src/services/atomicserver/backend.rs b/core/src/services/atomicserver/backend.rs index ac5655bead0..8cbadc45101 100644 --- a/core/src/services/atomicserver/backend.rs +++ b/core/src/services/atomicserver/backend.rs @@ -351,6 +351,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Atomicserver, diff --git a/core/src/services/cacache/backend.rs b/core/src/services/cacache/backend.rs index 85914d864fd..39f4cead87a 100644 --- a/core/src/services/cacache/backend.rs +++ b/core/src/services/cacache/backend.rs @@ -85,6 +85,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Cacache, diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs index 6a15187c3fa..8de7ffded46 100644 --- a/core/src/services/cloudflare_kv/backend.rs +++ b/core/src/services/cloudflare_kv/backend.rs @@ -17,8 +17,11 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::vec; use bytes::Buf; +use futures::stream; +use futures::stream::iter; use http::header; use http::Request; use http::StatusCode; @@ -181,6 +184,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = stream::Iter>>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::CloudflareKv, @@ -240,7 +245,7 @@ impl kv::Adapter for Adapter { } } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let mut url = format!("{}/keys", self.url_prefix); if !path.is_empty() { url = format!("{}?prefix={}", url, path); @@ -261,7 +266,13 @@ impl kv::Adapter for Adapter { format!("failed to parse error response: {}", e), ) })?; - Ok(response.result.into_iter().map(|r| r.name).collect()) + Ok(iter( + response + .result + .into_iter() + .map(|r| Ok(r.name)) + .collect::>(), + )) } _ => Err(parse_error(resp)), } diff --git a/core/src/services/d1/backend.rs b/core/src/services/d1/backend.rs index f50fd674742..acf7bdbd211 100644 --- a/core/src/services/d1/backend.rs +++ b/core/src/services/d1/backend.rs @@ -258,6 +258,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::D1, diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs index 103e7c7abb6..59a4aa34460 100644 --- a/core/src/services/etcd/backend.rs +++ b/core/src/services/etcd/backend.rs @@ -17,6 +17,7 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::vec; use bb8::PooledConnection; use bb8::RunError; @@ -27,6 +28,8 @@ use etcd_client::Error as EtcdError; use etcd_client::GetOptions; use etcd_client::Identity; use etcd_client::TlsOptions; +use futures::stream; +use futures::stream::iter; use tokio::sync::OnceCell; use crate::raw::adapters::kv; @@ -271,6 +274,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = stream::Iter>>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Etcd, @@ -310,7 +315,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let mut client = self.conn().await?; let get_options = Some(GetOptions::new().with_prefix().with_keys_only()); let resp = client @@ -323,10 +328,10 @@ impl kv::Adapter for Adapter { Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string") .set_source(err) })?; - res.push(v); + res.push(Ok(v)); } - Ok(res) + Ok(iter(res)) } } diff --git a/core/src/services/foundationdb/backend.rs b/core/src/services/foundationdb/backend.rs index 4d4adfa52fd..4eecb1abcf4 100644 --- a/core/src/services/foundationdb/backend.rs +++ b/core/src/services/foundationdb/backend.rs @@ -110,6 +110,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Foundationdb, diff --git a/core/src/services/gridfs/backend.rs b/core/src/services/gridfs/backend.rs index db2bf34456e..7d1dd9a1897 100644 --- a/core/src/services/gridfs/backend.rs +++ b/core/src/services/gridfs/backend.rs @@ -212,6 +212,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Gridfs, diff --git a/core/src/services/libsql/backend.rs b/core/src/services/libsql/backend.rs index ff7b9551e6a..03cddf1a1a6 100644 --- a/core/src/services/libsql/backend.rs +++ b/core/src/services/libsql/backend.rs @@ -305,6 +305,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Libsql, diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs index c89b81ba704..0627175abd4 100644 --- a/core/src/services/memcached/backend.rs +++ b/core/src/services/memcached/backend.rs @@ -197,6 +197,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Memcached, diff --git a/core/src/services/mongodb/backend.rs b/core/src/services/mongodb/backend.rs index 24abe6fb03e..ee7d21d9321 100644 --- a/core/src/services/mongodb/backend.rs +++ b/core/src/services/mongodb/backend.rs @@ -226,6 +226,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Mongodb, diff --git a/core/src/services/mysql/backend.rs b/core/src/services/mysql/backend.rs index 0b1481ef01c..2b357286a3c 100644 --- a/core/src/services/mysql/backend.rs +++ b/core/src/services/mysql/backend.rs @@ -188,6 +188,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Mysql, diff --git a/core/src/services/nebula_graph/backend.rs b/core/src/services/nebula_graph/backend.rs index d03dd5bf2b1..1189b2664d6 100644 --- a/core/src/services/nebula_graph/backend.rs +++ b/core/src/services/nebula_graph/backend.rs @@ -19,10 +19,12 @@ use std::fmt::Debug; #[cfg(feature = "tests")] use std::time::Duration; +use std::vec; use base64::engine::general_purpose::STANDARD as BASE64; use base64::engine::Engine as _; use bb8::{PooledConnection, RunError}; +use futures::stream::{self, iter}; use rust_nebula::{ graph::GraphQuery, HostAddress, SingleConnSessionConf, SingleConnSessionManager, }; @@ -269,6 +271,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = stream::Iter>>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::NebulaGraph, @@ -359,7 +363,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let path = path.replace("'", "\\'").replace('"', "\\\""); let query = format!( "LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD properties(vertex).{} AS {};", @@ -381,9 +385,9 @@ impl kv::Adapter for Adapter { .map_err(parse_nebulagraph_dataset_error)?; let sub_path = value.as_string().map_err(parse_nebulagraph_dataset_error)?; - res_vec.push(sub_path); + res_vec.push(Ok(sub_path)); } - Ok(res_vec) + Ok(iter(res_vec)) } } diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs index 10b48db0813..766f1d79bf2 100644 --- a/core/src/services/persy/backend.rs +++ b/core/src/services/persy/backend.rs @@ -152,6 +152,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Persy, diff --git a/core/src/services/postgresql/backend.rs b/core/src/services/postgresql/backend.rs index 8b4ea56eb22..dc10edd6f8e 100644 --- a/core/src/services/postgresql/backend.rs +++ b/core/src/services/postgresql/backend.rs @@ -187,6 +187,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Postgresql, diff --git a/core/src/services/redb/backend.rs b/core/src/services/redb/backend.rs index d6dc290e758..647dabc4e2c 100644 --- a/core/src/services/redb/backend.rs +++ b/core/src/services/redb/backend.rs @@ -111,6 +111,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Redb, diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs index bf8cff20185..aaba3e461d2 100644 --- a/core/src/services/redis/backend.rs +++ b/core/src/services/redis/backend.rs @@ -327,6 +327,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Redis, diff --git a/core/src/services/rocksdb/backend.rs b/core/src/services/rocksdb/backend.rs index 5ed0f38ec6b..19156bcc0ec 100644 --- a/core/src/services/rocksdb/backend.rs +++ b/core/src/services/rocksdb/backend.rs @@ -19,6 +19,8 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use futures::stream::iter; +use futures::Stream; use rocksdb::DB; use tokio::task; @@ -108,6 +110,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = Box> + Send + Unpin>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Rocksdb, @@ -164,13 +168,15 @@ impl kv::Adapter for Adapter { self.db.delete(path).map_err(parse_rocksdb_error) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let cloned_self = self.clone(); let cloned_path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) + let res = task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) .await - .map_err(new_task_join_error)? + .map_err(new_task_join_error)??; + + Ok(Box::new(iter(res.into_iter().map(Ok)))) } /// TODO: we only need key here. diff --git a/core/src/services/sled/backend.rs b/core/src/services/sled/backend.rs index d1d1e7f1845..4b72f08af80 100644 --- a/core/src/services/sled/backend.rs +++ b/core/src/services/sled/backend.rs @@ -19,6 +19,8 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::str; +use futures::stream::iter; +use futures::Stream; use tokio::task; use crate::raw::adapters::kv; @@ -137,6 +139,8 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { + type ScanIter = Box> + Send + Unpin>; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Sled, @@ -199,13 +203,15 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let cloned_self = self.clone(); let cloned_path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) + let res = task::spawn_blocking(move || cloned_self.blocking_scan(cloned_path.as_str())) .await - .map_err(new_task_join_error)? + .map_err(new_task_join_error)??; + + Ok(Box::new(iter(res.into_iter().map(Ok)))) } fn blocking_scan(&self, path: &str) -> Result> { diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs index 0b5c556f8cb..4b56fda054c 100644 --- a/core/src/services/sqlite/backend.rs +++ b/core/src/services/sqlite/backend.rs @@ -17,8 +17,15 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::pin::Pin; use std::str::FromStr; +use std::task::Context; +use std::task::Poll; +use futures::stream::BoxStream; +use futures::Stream; +use futures::StreamExt; +use ouroboros::self_referencing; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; use tokio::sync::OnceCell; @@ -188,7 +195,27 @@ impl Adapter { } } +#[self_referencing] +pub struct SqlStream { + pool: SqlitePool, + query: String, + + #[borrows(pool, query)] + #[covariant] + stream: BoxStream<'this, Result>, +} + +impl Stream for SqlStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.with_stream_mut(|s| s.poll_next_unpin(cx)) + } +} + impl kv::Adapter for Adapter { + type ScanIter = SqlStream; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Sqlite, @@ -249,19 +276,25 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result> { + async fn scan(&self, path: &str) -> Result { let pool = self.get_client().await?; + let stream = SqlStreamBuilder { + pool: pool.clone(), + query: format!( + "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1", + self.key_field, self.table, self.key_field + ), + stream_builder: |pool, query| { + sqlx::query_scalar(query) + .bind(format!("{path}%")) + .fetch(pool) + .map(|v| v.map_err(parse_sqlite_error)) + .boxed() + }, + } + .build(); - let value = sqlx::query_scalar(&format!( - "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1", - self.key_field, self.table, self.key_field - )) - .bind(format!("{path}%")) - .fetch_all(pool) - .await - .map_err(parse_sqlite_error)?; - - Ok(value) + Ok(stream) } } diff --git a/core/src/services/surrealdb/backend.rs b/core/src/services/surrealdb/backend.rs index a7c098a0ad8..4d5d2657611 100644 --- a/core/src/services/surrealdb/backend.rs +++ b/core/src/services/surrealdb/backend.rs @@ -283,6 +283,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Surrealdb, diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 6f23f156ee7..1d34847990d 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -185,6 +185,8 @@ impl Adapter { } impl kv::Adapter for Adapter { + type ScanIter = kv::EmptyScanIter; + fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::Tikv, From 241a1e718b0308921a5b1055d45d7b3e20d36a80 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Tue, 22 Oct 2024 01:08:57 +0800 Subject: [PATCH 2/4] fix --- core/src/raw/adapters/kv/api.rs | 69 ++++++++++++++++++---- core/src/raw/adapters/kv/backend.rs | 17 +++--- core/src/raw/adapters/kv/mod.rs | 4 +- core/src/services/atomicserver/backend.rs | 2 +- core/src/services/cacache/backend.rs | 2 +- core/src/services/cloudflare_kv/backend.rs | 17 ++---- core/src/services/d1/backend.rs | 2 +- core/src/services/etcd/backend.rs | 8 +-- core/src/services/foundationdb/backend.rs | 2 +- core/src/services/gridfs/backend.rs | 2 +- core/src/services/libsql/backend.rs | 2 +- core/src/services/memcached/backend.rs | 2 +- core/src/services/mongodb/backend.rs | 2 +- core/src/services/mysql/backend.rs | 2 +- core/src/services/nebula_graph/backend.rs | 7 +-- core/src/services/persy/backend.rs | 2 +- core/src/services/postgresql/backend.rs | 2 +- core/src/services/redb/backend.rs | 2 +- core/src/services/redis/backend.rs | 2 +- core/src/services/rocksdb/backend.rs | 8 +-- core/src/services/sled/backend.rs | 8 +-- core/src/services/sqlite/backend.rs | 12 +++- core/src/services/surrealdb/backend.rs | 2 +- core/src/services/tikv/backend.rs | 2 +- 24 files changed, 111 insertions(+), 69 deletions(-) diff --git a/core/src/raw/adapters/kv/api.rs b/core/src/raw/adapters/kv/api.rs index 7d1adb390e1..3ee1b57639b 100644 --- a/core/src/raw/adapters/kv/api.rs +++ b/core/src/raw/adapters/kv/api.rs @@ -17,28 +17,77 @@ use std::fmt::Debug; use std::future::ready; +use std::ops::DerefMut; -use futures::stream::Empty; use futures::Future; -use futures::Stream; use crate::raw::*; use crate::Capability; use crate::Scheme; use crate::*; -/// A noop placeholder for Adapter::ScanIter -pub type EmptyScanIter = Empty>; +/// ScanIter is the async iterator returned by `Adapter::scan`. +pub trait Scan: Send + Sync + Unpin { + /// Fetch the next key in the current key prefix + /// + /// `None` means no further key will be returned + fn next(&mut self) -> impl Future>> + MaybeSend; +} + +/// A noop implementation of Scan +impl Scan for () { + async fn next(&mut self) -> Option> { + None + } +} + +/// A ScanIterator implementation for all trivial non-async iterators +pub struct ScanStdIter(I); + +impl ScanStdIter +where + I: Iterator> + Unpin + Send + Sync, +{ + /// Create a new ScanStdIter from an Iterator + pub fn new(inner: I) -> Self { + Self(inner) + } +} + +impl Scan for ScanStdIter +where + I: Iterator> + Unpin + Send + Sync, +{ + async fn next(&mut self) -> Option> { + self.0.next() + } +} + +/// A type-erased wrapper of Scan +pub type Scanner = Box; + +pub trait ScanDyn: Unpin + Send + Sync { + fn next_dyn(&mut self) -> BoxedFuture>>; +} + +impl ScanDyn for T { + fn next_dyn(&mut self) -> BoxedFuture>> { + Box::pin(self.next()) + } +} + +impl Scan for Box { + async fn next(&mut self) -> Option> { + self.deref_mut().next_dyn().await + } +} /// KvAdapter is the adapter to underlying kv services. /// /// By implement this trait, any kv service can work as an OpenDAL Service. pub trait Adapter: Send + Sync + Debug + Unpin + 'static { - /// async iterator type for Adapter::scan() - /// - /// TODO: consider to replace it with std::async_iter::AsyncIterator after stablized - /// TODO: use default associate type `= EmptyScanIter` after stablized - type ScanIter: Stream> + Send + Unpin; + /// TODO: use default associate type `= ()` after stablized + type Scanner: Scan; /// Return the metadata of this key value accessor. fn metadata(&self) -> Metadata; @@ -92,7 +141,7 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static { } /// Scan a key prefix to get all keys that start with this key. - fn scan(&self, path: &str) -> impl Future> + MaybeSend { + fn scan(&self, path: &str) -> impl Future> + MaybeSend { let _ = path; ready(Err(Error::new( diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 17c3eaf51a6..520f14351ed 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -18,10 +18,7 @@ use std::sync::Arc; use std::vec::IntoIter; -use futures::lock::Mutex; -use futures::{Stream, StreamExt}; - -use super::Adapter; +use super::{Adapter, Scan}; use crate::raw::oio::HierarchyLister; use crate::raw::oio::QueueBuf; use crate::raw::*; @@ -71,7 +68,7 @@ impl Access for Backend { type BlockingReader = Buffer; type Writer = KvWriter; type BlockingWriter = KvWriter; - type Lister = HierarchyLister>; + type Lister = HierarchyLister>; type BlockingLister = HierarchyLister; fn info(&self) -> Arc { @@ -194,22 +191,22 @@ impl Access for Backend { pub struct KvLister { root: String, - inner: Mutex, + inner: Iter, } impl KvLister where - Iter: Stream> + Send + Unpin, + Iter: Scan, { fn new(root: &str, inner: Iter) -> Self { Self { root: root.to_string(), - inner: Mutex::new(inner), + inner: inner, } } async fn inner_next(&mut self) -> Result> { - Ok(self.inner.lock().await.next().await.transpose()?.map(|v| { + Ok(self.inner.next().await.transpose()?.map(|v| { let mode = if v.ends_with('/') { EntryMode::DIR } else { @@ -226,7 +223,7 @@ where impl oio::List for KvLister where - Iter: Stream> + Send + Unpin, + Iter: Scan, { async fn next(&mut self) -> Result> { self.inner_next().await diff --git a/core/src/raw/adapters/kv/mod.rs b/core/src/raw/adapters/kv/mod.rs index dbc59520bfa..ef6d4793fee 100644 --- a/core/src/raw/adapters/kv/mod.rs +++ b/core/src/raw/adapters/kv/mod.rs @@ -21,8 +21,10 @@ mod api; pub use api::Adapter; -pub use api::EmptyScanIter; pub use api::Metadata; +pub use api::Scan; +pub use api::ScanStdIter; +pub use api::Scanner; mod backend; pub use backend::Backend; diff --git a/core/src/services/atomicserver/backend.rs b/core/src/services/atomicserver/backend.rs index 8cbadc45101..2a8318daa7a 100644 --- a/core/src/services/atomicserver/backend.rs +++ b/core/src/services/atomicserver/backend.rs @@ -351,7 +351,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/cacache/backend.rs b/core/src/services/cacache/backend.rs index 39f4cead87a..2083f124cf5 100644 --- a/core/src/services/cacache/backend.rs +++ b/core/src/services/cacache/backend.rs @@ -85,7 +85,7 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs index 8de7ffded46..ce68192f58e 100644 --- a/core/src/services/cloudflare_kv/backend.rs +++ b/core/src/services/cloudflare_kv/backend.rs @@ -17,11 +17,8 @@ use std::fmt::Debug; use std::fmt::Formatter; -use std::vec; use bytes::Buf; -use futures::stream; -use futures::stream::iter; use http::header; use http::Request; use http::StatusCode; @@ -184,7 +181,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = stream::Iter>>; + type Scanner = kv::Scanner; fn metadata(&self) -> kv::Metadata { kv::Metadata::new( @@ -245,7 +242,7 @@ impl kv::Adapter for Adapter { } } - async fn scan(&self, path: &str) -> Result { + async fn scan(&self, path: &str) -> Result { let mut url = format!("{}/keys", self.url_prefix); if !path.is_empty() { url = format!("{}?prefix={}", url, path); @@ -266,13 +263,9 @@ impl kv::Adapter for Adapter { format!("failed to parse error response: {}", e), ) })?; - Ok(iter( - response - .result - .into_iter() - .map(|r| Ok(r.name)) - .collect::>(), - )) + Ok(Box::new(kv::ScanStdIter::new( + response.result.into_iter().map(|r| Ok(r.name)), + ))) } _ => Err(parse_error(resp)), } diff --git a/core/src/services/d1/backend.rs b/core/src/services/d1/backend.rs index acf7bdbd211..b1c992b1a8c 100644 --- a/core/src/services/d1/backend.rs +++ b/core/src/services/d1/backend.rs @@ -258,7 +258,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs index 59a4aa34460..d37fb35d2a8 100644 --- a/core/src/services/etcd/backend.rs +++ b/core/src/services/etcd/backend.rs @@ -28,8 +28,6 @@ use etcd_client::Error as EtcdError; use etcd_client::GetOptions; use etcd_client::Identity; use etcd_client::TlsOptions; -use futures::stream; -use futures::stream::iter; use tokio::sync::OnceCell; use crate::raw::adapters::kv; @@ -274,7 +272,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = stream::Iter>>; + type Scanner = kv::ScanStdIter>>; fn metadata(&self) -> kv::Metadata { kv::Metadata::new( @@ -315,7 +313,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result { + async fn scan(&self, path: &str) -> Result { let mut client = self.conn().await?; let get_options = Some(GetOptions::new().with_prefix().with_keys_only()); let resp = client @@ -331,7 +329,7 @@ impl kv::Adapter for Adapter { res.push(Ok(v)); } - Ok(iter(res)) + Ok(kv::ScanStdIter::new(res.into_iter())) } } diff --git a/core/src/services/foundationdb/backend.rs b/core/src/services/foundationdb/backend.rs index 4eecb1abcf4..d28b70152bc 100644 --- a/core/src/services/foundationdb/backend.rs +++ b/core/src/services/foundationdb/backend.rs @@ -110,7 +110,7 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/gridfs/backend.rs b/core/src/services/gridfs/backend.rs index 7d1dd9a1897..6d7898d1dd9 100644 --- a/core/src/services/gridfs/backend.rs +++ b/core/src/services/gridfs/backend.rs @@ -212,7 +212,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/libsql/backend.rs b/core/src/services/libsql/backend.rs index 03cddf1a1a6..c0870e37467 100644 --- a/core/src/services/libsql/backend.rs +++ b/core/src/services/libsql/backend.rs @@ -305,7 +305,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs index 0627175abd4..d0cc42c9211 100644 --- a/core/src/services/memcached/backend.rs +++ b/core/src/services/memcached/backend.rs @@ -197,7 +197,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/mongodb/backend.rs b/core/src/services/mongodb/backend.rs index ee7d21d9321..786c34dbe80 100644 --- a/core/src/services/mongodb/backend.rs +++ b/core/src/services/mongodb/backend.rs @@ -226,7 +226,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/mysql/backend.rs b/core/src/services/mysql/backend.rs index 2b357286a3c..4569431b7e9 100644 --- a/core/src/services/mysql/backend.rs +++ b/core/src/services/mysql/backend.rs @@ -188,7 +188,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/nebula_graph/backend.rs b/core/src/services/nebula_graph/backend.rs index 1189b2664d6..9c34018bfdb 100644 --- a/core/src/services/nebula_graph/backend.rs +++ b/core/src/services/nebula_graph/backend.rs @@ -24,7 +24,6 @@ use std::vec; use base64::engine::general_purpose::STANDARD as BASE64; use base64::engine::Engine as _; use bb8::{PooledConnection, RunError}; -use futures::stream::{self, iter}; use rust_nebula::{ graph::GraphQuery, HostAddress, SingleConnSessionConf, SingleConnSessionManager, }; @@ -271,7 +270,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = stream::Iter>>; + type Scanner = kv::ScanStdIter>>; fn metadata(&self) -> kv::Metadata { kv::Metadata::new( @@ -363,7 +362,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result { + async fn scan(&self, path: &str) -> Result { let path = path.replace("'", "\\'").replace('"', "\\\""); let query = format!( "LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD properties(vertex).{} AS {};", @@ -387,7 +386,7 @@ impl kv::Adapter for Adapter { res_vec.push(Ok(sub_path)); } - Ok(iter(res_vec)) + Ok(kv::ScanStdIter::new(res_vec.into_iter())) } } diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs index 766f1d79bf2..e5317b11221 100644 --- a/core/src/services/persy/backend.rs +++ b/core/src/services/persy/backend.rs @@ -152,7 +152,7 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/postgresql/backend.rs b/core/src/services/postgresql/backend.rs index dc10edd6f8e..b7cbacc997a 100644 --- a/core/src/services/postgresql/backend.rs +++ b/core/src/services/postgresql/backend.rs @@ -187,7 +187,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/redb/backend.rs b/core/src/services/redb/backend.rs index 647dabc4e2c..281a5ac96a4 100644 --- a/core/src/services/redb/backend.rs +++ b/core/src/services/redb/backend.rs @@ -111,7 +111,7 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs index aaba3e461d2..e04f5af29da 100644 --- a/core/src/services/redis/backend.rs +++ b/core/src/services/redis/backend.rs @@ -327,7 +327,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/rocksdb/backend.rs b/core/src/services/rocksdb/backend.rs index 19156bcc0ec..ecbf9bdfcf0 100644 --- a/core/src/services/rocksdb/backend.rs +++ b/core/src/services/rocksdb/backend.rs @@ -19,8 +19,6 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; -use futures::stream::iter; -use futures::Stream; use rocksdb::DB; use tokio::task; @@ -110,7 +108,7 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { - type ScanIter = Box> + Send + Unpin>; + type Scanner = kv::Scanner; fn metadata(&self) -> kv::Metadata { kv::Metadata::new( @@ -168,7 +166,7 @@ impl kv::Adapter for Adapter { self.db.delete(path).map_err(parse_rocksdb_error) } - async fn scan(&self, path: &str) -> Result { + async fn scan(&self, path: &str) -> Result { let cloned_self = self.clone(); let cloned_path = path.to_string(); @@ -176,7 +174,7 @@ impl kv::Adapter for Adapter { .await .map_err(new_task_join_error)??; - Ok(Box::new(iter(res.into_iter().map(Ok)))) + Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok)))) } /// TODO: we only need key here. diff --git a/core/src/services/sled/backend.rs b/core/src/services/sled/backend.rs index 4b72f08af80..c4cb4bdf854 100644 --- a/core/src/services/sled/backend.rs +++ b/core/src/services/sled/backend.rs @@ -19,8 +19,6 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::str; -use futures::stream::iter; -use futures::Stream; use tokio::task; use crate::raw::adapters::kv; @@ -139,7 +137,7 @@ impl Debug for Adapter { } impl kv::Adapter for Adapter { - type ScanIter = Box> + Send + Unpin>; + type Scanner = kv::Scanner; fn metadata(&self) -> kv::Metadata { kv::Metadata::new( @@ -203,7 +201,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result { + async fn scan(&self, path: &str) -> Result { let cloned_self = self.clone(); let cloned_path = path.to_string(); @@ -211,7 +209,7 @@ impl kv::Adapter for Adapter { .await .map_err(new_task_join_error)??; - Ok(Box::new(iter(res.into_iter().map(Ok)))) + Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok)))) } fn blocking_scan(&self, path: &str) -> Result> { diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs index 4b56fda054c..f0f59da6301 100644 --- a/core/src/services/sqlite/backend.rs +++ b/core/src/services/sqlite/backend.rs @@ -213,8 +213,16 @@ impl Stream for SqlStream { } } +unsafe impl Sync for SqlStream {} + +impl kv::Scan for SqlStream { + async fn next(&mut self) -> Option> { + ::next(self).await + } +} + impl kv::Adapter for Adapter { - type ScanIter = SqlStream; + type Scanner = SqlStream; fn metadata(&self) -> kv::Metadata { kv::Metadata::new( @@ -276,7 +284,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn scan(&self, path: &str) -> Result { + async fn scan(&self, path: &str) -> Result { let pool = self.get_client().await?; let stream = SqlStreamBuilder { pool: pool.clone(), diff --git a/core/src/services/surrealdb/backend.rs b/core/src/services/surrealdb/backend.rs index 4d5d2657611..47b91e36057 100644 --- a/core/src/services/surrealdb/backend.rs +++ b/core/src/services/surrealdb/backend.rs @@ -283,7 +283,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 1d34847990d..275dcf9bbdc 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -185,7 +185,7 @@ impl Adapter { } impl kv::Adapter for Adapter { - type ScanIter = kv::EmptyScanIter; + type Scanner = (); fn metadata(&self) -> kv::Metadata { kv::Metadata::new( From dc2c4f3aa321b2dbdaf9011eec63608acaa3254a Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 24 Oct 2024 00:33:21 +0800 Subject: [PATCH 3/4] fix --- core/src/raw/adapters/kv/api.rs | 31 ++++++++++++++++++----------- core/src/raw/adapters/kv/backend.rs | 2 +- core/src/raw/adapters/kv/mod.rs | 9 ++++++++- core/src/services/sqlite/backend.rs | 16 +++++++-------- 4 files changed, 36 insertions(+), 22 deletions(-) diff --git a/core/src/raw/adapters/kv/api.rs b/core/src/raw/adapters/kv/api.rs index 3ee1b57639b..8f9b8e96583 100644 --- a/core/src/raw/adapters/kv/api.rs +++ b/core/src/raw/adapters/kv/api.rs @@ -26,30 +26,37 @@ use crate::Capability; use crate::Scheme; use crate::*; -/// ScanIter is the async iterator returned by `Adapter::scan`. +/// Scan is the async iterator returned by `Adapter::scan`. pub trait Scan: Send + Sync + Unpin { /// Fetch the next key in the current key prefix /// - /// `None` means no further key will be returned - fn next(&mut self) -> impl Future>> + MaybeSend; + /// `Ok(None)` means no further key will be returned + fn next(&mut self) -> impl Future>> + MaybeSend; } /// A noop implementation of Scan impl Scan for () { - async fn next(&mut self) -> Option> { - None + async fn next(&mut self) -> Result> { + Ok(None) } } -/// A ScanIterator implementation for all trivial non-async iterators +/// A Scan implementation for all trivial non-async iterators pub struct ScanStdIter(I); +#[cfg(any( + feature = "services-cloudflare-kv", + feature = "services-etcd", + feature = "services-nebula-graph", + feature = "services-rocksdb", + feature = "services-sled" +))] impl ScanStdIter where I: Iterator> + Unpin + Send + Sync, { /// Create a new ScanStdIter from an Iterator - pub fn new(inner: I) -> Self { + pub(crate) fn new(inner: I) -> Self { Self(inner) } } @@ -58,8 +65,8 @@ impl Scan for ScanStdIter where I: Iterator> + Unpin + Send + Sync, { - async fn next(&mut self) -> Option> { - self.0.next() + async fn next(&mut self) -> Result> { + self.0.next().transpose() } } @@ -67,17 +74,17 @@ where pub type Scanner = Box; pub trait ScanDyn: Unpin + Send + Sync { - fn next_dyn(&mut self) -> BoxedFuture>>; + fn next_dyn(&mut self) -> BoxedFuture>>; } impl ScanDyn for T { - fn next_dyn(&mut self) -> BoxedFuture>> { + fn next_dyn(&mut self) -> BoxedFuture>> { Box::pin(self.next()) } } impl Scan for Box { - async fn next(&mut self) -> Option> { + async fn next(&mut self) -> Result> { self.deref_mut().next_dyn().await } } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 520f14351ed..bdbe8202a7b 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -206,7 +206,7 @@ where } async fn inner_next(&mut self) -> Result> { - Ok(self.inner.next().await.transpose()?.map(|v| { + Ok(self.inner.next().await?.map(|v| { let mode = if v.ends_with('/') { EntryMode::DIR } else { diff --git a/core/src/raw/adapters/kv/mod.rs b/core/src/raw/adapters/kv/mod.rs index ef6d4793fee..c03c8d71b80 100644 --- a/core/src/raw/adapters/kv/mod.rs +++ b/core/src/raw/adapters/kv/mod.rs @@ -23,7 +23,14 @@ mod api; pub use api::Adapter; pub use api::Metadata; pub use api::Scan; -pub use api::ScanStdIter; +#[cfg(any( + feature = "services-cloudflare-kv", + feature = "services-etcd", + feature = "services-nebula-graph", + feature = "services-rocksdb", + feature = "services-sled" +))] +pub(crate) use api::ScanStdIter; pub use api::Scanner; mod backend; diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs index f0f59da6301..06158048341 100644 --- a/core/src/services/sqlite/backend.rs +++ b/core/src/services/sqlite/backend.rs @@ -196,7 +196,7 @@ impl Adapter { } #[self_referencing] -pub struct SqlStream { +pub struct SqliteScanner { pool: SqlitePool, query: String, @@ -205,7 +205,7 @@ pub struct SqlStream { stream: BoxStream<'this, Result>, } -impl Stream for SqlStream { +impl Stream for SqliteScanner { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -213,16 +213,16 @@ impl Stream for SqlStream { } } -unsafe impl Sync for SqlStream {} +unsafe impl Sync for SqliteScanner {} -impl kv::Scan for SqlStream { - async fn next(&mut self) -> Option> { - ::next(self).await +impl kv::Scan for SqliteScanner { + async fn next(&mut self) -> Result> { + ::next(self).await.transpose() } } impl kv::Adapter for Adapter { - type Scanner = SqlStream; + type Scanner = SqliteScanner; fn metadata(&self) -> kv::Metadata { kv::Metadata::new( @@ -286,7 +286,7 @@ impl kv::Adapter for Adapter { async fn scan(&self, path: &str) -> Result { let pool = self.get_client().await?; - let stream = SqlStreamBuilder { + let stream = SqliteScannerBuilder { pool: pool.clone(), query: format!( "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1", From 1a92e296349b02b347cf46ea2a5e48c13e602ee5 Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Thu, 24 Oct 2024 00:40:40 +0800 Subject: [PATCH 4/4] fix --- core/src/raw/adapters/kv/backend.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index bdbe8202a7b..6b625c78b6e 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -201,7 +201,7 @@ where fn new(root: &str, inner: Iter) -> Self { Self { root: root.to_string(), - inner: inner, + inner, } }