Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add region_statistics table #4771

Merged
merged 12 commits into from
Sep 27, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,20 @@ pub enum Error {
source: BoxedError,
},

#[snafu(display("Failed to list nodes in cluster: {source}"))]
#[snafu(display("Failed to list nodes in cluster"))]
ListNodes {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},

#[snafu(display("Failed to region stats in cluster"))]
ListRegionStats {
#[snafu(implicit)]
location: Location,
source: BoxedError,
},

#[snafu(display("Failed to list flows in catalog {catalog}"))]
ListFlows {
#[snafu(implicit)]
Expand Down Expand Up @@ -314,6 +321,7 @@ impl ErrorExt for Error {
| Error::ListTables { source, .. }
| Error::ListFlows { source, .. }
| Error::ListProcedures { source, .. }
| Error::ListRegionStats { source, .. }
| Error::ConvertProtoData { source, .. } => source.status_code(),

Error::CreateTable { source, .. } => source.status_code(),
Expand Down
5 changes: 5 additions & 0 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,17 @@ use crate::CatalogManager;
#[derive(Clone)]
pub struct KvBackendCatalogManager {
mode: Mode,
/// Only available in `Distributed` mode.
meta_client: Option<Arc<MetaClient>>,
/// Manages partition rules.
partition_manager: PartitionRuleManagerRef,
/// Manages table metadata.
table_metadata_manager: TableMetadataManagerRef,
/// A sub-CatalogManager that handles system tables
system_catalog: SystemCatalog,
/// Cache registry for all caches.
cache_registry: LayeredCacheRegistryRef,
/// Only available in `Standalone` mode.
procedure_manager: Option<ProcedureManagerRef>,
}

Expand Down
18 changes: 14 additions & 4 deletions src/catalog/src/system_schema/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod key_column_usage;
mod partitions;
mod procedure_info;
mod region_peers;
mod region_statistics;
mod runtime_metrics;
pub mod schemata;
mod table_constraints;
Expand Down Expand Up @@ -194,6 +195,11 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
self.catalog_manager.clone(),
)) as _,
),
REGION_STATISTICS => Some(Arc::new(
region_statistics::InformationSchemaRegionStatistics::new(
self.catalog_manager.clone(),
),
) as _),
_ => None,
}
}
Expand Down Expand Up @@ -241,6 +247,14 @@ impl InformationSchemaProvider {
CLUSTER_INFO.to_string(),
self.build_table(CLUSTER_INFO).unwrap(),
);
tables.insert(
PROCEDURE_INFO.to_string(),
self.build_table(PROCEDURE_INFO).unwrap(),
);
tables.insert(
REGION_STATISTICS.to_string(),
self.build_table(REGION_STATISTICS).unwrap(),
);
}

tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
Expand All @@ -256,10 +270,6 @@ impl InformationSchemaProvider {
self.build_table(TABLE_CONSTRAINTS).unwrap(),
);
tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
tables.insert(
PROCEDURE_INFO.to_string(),
self.build_table(PROCEDURE_INFO).unwrap(),
);
// Add memory tables
for name in MEMORY_TABLES.iter() {
tables.insert((*name).to_string(), self.build_table(name).expect(name));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID;
use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::cluster::ClusterInfo;
use common_meta::datanode::RegionStat;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use common_telemetry::tracing::warn;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder};
use snafu::ResultExt;
use store_api::storage::{ScanRequest, TableId};

use super::{InformationTable, REGION_STATISTICS};
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, ListRegionStatsSnafu, Result};
use crate::information_schema::Predicates;
use crate::system_schema::utils;
use crate::CatalogManager;

const REGION_ID: &str = "region_id";
const TABLE_ID: &str = "table_id";
const REGION_NUMBER: &str = "region_number";
const MEMTABLE_SIZE: &str = "memtable_size";
const MANIFEST_SIZE: &str = "manifest_size";
const SST_SIZE: &str = "sst_size";
const ENGINE: &str = "engine";
const REGION_ROLE: &str = "region_role";

const INIT_CAPACITY: usize = 42;

/// The `REGION_STATISTICS` table provides information about the region statistics. Including fields:
///
/// - `region_id`: The region id.
/// - `table_id`: The table id.
/// - `region_number`: The region number.
/// - `memtable_size`: The memtable size in bytes.
/// - `manifest_size`: The manifest size in bytes.
/// - `sst_size`: The sst size in bytes.
/// - `engine`: The engine type.
/// - `region_role`: The region role.
///
pub(super) struct InformationSchemaRegionStatistics {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaRegionStatistics {
pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: Self::schema(),
catalog_manager,
}
}

pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false),
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(REGION_NUMBER, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(MEMTABLE_SIZE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MANIFEST_SIZE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(SST_SIZE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(REGION_ROLE, ConcreteDataType::string_datatype(), true),
]))
}

