[SPARK-21351][SQL] Update nullability based on children's output #18576
Conversation
@gatorsmile (I probably missed something, though...) Is it an expected design to ignore this kind of nullability (I saw your tests related to this issue here)?
Test build #79401 has finished for PR 18576 at commit
Jenkins, retest this please.
Test build #79423 has finished for PR 18576 at commit
Test build #79430 has finished for PR 18576 at commit
try {
  SQLConf.get.setConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED, true)
  val logicalPlan = testRelation.where('a =!= 2).select('a).analyze
  var expectedSchema = new StructType().add("a", "INT", nullable = true)
nit: Can we use `val` by using a different variable at line 47?
@gatorsmile suggested we do it this way because the value is used in multiple assertions.
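A minimal sketch of the suggested refactoring, assuming a second schema is needed later in the test (`expectedSchema1`/`expectedSchema2` and the second schema's nullability are hypothetical placeholders, not the PR's final test code):

```scala
import org.apache.spark.sql.types.StructType

// Two vals instead of reassigning one var; each assertion gets its own name.
val expectedSchema1 = new StructType().add("a", "INT", nullable = true)
// ... first assertion against expectedSchema1 ...
val expectedSchema2 = new StructType().add("a", "INT", nullable = false)
// ... second assertion against expectedSchema2 ...
```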
@@ -132,7 +132,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       OptimizeCodegen) ::
     Batch("RewriteSubquery", Once,
       RewritePredicateSubquery,
-      CollapseProject) :: Nil
+      CollapseProject) ::
+    Batch("UpdateAttributeReferences", Once,
Can we apply this optimization earlier, to encourage other optimizations that use information on `nullable`? For example, before the Operator Optimizations batch? Or is there any reason to put this optimization at the end of the optimizer batches?
The original motivation was to fix nullability inconsistencies in logical plans, and I couldn't find concrete examples where other optimizations become better thanks to this new rule. Any ideas?
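For illustration, a sketch of the earlier placement being asked about (batch and rule names follow the diff above; this is not the PR's final ordering):

```scala
// Hypothetical ordering: update nullability before the main operator
// optimizations so that rules relying on `nullable` can benefit from it.
Batch("UpdateAttributeReferences", Once,
  UpdateNullabilityInAttributeReferences) ::
Batch("Operator Optimizations", fixedPoint,
  // ... existing operator-optimization rules ...
  CollapseProject) ::
Batch("RewriteSubquery", Once,
  RewritePredicateSubquery,
  CollapseProject) :: Nil
```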
test("update nullability when inferred constraints applied") { | ||
val testRelation = LocalRelation('a.int, 'b.int) | ||
try { | ||
SQLConf.get.setConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED, true) |
nit: should we use `PlanTest.withSQLConf` here?
I'll update. Thanks!
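For reference, a sketch of how the test could look with the `withSQLConf` helper from `PlanTest` instead of mutating `SQLConf` directly (the assertion body is elided):

```scala
test("update nullability when inferred constraints applied") {
  // withSQLConf restores the previous conf value after the block,
  // so a try/finally reset is no longer needed.
  withSQLConf(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key -> "true") {
    val testRelation = LocalRelation('a.int, 'b.int)
    val logicalPlan = testRelation.where('a =!= 2).select('a).analyze
    // ... run the optimizer and compare the resulting schema, as in the original test ...
  }
}
```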
Test build #79445 has finished for PR 18576 at commit
Jenkins, retest this please.
Test build #79449 has finished for PR 18576 at commit
Jenkins, retest this please.
Test build #79463 has finished for PR 18576 at commit
ping
@gatorsmile If you get time, could you also check this? Thanks!
If we can update the nullability in Optimizer rules, do we still need to do it in
Yea, if we can do so, I feel that would be best. I'll check if we can remove the nullability update in
Test build #80293 has finished for PR 18576 at commit
Test build #80303 has finished for PR 18576 at commit
Test build #80304 has finished for PR 18576 at commit
Test build #80305 has finished for PR 18576 at commit
I'm looking into the failure reason...
c552a6e to 5d2fd6d (Compare)
Test build #80315 has finished for PR 18576 at commit
      a
    }
  }
  child.output.map { attr => outputAttrs.find(_.exprId == attr.exprId).getOrElse(attr) }
}
I tried simply dropping the nullability update and reusing the output attributes `outputAttrs` in the optimized logical plan here, but some Python tests failed (all the Scala tests passed). I looked into it and found that, in the Python planner path, there are cases where an operator's output changes between the optimized logical plan and the physical plan.
For example:
sql("""SELECT strlen(a) FROM test WHERE strlen(a) > 1""")
// pyspark
>>> spark.sql("SELECT strlen(a) FROM test WHERE strlen(a) > 1").explain(True)
...
== Optimized Logical Plan ==
Project [strlen(a#0) AS strlen(a)#30]
+- Filter (strlen(a#0) > 1)
+- LogicalRDD [a#0]
== Physical Plan ==
*Project [pythonUDF0#34 AS strlen(a)#30]
+- BatchEvalPython [strlen(a#0)], [a#0, pythonUDF0#34]
+- *Filter (pythonUDF0#33 > 1), [a#0]
+- BatchEvalPython [strlen(a#0)], [a#0, pythonUDF0#33]
+- Scan ExistingRDD[a#0]
So, I added code here to check for differences between `outputAttrs` and `child.output`.
Could you give me some insight on this? @gatorsmile
OK, I'll do that.
An alternative solution is here; it is based on the suggestion above.
Test build #88653 has finished for PR 18576 at commit
val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x => x -> x.nullable })
p transformExpressions {
  case ar: AttributeReference =>
    nullabilityMap.get(ar).filterNot(_ == ar.nullable).map { nullable =>
can't we just write
case a: AttributeReference if nullabilityMap.contains(a) => a.withNullability(nullabilityMap(a))
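Putting that suggestion together with the map built above, a sketch of how the rule body could look after the simplification (shown only as an illustration, not as the final merged code):

```scala
object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    // Leaf nodes have no children whose output could disagree, so skip them.
    case p if !p.isInstanceOf[LeafNode] =>
      // Nullability of each attribute as reported by the children's output.
      val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x => x -> x.nullable })
      p transformExpressions {
        case ar: AttributeReference if nullabilityMap.contains(ar) =>
          ar.withNullability(nullabilityMap(ar))
      }
  }
}
```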
 * Updates nullability in [[AttributeReference]]s if nullability is different between
 * non-leaf plan's expressions and the children output.
 */
object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
This rule itself is useful even without the filter optimization. In general, I don't think it's useful to optimize `Filter` using the `IsNotNull` expression, as it's not a common case. I think we can update `Join.output` to specify nullability for join keys, which seems easier and covers a more common case.
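A purely hypothetical sketch of the `Join.output` idea (not code from this PR; `keyAttrs` is an assumed helper that collects the equi-join key attributes):

```scala
// For an inner join, rows with a null join key can never match, so the key
// attributes could be reported as non-nullable in the join's output.
override def output: Seq[Attribute] = joinType match {
  case _: InnerLike =>
    (left.output ++ right.output).map { a =>
      if (keyAttrs.exists(_.semanticEquals(a))) a.withNullability(false) else a
    }
  case LeftOuter =>
    left.output ++ right.output.map(_.withNullability(true))
  // ... remaining join types as in the existing implementation ...
}
```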
ok, I'll make this pr simpler (I'll drop some changes for filters).
I dropped the changes to `execution.FilterExec`, but are you suggesting we drop the changes to `logical.Filter`, too? https://github.com/apache/spark/pull/18576/files#diff-72917e7b68f0311b2fb42990e0dc616dR139
I basically agree that the `Join.output` modification is simpler and more important, but is it okay to ignore nullability in `logical.Filter`? For example, in the current master, `QueryPlanConstraints.inferIsNotNullConstraints` adds non-null constraints in `logical.Filter`, and these constraints aren't correctly propagated to upper plan nodes. So I think it'd be better to respect nullability in both `logical.Join` and `logical.Filter`.
If we think more about how these `IsNotNull` constraints are generated, they mostly come from joins. I don't think it's common for users to put `IsNotNull` in a filter themselves.
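An editorial illustration of that point (`dfA`/`dfB` are hypothetical DataFrames): the `IsNotNull` constraints the optimizer works with usually come from join conditions rather than from filters users wrote by hand.

```scala
// An inner equi-join condition implies the key is non-null on both sides of a
// matching row, so IsNotNull(k) can be inferred automatically for this plan.
val joined = dfA.join(dfB, dfA("k") === dfB("k"), "inner")
// Users rarely spell out the equivalent filter explicitly:
val filtered = dfA.where(dfA("k").isNotNull)
```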
ok, I'll drop it in this pr. just a sec.
Test build #88657 has finished for PR 18576 at commit
retest this please
Test build #88662 has finished for PR 18576 at commit
Test build #88665 has finished for PR 18576 at commit
Test build #88772 has finished for PR 18576 at commit
@cloud-fan I'm going to fix the join issues described in your comment in follow-up PRs, OK?
I checked if we could fix the join issue: c4ca4f3
ping
thanks, merging to master!
## What changes were proposed in this pull request?

This pr added a new optimizer rule `UpdateNullabilityInAttributeReferences` to update the nullability that `Filter` changes when having `IsNotNull`. In the master, optimized plans do not respect the nullability when `Filter` has `IsNotNull`. This wrongly generates unnecessary code. For example:

```
scala> val df = Seq((Some(1), Some(2))).toDF("a", "b")
scala> val bIsNotNull = df.where($"b" =!= 2).select($"b")
scala> val targetQuery = bIsNotNull.distinct
scala> targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = true

scala> targetQuery.debugCodegen
Found 2 WholeStageCodegen subtrees.
== Subtree 1 / 2 ==
*HashAggregate(keys=[b#19], functions=[], output=[b#19])
+- Exchange hashpartitioning(b#19, 200)
   +- *HashAggregate(keys=[b#19], functions=[], output=[b#19])
      +- *Project [_2#16 AS b#19]
         +- *Filter isnotnull(_2#16)
            +- LocalTableScan [_1#15, _2#16]

Generated code:
...
/* 124 */   protected void processNext() throws java.io.IOException {
...
/* 132 */     // output the result
/* 133 */
/* 134 */     while (agg_mapIter.next()) {
/* 135 */       wholestagecodegen_numOutputRows.add(1);
/* 136 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 137 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 138 */
/* 139 */       boolean agg_isNull4 = agg_aggKey.isNullAt(0);
/* 140 */       int agg_value4 = agg_isNull4 ? -1 : (agg_aggKey.getInt(0));
/* 141 */       agg_rowWriter1.zeroOutNullBytes();
/* 142 */       // We don't need this NULL check because NULL is filtered out in `$"b" =!=2`
/* 143 */       if (agg_isNull4) {
/* 144 */         agg_rowWriter1.setNullAt(0);
/* 145 */       } else {
/* 146 */         agg_rowWriter1.write(0, agg_value4);
/* 147 */       }
/* 148 */       append(agg_result1);
/* 149 */
/* 150 */       if (shouldStop()) return;
/* 151 */     }
/* 152 */
/* 153 */     agg_mapIter.close();
/* 154 */     if (agg_sorter == null) {
/* 155 */       agg_hashMap.free();
/* 156 */     }
/* 157 */   }
/* 158 */
/* 159 */ }
```

In line 143, we don't need this NULL check because NULL is filtered out in `$"b" =!= 2`. This pr could remove this NULL check:

```
scala> targetQuery.queryExecution.optimizedPlan.output(0).nullable
res5: Boolean = false

scala> targetQuery.debugCodegen
...
Generated code:
...
/* 144 */   protected void processNext() throws java.io.IOException {
...
/* 152 */     // output the result
/* 153 */
/* 154 */     while (agg_mapIter.next()) {
/* 155 */       wholestagecodegen_numOutputRows.add(1);
/* 156 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 157 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 158 */
/* 159 */       int agg_value4 = agg_aggKey.getInt(0);
/* 160 */       agg_rowWriter1.write(0, agg_value4);
/* 161 */       append(agg_result1);
/* 162 */
/* 163 */       if (shouldStop()) return;
/* 164 */     }
/* 165 */
/* 166 */     agg_mapIter.close();
/* 167 */     if (agg_sorter == null) {
/* 168 */       agg_hashMap.free();
/* 169 */     }
/* 170 */   }
```

## How was this patch tested?

Added `UpdateNullabilityInAttributeReferencesSuite` for unit tests.

Author: Takeshi Yamamuro <[email protected]>

Closes apache#18576 from maropu/SPARK-21351.
-      RemoveRedundantProject)
+      RemoveRedundantProject) :+
+    Batch("UpdateAttributeReferences", Once,
+      UpdateNullabilityInAttributeReferences)
I noticed that this rule is very similar to `FixNullability`. Do we have to keep both of them? cc @maropu @gatorsmile
`UpdateNullabilityInAttributeReferences` is an optimizer rule that was added after we introduced the analyzer rule `FixNullability`. Do we have any end-to-end test case showing the benefit of this optimizer rule?
Yeah, I have the same opinion as @gatorsmile; `FixNullability` is related to correctness, but `UpdateNullabilityInAttributeReferences` is not (it's just an optimization).
@gatorsmile I don't think we have checked actual performance benefits (e.g., wall time) from the rule yet. I just assume that this rule lets the code generator produce better code.
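One way such an end-to-end check could look, adapted from the example in the PR description above (a sketch run in a session with `spark.implicits._` imported, not an existing test):

```scala
val df = Seq((Some(1), Some(2))).toDF("a", "b")
val targetQuery = df.where($"b" =!= 2).select($"b").distinct
// With the optimizer rule, `b` is reported as non-nullable in the optimized plan,
// which lets whole-stage codegen drop the redundant null check.
assert(!targetQuery.queryExecution.optimizedPlan.output.head.nullable)
```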
But these two rules are almost the same, except that `FixNullability` skips resolved plans.
Yeah, I see... But I have no idea how to resolve the two issues (the correctness issue and the optimization issue) simultaneously in a single place. One greedy & temporary solution would be to merge the current two rules into one and then apply it in both places: the analyzer and the optimizer.
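A rough sketch of that greedy option, purely as an illustration of the proposal above (the batch names are made up; `UpdateAttributeNullability` is borrowed from the follow-up mentioned below):

```scala
// Analyzer batch list — run the shared rule for correctness:
Batch("UpdateNullability", Once,
  UpdateAttributeNullability) :: Nil

// Optimizer batch list — run the same rule again at the end, to pick up
// nullability tightened by optimizations such as constraint propagation:
Batch("UpdateAttributeReferences", Once,
  UpdateAttributeNullability) :: Nil
```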
…h FixNullability

## What changes were proposed in this pull request?

This is a followup of #18576 . The newly added rule `UpdateNullabilityInAttributeReferences` does the same thing the `FixNullability` does, we only need to keep one of them. This PR removes `UpdateNullabilityInAttributeReferences`, and use `FixNullability` to replace it. Also rename it to `UpdateAttributeNullability`

## How was this patch tested?

existing tests

Closes #23390 from cloud-fan/nullable.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
What changes were proposed in this pull request?
This PR added a new optimizer rule `UpdateNullabilityInAttributeReferences` to update the nullability that `Filter` changes when having `IsNotNull`. In the master, optimized plans do not respect the nullability when `Filter` has `IsNotNull`, which wrongly generates unnecessary code. For example, in line 143 of the generated code shown in the commit message above, we don't need the NULL check because NULL is filtered out by `$"b" =!= 2`. This PR removes that NULL check, as the second generated-code listing above shows.
How was this patch tested?
Added `UpdateNullabilityInAttributeReferencesSuite` for unit tests.