This is an automated cherry-pick of tikv#15429
ref tikv/pd#6556, close tikv#15428

Signed-off-by: ti-chi-bot <[email protected]>
nolouch authored and ti-chi-bot committed Aug 30, 2023
1 parent f455905 commit 89a5439
Showing 3 changed files with 105 additions and 38 deletions.
components/pd_client/src/client_v2.rs (14 changes: 4 additions, 10 deletions)
@@ -117,7 +117,7 @@ impl RawClient {
 
     /// Returns Ok(true) when a new connection is established.
     async fn maybe_reconnect(&mut self, ctx: &ConnectContext, force: bool) -> Result<bool> {
-        PD_RECONNECT_COUNTER_VEC.with_label_values(&["try"]).inc();
+        PD_RECONNECT_COUNTER_VEC.try_connect.inc();
         let start = Instant::now();
 
         let members = self.members.clone();
@@ -135,21 +135,15 @@ impl RawClient {
             .await
         {
             Err(e) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["failure"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.failure.inc();
                 return Err(e);
             }
             Ok(None) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["no-need"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.no_need.inc();
                 return Ok(false);
             }
             Ok(Some(tuple)) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["success"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.success.inc();
                 tuple
             }
         };
components/pd_client/src/metrics.rs (27 changes: 20 additions, 7 deletions)
@@ -2,7 +2,7 @@
 
 use lazy_static::lazy_static;
 use prometheus::*;
-use prometheus_static_metric::{make_static_metric, register_static_histogram_vec};
+use prometheus_static_metric::*;
 
 make_static_metric! {
     pub label_enum PDRequestEventType {
@@ -38,9 +38,20 @@ make_static_metric! {
         meta_storage_watch,
     }
 
+    pub label_enum PDReconnectEventKind {
+        success,
+        failure,
+        no_need,
+        cancel,
+        try_connect,
+    }
+
     pub struct PDRequestEventHistogramVec: Histogram {
         "type" => PDRequestEventType,
     }
+    pub struct PDReconnectEventCounterVec: IntCounter {
+        "type" => PDReconnectEventKind,
+    }
 }
 
 lazy_static! {
@@ -64,12 +75,14 @@ lazy_static! {
         &["type"]
     )
     .unwrap();
-    pub static ref PD_RECONNECT_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
-        "tikv_pd_reconnect_total",
-        "Total number of PD reconnections.",
-        &["type"]
-    )
-    .unwrap();
+    pub static ref PD_RECONNECT_COUNTER_VEC: PDReconnectEventCounterVec =
+        register_static_int_counter_vec!(
+            PDReconnectEventCounterVec,
+            "tikv_pd_reconnect_total",
+            "Total number of PD reconnections.",
+            &["type"]
+        )
+        .unwrap();
     pub static ref PD_PENDING_HEARTBEAT_GAUGE: IntGauge = register_int_gauge!(
         "tikv_pd_pending_heartbeat_total",
         "Total number of pending region heartbeat"
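Note on the metrics change above: make_static_metric! from prometheus_static_metric generates an enum-keyed struct whose fields are counters resolved once at registration, so call sites increment a label combination by plain field access instead of a per-call with_label_values lookup. A minimal, self-contained sketch of the same pattern (hypothetical metric and type names, not part of this commit):

use lazy_static::lazy_static;
use prometheus::*;
use prometheus_static_metric::*;

make_static_metric! {
    // Allowed values of the "type" label, fixed at compile time.
    pub label_enum ReconnectKind {
        success,
        failure,
    }
    // A vec whose fields are plain IntCounters, one per label value.
    pub struct ReconnectCounterVec: IntCounter {
        "type" => ReconnectKind,
    }
}

lazy_static! {
    // Registers the underlying IntCounterVec and resolves every field once.
    pub static ref RECONNECT_COUNTER: ReconnectCounterVec = register_static_int_counter_vec!(
        ReconnectCounterVec,
        "example_reconnect_total",
        "Total number of reconnections.",
        &["type"]
    )
    .unwrap();
}

fn main() {
    // Equivalent to with_label_values(&["success"]).inc(), without the lookup.
    RECONNECT_COUNTER.success.inc();
}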
components/pd_client/src/util.rs (102 changes: 81 additions, 21 deletions)
@@ -47,6 +47,7 @@ const MAX_RETRY_TIMES: u64 = 5;
 // The max duration when retrying to connect to leader. No matter if the
 // MAX_RETRY_TIMES is reached.
 const MAX_RETRY_DURATION: Duration = Duration::from_secs(10);
+const MAX_BACKOFF: Duration = Duration::from_secs(3);
 
 // FIXME: Use a request-independent way to handle reconnection.
 const GLOBAL_RECONNECT_INTERVAL: Duration = Duration::from_millis(100); // 0.1s
@@ -108,6 +109,7 @@ pub struct Inner {
     pub meta_storage: MetaStorageStub,
 
     last_try_reconnect: Instant,
+    bo: ExponentialBackoff,
 }
 
 impl Inner {
@@ -201,6 +203,7 @@ impl Client {
                 pending_heartbeat: Arc::default(),
                 pending_buckets: Arc::default(),
                 last_try_reconnect: Instant::now(),
+                bo: ExponentialBackoff::new(retry_interval),
                 tso,
                 meta_storage,
             }),
@@ -328,18 +331,20 @@ impl Client {
     /// Note: Retrying too quickly will return an error due to cancellation.
     /// Please always try to reconnect after sending the request first.
     pub async fn reconnect(&self, force: bool) -> Result<()> {
-        PD_RECONNECT_COUNTER_VEC.with_label_values(&["try"]).inc();
+        PD_RECONNECT_COUNTER_VEC.try_connect.inc();
         let start = Instant::now();
 
         let future = {
             let inner = self.inner.rl();
+<<<<<<< HEAD
             if start.saturating_duration_since(inner.last_try_reconnect) < GLOBAL_RECONNECT_INTERVAL
             {
+=======
+            if start.saturating_duration_since(inner.last_try_reconnect) < inner.bo.get_interval() {
+>>>>>>> 4b3e33e6c2 (pd_client: add backoff for the reconnect retries (#15429))
                 // Avoid unnecessary updating.
                 // Prevent a large number of reconnections in a short time.
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["cancel"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.cancel.inc();
                 return Err(box_err!("cancel reconnection due to too small interval"));
             }
             let connector = PdConnector::new(inner.env.clone(), inner.security_mgr.clone());
@@ -360,37 +365,43 @@
 
         {
             let mut inner = self.inner.wl();
+<<<<<<< HEAD
             if start.saturating_duration_since(inner.last_try_reconnect) < GLOBAL_RECONNECT_INTERVAL
             {
+=======
+            if start.saturating_duration_since(inner.last_try_reconnect) < inner.bo.get_interval() {
+>>>>>>> 4b3e33e6c2 (pd_client: add backoff for the reconnect retries (#15429))
                 // There may be multiple reconnections that pass the read lock at the same time.
                 // Check again in the write lock to avoid unnecessary updating.
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["cancel"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.cancel.inc();
                 return Err(box_err!("cancel reconnection due to too small interval"));
             }
             inner.last_try_reconnect = start;
+            inner.bo.next_backoff();
         }
 
         slow_log!(start.saturating_elapsed(), "try reconnect pd");
         let (client, target_info, members, tso) = match future.await {
             Err(e) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["failure"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.failure.inc();
                 return Err(e);
             }
-            Ok(None) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["no-need"])
-                    .inc();
-                return Ok(());
-            }
-            Ok(Some(tuple)) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["success"])
-                    .inc();
-                tuple
+            Ok(res) => {
+                // Reset the retry count.
+                {
+                    let mut inner = self.inner.wl();
+                    inner.bo.reset()
+                }
+                match res {
+                    None => {
+                        PD_RECONNECT_COUNTER_VEC.no_need.inc();
+                        return Ok(());
+                    }
+                    Some(tuple) => {
+                        PD_RECONNECT_COUNTER_VEC.success.inc();
+                        tuple
+                    }
+                }
             }
         };
 
@@ -859,6 +870,33 @@ impl PdConnector {
     }
 }
 
+/// Simple backoff strategy.
+struct ExponentialBackoff {
+    base: Duration,
+    interval: Duration,
+}
+
+impl ExponentialBackoff {
+    pub fn new(base: Duration) -> Self {
+        Self {
+            base,
+            interval: base,
+        }
+    }
+    pub fn next_backoff(&mut self) -> Duration {
+        self.interval = std::cmp::min(self.interval * 2, MAX_BACKOFF);
+        self.interval
+    }
+
+    pub fn get_interval(&self) -> Duration {
+        self.interval
+    }
+
+    pub fn reset(&mut self) {
+        self.interval = self.base;
+    }
+}
+
 pub fn trim_http_prefix(s: &str) -> &str {
     s.trim_start_matches("http://")
         .trim_start_matches("https://")
@@ -1004,8 +1042,11 @@ pub fn merge_bucket_stats<C: AsRef<[u8]>, I: AsRef<[u8]>>(
 mod test {
     use kvproto::metapb::BucketStats;
 
+    use super::*;
     use crate::{merge_bucket_stats, util::find_bucket_index};
 
+    const BASE_BACKOFF: Duration = Duration::from_millis(100);
+
     #[test]
     fn test_merge_bucket_stats() {
         #[allow(clippy::type_complexity)]
@@ -1121,4 +1162,23 @@
         assert_eq!(find_bucket_index(b"k7", &keys), Some(4));
         assert_eq!(find_bucket_index(b"k8", &keys), Some(4));
     }
+
+    #[test]
+    fn test_exponential_backoff() {
+        let mut backoff = ExponentialBackoff::new(BASE_BACKOFF);
+        assert_eq!(backoff.get_interval(), BASE_BACKOFF);
+
+        assert_eq!(backoff.next_backoff(), 2 * BASE_BACKOFF);
+        assert_eq!(backoff.next_backoff(), Duration::from_millis(400));
+        assert_eq!(backoff.get_interval(), Duration::from_millis(400));
+
+        // Should not exceed MAX_BACKOFF
+        for _ in 0..20 {
+            backoff.next_backoff();
+        }
+        assert_eq!(backoff.get_interval(), MAX_BACKOFF);
+
+        backoff.reset();
+        assert_eq!(backoff.get_interval(), BASE_BACKOFF);
+    }
 }
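To make the retry cadence introduced here concrete: with the client's 100 ms base interval and the 3 s MAX_BACKOFF cap, every call to next_backoff() doubles the gate until it saturates, and a successful reconnect resets it to the base. A standalone sketch that mirrors the ExponentialBackoff above (illustration only, not code from this commit):

use std::time::Duration;

const MAX_BACKOFF: Duration = Duration::from_secs(3);

// Mirrors util.rs: the interval doubles on each next_backoff() call,
// capped at MAX_BACKOFF; reset() returns it to the base.
struct ExponentialBackoff {
    base: Duration,
    interval: Duration,
}

impl ExponentialBackoff {
    fn new(base: Duration) -> Self {
        Self { base, interval: base }
    }
    fn next_backoff(&mut self) -> Duration {
        self.interval = std::cmp::min(self.interval * 2, MAX_BACKOFF);
        self.interval
    }
    fn reset(&mut self) {
        self.interval = self.base;
    }
}

fn main() {
    let mut bo = ExponentialBackoff::new(Duration::from_millis(100));
    // Prints 200ms, 400ms, 800ms, 1.6s, 3s, 3s, 3s: the cap kicks in at the fifth doubling.
    for attempt in 1..=7 {
        println!("attempt {}: interval {:?}", attempt, bo.next_backoff());
    }
    bo.reset();
    // Back to the base; the next doubling starts from 100ms again.
    println!("after reset: {:?}", bo.next_backoff()); // 200ms
}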
