Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#15191
Browse files Browse the repository at this point in the history
ref tikv/pd#6556, close tikv#15184

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
nolouch authored and ti-chi-bot committed Jul 28, 2023
1 parent d374b3e commit c9e7ffd
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 15 deletions.
17 changes: 14 additions & 3 deletions components/pd_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ impl fmt::Debug for RpcClient {
}

const LEADER_CHANGE_RETRY: usize = 10;
// Periodic requests such as store_heartbeat do not need to be retried.
const NO_RETRY: usize = 1;

impl PdClient for RpcClient {
fn load_global_config(&self, list: Vec<String>) -> PdFuture<HashMap<String, String>> {
Expand Down Expand Up @@ -759,10 +761,21 @@ impl PdClient for RpcClient {
})
};
Box::pin(async move {
<<<<<<< HEAD
let resp = handler.await?;
PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["store_heartbeat"])
.observe(duration_to_sec(timer.saturating_elapsed()));
=======
let resp = handler
.map(|res| {
PD_REQUEST_HISTOGRAM_VEC
.store_heartbeat
.observe(timer.saturating_elapsed_secs());
res
})
.await?;
>>>>>>> e8238777ea (pd_client: reduce store heartbeat retires to prevent heartbeat storm (#15191))
check_resp_header(resp.get_header())?;
match feature_gate.set_version(resp.get_cluster_version()) {
Err(_) => warn!("invalid cluster version: {}", resp.get_cluster_version()),
Expand All @@ -773,9 +786,7 @@ impl PdClient for RpcClient {
}) as PdFuture<_>
};

self.pd_client
.request(req, executor, LEADER_CHANGE_RETRY)
.execute()
self.pd_client.request(req, executor, NO_RETRY).execute()
}

fn report_batch_split(&self, regions: Vec<metapb::Region>) -> PdFuture<()> {
Expand Down
45 changes: 45 additions & 0 deletions components/pd_client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,51 @@

use lazy_static::lazy_static;
use prometheus::*;
<<<<<<< HEAD
=======
use prometheus_static_metric::{make_static_metric, register_static_histogram_vec};

make_static_metric! {
pub label_enum PDRequestEventType {
get_region,
get_region_by_id,
get_region_leader_by_id,
scatter_region,
get_store,
get_store_async,
put_store,
get_all_stores,
get_store_and_stats,
store_global_config,
load_global_config,
watch_global_config,
bootstrap_cluster,
is_cluster_bootstrapped,
get_cluster_config,
ask_split,
ask_batch_split,
report_batch_split,
get_gc_safe_point,
update_service_safe_point,
min_resolved_ts,
get_operator,
alloc_id,
is_recovering_marked,
store_heartbeat,
tso,
scan_regions,
get_members,

meta_storage_put,
meta_storage_get,
meta_storage_watch,
}

pub struct PDRequestEventHistogramVec: Histogram {
"type" => PDRequestEventType,
}
}
>>>>>>> e8238777ea (pd_client: reduce store heartbeat retires to prevent heartbeat storm (#15191))

lazy_static! {
pub static ref PD_REQUEST_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!(
Expand Down
32 changes: 20 additions & 12 deletions components/pd_client/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl Client {
F: FnMut(&Client, Req) -> PdFuture<Resp> + Send + 'static,
{
Request {
remain_reconnect_count: retry,
remain_request_count: retry,
request_sent: 0,
client: self.clone(),
req,
Expand Down Expand Up @@ -389,7 +389,7 @@ impl Client {

/// The context of sending requests.
pub struct Request<Req, F> {
remain_reconnect_count: usize,
remain_request_count: usize,
request_sent: usize,
client: Arc<Client>,
req: Req,
Expand All @@ -404,15 +404,11 @@ where
F: FnMut(&Client, Req) -> PdFuture<Resp> + Send + 'static,
{
async fn reconnect_if_needed(&mut self) -> Result<()> {
debug!("reconnecting ..."; "remain" => self.remain_reconnect_count);
if self.request_sent < MAX_REQUEST_COUNT {
debug!("reconnecting ..."; "remain" => self.remain_request_count);
if self.request_sent < MAX_REQUEST_COUNT && self.request_sent < self.remain_request_count {
return Ok(());
}
if self.remain_reconnect_count == 0 {
return Err(box_err!("request retry exceeds limit"));
}
// Updating client.
self.remain_reconnect_count -= 1;
// FIXME: should not block the core.
debug!("(re)connecting PD client");
match self.client.reconnect(true).await {
Expand All @@ -432,18 +428,22 @@ where
}

/// Sends one attempt of the request and awaits its response.
///
/// Returns a "request retry exceeds limit" error without sending anything
/// when `remain_request_count` is already zero. Otherwise the attempt is
/// counted (`request_sent` is incremented, `remain_request_count` is
/// decremented) before the request is passed to `func`.
async fn send_and_receive(&mut self) -> Result<Resp> {
// No attempts left: fail before touching either counter.
if self.remain_request_count == 0 {
return Err(box_err!("request retry exceeds limit"));
}
self.request_sent += 1;
self.remain_request_count -= 1;
debug!("request sent: {}", self.request_sent);
// `func` takes the request by value, so pass a clone and keep `self.req`.
let r = self.req.clone();
(self.func)(&self.client, r).await
}

fn should_not_retry(resp: &Result<Resp>) -> bool {
fn should_not_retry(&self, resp: &Result<Resp>) -> bool {
match resp {
Ok(_) => true,
Err(err) => {
// these errors are not caused by network, no need to retry
if err.retryable() {
if err.retryable() && self.remain_request_count > 0 {
error!(?*err; "request failed, retry");
false
} else {
Expand All @@ -460,7 +460,7 @@ where
loop {
{
let resp = self.send_and_receive().await;
if Self::should_not_retry(&resp) {
if self.should_not_retry(&resp) {
return resp;
}
}
Expand Down Expand Up @@ -602,10 +602,14 @@ impl PdConnector {
});
let client = PdClientStub::new(channel);
let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT));
let timer = Instant::now();
let response = client
.get_members_async_opt(&GetMembersRequest::default(), option)
.unwrap_or_else(|e| panic!("fail to request PD {} err {:?}", "get_members", e))
.await;
PD_REQUEST_HISTOGRAM_VEC
.get_members
.observe(timer.saturating_elapsed_secs());
match response {
Ok(resp) => Ok((client, resp)),
Err(e) => Err(Error::Grpc(e)),
Expand Down Expand Up @@ -759,7 +763,7 @@ impl PdConnector {
Ok((None, has_network_error))
}

pub async fn reconnect_leader(
async fn reconnect_leader(
&self,
leader: &Member,
) -> Result<(Option<(PdClientStub, String)>, bool)> {
Expand Down Expand Up @@ -803,6 +807,7 @@ impl PdConnector {
let client_urls = leader.get_client_urls();
for leader_url in client_urls {
let target = TargetInfo::new(leader_url.clone(), &ep);
let timer = Instant::now();
let response = client
.get_members_async_opt(
&GetMembersRequest::default(),
Expand All @@ -814,6 +819,9 @@ impl PdConnector {
panic!("fail to request PD {} err {:?}", "get_members", e)
})
.await;
PD_REQUEST_HISTOGRAM_VEC
.get_members
.observe(timer.saturating_elapsed_secs());
match response {
Ok(_) => return Ok(Some((client, target))),
Err(_) => continue,
Expand Down

0 comments on commit c9e7ffd

Please sign in to comment.