Skip to content

Commit

Permalink
feat: Make heartbeat intervals configurable in Frontend and Datanode (#…
Browse files Browse the repository at this point in the history
…1864)

* update frontend options and config

* fix format
  • Loading branch information
ccjeff committed Jul 3, 2023
1 parent 783a794 commit e54415e
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 9 deletions.
2 changes: 2 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ rpc_addr = "127.0.0.1:3001"
rpc_hostname = "127.0.0.1"
# The number of gRPC server worker threads, 8 by default.
rpc_runtime_size = 8
# Interval for sending heartbeat messages to the Metasrv in milliseconds, 5000 by default.
heartbeat_interval_millis = 5000

# Metasrv client options.
[meta_client_options]
Expand Down
4 changes: 4 additions & 0 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Node running mode, see `standalone.example.toml`.
mode = "distributed"
# Interval for sending heartbeat task to the Metasrv in milliseconds, 5000 by default.
heartbeat_interval_millis = 5000
# Interval for retry sending heartbeat task in milliseconds, 5000 by default.
retry_interval_millis = 5000

# HTTP server options, see `standalone.example.toml`.
[http_options]
Expand Down
1 change: 1 addition & 0 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl StandaloneOptions {
prom_options: self.prom_options,
meta_client_options: None,
logging: self.logging,
..Default::default()
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use crate::service_config::{
#[serde(default)]
pub struct FrontendOptions {
pub mode: Mode,
pub heartbeat_interval_millis: u64,
pub retry_interval_millis: u64,
pub http_options: Option<HttpOptions>,
pub grpc_options: Option<GrpcOptions>,
pub mysql_options: Option<MysqlOptions>,
Expand All @@ -43,6 +45,8 @@ impl Default for FrontendOptions {
fn default() -> Self {
Self {
mode: Mode::Standalone,
heartbeat_interval_millis: 5000,
retry_interval_millis: 5000,
http_options: Some(HttpOptions::default()),
grpc_options: Some(GrpcOptions::default()),
mysql_options: Some(MysqlOptions::default()),
Expand Down
12 changes: 6 additions & 6 deletions src/frontend/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ pub struct HeartbeatTask {
impl HeartbeatTask {
pub fn new(
meta_client: Arc<MetaClient>,
report_interval: u64,
retry_interval: u64,
heartbeat_interval_millis: u64,
retry_interval_millis: u64,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
HeartbeatTask {
meta_client,
report_interval,
retry_interval,
report_interval: heartbeat_interval_millis,
retry_interval: retry_interval_millis,
resp_handler_executor,
}
}
Expand Down Expand Up @@ -92,7 +92,7 @@ impl HeartbeatTask {
Err(e) => {
error!(e; "Occur error while reading heartbeat response");
capture_self
.start_with_retry(Duration::from_secs(retry_interval))
.start_with_retry(Duration::from_millis(retry_interval))
.await;

break;
Expand Down Expand Up @@ -136,7 +136,7 @@ impl HeartbeatTask {
}
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_secs(report_interval));
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
Some(HeartbeatRequest::default())
}
};
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,14 @@ impl Instance {

let datanode_clients = Arc::new(DatanodeClients::default());

Self::try_new_distributed_with(meta_client, datanode_clients, plugins).await
Self::try_new_distributed_with(meta_client, datanode_clients, plugins, opts).await
}

pub async fn try_new_distributed_with(
meta_client: Arc<MetaClient>,
datanode_clients: Arc<DatanodeClients>,
plugins: Arc<Plugins>,
opts: &FrontendOptions,
) -> Result<Self> {
let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
Expand Down Expand Up @@ -195,8 +196,8 @@ impl Instance {

let heartbeat_task = Some(HeartbeatTask::new(
meta_client,
5,
5,
opts.heartbeat_interval_millis,
opts.retry_interval_millis,
Arc::new(handlers_executor),
));

Expand Down
4 changes: 4 additions & 0 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use common_test_util::temp_dir::create_temp_dir;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::heartbeat::HeartbeatTask;
use datanode::instance::Instance as DatanodeInstance;
use frontend::frontend::FrontendOptions;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
Expand Down Expand Up @@ -221,11 +222,14 @@ impl GreptimeDbClusterBuilder {
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);

let frontend_opts = FrontendOptions::default();

Arc::new(
FeInstance::try_new_distributed_with(
meta_client,
datanode_clients,
Arc::new(Plugins::default()),
&frontend_opts,
)
.await
.unwrap(),
Expand Down

0 comments on commit e54415e

Please sign in to comment.