Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove DfPlan wrapper #4733

Merged
merged 5 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use either::Either;
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::{DefaultSerializer, QueryEngineState};
use query::QueryEngine;
use rustyline::error::ReadlineError;
Expand Down Expand Up @@ -179,7 +178,7 @@ impl Repl {
.await
.context(PlanStatementSnafu)?;

let LogicalPlan::DfPlan(plan) = query_engine
let plan = query_engine
.optimize(&query_engine.engine_context(query_ctx), &plan)
.context(PlanStatementSnafu)?;

Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ impl RegionServerInner {

let result = self
.query_engine
.execute(request.plan.into(), query_ctx)
.execute(request.plan, query_ctx)
.await
.context(ExecuteLogicalPlanSnafu)?;

Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_runtime::Runtime;
use datafusion_expr::LogicalPlan;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
use query::planner::LogicalPlanner;
use query::query_engine::{DescribeResult, QueryEngineState};
use query::{QueryEngine, QueryEngineContext};
Expand Down
2 changes: 0 additions & 2 deletions src/flow/src/df_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use datafusion_expr::{
BinaryExpr, Expr, Operator, Projection, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
use snafu::ResultExt;
Expand Down Expand Up @@ -111,7 +110,6 @@ pub async fn sql_to_flow_plan(
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let LogicalPlan::DfPlan(plan) = plan;

let opted_plan = apply_df_optimizer(plan).await?;

Expand Down
3 changes: 0 additions & 3 deletions src/flow/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ mod test {
use itertools::Itertools;
use prost::Message;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::DefaultSerializer;
use query::QueryEngine;
use session::context::QueryContext;
Expand Down Expand Up @@ -274,7 +273,6 @@ mod test {
.plan(stmt, QueryContext::arc())
.await
.unwrap();
let LogicalPlan::DfPlan(plan) = plan;
let plan = apply_df_optimizer(plan).await.unwrap();

// encode then decode so to rely on the impl of conversion from logical plan to substrait plan
Expand All @@ -297,7 +295,6 @@ mod test {
.plan(stmt, QueryContext::arc())
.await
.unwrap();
let LogicalPlan::DfPlan(plan) = plan;
let plan = apply_df_optimizer(plan).await;

assert!(plan.is_err());
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
datafusion-expr.workspace = true
datanode.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::{debug, error, tracing};
use datafusion_expr::LogicalPlan;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
Expand All @@ -48,7 +49,6 @@ use pipeline::pipeline_operator::PipelineOperator;
use prometheus::HistogramTimer;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::QueryEngineRef;
Expand Down
16 changes: 8 additions & 8 deletions src/operator/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,6 @@ pub enum Error {
source: query::error::Error,
},

#[snafu(display("Failed to get schema from logical plan"))]
GetSchema {
#[snafu(implicit)]
location: Location,
source: query::error::Error,
},

#[snafu(display("Column datatype error"))]
ColumnDataType {
#[snafu(implicit)]
Expand Down Expand Up @@ -184,6 +177,13 @@ pub enum Error {
source: datatypes::error::Error,
},

#[snafu(display("Failed to convert datafusion schema"))]
ConvertSchema {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to convert expr to struct"))]
InvalidExpr {
#[snafu(implicit)]
Expand Down Expand Up @@ -795,6 +795,7 @@ impl ErrorExt for Error {
| Error::PrepareFileTable { .. }
| Error::InferFileTableSchema { .. }
| Error::SchemaIncompatible { .. }
| Error::ConvertSchema { .. }
| Error::UnsupportedRegionRequest { .. }
| Error::InvalidTableName { .. }
| Error::InvalidViewName { .. }
Expand Down Expand Up @@ -872,7 +873,6 @@ impl ErrorExt for Error {
| Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),

Error::ExecuteStatement { source, .. }
| Error::GetSchema { source, .. }
| Error::ExtractTableNames { source, .. }
| Error::PlanStatement { source, .. }
| Error::ParseQuery { source, .. }
Expand Down
2 changes: 1 addition & 1 deletion src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ use common_query::Output;
use common_telemetry::tracing;
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion_expr::LogicalPlan;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use query::parser::QueryStatement;
use query::plan::LogicalPlan;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
Expand Down
3 changes: 1 addition & 2 deletions src/operator/src/statement/copy_table_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use datafusion::datasource::DefaultTableSource;
use datafusion_common::TableReference as DfTableReference;
use datafusion_expr::LogicalPlanBuilder;
use object_store::ObjectStore;
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use table::requests::CopyTableRequest;
Expand Down Expand Up @@ -133,7 +132,7 @@ impl StatementExecutor {

let output = self
.query_engine
.execute(LogicalPlan::DfPlan(plan), query_ctx)
.execute(plan, query_ctx)
.await
.context(ExecLogicalPlanSnafu)?;
let stream = match output.data {
Expand Down
24 changes: 13 additions & 11 deletions src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use common_query::Output;
use common_telemetry::{debug, info, tracing};
use common_time::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use datatypes::schema::{RawSchema, Schema};
use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
Expand Down Expand Up @@ -69,11 +69,11 @@ use table::TableRef;
use super::StatementExecutor;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu, InvalidPartitionSnafu,
InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result,
SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu, SubstraitCodecSnafu,
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
ConvertSchemaSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DeserializePartitionSnafu,
EmptyDdlExprSnafu, ExtractTableNamesSnafu, FlowNotFoundSnafu, InvalidPartitionRuleSnafu,
InvalidPartitionSnafu, InvalidTableNameSnafu, InvalidViewNameSnafu, InvalidViewStmtSnafu,
ParseSqlValueSnafu, Result, SchemaInUseSnafu, SchemaNotFoundSnafu, SchemaReadOnlySnafu,
SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu,
};
use crate::expr_factory;
Expand Down Expand Up @@ -406,9 +406,12 @@ impl StatementExecutor {

// Save the columns in plan, it may changed when the schemas of tables in plan
// are altered.
let plan_columns: Vec<_> = logical_plan
let schema: Schema = logical_plan
.schema()
.context(error::GetSchemaSnafu)?
.clone()
.try_into()
.context(ConvertSchemaSnafu)?;
let plan_columns: Vec<_> = schema
.column_schemas()
.iter()
.map(|c| c.name.clone())
Expand All @@ -434,9 +437,8 @@ impl StatementExecutor {

// Extract the table names from the original plan
// and rewrite them as fully qualified names.
let (table_names, plan) =
extract_and_rewrite_full_table_names(logical_plan.unwrap_df_plan(), ctx.clone())
.context(ExtractTableNamesSnafu)?;
let (table_names, plan) = extract_and_rewrite_full_table_names(logical_plan, ctx.clone())
.context(ExtractTableNamesSnafu)?;

let table_names = table_names.into_iter().map(|t| t.into()).collect();

Expand Down
2 changes: 1 addition & 1 deletion src/operator/src/statement/tql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use std::collections::HashMap;

use common_query::Output;
use common_telemetry::tracing;
use datafusion_expr::LogicalPlan;
use query::parser::{
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
};
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::statements::tql::Tql;
Expand Down
7 changes: 3 additions & 4 deletions src/pipeline/src/manager/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ use common_telemetry::{debug, info};
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::logical_expr::col;
use datafusion_common::{TableReference, ToDFSchema};
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan};
use datafusion_expr::{DmlStatement, LogicalPlan};
use datatypes::prelude::ScalarVector;
use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use moka::sync::Cache;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
Expand Down Expand Up @@ -373,7 +372,7 @@ impl PipelineTable {
Arc::new(dataframe.into_parts().1),
);

let plan = LogicalPlan::DfPlan(DfLogicalPlan::Dml(stmt));
let plan = LogicalPlan::Dml(stmt);

// 4. execute dml stmt
let output = self
Expand Down Expand Up @@ -427,7 +426,7 @@ impl PipelineTable {
.limit(0, Some(1))
.context(BuildDfLogicalPlanSnafu)?;

let plan = LogicalPlan::DfPlan(dataframe.into_parts().1);
let plan = dataframe.into_parts().1;

let table_info = self.table.table_info();

Expand Down
Loading