[SPARK-36444][SQL] Remove OptimizeSubqueries from batch of PartitionPruning

### What changes were proposed in this pull request?

Remove `OptimizeSubqueries` from the `PartitionPruning` batch so that dynamic partition pruning (DPP) supports more cases. For example:
```sql
SELECT date_id, product_id FROM fact_sk f
JOIN (SELECT store_id + 3 AS new_store_id FROM dim_store WHERE country = 'US') s
ON f.store_id = s.new_store_id
```
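
For reference, here is a minimal Spark shell sketch of a setup that could reproduce the plans below. The table, column, and query names come from the example above; the row counts, column types, and the choice to partition `fact_sk` by `store_id` are assumptions (the plans only show that `fact_sk` is partition-pruned on `store_id`):
```scala
// Hypothetical setup: a fact table partitioned by store_id (so DPP can prune
// its partitions) and a small, unpartitioned dimension table.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true") // default is true

spark.range(1000)
  .selectExpr("id AS date_id", "id AS product_id", "id % 50 AS store_id")
  .write.partitionBy("store_id").format("parquet").saveAsTable("fact_sk")

spark.range(50)
  .selectExpr("id AS store_id", "IF(id % 2 = 0, 'US', 'DE') AS country")
  .write.format("parquet").saveAsTable("dim_store")

spark.sql("""
  SELECT date_id, product_id FROM fact_sk f
  JOIN (SELECT store_id + 3 AS new_store_id FROM dim_store WHERE country = 'US') s
  ON f.store_id = s.new_store_id
""").explain()
```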

Before this PR:
```
== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#274]
      +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
         +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
            +- *(1) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
```

After this PR:
```
== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN dynamicpruning#4007)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   :        +- SubqueryBroadcast dynamicpruning#4007, 0, [new_store_id#3997], [id=#263]
   :           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262]
   :              +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
   :                 +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
   +- ReusedExchange [new_store_id#3997], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262]
```
This is because `OptimizeSubqueries` infers more filters, which prevents the broadcast exchange from being reused. The following is the plan when `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly` is disabled:
```
== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN subquery#4009)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   :        +- Subquery subquery#4009, [id=#284]
   :           +- *(2) HashAggregate(keys=[new_store_id#3997#4008], functions=[])
   :              +- Exchange hashpartitioning(new_store_id#3997#4008, 5), ENSURE_REQUIREMENTS, [id=#280]
   :                 +- *(1) HashAggregate(keys=[new_store_id#3997 AS new_store_id#3997#4008], functions=[])
   :                    +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
   :                       +- *(1) Filter (((isnotnull(store_id#4002) AND isnotnull(country#4004)) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
   :                          +- *(1) ColumnarToRow
   :                             +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(store_id#4002), isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002..., Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(store_id), IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#305]
      +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
         +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
            +- *(1) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
```
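
For completeness, a sketch of how the plan above can be obtained: `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly` defaults to `true`, so it has to be turned off explicitly. The query and tables are the hypothetical ones from the earlier sketch:
```scala
// With reuseBroadcastOnly disabled, DPP may run the pruning filter as a
// standalone subquery instead of requiring a reusable broadcast exchange.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")

spark.sql("""
  SELECT date_id, product_id FROM fact_sk f
  JOIN (SELECT store_id + 3 AS new_store_id FROM dim_store WHERE country = 'US') s
  ON f.store_id = s.new_store_id
""").explain()
```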

### Why are the changes needed?

Improve DPP to support more cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests and a benchmark test (an illustrative plan check is sketched after the table):

SQL | Before this PR (seconds) | After this PR (seconds)
-- | -- | --
TPC-DS q58 | 40 | 20
TPC-DS q83 | 18 | 14
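
As a rough illustration only (this is not the unit test added by the PR), broadcast reuse for DPP can be checked by asserting that the plan string contains the operators shown in the "After this PR" plan. The tables are the hypothetical ones from the earlier sketch:
```scala
import org.apache.spark.sql.execution.SimpleMode

// Illustrative check: after this change, the pruning subquery should reuse the
// join's broadcast exchange, so both SubqueryBroadcast and ReusedExchange show
// up in the physical plan (with AQE enabled the exact layout may differ).
val df = spark.sql("""
  SELECT date_id, product_id FROM fact_sk f
  JOIN (SELECT store_id + 3 AS new_store_id FROM dim_store WHERE country = 'US') s
  ON f.store_id = s.new_store_id
""")

val plan = df.queryExecution.explainString(SimpleMode)
assert(plan.contains("SubqueryBroadcast"))
assert(plan.contains("ReusedExchange"))
```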

Closes #33664 from wangyum/SPARK-36444.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit 2310b99)
Signed-off-by: Yuming Wang <[email protected]>
wangyum committed Aug 19, 2021
1 parent 9544c24 commit 5b97165
Showing 10 changed files with 1,235 additions and 1,156 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
```diff
@@ -42,8 +42,7 @@ class SparkOptimizer(
   override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("PartitionPruning", Once,
-      PartitionPruning,
-      OptimizeSubqueries) :+
+      PartitionPruning) :+
     Batch("Pushdown Filters from PartitionPruning", fixedPoint,
       PushDownPredicates) :+
     Batch("Cleanup filters that cannot be pushed down", Once,
```


```
@@ -18,30 +18,32 @@ TakeOrderedAndProject [item_id,ss_item_rev,ss_dev,cs_item_rev,cs_dev,ws_item_rev
ColumnarToRow
InputAdapter
Scan parquet default.store_sales [ss_item_sk,ss_ext_sales_price,ss_sold_date_sk]
InputAdapter
BroadcastExchange #2
WholeStageCodegen (2)
Project [d_date_sk]
BroadcastHashJoin [d_date,d_date]
Filter [d_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date_sk,d_date]
InputAdapter
BroadcastExchange #3
WholeStageCodegen (1)
Project [d_date]
Filter [d_week_seq]
Subquery #1
SubqueryBroadcast [d_date_sk] #1
BroadcastExchange #2
WholeStageCodegen (2)
Project [d_date_sk]
BroadcastHashJoin [d_date,d_date]
Filter [d_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date_sk,d_date]
InputAdapter
BroadcastExchange #3
WholeStageCodegen (1)
Project [d_week_seq]
Filter [d_date]
Project [d_date]
Filter [d_week_seq]
Subquery #2
WholeStageCodegen (1)
Project [d_week_seq]
Filter [d_date]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date,d_week_seq]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date,d_week_seq]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date,d_week_seq]
InputAdapter
ReusedExchange [d_date_sk] #2
InputAdapter
BroadcastExchange #4
WholeStageCodegen (3)
@@ -66,6 +68,7 @@ TakeOrderedAndProject [item_id,ss_item_rev,ss_dev,cs_item_rev,cs_dev,ws_item_rev
ColumnarToRow
InputAdapter
Scan parquet default.catalog_sales [cs_item_sk,cs_ext_sales_price,cs_sold_date_sk]
ReusedSubquery [d_date_sk] #1
InputAdapter
ReusedExchange [d_date_sk] #2
InputAdapter
@@ -87,6 +90,7 @@ TakeOrderedAndProject [item_id,ss_item_rev,ss_dev,cs_item_rev,cs_dev,ws_item_rev
ColumnarToRow
InputAdapter
Scan parquet default.web_sales [ws_item_sk,ws_ext_sales_price,ws_sold_date_sk]
ReusedSubquery [d_date_sk] #1
InputAdapter
ReusedExchange [d_date_sk] #2
InputAdapter
```
