Skip to content

Commit

Permalink
feat: introduce 'pg_catalog.pg_type' (#4332)
Browse files Browse the repository at this point in the history
* WIP: pg_catalog

* refactor: move memory_table to crate public level to reuse it in pgcatalog

* refactor: new system_schema mod to manage implementation of information_schema and pg_catalog

* feat: pg_catalog.pg_type

* fix: remove unused code to avoid warning

* test: add pg_catalog sqlness test

* feat: pg_catalog_cache in system_catalog

* fix: integration test

* test: rollback unit test

* refactor: mix pg_catalog table_id with old ones

* fix: add todo information

* tests: rerun sqlness

---------

Co-authored-by: johnsonlee <[email protected]>
  • Loading branch information
J0HN50N133 and johnsonlee authored Jul 15, 2024
1 parent 7900367 commit 072d7c2
Show file tree
Hide file tree
Showing 36 changed files with 782 additions and 297 deletions.
49 changes: 41 additions & 8 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::{Arc, Weak};
use async_stream::try_stream;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID,
PG_CATALOG_NAME,
};
use common_config::Mode;
use common_error::ext::BoxedError;
Expand Down Expand Up @@ -46,6 +47,8 @@ use crate::error::{
};
use crate::information_schema::InformationSchemaProvider;
use crate::kvbackend::TableCacheRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::CatalogManager;

/// Access all existing catalog, schema and tables.
Expand Down Expand Up @@ -86,10 +89,15 @@ impl KvBackendCatalogManager {
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
information_schema_provider: Arc::new(InformationSchemaProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
pg_catalog_provider: Arc::new(PGCatalogProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
},
cache_registry,
})
Expand Down Expand Up @@ -295,37 +303,49 @@ fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
/// Existing system tables:
/// - public.numbers
/// - information_schema.{tables}
/// - pg_catalog.{tables}
#[derive(Clone)]
struct SystemCatalog {
catalog_manager: Weak<KvBackendCatalogManager>,
catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,

// system_schema_provier for default catalog
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
}

