fix: resolve catalog and schema in dist planner (#1891)
* try resolve catalog and schema

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

* upload sqlness case

Signed-off-by: Ruihang Xia <[email protected]>

* fix information schema case

Signed-off-by: Ruihang Xia <[email protected]>

* fix unnamed table name

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
waynexia authored Jul 6, 2023
1 parent 979400a commit 9153191
Showing 7 changed files with 120 additions and 6 deletions.
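At a high level, this commit stops the distributed planner and the Substrait decoder from assuming the default catalog and schema: the planner writes fully qualified table names into the plan before encoding it, and SubstraitPlan::decode now receives the caller's catalog and schema so unqualified references resolve against the right database. The fragment below condenses the call site added in src/datanode/src/instance/grpc.rs (setup and error types are elided, so it is a sketch rather than standalone code).

```rust
// Before: decode(plan_bytes, catalog_list) resolved unqualified tables against
// the hard-coded DEFAULT_CATALOG_NAME / DEFAULT_SCHEMA_NAME.
// After: the datanode forwards the query context's current catalog and schema.
let logical_plan = DFLogicalSubstraitConvertor
    .decode(
        plan_bytes.as_slice(),
        Arc::new(catalog_list) as Arc<_>,
        &ctx.current_catalog(),
        &ctx.current_schema(),
    )
    .await
    .context(DecodeLogicalPlanSnafu)?;
```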
5 changes: 5 additions & 0 deletions src/catalog/src/information_schema.rs
@@ -26,6 +26,7 @@ use futures_util::StreamExt;
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
use table::metadata::TableType;
use table::{Result as TableResult, Table, TableRef};

use self::columns::InformationSchemaColumns;
@@ -102,6 +103,10 @@ impl Table for InformationTable {
unreachable!("Should not call table_info() of InformationTable directly")
}

fn table_type(&self) -> table::metadata::TableType {
TableType::View
}

async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
let projection = request.projection;
let projected_schema = if let Some(projection) = &projection {
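Why this override matters for the planner change further down: the distributed planner's table-name extractor only resolves and pushes down scans over base tables, so information_schema tables now report themselves as views and are planned locally. A minimal sketch of that distinction, using the Table trait's table_type accessor that this commit relies on (the helper name is ours, not the codebase's):

```rust
use table::metadata::TableType;
use table::TableRef;

// Hypothetical helper mirroring the check in src/query/src/dist_plan/planner.rs:
// only base tables get their fully resolved name extracted and pushed to
// remote regions; views (including information_schema tables) are skipped.
fn is_remote_scan_candidate(table: &TableRef) -> bool {
    table.table_type() == TableType::Base
}
```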
6 changes: 3 additions & 3 deletions src/common/substrait/src/df_substrait.rs
@@ -16,7 +16,6 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::{Buf, Bytes, BytesMut};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datafusion::catalog::catalog::CatalogList;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::RuntimeEnv;
@@ -43,9 +42,10 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
&self,
message: B,
catalog_list: Arc<dyn CatalogList>,
catalog: &str,
schema: &str,
) -> Result<Self::Plan, Self::Error> {
let state_config = SessionConfig::new()
.with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
let state_config = SessionConfig::new().with_default_catalog_and_schema(catalog, schema);
let state = SessionState::with_config_rt(state_config, Arc::new(RuntimeEnv::default()));
let mut context = SessionContext::with_state(state);
context.register_catalog_list(catalog_list);
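The removed DEFAULT_CATALOG_NAME/DEFAULT_SCHEMA_NAME constants were the root of the bug: every decoded plan resolved bare table references against greptime.public regardless of the session. The sketch below shows that resolution step conceptually; it assumes DataFusion's TableReference::bare and resolve helpers (the same TableReference type used elsewhere in this commit), and the values are illustrative.

```rust
use datafusion_common::TableReference;

// Bare references inside a decoded plan are completed with the session's
// default catalog and schema, which is why decode() must now receive them.
fn main() {
    let bare = TableReference::bare("system_metric");
    let resolved = bare.resolve("greptime", "upper_case_table_name");
    // Prints: greptime.upper_case_table_name.system_metric
    println!("{}.{}.{}", resolved.catalog, resolved.schema, resolved.table);
}
```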
2 changes: 2 additions & 0 deletions src/common/substrait/src/lib.rs
@@ -36,6 +36,8 @@ pub trait SubstraitPlan {
&self,
message: B,
catalog_list: Arc<dyn CatalogList>,
catalog: &str,
schema: &str,
) -> Result<Self::Plan, Self::Error>;

fn encode(&self, plan: Self::Plan) -> Result<Bytes, Self::Error>;
7 changes: 6 additions & 1 deletion src/datanode/src/instance/grpc.rs
@@ -74,7 +74,12 @@ impl Instance {
.await?;

let logical_plan = DFLogicalSubstraitConvertor
.decode(plan_bytes.as_slice(), Arc::new(catalog_list) as Arc<_>)
.decode(
plan_bytes.as_slice(),
Arc::new(catalog_list) as Arc<_>,
&ctx.current_catalog(),
&ctx.current_schema(),
)
.await
.context(DecodeLogicalPlanSnafu)?;

52 changes: 50 additions & 2 deletions src/query/src/dist_plan/planner.rs
@@ -23,15 +23,18 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::peer::Peer;
use common_meta::table_name::TableName;
use datafusion::common::Result;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::planner::ExtensionPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::{DataFusionError, TableReference};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use partition::manager::PartitionRuleManager;
use snafu::ResultExt;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
pub use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;

use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan};
use crate::error;
@@ -82,6 +85,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
.map(Some);
};
let input_schema = input_plan.schema().clone();
let input_plan = self.set_table_name(&table_name, input_plan.clone())?;
let substrait_plan: Bytes = DFLogicalSubstraitConvertor
.encode(input_plan.clone())
.context(error::EncodeSubstraitLogicalPlanSnafu)?
@@ -100,7 +104,7 @@
Ok(Some(Arc::new(exec) as _))
}
Err(_) => planner
.create_physical_plan(input_plan, session_state)
.create_physical_plan(&input_plan, session_state)
.await
.map(Some),
}
@@ -119,6 +123,12 @@ impl DistExtensionPlanner {
Ok(extractor.table_name)
}

/// Set the fully resolved table name on the TableScan plan
fn set_table_name(&self, name: &TableName, plan: LogicalPlan) -> Result<LogicalPlan> {
plan.transform(&|plan| TableNameRewriter::rewrite_table_name(plan, name))
}

async fn get_peers(&self, table_name: &TableName) -> Result<Vec<Peer>> {
self.partition_manager
.find_table_region_leaders(table_name)
@@ -142,6 +152,23 @@ impl TreeNodeVisitor for TableNameExtractor {
fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
match node {
LogicalPlan::TableScan(scan) => {
if let Some(source) = scan.source.as_any().downcast_ref::<DefaultTableSource>() {
if let Some(provider) = source
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
{
if provider.table().table_type() == TableType::Base {
let info = provider.table().table_info();
self.table_name = Some(TableName::new(
info.catalog_name.clone(),
info.schema_name.clone(),
info.name.clone(),
));
return Ok(VisitRecursion::Stop);
}
}
}
match &scan.table_name {
TableReference::Full {
catalog,
@@ -178,3 +205,24 @@
}
}
}

struct TableNameRewriter;

impl TableNameRewriter {
fn rewrite_table_name(
plan: LogicalPlan,
name: &TableName,
) -> datafusion_common::Result<Transformed<LogicalPlan>> {
Ok(match plan {
LogicalPlan::TableScan(mut table_scan) => {
table_scan.table_name = TableReference::full(
name.catalog_name.clone(),
name.schema_name.clone(),
name.table_name.clone(),
);
Transformed::Yes(LogicalPlan::TableScan(table_scan))
}
_ => Transformed::No(plan),
})
}
}
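Taken together, TableNameExtractor pulls the catalog, schema, and table name from the scan's table provider (metadata that is always fully resolved), and TableNameRewriter writes that name back into the TableScan as a full TableReference before the plan is Substrait-encoded, so the plan decodes to the same table on every datanode. A small illustration using the same constructors as the diff above; the values and the wrapper fn are illustrative only.

```rust
use common_meta::table_name::TableName;
use datafusion_common::TableReference;

fn main() {
    // What TableNameExtractor produces from the provider's table_info().
    let name = TableName::new(
        "greptime".to_string(),
        "upper_case_table_name".to_string(),
        "system_Metric".to_string(),
    );
    // What TableNameRewriter stores on the TableScan before encoding:
    // a fully qualified reference instead of the parser's bare name.
    let full = TableReference::full(
        name.catalog_name.clone(),
        name.schema_name.clone(),
        name.table_name.clone(),
    );
    let _ = full;
}
```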
39 changes: 39 additions & 0 deletions tests/cases/standalone/common/create/upper_case_table_name.result
@@ -0,0 +1,39 @@
create database upper_case_table_name;

Affected Rows: 1

use upper_case_table_name;

++
++

create table system_Metric(ts timestamp time index);

Affected Rows: 0

insert into system_Metric values (0), (1);

Affected Rows: 2

select * from system_Metric;

Error: 3000(PlanQuery), Error during planning: Table not found: greptime.upper_case_table_name.system_metric

select * from "system_Metric";

+-------------------------+
| ts |
+-------------------------+
| 1970-01-01T00:00:00 |
| 1970-01-01T00:00:00.001 |
+-------------------------+

drop table system_Metric;

Affected Rows: 1

use public;

++
++

15 changes: 15 additions & 0 deletions tests/cases/standalone/common/create/upper_case_table_name.sql
@@ -0,0 +1,15 @@
create database upper_case_table_name;

use upper_case_table_name;

create table system_Metric(ts timestamp time index);

insert into system_Metric values (0), (1);

select * from system_Metric;

select * from "system_Metric";

drop table system_Metric;

use public;
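The new sqlness case exercises the fix end to end from a non-default database: the unquoted identifier system_Metric is normalized to lower case by the parser, so the first select fails with "Table not found: greptime.upper_case_table_name.system_metric", while the quoted "system_Metric" keeps its case and, now that the planner resolves the catalog and schema from the table itself, returns both inserted rows.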
