From 5a8a3c4cccffbe87bfeb866a982f5b2eaee04736 Mon Sep 17 00:00:00 2001 From: limbooverlambda Date: Thu, 22 Aug 2024 00:06:46 -0700 Subject: [PATCH] adding retryable to scan (#456) Co-authored-by: Ping Yu --- src/raw/client.rs | 147 ++++++++++++++++++++++------------ src/request/plan.rs | 189 +++++++++++++++++++++----------------------- 2 files changed, 188 insertions(+), 148 deletions(-) diff --git a/src/raw/client.rs b/src/raw/client.rs index 76d40b65..620531ee 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -1,29 +1,32 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. - use core::ops::Range; + use std::str::FromStr; use std::sync::Arc; -use futures::StreamExt; use log::debug; +use tokio::time::sleep; -use crate::backoff::DEFAULT_REGION_BACKOFF; +use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF}; use crate::common::Error; use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; +use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse}; use crate::proto::metapb; use crate::raw::lowering::*; -use crate::request::Collect; use crate::request::CollectSingle; use crate::request::EncodeKeyspace; use crate::request::KeyMode; use crate::request::Keyspace; use crate::request::Plan; use crate::request::TruncateKeyspace; +use crate::request::{plan, Collect}; +use crate::store::{HasRegionError, RegionStore}; use crate::Backoff; use crate::BoundRange; use crate::ColumnFamily; +use crate::Error::RegionError; use crate::Key; use crate::KvPair; use crate::Result; @@ -755,57 +758,37 @@ impl Client { max_limit: MAX_RAW_KV_SCAN_LIMIT, }); } - - let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); + let backoff = DEFAULT_STORE_BACKOFF; + let mut range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); let mut result = Vec::new(); - let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed(); - let mut region_store = - scan_regions - .next() - .await - .ok_or(Error::RegionForRangeNotFound { - range: (cur_range.clone()), - })??; - let mut cur_limit = limit; - - while cur_limit > 0 { - let request = new_raw_scan_request( - cur_range.clone(), - cur_limit, + let mut current_limit = limit; + let (start_key, end_key) = range.clone().into_keys(); + let mut current_key: Key = start_key; + + while current_limit > 0 { + let scan_args = ScanInnerArgs { + start_key: current_key.clone(), + end_key: end_key.clone(), + limit: current_limit, key_only, reverse, - self.cf.clone(), - ); - let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) - .single_region_with_store(region_store.clone()) - .await? 
-                .plan()
-                .execute()
-                .await?;
-            let mut region_scan_res = resp
-                .kvs
-                .into_iter()
-                .map(Into::into)
-                .collect::<Vec<KvPair>>();
-            let res_len = region_scan_res.len();
-            result.append(&mut region_scan_res);
-
-            // if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
-            if res_len < cur_limit as usize {
-                region_store = match scan_regions.next().await {
-                    Some(Ok(rs)) => {
-                        cur_range = BoundRange::new(
-                            std::ops::Bound::Included(region_store.region_with_leader.range().1),
-                            cur_range.to,
-                        );
-                        rs
-                    }
-                    Some(Err(e)) => return Err(e),
-                    None => break,
-                };
-                cur_limit -= res_len as u32;
-            } else {
+                backoff: backoff.clone(),
+            };
+            let (res, next_key) = self.retryable_scan(scan_args).await?;
+
+            let mut kvs = res
+                .map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
+                .unwrap_or(Vec::new());
+
+            if !kvs.is_empty() {
+                current_limit -= kvs.len() as u32;
+                result.append(&mut kvs);
+            }
+            if end_key.clone().is_some_and(|ek| ek <= next_key) {
                 break;
+            } else {
+                current_key = next_key;
+                range = BoundRange::new(std::ops::Bound::Included(current_key.clone()), range.to);
             }
         }
 
@@ -818,6 +801,58 @@ impl Client {
         Ok(result)
     }
 
