
[SPARK-36444][SQL] Remove OptimizeSubqueries from batch of PartitionPruning #33664

Closed · wants to merge 3 commits
Conversation


wangyum commented Aug 6, 2021

What changes were proposed in this pull request?

Remove OptimizeSubqueries from the PartitionPruning batch so that DPP can support more cases. For example:

SELECT date_id, product_id FROM fact_sk f                                        
JOIN (select store_id + 3 as new_store_id from dim_store where country = 'US') s 
ON f.store_id = s.new_store_id                                                   
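
For reference, a minimal spark-shell sketch to reproduce the plans below (table and column names follow the example above; fact_sk is assumed to be partitioned by store_id, which is what makes it a DPP candidate):

spark.sql("CREATE TABLE fact_sk (date_id INT, product_id INT, store_id INT) USING parquet PARTITIONED BY (store_id)")
spark.sql("CREATE TABLE dim_store (store_id INT, country STRING) USING parquet")
spark.sql("""
  SELECT date_id, product_id FROM fact_sk f
  JOIN (SELECT store_id + 3 AS new_store_id FROM dim_store WHERE country = 'US') s
  ON f.store_id = s.new_store_id
""").explain()  // prints the physical plan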

Before this PR:

== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#274]
      +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
         +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
            +- *(1) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>

After this PR:

== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN dynamicpruning#4007)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   :        +- SubqueryBroadcast dynamicpruning#4007, 0, [new_store_id#3997], [id=#263]
   :           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262]
   :              +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
   :                 +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
   +- ReusedExchange [new_store_id#3997], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262]

This is because OptimizeSubqueries will infer more filters on the pruning subquery, so the broadcast exchange can no longer be reused. The following is the plan when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is disabled:

== Physical Plan ==
*(2) Project [date_id#3998, product_id#3999]
+- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
   :- *(2) ColumnarToRow
   :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN subquery#4009)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
   :        +- Subquery subquery#4009, [id=#284]
   :           +- *(2) HashAggregate(keys=[new_store_id#3997#4008], functions=[])
   :              +- Exchange hashpartitioning(new_store_id#3997#4008, 5), ENSURE_REQUIREMENTS, [id=#280]
   :                 +- *(1) HashAggregate(keys=[new_store_id#3997 AS new_store_id#3997#4008], functions=[])
   :                    +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
   :                       +- *(1) Filter (((isnotnull(store_id#4002) AND isnotnull(country#4004)) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
   :                          +- *(1) ColumnarToRow
   :                             +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(store_id#4002), isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002..., Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(store_id), IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#305]
      +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
         +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
            +- *(1) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
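
For reference, a sketch of how to toggle this flag in a session (spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly defaults to true, i.e. DPP is applied only when the broadcast exchange can be reused):

// Allow DPP to run the pruning filter as a standalone subquery,
// producing the Subquery/HashAggregate plan above.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")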

Why are the changes needed?

Improve DPP to support more cases.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests and a benchmark:

SQL        | Before this PR (seconds) | After this PR (seconds)
-----------|--------------------------|------------------------
TPC-DS q58 | 40                       | 20
TPC-DS q83 | 18                       | 14

github-actions bot added the SQL label Aug 6, 2021

SparkQA commented Aug 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46655/


SparkQA commented Aug 6, 2021

Test build #142143 has finished for PR 33664 at commit be2abff.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Aug 6, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46655/


wangyum commented Aug 6, 2021

retest this please.


SparkQA commented Aug 6, 2021

Test build #142158 has finished for PR 33664 at commit be2abff.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Aug 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46671/


SparkQA commented Aug 6, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46671/


SparkQA commented Aug 6, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46677/


wangyum commented Aug 6, 2021

cc @cloud-fan @maryannxue


SparkQA commented Aug 6, 2021

Test build #142164 has finished for PR 33664 at commit 0d7e228.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.


wangyum commented Aug 7, 2021

retest this please.


SparkQA commented Aug 7, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46696/


SparkQA commented Aug 7, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/46696/


SparkQA commented Aug 7, 2021

Test build #142184 has finished for PR 33664 at commit 0d7e228.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan commented

This is a tricky case: inferring more filters is generally good as long as it doesn't break DPP, so this is not a simple decision.

cc @maryannxue

Comment on lines 44 to 46
Batch("PartitionPruning", Once,
PartitionPruning,
OptimizeSubqueries) :+
PartitionPruning) :+
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
wangyum (Member Author) commented

Another option is:

  private val partitionPruningRules = Seq(PartitionPruning) ++
    (if (catalog.conf.dynamicPartitionPruningReuseBroadcastOnly) Nil else Seq(OptimizeSubqueries))

  Batch("PartitionPruning", Once,
    partitionPruningRules: _*) :+
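
This alternative would keep the extra filter inference whenever broadcast reuse is not required, at the cost of tying the optimizer batch to the dynamicPartitionPruningReuseBroadcastOnly config.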


SparkQA commented Aug 16, 2021

Test build #142480 has finished for PR 33664 at commit 0d7e228.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


wangyum commented Aug 19, 2021

retest this please.


cloud-fan commented Aug 19, 2021

a7a3935#diff-5221c65a64ad82c34cae68169cdb389210a9a28145058ae995b46ff4d3d4964cR39

We put this OptimizeSubqueries rule together with the DPP rule at the very beginning. That was something of a mistake: once this rule applies, we break plan reuse and thus break DPP. cc @maryannxue

This PR LGTM


SparkQA commented Aug 19, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47133/


SparkQA commented Aug 19, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47133/


SparkQA commented Aug 19, 2021

Test build #142633 has finished for PR 33664 at commit 0d7e228.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum closed this in 2310b99 Aug 19, 2021
wangyum added a commit that referenced this pull request Aug 19, 2021
[SPARK-36444][SQL] Remove OptimizeSubqueries from batch of PartitionPruning

Closes #33664 from wangyum/SPARK-36444.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit 2310b99)
Signed-off-by: Yuming Wang <[email protected]>

wangyum commented Aug 19, 2021

Merged to master and branch-3.2.

wangyum deleted the SPARK-36444 branch August 19, 2021 08:47
cloud-fan added a commit that referenced this pull request Nov 10, 2022
…d optimize subqueries

### What changes were proposed in this pull request?

This is a followup to #36304 to simplify `RowLevelOperationRuntimeGroupFiltering`. It does three things:
1. run `OptimizeSubqueries` in the `PartitionPruning` batch, so that `RowLevelOperationRuntimeGroupFiltering` does not need to invoke it manually.
2. skip DPP subqueries in `OptimizeSubqueries`, to avoid the issue fixed by #33664 (see the sketch after this list).
3. make `RowLevelOperationRuntimeGroupFiltering` create `InSubquery` instead of `DynamicPruningSubquery`, so that it can be optimized by `OptimizeSubqueries` later. This also avoids the unnecessary planning overhead of `DynamicPruningSubquery`, as there is no join here and the filter can only run as a plain subquery.
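
A minimal sketch of item 2, assuming the usual shape of Spark's `OptimizeSubqueries` rule (the `optimizer` handle and the class name are simplified for illustration; this is not the exact upstream code):

```scala
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningSubquery, SubqueryExpression}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// `optimizer` is an assumed handle to the enclosing optimizer instance.
class OptimizeSubqueriesSketch(optimizer: Optimizer) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
    // Leave DPP subqueries alone: re-optimizing them can infer extra filters,
    // which breaks broadcast exchange reuse (the issue fixed by #33664).
    case d: DynamicPruningSubquery => d
    // Optimize every other subquery plan as usual.
    case s: SubqueryExpression => s.withNewPlan(optimizer.execute(s.plan))
  }
}
```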

### Why are the changes needed?

code simplification

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #38557 from cloud-fan/help.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
…d optimize subqueries
