forked from apache/spark
#57 fix KAP ut Don't delete shuffle data immediately #68
Merged: zheniantoushipashi merged 1 commit into Kyligence:kyspark-2.4.1.x-4.x from zheniantoushipashi:testtest on Oct 18, 2019
Conversation
Follow-up issue: #69
hn5092 approved these changes on Oct 17, 2019
jiaoew1991 approved these changes on Oct 18, 2019
7mming7 pushed a commit that referenced this pull request on Nov 4, 2020:
… coerce to nullable type

### What changes were proposed in this pull request?

This PR targets for non-nullable null type not to coerce to nullable type in complex types.

Non-nullable fields in struct, elements in an array and entries in map can mean empty array, struct and map. They are empty so it does not need to force the nullability when we find common types.

This PR also reverts and supersedes apache@d7b97a1

### Why are the changes needed?

To make type coercion coherent and consistent. Currently, we correctly keep the nullability even between non-nullable fields:

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.range(1).select(array(lit(1)).cast(ArrayType(IntegerType, false))).printSchema()
spark.range(1).select(array(lit(1)).cast(ArrayType(DoubleType, false))).printSchema()
```

```scala
spark.range(1).selectExpr("concat(array(1), array(1)) as arr").printSchema()
```

### Does this PR introduce any user-facing change?

Yes.

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.range(1).select(array().cast(ArrayType(IntegerType, false))).printSchema()
```

```scala
spark.range(1).selectExpr("concat(array(), array(1)) as arr").printSchema()
```

**Before:**

```
org.apache.spark.sql.AnalysisException: cannot resolve 'array()' due to data type mismatch: cannot cast array<null> to array<int>;;
'Project [cast(array() as array<int>) AS array()#68]
+- Range (0, 1, step=1, splits=Some(12))
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:149)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:140)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:333)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:333)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
```

```
root
 |-- arr: array (nullable = false)
 |    |-- element: integer (containsNull = true)
```

**After:**

```
root
 |-- array(): array (nullable = false)
 |    |-- element: integer (containsNull = false)
```

```
root
 |-- arr: array (nullable = false)
 |    |-- element: integer (containsNull = false)
```

### How was this patch tested?

Unit tests were added and manually tested.

Closes apache#27991 from HyukjinKwon/SPARK-31227.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
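A quick way to check which behaviour a given build exhibits is to inspect the element nullability of the concatenated array directly. The sketch below is illustrative only and not part of the commit above: the local SparkSession builder, master, and app name are assumptions (in spark-shell the `spark` value already exists), and it mirrors the `concat(array(), array(1))` example from the commit message.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.ArrayType

// Illustrative check only; in spark-shell the `spark` session already exists
// and this builder call is unnecessary.
val spark = SparkSession.builder().master("local[*]").appName("null-type-coercion-check").getOrCreate()

// concat of an empty array (null element type) with array(1): once non-nullable
// null type no longer coerces to a nullable type, the common element type
// keeps containsNull = false.
val df = spark.range(1).selectExpr("concat(array(), array(1)) as arr")
val arrType = df.schema("arr").dataType.asInstanceOf[ArrayType]
println(s"containsNull = ${arrType.containsNull}") // true before this change, false after

spark.stop()
```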
lxian pushed a commit to lxian/spark that referenced this pull request on Mar 24, 2021:
…join can be planned as broadcast join

### What changes were proposed in this pull request?

Should not pushdown LeftSemi/LeftAnti over Aggregate for some cases.

```scala
spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1")
spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
```

Before this pr:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72]
            +- HashAggregate(keys=[a#16L, b#17L], functions=[])
               +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
                  :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65]
                  :     +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
                  +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
                              +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                                 +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```

After this pr:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74]
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
            :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67]
            :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :        +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
            :           +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :              +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
            +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#68]
                  +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                     +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
```

### Why are the changes needed?

1. Pushdown LeftSemi/LeftAnti over Aggregate will affect performance.
2. It will remove user added DISTINCT operator, e.g.: [q38](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q38.sql), [q87](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q87.sql).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test and benchmark test.

SQL | Before this PR(Seconds) | After this PR(Seconds)
-- | -- | --
q14a | 660 | 594
q14b | 660 | 600
q38 | 55 | 29
q87 | 66 | 35

Before this pr:
![image](https://user-images.githubusercontent.com/5399861/104452849-8789fc80-55de-11eb-88da-44059899f9a9.png)

After this pr:
![image](https://user-images.githubusercontent.com/5399861/104452899-9a043600-55de-11eb-9286-d8f3a23ca3b8.png)

Closes apache#31145 from wangyum/SPARK-34081.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
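To reproduce the plan difference without running the full-size benchmark, a scaled-down sketch like the one below can be used. It is illustrative only: the row counts, the overwrite write mode, and the local SparkSession are assumptions, while the INTERSECT query itself is the one from the commit message.

```scala
import org.apache.spark.sql.SparkSession

// Scaled-down, illustrative reproduction; not part of the original benchmark.
val spark = SparkSession.builder().master("local[*]").appName("semi-join-pushdown-check").getOrCreate()

spark.range(100000L).selectExpr("id % 100 as a", "id % 100 as b")
  .write.mode("overwrite").saveAsTable("t1")
spark.range(80000L).selectExpr("id % 80 as c", "id % 80 as d")
  .write.mode("overwrite").saveAsTable("t2")

// With this change the DISTINCT on t1 is kept below the LeftSemi join, so a
// HashAggregate over keys [a, b] should appear on the join's left branch.
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain()

spark.stop()
```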
lxian pushed a commit to lxian/spark that referenced this pull request on Sep 24, 2021:
…query

### What changes were proposed in this pull request?

Remove redundant aliases after `RewritePredicateSubquery`. For example:

```scala
sql("CREATE TABLE t1 USING parquet AS SELECT id AS a, id AS b, id AS c FROM range(10)")
sql("CREATE TABLE t2 USING parquet AS SELECT id AS x, id AS y FROM range(8)")
sql(
  """
    |SELECT *
    |FROM t1
    |WHERE a IN (SELECT x
    |            FROM (SELECT x AS x,
    |                         Rank() OVER (partition BY x ORDER BY Sum(y) DESC) AS ranking
    |                  FROM t2
    |                  GROUP BY x) tmp1
    |            WHERE ranking <= 5)
    |""".stripMargin).explain
```

Before this PR:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [a#10L], [x#7L], LeftSemi, BuildRight, false
   :- FileScan parquet default.t1[a#10L,b#11L,c#12L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#68]
      +- Project [x#7L]
         +- Filter (ranking#8 <= 5)
            +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST]
               +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0
                  +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#62]
                     +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)])
                        +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#59]
                           +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)])
                              +- FileScan parquet default.t2[x#15L,y#16L]
```

After this PR:

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [a#10L], [x#15L], LeftSemi, BuildRight, false
   :- FileScan parquet default.t1[a#10L,b#11L,c#12L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#67]
      +- Project [x#15L]
         +- Filter (ranking#8 <= 5)
            +- Window [rank(_w2#25L) windowspecdefinition(x#15L, _w2#25L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ranking#8], [x#15L], [_w2#25L DESC NULLS LAST]
               +- Sort [x#15L ASC NULLS FIRST, _w2#25L DESC NULLS LAST], false, 0
                  +- HashAggregate(keys=[x#15L], functions=[sum(y#16L)])
                     +- Exchange hashpartitioning(x#15L, 5), ENSURE_REQUIREMENTS, [id=#59]
                        +- HashAggregate(keys=[x#15L], functions=[partial_sum(y#16L)])
                           +- FileScan parquet default.t2[x#15L,y#16L]
```

### Why are the changes needed?

Reduce shuffle to improve query performance. This change can benefit TPC-DS q70.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes apache#33509 from wangyum/SPARK-36280.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
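A lightweight way to confirm the removed shuffle on a given build is to count the hash-partitioning exchanges on the subquery side of the plan. This is a sketch only: it assumes the spark-shell session (`spark`) and that t1/t2 were created exactly as in the CREATE TABLE statements above; the query is the one from the commit message.

```scala
// Assumes a spark-shell session (`spark`) and the t1/t2 tables created above.
val plan = spark.sql(
  """
    |SELECT *
    |FROM t1
    |WHERE a IN (SELECT x
    |            FROM (SELECT x AS x,
    |                         Rank() OVER (partition BY x ORDER BY Sum(y) DESC) AS ranking
    |                  FROM t2
    |                  GROUP BY x) tmp1
    |            WHERE ranking <= 5)
    |""".stripMargin).queryExecution.executedPlan

// With the redundant alias removed, the aggregate and the window partition on
// the same attribute, so only one "Exchange hashpartitioning" should remain
// below the BroadcastExchange (two appear without the change).
println(plan.toString.split("\n").count(_.contains("Exchange hashpartitioning")))
```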