From 6ed6e83e5d61af8d4dfb75c60f901a86645b54d8 Mon Sep 17 00:00:00 2001 From: fys Date: Wed, 2 Aug 2023 16:25:06 +0800 Subject: [PATCH] cr --- src/meta-srv/src/handler.rs | 2 +- .../src/handler/persist_stats_handler.rs | 1 - ...andler.rs => publish_heartbeat_handler.rs} | 22 ++++++++++++------- .../src/handler/response_header_handler.rs | 1 - src/meta-srv/src/metasrv.rs | 12 ++++------ src/meta-srv/src/metasrv/builder.rs | 9 ++++---- src/meta-srv/src/pubsub.rs | 9 +++++--- src/meta-srv/src/pubsub/publish.rs | 2 +- src/meta-srv/src/pubsub/subscribe_manager.rs | 4 ++-- src/meta-srv/src/pubsub/tests.rs | 10 ++++----- 10 files changed, 38 insertions(+), 34 deletions(-) rename src/meta-srv/src/handler/{report_handler.rs => publish_heartbeat_handler.rs} (70%) diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 10a7b0124d04..611a0d5b18a3 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -54,8 +54,8 @@ pub mod mailbox_handler; pub mod node_stat; mod on_leader_start_handler; mod persist_stats_handler; +pub mod publish_heartbeat_handler; pub(crate) mod region_lease_handler; -pub mod report_handler; mod response_header_handler; #[async_trait::async_trait] diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 412b2a57a145..db8d73362abb 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -181,7 +181,6 @@ mod tests { table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( kv_store.clone(), ))), - publish: None, }; let handler = PersistStatsHandler::default(); diff --git a/src/meta-srv/src/handler/report_handler.rs b/src/meta-srv/src/handler/publish_heartbeat_handler.rs similarity index 70% rename from src/meta-srv/src/handler/report_handler.rs rename to src/meta-srv/src/handler/publish_heartbeat_handler.rs index 825b74281b09..beceb4fe9af3 100644 --- a/src/meta-srv/src/handler/report_handler.rs +++ b/src/meta-srv/src/handler/publish_heartbeat_handler.rs @@ -18,12 +18,20 @@ use async_trait::async_trait; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -use crate::pubsub::Message; +use crate::pubsub::{Message, PublishRef}; -pub struct ReportHandler; +pub struct PublishHeartbeatHandler { + publish: PublishRef, +} + +impl PublishHeartbeatHandler { + pub fn new(publish: PublishRef) -> PublishHeartbeatHandler { + PublishHeartbeatHandler { publish } + } +} #[async_trait] -impl HeartbeatHandler for ReportHandler { +impl HeartbeatHandler for PublishHeartbeatHandler { fn is_acceptable(&self, role: Role) -> bool { role == Role::Datanode } @@ -31,13 +39,11 @@ impl HeartbeatHandler for ReportHandler { async fn handle( &self, req: &HeartbeatRequest, - ctx: &mut Context, + _: &mut Context, _: &mut HeartbeatAccumulator, ) -> Result<()> { - if let Some(publish) = ctx.publish.as_ref() { - let req = Box::new(req.clone()); - publish.send_msg(Message::Heartbeat(req)).await; - } + let msg = Message::Heartbeat(Box::new(req.clone())); + self.publish.send_msg(msg).await; Ok(()) } diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 65aa8acedb81..25e3ff5bd621 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -89,7 +89,6 @@ mod tests { table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( kv_store.clone(), ))), - publish: None, }; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index d0ea76e27476..0bf5427ff11d 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -118,7 +118,6 @@ pub struct Context { pub meta_peer_client: MetaPeerClientRef, pub mailbox: MailboxRef, pub election: Option, - pub publish: Option, pub skip_all: Arc, pub is_infancy: bool, pub table_metadata_manager: TableMetadataManagerRef, @@ -179,8 +178,7 @@ pub struct MetaSrv { ddl_manager: DdlManagerRef, table_metadata_manager: TableMetadataManagerRef, greptimedb_telemerty_task: Arc, - publish: Option, - subscribe_manager: Option, + pubsub: Option<(PublishRef, SubscribeManagerRef)>, } impl MetaSrv { @@ -200,7 +198,7 @@ impl MetaSrv { let procedure_manager = self.procedure_manager.clone(); let in_memory = self.in_memory.clone(); let leader_cached_kv_store = self.leader_cached_kv_store.clone(); - let subscribe_manager = self.subscribe_manager.clone(); + let subscribe_manager = self.subscribe_manager().cloned(); let mut rx = election.subscribe_leader_change(); let task_handler = self.greptimedb_telemerty_task.clone(); let _handle = common_runtime::spawn_bg(async move { @@ -341,11 +339,11 @@ impl MetaSrv { } pub fn publish(&self) -> Option<&PublishRef> { - self.publish.as_ref() + self.pubsub.as_ref().map(|suite| &suite.0) } pub fn subscribe_manager(&self) -> Option<&SubscribeManagerRef> { - self.subscribe_manager.as_ref() + self.pubsub.as_ref().map(|suite| &suite.1) } #[inline] @@ -358,7 +356,6 @@ impl MetaSrv { let mailbox = self.mailbox.clone(); let election = self.election.clone(); let skip_all = Arc::new(AtomicBool::new(false)); - let publish = self.publish.clone(); Context { server_addr, @@ -371,7 +368,6 @@ impl MetaSrv { skip_all, is_infancy: false, table_metadata_manager: self.table_metadata_manager.clone(), - publish, } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index cbab704c7faa..70ad5044dab6 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -27,8 +27,8 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::ddl::{DdlManager, DdlManagerRef}; use crate::error::Result; use crate::handler::mailbox_handler::MailboxHandler; +use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; -use crate::handler::report_handler::ReportHandler; use crate::handler::{ CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox, KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler, @@ -221,7 +221,9 @@ impl MetaSrvBuilder { } group.add_handler(RegionLeaseHandler::default()).await; group.add_handler(PersistStatsHandler::default()).await; - group.add_handler(ReportHandler).await; + group + .add_handler(PublishHeartbeatHandler::new(publish.clone())) + .await; group } }; @@ -244,8 +246,7 @@ impl MetaSrvBuilder { ddl_manager, table_metadata_manager, greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await, - publish: Some(publish), - subscribe_manager: Some(sub_manager), + pubsub: Some((publish, sub_manager)), }) } } diff --git a/src/meta-srv/src/pubsub.rs b/src/meta-srv/src/pubsub.rs index 8badcb50258b..0560861ebc9d 100644 --- a/src/meta-srv/src/pubsub.rs +++ b/src/meta-srv/src/pubsub.rs @@ -20,9 +20,12 @@ mod subscriber; #[cfg(test)] mod tests; -pub use publish::*; -pub use subscribe_manager::*; -pub use subscriber::*; +pub use publish::{DefaultPublish, Publish, PublishRef}; +pub use subscribe_manager::{ + AddSubRequest, DefaultSubscribeManager, SubscribeManager, SubscribeManagerRef, SubscribeQuery, + UnSubRequest, +}; +pub use subscriber::{Subscriber, SubscriberRef, Transport}; /// Subscribed topic type, determined by the ability of meta. #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] diff --git a/src/meta-srv/src/pubsub/publish.rs b/src/meta-srv/src/pubsub/publish.rs index caa4b9868d18..8657b376c690 100644 --- a/src/meta-srv/src/pubsub/publish.rs +++ b/src/meta-srv/src/pubsub/publish.rs @@ -53,7 +53,7 @@ where async fn send_msg(&self, message: Message) { let sub_list = self .subscribe_manager - .subscriber_list_by_topic(&message.topic()); + .subscribers_by_topic(&message.topic()); for sub in sub_list { if sub.transport_msg(message.clone()).await.is_err() { diff --git a/src/meta-srv/src/pubsub/subscribe_manager.rs b/src/meta-srv/src/pubsub/subscribe_manager.rs index 25899a36d77d..743562e8aaa2 100644 --- a/src/meta-srv/src/pubsub/subscribe_manager.rs +++ b/src/meta-srv/src/pubsub/subscribe_manager.rs @@ -22,7 +22,7 @@ use crate::error::Result; use crate::pubsub::{Message, Subscriber, SubscriberRef, Topic, Transport}; pub trait SubscribeQuery: Send + Sync { - fn subscriber_list_by_topic(&self, topic: &Topic) -> Vec>; + fn subscribers_by_topic(&self, topic: &Topic) -> Vec>; } pub trait SubscribeManager: SubscribeQuery { @@ -60,7 +60,7 @@ impl SubscribeQuery for DefaultSubscribeManager where T: Transport, { - fn subscriber_list_by_topic(&self, topic: &Topic) -> Vec> { + fn subscribers_by_topic(&self, topic: &Topic) -> Vec> { self.topic2sub .get(topic) .map(|list_ref| list_ref.clone()) diff --git a/src/meta-srv/src/pubsub/tests.rs b/src/meta-srv/src/pubsub/tests.rs index 0fe088707198..41f1e3e95d89 100644 --- a/src/meta-srv/src/pubsub/tests.rs +++ b/src/meta-srv/src/pubsub/tests.rs @@ -103,7 +103,7 @@ async fn test_subscriber_disconnect() { join.await.unwrap(); - let subscriber_list = manager.subscriber_list_by_topic(&Topic::Heartbeat); + let subscriber_list = manager.subscribers_by_topic(&Topic::Heartbeat); assert!(subscriber_list.is_empty()); } @@ -123,7 +123,7 @@ fn test_sub_manager() { subscriber, }; manager.subscribe(req).unwrap(); - let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat); + let ret = manager.subscribers_by_topic(&Topic::Heartbeat); assert_eq!(1, ret.len()); let subscriber = mock_subscriber(2, "gcrm").0; @@ -132,17 +132,17 @@ fn test_sub_manager() { subscriber, }; manager.subscribe(req).unwrap(); - let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat); + let ret = manager.subscribers_by_topic(&Topic::Heartbeat); assert_eq!(2, ret.len()); let req = UnSubRequest { subscriber_id: 1 }; manager.un_subscribe(req).unwrap(); - let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat); + let ret = manager.subscribers_by_topic(&Topic::Heartbeat); assert_eq!(1, ret.len()); let req = UnSubRequest { subscriber_id: 2 }; manager.un_subscribe(req).unwrap(); - let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat); + let ret = manager.subscribers_by_topic(&Topic::Heartbeat); assert_eq!(0, ret.len()); }