Skip to content

Commit

Permalink
[SPARK-32564][SQL][TEST] Inject data statistics to simulate plan gene…
Browse files Browse the repository at this point in the history
…ration on actual TPCDS data

### What changes were proposed in this pull request?

`TPCDSQuerySuite` currently computes plans with empty TPCDS tables, then checks if plans can be generated correctly. But, the generated plans can be different from actual ones because the input tables are empty (e.g., the plans always use broadcast-hash joins, but actual ones use sort-merge joins for larger tables). To mitigate the issue, this PR defines data statistics constants extracted from generated TPCDS data in `TPCDSTableStats`, then injects the statistics via `spark.sessionState.catalog.alterTableStats` when defining TPCDS tables in `TPCDSQuerySuite`.

Please see a link below about how to extract the table statistics:
 - https://gist.github.com/maropu/f553d32c323ee803d39e2f7fa0b5a8c3

For example, the generated plans of TPCDS `q2` are different with/without this fix:
```
==== w/ this fix: q2 ====
== Physical Plan ==
* Sort (43)
+- Exchange (42)
   +- * Project (41)
      +- * SortMergeJoin Inner (40)
         :- * Sort (28)
         :  +- Exchange (27)
         :     +- * Project (26)
         :        +- * BroadcastHashJoin Inner BuildRight (25)
         :           :- * HashAggregate (19)
         :           :  +- Exchange (18)
         :           :     +- * HashAggregate (17)
         :           :        +- * Project (16)
         :           :           +- * BroadcastHashJoin Inner BuildRight (15)
         :           :              :- Union (9)
         :           :              :  :- * Project (4)
         :           :              :  :  +- * Filter (3)
         :           :              :  :     +- * ColumnarToRow (2)
         :           :              :  :        +- Scan parquet default.web_sales (1)
         :           :              :  +- * Project (8)
         :           :              :     +- * Filter (7)
         :           :              :        +- * ColumnarToRow (6)
         :           :              :           +- Scan parquet default.catalog_sales (5)
         :           :              +- BroadcastExchange (14)
         :           :                 +- * Project (13)
         :           :                    +- * Filter (12)
         :           :                       +- * ColumnarToRow (11)
         :           :                          +- Scan parquet default.date_dim (10)
         :           +- BroadcastExchange (24)
         :              +- * Project (23)
         :                 +- * Filter (22)
         :                    +- * ColumnarToRow (21)
         :                       +- Scan parquet default.date_dim (20)
         +- * Sort (39)
            +- Exchange (38)
               +- * Project (37)
                  +- * BroadcastHashJoin Inner BuildRight (36)
                     :- * HashAggregate (30)
                     :  +- ReusedExchange (29)
                     +- BroadcastExchange (35)
                        +- * Project (34)
                           +- * Filter (33)
                              +- * ColumnarToRow (32)
                                 +- Scan parquet default.date_dim (31)

==== w/o this fix: q2 ====
== Physical Plan ==
* Sort (40)
+- Exchange (39)
   +- * Project (38)
      +- * BroadcastHashJoin Inner BuildRight (37)
         :- * Project (26)
         :  +- * BroadcastHashJoin Inner BuildRight (25)
         :     :- * HashAggregate (19)
         :     :  +- Exchange (18)
         :     :     +- * HashAggregate (17)
         :     :        +- * Project (16)
         :     :           +- * BroadcastHashJoin Inner BuildRight (15)
         :     :              :- Union (9)
         :     :              :  :- * Project (4)
         :     :              :  :  +- * Filter (3)
         :     :              :  :     +- * ColumnarToRow (2)
         :     :              :  :        +- Scan parquet default.web_sales (1)
         :     :              :  +- * Project (8)
         :     :              :     +- * Filter (7)
         :     :              :        +- * ColumnarToRow (6)
         :     :              :           +- Scan parquet default.catalog_sales (5)
         :     :              +- BroadcastExchange (14)
         :     :                 +- * Project (13)
         :     :                    +- * Filter (12)
         :     :                       +- * ColumnarToRow (11)
         :     :                          +- Scan parquet default.date_dim (10)
         :     +- BroadcastExchange (24)
         :        +- * Project (23)
         :           +- * Filter (22)
         :              +- * ColumnarToRow (21)
         :                 +- Scan parquet default.date_dim (20)
         +- BroadcastExchange (36)
            +- * Project (35)
               +- * BroadcastHashJoin Inner BuildRight (34)
                  :- * HashAggregate (28)
                  :  +- ReusedExchange (27)
                  +- BroadcastExchange (33)
                     +- * Project (32)
                        +- * Filter (31)
                           +- * ColumnarToRow (30)
                              +- Scan parquet default.date_dim (29)
```

This comes from the cloud-fan comment: #29270 (comment)

### Why are the changes needed?

For better test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29384 from maropu/AddTPCDSTableStats.

Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
maropu authored and dongjoon-hyun committed Aug 7, 2020
1 parent 7b6e1d5 commit 5b8444a
Show file tree
Hide file tree
Showing 3 changed files with 517 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,17 @@ class TPCDSQuerySuite extends BenchmarkQueryTest with TPCDSSchema {
"q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
"q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")

val sqlConfgs = Seq(
SQLConf.CBO_ENABLED.key -> "true",
SQLConf.PLAN_STATS_ENABLED.key -> "true",
SQLConf.JOIN_REORDER_ENABLED.key -> "true"
)

tpcdsQueries.foreach { name =>
val queryString = resourceToString(s"tpcds/$name.sql",
classLoader = Thread.currentThread().getContextClassLoader)
test(name) {
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
withSQLConf(sqlConfgs: _*) {
// check the plans can be properly generated
val plan = sql(queryString).queryExecution.executedPlan
checkGeneratedCode(plan)
Expand All @@ -69,7 +75,7 @@ class TPCDSQuerySuite extends BenchmarkQueryTest with TPCDSSchema {
val queryString = resourceToString(s"tpcds-v2.7.0/$name.sql",
classLoader = Thread.currentThread().getContextClassLoader)
test(s"$name-v2.7") {
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
withSQLConf(sqlConfgs: _*) {
// check the plans can be properly generated
val plan = sql(queryString).queryExecution.executedPlan
checkGeneratedCode(plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.TableIdentifier

trait TPCDSSchema {

private val tableColumns = Map(
Expand Down Expand Up @@ -255,5 +257,9 @@ trait TPCDSSchema {
|USING $format
|${options.mkString("\n")}
""".stripMargin)

// To simulate plan generation on actual TPCDS data, injects data stats here
spark.sessionState.catalog.alterTableStats(
TableIdentifier(tableName), Some(TPCDSTableStats.sf100TableStats(tableName)))
}
}
Loading

0 comments on commit 5b8444a

Please sign in to comment.