Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: merge catalog provider & schema provider into catalog manager #1803

Merged
merged 21 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,18 @@ pub enum Error {
source: BoxedError,
},

#[snafu(display(
"Failed to upgrade weak catalog manager reference. location: {}",
location
))]
UpgradeWeakCatalogManagerRef { location: Location },

#[snafu(display("Failed to execute system catalog table scan, source: {}", source))]
SystemCatalogTableScanExec {
location: Location,
source: common_query::error::Error,
},

#[snafu(display("Cannot parse catalog value, source: {}", source))]
InvalidCatalogValue {
location: Location,
Expand Down Expand Up @@ -256,7 +263,9 @@ impl ErrorExt for Error {
| Error::EmptyValue { .. }
| Error::ValueDeserialize { .. } => StatusCode::StorageUnavailable,

Error::Generic { .. } | Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
Error::Generic { .. }
| Error::SystemCatalogTypeMismatch { .. }
| Error::UpgradeWeakCatalogManagerRef { .. } => StatusCode::Internal,

Error::ReadSystemCatalog { source, .. } | Error::CreateRecordBatch { source, .. } => {
source.status_code()
Expand Down
2 changes: 2 additions & 0 deletions src/catalog/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub fn build_schema_prefix(catalog_name: impl AsRef<str>) -> String {
format!("{SCHEMA_KEY_PREFIX}-{}-", catalog_name.as_ref())
}

/// Global table info has only one key across all datanodes so it does not have `node_id` field.
pub fn build_table_global_prefix(
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,
Expand All @@ -78,6 +79,7 @@ pub fn build_table_global_prefix(
)
}

/// Regional table info varies between datanode, so it contains a `node_id` field.
pub fn build_table_regional_prefix(
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,
Expand Down
36 changes: 10 additions & 26 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod columns;
mod tables;

use std::any::Any;
use std::sync::Arc;
use std::sync::{Arc, Weak};

use async_trait::async_trait;
use common_error::prelude::BoxedError;
Expand All @@ -33,46 +33,35 @@ use table::{Result as TableResult, Table, TableRef};
use self::columns::InformationSchemaColumns;
use crate::error::Result;
use crate::information_schema::tables::InformationSchemaTables;
use crate::{CatalogProviderRef, SchemaProvider};
use crate::CatalogManager;

const TABLES: &str = "tables";
const COLUMNS: &str = "columns";

pub(crate) struct InformationSchemaProvider {
pub struct InformationSchemaProvider {
catalog_name: String,
catalog_provider: CatalogProviderRef,
tables: Vec<String>,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaProvider {
pub(crate) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
catalog_name,
catalog_provider,
tables: vec![TABLES.to_string(), COLUMNS.to_string()],
catalog_manager,
}
}
}

#[async_trait]
impl SchemaProvider for InformationSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

async fn table_names(&self) -> Result<Vec<String>> {
Ok(self.tables.clone())
}

async fn table(&self, name: &str) -> Result<Option<TableRef>> {
impl InformationSchemaProvider {
pub fn table(&self, name: &str) -> Result<Option<TableRef>> {
let stream_builder = match name.to_ascii_lowercase().as_ref() {
TABLES => Arc::new(InformationSchemaTables::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)) as _,
COLUMNS => Arc::new(InformationSchemaColumns::new(
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)) as _,
_ => {
return Ok(None);
Expand All @@ -81,11 +70,6 @@ impl SchemaProvider for InformationSchemaProvider {

Ok(Some(Arc::new(InformationTable::new(stream_builder))))
}

async fn table_exist(&self, name: &str) -> Result<bool> {
let normalized_name = name.to_ascii_lowercase();
Ok(self.tables.contains(&normalized_name))
}
}

// TODO(ruihang): make it a more generic trait:
Expand Down
50 changes: 34 additions & 16 deletions src/catalog/src/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::{
Expand All @@ -29,16 +29,18 @@ use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};

use super::InformationStreamBuilder;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::CatalogProviderRef;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::CatalogManager;

pub(super) struct InformationSchemaColumns {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,
}

const TABLE_CATALOG: &str = "table_catalog";
Expand All @@ -49,7 +51,7 @@ const DATA_TYPE: &str = "data_type";
const SEMANTIC_TYPE: &str = "semantic_type";

impl InformationSchemaColumns {
pub(super) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
Expand All @@ -61,15 +63,15 @@ impl InformationSchemaColumns {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
}
}

fn builder(&self) -> InformationSchemaColumnsBuilder {
InformationSchemaColumnsBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)
}
}
Expand Down Expand Up @@ -103,7 +105,7 @@ impl InformationStreamBuilder for InformationSchemaColumns {
struct InformationSchemaColumnsBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,

catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
Expand All @@ -114,11 +116,15 @@ struct InformationSchemaColumnsBuilder {
}

impl InformationSchemaColumnsBuilder {
fn new(schema: SchemaRef, catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
table_names: StringVectorBuilder::with_capacity(42),
Expand All @@ -131,11 +137,23 @@ impl InformationSchemaColumnsBuilder {
/// Construct the `information_schema.tables` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();

for schema_name in self.catalog_provider.schema_names().await? {
let Some(schema) = self.catalog_provider.schema(&schema_name).await? else { continue };
for table_name in schema.table_names().await? {
let Some(table) = schema.table(&table_name).await? else { continue };
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
.schema_exist(&catalog_name, &schema_name)
.await?
{
continue;
}
for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
let Some(table) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await? else { continue };
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
for (idx, column) in schema.column_schemas().iter().enumerate() {
Expand Down
48 changes: 33 additions & 15 deletions src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_NAME;
Expand All @@ -26,21 +26,23 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableType;

use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::InformationStreamBuilder;
use crate::CatalogProviderRef;
use crate::CatalogManager;

pub(super) struct InformationSchemaTables {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaTables {
pub(super) fn new(catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
Expand All @@ -52,15 +54,15 @@ impl InformationSchemaTables {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
}
}

fn builder(&self) -> InformationSchemaTablesBuilder {
InformationSchemaTablesBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_provider.clone(),
self.catalog_manager.clone(),
)
}
}
Expand Down Expand Up @@ -97,7 +99,7 @@ impl InformationStreamBuilder for InformationSchemaTables {
struct InformationSchemaTablesBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_provider: CatalogProviderRef,
catalog_manager: Weak<dyn CatalogManager>,

catalog_names: StringVectorBuilder,
schema_names: StringVectorBuilder,
Expand All @@ -108,11 +110,15 @@ struct InformationSchemaTablesBuilder {
}

impl InformationSchemaTablesBuilder {
fn new(schema: SchemaRef, catalog_name: String, catalog_provider: CatalogProviderRef) -> Self {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_provider,
catalog_manager,
catalog_names: StringVectorBuilder::with_capacity(42),
schema_names: StringVectorBuilder::with_capacity(42),
table_names: StringVectorBuilder::with_capacity(42),
Expand All @@ -125,15 +131,27 @@ impl InformationSchemaTablesBuilder {
/// Construct the `information_schema.tables` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;

for schema_name in self.catalog_provider.schema_names().await? {
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if schema_name == INFORMATION_SCHEMA_NAME {
continue;
}
if !catalog_manager
.schema_exist(&catalog_name, &schema_name)
.await?
{
continue;
}

let Some(schema) = self.catalog_provider.schema(&schema_name).await? else { continue };
for table_name in schema.table_names().await? {
let Some(table) = schema.table(&table_name).await? else { continue };
for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
let Some(table) = catalog_manager.table(&catalog_name, &schema_name, &table_name).await? else { continue };
let table_info = table.table_info();
self.add_table(
&catalog_name,
Expand Down
Loading