Skip to content

Commit

Permalink
refactor: introduce HeartbeatHandlerGroupBuilder (#4785)
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu authored Sep 30, 2024
1 parent cd55202 commit 77af4fd
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 76 deletions.
156 changes: 130 additions & 26 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,28 @@ use api::v1::meta::{
HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, RequestHeader,
ResponseHeader, Role, PROTOCOL_VERSION,
};
use check_leader_handler::CheckLeaderHandler;
use collect_cluster_info_handler::{
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
CollectFrontendClusterInfoHandler,
};
use collect_stats_handler::CollectStatsHandler;
use common_base::Plugins;
use common_meta::datanode::Stat;
use common_meta::instruction::{Instruction, InstructionReply};
use common_meta::sequence::Sequence;
use common_telemetry::{debug, info, warn};
use dashmap::DashMap;
use extract_stat_handler::ExtractStatHandler;
use failure_handler::RegionFailureHandler;
use filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
use futures::future::join_all;
use keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
use mailbox_handler::MailboxHandler;
use on_leader_start_handler::OnLeaderStartHandler;
use publish_heartbeat_handler::PublishHeartbeatHandler;
use region_lease_handler::RegionLeaseHandler;
use response_header_handler::ResponseHeaderHandler;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
Expand All @@ -36,6 +52,7 @@ use tokio::sync::{oneshot, Notify, RwLock};
use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
use crate::metasrv::Context;
use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
use crate::pubsub::PublisherRef;
use crate::service::mailbox::{
BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
};
Expand Down Expand Up @@ -96,6 +113,7 @@ impl HeartbeatAccumulator {
}
}

/// The pusher of the heartbeat response.
pub struct Pusher {
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
res_header: ResponseHeader,
Expand Down Expand Up @@ -131,6 +149,7 @@ impl Pusher {
}
}

/// The group of heartbeat pushers.
///
/// A map from a `String` key to the [`Pusher`] registered under it, wrapped in
/// `Arc<RwLock<..>>`. Because the map sits behind an `Arc`, cloning a `Pushers`
/// yields another handle to the same underlying map rather than a copy of it.
#[derive(Clone, Default)]
pub struct Pushers(Arc<RwLock<BTreeMap<String, Pusher>>>);

Expand Down Expand Up @@ -203,50 +222,57 @@ impl NameCachedHandler {
}
}

#[derive(Clone, Default)]
pub type HeartbeatHandlerGroupRef = Arc<HeartbeatHandlerGroup>;

/// The group of heartbeat handlers.
#[derive(Default)]
pub struct HeartbeatHandlerGroup {
handlers: Arc<RwLock<Vec<NameCachedHandler>>>,
handlers: Vec<NameCachedHandler>,
pushers: Pushers,
}

