fix: database base ttl #4926

Merged
11 commits, merged on Nov 5, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions src/common/meta/src/key.rs
@@ -91,6 +91,7 @@ pub mod catalog_name;
pub mod datanode_table;
pub mod flow;
pub mod node_address;
mod schema_metadata_manager;
pub mod schema_name;
pub mod table_info;
pub mod table_name;
@@ -116,6 +117,7 @@ use flow::flow_route::FlowRouteValue;
use flow::table_flow::TableFlowValue;
use lazy_static::lazy_static;
use regex::Regex;
pub use schema_metadata_manager::{SchemaMetadataManager, SchemaMetadataManagerRef};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
122 changes: 122 additions & 0 deletions src/common/meta/src/key/schema_metadata_manager.rs
@@ -0,0 +1,122 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Schema-level metadata manager.

use std::sync::Arc;

use snafu::OptionExt;
use store_api::storage::TableId;

use crate::error::TableInfoNotFoundSnafu;
use crate::key::schema_name::{SchemaManager, SchemaNameKey};
use crate::key::table_info::{TableInfoManager, TableInfoManagerRef};
use crate::kv_backend::KvBackendRef;
use crate::{error, SchemaOptions};

pub type SchemaMetadataManagerRef = Arc<SchemaMetadataManager>;

pub struct SchemaMetadataManager {
    table_info_manager: TableInfoManagerRef,
    schema_manager: SchemaManager,
    #[cfg(any(test, feature = "testing"))]
    kv_backend: KvBackendRef,
}

impl SchemaMetadataManager {
    /// Creates a new database meta
    #[cfg(not(any(test, feature = "testing")))]
    pub fn new(kv_backend: KvBackendRef) -> Self {
        let table_info_manager = Arc::new(TableInfoManager::new(kv_backend.clone()));
        let schema_manager = SchemaManager::new(kv_backend);
        Self {
            table_info_manager,
            schema_manager,
        }
    }

    /// Creates a new database meta
    #[cfg(any(test, feature = "testing"))]
    pub fn new(kv_backend: KvBackendRef) -> Self {
        let table_info_manager = Arc::new(TableInfoManager::new(kv_backend.clone()));
        let schema_manager = SchemaManager::new(kv_backend.clone());
        Self {
            table_info_manager,
            schema_manager,
            kv_backend,
        }
    }

    /// Gets schema options by table id.
    pub async fn get_schema_options_by_table_id(
        &self,
        table_id: TableId,
    ) -> error::Result<Option<SchemaOptions>> {
        let table_info = self
            .table_info_manager
            .get(table_id)
            .await?
            .with_context(|| TableInfoNotFoundSnafu {
                table: format!("table id: {}", table_id),
            })?;

        let key = SchemaNameKey::new(
            &table_info.table_info.catalog_name,
            &table_info.table_info.schema_name,
        );
        self.schema_manager.get(key).await
    }

    #[cfg(any(test, feature = "testing"))]
    pub async fn register_region_table_info(
        &self,
        table_id: TableId,
        table_name: &str,
        schema_name: &str,
        catalog_name: &str,
        schema_value: Option<crate::key::schema_name::SchemaNameValue>,
    ) {
        use table::metadata::{RawTableInfo, TableType};
        let value = crate::key::table_info::TableInfoValue::new(RawTableInfo {
            ident: Default::default(),
            name: table_name.to_string(),
            desc: None,
            catalog_name: catalog_name.to_string(),
            schema_name: schema_name.to_string(),
            meta: Default::default(),
            table_type: TableType::Base,
        });
        let (txn, _) = self
            .table_info_manager
            .build_create_txn(table_id, &value)
            .unwrap();
        let resp = self.kv_backend.txn(txn).await.unwrap();
        assert!(resp.succeeded, "Failed to create table metadata");
        let key = SchemaNameKey {
            catalog: catalog_name,
            schema: schema_name,
        };
        self.schema_manager
            .create(key, schema_value, false)
            .await
            .expect("Failed to create schema metadata");
        common_telemetry::info!(
            "Register table: {}, id: {}, schema: {}, catalog: {}",
            table_name,
            table_id,
            schema_name,
            catalog_name
        );
    }
}
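
A minimal sketch of how this manager is meant to be used, assuming `SchemaOptions` (the alias for `SchemaNameValue` introduced in `lib.rs` below) exposes a public `ttl: Option<Duration>` field; the fallback policy here is illustrative rather than the exact mito2 logic:

```rust
use std::time::Duration;

use common_meta::error::Result;
use common_meta::key::SchemaMetadataManagerRef;
use store_api::storage::TableId;

/// Resolve the TTL to apply to a region: prefer the table's own TTL,
/// otherwise fall back to the database-level (schema) TTL, if any.
async fn effective_ttl(
    schema_metadata_manager: &SchemaMetadataManagerRef,
    table_id: TableId,
    table_ttl: Option<Duration>,
) -> Result<Option<Duration>> {
    if table_ttl.is_some() {
        // A table-level TTL always wins over the database default.
        return Ok(table_ttl);
    }
    // Look up the database (schema) options via the owning table's id;
    // `None` means the schema defines no options at all.
    let schema_opts = schema_metadata_manager
        .get_schema_options_by_table_id(table_id)
        .await?;
    // Assumption: `SchemaOptions` has a public `ttl: Option<Duration>` field.
    Ok(schema_opts.and_then(|opts| opts.ttl))
}
```
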
1 change: 1 addition & 0 deletions src/common/meta/src/key/table_info.rs
@@ -134,6 +134,7 @@ impl TableInfoValue {
}

pub type TableInfoManagerRef = Arc<TableInfoManager>;

#[derive(Clone)]
pub struct TableInfoManager {
kv_backend: KvBackendRef,
3 changes: 3 additions & 0 deletions src/common/meta/src/lib.rs
@@ -54,4 +54,7 @@ pub type DatanodeId = u64;
// The id of the flownode.
pub type FlownodeId = u64;

/// Schema options.
pub type SchemaOptions = key::schema_name::SchemaNameValue;

pub use instruction::RegionIdent;
3 changes: 2 additions & 1 deletion src/datanode/src/alive_keeper.rs
@@ -427,7 +427,8 @@ mod test {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper");
let engine = Arc::new(engine_env.create_engine(MitoConfig::default()).await);
let engine = engine_env.create_engine(MitoConfig::default()).await;
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());

let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), 100));
21 changes: 18 additions & 3 deletions src/datanode/src/datanode.rs
@@ -23,6 +23,7 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal_options_allocator::prepare_wal_options;
pub use common_procedure::options::ProcedureConfig;
@@ -207,7 +208,10 @@ impl DatanodeBuilder {
(Box::new(NoopRegionServerEventListener) as _, None)
};

let region_server = self.new_region_server(region_event_listener).await?;
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(kv_backend.clone()));
let region_server = self
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;

let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
@@ -312,6 +316,7 @@ impl DatanodeBuilder {

async fn new_region_server(
&self,
schema_metadata_manager: SchemaMetadataManagerRef,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
let opts: &DatanodeOptions = &self.opts;
@@ -340,8 +345,13 @@
);

let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
let engines =
Self::build_store_engines(opts, object_store_manager, self.plugins.clone()).await?;
let engines = Self::build_store_engines(
opts,
object_store_manager,
schema_metadata_manager,
self.plugins.clone(),
)
.await?;
for engine in engines {
region_server.register_engine(engine);
}
@@ -355,6 +365,7 @@
async fn build_store_engines(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<Vec<RegionEngineRef>> {
let mut engines = vec![];
@@ -365,6 +376,7 @@
opts,
object_store_manager.clone(),
config.clone(),
schema_metadata_manager.clone(),
plugins.clone(),
)
.await?;
@@ -390,6 +402,7 @@
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
config: MitoConfig,
schema_metadata_manager: SchemaMetadataManagerRef,
plugins: Plugins,
) -> Result<MitoEngine> {
let mito_engine = match &opts.wal {
@@ -399,6 +412,7 @@
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await
@@ -429,6 +443,7 @@
config,
Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
object_store_manager,
schema_metadata_manager,
plugins,
)
.await
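
The datanode changes above thread one `SchemaMetadataManagerRef`, built from the metadata kv backend, into every store engine that gets constructed. A stripped-down sketch of that wiring, using a placeholder `MockEngine` instead of the real `MitoEngine` constructor (whose full signature is not shown in this diff):

```rust
use std::sync::Arc;

use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;

/// Placeholder engine; the real engines hold the manager so they can resolve
/// database-level options such as TTL at runtime.
struct MockEngine {
    schema_metadata_manager: SchemaMetadataManagerRef,
}

fn build_engines(kv_backend: KvBackendRef, engine_count: usize) -> Vec<MockEngine> {
    // One manager per datanode, created from the shared kv backend...
    let schema_metadata_manager: SchemaMetadataManagerRef =
        Arc::new(SchemaMetadataManager::new(kv_backend));
    // ...and a cheap `Arc` clone handed to each engine being built.
    (0..engine_count)
        .map(|_| MockEngine {
            schema_metadata_manager: schema_metadata_manager.clone(),
        })
        .collect()
}
```
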
2 changes: 1 addition & 1 deletion src/datanode/src/region_server.rs
@@ -1355,7 +1355,7 @@ mod tests {
}

#[tokio::test]
async fn test_region_server_parallism() {
async fn test_region_server_parallelism() {
let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
let first_query = p.acquire().await;
assert!(first_query.is_ok());
2 changes: 1 addition & 1 deletion src/flow/src/server.rs
@@ -303,7 +303,7 @@ impl FlownodeBuilder {
///
/// or recover all existing flow tasks if in standalone mode(nodeid is None)
///
/// TODO(discord9): persisent flow tasks with internal state
/// TODO(discord9): persistent flow tasks with internal state
async fn recover_flows(&self, manager: &FlowWorkerManagerRef) -> Result<usize, Error> {
let nodeid = self.opts.node_id;
let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid {
2 changes: 2 additions & 0 deletions src/mito2/Cargo.toml
@@ -24,6 +24,7 @@ common-datasource.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
@@ -74,6 +75,7 @@ uuid.workspace = true

[dev-dependencies]
common-function.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-procedure-test.workspace = true
common-test-util.workspace = true
criterion = "0.4"
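
Enabling the `testing` feature of `common-meta` in dev-dependencies is what exposes `register_region_table_info` (and the test-only constructor that keeps the kv backend) to mito2's tests. A rough sketch of how a test might seed a database-level TTL and read it back, taking whatever `KvBackendRef` the test environment provides (e.g. an in-memory backend) and assuming `SchemaOptions` implements `Default` and has a public `ttl` field:

```rust
use std::time::Duration;

use common_meta::key::SchemaMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::SchemaOptions;

/// Test helper: register a table under a schema that carries a TTL, then
/// verify the TTL is visible through the schema metadata manager.
async fn seed_and_check_database_ttl(kv_backend: KvBackendRef) {
    let manager = SchemaMetadataManager::new(kv_backend);

    // Database-level options with a one-hour TTL (field name assumed).
    let mut schema_value = SchemaOptions::default();
    schema_value.ttl = Some(Duration::from_secs(3600));

    // Test-only helper added by this PR: writes table info and schema metadata.
    manager
        .register_region_table_info(1024, "ttl_table", "ttl_db", "greptime", Some(schema_value))
        .await;

    let opts = manager
        .get_schema_options_by_table_id(1024)
        .await
        .unwrap()
        .expect("schema options should exist");
    assert_eq!(opts.ttl, Some(Duration::from_secs(3600)));
}
```
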