[SPARK-32590][SQL] Remove fullOutput from RowDataSourceScanExec #29415

Closed
wants to merge 2 commits
@@ -99,17 +99,15 @@ trait DataSourceScanExec extends LeafExecNode {

/** Physical plan node for scanning data from a relation. */
case class RowDataSourceScanExec(
fullOutput: Seq[Attribute],
Contributor

can you find out the PR that added it? I can't quite remember why we have it.

Contributor Author

It was introduced in #18600 for plan equality comparison.
I manually printed out the two canonicalized plans for df1 and df2 in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala#L68 to check my change.
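
For reference, here is a rough sketch of how those canonicalized plans can be printed (an editorial illustration, not part of the PR; spark, url and props are assumed placeholders for the test session and JDBC connection settings, and df1/df2 mirror the suite's aggregations):

// Sketch only: print the canonicalized physical plans of two aggregations
// over the same JDBC table. `spark`, `url` and `props` are placeholders.
import org.apache.spark.sql.functions.min

val base = spark.read.jdbc(url, "TEST.INTTYPES", props)
val df1 = base.groupBy("a").agg(min("b"))
val df2 = base.groupBy("a").agg(min("c"))

println(df1.queryExecution.executedPlan.canonicalized)
println(df2.queryExecution.executedPlan.canonicalized)
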
Before my change:

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#25]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#1] PushedFilters: [], ReadSchema: struct<none:int,none:int>

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#52]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#2] PushedFilters: [], ReadSchema: struct<none:int,none:int>

After my change:

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#25]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,B#1] PushedFilters: [], ReadSchema: struct<A:int,B:int>

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#52]
+- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
+- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,C#2] PushedFilters: [], ReadSchema: struct<A:int,C:int>

requiredColumnsIndex: Seq[Int],
output: Seq[Attribute],
requiredSchema: StructType,
filters: Set[Filter],
handledFilters: Set[Filter],
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with InputRDDCodegen {

def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)

override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

@@ -136,14 +134,14 @@ case class RowDataSourceScanExec(
if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
}
Map(
"ReadSchema" -> output.toStructType.catalogString,
"ReadSchema" -> requiredSchema.catalogString,
"PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
}

// Don't care about `rdd` and `tableIdentifier` when canonicalizing.
override def doCanonicalize(): SparkPlan =
copy(
fullOutput.map(QueryPlan.normalizeExpressions(_, fullOutput)),
Contributor

don't we need to normalize output now?

Contributor

FileSourceScanExec does it as well.

Contributor

We may need to add requiredSchema to RowDataSourceScanExec

Contributor Author

Sorry, I didn't know that we need to use the normalized exprId in the canonicalized plan. If we do, then we probably can't remove fullOutput from RowDataSourceScanExec, because using the normalized pruned output would cause problems. For example, in https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/RowDataSourceStrategySuite.scala#L68, normalizing the pruned output gives none#0,none#1 for both df1 and df2, and then both of them have exactly the same plan:

*(2) HashAggregate(keys=[none#0], functions=[min(none#0)], output=[none#0, #0])
+- Exchange hashpartitioning(none#0, 5), true, [id=#110]
   +- *(1) HashAggregate(keys=[none#0], functions=[partial_min(none#1)], output=[none#0, none#4])
      +- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [none#0,none#1] PushedFilters: [], ReadSchema: struct<none:int,none:int>

Then df1.union(df2) takes the ReusedExchange code path, since both plans are equal:

== Physical Plan ==
Union
:- *(2) HashAggregate(keys=[a#0], functions=[min(b#1)], output=[a#0, min(b)#12])
:  +- Exchange hashpartitioning(a#0, 5), true, [id=#34]
:     +- *(1) HashAggregate(keys=[a#0], functions=[partial_min(b#1)], output=[a#0, min#28])
:        +- *(1) Scan JDBCRelation(TEST.INTTYPES) [numPartitions=1] [A#0,B#1] PushedFilters: [], ReadSchema: struct<A:int,B:int>
+- *(4) HashAggregate(keys=[a#0], functions=[min(c#2)], output=[a#0, min(c)#24])
   +- ReusedExchange [a#0, min#30], Exchange hashpartitioning(a#0, 5), true, [id=#34]

The union result will be

+---+------+
|  a|min(b)|
+---+------+
|  1|     2|
|  1|     2|
+---+------+

instead of

+---+------+
|  a|min(b)|
+---+------+
|  1|     2|
|  1|     3|
+---+------+
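
To make the collision concrete, here is an illustrative-only sketch (an editorial addition, not code from this PR) of how QueryPlan.normalizeExpressions behaves against the full relation output versus each pruned output; the attribute names and exprIds below are assumptions:

// Illustration with made-up attributes a#0, b#1, c#2 of the relation output.
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)(exprId = ExprId(0))
val b = AttributeReference("b", IntegerType)(exprId = ExprId(1))
val c = AttributeReference("c", IntegerType)(exprId = ExprId(2))

// Against the full output [a#0, b#1, c#2], b and c keep distinct ordinals:
println(QueryPlan.normalizeExpressions(b, Seq(a, b, c)))  // none#1
println(QueryPlan.normalizeExpressions(c, Seq(a, b, c)))  // none#2

// Against each pruned output ([a#0, b#1] and [a#0, c#2]) they collapse to the
// same ordinal, so the two canonicalized scans become indistinguishable:
println(QueryPlan.normalizeExpressions(b, Seq(a, b)))  // none#1
println(QueryPlan.normalizeExpressions(c, Seq(a, c)))  // none#1
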

Contributor

Yeah, that's why I propose adding requiredSchema, like what FileSourceScanExec does. But I'm not sure how hard it is.
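
As an aside (an editorial illustration, not from this PR): a StructType helps here because it carries the pruned column names and types, which canonicalization does not normalize away, so the two pruned scans still compare unequal:

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// The pruned schemas keep their column names, unlike the exprId-normalized output.
val schemaAB = StructType(Seq(StructField("A", IntegerType), StructField("B", IntegerType)))
val schemaAC = StructType(Seq(StructField("A", IntegerType), StructField("C", IntegerType)))

println(schemaAB.catalogString)  // struct<A:int,B:int>
println(schemaAC.catalogString)  // struct<A:int,C:int>
assert(schemaAB != schemaAC)     // so the canonicalized scan nodes stay distinct
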

Contributor Author

@cloud-fan I added requiredSchema, could you please take a look to see if that's what you want?

output.map(QueryPlan.normalizeExpressions(_, output)),
rdd = null,
tableIdentifier = None)
}
@@ -307,7 +307,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
case l @ LogicalRelation(baseRelation: TableScan, _, _, _) =>
RowDataSourceScanExec(
l.output,
l.output.indices,
l.output.toStructType,
Set.empty,
Set.empty,
toCatalystRDD(l, baseRelation.buildScan()),
@@ -379,8 +379,8 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
.map(relation.attributeMap)

val scan = RowDataSourceScanExec(
relation.output,
requestedColumns.map(relation.output.indexOf),
requestedColumns,
requestedColumns.toStructType,
pushedFilters.toSet,
handledFilters,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
@@ -401,8 +401,8 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq

val scan = RowDataSourceScanExec(
relation.output,
requestedColumns.map(relation.output.indexOf),
requestedColumns,
requestedColumns.toStructType,
pushedFilters.toSet,
handledFilters,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
@@ -64,11 +64,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
val rdd = v1Relation.buildScan()
val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
val originalOutputNames = relation.table.schema().map(_.name)
val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
val dsScan = RowDataSourceScanExec(
output,
requiredColumnsIndex,
output.toStructType,
translated.toSet,
pushed.toSet,
unsafeRowRDD,