Skip to content

Commit

Permalink
fix raw scan (tikv#409)
Browse files Browse the repository at this point in the history
* fix raw scan

Signed-off-by: Smityz <[email protected]>

* fix

Signed-off-by: Smityz <[email protected]>

---------

Signed-off-by: Smityz <[email protected]>
  • Loading branch information
Smityz authored Jul 22, 2023
1 parent 8b3ada2 commit abf22ba
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 18 deletions.
4 changes: 4 additions & 0 deletions src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::result;

use thiserror::Error;

use crate::BoundRange;

/// An error originating from the TiKV client or dependencies.
#[derive(Debug, Error)]
#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -80,6 +82,8 @@ pub enum Error {
/// No region is found for the given key.
#[error("Region is not found for key: {:?}", key)]
RegionForKeyNotFound { key: Vec<u8> },
#[error("Region is not found for range: {:?}", range)]
RegionForRangeNotFound { range: BoundRange },
/// No region is found for the given id. Note: this is distinct from the RegionNotFound error in errorpb.
#[error("Region {} is not found in the response", region_id)]
RegionNotFoundInResponse { region_id: u64 },
Expand Down
6 changes: 3 additions & 3 deletions src/kv/bound_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ use crate::proto::kvrpcpb;
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(test, derive(Arbitrary))]
pub struct BoundRange {
from: Bound<Key>,
to: Bound<Key>,
pub from: Bound<Key>,
pub to: Bound<Key>,
}

impl BoundRange {
/// Create a new BoundRange.
///
/// The caller must ensure that `from` is not `Unbounded`.
fn new(from: Bound<Key>, to: Bound<Key>) -> BoundRange {
pub fn new(from: Bound<Key>, to: Bound<Key>) -> BoundRange {
BoundRange { from, to }
}

Expand Down
60 changes: 49 additions & 11 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::u32;

use futures::StreamExt;
use log::debug;

use crate::backoff::DEFAULT_REGION_BACKOFF;
Expand Down Expand Up @@ -591,17 +592,54 @@ impl<PdC: PdClient> Client<PdC> {
max_limit: MAX_RAW_KV_SCAN_LIMIT,
});
}

let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.merge(Collect)
.plan();
let res = plan.execute().await;
res.map(|mut s| {
s.truncate(limit as usize);
s
})
let mut result = Vec::new();
let mut cur_range = range.into();
let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
let mut region_store =
scan_regions
.next()
.await
.ok_or(Error::RegionForRangeNotFound {
range: (cur_range.clone()),
})??;
let mut cur_limit = limit;
while cur_limit > 0 {
let request =
new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region_with_store(region_store.clone())
.await?
.plan()
.execute()
.await?;
let mut region_scan_res = resp
.kvs
.into_iter()
.map(Into::into)
.collect::<Vec<KvPair>>();
let res_len = region_scan_res.len();
result.append(&mut region_scan_res);
// If fewer than cur_limit results came back, the current region is exhausted for this range, so continue the scan in the next region (if there is one).
if res_len < cur_limit as usize {
region_store = match scan_regions.next().await {
Some(Ok(rs)) => {
cur_range = BoundRange::new(
std::ops::Bound::Included(region_store.region_with_leader.range().1),
cur_range.to,
);
rs
}
Some(Err(e)) => return Err(e),
None => return Ok(result),
};
cur_limit -= res_len as u32;
} else {
break;
}
}
// limit is a soft limit, so we need to check the number of results
result.truncate(limit as usize);
Ok(result)
}

async fn batch_scan_inner(
Expand Down
96 changes: 92 additions & 4 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,98 @@ async fn raw_write_million() -> Result<()> {
assert_eq!(res.len(), 2usize.pow(NUM_BITS_KEY_PER_TXN));
}

// test scan
let limit = 10;
let res = client.scan(vec![].., limit).await?;
assert_eq!(res.len(), limit as usize);
// test scan, key range from [0,0,0,0] to [255,0,0,0]
let mut limit = 2000;
let mut r = client.scan(.., limit).await?;
assert_eq!(r.len(), 256);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8);
}
r = client.scan(vec![100, 0, 0, 0].., limit).await?;
assert_eq!(r.len(), 156);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 100);
}
r = client
.scan(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), 195);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), 196);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 251);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 0);
r = client.scan(..vec![0, 0, 0, 0], limit).await?;
assert_eq!(r.len(), 0);

limit = 3;
let mut r = client.scan(.., limit).await?;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8);
}
r = client.scan(vec![100, 0, 0, 0].., limit).await?;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 100);
}
r = client
.scan(vec![5, 0, 0, 0]..vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![5, 0, 0, 0]..=vec![200, 0, 0, 0], limit)
.await?;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![5, 0, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), limit as usize);
for (i, val) in r.iter().enumerate() {
let k: Vec<u8> = val.0.clone().into();
assert_eq!(k[0], i as u8 + 5);
}
r = client
.scan(vec![255, 1, 0, 0]..=vec![255, 10, 0, 0], limit)
.await?;
assert_eq!(r.len(), 0);
r = client.scan(..vec![0, 0, 0, 0], limit).await?;
assert_eq!(r.len(), 0);

limit = 0;
r = client.scan(.., limit).await?;
assert_eq!(r.len(), limit as usize);

// test batch_scan
for batch_num in 1..4 {
Expand Down

0 comments on commit abf22ba

Please sign in to comment.