diff --git a/components/pd_client/src/client_v2.rs b/components/pd_client/src/client_v2.rs
index 11224ad894e..758f0716637 100644
--- a/components/pd_client/src/client_v2.rs
+++ b/components/pd_client/src/client_v2.rs
@@ -117,7 +117,7 @@ impl RawClient {
 
     /// Returns Ok(true) when a new connection is established.
     async fn maybe_reconnect(&mut self, ctx: &ConnectContext, force: bool) -> Result<bool> {
-        PD_RECONNECT_COUNTER_VEC.with_label_values(&["try"]).inc();
+        PD_RECONNECT_COUNTER_VEC.try_connect.inc();
 
         let start = Instant::now();
         let members = self.members.clone();
@@ -135,21 +135,15 @@
             .await
         {
             Err(e) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["failure"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.failure.inc();
                 return Err(e);
             }
             Ok(None) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["no-need"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.no_need.inc();
                 return Ok(false);
             }
             Ok(Some(tuple)) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["success"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.success.inc();
                 tuple
             }
         };
diff --git a/components/pd_client/src/metrics.rs b/components/pd_client/src/metrics.rs
index e1f1100444a..cb66474a345 100644
--- a/components/pd_client/src/metrics.rs
+++ b/components/pd_client/src/metrics.rs
@@ -2,7 +2,7 @@
 
 use lazy_static::lazy_static;
 use prometheus::*;
-use prometheus_static_metric::{make_static_metric, register_static_histogram_vec};
+use prometheus_static_metric::*;
 
 make_static_metric! {
     pub label_enum PDRequestEventType {
@@ -38,9 +38,20 @@
         meta_storage_watch,
     }
 
+    pub label_enum PDReconnectEventKind {
+        success,
+        failure,
+        no_need,
+        cancel,
+        try_connect,
+    }
+
     pub struct PDRequestEventHistogramVec: Histogram {
         "type" => PDRequestEventType,
     }
+    pub struct PDReconnectEventCounterVec: IntCounter {
+        "type" => PDReconnectEventKind,
+    }
 }
 
 lazy_static! {
@@ -64,12 +75,14 @@
         &["type"]
     )
     .unwrap();
-    pub static ref PD_RECONNECT_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
-        "tikv_pd_reconnect_total",
-        "Total number of PD reconnections.",
-        &["type"]
-    )
-    .unwrap();
+    pub static ref PD_RECONNECT_COUNTER_VEC: PDReconnectEventCounterVec =
+        register_static_int_counter_vec!(
+            PDReconnectEventCounterVec,
+            "tikv_pd_reconnect_total",
+            "Total number of PD reconnections.",
+            &["type"]
+        )
+        .unwrap();
     pub static ref PD_PENDING_HEARTBEAT_GAUGE: IntGauge = register_int_gauge!(
         "tikv_pd_pending_heartbeat_total",
         "Total number of pending region heartbeat"
diff --git a/components/pd_client/src/util.rs b/components/pd_client/src/util.rs
index f3a8451f321..77319e086b2 100644
--- a/components/pd_client/src/util.rs
+++ b/components/pd_client/src/util.rs
@@ -47,6 +47,7 @@ const MAX_RETRY_TIMES: u64 = 5;
 // The max duration when retrying to connect to leader. No matter if the
 // MAX_RETRY_TIMES is reached.
 const MAX_RETRY_DURATION: Duration = Duration::from_secs(10);
+const MAX_BACKOFF: Duration = Duration::from_secs(3);
 
 // FIXME: Use a request-independent way to handle reconnection.
 const GLOBAL_RECONNECT_INTERVAL: Duration = Duration::from_millis(100); // 0.1s
@@ -108,6 +109,7 @@ pub struct Inner {
     pub meta_storage: MetaStorageStub,
 
     last_try_reconnect: Instant,
+    bo: ExponentialBackoff,
 }
 
 impl Inner {
@@ -201,6 +203,7 @@ impl Client {
             pending_heartbeat: Arc::default(),
             pending_buckets: Arc::default(),
             last_try_reconnect: Instant::now(),
+            bo: ExponentialBackoff::new(retry_interval),
             tso,
             meta_storage,
         }),
@@ -328,18 +331,20 @@ impl Client {
     /// Note: Retrying too quickly will return an error due to cancellation.
     /// Please always try to reconnect after sending the request first.
     pub async fn reconnect(&self, force: bool) -> Result<()> {
-        PD_RECONNECT_COUNTER_VEC.with_label_values(&["try"]).inc();
+        PD_RECONNECT_COUNTER_VEC.try_connect.inc();
         let start = Instant::now();
 
         let future = {
             let inner = self.inner.rl();
-            if start.saturating_duration_since(inner.last_try_reconnect) < GLOBAL_RECONNECT_INTERVAL
+            if start.saturating_duration_since(inner.last_try_reconnect) < inner.bo.get_interval()
             {
                 // Avoid unnecessary updating.
                 // Prevent a large number of reconnections in a short time.
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["cancel"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.cancel.inc();
                 return Err(box_err!("cancel reconnection due to too small interval"));
             }
             let connector = PdConnector::new(inner.env.clone(), inner.security_mgr.clone());
@@ -360,37 +365,43 @@ impl Client {
         {
             let mut inner = self.inner.wl();
-            if start.saturating_duration_since(inner.last_try_reconnect) < GLOBAL_RECONNECT_INTERVAL
+            if start.saturating_duration_since(inner.last_try_reconnect) < inner.bo.get_interval()
             {
                 // There may be multiple reconnections that pass the read lock at the same time.
                 // Check again in the write lock to avoid unnecessary updating.
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["cancel"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.cancel.inc();
                 return Err(box_err!("cancel reconnection due to too small interval"));
             }
             inner.last_try_reconnect = start;
+            inner.bo.next_backoff();
         }
 
         slow_log!(start.saturating_elapsed(), "try reconnect pd");
         let (client, target_info, members, tso) = match future.await {
             Err(e) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["failure"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.failure.inc();
                 return Err(e);
             }
-            Ok(None) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["no-need"])
-                    .inc();
-                return Ok(());
-            }
-            Ok(Some(tuple)) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["success"])
-                    .inc();
-                tuple
+            Ok(res) => {
+                // Reset the retry count.
+                {
+                    let mut inner = self.inner.wl();
+                    inner.bo.reset()
+                }
+                match res {
+                    None => {
+                        PD_RECONNECT_COUNTER_VEC.no_need.inc();
+                        return Ok(());
+                    }
+                    Some(tuple) => {
+                        PD_RECONNECT_COUNTER_VEC.success.inc();
+                        tuple
+                    }
+                }
             }
         };
@@ -859,6 +870,33 @@ impl PdConnector {
         }
     }
 }
 
+/// Simple backoff strategy.
+struct ExponentialBackoff {
+    base: Duration,
+    interval: Duration,
+}
+
+impl ExponentialBackoff {
+    pub fn new(base: Duration) -> Self {
+        Self {
+            base,
+            interval: base,
+        }
+    }
+    pub fn next_backoff(&mut self) -> Duration {
+        self.interval = std::cmp::min(self.interval * 2, MAX_BACKOFF);
+        self.interval
+    }
+
+    pub fn get_interval(&self) -> Duration {
+        self.interval
+    }
+
+    pub fn reset(&mut self) {
+        self.interval = self.base;
+    }
+}
+
 pub fn trim_http_prefix(s: &str) -> &str {
     s.trim_start_matches("http://")
         .trim_start_matches("https://")
@@ -1004,8 +1042,11 @@ pub fn merge_bucket_stats<C: AsRef<[u8]>, I: AsRef<[u8]>>(
 mod test {
     use kvproto::metapb::BucketStats;
 
+    use super::*;
     use crate::{merge_bucket_stats, util::find_bucket_index};
 
+    const BASE_BACKOFF: Duration = Duration::from_millis(100);
+
     #[test]
     fn test_merge_bucket_stats() {
         #[allow(clippy::type_complexity)]
@@ -1121,4 +1162,23 @@
         assert_eq!(find_bucket_index(b"k7", &keys), Some(4));
         assert_eq!(find_bucket_index(b"k8", &keys), Some(4));
     }
+
+    #[test]
+    fn test_exponential_backoff() {
+        let mut backoff = ExponentialBackoff::new(BASE_BACKOFF);
+        assert_eq!(backoff.get_interval(), BASE_BACKOFF);
+
+        assert_eq!(backoff.next_backoff(), 2 * BASE_BACKOFF);
+        assert_eq!(backoff.next_backoff(), Duration::from_millis(400));
+        assert_eq!(backoff.get_interval(), Duration::from_millis(400));
+
+        // Should not exceed MAX_BACKOFF
+        for _ in 0..20 {
+            backoff.next_backoff();
+        }
+        assert_eq!(backoff.get_interval(), MAX_BACKOFF);
+
+        backoff.reset();
+        assert_eq!(backoff.get_interval(), BASE_BACKOFF);
+    }
 }
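
For readers unfamiliar with the static-metric pattern the metrics.rs hunk switches to: make_static_metric! generates a struct whose fields are pre-registered counters, so hot paths increment a field directly instead of resolving labels through with_label_values(&["..."]) on every call. Below is a minimal, self-contained sketch of that pattern, not part of the patch; the crate imports mirror the patch, while the enum, struct, and metric names are illustrative only.

// Sketch only (assumed deps: prometheus, prometheus-static-metric, lazy_static).
use lazy_static::lazy_static;
use prometheus::*;
use prometheus_static_metric::*;

make_static_metric! {
    // Each enum value becomes a field on the generated struct.
    pub label_enum DemoReconnectKind {
        success,
        failure,
    }

    pub struct DemoReconnectCounterVec: IntCounter {
        "type" => DemoReconnectKind,
    }
}

lazy_static! {
    // Registers the underlying IntCounterVec once and resolves every
    // label value up front into plain IntCounter fields.
    static ref DEMO_RECONNECT: DemoReconnectCounterVec = register_static_int_counter_vec!(
        DemoReconnectCounterVec,
        "demo_reconnect_total",
        "Demo reconnect events.",
        &["type"]
    )
    .unwrap();
}

fn main() {
    // Field access replaces with_label_values(&["success"]): no label
    // hashing or lookup on the hot path, and typos become compile errors.
    DEMO_RECONNECT.success.inc();
    DEMO_RECONNECT.failure.inc();
    assert_eq!(DEMO_RECONNECT.success.get(), 1);
}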