Skip to content

Commit

Permalink
Merge branch 'master' into limbooverlambda/fix-batch-put-ttl-issue
Browse files Browse the repository at this point in the history
Signed-off-by: limbooverlambda <[email protected]>
  • Loading branch information
limbooverlambda committed Jun 17, 2024
2 parents f9424cb + 60e0c54 commit d751fcc
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 198 deletions.
156 changes: 50 additions & 106 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,30 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use async_recursion::async_recursion;
use core::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
use std::u32;

use futures::StreamExt;
use log::debug;
use tokio::sync::Semaphore;
use tokio::time::sleep;

use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::common::Error;
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
use crate::proto::metapb;
use crate::raw::lowering::*;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::EncodeKeyspace;
use crate::request::KeyMode;
use crate::request::Keyspace;
use crate::request::Plan;
use crate::request::TruncateKeyspace;
use crate::request::{plan, Collect};
use crate::store::{HasRegionError, RegionStore};
use crate::Backoff;
use crate::BoundRange;
use crate::ColumnFamily;
use crate::Error::RegionError;
use crate::Key;
use crate::KvPair;
use crate::Result;
Expand Down Expand Up @@ -761,42 +756,57 @@ impl<PdC: PdClient> Client<PdC> {
max_limit: MAX_RAW_KV_SCAN_LIMIT,
});
}
let backoff = DEFAULT_STORE_BACKOFF;
let permits = Arc::new(Semaphore::new(16));
let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);

let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let mut result = Vec::new();
let mut current_limit = limit;
let (start_key, end_key) = range.clone().into_keys();
let mut current_key: Option<Key> = Some(start_key);
while current_limit > 0 {
let scan_args = ScanInnerArgs {
start_key: current_key.clone(),
range: range.clone(),
limit,
let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
let mut region_store =
scan_regions
.next()
.await
.ok_or(Error::RegionForRangeNotFound {
range: (cur_range.clone()),
})??;
let mut cur_limit = limit;

while cur_limit > 0 {
let request = new_raw_scan_request(
cur_range.clone(),
cur_limit,
key_only,
reverse,
permits: permits.clone(),
backoff: backoff.clone(),
};
let (res, next_key) = self.retryable_scan(scan_args).await?;

let mut kvs = res
.map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
.unwrap_or(Vec::new());

if !kvs.is_empty() {
current_limit -= kvs.len() as u32;
result.append(&mut kvs);
}
if end_key
.as_ref()
.map(|ek| ek <= next_key.as_ref() && !ek.is_empty())
.unwrap_or(false)
|| next_key.is_empty()
{
break;
self.cf.clone(),
);
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.single_region_with_store(region_store.clone())
.await?
.plan()
.execute()
.await?;
let mut region_scan_res = resp
.kvs
.into_iter()
.map(Into::into)
.collect::<Vec<KvPair>>();
let res_len = region_scan_res.len();
result.append(&mut region_scan_res);

// if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
if res_len < cur_limit as usize {
region_store = match scan_regions.next().await {
Some(Ok(rs)) => {
cur_range = BoundRange::new(
std::ops::Bound::Included(region_store.region_with_leader.range().1),
cur_range.to,
);
rs
}
Some(Err(e)) => return Err(e),
None => break,
};
cur_limit -= res_len as u32;
} else {
current_key = Some(next_key);
break;
}
}

Expand All @@ -809,61 +819,6 @@ impl<PdC: PdClient> Client<PdC> {
Ok(result)
}

#[async_recursion]
async fn retryable_scan(
&self,
mut scan_args: ScanInnerArgs,
) -> Result<(Option<RawScanResponse>, Key)> {
let start_key = match scan_args.start_key {
None => return Ok((None, Key::EMPTY)),
Some(ref sk) => sk,
};
let permit = scan_args.permits.acquire().await.unwrap();
let region = self.rpc.clone().region_for_key(start_key).await?;
let store = self.rpc.clone().store_for_id(region.id()).await?;
let request = new_raw_scan_request(
scan_args.range.clone(),
scan_args.limit,
scan_args.key_only,
scan_args.reverse,
self.cf.clone(),
);
let resp = self.do_store_scan(store.clone(), request).await;
drop(permit);
match resp {
Ok(mut r) => {
if let Some(err) = r.region_error() {
let status =
plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
.await?;
return if status {
self.retryable_scan(scan_args.clone()).await
} else if let Some(duration) = scan_args.backoff.next_delay_duration() {
sleep(duration).await;
self.retryable_scan(scan_args.clone()).await
} else {
Err(RegionError(Box::new(err)))
};
}
Ok((Some(r), region.end_key()))
}
Err(err) => Err(err),
}
}

async fn do_store_scan(
&self,
store: RegionStore,
scan_request: RawScanRequest,
) -> Result<RawScanResponse> {
crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request)
.single_region_with_store(store.clone())
.await?
.plan()
.execute()
.await
}

async fn batch_scan_inner(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
Expand Down Expand Up @@ -910,17 +865,6 @@ impl<PdC: PdClient> Client<PdC> {
}
}

#[derive(Clone)]
struct ScanInnerArgs {
start_key: Option<Key>,
range: BoundRange,
limit: u32,
key_only: bool,
reverse: bool,
permits: Arc<Semaphore>,
backoff: Backoff,
}

#[cfg(test)]
mod tests {
use std::any::Any;
Expand Down
Loading

0 comments on commit d751fcc

Please sign in to comment.