fn builder(&self) -> InformationSchemaRegionStatisticsBuilder {
InformationSchemaRegionStatisticsBuilder::new(
self.schema.clone(),
self.catalog_manager.clone(),
)
}
}

impl InformationTable for InformationSchemaRegionStatistics {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID
}

fn table_name(&self) -> &'static str {
REGION_STATISTICS
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();

let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_region_statistics(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));

Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

struct InformationSchemaRegionStatisticsBuilder {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,

region_ids: UInt64VectorBuilder,
table_ids: UInt32VectorBuilder,
region_numbers: UInt32VectorBuilder,
memtable_sizes: UInt64VectorBuilder,
manifest_sizes: UInt64VectorBuilder,
sst_sizes: UInt64VectorBuilder,
engines: StringVectorBuilder,
region_roles: StringVectorBuilder,
}

impl InformationSchemaRegionStatisticsBuilder {
fn new(schema: SchemaRef, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema,
catalog_manager,
region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
region_numbers: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
memtable_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
manifest_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
sst_sizes: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
region_roles: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}

/// Construct a new `InformationSchemaRegionStatistics` from the collected data.
async fn make_region_statistics(
&mut self,
request: Option<ScanRequest>,
) -> Result<RecordBatch> {
let predicates = Predicates::from_scan_request(&request);
let mode = utils::running_mode(&self.catalog_manager)?.unwrap_or(Mode::Standalone);

match mode {
Mode::Standalone => {
// TODO(weny): implement it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not implement it? It's very useful for both standalone and distributed clusters to get the statistics info of regions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have implemented it in #4811

}
Mode::Distributed => {
if let Some(meta_client) = utils::meta_client(&self.catalog_manager)? {
let region_stats = meta_client
.list_region_stats()
.await
.map_err(BoxedError::new)
.context(ListRegionStatsSnafu)?;
for region_stat in region_stats {
self.add_region_statistic(&predicates, region_stat);
}
} else {
warn!("Meta client is not available");
}
}
}

self.finish()
}

fn add_region_statistic(&mut self, predicate: &Predicates, region_stat: RegionStat) {
let row = [
(REGION_ID, &Value::from(region_stat.id.as_u64())),
(TABLE_ID, &Value::from(region_stat.id.table_id())),
(REGION_NUMBER, &Value::from(region_stat.id.region_number())),
(MEMTABLE_SIZE, &Value::from(region_stat.memtable_size)),
(MANIFEST_SIZE, &Value::from(region_stat.manifest_size)),
(SST_SIZE, &Value::from(region_stat.sst_size)),
(ENGINE, &Value::from(region_stat.engine.as_str())),
(REGION_ROLE, &Value::from(region_stat.role.to_string())),
];

if !predicate.eval(&row) {
return;
}

self.region_ids.push(Some(region_stat.id.as_u64()));
self.table_ids.push(Some(region_stat.id.table_id()));
self.region_numbers
.push(Some(region_stat.id.region_number()));
self.memtable_sizes.push(Some(region_stat.memtable_size));
self.manifest_sizes.push(Some(region_stat.manifest_size));
self.sst_sizes.push(Some(region_stat.sst_size));
self.engines.push(Some(&region_stat.engine));
self.region_roles.push(Some(&region_stat.role.to_string()));
}

fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.region_ids.finish()),
Arc::new(self.table_ids.finish()),
Arc::new(self.region_numbers.finish()),
Arc::new(self.memtable_sizes.finish()),
Arc::new(self.manifest_sizes.finish()),
Arc::new(self.sst_sizes.finish()),
Arc::new(self.engines.finish()),
Arc::new(self.region_roles.finish()),
];

RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}

impl DfPartitionStream for InformationSchemaRegionStatistics {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}

fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_region_statistics(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ pub const CLUSTER_INFO: &str = "cluster_info";
pub const VIEWS: &str = "views";
pub const FLOWS: &str = "flows";
pub const PROCEDURE_INFO: &str = "procedure_info";
pub const REGION_STATISTICS: &str = "region_statistics";
3 changes: 3 additions & 0 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32;
pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
/// id for information_schema.procedure_info
pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
/// id for information_schema.region_statistics
pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;

/// ----- End of information_schema tables -----

/// ----- Begin of pg_catalog tables -----
Expand Down
4 changes: 4 additions & 0 deletions src/common/meta/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};

use crate::datanode::RegionStat;
use crate::error::{
DecodeJsonSnafu, EncodeJsonSnafu, Error, FromUtf8Snafu, InvalidNodeInfoKeySnafu,
InvalidRoleSnafu, ParseNumSnafu, Result,
Expand Down Expand Up @@ -47,6 +48,9 @@ pub trait ClusterInfo {
role: Option<Role>,
) -> std::result::Result<Vec<NodeInfo>, Self::Error>;

/// List all region stats in the cluster.
async fn list_region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error>;

// TODO(jeremy): Other info, like region status, etc.
}

Expand Down
Loading