diff --git a/Cargo.lock b/Cargo.lock
index 786c1c3a8bdf..891a55647bd6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1014,7 +1014,7 @@ dependencies = [
  "bitflags 2.5.0",
  "cexpr",
  "clang-sys",
- "itertools 0.10.5",
+ "itertools 0.12.1",
  "lazy_static",
  "lazycell",
  "log",
@@ -4019,6 +4019,7 @@ dependencies = [
  "common-test-util",
  "common-time",
  "common-version",
+ "datafusion-expr",
  "datanode",
  "datatypes",
  "futures",
@@ -4836,7 +4837,7 @@ dependencies = [
  "httpdate",
  "itoa",
  "pin-project-lite",
- "socket2 0.4.10",
+ "socket2 0.5.7",
  "tokio",
  "tower-service",
  "tracing",
@@ -8439,7 +8440,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
 dependencies = [
  "bytes",
  "heck 0.5.0",
- "itertools 0.10.5",
+ "itertools 0.12.1",
  "log",
  "multimap",
  "once_cell",
@@ -8491,7 +8492,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
 dependencies = [
  "anyhow",
- "itertools 0.10.5",
+ "itertools 0.12.1",
  "proc-macro2",
  "quote",
  "syn 2.0.66",
@@ -8651,7 +8652,7 @@ dependencies = [
  "indoc",
  "libc",
  "memoffset 0.9.1",
- "parking_lot 0.11.2",
+ "parking_lot 0.12.3",
  "portable-atomic",
  "pyo3-build-config",
  "pyo3-ffi",
@@ -10480,6 +10481,7 @@ dependencies = [
  "dashmap",
  "datafusion",
  "datafusion-common",
+ "datafusion-expr",
  "datatypes",
  "derive_builder 0.12.0",
  "futures",
diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs
index 6d7e211d7cb9..3fc2184d3747 100644
--- a/src/cmd/src/cli/repl.rs
+++ b/src/cmd/src/cli/repl.rs
@@ -35,7 +35,6 @@ use either::Either;
 use meta_client::client::MetaClientBuilder;
 use query::datafusion::DatafusionQueryEngine;
 use query::parser::QueryLanguageParser;
-use query::plan::LogicalPlan;
 use query::query_engine::{DefaultSerializer, QueryEngineState};
 use query::QueryEngine;
 use rustyline::error::ReadlineError;
@@ -179,7 +178,7 @@ impl Repl {
             .await
             .context(PlanStatementSnafu)?;
 
-        let LogicalPlan::DfPlan(plan) = query_engine
+        let plan = query_engine
             .optimize(&query_engine.engine_context(query_ctx), &plan)
             .context(PlanStatementSnafu)?;
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 56068a38c3aa..54e1cdbafd3d 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -843,7 +843,7 @@ impl RegionServerInner {
 
         let result = self
             .query_engine
-            .execute(request.plan.into(), query_ctx)
+            .execute(request.plan, query_ctx)
             .await
             .context(ExecuteLogicalPlanSnafu)?;
diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs
index 87285eabfa67..89be76511dfb 100644
--- a/src/datanode/src/tests.rs
+++ b/src/datanode/src/tests.rs
@@ -24,8 +24,8 @@ use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_query::prelude::ScalarUdf;
 use common_query::Output;
 use common_runtime::Runtime;
+use datafusion_expr::LogicalPlan;
 use query::dataframe::DataFrame;
-use query::plan::LogicalPlan;
 use query::planner::LogicalPlanner;
 use query::query_engine::{DescribeResult, QueryEngineState};
 use query::{QueryEngine, QueryEngineContext};
diff --git a/src/flow/src/df_optimizer.rs b/src/flow/src/df_optimizer.rs
index d5368d5189e6..d73bf91bfbe0 100644
--- a/src/flow/src/df_optimizer.rs
+++ b/src/flow/src/df_optimizer.rs
@@ -41,7 +41,6 @@ use datafusion_expr::{
     BinaryExpr, Expr, Operator, Projection, ScalarUDFImpl, Signature, TypeSignature, Volatility,
 };
 use query::parser::QueryLanguageParser;
-use query::plan::LogicalPlan;
 use query::query_engine::DefaultSerializer;
 use query::QueryEngine;
 use snafu::ResultExt;
@@ -111,7 +110,6 @@ pub async fn sql_to_flow_plan(
         .await
         .map_err(BoxedError::new)
         .context(ExternalSnafu)?;
-    let LogicalPlan::DfPlan(plan) = plan;
 
     let opted_plan = apply_df_optimizer(plan).await?;
 
diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs
index f6dff58856db..6d4de974175c 100644
--- a/src/flow/src/transform.rs
+++ b/src/flow/src/transform.rs
@@ -163,7 +163,6 @@ mod test {
     use itertools::Itertools;
     use prost::Message;
     use query::parser::QueryLanguageParser;
-    use query::plan::LogicalPlan;
     use query::query_engine::DefaultSerializer;
     use query::QueryEngine;
     use session::context::QueryContext;
@@ -274,7 +273,6 @@ mod test {
             .plan(stmt, QueryContext::arc())
             .await
             .unwrap();
-        let LogicalPlan::DfPlan(plan) = plan;
         let plan = apply_df_optimizer(plan).await.unwrap();
 
         // encode then decode so to rely on the impl of conversion from logical plan to substrait plan
@@ -297,7 +295,6 @@ mod test {
             .plan(stmt, QueryContext::arc())
             .await
             .unwrap();
-        let LogicalPlan::DfPlan(plan) = plan;
         let plan = apply_df_optimizer(plan).await;
 
         assert!(plan.is_err());
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index 362d147d135f..555a20128033 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -37,6 +37,7 @@ common-runtime.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
 common-version.workspace = true
+datafusion-expr.workspace = true
 datanode.workspace = true
 humantime-serde.workspace = true
 lazy_static.workspace = true
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 2c5544c51a8c..dcc56f75819a 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -40,6 +40,7 @@ use common_procedure::options::ProcedureConfig;
 use common_procedure::ProcedureManagerRef;
 use common_query::Output;
 use common_telemetry::{debug, error, tracing};
+use datafusion_expr::LogicalPlan;
 use log_store::raft_engine::RaftEngineBackend;
 use operator::delete::DeleterRef;
 use operator::insert::InserterRef;
@@ -48,7 +49,6 @@ use pipeline::pipeline_operator::PipelineOperator;
 use prometheus::HistogramTimer;
 use query::metrics::OnDone;
 use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
-use query::plan::LogicalPlan;
 use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
 use query::query_engine::DescribeResult;
 use query::QueryEngineRef;
diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs
index 02a880a90342..1931814c8b07 100644
--- a/src/operator/src/error.rs
+++ b/src/operator/src/error.rs
@@ -133,13 +133,6 @@ pub enum Error {
         source: query::error::Error,
     },
 
-    #[snafu(display("Failed to get schema from logical plan"))]
-    GetSchema {
-        #[snafu(implicit)]
-        location: Location,
-        source: query::error::Error,
-    },
-
     #[snafu(display("Column datatype error"))]
     ColumnDataType {
         #[snafu(implicit)]
@@ -184,6 +177,13 @@ pub enum Error {
         source: datatypes::error::Error,
     },
 
+    #[snafu(display("Failed to convert datafusion schema"))]
+    ConvertSchema {
+        source: datatypes::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Failed to convert expr to struct"))]
     InvalidExpr {
         #[snafu(implicit)]
         location: Location,
@@ -795,6 +795,7 @@ impl ErrorExt for Error {
             | Error::PrepareFileTable { .. }
             | Error::InferFileTableSchema { .. }
             | Error::SchemaIncompatible { .. }
+            | Error::ConvertSchema { .. }
             | Error::UnsupportedRegionRequest { .. }
             | Error::InvalidTableName { .. }
             | Error::InvalidViewName { .. }
@@ -872,7 +873,6 @@ impl ErrorExt for Error {
             | Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),
 
             Error::ExecuteStatement { source, .. }
-            | Error::GetSchema { source, .. }
             | Error::ExtractTableNames { source, .. }
             | Error::PlanStatement { source, .. }
             | Error::ParseQuery { source, .. }
diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs
index 35e6752d08d3..4dc43e0d92e9 100644
--- a/src/operator/src/statement.rs
+++ b/src/operator/src/statement.rs
@@ -42,9 +42,9 @@ use common_query::Output;
 use common_telemetry::tracing;
 use common_time::range::TimestampRange;
 use common_time::Timestamp;
+use datafusion_expr::LogicalPlan;
 use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
 use query::parser::QueryStatement;
-use query::plan::LogicalPlan;
 use query::QueryEngineRef;
 use session::context::QueryContextRef;
 use session::table_name::table_idents_to_full_name;
diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs
index 8a90d1095569..0e53d79ac60b 100644
--- a/src/operator/src/statement/copy_table_to.rs
+++ b/src/operator/src/statement/copy_table_to.rs
@@ -29,7 +29,6 @@ use datafusion::datasource::DefaultTableSource;
 use datafusion_common::TableReference as DfTableReference;
 use datafusion_expr::LogicalPlanBuilder;
 use object_store::ObjectStore;
-use query::plan::LogicalPlan;
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
 use table::requests::CopyTableRequest;
@@ -133,7 +132,7 @@ impl StatementExecutor {
 
         let output = self
             .query_engine
-            .execute(LogicalPlan::DfPlan(plan), query_ctx)
+            .execute(plan, query_ctx)
             .await
             .context(ExecLogicalPlanSnafu)?;
         let stream = match output.data {
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index aa1a07087545..e9b186bd63b1 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -37,7 +37,7 @@ use common_query::Output;
 use common_telemetry::{debug, info, tracing};
 use common_time::Timezone;
 use datatypes::prelude::ConcreteDataType;
-use datatypes::schema::RawSchema;
+use datatypes::schema::{RawSchema, Schema};
 use datatypes::value::Value;
 use lazy_static::lazy_static;
 use partition::expr::{Operand, PartitionExpr, RestrictedOp};
@@ -69,11 +69,11 @@ use table::TableRef;
 use super::StatementExecutor;
 use crate::error::{
     self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
-    CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
-    ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu,
-    InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
-    SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
-    TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
+    ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
+    EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
+    InvalidPartitionSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu,
+    ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
+    SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
     UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
 };
 use crate::expr_factory;
@@ -406,9 +406,12 @@ impl StatementExecutor {
         // Save the columns in plan, it may changed when the schemas of tables in plan
         // are altered.
-        let plan_columns: Vec<_> = logical_plan
+        let schema: Schema = logical_plan
             .schema()
-            .context(error::GetSchemaSnafu)?
+            .clone()
+            .try_into()
+            .context(ConvertSchemaSnafu)?;
+        let plan_columns: Vec<_> = schema
             .column_schemas()
             .iter()
             .map(|c| c.name.clone())
             .collect();
@@ -434,9 +437,8 @@ impl StatementExecutor {
 
         // Extract the table names from the original plan
         // and rewrite them as fully qualified names.
-        let (table_names, plan) =
-            extract_and_rewrite_full_table_names(logical_plan.unwrap_df_plan(), ctx.clone())
-                .context(ExtractTableNamesSnafu)?;
+        let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
+            .context(ExtractTableNamesSnafu)?;
 
         let table_names = table_names.into_iter().map(|t| t.into()).collect();
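
The ddl.rs hunk above trades the wrapper's fallible `schema()` accessor for an explicit conversion at the call site. A minimal sketch of that conversion, assuming the `TryFrom<DFSchemaRef> for Schema` impl in `datatypes` that `ConvertSchemaSnafu` wraps; `plan_schema` is a hypothetical helper, not part of the diff:

use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;

// Hypothetical helper mirroring the new call-site pattern: fetch the
// DataFusion schema and convert it into Greptime's Schema. `schema()`
// returns `&DFSchemaRef`, so the clone here is a cheap Arc clone.
fn plan_schema(plan: &LogicalPlan) -> Result<Schema, datatypes::error::Error> {
    plan.schema().clone().try_into()
}
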
diff --git a/src/operator/src/statement/tql.rs b/src/operator/src/statement/tql.rs
index 72b2db641b8c..008aba0d78e8 100644
--- a/src/operator/src/statement/tql.rs
+++ b/src/operator/src/statement/tql.rs
@@ -16,11 +16,11 @@ use std::collections::HashMap;
 
 use common_query::Output;
 use common_telemetry::tracing;
+use datafusion_expr::LogicalPlan;
 use query::parser::{
     PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
     DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
 };
-use query::plan::LogicalPlan;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
 use sql::statements::tql::Tql;
diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs
index 3c69f59f2a05..8989412c3eb5 100644
--- a/src/pipeline/src/manager/table.rs
+++ b/src/pipeline/src/manager/table.rs
@@ -26,7 +26,7 @@ use common_telemetry::{debug, info};
 use common_time::timestamp::{TimeUnit, Timestamp};
 use datafusion::logical_expr::col;
 use datafusion_common::{TableReference, ToDFSchema};
-use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan};
+use datafusion_expr::{DmlStatement, LogicalPlan};
 use datatypes::prelude::ScalarVector;
 use datatypes::timestamp::TimestampNanosecond;
 use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
@@ -34,7 +34,6 @@ use moka::sync::Cache;
 use operator::insert::InserterRef;
 use operator::statement::StatementExecutorRef;
 use query::dataframe::DataFrame;
-use query::plan::LogicalPlan;
 use query::QueryEngineRef;
 use session::context::{QueryContextBuilder, QueryContextRef};
 use snafu::{ensure, OptionExt, ResultExt};
@@ -373,7 +372,7 @@ impl PipelineTable {
             Arc::new(dataframe.into_parts().1),
         );
 
-        let plan = LogicalPlan::DfPlan(DfLogicalPlan::Dml(stmt));
+        let plan = LogicalPlan::Dml(stmt);
 
         // 4. execute dml stmt
         let output = self
@@ -427,7 +426,7 @@ impl PipelineTable {
             .limit(0, Some(1))
             .context(BuildDfLogicalPlanSnafu)?;
 
-        let plan = LogicalPlan::DfPlan(dataframe.into_parts().1);
+        let plan = dataframe.into_parts().1;
 
         let table_info = self.table.table_info();
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index 6ed5844de09b..888ebbba831e 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -36,7 +36,7 @@ use datafusion::physical_plan::analyze::AnalyzeExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::ResolvedTableReference;
-use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
+use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlan, WriteOp};
 use datatypes::prelude::VectorRef;
 use datatypes::schema::Schema;
 use futures_util::StreamExt;
@@ -50,14 +50,13 @@ use crate::dataframe::DataFrame;
 pub use crate::datafusion::planner::DfContextProviderAdapter;
 use crate::dist_plan::MergeScanLogicalPlan;
 use crate::error::{
-    CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, MissingTableMutationHandlerSnafu,
-    MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
-    TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
+    CatalogSnafu, ConvertSchemaSnafu, CreateRecordBatchSnafu, DataFusionSnafu,
+    MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result,
+    TableMutationSnafu, TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
 };
 use crate::executor::QueryExecutor;
 use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
 use crate::physical_wrapper::PhysicalPlanWrapperRef;
-use crate::plan::LogicalPlan;
 use crate::planner::{DfLogicalPlanner, LogicalPlanner};
 use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
 use crate::{metrics, QueryEngine};
@@ -119,7 +118,7 @@ impl DatafusionQueryEngine {
         let table = self.find_table(&table_name, &query_ctx).await?;
 
         let output = self
-            .exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx.clone())
+            .exec_query_plan((*dml.input).clone(), query_ctx.clone())
             .await?;
         let mut stream = match output.data {
             OutputData::RecordBatches(batches) => batches.as_stream(),
@@ -265,52 +264,48 @@ impl DatafusionQueryEngine {
         logical_plan: &LogicalPlan,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
-        match logical_plan {
-            LogicalPlan::DfPlan(df_plan) => {
-                let state = ctx.state();
-
-                // special handle EXPLAIN plan
-                if matches!(df_plan, DfLogicalPlan::Explain(_)) {
-                    return state
-                        .create_physical_plan(df_plan)
-                        .await
-                        .context(error::DatafusionSnafu)
-                        .map_err(BoxedError::new)
-                        .context(QueryExecutionSnafu);
-                }
+        let state = ctx.state();
 
-                // analyze first
-                let analyzed_plan = state
-                    .analyzer()
-                    .execute_and_check(df_plan.clone(), state.config_options(), |_, _| {})
-                    .context(error::DatafusionSnafu)
-                    .map_err(BoxedError::new)
-                    .context(QueryExecutionSnafu)?;
-                // skip optimize for MergeScan
-                let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
-                    && ext.node.name() == MergeScanLogicalPlan::name()
-                {
-                    analyzed_plan.clone()
-                } else {
-                    state
-                        .optimizer()
-                        .optimize(analyzed_plan, state, |_, _| {})
-                        .context(error::DatafusionSnafu)
-                        .map_err(BoxedError::new)
-                        .context(QueryExecutionSnafu)?
-                };
+        // special handle EXPLAIN plan
+        if matches!(logical_plan, DfLogicalPlan::Explain(_)) {
+            return state
+                .create_physical_plan(logical_plan)
+                .await
+                .context(error::DatafusionSnafu)
+                .map_err(BoxedError::new)
+                .context(QueryExecutionSnafu);
+        }
 
-                let physical_plan = state
-                    .query_planner()
-                    .create_physical_plan(&optimized_plan, state)
-                    .await
-                    .context(error::DatafusionSnafu)
-                    .map_err(BoxedError::new)
-                    .context(QueryExecutionSnafu)?;
+        // analyze first
+        let analyzed_plan = state
+            .analyzer()
+            .execute_and_check(logical_plan.clone(), state.config_options(), |_, _| {})
+            .context(error::DatafusionSnafu)
+            .map_err(BoxedError::new)
+            .context(QueryExecutionSnafu)?;
+        // skip optimize for MergeScan
+        let optimized_plan = if let DfLogicalPlan::Extension(ext) = &analyzed_plan
+            && ext.node.name() == MergeScanLogicalPlan::name()
+        {
+            analyzed_plan.clone()
+        } else {
+            state
+                .optimizer()
+                .optimize(analyzed_plan, state, |_, _| {})
+                .context(error::DatafusionSnafu)
+                .map_err(BoxedError::new)
+                .context(QueryExecutionSnafu)?
+        };
 
-                Ok(physical_plan)
-            }
-        }
+        let physical_plan = state
+            .query_planner()
+            .create_physical_plan(&optimized_plan, state)
+            .await
+            .context(error::DatafusionSnafu)
+            .map_err(BoxedError::new)
+            .context(QueryExecutionSnafu)?;
+
+        Ok(physical_plan)
     }
 
     #[tracing::instrument(skip_all)]
@@ -320,28 +315,25 @@ impl DatafusionQueryEngine {
         plan: &LogicalPlan,
     ) -> Result<LogicalPlan> {
         let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();
-        match plan {
-            LogicalPlan::DfPlan(df_plan) => {
-                // Optimized by extension rules
-                let optimized_plan = self
-                    .state
-                    .optimize_by_extension_rules(df_plan.clone(), context)
-                    .context(error::DatafusionSnafu)
-                    .map_err(BoxedError::new)
-                    .context(QueryExecutionSnafu)?;
-                // Optimized by datafusion optimizer
-                let optimized_plan = self
-                    .state
-                    .session_state()
-                    .optimize(&optimized_plan)
-                    .context(error::DatafusionSnafu)
-                    .map_err(BoxedError::new)
-                    .context(QueryExecutionSnafu)?;
+        // Optimized by extension rules
+        let optimized_plan = self
+            .state
+            .optimize_by_extension_rules(plan.clone(), context)
+            .context(error::DatafusionSnafu)
+            .map_err(BoxedError::new)
+            .context(QueryExecutionSnafu)?;
+
+        // Optimized by datafusion optimizer
+        let optimized_plan = self
+            .state
+            .session_state()
+            .optimize(&optimized_plan)
+            .context(error::DatafusionSnafu)
+            .map_err(BoxedError::new)
+            .context(QueryExecutionSnafu)?;
 
-                Ok(LogicalPlan::DfPlan(optimized_plan))
-            }
-        }
+        Ok(optimized_plan)
     }
 
     #[tracing::instrument(skip_all)]
@@ -399,15 +391,25 @@ impl QueryEngine for DatafusionQueryEngine {
     ) -> Result<DescribeResult> {
         let ctx = self.engine_context(query_ctx);
         if let Ok(optimised_plan) = self.optimize(&ctx, &plan) {
+            let schema = optimised_plan
+                .schema()
+                .clone()
+                .try_into()
+                .context(ConvertSchemaSnafu)?;
             Ok(DescribeResult {
-                schema: optimised_plan.schema()?,
+                schema,
                 logical_plan: optimised_plan,
             })
         } else {
             // Table's like those in information_schema cannot be optimized when
             // it contains parameters. So we fallback to original plans.
+            let schema = plan
+                .schema()
+                .clone()
+                .try_into()
+                .context(ConvertSchemaSnafu)?;
             Ok(DescribeResult {
-                schema: plan.schema()?,
+                schema,
                 logical_plan: plan,
             })
         }
     }
@@ -415,9 +417,7 @@ impl QueryEngine for DatafusionQueryEngine {
     async fn execute(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
         match plan {
-            LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => {
-                self.exec_dml_statement(dml, query_ctx).await
-            }
+            LogicalPlan::Dml(dml) => self.exec_dml_statement(dml, query_ctx).await,
             _ => self.exec_query_plan(plan, query_ctx).await,
         }
     }
@@ -577,10 +577,10 @@ mod tests {
         // TODO(sunng87): do not rely on to_string for compare
         assert_eq!(
             format!("{plan:?}"),
-            r#"DfPlan(Limit: skip=0, fetch=20
+            r#"Limit: skip=0, fetch=20
   Projection: SUM(numbers.number)
     Aggregate: groupBy=[[]], aggr=[[SUM(numbers.number)]]
-      TableScan: numbers)"#
+      TableScan: numbers"#
         );
diff --git a/src/query/src/error.rs b/src/query/src/error.rs
index 4eca47e175e6..7e246d11c332 100644
--- a/src/query/src/error.rs
+++ b/src/query/src/error.rs
@@ -108,13 +108,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to convert Datafusion schema"))]
-    ConvertDatafusionSchema {
-        source: datatypes::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Failed to parse timestamp `{}`", raw))]
     ParseTimestamp {
         raw: String,
@@ -228,6 +221,7 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
     #[snafu(display("Unknown table type, downcast failed"))]
     UnknownTable {
         #[snafu(implicit)]
         location: Location,
@@ -354,7 +348,6 @@ impl ErrorExt for Error {
 
             QueryAccessDenied { .. } => StatusCode::AccessDenied,
             Catalog { source, .. } => source.status_code(),
-            ConvertDatafusionSchema { source, .. } => source.status_code(),
             CreateRecordBatch { source, .. } => source.status_code(),
             QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(),
             PlanSql { error, .. } => {
diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs
index ea9dae3770da..710e92129d47 100644
--- a/src/query/src/plan.rs
+++ b/src/query/src/plan.rs
@@ -12,108 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, HashSet};
-use std::fmt::{Debug, Display};
+use std::collections::HashSet;
 
-use common_query::prelude::ScalarValue;
 use datafusion::datasource::DefaultTableSource;
 use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
-use datafusion_common::{ParamValues, TableReference};
+use datafusion_common::TableReference;
 use datafusion_expr::LogicalPlan as DfLogicalPlan;
-use datatypes::data_type::ConcreteDataType;
-use datatypes::schema::Schema;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
 pub use table::metadata::TableType;
 use table::table::adapter::DfTableProviderAdapter;
 use table::table_name::TableName;
 
-use crate::error::{ConvertDatafusionSchemaSnafu, DataFusionSnafu, Result};
-
-/// A LogicalPlan represents the different types of relational
-/// operators (such as Projection, Filter, etc) and can be created by
-/// the SQL query planner.
-///
-/// A LogicalPlan represents transforming an input relation (table) to
-/// an output relation (table) with a (potentially) different
-/// schema. A plan represents a dataflow tree where data flows
-/// from leaves up to the root to produce the query result.
-#[derive(Clone, Debug)]
-pub enum LogicalPlan {
-    DfPlan(DfLogicalPlan),
-}
-
-impl LogicalPlan {
-    /// Get the schema for this logical plan
-    pub fn schema(&self) -> Result<Schema> {
-        match self {
-            Self::DfPlan(plan) => {
-                let df_schema = plan.schema();
-                df_schema
-                    .clone()
-                    .try_into()
-                    .context(ConvertDatafusionSchemaSnafu)
-            }
-        }
-    }
-
-    /// Return a `format`able structure that produces a single line
-    /// per node. For example:
-    ///
-    /// ```text
-    /// Projection: employee.id
-    /// Filter: employee.state Eq Utf8(\"CO\")\
-    /// CsvScan: employee projection=Some([0, 3])
-    /// ```
-    pub fn display_indent(&self) -> impl Display + '_ {
-        let LogicalPlan::DfPlan(plan) = self;
-        plan.display_indent()
-    }
-
-    /// Walk the logical plan, find any `PlaceHolder` tokens,
-    /// and return a map of their IDs and ConcreteDataTypes
-    pub fn get_param_types(&self) -> Result<HashMap<String, Option<ConcreteDataType>>> {
-        let LogicalPlan::DfPlan(plan) = self;
-        let types = plan.get_parameter_types().context(DataFusionSnafu)?;
-
-        Ok(types
-            .into_iter()
-            .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
-            .collect())
-    }
-
-    /// Return a logical plan with all placeholders/params (e.g $1 $2,
-    /// ...) replaced with corresponding values provided in the
-    /// params_values
-    pub fn replace_params_with_values(&self, values: &[ScalarValue]) -> Result<LogicalPlan> {
-        let LogicalPlan::DfPlan(plan) = self;
-
-        plan.clone()
-            .replace_params_with_values(&ParamValues::List(values.to_vec()))
-            .context(DataFusionSnafu)
-            .map(LogicalPlan::DfPlan)
-    }
-
-    /// Unwrap the logical plan into a DataFusion logical plan
-    pub fn unwrap_df_plan(self) -> DfLogicalPlan {
-        match self {
-            LogicalPlan::DfPlan(plan) => plan,
-        }
-    }
-
-    /// Returns the DataFusion logical plan reference
-    pub fn df_plan(&self) -> &DfLogicalPlan {
-        match self {
-            LogicalPlan::DfPlan(plan) => plan,
-        }
-    }
-}
-
-impl From<DfLogicalPlan> for LogicalPlan {
-    fn from(plan: DfLogicalPlan) -> Self {
-        Self::DfPlan(plan)
-    }
-}
+use crate::error::{DataFusionSnafu, Result};
 
 struct TableNamesExtractAndRewriter {
     pub(crate) table_names: HashSet<TableName>,
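
With `query::plan::LogicalPlan` deleted above, its `get_param_types` convenience disappears with it; each server call site now asks DataFusion for the placeholder types and maps the Arrow types into `ConcreteDataType` itself, a pattern the mysql and postgres hunks below repeat. A consolidated sketch of that pattern (the `param_types` helper is illustrative, not part of the diff):

use std::collections::HashMap;

use datafusion_common::DataFusionError;
use datafusion_expr::LogicalPlan;
use datatypes::prelude::ConcreteDataType;

// Hypothetical consolidation of the repeated call-site code: DataFusion
// reports placeholder types as Arrow DataTypes keyed by placeholder id
// ("$1", "$2", ...); each is mapped into Greptime's ConcreteDataType.
fn param_types(
    plan: &LogicalPlan,
) -> Result<HashMap<String, Option<ConcreteDataType>>, DataFusionError> {
    Ok(plan
        .get_parameter_types()?
        .into_iter()
        .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
        .collect())
}
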
diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs
index 4c0986033586..0f4f74133ab5 100644
--- a/src/query/src/planner.rs
+++ b/src/query/src/planner.rs
@@ -22,7 +22,7 @@ use common_telemetry::tracing;
 use datafusion::common::DFSchema;
 use datafusion::execution::context::SessionState;
 use datafusion::sql::planner::PlannerContext;
-use datafusion_expr::Expr as DfExpr;
+use datafusion_expr::{Expr as DfExpr, LogicalPlan};
 use datafusion_sql::planner::{ParserOptions, SqlToRel};
 use promql_parser::parser::EvalStmt;
 use session::context::QueryContextRef;
@@ -32,7 +32,6 @@ use sql::statements::statement::Statement;
 
 use crate::error::{DataFusionSnafu, PlanSqlSnafu, QueryPlanSnafu, Result, SqlSnafu};
 use crate::parser::QueryStatement;
-use crate::plan::LogicalPlan;
 use crate::promql::planner::PromPlanner;
 use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
 use crate::range_select::plan_rewrite::RangePlanRewriter;
@@ -109,7 +108,7 @@ impl DfLogicalPlanner {
             .optimize_by_extension_rules(plan, &context)
             .context(DataFusionSnafu)?;
 
-        Ok(LogicalPlan::DfPlan(plan))
+        Ok(plan)
     }
 
     /// Generate a relational expression from a SQL expression
@@ -160,7 +159,6 @@ impl DfLogicalPlanner {
         );
         PromPlanner::stmt_to_plan(table_provider, stmt, &self.session_state)
             .await
-            .map(LogicalPlan::DfPlan)
             .map_err(BoxedError::new)
             .context(QueryPlanSnafu)
     }
@@ -168,7 +166,7 @@ impl DfLogicalPlanner {
     #[tracing::instrument(skip_all)]
     fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
         self.engine_state
-            .optimize_logical_plan(plan.unwrap_df_plan())
+            .optimize_logical_plan(plan)
             .context(DataFusionSnafu)
             .map(Into::into)
     }
diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs
index e2d3e01c9114..61f6e1a8f0f8 100644
--- a/src/query/src/query_engine.rs
+++ b/src/query/src/query_engine.rs
@@ -30,6 +30,7 @@ use common_function::handlers::{
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_query::prelude::ScalarUdf;
 use common_query::Output;
+use datafusion_expr::LogicalPlan;
 use datatypes::schema::Schema;
 pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
 use session::context::QueryContextRef;
@@ -38,7 +39,6 @@ use table::TableRef;
 use crate::dataframe::DataFrame;
 use crate::datafusion::DatafusionQueryEngine;
 use crate::error::Result;
-use crate::plan::LogicalPlan;
 use crate::planner::LogicalPlanner;
 pub use crate::query_engine::context::QueryEngineContext;
 pub use crate::query_engine::state::QueryEngineState;
diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs
index 9eb16b359edf..43b0928539ee 100644
--- a/src/query/src/range_select/plan_rewrite.rs
+++ b/src/query/src/range_select/plan_rewrite.rs
@@ -556,7 +556,6 @@ mod test {
 
     use super::*;
     use crate::parser::QueryLanguageParser;
-    use crate::plan::LogicalPlan as GreptimeLogicalPlan;
     use crate::{QueryEngineFactory, QueryEngineRef};
 
     async fn create_test_engine() -> QueryEngineRef {
@@ -611,14 +610,14 @@ mod test {
         QueryEngineFactory::new(catalog_list, None, None, None, None, false).query_engine()
     }
 
-    async fn do_query(sql: &str) -> Result<GreptimeLogicalPlan> {
+    async fn do_query(sql: &str) -> Result<LogicalPlan> {
         let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
         let engine = create_test_engine().await;
         engine.planner().plan(stmt, QueryContext::arc()).await
     }
 
     async fn query_plan_compare(sql: &str, expected: String) {
-        let GreptimeLogicalPlan::DfPlan(plan) = do_query(sql).await.unwrap();
+        let plan = do_query(sql).await.unwrap();
         assert_eq!(plan.display_indent_schema().to_string(), expected);
     }
diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs
index 6359940d4fa1..687346dbcb3e 100644
--- a/src/query/src/tests/query_engine_test.rs
+++ b/src/query/src/tests/query_engine_test.rs
@@ -35,7 +35,6 @@ use table::test_util::MemTable;
 
 use crate::error::{QueryExecutionSnafu, Result};
 use crate::parser::QueryLanguageParser;
-use crate::plan::LogicalPlan;
 use crate::query_engine::options::QueryOptions;
 use crate::query_engine::QueryEngineFactory;
 use crate::tests::exec_selection;
@@ -64,18 +63,16 @@ async fn test_datafusion_query_engine() -> Result<()> {
     let limit = 10;
     let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
 
-    let plan = LogicalPlan::DfPlan(
-        LogicalPlanBuilder::scan(
-            "numbers",
-            Arc::new(DefaultTableSource { table_provider }),
-            None,
-        )
-        .unwrap()
-        .limit(0, Some(limit))
-        .unwrap()
-        .build()
-        .unwrap(),
-    );
+    let plan = LogicalPlanBuilder::scan(
+        "numbers",
+        Arc::new(DefaultTableSource { table_provider }),
+        None,
+    )
+    .unwrap()
+    .limit(0, Some(limit))
+    .unwrap()
+    .build()
+    .unwrap();
 
     let output = engine.execute(plan, QueryContext::arc()).await?;
diff --git a/src/script/src/table.rs b/src/script/src/table.rs
index bbc04e6f64a7..cc20cd4f8442 100644
--- a/src/script/src/table.rs
+++ b/src/script/src/table.rs
@@ -33,7 +33,6 @@ use datafusion_common::TableReference;
 use datafusion_expr::LogicalPlanBuilder;
 use datatypes::prelude::ScalarVector;
 use datatypes::vectors::{StringVector, Vector};
-use query::plan::LogicalPlan;
 use query::QueryEngineRef;
 use servers::query_handler::grpc::GrpcQueryHandlerRef;
 use session::context::{QueryContextBuilder, QueryContextRef};
@@ -224,7 +223,7 @@ impl ScriptsTable {
         let output = self
             .query_engine
-            .execute(LogicalPlan::DfPlan(plan), query_ctx(&table_info))
+            .execute(plan, query_ctx(&table_info))
             .await
             .context(ExecuteInternalStatementSnafu)?;
         let stream = match output.data {
@@ -279,7 +278,7 @@ impl ScriptsTable {
             .context(BuildDfLogicalPlanSnafu)?;
 
         let output = query_engine
-            .execute(LogicalPlan::DfPlan(plan), query_ctx(&table_info))
+            .execute(plan, query_ctx(&table_info))
             .await
             .context(ExecuteInternalStatementSnafu)?;
         let stream = match output.data {
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index d088961e731c..54665b8c686c 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -46,6 +46,7 @@ common-version = { workspace = true, features = ["codec"] }
 dashmap.workspace = true
 datafusion.workspace = true
 datafusion-common.workspace = true
+datafusion-expr.workspace = true
 datatypes.workspace = true
 derive_builder.workspace = true
 futures = "0.3"
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 43bb458bd5d2..0fde3b527c84 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -398,13 +398,6 @@ pub enum Error {
         source: query::error::Error,
     },
 
-    #[snafu(display("Failed to get param types"))]
-    GetPreparedStmtParams {
-        source: query::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("{}", reason))]
     UnexpectedResult {
         reason: String,
@@ -452,13 +445,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to replace params with values in prepared statement"))]
-    ReplacePreparedStmtParams {
-        source: query::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Failed to convert scalar value"))]
     ConvertScalarValue {
         source: datatypes::error::Error,
@@ -635,9 +621,7 @@ impl ErrorExt for Error {
             InvalidUtf8Value { .. } => StatusCode::InvalidArguments,
 
-            ReplacePreparedStmtParams { source, .. }
-            | GetPreparedStmtParams { source, .. }
-            | ParsePromQL { source, .. } => source.status_code(),
+            ParsePromQL { source, .. } => source.status_code(),
             Other { source, .. } => source.status_code(),
 
             UnexpectedResult { .. } => StatusCode::Unexpected,
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index 18388998e792..a2b72b548b1e 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -969,11 +969,11 @@ mod test {
     use axum::routing::get;
     use common_query::Output;
     use common_recordbatch::RecordBatches;
+    use datafusion_expr::LogicalPlan;
     use datatypes::prelude::*;
     use datatypes::schema::{ColumnSchema, Schema};
     use datatypes::vectors::{StringVector, UInt32Vector};
     use query::parser::PromQuery;
-    use query::plan::LogicalPlan;
     use query::query_engine::DescribeResult;
     use session::context::QueryContextRef;
     use tokio::sync::mpsc;
diff --git a/src/servers/src/interceptor.rs b/src/servers/src/interceptor.rs
index d3478a56ea62..76ed4728e001 100644
--- a/src/servers/src/interceptor.rs
+++ b/src/servers/src/interceptor.rs
@@ -21,8 +21,8 @@ use api::v1::RowInsertRequests;
 use async_trait::async_trait;
 use common_error::ext::ErrorExt;
 use common_query::Output;
+use datafusion_expr::LogicalPlan;
 use query::parser::PromQuery;
-use query::plan::LogicalPlan;
 use serde_json::Value;
 use session::context::QueryContextRef;
 use sql::statements::statement::Statement;
diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs
index ff1af967fa2f..ce6857c6d23f 100644
--- a/src/servers/src/lib.rs
+++ b/src/servers/src/lib.rs
@@ -18,8 +18,8 @@
 #![feature(let_chains)]
 #![feature(if_let_guard)]
 
+use datafusion_expr::LogicalPlan;
 use datatypes::schema::Schema;
-use query::plan::LogicalPlan;
 
 pub mod addrs;
 pub mod configurator;
diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs
index 8e7b4630ebb4..3c0ac36f4c63 100644
--- a/src/servers/src/mysql/handler.rs
+++ b/src/servers/src/mysql/handler.rs
@@ -25,6 +25,8 @@ use common_catalog::parse_optional_catalog_and_schema_from_db_string;
 use common_error::ext::ErrorExt;
 use common_query::Output;
 use common_telemetry::{debug, error, tracing, warn};
+use datafusion_common::ParamValues;
+use datafusion_expr::LogicalPlan;
 use datatypes::prelude::ConcreteDataType;
 use itertools::Itertools;
 use opensrv_mysql::{
@@ -32,7 +34,6 @@ use opensrv_mysql::{
     StatementMetaWriter, ValueInner,
 };
 use parking_lot::RwLock;
-use query::plan::LogicalPlan;
 use query::query_engine::DescribeResult;
 use rand::RngCore;
 use session::context::{Channel, QueryContextRef};
@@ -43,7 +44,7 @@ use sql::parser::{ParseOptions, ParserContext};
 use sql::statements::statement::Statement;
 use tokio::io::AsyncWrite;
 
-use crate::error::{self, InvalidPrepareStatementSnafu, Result};
+use crate::error::{self, DataFrameSnafu, InvalidPrepareStatementSnafu, Result};
 use crate::metrics::METRIC_AUTH_FAILURE;
 use crate::mysql::helper::{
     self, format_placeholder, replace_placeholders, transform_placeholders,
@@ -175,8 +176,11 @@ impl MysqlInstanceShim {
         let params = if let Some(plan) = &plan {
             prepared_params(
                 &plan
-                    .get_param_types()
-                    .context(error::GetPreparedStmtParamsSnafu)?,
+                    .get_parameter_types()
+                    .context(DataFrameSnafu)?
+                    .into_iter()
+                    .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
+                    .collect(),
             )?
         } else {
             dummy_params(param_num)?
@@ -323,8 +327,11 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
         let outputs = match sql_plan.plan {
             Some(plan) => {
                 let param_types = plan
-                    .get_param_types()
-                    .context(error::GetPreparedStmtParamsSnafu)?;
+                    .get_parameter_types()
+                    .context(DataFrameSnafu)?
+                    .into_iter()
+                    .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
+                    .collect::<HashMap<_, _>>();
 
                 if params.len() != param_types.len() {
                     return error::InternalSnafu {
@@ -436,8 +443,11 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
         let outputs = match sql_plan.plan {
             Some(plan) => {
                 let param_types = plan
-                    .get_param_types()
-                    .context(error::GetPreparedStmtParamsSnafu)?;
+                    .get_parameter_types()
+                    .context(DataFrameSnafu)?
+                    .into_iter()
+                    .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
+                    .collect::<HashMap<_, _>>();
 
                 if params.len() != param_types.len() {
                     writer
@@ -618,8 +628,9 @@ fn replace_params_with_values(
         }
     }
 
-    plan.replace_params_with_values(&values)
-        .context(error::ReplacePreparedStmtParamsSnafu)
+    plan.clone()
+        .replace_params_with_values(&ParamValues::List(values.clone()))
+        .context(DataFrameSnafu)
 }
 
 fn replace_params_with_exprs(
@@ -645,8 +656,9 @@ fn replace_params_with_exprs(
         }
     }
 
-    plan.replace_params_with_values(&values)
-        .context(error::ReplacePreparedStmtParamsSnafu)
+    plan.clone()
+        .replace_params_with_values(&ParamValues::List(values.clone()))
+        .context(DataFrameSnafu)
 }
 
 async fn validate_query(query: &str) -> Result<Statement> {
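
The two mysql hunks above bind prepared-statement values the same way the postgres handler below does: clone the plan (the DataFusion method consumes `self`) and wrap the positional values in `ParamValues::List`. A sketch under those assumptions (`bind_params` is illustrative, not in the diff):

use datafusion_common::{DataFusionError, ParamValues, ScalarValue};
use datafusion_expr::LogicalPlan;

// Hypothetical helper mirroring the call sites: positional values for
// "$1", "$2", ... are wrapped in ParamValues::List, and the plan is
// cloned because replace_params_with_values takes `self` by value.
fn bind_params(
    plan: &LogicalPlan,
    values: Vec<ScalarValue>,
) -> Result<LogicalPlan, DataFusionError> {
    plan.clone()
        .replace_params_with_values(&ParamValues::List(values))
}
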
diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs
index 53d907d814db..e10a45ddecec 100644
--- a/src/servers/src/postgres/handler.rs
+++ b/src/servers/src/postgres/handler.rs
@@ -20,6 +20,8 @@ use common_query::{Output, OutputData};
 use common_recordbatch::error::Result as RecordBatchResult;
 use common_recordbatch::RecordBatch;
 use common_telemetry::{debug, error, tracing};
+use datafusion_common::ParamValues;
+use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::SchemaRef;
 use futures::{future, stream, Stream, StreamExt};
 use pgwire::api::portal::{Format, Portal};
@@ -272,7 +274,10 @@ impl ExtendedQueryHandler for PostgresServerHandler {
 
         let output = if let Some(plan) = &sql_plan.plan {
             let plan = plan
-                .replace_params_with_values(parameters_to_scalar_values(plan, portal)?.as_ref())
+                .clone()
+                .replace_params_with_values(&ParamValues::List(parameters_to_scalar_values(
+                    plan, portal,
+                )?))
                 .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
             self.query_handler
                 .do_exec_plan(plan, query_ctx.clone())
@@ -306,8 +311,11 @@ impl ExtendedQueryHandler for PostgresServerHandler {
         let sql_plan = &stmt.statement;
 
         let (param_types, sql_plan, format) = if let Some(plan) = &sql_plan.plan {
             let param_types = plan
-                .get_param_types()
-                .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
+                .get_parameter_types()
+                .map_err(|e| PgWireError::ApiError(Box::new(e)))?
+                .into_iter()
+                .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
+                .collect();
 
             let types = param_types_to_pg_types(&param_types)
                 .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs
index 9f9d94905e4a..85b626cb1ac6 100644
--- a/src/servers/src/postgres/types.rs
+++ b/src/servers/src/postgres/types.rs
@@ -23,6 +23,7 @@ use std::ops::Deref;
 use chrono::{NaiveDate, NaiveDateTime};
 use common_time::Interval;
 use datafusion_common::ScalarValue;
+use datafusion_expr::LogicalPlan;
 use datatypes::prelude::{ConcreteDataType, Value};
 use datatypes::schema::Schema;
 use datatypes::types::TimestampType;
@@ -30,7 +31,6 @@ use pgwire::api::portal::{Format, Portal};
 use pgwire::api::results::{DataRowEncoder, FieldInfo};
 use pgwire::api::Type;
 use pgwire::error::{PgWireError, PgWireResult};
-use query::plan::LogicalPlan;
 use session::context::QueryContextRef;
 use session::session_config::PGByteaOutputValue;
@@ -276,8 +276,11 @@ pub(super) fn parameters_to_scalar_values(
     let client_param_types = &portal.statement.parameter_types;
 
     let param_types = plan
-        .get_param_types()
-        .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
+        .get_parameter_types()
+        .map_err(|e| PgWireError::ApiError(Box::new(e)))?
+        .into_iter()
+        .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
+        .collect::<HashMap<_, _>>();
 
     for idx in 0..param_count {
         let server_type = param_types
diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs
index d548a843b59f..34d332e81dac 100644
--- a/src/servers/src/prom_store.rs
+++ b/src/servers/src/prom_store.rs
@@ -28,10 +28,10 @@ use common_telemetry::tracing;
 use common_time::timestamp::TimeUnit;
 use datafusion::prelude::{col, lit, regexp_match, Expr};
 use datafusion_common::ScalarValue;
+use datafusion_expr::LogicalPlan;
 use datatypes::prelude::{ConcreteDataType, Value};
 use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
 use query::dataframe::DataFrame;
-use query::plan::LogicalPlan;
 use snafu::{ensure, OptionExt, ResultExt};
 use snap::raw::{Decoder, Encoder};
@@ -123,7 +123,7 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {
         .filter(conditions)
         .context(error::DataFrameSnafu)?;
 
-    Ok(LogicalPlan::DfPlan(dataframe.into_parts().1))
+    Ok(dataframe.into_parts().1)
 }
 
 #[inline]
diff --git a/src/servers/src/query_handler/sql.rs b/src/servers/src/query_handler/sql.rs
index 9b9148af881a..eef443bbec1e 100644
--- a/src/servers/src/query_handler/sql.rs
+++ b/src/servers/src/query_handler/sql.rs
@@ -17,8 +17,8 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_query::Output;
+use datafusion_expr::LogicalPlan;
 use query::parser::PromQuery;
-use query::plan::LogicalPlan;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
 use sql::statements::statement::Statement;
diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs
index 3e6e743b5c09..a0ef088050e3 100644
--- a/src/servers/tests/http/influxdb_test.rs
+++ b/src/servers/tests/http/influxdb_test.rs
@@ -21,8 +21,8 @@ use auth::tests::{DatabaseAuthInfo, MockUserProvider};
 use axum::{http, Router};
 use common_query::Output;
 use common_test_util::ports;
+use datafusion_expr::LogicalPlan;
 use query::parser::PromQuery;
-use query::plan::LogicalPlan;
 use query::query_engine::DescribeResult;
 use servers::error::{Error, Result};
 use servers::http::header::constants::GREPTIME_DB_HEADER_NAME;
diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs
index 1a719aa93d2c..635060261a2b 100644
--- a/src/servers/tests/http/opentsdb_test.rs
+++ b/src/servers/tests/http/opentsdb_test.rs
@@ -19,8 +19,8 @@ use async_trait::async_trait;
 use axum::Router;
 use common_query::Output;
 use common_test_util::ports;
+use datafusion_expr::LogicalPlan;
 use query::parser::PromQuery;
-use query::plan::LogicalPlan;
 use query::query_engine::DescribeResult;
 use servers::error::{self, Result};
 use servers::http::test_helpers::TestClient;
diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs
index fd2d01304e4c..bb800e5b7cea 100644
--- a/src/servers/tests/http/prom_store_test.rs
+++ b/src/servers/tests/http/prom_store_test.rs
@@ -23,9 +23,9 @@ use async_trait::async_trait;
 use axum::Router;
 use common_query::Output;
 use common_test_util::ports;
+use datafusion_expr::LogicalPlan;
 use prost::Message;
 use query::parser::PromQuery;
-use query::plan::LogicalPlan;
 use query::query_engine::DescribeResult;
 use servers::error::{Error, Result};
 use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs
index daf9382ec737..ca098546aa9e 100644
--- a/src/servers/tests/mod.rs
+++ b/src/servers/tests/mod.rs
@@ -21,8 +21,8 @@ use async_trait::async_trait;
 use catalog::memory::MemoryCatalogManager;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_query::Output;
+use datafusion_expr::LogicalPlan;
 use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
-use query::plan::LogicalPlan;
 use query::query_engine::DescribeResult;
 use query::{QueryEngineFactory, QueryEngineRef};
 use script::engine::{CompileContext, EvalContext, Script, ScriptEngine};
diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs
index fa88de07de4f..a4ef632b335f 100644
--- a/tests-integration/src/grpc.rs
+++ b/tests-integration/src/grpc.rs
@@ -33,7 +33,6 @@ mod test {
     use common_recordbatch::RecordBatches;
     use frontend::instance::Instance;
     use query::parser::QueryLanguageParser;
-    use query::plan::LogicalPlan;
     use query::query_engine::DefaultSerializer;
     use servers::query_handler::grpc::GrpcQueryHandler;
     use session::context::QueryContext;
@@ -540,7 +539,7 @@ CREATE TABLE {table_name} (
             &QueryContext::arc(),
         )
         .unwrap();
-        let LogicalPlan::DfPlan(plan) = instance
+        let plan = instance
             .frontend()
             .statement_executor()
             .plan(stmt, QueryContext::arc())
diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs
index b3f966c811e5..096be44e0c6e 100644
--- a/tests-integration/src/instance.rs
+++ b/tests-integration/src/instance.rs
@@ -28,10 +28,10 @@ mod tests {
     use common_query::Output;
     use common_recordbatch::RecordBatches;
     use common_telemetry::debug;
+    use datafusion_expr::LogicalPlan;
     use frontend::error::{self, Error, Result};
     use frontend::instance::Instance;
     use query::parser::QueryLanguageParser;
-    use query::plan::LogicalPlan;
     use query::query_engine::DefaultSerializer;
     use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
     use servers::query_handler::sql::SqlQueryHandler;
@@ -233,7 +233,7 @@ mod tests {
             &QueryContext::arc(),
         )
         .unwrap();
-        let LogicalPlan::DfPlan(plan) = instance
+        let plan = instance
             .frontend()
             .statement_executor()
             .plan(stmt, QueryContext::arc())
@@ -317,7 +317,7 @@ mod tests {
         fn pre_execute(
             &self,
             _statement: &Statement,
-            _plan: Option<&query::plan::LogicalPlan>,
+            _plan: Option<&LogicalPlan>,
             _query_ctx: QueryContextRef,
         ) -> Result<()> {
             let _ = self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed);