Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#15191
Browse files Browse the repository at this point in the history
ref tikv/pd#6556, close tikv#15184

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
nolouch authored and ti-chi-bot committed Jul 28, 2023
1 parent 8577e53 commit c885d5e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 19 deletions.
18 changes: 11 additions & 7 deletions components/pd_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ impl fmt::Debug for RpcClient {
}

const LEADER_CHANGE_RETRY: usize = 10;
// Periodic requests such as store_heartbeat don't need to be retried.
const NO_RETRY: usize = 1;

impl PdClient for RpcClient {
fn store_global_config(
Expand Down Expand Up @@ -818,10 +820,14 @@ impl PdClient for RpcClient {
})
};
Box::pin(async move {
let resp = handler.await?;
PD_REQUEST_HISTOGRAM_VEC
.store_heartbeat
.observe(timer.saturating_elapsed_secs());
let resp = handler
.map(|res| {
PD_REQUEST_HISTOGRAM_VEC
.store_heartbeat
.observe(timer.saturating_elapsed_secs());
res
})
.await?;
check_resp_header(resp.get_header())?;
match feature_gate.set_version(resp.get_cluster_version()) {
Err(_) => warn!("invalid cluster version: {}", resp.get_cluster_version()),
Expand All @@ -832,9 +838,7 @@ impl PdClient for RpcClient {
}) as PdFuture<_>
};

self.pd_client
.request(req, executor, LEADER_CHANGE_RETRY)
.execute()
self.pd_client.request(req, executor, NO_RETRY).execute()
}

fn report_batch_split(&self, regions: Vec<metapb::Region>) -> PdFuture<()> {
Expand Down
5 changes: 5 additions & 0 deletions components/pd_client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ make_static_metric! {
is_recovering_marked,
store_heartbeat,
tso,
scan_regions,
get_members,

meta_storage_put,
meta_storage_get,
Expand Down
32 changes: 20 additions & 12 deletions components/pd_client/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl Client {
F: FnMut(&Client, Req) -> PdFuture<Resp> + Send + 'static,
{
Request {
remain_reconnect_count: retry,
remain_request_count: retry,
request_sent: 0,
client: self.clone(),
req,
Expand Down Expand Up @@ -404,7 +404,7 @@ impl Client {

/// The context of sending requests.
pub struct Request<Req, F> {
remain_reconnect_count: usize,
remain_request_count: usize,
request_sent: usize,
client: Arc<Client>,
req: Req,
Expand All @@ -419,15 +419,11 @@ where
F: FnMut(&Client, Req) -> PdFuture<Resp> + Send + 'static,
{
async fn reconnect_if_needed(&mut self) -> Result<()> {
debug!("reconnecting ..."; "remain" => self.remain_reconnect_count);
if self.request_sent < MAX_REQUEST_COUNT {
debug!("reconnecting ..."; "remain" => self.remain_request_count);
if self.request_sent < MAX_REQUEST_COUNT && self.request_sent < self.remain_request_count {
return Ok(());
}
if self.remain_reconnect_count == 0 {
return Err(box_err!("request retry exceeds limit"));
}
// Updating client.
self.remain_reconnect_count -= 1;
// FIXME: should not block the core.
debug!("(re)connecting PD client");
match self.client.reconnect(true).await {
Expand All @@ -447,18 +443,22 @@ where
}

async fn send_and_receive(&mut self) -> Result<Resp> {
if self.remain_request_count == 0 {
return Err(box_err!("request retry exceeds limit"));
}
self.request_sent += 1;
self.remain_request_count -= 1;
debug!("request sent: {}", self.request_sent);
let r = self.req.clone();
(self.func)(&self.client, r).await
}

fn should_not_retry(resp: &Result<Resp>) -> bool {
fn should_not_retry(&self, resp: &Result<Resp>) -> bool {
match resp {
Ok(_) => true,
Err(err) => {
// these errors are not caused by network, no need to retry
if err.retryable() {
if err.retryable() && self.remain_request_count > 0 {
error!(?*err; "request failed, retry");
false
} else {
Expand All @@ -475,7 +475,7 @@ where
loop {
{
let resp = self.send_and_receive().await;
if Self::should_not_retry(&resp) {
if self.should_not_retry(&resp) {
return resp;
}
}
Expand Down Expand Up @@ -621,10 +621,14 @@ impl PdConnector {
});
let client = PdClientStub::new(channel.clone());
let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT));
let timer = Instant::now();
let response = client
.get_members_async_opt(&GetMembersRequest::default(), option)
.unwrap_or_else(|e| panic!("fail to request PD {} err {:?}", "get_members", e))
.await;
PD_REQUEST_HISTOGRAM_VEC
.get_members
.observe(timer.saturating_elapsed_secs());
match response {
Ok(resp) => Ok((client, resp)),
Err(e) => Err(Error::Grpc(e)),
Expand Down Expand Up @@ -789,7 +793,7 @@ impl PdConnector {
Ok((None, has_network_error))
}

pub async fn reconnect_leader(
async fn reconnect_leader(
&self,
leader: &Member,
) -> Result<(Option<(PdClientStub, String)>, bool)> {
Expand Down Expand Up @@ -835,6 +839,7 @@ impl PdConnector {
let client_urls = leader.get_client_urls();
for leader_url in client_urls {
let target = TargetInfo::new(leader_url.clone(), &ep);
let timer = Instant::now();
let response = client
.get_members_async_opt(
&GetMembersRequest::default(),
Expand All @@ -846,6 +851,9 @@ impl PdConnector {
panic!("fail to request PD {} err {:?}", "get_members", e)
})
.await;
PD_REQUEST_HISTOGRAM_VEC
.get_members
.observe(timer.saturating_elapsed_secs());
match response {
Ok(_) => return Ok(Some((client, target))),
Err(_) => continue,
Expand Down

0 comments on commit c885d5e

Please sign in to comment.