+    async fn retryable_scan(
+        &self,
+        mut scan_args: ScanInnerArgs,
+    ) -> Result<(Option<RawScanResponse>, Key)> {
+        let start_key = scan_args.start_key;
+        let end_key = scan_args.end_key;
+        loop {
+            let region = self.rpc.clone().region_for_key(&start_key).await?;
+            let store = self.rpc.clone().store_for_id(region.id()).await?;
+            let request = new_raw_scan_request(
+                (start_key.clone(), end_key.clone()).into(),
+                scan_args.limit,
+                scan_args.key_only,
+                scan_args.reverse,
+                self.cf.clone(),
+            );
+            let resp = self.do_store_scan(store.clone(), request.clone()).await;
+            return match resp {
+                Ok(mut r) => {
+                    if let Some(err) = r.region_error() {
+                        let status =
+                            plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
+                                .await?;
+                        if status {
+                            continue;
+                        } else if let Some(duration) = scan_args.backoff.next_delay_duration() {
+                            sleep(duration).await;
+                            continue;
+                        } else {
+                            return Err(RegionError(Box::new(err)));
+                        }
+                    }
+                    Ok((Some(r), region.end_key()))
+                }
+                Err(err) => Err(err),
+            };
+        }
+    }
+
+    async fn do_store_scan(
+        &self,
+        store: RegionStore,
+        scan_request: RawScanRequest,
+    ) -> Result<RawScanResponse> {
+        crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request)
+            .single_region_with_store(store.clone())
+            .await?
+            .plan()
+            .execute()
+            .await
+    }
+
     async fn batch_scan_inner(
         &self,
         ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
@@ -864,6 +899,16 @@ impl Client {
     }
 }
 
