Skip to content

Commit

Permalink
refactor(datanode): move Instance heartbeat task to Datanode struct (GreptimeTeam#1832)
Browse files Browse the repository at this point in the history

* refactor(datanode): move Instance heartbeat to Datanode struct

* chore: apply suggestions from CR

* fix: start heartbeat task after instance starts
  • Loading branch information
WenyXu authored and paomian committed Oct 19, 2023
1 parent 47867b1 commit f301cb7
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 82 deletions.
28 changes: 23 additions & 5 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,26 @@

//! Datanode configurations

use std::sync::Arc;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_error::prelude::BoxedError;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use meta_client::MetaClientOptions;
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::Mode;
use snafu::ResultExt;
use storage::config::{
EngineConfig as StorageEngineConfig, DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_MAX_FLUSH_TASKS,
DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE,
};
use storage::scheduler::SchedulerConfig;

use crate::error::Result;
use crate::error::{Result, ShutdownInstanceSnafu};
use crate::heartbeat::HeartbeatTask;
use crate::instance::{Instance, InstanceRef};
use crate::server::Services;

Expand Down Expand Up @@ -340,6 +342,7 @@ pub struct DatanodeOptions {
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
pub heartbeat_interval_millis: u64,
pub http_opts: HttpOptions,
pub meta_client_options: Option<MetaClientOptions>,
pub wal: WalConfig,
Expand All @@ -363,6 +366,7 @@ impl Default for DatanodeOptions {
storage: StorageConfig::default(),
procedure: ProcedureConfig::default(),
logging: LoggingOptions::default(),
heartbeat_interval_millis: 5000,
}
}
}
Expand All @@ -378,11 +382,12 @@ pub struct Datanode {
opts: DatanodeOptions,
services: Option<Services>,
instance: InstanceRef,
heartbeat_task: Option<HeartbeatTask>,
}

impl Datanode {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let instance = Arc::new(Instance::with_opts(&opts).await?);
let (instance, heartbeat_task) = Instance::with_opts(&opts).await?;
let services = match opts.mode {
Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?),
Mode::Standalone => None,
Expand All @@ -391,6 +396,7 @@ impl Datanode {
opts,
services,
instance,
heartbeat_task,
})
}

Expand All @@ -402,7 +408,11 @@ impl Datanode {

/// Start only the internal component of datanode.
pub async fn start_instance(&mut self) -> Result<()> {
self.instance.start().await
let _ = self.instance.start().await;
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
Ok(())
}

/// Start services of datanode. This method call will block until services are shutdown.
Expand All @@ -419,7 +429,15 @@ impl Datanode {
}

pub async fn shutdown_instance(&self) -> Result<()> {
self.instance.shutdown().await
if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task
.close()
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
}
let _ = self.instance.shutdown().await;
Ok(())
}

async fn shutdown_services(&self) -> Result<()> {
Expand Down
123 changes: 72 additions & 51 deletions src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use storage::config::EngineConfig as StorageEngineConfig;
use storage::scheduler::{LocalScheduler, SchedulerConfig};
use storage::EngineImpl;
use store_api::logstore::LogStore;
use table::engine::manager::MemoryTableEngineManager;
use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef};
use table::engine::{TableEngine, TableEngineProcedureRef};
use table::requests::FlushTableRequest;
use table::table::numbers::NumbersTable;
Expand Down Expand Up @@ -78,14 +78,13 @@ pub struct Instance {
pub(crate) sql_handler: SqlHandler,
pub(crate) catalog_manager: CatalogManagerRef,
pub(crate) table_id_provider: Option<TableIdProviderRef>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
procedure_manager: ProcedureManagerRef,
}

pub type InstanceRef = Arc<Instance>;

impl Instance {
pub async fn with_opts(opts: &DatanodeOptions) -> Result<Self> {
pub async fn with_opts(opts: &DatanodeOptions) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let meta_client = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
Expand All @@ -105,11 +104,57 @@ impl Instance {
Self::new(opts, meta_client, compaction_scheduler).await
}

fn build_heartbeat_task(
opts: &DatanodeOptions,
meta_client: Option<Arc<MetaClient>>,
catalog_manager: CatalogManagerRef,
engine_manager: TableEngineManagerRef,
region_alive_keepers: Option<Arc<RegionAliveKeepers>>,
) -> Result<Option<HeartbeatTask>> {
Ok(match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
let node_id = opts.node_id.context(MissingNodeIdSnafu)?;
let meta_client = meta_client.context(IncorrectInternalStateSnafu {
state: "meta client is not provided when building heartbeat task",
})?;
let region_alive_keepers =
region_alive_keepers.context(IncorrectInternalStateSnafu {
state: "region_alive_keepers is not provided when building heartbeat task",
})?;
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler::default()),
Arc::new(OpenRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
region_alive_keepers.clone(),
)),
Arc::new(CloseRegionHandler::new(
catalog_manager.clone(),
engine_manager,
region_alive_keepers.clone(),
)),
region_alive_keepers.clone(),
]);

Some(HeartbeatTask::new(
node_id,
opts,
meta_client,
catalog_manager,
Arc::new(handlers_executor),
opts.heartbeat_interval_millis,
region_alive_keepers,
))
}
})
}

