Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KE-26764 fix loophole in spark #253

Merged
merged 3 commits into from
May 26, 2021
Merged

KE-26764 fix loophole in spark #253

merged 3 commits into from
May 26, 2021

Conversation

woyumen4597
Copy link

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Copy link

@zheniantoushipashi zheniantoushipashi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@woyumen4597 woyumen4597 merged commit d6cf217 into kyspark-3.1.1.x-4.x May 26, 2021
chenzhx pushed a commit to chenzhx/spark that referenced this pull request Oct 22, 2021
* KE-26764 fix loophole in spark

* upgrade thriftserver to 0.14

* upgrade spark version to r21
chenzhx pushed a commit to chenzhx/spark that referenced this pull request May 17, 2022
### What changes were proposed in this pull request?
This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once.

E.g. the following query:
```
SELECT
  (SELECT avg(a) FROM t),
  (SELECT sum(b) FROM t)
```
is optimized from:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [] AS scalarsubquery()Kyligence#253, scalar-subquery#243 [] AS scalarsubquery()#254L]
:  :- Aggregate [avg(a#244) AS avg(a)Kyligence#247]
:  :  +- Project [a#244]
:  :     +- Relation default.t[a#244,b#245] parquet
:  +- Aggregate [sum(a#251) AS sum(a)#250L]
:     +- Project [a#251]
:        +- Relation default.t[a#251,b#252] parquet
+- OneRowRelation
```
to:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()Kyligence#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L]
:  :- Project [named_struct(avg(a), avg(a)Kyligence#247, sum(a), sum(a)#250L) AS mergedValue#260]
:  :  +- Aggregate [avg(a#244) AS avg(a)Kyligence#247, sum(a#244) AS sum(a)#250L]
:  :     +- Project [a#244]
:  :        +- Relation default.t[a#244,b#245] parquet
:  +- Project [named_struct(avg(a), avg(a)Kyligence#247, sum(a), sum(a)#250L) AS mergedValue#260]
:     +- Aggregate [avg(a#244) AS avg(a)Kyligence#247, sum(a#244) AS sum(a)#250L]
:        +- Project [a#244]
:           +- Relation default.t[a#244,b#245] parquet
+- OneRowRelation
```
and in the physical plan subqueries are reused:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Project [Subquery subquery#242, [id=Kyligence#113].avg(a) AS scalarsubquery()Kyligence#253, ReusedSubquery Subquery subquery#242, [id=Kyligence#113].sum(a) AS scalarsubquery()#254L]
   :  :- Subquery subquery#242, [id=Kyligence#113]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            *(2) Project [named_struct(avg(a), avg(a)Kyligence#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)Kyligence#247, sum(a)#250L])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=Kyligence#158]
                     +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                        +- *(1) ColumnarToRow
                           +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
         +- == Initial Plan ==
            Project [named_struct(avg(a), avg(a)Kyligence#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)Kyligence#247, sum(a)#250L])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=Kyligence#110]
                  +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                     +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
   :  +- ReusedSubquery Subquery subquery#242, [id=Kyligence#113]
   +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
...
```

Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well.

### Why are the changes needed?
Performance improvement.
```
[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - MergeScalarSubqueries off                    50798          52521        1423          0.0      Infinity       1.0X
[info] q9 - MergeScalarSubqueries on                     19484          19675         226          0.0      Infinity       2.6X

[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9b - MergeScalarSubqueries off                   15430          17803         NaN          0.0      Infinity       1.0X
[info] q9b - MergeScalarSubqueries on                     3862           4002         196          0.0      Infinity       4.0X
```
Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE.
The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1.

### Does this PR introduce _any_ user-facing change?
No. But this optimization can be disabled with `spark.sql.optimizer.excludedRules` config.

### How was this patch tested?
Existing and new UTs.

Closes apache#32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery.

Lead-authored-by: Peter Toth <[email protected]>
Co-authored-by: attilapiros <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
chenzhx pushed a commit to chenzhx/spark that referenced this pull request Jul 13, 2022
…ource is incorrect

### What changes were proposed in this pull request?
apache#36726 supports TimestampNTZ type in JDBC data source.
But the implement is incorrect.
This PR just modify a test case and it will be failed !
The test case show below.
```
  test("SPARK-39339: TimestampNTZType with different local time zones") {
    val tableName = "timestamp_ntz_diff_tz_support_table"

    DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
      DateTimeTestUtils.withDefaultTimeZone(zoneId) {
        Seq(
          "1972-07-04 03:30:00",
          "2019-01-20 12:00:00.502",
          "2019-01-20T00:00:00.123456",
          "1500-01-20T00:00:00.123456"
        ).foreach { case datetime =>
          val df = spark.sql(s"select timestamp_ntz '$datetime'")
          df.write.format("jdbc")
            .mode("overwrite")
            .option("url", urlWithUserAndPass)
            .option("dbtable", tableName)
            .save()

          DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
            DateTimeTestUtils.withDefaultTimeZone(zoneId) {
              val res = spark.read.format("jdbc")
                .option("inferTimestampNTZType", "true")
                .option("url", urlWithUserAndPass)
                .option("dbtable", tableName)
                .load()

              checkAnswer(res, df)
            }
          }
        }
      }
    }
  }
```

The test case output failure show below.
```
Results do not match for query:
Timezone: sun.util.calendar.ZoneInfo[id="Africa/Dakar",offset=0,dstSavings=0,useDaylight=false,transitions=3,lastRule=null]
Timezone Env:

== Parsed Logical Plan ==
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'Kyligence#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Analyzed Logical Plan ==
TIMESTAMP_NTZ '1500-01-20 00:00:00.123456': timestamp_ntz
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'Kyligence#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Optimized Logical Plan ==
Relation [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'Kyligence#253] JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1]

== Physical Plan ==
*(1) Scan JDBCRelation(timestamp_ntz_diff_tz_support_table) [numPartitions=1] [TIMESTAMP_NTZ '1500-01-20 00:00:00.123456'Kyligence#253] PushedFilters: [], ReadSchema: struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>

== Results ==

== Results ==
!== Correct Answer - 1 ==                                           == Spark Answer - 1 ==
 struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>   struct<TIMESTAMP_NTZ '1500-01-20 00:00:00.123456':timestamp_ntz>
![1500-01-20T00:00:00.123456]                                       [1500-01-20T00:16:08.123456]

ScalaTestFailureLocation: org.apache.spark.sql.QueryTest$ at (QueryTest.scala:243)
org.scalatest.exceptions.TestFailedException:
```

### Why are the changes needed?
Fix an implement bug.
The reason of the bug is use `toJavaTimestamp` and `fromJavaTimestamp`.
`toJavaTimestamp` and `fromJavaTimestamp` lead to the timestamp with JVM system time zone.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New test case.

Closes apache#37013 from beliefer/SPARK-39339_followup.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
leejaywei pushed a commit that referenced this pull request Jul 13, 2022
### What changes were proposed in this pull request?
This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once.

E.g. the following query:
```
SELECT
  (SELECT avg(a) FROM t),
  (SELECT sum(b) FROM t)
```
is optimized from:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [] AS scalarsubquery()#253, scalar-subquery#243 [] AS scalarsubquery()#254L]
:  :- Aggregate [avg(a#244) AS avg(a)#247]
:  :  +- Project [a#244]
:  :     +- Relation default.t[a#244,b#245] parquet
:  +- Aggregate [sum(a#251) AS sum(a)#250L]
:     +- Project [a#251]
:        +- Relation default.t[a#251,b#252] parquet
+- OneRowRelation
```
to:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L]
:  :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:  :  +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:  :     +- Project [a#244]
:  :        +- Relation default.t[a#244,b#245] parquet
:  +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:     +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:        +- Project [a#244]
:           +- Relation default.t[a#244,b#245] parquet
+- OneRowRelation
```
and in the physical plan subqueries are reused:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Project [Subquery subquery#242, [id=#113].avg(a) AS scalarsubquery()#253, ReusedSubquery Subquery subquery#242, [id=#113].sum(a) AS scalarsubquery()#254L]
   :  :- Subquery subquery#242, [id=#113]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#158]
                     +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                        +- *(1) ColumnarToRow
                           +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
         +- == Initial Plan ==
            Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#110]
                  +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                     +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
   :  +- ReusedSubquery Subquery subquery#242, [id=#113]
   +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
...
```

Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well.

### Why are the changes needed?
Performance improvement.
```
[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - MergeScalarSubqueries off                    50798          52521        1423          0.0      Infinity       1.0X
[info] q9 - MergeScalarSubqueries on                     19484          19675         226          0.0      Infinity       2.6X

[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9b - MergeScalarSubqueries off                   15430          17803         NaN          0.0      Infinity       1.0X
[info] q9b - MergeScalarSubqueries on                     3862           4002         196          0.0      Infinity       4.0X
```
Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE.
The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1.

### Does this PR introduce _any_ user-facing change?
No. But this optimization can be disabled with `spark.sql.optimizer.excludedRules` config.

### How was this patch tested?
Existing and new UTs.

Closes apache#32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery.

Lead-authored-by: Peter Toth <[email protected]>
Co-authored-by: attilapiros <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e00b81e)
Signed-off-by: Wenchen Fan <[email protected]>
leejaywei pushed a commit that referenced this pull request Jul 13, 2022
* KE-26764 fix loophole in spark

* upgrade thriftserver to 0.14

* upgrade spark version to r21
RolatZhang pushed a commit that referenced this pull request Aug 29, 2023
* KE-26764 fix loophole in spark

* upgrade thriftserver to 0.14

* upgrade spark version to r21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants