Skip to content

Commit

Permalink
refactor: metasrv cannot be cloned (#4834)
Browse files Browse the repository at this point in the history
* refactor: metasrv cannot be cloned

* chore: remove MetasrvInstance's clone
  • Loading branch information
fengjiachun authored Oct 15, 2024
1 parent 16b8cdc commit 59ec902
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 46 deletions.
16 changes: 8 additions & 8 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ use crate::selector::SelectorType;
use crate::service::admin;
use crate::{error, Result};

#[derive(Clone)]
pub struct MetasrvInstance {
metasrv: Metasrv,
metasrv: Arc<Metasrv>,

httpsrv: Arc<HttpServer>,

Expand All @@ -83,8 +82,9 @@ impl MetasrvInstance {
.with_greptime_config_options(opts.to_toml().context(TomlFormatSnafu)?)
.build(),
);
let metasrv = Arc::new(metasrv);
// put metasrv into plugins for later use
plugins.insert::<Arc<Metasrv>>(Arc::new(metasrv.clone()));
plugins.insert::<Arc<Metasrv>>(metasrv.clone());
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(InitExportMetricsTaskSnafu)?;
Ok(MetasrvInstance {
Expand Down Expand Up @@ -178,13 +178,13 @@ pub async fn bootstrap_metasrv_with_router(
Ok(())
}

pub fn router(metasrv: Metasrv) -> Router {
pub fn router(metasrv: Arc<Metasrv>) -> Router {
tonic::transport::Server::builder()
.accept_http1(true) // for admin services
.add_service(HeartbeatServer::new(metasrv.clone()))
.add_service(StoreServer::new(metasrv.clone()))
.add_service(ClusterServer::new(metasrv.clone()))
.add_service(ProcedureServiceServer::new(metasrv.clone()))
.add_service(HeartbeatServer::from_arc(metasrv.clone()))
.add_service(StoreServer::from_arc(metasrv.clone()))
.add_service(ClusterServer::from_arc(metasrv.clone()))
.add_service(ProcedureServiceServer::from_arc(metasrv.clone()))
.add_service(admin::make_admin_service(metasrv))
}

Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ impl Mailbox for HeartbeatMailbox {
}

/// The builder to build the group of heartbeat handlers.
#[derive(Clone)]
pub struct HeartbeatHandlerGroupBuilder {
/// The handler to handle region failure.
region_failure_handler: Option<RegionFailureHandler>,
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor};

#[derive(Clone)]
pub struct RegionFailureHandler {
heartbeat_acceptor: HeartbeatAcceptor,
}
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::metasrv::Context;
use crate::region::lease_keeper::{RegionLeaseKeeperRef, RenewRegionLeasesResponse};
use crate::region::RegionLeaseKeeper;

#[derive(Clone)]
pub struct RegionLeaseHandler {
region_lease_seconds: u64,
region_lease_keeper: RegionLeaseKeeperRef,
Expand Down
35 changes: 16 additions & 19 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod builder;

use std::fmt::Display;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use clap::ValueEnum;
Expand Down Expand Up @@ -337,7 +337,6 @@ impl MetaStateHandler {
}
}

#[derive(Clone)]
pub struct Metasrv {
state: StateRef,
started: Arc<AtomicBool>,
Expand All @@ -353,8 +352,8 @@ pub struct Metasrv {
selector: SelectorRef,
// The flow selector is used to select a target flownode.
flow_selector: SelectorRef,
handler_group: Option<HeartbeatHandlerGroupRef>,
handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
election: Option<ElectionRef>,
procedure_manager: ProcedureManagerRef,
mailbox: MailboxRef,
Expand All @@ -371,15 +370,7 @@ pub struct Metasrv {
}

impl Metasrv {
pub async fn try_start(&mut self) -> Result<()> {
let builder = self
.handler_group_builder
.take()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handler group builder",
})?;
self.handler_group = Some(Arc::new(builder.build()?));

pub async fn try_start(&self) -> Result<()> {
if self
.started
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
Expand All @@ -389,6 +380,16 @@ impl Metasrv {
return Ok(());
}

let handler_group_builder =
self.handler_group_builder
.lock()
.unwrap()
.take()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handler group builder",
})?;
*self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

// Creates default schema if not exists
self.table_metadata_manager
.init()
Expand Down Expand Up @@ -567,12 +568,8 @@ impl Metasrv {
&self.flow_selector
}

pub fn handler_group(&self) -> &Option<HeartbeatHandlerGroupRef> {
&self.handler_group
}

pub fn handler_group_builder(&mut self) -> &mut Option<HeartbeatHandlerGroupBuilder> {
&mut self.handler_group_builder
pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
self.handler_group.read().unwrap().clone()
}

pub fn election(&self) -> Option<&ElectionRef> {
Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use client::client_manager::NodeClients;
Expand Down Expand Up @@ -371,8 +371,8 @@ impl MetasrvBuilder {
selector,
// TODO(jeremy): We do not allow configuring the flow selector.
flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
handler_group: None,
handler_group_builder: Some(handler_group_builder),
handler_group: RwLock::new(None),
handler_group_builder: Mutex::new(Some(handler_group_builder)),
election,
procedure_manager,
mailbox,
Expand Down
11 changes: 6 additions & 5 deletions src/meta-srv/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
pub struct MockInfo {
pub server_addr: String,
pub channel_manager: ChannelManager,
pub metasrv: Metasrv,
pub metasrv: Arc<Metasrv>,
}

pub async fn mock_with_memstore() -> MockInfo {
Expand Down Expand Up @@ -74,16 +74,17 @@ pub async fn mock(
None => builder,
};

let mut metasrv = builder.build().await.unwrap();
let metasrv = builder.build().await.unwrap();
metasrv.try_start().await.unwrap();

let (client, server) = tokio::io::duplex(1024);
let metasrv = Arc::new(metasrv);
let service = metasrv.clone();
let _handle = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(HeartbeatServer::new(service.clone()))
.add_service(StoreServer::new(service.clone()))
.add_service(ProcedureServiceServer::new(service.clone()))
.add_service(HeartbeatServer::from_arc(service.clone()))
.add_service(StoreServer::from_arc(service.clone()))
.add_service(ProcedureServiceServer::from_arc(service.clone()))
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tonic::server::NamedService;

use crate::metasrv::Metasrv;

pub fn make_admin_service(metasrv: Metasrv) -> Admin {
pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
let router = Router::new().route("/health", health::HealthHandler);

let router = router.route(
Expand Down
9 changes: 3 additions & 6 deletions src/meta-srv/src/service/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,9 @@ impl heartbeat_server::Heartbeat for Metasrv {
) -> GrpcResult<Self::HeartbeatStream> {
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let handler_group = self
.handler_group()
.clone()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handlers",
})?;
let handler_group = self.handler_group().context(error::UnexpectedSnafu {
violated: "expected heartbeat handlers",
})?;

let ctx = self.new_ctx();
let _handle = common_runtime::spawn_global(async move {
Expand Down
2 changes: 1 addition & 1 deletion tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct GreptimeDbCluster {

pub datanode_instances: HashMap<DatanodeId, Datanode>,
pub kv_backend: KvBackendRef,
pub metasrv: Metasrv,
pub metasrv: Arc<Metasrv>,
pub frontend: Arc<FeInstance>,
}

Expand Down

0 comments on commit 59ec902

Please sign in to comment.