pub(crate) async fn new(
opts: &DatanodeOptions,
meta_client: Option<Arc<MetaClient>>,
compaction_scheduler: CompactionSchedulerRef<RaftEngineLogStore>,
) -> Result<Self> {
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let object_store = store::new_object_store(&opts.storage.store).await?;
let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?);

Expand Down Expand Up @@ -151,7 +196,7 @@ impl Instance {
);

// create remote catalog manager
let (catalog_manager, table_id_provider, heartbeat_task) = match opts.mode {
let (catalog_manager, table_id_provider, region_alive_keepers) = match opts.mode {
Mode::Standalone => {
if opts.enable_memory_catalog {
let catalog = Arc::new(catalog::local::MemoryCatalogManager::default());
Expand Down Expand Up @@ -189,17 +234,15 @@ impl Instance {
}

Mode::Distributed => {
let meta_client = meta_client.context(IncorrectInternalStateSnafu {
let meta_client = meta_client.clone().context(IncorrectInternalStateSnafu {
state: "meta client is not provided when creating distributed Datanode",
})?;

let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));

let heartbeat_interval_millis = 5000;
let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client));

let region_alive_keepers = Arc::new(RegionAliveKeepers::new(
engine_manager.clone(),
heartbeat_interval_millis,
opts.heartbeat_interval_millis,
));

let catalog_manager = Arc::new(RemoteCatalogManager::new(
Expand All @@ -209,32 +252,11 @@ impl Instance {
region_alive_keepers.clone(),
));

let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler::default()),
Arc::new(OpenRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
region_alive_keepers.clone(),
)),
Arc::new(CloseRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
region_alive_keepers.clone(),
)),
region_alive_keepers.clone(),
]);

let heartbeat_task = Some(HeartbeatTask::new(
opts.node_id.context(MissingNodeIdSnafu)?,
opts,
meta_client,
catalog_manager.clone(),
Arc::new(handlers_executor),
heartbeat_interval_millis,
region_alive_keepers,
));

(catalog_manager as CatalogManagerRef, None, heartbeat_task)
(
catalog_manager as CatalogManagerRef,
None,
Some(region_alive_keepers),
)
}
};

Expand All @@ -258,28 +280,34 @@ impl Instance {
&*procedure_manager,
);

Ok(Self {
let instance = Arc::new(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(
engine_manager,
engine_manager.clone(),
catalog_manager.clone(),
procedure_manager.clone(),
),
catalog_manager,
heartbeat_task,
catalog_manager: catalog_manager.clone(),
table_id_provider,
procedure_manager,
})
});

let heartbeat_task = Instance::build_heartbeat_task(
opts,
meta_client,
catalog_manager,
engine_manager,
region_alive_keepers,
)?;

Ok((instance, heartbeat_task))
}

pub async fn start(&self) -> Result<()> {
self.catalog_manager
.start()
.await
.context(NewCatalogSnafu)?;
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}

// Recover procedures after the catalog manager is started, so we can
// ensure we can access all tables from the catalog manager.
Expand All @@ -298,13 +326,6 @@ impl Instance {
.stop()
.await
.context(StopProcedureManagerSnafu)?;
if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task
.close()
.await
.map_err(BoxedError::new)
.context(ShutdownInstanceSnafu)?;
}

self.flush_tables().await?;

Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

pub mod datanode;
pub mod error;
mod heartbeat;
pub mod heartbeat;
pub mod instance;
pub mod metrics;
mod mock;
Expand Down
12 changes: 9 additions & 3 deletions src/datanode/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,21 @@ use storage::compaction::noop::NoopCompactionScheduler;

use crate::datanode::DatanodeOptions;
use crate::error::Result;
use crate::instance::Instance;
use crate::heartbeat::HeartbeatTask;
use crate::instance::{Instance, InstanceRef};

impl Instance {
pub async fn with_mock_meta_client(opts: &DatanodeOptions) -> Result<Self> {
pub async fn with_mock_meta_client(
opts: &DatanodeOptions,
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let mock_info = meta_srv::mocks::mock_with_memstore().await;
Self::with_mock_meta_server(opts, mock_info).await
}

pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
pub async fn with_mock_meta_server(
opts: &DatanodeOptions,
meta_srv: MockInfo,
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
Instance::new(opts, Some(meta_client), compaction_scheduler).await
Expand Down
17 changes: 13 additions & 4 deletions src/datanode/src/tests/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,30 @@ use crate::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig,
};
use crate::error::{CreateTableSnafu, Result};
use crate::instance::Instance;
use crate::heartbeat::HeartbeatTask;
use crate::instance::{Instance, InstanceRef};

pub(crate) struct MockInstance {
instance: Instance,
instance: InstanceRef,
_heartbeat: Option<HeartbeatTask>,
_guard: TestGuard,
}

impl MockInstance {
pub(crate) async fn new(name: &str) -> Self {
let (opts, _guard) = create_tmp_dir_and_datanode_opts(name);

let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap();
instance.start().await.unwrap();
if let Some(task) = heartbeat.as_ref() {
task.start().await.unwrap();
}

MockInstance { instance, _guard }
MockInstance {
instance,
_guard,
_heartbeat: heartbeat,
}
}

pub(crate) fn inner(&self) -> &Instance {
Expand Down
Loading

0 comments on commit f301cb7

Please sign in to comment.