Skip to content

Commit

Permalink
pd_client: reduce PD reconnection (tikv#14954)
Browse files Browse the repository at this point in the history
ref tikv/pd#6556, close tikv#14964

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Signed-off-by: tonyxuqqi <[email protected]>
  • Loading branch information
2 people authored and tonyxuqqi committed Jun 22, 2023
1 parent 6137273 commit e0910ed
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 25 deletions.
1 change: 1 addition & 0 deletions components/pd_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl RpcClient {
target,
tso.unwrap(),
cfg.enable_forwarding,
cfg.retry_interval.0,
)),
monitor: monitor.clone(),
};
Expand Down
2 changes: 1 addition & 1 deletion components/pd_client/src/client_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ async fn reconnect_loop(
use tikv_util::config::ReadableDuration;
ReadableDuration::from_str(&s.unwrap()).unwrap().0
});
request_timeout()
cfg.retry_interval.0
})();
let mut last_connect = StdInstant::now();
loop {
Expand Down
2 changes: 1 addition & 1 deletion components/pd_client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct Config {
///
/// Default is `"127.0.0.1:2379"`.
pub endpoints: Vec<String>,
/// The interval at which to retry a PD connection initialization.
/// The interval at which to retry a PD connection.
///
/// Default is 300ms.
pub retry_interval: ReadableDuration,
Expand Down
10 changes: 5 additions & 5 deletions components/pd_client/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ const MAX_RETRY_TIMES: u64 = 5;
const MAX_RETRY_DURATION: Duration = Duration::from_secs(10);

// FIXME: Use a request-independent way to handle reconnection.
const GLOBAL_RECONNECT_INTERVAL: Duration = Duration::from_millis(100); // 0.1s
pub const REQUEST_RECONNECT_INTERVAL: Duration = Duration::from_secs(1); // 1s

#[derive(Clone)]
Expand Down Expand Up @@ -160,6 +159,7 @@ pub struct Client {
pub(crate) inner: RwLock<Inner>,
pub feature_gate: FeatureGate,
enable_forwarding: bool,
retry_interval: Duration,
}

impl Client {
Expand All @@ -171,6 +171,7 @@ impl Client {
target: TargetInfo,
tso: TimestampOracle,
enable_forwarding: bool,
retry_interval: Duration,
) -> Client {
if !target.direct_connected() {
REQUEST_FORWARDED_GAUGE_VEC
Expand Down Expand Up @@ -206,6 +207,7 @@ impl Client {
}),
feature_gate: FeatureGate::default(),
enable_forwarding,
retry_interval,
}
}

Expand Down Expand Up @@ -333,8 +335,7 @@ impl Client {

let future = {
let inner = self.inner.rl();
if start.saturating_duration_since(inner.last_try_reconnect) < GLOBAL_RECONNECT_INTERVAL
{
if start.saturating_duration_since(inner.last_try_reconnect) < self.retry_interval {
// Avoid unnecessary updating.
// Prevent a large number of reconnections in a short time.
PD_RECONNECT_COUNTER_VEC
Expand All @@ -360,8 +361,7 @@ impl Client {

{
let mut inner = self.inner.wl();
if start.saturating_duration_since(inner.last_try_reconnect) < GLOBAL_RECONNECT_INTERVAL
{
if start.saturating_duration_since(inner.last_try_reconnect) < self.retry_interval {
// There may be multiple reconnections that pass the read lock at the same time.
// Check again in the write lock to avoid unnecessary updating.
PD_RECONNECT_COUNTER_VEC
Expand Down
2 changes: 1 addition & 1 deletion etc/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@
## PD endpoints.
# endpoints = ["127.0.0.1:2379"]

## The interval at which to retry a PD connection initialization.
## The interval at which to retry a PD connection.
## Default is 300ms.
# retry-interval = "300ms"

Expand Down
4 changes: 2 additions & 2 deletions tests/failpoints/cases/test_pd_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ fn test_slow_periodical_update() {

fail::cfg(pd_client_reconnect_fp, "pause").unwrap();
// Wait for the PD client thread blocking on the fail point.
// The GLOBAL_RECONNECT_INTERVAL is 0.1s so sleeps 0.2s here.
thread::sleep(Duration::from_millis(200));
// The retry interval is 300ms so sleeps 400ms here.
thread::sleep(Duration::from_millis(400));

let (tx, rx) = mpsc::channel();
let handle = thread::spawn(move || {
Expand Down
8 changes: 4 additions & 4 deletions tests/failpoints/cases/test_pd_client_legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ fn test_slow_periodical_update() {

fail::cfg(pd_client_reconnect_fp, "pause").unwrap();
// Wait for the PD client thread blocking on the fail point.
// The GLOBAL_RECONNECT_INTERVAL is 0.1s so sleeps 0.2s here.
thread::sleep(Duration::from_millis(200));
// The retry interval is 300ms so sleeps 400ms here.
thread::sleep(Duration::from_millis(400));

let (tx, rx) = mpsc::channel();
let handle = thread::spawn(move || {
Expand All @@ -245,8 +245,8 @@ fn test_reconnect_limit() {
let pd_client_reconnect_fp = "pd_client_reconnect";
let (_server, client) = new_test_server_and_client(ReadableDuration::secs(100));

// The GLOBAL_RECONNECT_INTERVAL is 0.1s so sleeps 0.2s here.
thread::sleep(Duration::from_millis(200));
// The default retry interval is 300ms so sleeps 400ms here.
thread::sleep(Duration::from_millis(400));

// The first reconnection will succeed, and the last_update will not be updated.
fail::cfg(pd_client_reconnect_fp, "return").unwrap();
Expand Down
10 changes: 5 additions & 5 deletions tests/integrations/pd/test_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ fn restart_leader(mgr: SecurityManager) {
server.stop();
server.start(&mgr, eps);

// The GLOBAL_RECONNECT_INTERVAL is 0.1s so sleeps 0.2s here.
thread::sleep(Duration::from_millis(200));
// The default retry interval is 300ms so sleeps 400ms here.
thread::sleep(Duration::from_millis(400));

let region = block_on(client.get_region_by_id(region.get_id())).unwrap();
assert_eq!(region.unwrap().get_id(), region_id);
Expand Down Expand Up @@ -604,9 +604,9 @@ fn test_cluster_version() {
assert!(feature_gate.can_enable(feature_b));
assert!(!feature_gate.can_enable(feature_c));

// After reconnect the version should be still accessable.
// The GLOBAL_RECONNECT_INTERVAL is 0.1s so sleeps 0.2s here.
thread::sleep(Duration::from_millis(200));
// After reconnect the version should be still accessible.
// The default retry interval is 300ms so sleeps 400ms here.
thread::sleep(Duration::from_millis(400));
client.reconnect().unwrap();
assert!(feature_gate.can_enable(feature_b));
assert!(!feature_gate.can_enable(feature_c));
Expand Down
12 changes: 6 additions & 6 deletions tests/integrations/pd/test_rpc_client_legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,8 @@ fn restart_leader(mgr: SecurityManager) {
server.stop();
server.start(&mgr, eps);

// The GLOBAL_RECONNECT_INTERVAL is 0.1s so sleeps 0.2s here.
thread::sleep(Duration::from_millis(200));
// The default retry interval is 300ms so sleeps 400ms here.
thread::sleep(Duration::from_millis(400));

let region = block_on(client.get_region_by_id(region.get_id())).unwrap();
assert_eq!(region.unwrap().get_id(), region_id);
Expand Down Expand Up @@ -518,7 +518,7 @@ fn test_pd_client_heartbeat_send_failed() {
RegionStat::default(),
None,
));
let rsp = rx.recv_timeout(Duration::from_millis(100));
let rsp = rx.recv_timeout(Duration::from_millis(300));
if ok {
assert!(rsp.is_ok());
assert_eq!(rsp.unwrap().get_region_id(), 1);
Expand Down Expand Up @@ -677,9 +677,9 @@ fn test_cluster_version() {
assert!(feature_gate.can_enable(feature_b));
assert!(!feature_gate.can_enable(feature_c));

// After reconnect the version should be still accessable.
// The GLOBAL_RECONNECT_INTERVAL is 0.1s so sleeps 0.2s here.
thread::sleep(Duration::from_millis(200));
// After reconnect the version should be still accessible.
// The default retry interval is 300ms so sleeps 400ms here.
thread::sleep(Duration::from_millis(400));
client.reconnect().unwrap();
assert!(feature_gate.can_enable(feature_b));
assert!(!feature_gate.can_enable(feature_c));
Expand Down

0 comments on commit e0910ed

Please sign in to comment.