impl SystemCatalog {
// TODO(j0hn50n133): remove the duplicated hard-coded table names logic
fn schema_names(&self) -> Vec<String> {
vec![INFORMATION_SCHEMA_NAME.to_string()]
vec![
INFORMATION_SCHEMA_NAME.to_string(),
PG_CATALOG_NAME.to_string(),
]
}

fn table_names(&self, schema: &str) -> Vec<String> {
if schema == INFORMATION_SCHEMA_NAME {
self.information_schema_provider.table_names()
} else if schema == DEFAULT_SCHEMA_NAME {
vec![NUMBERS_TABLE_NAME.to_string()]
} else {
vec![]
match schema {
INFORMATION_SCHEMA_NAME => self.information_schema_provider.table_names(),
PG_CATALOG_NAME => self.pg_catalog_provider.table_names(),
DEFAULT_SCHEMA_NAME => {
vec![NUMBERS_TABLE_NAME.to_string()]
}
_ => vec![],
}
}

fn schema_exists(&self, schema: &str) -> bool {
schema == INFORMATION_SCHEMA_NAME
schema == INFORMATION_SCHEMA_NAME || schema == PG_CATALOG_NAME
}

fn table_exists(&self, schema: &str, table: &str) -> bool {
if schema == INFORMATION_SCHEMA_NAME {
self.information_schema_provider.table(table).is_some()
} else if schema == DEFAULT_SCHEMA_NAME {
table == NUMBERS_TABLE_NAME
} else if schema == PG_CATALOG_NAME {
self.pg_catalog_provider.table(table).is_some()
} else {
false
}
Expand All @@ -341,6 +361,19 @@ impl SystemCatalog {
))
});
information_schema_provider.table(table_name)
} else if schema == PG_CATALOG_NAME {
if catalog == DEFAULT_CATALOG_NAME {
self.pg_catalog_provider.table(table_name)
} else {
let pg_catalog_provider =
self.pg_catalog_cache.get_with_by_ref(catalog, move || {
Arc::new(PGCatalogProvider::new(
catalog.to_string(),
self.catalog_manager.clone(),
))
});
pg_catalog_provider.table(table_name)
}
} else if schema == DEFAULT_SCHEMA_NAME && table_name == NUMBERS_TABLE_NAME {
Some(NumbersTable::table(NUMBERS_TABLE_ID))
} else {
Expand Down
8 changes: 6 additions & 2 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ use table::TableRef;
use crate::error::Result;

pub mod error;
pub mod information_schema;
pub mod kvbackend;
pub mod memory;
mod metrics;
pub mod table_source;
pub mod system_schema;
pub mod information_schema {
// TODO(j0hn50n133): re-export to make it compatible with the legacy code, migrate to the new path later
pub use crate::system_schema::information_schema::*;
}

pub mod table_source;
#[async_trait::async_trait]
pub trait CatalogManager: Send + Sync {
fn as_any(&self) -> &dyn Any;
Expand Down
12 changes: 10 additions & 2 deletions src/catalog/src/memory/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ use std::sync::{Arc, RwLock, Weak};
use async_stream::{stream, try_stream};
use common_catalog::build_db_string;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use futures_util::stream::BoxStream;
use snafu::OptionExt;
use table::TableRef;

use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
use crate::information_schema::InformationSchemaProvider;
use crate::system_schema::SystemSchemaProvider;
use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, RegisterTableRequest};

type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
Expand Down Expand Up @@ -173,6 +175,12 @@ impl MemoryCatalogManager {
schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
})
.unwrap();
manager
.register_schema_sync(RegisterSchemaRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: PG_CATALOG_NAME.to_string(),
})
.unwrap();
manager
.register_schema_sync(RegisterSchemaRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
Expand All @@ -196,7 +204,7 @@ impl MemoryCatalogManager {
}

fn catalog_exist_sync(&self, catalog: &str) -> Result<bool> {
Ok(self.catalogs.read().unwrap().get(catalog).is_some())
Ok(self.catalogs.read().unwrap().contains_key(catalog))
}

/// Registers a catalog if it does not exist and returns false if the schema exists.
Expand Down
164 changes: 164 additions & 0 deletions src/catalog/src/system_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod information_schema;
mod memory_table;
pub mod pg_catalog;

use std::collections::HashMap;
use std::sync::Arc;

use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use snafu::ResultExt;
use store_api::data_source::DataSource;
use store_api::storage::ScanRequest;
use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
use table::metadata::{
FilterPushDownType, TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType,
};
use table::{Table, TableRef};

use crate::error::Result;

pub trait SystemSchemaProvider {
/// Returns a map of [TableRef] in information schema.
fn tables(&self) -> &HashMap<String, TableRef>;

/// Returns the [TableRef] by table name.
fn table(&self, name: &str) -> Option<TableRef> {
self.tables().get(name).cloned()
}

/// Returns table names in the order of table id.
fn table_names(&self) -> Vec<String> {
let mut tables = self.tables().values().clone().collect::<Vec<_>>();

tables.sort_by(|t1, t2| {
t1.table_info()
.table_id()
.partial_cmp(&t2.table_info().table_id())
.unwrap()
});
tables
.into_iter()
.map(|t| t.table_info().name.clone())
.collect()
}
}

trait SystemSchemaProviderInner {
fn catalog_name(&self) -> &str;
fn schema_name() -> &'static str;
fn build_table(&self, name: &str) -> Option<TableRef> {
self.system_table(name).map(|table| {
let table_info = Self::table_info(self.catalog_name().to_string(), &table);
let filter_pushdown = FilterPushDownType::Inexact;
let data_source = Arc::new(SystemTableDataSource::new(table));
let table = Table::new(table_info, filter_pushdown, data_source);
Arc::new(table)
})
}
fn system_table(&self, name: &str) -> Option<SystemTableRef>;

fn table_info(catalog_name: String, table: &SystemTableRef) -> TableInfoRef {
let table_meta = TableMetaBuilder::default()
.schema(table.schema())
.primary_key_indices(vec![])
.next_column_id(0)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.table_id(table.table_id())
.name(table.table_name().to_string())
.catalog_name(catalog_name)
.schema_name(Self::schema_name().to_string())
.meta(table_meta)
.table_type(table.table_type())
.build()
.unwrap();
Arc::new(table_info)
}
}

pub(crate) trait SystemTable {
fn table_id(&self) -> TableId;

fn table_name(&self) -> &'static str;

fn schema(&self) -> SchemaRef;

fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;

fn table_type(&self) -> TableType {
TableType::Temporary
}
}

pub(crate) type SystemTableRef = Arc<dyn SystemTable + Send + Sync>;

struct SystemTableDataSource {
table: SystemTableRef,
}

impl SystemTableDataSource {
fn new(table: SystemTableRef) -> Self {
Self { table }
}

fn try_project(&self, projection: &[usize]) -> std::result::Result<SchemaRef, BoxedError> {
let schema = self
.table
.schema()
.try_project(projection)
.context(SchemaConversionSnafu)
.map_err(BoxedError::new)?;
Ok(Arc::new(schema))
}
}

impl DataSource for SystemTableDataSource {
fn get_stream(
&self,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
let projection = request.projection.clone();
let projected_schema = match &projection {
Some(projection) => self.try_project(projection)?,
None => self.table.schema(),
};

let stream = self
.table
.to_stream(request)
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)
.map_err(BoxedError::new)?
.map(move |batch| match &projection {
Some(p) => batch.and_then(|b| b.try_project(p)),
None => batch,
});

let stream = RecordBatchStreamWrapper {
schema: projected_schema,
stream: Box::pin(stream),
output_ordering: None,
metrics: Default::default(),
};

Ok(Box::pin(stream))
}
}
Loading

0 comments on commit 072d7c2

Please sign in to comment.