Skip to content

Commit

Permalink
refactor(query): Remove PhysicalOptimizer trait
Browse files Browse the repository at this point in the history
  • Loading branch information
leaf-potato committed Jul 23, 2024
1 parent 49f22f0 commit 6a396df
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 65 deletions.
69 changes: 33 additions & 36 deletions src/query/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ use crate::error::{
use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED};
use crate::physical_optimizer::PhysicalOptimizer;
use crate::physical_planner::PhysicalPlanner;
use crate::physical_wrapper::PhysicalPlanWrapperRef;
use crate::plan::LogicalPlan;
Expand Down Expand Up @@ -256,6 +255,39 @@ impl DatafusionQueryEngine {
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu { table: table_name })
}

#[tracing::instrument(skip_all)]
fn optimize_physical_plan(
&self,
ctx: &mut QueryEngineContext,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

let state = ctx.state();
let config = state.config_options();
// skip optimize AnalyzeExec plan
let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
{
let mut new_plan = analyze_plan.input().clone();
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
Arc::new(DistAnalyzeExec::new(new_plan))
} else {
let mut new_plan = plan;
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
new_plan
};

Ok(optimized_plan)
}
}

#[async_trait]
Expand Down Expand Up @@ -420,41 +452,6 @@ impl PhysicalPlanner for DatafusionQueryEngine {
}
}

impl PhysicalOptimizer for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn optimize_physical_plan(
&self,
ctx: &mut QueryEngineContext,
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();

let state = ctx.state();
let config = state.config_options();
// skip optimize AnalyzeExec plan
let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
{
let mut new_plan = analyze_plan.input().clone();
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
Arc::new(DistAnalyzeExec::new(new_plan))
} else {
let mut new_plan = plan;
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
new_plan
};

Ok(optimized_plan)
}
}

impl QueryExecutor for DatafusionQueryEngine {
#[tracing::instrument(skip_all)]
fn execute_stream(
Expand Down
1 change: 0 additions & 1 deletion src/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub mod logical_optimizer;
pub mod metrics;
mod optimizer;
pub mod parser;
pub mod physical_optimizer;
pub mod physical_planner;
pub mod physical_wrapper;
pub mod plan;
Expand Down
28 changes: 0 additions & 28 deletions src/query/src/physical_optimizer.rs

This file was deleted.

0 comments on commit 6a396df

Please sign in to comment.