refactor: merge catalog provider & schema provider into catalog manager (#1803)

* move  to expr_factory

Signed-off-by: Ruihang Xia <[email protected]>

* move configs into service_config

Signed-off-by: Ruihang Xia <[email protected]>

* move GrpcQueryHandler into distributed.rs

Signed-off-by: Ruihang Xia <[email protected]>

* fix compile and test in catalog sub-crate

Signed-off-by: Ruihang Xia <[email protected]>

* clean up

Signed-off-by: Ruihang Xia <[email protected]>

* fix table-procedure compile and test

Signed-off-by: Ruihang Xia <[email protected]>

* fix query compile and tests

Signed-off-by: Ruihang Xia <[email protected]>

* fix datanode compile and tests

Signed-off-by: Ruihang Xia <[email protected]>

* fix catalog/query/script/servers compile

Signed-off-by: Ruihang Xia <[email protected]>

* fix frontend compile

Signed-off-by: Ruihang Xia <[email protected]>

* fix nextest except information_schema

Signed-off-by: Ruihang Xia <[email protected]>

* support information_schema

Signed-off-by: Ruihang Xia <[email protected]>

* fix sqlness test

Signed-off-by: Ruihang Xia <[email protected]>

* fix merge errors

Signed-off-by: Ruihang Xia <[email protected]>

* remove other structs

Signed-off-by: Ruihang Xia <[email protected]>

* clean up

Signed-off-by: Ruihang Xia <[email protected]>

* fix format

Signed-off-by: Ruihang Xia <[email protected]>

* change deregister_table's return type to empty tuple

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
waynexia committed Jun 26, 2023
1 parent 964d26e commit a95f876
Showing 50 changed files with 1,397 additions and 2,085 deletions.
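
The refactor collapses the old `CatalogProvider`/`SchemaProvider` layering into a single async `CatalogManager` addressed by catalog and schema names. For orientation, below is a minimal sketch of that surface as it can be inferred from the call sites in the diffs that follow (`schema_names`, `schema_exist`, `table_names`, `table`); the exact trait definition, any registration/deregistration methods (the commit notes only that `deregister_table` now returns the empty tuple), and the error plumbing are assumptions, not the repository's definitive API.

```rust
use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use table::TableRef;

use crate::error::Result;

/// Sketch of the unified catalog access surface, reconstructed from the call
/// sites in this commit. The real trait likely has more methods and may use
/// different signatures; treat this as an illustration of the shape, not as
/// the actual definition.
#[async_trait]
pub trait CatalogManager: Send + Sync {
    /// List the schemas under `catalog`.
    async fn schema_names(&self, catalog: &str) -> Result<Vec<String>>;

    /// Whether `schema` exists under `catalog`.
    async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool>;

    /// List the tables under `catalog`.`schema`.
    async fn table_names(&self, catalog: &str, schema: &str) -> Result<Vec<String>>;

    /// Look up a table by its fully qualified name; `None` if it does not exist.
    async fn table(
        &self,
        catalog: &str,
        schema: &str,
        table: &str,
    ) -> Result<Option<TableRef>>;

    fn as_any(&self) -> &dyn Any;
}

/// Consumers that must not keep the manager alive (e.g. the
/// `information_schema` provider below) hold a `Weak` handle instead of this
/// strong alias.
pub type CatalogManagerRef = Arc<dyn CatalogManager>;
```
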
11 changes: 10 additions & 1 deletion src/catalog/src/error.rs
@@ -192,11 +192,18 @@ pub enum Error {
source: BoxedError,
},

#[snafu(display(
"Failed to upgrade weak catalog manager reference. location: {}",
location
))]
UpgradeWeakCatalogManagerRef { location: Location },

#[snafu(display("Failed to execute system catalog table scan, source: {}", source))]
SystemCatalogTableScanExec {
location: Location,
source: common_query::error::Error,
},

#[snafu(display("Cannot parse catalog value, source: {}", source))]
InvalidCatalogValue {
location: Location,
@@ -256,7 +263,9 @@ impl ErrorExt for Error {
| Error::EmptyValue { .. }
| Error::ValueDeserialize { .. } => StatusCode::StorageUnavailable,

Error::Generic { .. } | Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
Error::Generic { .. }
| Error::SystemCatalogTypeMismatch { .. }
| Error::UpgradeWeakCatalogManagerRef { .. } => StatusCode::Internal,

Error::ReadSystemCatalog { source, .. } | Error::CreateRecordBatch { source, .. } => {
source.status_code()
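
The new `UpgradeWeakCatalogManagerRef` variant is emitted when a `Weak<dyn CatalogManager>` handle can no longer be upgraded because every strong reference has been dropped. A minimal sketch of how snafu's `OptionExt::context` produces it (the helper function here is hypothetical; the real call sites are in the `information_schema` builders further down):

```rust
use std::sync::{Arc, Weak};

use snafu::OptionExt;

use crate::error::{Result, UpgradeWeakCatalogManagerRefSnafu};
use crate::CatalogManager;

/// Hypothetical helper, not part of the commit: upgrade a weak handle to the
/// catalog manager or report the new error variant. `Weak::upgrade` returns
/// `None` once the manager has been dropped, and `context` converts that
/// `None` into `Error::UpgradeWeakCatalogManagerRef` with a captured location.
fn upgrade_manager(weak: &Weak<dyn CatalogManager>) -> Result<Arc<dyn CatalogManager>> {
    weak.upgrade().context(UpgradeWeakCatalogManagerRefSnafu)
}
```
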
2 changes: 2 additions & 0 deletions src/catalog/src/helper.rs
@@ -67,6 +67,7 @@ pub fn build_schema_prefix(catalog_name: impl AsRef<str>) -> String {
format!("{SCHEMA_KEY_PREFIX}-{}-", catalog_name.as_ref())
}

/// Global table info has only one key across all datanodes, so it does not have a `node_id` field.
pub fn build_table_global_prefix(
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,
@@ -78,6 +79,7 @@ pub fn build_table_global_prefix(
)
}

/// Regional table info varies between datanodes, so it contains a `node_id` field.
pub fn build_table_regional_prefix(
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,
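
As a rough illustration of how these prefix builders lay out keys, the sketch below follows the same `format!` pattern as `build_schema_prefix` above; the `TABLE_GLOBAL_KEY_PREFIX` constant and its value, and the example catalog/schema names, are assumptions for illustration, not taken from the diff.

```rust
/// Hypothetical prefix value; the real constant lives in the crate's consts
/// and may differ.
const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg";

/// Sketch of the global-table prefix: catalog and schema are embedded in the
/// key, but no `node_id`, because global table info is shared by all datanodes.
pub fn build_table_global_prefix(
    catalog_name: impl AsRef<str>,
    schema_name: impl AsRef<str>,
) -> String {
    format!(
        "{TABLE_GLOBAL_KEY_PREFIX}-{}-{}-",
        catalog_name.as_ref(),
        schema_name.as_ref()
    )
}

fn main() {
    // Every global table key under this catalog/schema starts with this
    // prefix, so a range scan over it enumerates the tables,
    // e.g. "__tg-greptime-public-".
    println!("{}", build_table_global_prefix("greptime", "public"));
}
```

A regional prefix would additionally embed the `node_id`, matching the doc comment above.
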
36 changes: 10 additions & 26 deletions src/catalog/src/information_schema.rs
@@ -16,7 +16,7 @@ mod columns;
mod tables;

use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use async_trait::async_trait;
use common_error::prelude::BoxedError;
@@ -33,46 +33,35 @@ use table::{Result as TableResult, Table, TableRef};
use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::tables::InformationSchemaTables;
use crate::{CatalogProviderRef, SchemaProvider};
use crate::CatalogManager;

const TABLES: &str = "tables";
const COLUMNS: &str = "columns";

pub(crate) struct InformationSchemaProvider {
pub struct InformationSchemaProvider {
catalog_name: String,
catalog_provider: CatalogProviderRef,
tables: Vec<String>,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaProvider {
pub(crate) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
catalog_name,
catalog_provider,
tables: vec![TABLES.to_string(), COLUMNS.to_string()],
catalog_manager,
}
}
}

#[async_trait]
impl SchemaProvider for InformationSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

async fn table_names(&self) -> Result<Vec<String>> {
Ok(self.tables.clone())
}

async fn table(&self, name: &str) -> Result<Option<TableRef>> {
impl InformationSchemaProvider {
pub fn table(&self, name: &str) -> Result<Option<TableRef>> {
let stream_builder = match name.to_ascii_lowercase().as_ref() {
TABLES => Arc::new(InformationSchemaTables::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)) as _,
COLUMNS => Arc::new(InformationSchemaColumns::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)) as _,
_ => {
return Ok(None);
@@ -81,11 +70,6 @@ impl SchemaProvider for InformationSchemaProvider {

Ok(Some(Arc::new(InformationTable::new(stream_builder))))
}

async fn table_exist(&self, name: &str) -> Result<bool> {
let normalized_name = name.to_ascii_lowercase();
Ok(self.tables.contains(&normalized_name))
}
}

// TODO(ruihang): make it a more generic trait:
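
`InformationSchemaProvider` now holds a `Weak<dyn CatalogManager>` rather than a strong reference, presumably to avoid a reference cycle, since the catalog manager itself hands out the `information_schema` tables. A minimal usage sketch, assuming the module path and a caller that already owns an `Arc` to a concrete manager (both assumptions):

```rust
use std::sync::Arc;

use catalog::information_schema::InformationSchemaProvider;
use catalog::CatalogManager;

/// Hypothetical wiring code, not part of the commit.
fn build_provider(manager: Arc<dyn CatalogManager>) -> InformationSchemaProvider {
    // Downgrade so the provider does not keep the manager alive; the stream
    // builders upgrade the handle only when a table is actually scanned and
    // report `UpgradeWeakCatalogManagerRef` if the manager is already gone.
    InformationSchemaProvider::new("greptime".to_string(), Arc::downgrade(&manager))
}
```

Holding a `Weak` handle also means the provider can be stored inside the manager itself without creating an `Arc` cycle that would keep both alive forever.
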
50 changes: 34 additions & 16 deletions src/catalog/src/information_schema/columns.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{
@@ -29,16 +29,18 @@ use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};

use super::InformationStreamBuilder;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::CatalogProviderRef;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::CatalogManager;

pub(super) struct InformationSchemaColumns {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,
}

const TABLE_CATALOG: &str = "table_catalog";
@@ -49,7 +51,7 @@ const DATA_TYPE: &str = "data_type";
const SEMANTIC_TYPE: &str = "semantic_type";

impl InformationSchemaColumns {
pub(super) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
@@ -61,15 +63,15 @@ impl InformationSchemaColumns {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
}
}

fn builder(&self) -> InformationSchemaColumnsBuilder {
InformationSchemaColumnsBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)
}
}
@@ -103,7 +105,7 @@ impl InformationStreamBuilder for InformationSchemaColumns {
struct InformationSchemaColumnsBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,

catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
@@ -114,11 +116,15 @@ struct InformationSchemaColumnsBuilder {
}

impl InformationSchemaColumnsBuilder {
fn new(schema: SchemaRef, catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
table_names: StringVectorBuilder::with_capacity(42),
Expand All @@ -131,11 +137,23 @@ impl InformationSchemaColumnsBuilder {
/// Construct the `information_schema.columns` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();

for schema_name in self.catalog_provider.schema_names().await? {
let Some(schema) = self.catalog_provider.schema(&schema_name).await? else { continue };
for table_name in schema.table_names().await? {
let Some(table) = schema.table(&table_name).await? else { continue };
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exist(&catalog_name, &schema_name)
.await?
{
continue;
}
for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
let Some(table) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await? else { continue };
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
for (idx, column) in schema.column_schemas().iter().enumerate() {
48 changes: 33 additions & 15 deletions src/catalog/src/information_schema/tables.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_NAME;
@@ -26,21 +26,23 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableType;

use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::InformationStreamBuilder;
use crate::CatalogProviderRef;
use crate::CatalogManager;

pub(super) struct InformationSchemaTables {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaTables {
pub(super) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
@@ -52,15 +54,15 @@ impl InformationSchemaTables {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
}
}

fn builder(&self) -> InformationSchemaTablesBuilder {
InformationSchemaTablesBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)
}
}
@@ -97,7 +99,7 @@ struct InformationSchemaTablesBuilder {
struct InformationSchemaTablesBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,

catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
@@ -108,11 +110,15 @@ struct InformationSchemaTablesBuilder {
}

impl InformationSchemaTablesBuilder {
fn new(schema: SchemaRef, catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
table_names: StringVectorBuilder::with_capacity(42),
Expand All @@ -125,15 +131,27 @@ impl InformationSchemaTablesBuilder {
/// Construct the `information_schema.tables` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;

for schema_name in self.catalog_provider.schema_names().await? {
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if schema_name == INFORMATION_SCHEMA_NAME {
continue;
}
if !catalog_manager
.schema_exist(&catalog_name, &schema_name)
.await?
{
continue;
}

let Some(schema) = self.catalog_provider.schema(&schema_name).await? else { continue };
for table_name in schema.table_names().await? {
let Some(table) = schema.table(&table_name).await? else { continue };
for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
let Some(table) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await? else { continue };
let table_info = table.table_info();
self.add_table(
&catalog_name,