impl HeartbeatHandlerGroup {
pub(crate) fn new(pushers: Pushers) -> Self {
Self {
handlers: Arc::new(RwLock::new(vec![])),
handlers: vec![],
pushers,
}
}

pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) {
let mut handlers = self.handlers.write().await;
handlers.push(NameCachedHandler::new(handler));
fn add_handler(&mut self, handler: impl HeartbeatHandler + 'static) {
self.handlers.push(NameCachedHandler::new(handler));
}

pub async fn register(&self, key: impl AsRef<str>, pusher: Pusher) {
/// Registers the heartbeat response [`Pusher`] with the given key to the group.
pub async fn register_pusher(&self, key: impl AsRef<str>, pusher: Pusher) {
let key = key.as_ref();
METRIC_META_HEARTBEAT_CONNECTION_NUM.inc();
info!("Pusher register: {}", key);
let _ = self.pushers.insert(key.to_string(), pusher).await;
}

pub async fn deregister(&self, key: impl AsRef<str>) -> Option<Pusher> {
/// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
///
/// Returns the [`Pusher`] if it exists.
pub async fn deregister_push(&self, key: impl AsRef<str>) -> Option<Pusher> {
let key = key.as_ref();
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
info!("Pusher unregister: {}", key);
self.pushers.remove(key).await
}

/// Returns the [`Pushers`] of the group.
///
/// The value is produced by cloning the group's `pushers` field, so the caller
/// receives its own [`Pushers`] handle and the group keeps its own.
pub fn pushers(&self) -> Pushers {
self.pushers.clone()
}

/// Handles the heartbeat request.
pub async fn handle(
&self,
req: HeartbeatRequest,
mut ctx: Context,
) -> Result<HeartbeatResponse> {
let mut acc = HeartbeatAccumulator::default();
let handlers = self.handlers.read().await;
let role = req
.header
.as_ref()
Expand All @@ -255,7 +281,7 @@ impl HeartbeatHandlerGroup {
err_msg: format!("invalid role: {:?}", req.header),
})?;

for NameCachedHandler { name, handler } in handlers.iter() {
for NameCachedHandler { name, handler } in self.handlers.iter() {
if !handler.is_acceptable(role) {
continue;
}
Expand Down Expand Up @@ -426,6 +452,84 @@ impl Mailbox for HeartbeatMailbox {
}
}

/// The builder to build the group of heartbeat handlers.
///
/// The [`Pushers`] and the [`RegionLeaseHandler`] are mandatory and are passed to
/// [`HeartbeatHandlerGroupBuilder::new`]. The region failure handler and the
/// plugins are optional; both start as `None` and are set through the `with_*`
/// methods.
pub struct HeartbeatHandlerGroupBuilder {
/// The handler to handle region failure.
///
/// It is added to the built group only when it is `Some`.
region_failure_handler: Option<RegionFailureHandler>,

/// The handler to handle region lease.
///
/// It is always added to the built group.
region_lease_handler: RegionLeaseHandler,

/// The plugins.
///
/// On build they are queried for a [`PublisherRef`]; when one is found, a
/// [`PublishHeartbeatHandler`] wrapping it is added to the group.
plugins: Option<Plugins>,

/// The heartbeat response pushers.
///
/// They are handed over to the built [`HeartbeatHandlerGroup`].
pushers: Pushers,
}

impl HeartbeatHandlerGroupBuilder {
    /// Creates a builder from the two mandatory parts of a handler group.
    ///
    /// The optional region failure handler and plugins both start as `None`.
    pub fn new(pushers: Pushers, region_lease_handler: RegionLeaseHandler) -> Self {
        Self {
            pushers,
            region_lease_handler,
            plugins: None,
            region_failure_handler: None,
        }
    }

    /// Sets the optional [`RegionFailureHandler`], replacing any previous value.
    pub fn with_region_failure_handler(self, handler: Option<RegionFailureHandler>) -> Self {
        Self {
            region_failure_handler: handler,
            ..self
        }
    }

    /// Sets the optional [`Plugins`], replacing any previous value.
    pub fn with_plugins(self, plugins: Option<Plugins>) -> Self {
        Self { plugins, ..self }
    }

    /// Consumes the builder and assembles the [`HeartbeatHandlerGroup`].
    ///
    /// Handlers are added in a fixed order. The region failure handler and the
    /// publish-heartbeat handler are only added when they are available.
    pub fn build(self) -> HeartbeatHandlerGroup {
        let Self {
            region_failure_handler,
            region_lease_handler,
            plugins,
            pushers,
        } = self;

        // A `PublishHeartbeatHandler` exists only if plugins were supplied and
        // they contain a `PublisherRef`.
        let publisher_handler = plugins
            .and_then(|plugins| plugins.get::<PublisherRef>())
            .map(|publisher| PublishHeartbeatHandler::new(publisher.clone()));

        // TODO(weny): Consider classifying handlers
        // to make it easier for upper layers to customize handler groups.
        let mut handler_group = HeartbeatHandlerGroup::new(pushers);
        handler_group.add_handler(ResponseHeaderHandler);
        // The `KeepLeaseHandler`s should preferably run before `CheckLeaderHandler`:
        // even if the current meta-server node is no longer the leader, it can
        // still help the datanode to keep its lease.
        handler_group.add_handler(DatanodeKeepLeaseHandler);
        handler_group.add_handler(FlownodeKeepLeaseHandler);
        handler_group.add_handler(CheckLeaderHandler);
        handler_group.add_handler(OnLeaderStartHandler);
        handler_group.add_handler(ExtractStatHandler);
        handler_group.add_handler(CollectDatanodeClusterInfoHandler);
        handler_group.add_handler(CollectFrontendClusterInfoHandler);
        handler_group.add_handler(CollectFlownodeClusterInfoHandler);
        handler_group.add_handler(MailboxHandler);
        handler_group.add_handler(region_lease_handler);
        handler_group.add_handler(FilterInactiveRegionStatsHandler);
        if let Some(failure_handler) = region_failure_handler {
            handler_group.add_handler(failure_handler);
        }
        if let Some(publisher_handler) = publisher_handler {
            handler_group.add_handler(publisher_handler);
        }
        handler_group.add_handler(CollectStatsHandler::default());

        handler_group
    }
}

#[cfg(test)]
mod tests {

Expand Down Expand Up @@ -489,7 +593,7 @@ mod tests {
let pusher: Pusher = Pusher::new(pusher_tx, &res_header);
let handler_group = HeartbeatHandlerGroup::default();
handler_group
.register(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher)
.register_pusher(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher)
.await;

let kv_backend = Arc::new(MemoryKvBackend::new());
Expand Down Expand Up @@ -519,21 +623,21 @@ mod tests {

#[tokio::test]
async fn test_handler_name() {
let group = HeartbeatHandlerGroup::default();
group.add_handler(ResponseHeaderHandler).await;
group.add_handler(DatanodeKeepLeaseHandler).await;
group.add_handler(FlownodeKeepLeaseHandler).await;
group.add_handler(CheckLeaderHandler).await;
group.add_handler(OnLeaderStartHandler).await;
group.add_handler(ExtractStatHandler).await;
group.add_handler(CollectDatanodeClusterInfoHandler).await;
group.add_handler(CollectFrontendClusterInfoHandler).await;
group.add_handler(CollectFlownodeClusterInfoHandler).await;
group.add_handler(MailboxHandler).await;
group.add_handler(FilterInactiveRegionStatsHandler).await;
group.add_handler(CollectStatsHandler::default()).await;

let handlers = group.handlers.read().await;
let mut group = HeartbeatHandlerGroup::default();
group.add_handler(ResponseHeaderHandler);
group.add_handler(DatanodeKeepLeaseHandler);
group.add_handler(FlownodeKeepLeaseHandler);
group.add_handler(CheckLeaderHandler);
group.add_handler(OnLeaderStartHandler);
group.add_handler(ExtractStatHandler);
group.add_handler(CollectDatanodeClusterInfoHandler);
group.add_handler(CollectFrontendClusterInfoHandler);
group.add_handler(CollectFlownodeClusterInfoHandler);
group.add_handler(MailboxHandler);
group.add_handler(FilterInactiveRegionStatsHandler);
group.add_handler(CollectStatsHandler::default());

let handlers = group.handlers;

assert_eq!(12, handlers.len());

Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::error::{
StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroup;
use crate::handler::HeartbeatHandlerGroupRef;
use crate::lease::lookup_datanode_peer;
use crate::lock::DistLockRef;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
Expand Down Expand Up @@ -366,7 +366,7 @@ pub struct Metasrv {
selector: SelectorRef,
// The flow selector is used to select a target flownode.
flow_selector: SelectorRef,
handler_group: HeartbeatHandlerGroup,
handler_group: HeartbeatHandlerGroupRef,
election: Option<ElectionRef>,
lock: DistLockRef,
procedure_manager: ProcedureManagerRef,
Expand Down Expand Up @@ -562,7 +562,7 @@ impl Metasrv {
&self.flow_selector
}

pub fn handler_group(&self) -> &HeartbeatHandlerGroup {
pub fn handler_group(&self) -> &HeartbeatHandlerGroupRef {
&self.handler_group
}

Expand Down
53 changes: 8 additions & 45 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,11 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::{self, Result};
use crate::flow_meta_alloc::FlowPeerAllocator;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_cluster_info_handler::{
CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler,
CollectFrontendClusterInfoHandler,
};
use crate::handler::collect_stats_handler::CollectStatsHandler;
use crate::handler::extract_stat_handler::ExtractStatHandler;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler;
use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler};
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::on_leader_start_handler::OnLeaderStartHandler;
use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::response_header_handler::ResponseHeaderHandler;
use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers};
use crate::handler::{
HeartbeatHandlerGroup, HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers,
};
use crate::lease::MetaPeerLookupService;
use crate::lock::memory::MemLock;
use crate::lock::DistLockRef;
Expand All @@ -70,7 +59,6 @@ use crate::metasrv::{
};
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::pubsub::PublisherRef;
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker,
DEFAULT_TICK_INTERVAL,
Expand Down Expand Up @@ -364,41 +352,16 @@ impl MetasrvBuilder {
let handler_group = match handler_group {
Some(handler_group) => handler_group,
None => {
let publish_heartbeat_handler = plugins
.clone()
.and_then(|plugins| plugins.get::<PublisherRef>())
.map(|publish| PublishHeartbeatHandler::new(publish.clone()));

let region_lease_handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
table_metadata_manager.clone(),
memory_region_keeper.clone(),
);

let group = HeartbeatHandlerGroup::new(pushers);
group.add_handler(ResponseHeaderHandler).await;
// `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
// because even if the current meta-server node is no longer the leader it can
// still help the datanode to keep lease.
group.add_handler(DatanodeKeepLeaseHandler).await;
group.add_handler(FlownodeKeepLeaseHandler).await;
group.add_handler(CheckLeaderHandler).await;
group.add_handler(OnLeaderStartHandler).await;
group.add_handler(ExtractStatHandler).await;
group.add_handler(CollectDatanodeClusterInfoHandler).await;
group.add_handler(CollectFrontendClusterInfoHandler).await;
group.add_handler(CollectFlownodeClusterInfoHandler).await;
group.add_handler(MailboxHandler).await;
group.add_handler(region_lease_handler).await;
group.add_handler(FilterInactiveRegionStatsHandler).await;
if let Some(region_failover_handler) = region_failover_handler {
group.add_handler(region_failover_handler).await;
}
if let Some(publish_heartbeat_handler) = publish_heartbeat_handler {
group.add_handler(publish_heartbeat_handler).await;
}
group.add_handler(CollectStatsHandler::default()).await;
group
HeartbeatHandlerGroupBuilder::new(pushers, region_lease_handler)
.with_plugins(plugins.clone())
.with_region_failure_handler(region_failover_handler)
.build()
}
};

Expand All @@ -417,7 +380,7 @@ impl MetasrvBuilder {
selector,
// TODO(jeremy): We do not allow configuring the flow selector.
flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
handler_group,
handler_group: Arc::new(handler_group),
election,
lock,
procedure_manager,
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/service/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
);

if let Some(key) = pusher_key {
let _ = handler_group.deregister(&key).await;
let _ = handler_group.deregister_push(&key).await;
}
});

Expand Down Expand Up @@ -177,7 +177,7 @@ async fn register_pusher(
let node_id = get_node_id(header);
let key = format!("{}-{}", role, node_id);
let pusher = Pusher::new(sender, header);
handler_group.register(&key, pusher).await;
handler_group.register_pusher(&key, pusher).await;
Some(key)
}

Expand Down

0 comments on commit 77af4fd

Please sign in to comment.