Skip to content

Commit

Permalink
cr
Browse files Browse the repository at this point in the history
  • Loading branch information
fengys1996 committed Aug 2, 2023
1 parent af675b1 commit 6ed6e83
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ pub mod mailbox_handler;
pub mod node_stat;
mod on_leader_start_handler;
mod persist_stats_handler;
pub mod publish_heartbeat_handler;
pub(crate) mod region_lease_handler;
pub mod report_handler;
mod response_header_handler;

#[async_trait::async_trait]
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/handler/persist_stats_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ mod tests {
table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
))),
publish: None,
};

let handler = PersistStatsHandler::default();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,32 @@ use async_trait::async_trait;
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::pubsub::Message;
use crate::pubsub::{Message, PublishRef};

pub struct ReportHandler;
pub struct PublishHeartbeatHandler {
publish: PublishRef,
}

impl PublishHeartbeatHandler {
pub fn new(publish: PublishRef) -> PublishHeartbeatHandler {
PublishHeartbeatHandler { publish }
}
}

#[async_trait]
impl HeartbeatHandler for ReportHandler {
impl HeartbeatHandler for PublishHeartbeatHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}

async fn handle(
&self,
req: &HeartbeatRequest,
ctx: &mut Context,
_: &mut Context,
_: &mut HeartbeatAccumulator,
) -> Result<()> {
if let Some(publish) = ctx.publish.as_ref() {
let req = Box::new(req.clone());
publish.send_msg(Message::Heartbeat(req)).await;
}
let msg = Message::Heartbeat(Box::new(req.clone()));
self.publish.send_msg(msg).await;

Ok(())
}
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/handler/response_header_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ mod tests {
table_metadata_manager: Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
))),
publish: None,
};

