diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index fcbb109ef..e312dd070 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -445,7 +445,7 @@ impl Node { metadata_store_client: &MetadataStoreClient, config: &Configuration, ) -> Result { - Self::retry_on_network_error(|| { + Self::retry_on_network_error(config.common.network_error_retry_policy.clone(), || { metadata_store_client.get_or_insert(PARTITION_TABLE_KEY.clone(), || { FixedPartitionTable::new(Version::MIN, config.common.bootstrap_num_partitions()) }) @@ -459,7 +459,7 @@ impl Node { config: &Configuration, num_partitions: u64, ) -> Result { - Self::retry_on_network_error(|| { + Self::retry_on_network_error(config.common.network_error_retry_policy.clone(), || { metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { bootstrap_logs_metadata(config.bifrost.default_provider, num_partitions) }) @@ -472,7 +472,7 @@ impl Node { metadata_store_client: &MetadataStoreClient, common_opts: &CommonOptions, ) -> Result { - Self::retry_on_network_error(|| { + Self::retry_on_network_error(common_opts.network_error_retry_policy.clone(), || { let mut previous_node_generation = None; metadata_store_client.read_modify_write(NODES_CONFIG_KEY.clone(), move |nodes_config| { let mut nodes_config = if common_opts.allow_bootstrap { @@ -553,22 +553,17 @@ impl Node { .map_err(|err| err.transpose()) } - async fn retry_on_network_error(action: Fn) -> Result + async fn retry_on_network_error(retry_policy: P, action: Fn) -> Result where + P: Into, Fn: FnMut() -> Fut, Fut: Future>, E: MetadataStoreClientError + std::fmt::Display, { - // todo: Make upsert timeout configurable - let retry_policy = RetryPolicy::exponential( - Duration::from_millis(10), - 2.0, - Some(15), - Some(Duration::from_secs(5)), - ); let upsert_start = Instant::now(); retry_policy + .into() .retry_if(action, |err: &E| { if err.is_network_error() { if upsert_start.elapsed() < Duration::from_secs(5) { diff --git 
a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 813a3fb09..68cc880c7 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -233,6 +233,11 @@ pub struct CommonOptions { #[serde(with = "serde_with::As::")] #[cfg_attr(feature = "schemars", schemars(with = "String"))] pub metadata_update_interval: humantime::Duration, + + /// # Network error retry policy + /// + /// The retry policy the node applies when a metadata store operation fails with a network error. + pub network_error_retry_policy: RetryPolicy, } static HOSTNAME: Lazy = Lazy::new(|| { @@ -357,6 +362,12 @@ impl Default for CommonOptions { rocksdb_perf_level: PerfStatsLevel::EnableCount, rocksdb: Default::default(), metadata_update_interval: std::time::Duration::from_secs(3).into(), + network_error_retry_policy: RetryPolicy::exponential( + Duration::from_millis(10), + 2.0, + Some(15), + Some(Duration::from_secs(5)), + ), } } }