Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(adapter/kv): support async iterating on scan results #5208

Merged
merged 7 commits into from
Nov 4, 2024

Conversation

PragmaTwice
Copy link
Member

@PragmaTwice PragmaTwice commented Oct 20, 2024

Which issue does this PR close?

Closes #3639.

Rationale for this change

Instead of performing a full scanning and returning a Vec<String>, now we introduce an associated type into kv::Adapter:

type Scanner: Scan;

So that the implementation can do paged scanning and returns a ScanIter, and users can iterate over it to get keys in batches from the server.

What changes are included in this PR?

  • add associated type ScanIter into kv::Adapter and refactor KvLister and Adaptor's Accessor impl;
  • refactor all services using kv::Adapter, especially for which has list cap;
  • for sqlite (via sqlx): now, instead of using fetch_all, we can use fetch to fetch data in batches instead of fetching all data.

Are there any user-facing changes?

No changes for end users.

@PragmaTwice PragmaTwice marked this pull request as ready for review October 20, 2024 09:06
Comment on lines 198 to 206
#[self_referencing]
pub struct SqlStream {
pool: SqlitePool,
query: String,

#[borrows(pool, query)]
#[covariant]
stream: BoxStream<'this, Result<String>>,
}
Copy link
Member Author

@PragmaTwice PragmaTwice Oct 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lifetime handling here is a bit tricky.

sqlx::query_scalar() and its .fetch() heavily depend on lifetime parameter restrictions:

pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
    where
        'q: 'e,
        E: 'e + Executor<'c, Database = DB>,
        DB: 'e,
        A: 'e,
        O: 'e,

So here to extend the lifetime (since we don't use GAT in our API here e.g. type ScanIter<'a>, in order to be compatible to trait List which is also not a GAT in Access.), we need to carry both the Pool and also the query &str along our Stream into return value and put it into a self-referencing structure and then we don't need to care the lifetime checking.

core/src/raw/adapters/kv/api.rs Outdated Show resolved Hide resolved
core/src/raw/adapters/kv/api.rs Outdated Show resolved Hide resolved
core/src/raw/adapters/kv/backend.rs Outdated Show resolved Hide resolved
core/src/raw/adapters/kv/api.rs Outdated Show resolved Hide resolved
core/src/raw/adapters/kv/api.rs Outdated Show resolved Hide resolved
core/src/raw/adapters/kv/api.rs Outdated Show resolved Hide resolved
core/src/raw/adapters/kv/mod.rs Outdated Show resolved Hide resolved
core/src/services/sqlite/backend.rs Outdated Show resolved Hide resolved
Copy link
Member

@Xuanwo Xuanwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, @PragmaTwice, for this excellent pull request!

@Xuanwo
Copy link
Member

Xuanwo commented Oct 24, 2024

There are some critical bug fixes that need to be released. I will merge this after we create a new release.

@Xuanwo Xuanwo merged commit 6cec7b4 into apache:main Nov 4, 2024
235 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

idea: kv adapter's scan should return an iterator or stream instead
2 participants