let req = HeartbeatRequest {
Expand Down
12 changes: 4 additions & 8 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ pub struct Context {
pub meta_peer_client: MetaPeerClientRef,
pub mailbox: MailboxRef,
pub election: Option<ElectionRef>,
pub publish: Option<PublishRef>,
pub skip_all: Arc<AtomicBool>,
pub is_infancy: bool,
pub table_metadata_manager: TableMetadataManagerRef,
Expand Down Expand Up @@ -179,8 +178,7 @@ pub struct MetaSrv {
ddl_manager: DdlManagerRef,
table_metadata_manager: TableMetadataManagerRef,
greptimedb_telemerty_task: Arc<GreptimeDBTelemetryTask>,
publish: Option<PublishRef>,
subscribe_manager: Option<SubscribeManagerRef>,
pubsub: Option<(PublishRef, SubscribeManagerRef)>,
}

impl MetaSrv {
Expand All @@ -200,7 +198,7 @@ impl MetaSrv {
let procedure_manager = self.procedure_manager.clone();
let in_memory = self.in_memory.clone();
let leader_cached_kv_store = self.leader_cached_kv_store.clone();
let subscribe_manager = self.subscribe_manager.clone();
let subscribe_manager = self.subscribe_manager().cloned();
let mut rx = election.subscribe_leader_change();
let task_handler = self.greptimedb_telemerty_task.clone();
let _handle = common_runtime::spawn_bg(async move {
Expand Down Expand Up @@ -341,11 +339,11 @@ impl MetaSrv {
}

pub fn publish(&self) -> Option<&PublishRef> {
self.publish.as_ref()
self.pubsub.as_ref().map(|suite| &suite.0)
}

pub fn subscribe_manager(&self) -> Option<&SubscribeManagerRef> {
self.subscribe_manager.as_ref()
self.pubsub.as_ref().map(|suite| &suite.1)
}

#[inline]
Expand All @@ -358,7 +356,6 @@ impl MetaSrv {
let mailbox = self.mailbox.clone();
let election = self.election.clone();
let skip_all = Arc::new(AtomicBool::new(false));
let publish = self.publish.clone();

Context {
server_addr,
Expand All @@ -371,7 +368,6 @@ impl MetaSrv {
skip_all,
is_infancy: false,
table_metadata_manager: self.table_metadata_manager.clone(),
publish,
}
}
}
9 changes: 5 additions & 4 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::ddl::{DdlManager, DdlManagerRef};
use crate::error::Result;
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::publish_heartbeat_handler::PublishHeartbeatHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::report_handler::ReportHandler;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox,
KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler,
Expand Down Expand Up @@ -221,7 +221,9 @@ impl MetaSrvBuilder {
}
group.add_handler(RegionLeaseHandler::default()).await;
group.add_handler(PersistStatsHandler::default()).await;
group.add_handler(ReportHandler).await;
group
.add_handler(PublishHeartbeatHandler::new(publish.clone()))
.await;
group
}
};
Expand All @@ -244,8 +246,7 @@ impl MetaSrvBuilder {
ddl_manager,
table_metadata_manager,
greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await,
publish: Some(publish),
subscribe_manager: Some(sub_manager),
pubsub: Some((publish, sub_manager)),
})
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/meta-srv/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ mod subscriber;
#[cfg(test)]
mod tests;

pub use publish::*;
pub use subscribe_manager::*;
pub use subscriber::*;
pub use publish::{DefaultPublish, Publish, PublishRef};
pub use subscribe_manager::{
AddSubRequest, DefaultSubscribeManager, SubscribeManager, SubscribeManagerRef, SubscribeQuery,
UnSubRequest,
};
pub use subscriber::{Subscriber, SubscriberRef, Transport};

/// Subscribed topic type, determined by the ability of meta.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/pubsub/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ where
async fn send_msg(&self, message: Message) {
let sub_list = self
.subscribe_manager
.subscriber_list_by_topic(&message.topic());
.subscribers_by_topic(&message.topic());

for sub in sub_list {
if sub.transport_msg(message.clone()).await.is_err() {
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/pubsub/subscribe_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::error::Result;
use crate::pubsub::{Message, Subscriber, SubscriberRef, Topic, Transport};

pub trait SubscribeQuery<T>: Send + Sync {
fn subscriber_list_by_topic(&self, topic: &Topic) -> Vec<SubscriberRef<T>>;
fn subscribers_by_topic(&self, topic: &Topic) -> Vec<SubscriberRef<T>>;
}

pub trait SubscribeManager<T>: SubscribeQuery<T> {
Expand Down Expand Up @@ -60,7 +60,7 @@ impl<T> SubscribeQuery<T> for DefaultSubscribeManager<T>
where
T: Transport,
{
fn subscriber_list_by_topic(&self, topic: &Topic) -> Vec<SubscriberRef<T>> {
fn subscribers_by_topic(&self, topic: &Topic) -> Vec<SubscriberRef<T>> {
self.topic2sub
.get(topic)
.map(|list_ref| list_ref.clone())
Expand Down
10 changes: 5 additions & 5 deletions src/meta-srv/src/pubsub/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async fn test_subscriber_disconnect() {

join.await.unwrap();

let subscriber_list = manager.subscriber_list_by_topic(&Topic::Heartbeat);
let subscriber_list = manager.subscribers_by_topic(&Topic::Heartbeat);
assert!(subscriber_list.is_empty());
}

Expand All @@ -123,7 +123,7 @@ fn test_sub_manager() {
subscriber,
};
manager.subscribe(req).unwrap();
let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat);
let ret = manager.subscribers_by_topic(&Topic::Heartbeat);
assert_eq!(1, ret.len());

let subscriber = mock_subscriber(2, "gcrm").0;
Expand All @@ -132,17 +132,17 @@ fn test_sub_manager() {
subscriber,
};
manager.subscribe(req).unwrap();
let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat);
let ret = manager.subscribers_by_topic(&Topic::Heartbeat);
assert_eq!(2, ret.len());

let req = UnSubRequest { subscriber_id: 1 };
manager.un_subscribe(req).unwrap();
let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat);
let ret = manager.subscribers_by_topic(&Topic::Heartbeat);
assert_eq!(1, ret.len());

let req = UnSubRequest { subscriber_id: 2 };
manager.un_subscribe(req).unwrap();
let ret = manager.subscriber_list_by_topic(&Topic::Heartbeat);
let ret = manager.subscribers_by_topic(&Topic::Heartbeat);
assert_eq!(0, ret.len());
}

Expand Down

0 comments on commit 6ed6e83

Please sign in to comment.