+#[derive(Clone)]
+struct ScanInnerArgs {
+    start_key: Key,
+    end_key: Option<Key>,
+    limit: u32,
+    key_only: bool,
+    reverse: bool,
+    backoff: Backoff,
+}
+
 #[cfg(test)]
 mod tests {
     use std::any::Any;
diff --git a/src/request/plan.rs b/src/request/plan.rs
index 369a2ff1..ffff6c24 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -187,7 +187,7 @@ where
             match backoff.next_delay_duration() {
                 Some(duration) => {
                     let region_error_resolved =
-                        Self::handle_region_error(pd_client.clone(), e, region_store).await?;
+                        handle_region_error(pd_client.clone(), e, region_store).await?;
                     // don't sleep if we have resolved the region error
                     if !region_error_resolved {
                         sleep(duration).await;
@@ -208,102 +208,6 @@ where
         }
     }
 
-    // Returns
-    // 1. Ok(true): error has been resolved, retry immediately
-    // 2. Ok(false): backoff, and then retry
-    // 3. Err(Error): can't be resolved, return the error to upper level
-    async fn handle_region_error(
-        pd_client: Arc<PdC>,
-        e: errorpb::Error,
-        region_store: RegionStore,
-    ) -> Result<bool> {
-        let ver_id = region_store.region_with_leader.ver_id();
-        if let Some(not_leader) = e.not_leader {
-            if let Some(leader) = not_leader.leader {
-                match pd_client
-                    .update_leader(region_store.region_with_leader.ver_id(), leader)
-                    .await
-                {
-                    Ok(_) => Ok(true),
-                    Err(e) => {
-                        pd_client.invalidate_region_cache(ver_id).await;
-                        Err(e)
-                    }
-                }
-            } else {
-                // The peer doesn't know who is the current leader. Generally it's because
-                // the Raft group is in an election, but it's possible that the peer is
-                // isolated and removed from the Raft group. So it's necessary to reload
-                // the region from PD.
-                pd_client.invalidate_region_cache(ver_id).await;
-                Ok(false)
-            }
-        } else if e.store_not_match.is_some() {
-            pd_client.invalidate_region_cache(ver_id).await;
-            Ok(false)
-        } else if e.epoch_not_match.is_some() {
-            Self::on_region_epoch_not_match(
-                pd_client.clone(),
-                region_store,
-                e.epoch_not_match.unwrap(),
-            )
-            .await
-        } else if e.stale_command.is_some() || e.region_not_found.is_some() {
-            pd_client.invalidate_region_cache(ver_id).await;
-            Ok(false)
-        } else if e.server_is_busy.is_some()
-            || e.raft_entry_too_large.is_some()
-            || e.max_timestamp_not_synced.is_some()
-        {
-            Err(Error::RegionError(Box::new(e)))
-        } else {
-            // TODO: pass the logger around
-            // info!("unknwon region error: {:?}", e);
-            pd_client.invalidate_region_cache(ver_id).await;
-            Ok(false)
-        }
-    }
-
-    // Returns
-    // 1. Ok(true): error has been resolved, retry immediately
-    // 2. Ok(false): backoff, and then retry
-    // 3. Err(Error): can't be resolved, return the error to upper level
-    async fn on_region_epoch_not_match(
-        pd_client: Arc<PdC>,
-        region_store: RegionStore,
-        error: EpochNotMatch,
-    ) -> Result<bool> {
-        let ver_id = region_store.region_with_leader.ver_id();
-        if error.current_regions.is_empty() {
-            pd_client.invalidate_region_cache(ver_id).await;
-            return Ok(true);
-        }
-
-        for r in error.current_regions {
-            if r.id == region_store.region_with_leader.id() {
-                let region_epoch = r.region_epoch.unwrap();
-                let returned_conf_ver = region_epoch.conf_ver;
-                let returned_version = region_epoch.version;
-                let current_region_epoch = region_store
-                    .region_with_leader
-                    .region
-                    .region_epoch
-                    .clone()
-                    .unwrap();
-                let current_conf_ver = current_region_epoch.conf_ver;
-                let current_version = current_region_epoch.version;
-
-                // Find whether the current region is ahead of TiKV's. If so, backoff.
-                if returned_conf_ver < current_conf_ver || returned_version < current_version {
-                    return Ok(false);
-                }
-            }
-        }
-        // TODO: finer grained processing
-        pd_client.invalidate_region_cache(ver_id).await;
-        Ok(false)
-    }
-
     async fn handle_grpc_error(
         pd_client: Arc<PdC>,
         plan: P,
@@ -333,6 +237,97 @@ where
     }
 }
 
+// Returns
+// 1. Ok(true): error has been resolved, retry immediately
+// 2. Ok(false): backoff, and then retry
+// 3. Err(Error): can't be resolved, return the error to upper level
+pub(crate) async fn handle_region_error<PdC: PdClient>(
+    pd_client: Arc<PdC>,
+    e: errorpb::Error,
+    region_store: RegionStore,
+) -> Result<bool> {
+    let ver_id = region_store.region_with_leader.ver_id();
+    if let Some(not_leader) = e.not_leader {
+        if let Some(leader) = not_leader.leader {
+            match pd_client
+                .update_leader(region_store.region_with_leader.ver_id(), leader)
+                .await
+            {
+                Ok(_) => Ok(true),
+                Err(e) => {
+                    pd_client.invalidate_region_cache(ver_id).await;
+                    Err(e)
+                }
+            }
+        } else {
+            // The peer doesn't know who is the current leader. Generally it's because
+            // the Raft group is in an election, but it's possible that the peer is
+            // isolated and removed from the Raft group. So it's necessary to reload
+            // the region from PD.
+            pd_client.invalidate_region_cache(ver_id).await;
+            Ok(false)
+        }
+    } else if e.store_not_match.is_some() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        Ok(false)
+    } else if e.epoch_not_match.is_some() {
+        on_region_epoch_not_match(pd_client.clone(), region_store, e.epoch_not_match.unwrap()).await
+    } else if e.stale_command.is_some() || e.region_not_found.is_some() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        Ok(false)
+    } else if e.server_is_busy.is_some()
+        || e.raft_entry_too_large.is_some()
+        || e.max_timestamp_not_synced.is_some()
+    {
+        Err(Error::RegionError(Box::new(e)))
+    } else {
+        // TODO: pass the logger around
+        // info!("unknwon region error: {:?}", e);
+        pd_client.invalidate_region_cache(ver_id).await;
+        Ok(false)
+    }
+}
+
+// Returns
+// 1. Ok(true): error has been resolved, retry immediately
+// 2. Ok(false): backoff, and then retry
+// 3. Err(Error): can't be resolved, return the error to upper level
+pub(crate) async fn on_region_epoch_not_match<PdC: PdClient>(
+    pd_client: Arc<PdC>,
+    region_store: RegionStore,
+    error: EpochNotMatch,
+) -> Result<bool> {
+    let ver_id = region_store.region_with_leader.ver_id();
+    if error.current_regions.is_empty() {
+        pd_client.invalidate_region_cache(ver_id).await;
+        return Ok(true);
+    }
+
+    for r in error.current_regions {
+        if r.id == region_store.region_with_leader.id() {
+            let region_epoch = r.region_epoch.unwrap();
+            let returned_conf_ver = region_epoch.conf_ver;
+            let returned_version = region_epoch.version;
+            let current_region_epoch = region_store
+                .region_with_leader
+                .region
+                .region_epoch
+                .clone()
+                .unwrap();
+            let current_conf_ver = current_region_epoch.conf_ver;
+            let current_version = current_region_epoch.version;
+
+            // Find whether the current region is ahead of TiKV's. If so, backoff.
+            if returned_conf_ver < current_conf_ver || returned_version < current_version {
+                return Ok(false);
+            }
+        }
+    }
+    // TODO: finer grained processing
+    pd_client.invalidate_region_cache(ver_id).await;
+    Ok(false)
+}
+
 impl<P: Plan, PdC: PdClient> Clone for RetryableMultiRegion<P, PdC> {
     fn clone(&self) -> Self {
         RetryableMultiRegion {