From 38185cacda6e56573c1aa4b316458679a81948af Mon Sep 17 00:00:00 2001 From: jakevin Date: Sun, 26 Feb 2023 05:09:39 +0800 Subject: [PATCH] enhance: remove more projection. (#5402) --- datafusion/core/src/dataframe.rs | 6 ++-- datafusion/core/tests/sql/explain_analyze.rs | 3 +- datafusion/core/tests/sql/subqueries.rs | 34 +++++++++---------- datafusion/optimizer/src/eliminate_project.rs | 2 ++ 4 files changed, 21 insertions(+), 24 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 3d00c684550a..1d5396219584 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1575,11 +1575,9 @@ mod tests { \n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\ \n Inner Join: t1.c1 = t2.c1\ \n SubqueryAlias: t1\ - \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ - \n TableScan: aggregate_test_100 projection=[c1, c2, c3]\ + \n TableScan: aggregate_test_100 projection=[c1, c2, c3]\ \n SubqueryAlias: t2\ - \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ - \n TableScan: aggregate_test_100 projection=[c1, c2, c3]", + \n TableScan: aggregate_test_100 projection=[c1, c2, c3]", format!("{:?}", df_renamed.clone().into_optimized_plan()?) ); diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 7391f29de921..3cbf50275326 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -650,8 +650,7 @@ async fn test_physical_plan_display_indent_multi_children() { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 9000), input_partitions=9000", " RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1", - " ProjectionExec: expr=[c1@0 as c1]", - " CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]", + " CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000), input_partitions=9000", " RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1", diff --git a/datafusion/core/tests/sql/subqueries.rs b/datafusion/core/tests/sql/subqueries.rs index 969443f6235e..fb114d641567 100644 --- a/datafusion/core/tests/sql/subqueries.rs +++ b/datafusion/core/tests/sql/subqueries.rs @@ -129,15 +129,14 @@ async fn exists_subquery_with_same_table() -> Result<()> { let plan = dataframe.into_optimized_plan()?; let expected = vec![ - "Explain [plan_type:Utf8, plan:Utf8]", - " Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " Filter: EXISTS () [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " Subquery: [t1_int:UInt32;N]", - " Projection: t1.t1_int [t1_int:UInt32;N]", - " Filter: t1.t1_id > t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " TableScan: t1 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - ]; + "Explain [plan_type:Utf8, plan:Utf8]", + " Filter: EXISTS () [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", + " Subquery: [t1_int:UInt32;N]", + " Projection: t1.t1_int [t1_int:UInt32;N]", + " Filter: t1.t1_id > t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", + " TableScan: t1 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", + " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", + ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); assert_eq!( @@ -160,15 +159,14 @@ async fn in_subquery_with_same_table() -> Result<()> { let plan = dataframe.into_optimized_plan()?; let expected = vec![ - "Explain [plan_type:Utf8, plan:Utf8]", - " Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " LeftSemi Join: t1.t1_id = __correlated_sq_1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", - " SubqueryAlias: __correlated_sq_1 [t1_int:UInt32;N]", - " Projection: t1.t1_int AS t1_int [t1_int:UInt32;N]", - " Filter: t1.t1_id > t1.t1_int [t1_id:UInt32;N, t1_int:UInt32;N]", - " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]", - ]; + "Explain [plan_type:Utf8, plan:Utf8]", + " LeftSemi Join: t1.t1_id = __correlated_sq_1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", + " TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", + " SubqueryAlias: __correlated_sq_1 [t1_int:UInt32;N]", + " Projection: t1.t1_int AS t1_int [t1_int:UInt32;N]", + " Filter: t1.t1_id > t1.t1_int [t1_id:UInt32;N, t1_int:UInt32;N]", + " TableScan: t1 projection=[t1_id, t1_int] [t1_id:UInt32;N, t1_int:UInt32;N]", + ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); assert_eq!( diff --git a/datafusion/optimizer/src/eliminate_project.rs b/datafusion/optimizer/src/eliminate_project.rs index cf77a227f628..143004aaabeb 100644 --- a/datafusion/optimizer/src/eliminate_project.rs +++ b/datafusion/optimizer/src/eliminate_project.rs @@ -46,6 +46,8 @@ impl OptimizerRule for EliminateProjection { | LogicalPlan::CrossJoin(_) | LogicalPlan::Union(_) | LogicalPlan::Filter(_) + | LogicalPlan::TableScan(_) + | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Sort(_) => { if can_eliminate(projection, child_plan.schema()) { Ok(Some(child_plan.clone()))