Skip to content

Commit

Permalink
refactor: Remove PhysicalOptimizer and LogicalOptimizer trait (#4426)
Browse files Browse the repository at this point in the history
* refactor(query): Remove LogicalOptimizer trait

* refactor(query): Remove PhysicalOptimizer trait
  • Loading branch information
leaf-potato authored Jul 24, 2024
1 parent f787265 commit e935bf7
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 120 deletions.
1 change: 0 additions & 1 deletion src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use common_telemetry::debug;
use either::Either;
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
use query::logical_optimizer::LogicalOptimizer;
use query::parser::QueryLanguageParser;
use query::plan::LogicalPlan;
use query::query_engine::{DefaultSerializer, QueryEngineState};
Expand Down
130 changes: 64 additions & 66 deletions src/query/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ use crate::error::{
TableNotFoundSnafu, TableReadOnlySnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_optimizer::PhysicalOptimizer;
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::plan::LogicalPlan;
use crate::planner::{DfLogicalPlanner, LogicalPlanner};
Expand Down Expand Up @@ -310,6 +308,70 @@ impl DatafusionQueryEngine {
}
}
}

#[tracing::instrument(skip_all)]
pub fn optimize(
&self,
context: &QueryEngineContext,
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();
match plan {
LogicalPlan::DfPlan(df_plan) => {
// Optimized by extension rules
let optimized_plan = self
.state
.optimize_by_extension_rules(df_plan.clone(), context)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

// Optimized by datafusion optimizer
let optimized_plan = self
.state
.session_state()
.optimize(&optimized_plan)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

Ok(LogicalPlan::DfPlan(optimized_plan))
}
}
}

#[tracing::instrument(skip_all)]
fn optimize_physical_plan(
&self,
ctx: &mut QueryEngineContext,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

let state = ctx.state();
let config = state.config_options();
// skip optimize AnalyzeExec plan
let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
{
let mut new_plan = analyze_plan.input().clone();
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
Arc::new(DistAnalyzeExec::new(new_plan))
} else {
let mut new_plan = plan;
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
new_plan
};

Ok(optimized_plan)
}
}

#[async_trait]
Expand Down Expand Up @@ -387,70 +449,6 @@ impl QueryEngine for DatafusionQueryEngine {
}
}

impl LogicalOptimizer for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn optimize(&self, context: &QueryEngineContext, plan: &LogicalPlan) -> Result<LogicalPlan> {
let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer();
match plan {
LogicalPlan::DfPlan(df_plan) => {
// Optimized by extension rules
let optimized_plan = self
.state
.optimize_by_extension_rules(df_plan.clone(), context)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

// Optimized by datafusion optimizer
let optimized_plan = self
.state
.session_state()
.optimize(&optimized_plan)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;

Ok(LogicalPlan::DfPlan(optimized_plan))
}
}
}
}

impl PhysicalOptimizer for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn optimize_physical_plan(
&self,
ctx: &mut QueryEngineContext,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

let state = ctx.state();
let config = state.config_options();
// skip optimize AnalyzeExec plan
let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
{
let mut new_plan = analyze_plan.input().clone();
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
Arc::new(DistAnalyzeExec::new(new_plan))
} else {
let mut new_plan = plan;
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
new_plan
};

Ok(optimized_plan)
}
}

impl QueryExecutor for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn execute_stream(
Expand Down
2 changes: 0 additions & 2 deletions src/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ pub mod dist_plan;
pub mod dummy_catalog;
pub mod error;
pub mod executor;
pub mod logical_optimizer;
pub mod metrics;
mod optimizer;
pub mod parser;
pub mod physical_optimizer;
pub mod physical_wrapper;
pub mod plan;
pub mod planner;
Expand Down
23 changes: 0 additions & 23 deletions src/query/src/logical_optimizer.rs

This file was deleted.

28 changes: 0 additions & 28 deletions src/query/src/physical_optimizer.rs

This file was deleted.

0 comments on commit e935bf7

Please sign in to comment.