Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32590][SQL] Remove fullOutput from RowDataSourceScanExec #29415

Closed
wants to merge 2 commits into from

Conversation

huaxingao
Copy link
Contributor

What changes were proposed in this pull request?

Remove fullOutput from RowDataSourceScanExec

Why are the changes needed?

RowDataSourceScanExec requires the full output instead of the scan output after column pruning. However, in v2 code path, we don't have the full output anymore so we just pass the pruned output. RowDataSourceScanExec.fullOutput is actually meaningless so we should remove it.

Does this PR introduce any user-facing change?

No

How was this patch tested?

existing tests

@SparkQA
Copy link

SparkQA commented Aug 12, 2020

Test build #127371 has finished for PR 29415 at commit 9558823.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@huaxingao
Copy link
Contributor Author

cc @cloud-fan @MaxGekk @viirya

@@ -99,17 +99,14 @@ trait DataSourceScanExec extends LeafExecNode {

/** Physical plan node for scanning data from a relation. */
case class RowDataSourceScanExec(
fullOutput: Seq[Attribute],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you find out the PR that added it? I can't quite remember why we have it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was introduced in #18600 for plan equality comparison.
I manually print out the two canonicalized plans for df1 and df2 in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala#L68 to check my change.
Before my change:

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#25]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#1] PushedFilters: [], ReadSchema: structnone:int,none:int

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#52]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#2] PushedFilters: [], ReadSchema: structnone:int,none:int

After my change :

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#25]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,B#1] PushedFilters: [], ReadSchema: struct<A:int,B:int>

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#52]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,C#2] PushedFilters: [], ReadSchema: struct<A:int,C:int>

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fullOutput seems having no actual usage except for plan comparison. If we can make sure we don't break it, looks ok to remove fullOutput.

@@ -143,7 +140,6 @@ case class RowDataSourceScanExec(
// Don't care about `rdd` and `tableIdentifier` when canonicalizing.
override def doCanonicalize(): SparkPlan =
copy(
fullOutput.map(QueryPlan.normalizeExpressions(_, fullOutput)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to normalize output now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FileSourceScanExec does it as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to add requiredSchema to RowDataSourceScanExec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I didn't know that we need to use the normalized exprId in canonicalized plan. If we do, then probably we can't remove fullOutput from RowDataSourceScanExec, because using the normalized pruned output would cause problem. For example, in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala#L68, normalized the pruned output will give none#0,none#1 for both df1 and df2, and then both of them have exactly the same plan

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#110]
   +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
      +- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#1] PushedFilters: [], ReadSchema: struct<none:int,none:int>

Then in df1.union(df2), it takes ReusedExchange code path since both plans are equal

== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[a#0], functions=[min(b#1)], output=[a#0, min(b)#12])
:  +- Exchange hashpartitioning(a#0, 5), true, [id=#34]
:     +- *(1) HashAggregate(keys=[a#0], functions=[partial_min(b#1)], output=[a#0, min#28])
:        +- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,B#1] PushedFilters: [], ReadSchema: struct<A:int,B:int>
+- *(4) HashAggregate(keys=[a#0], functions=[min(c#2)], output=[a#0, min(c)#24])
   +- ReusedExchange [a#0, min#30], Exchange hashpartitioning(a#0, 5), true, [id=#34]

The union result will be

+---+------+
|  a|min(b)|
+---+------+
|  1|     2|
|  1|     2|
+---+------+

instead of

+---+------+
|  a|min(b)|
+---+------+
|  1|     2|
|  1|     3|
+---+------+

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea that's why I propose to add requiredSchema, like what FileSourceScanExec does. But I'm not sure how hard it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I added requiredSchema, could you please take a look to see if that's what you want?

@SparkQA
Copy link

SparkQA commented Aug 13, 2020

Test build #127416 has finished for PR 29415 at commit bd58665.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 14003d4 Aug 14, 2020
@huaxingao
Copy link
Contributor Author

Thanks! @cloud-fan @viirya

@huaxingao huaxingao deleted the rm_full_output branch August 14, 2020 15:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants