[SPARK-22548][SQL] Incorrect nested AND expression pushed down to JDBC data source #19776
Conversation
Looks like a bug and it has been there for a long while. cc @cloud-fan @HyukjinKwon can you help trigger the test? Thanks.
@jliwork Can you fix the PR title? The title got cut off when pasting it in.
@viirya Thanks for letting me know, Simon. I've fixed the title. Can someone help trigger the tests please?
ok to test
@@ -497,7 +497,10 @@ object DataSourceStrategy {
       Some(sources.IsNotNull(a.name))

     case expressions.And(left, right) =>
-      (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
+      for {
Let's add a small comment like the PR you pointed out.
Sure. Will do. Thanks.
Yeah. Follow what @yhuai wrote in the PR https://github.com/apache/spark/pull/10362/files
Thanks. Just did that as you suggested.
This affects correctness, should we also backport to 2.2?
@viirya I'm fine with backporting to 2.2 unless anyone objects.
@jliwork Let's see what @cloud-fan @felixcheung think about it.
Test build #84010 has finished for PR 19776 at commit
@@ -497,7 +497,11 @@ object DataSourceStrategy {
       Some(sources.IsNotNull(a.name))

     case expressions.And(left, right) =>
-      (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
+      // See SPARK-12218 and PR 10362 for detailed discussion
In the comment, you need to give an example to explain why.
Sure. I have added more comments there with an example. Thanks, Sean!
Usually we don't list the PR number; just the JIRA number is enough.
@viirya I see. Thanks, Simon! I've removed the PR number from the comment.
Test build #84017 has finished for PR 19776 at commit
Test build #84018 has finished for PR 19776 at commit
Test build #84019 has finished for PR 19776 at commit
for {
  leftFilter <- translateFilter(left)
  rightFilter <- translateFilter(right)
} yield sources.And(leftFilter, rightFilter)
do we still need SPARK-12218 after this?
I would think so. SPARK-12218 put fixes into ParquetFilters.createFilter and OrcFilters.createFilter. They're similar to DataSourceStrategy.translateFilter but have different signatures customized for Parquet and ORC. For all data sources including JDBC, Parquet, etc., translateFilter is called to determine whether a predicate Expression can be pushed down as a Filter or not. Then, for Parquet and ORC, Filters get mapped to Parquet- or ORC-specific filters with their own createFilter method.
So this PR does help all data sources get the correct set of push-down predicates. Without this PR we simply got lucky with Parquet and ORC in terms of result correctness because 1) it looks like we always apply a Filter on top of the scan; 2) we end up with the same number of rows or more when one leg is missing from an AND.
The JDBC data source does not always come with a Filter on top of the scan, and that is why it exposed the bug.
We do not need to clean up the code in this PR. Let us minimize the code changes, which will also simplify the backport.
Although Catalyst predicate expressions are all converted to sources.Filter when we try to push them down, not all convertible filters can be handled by Parquet and ORC. So I think we can still face the case where only one sub-filter of an AND can be pushed down by the file format.
assert(df7.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
assert(df8.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
assert(df9.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
assert(df10.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
I'd like to create a new DataSourceStrategySuite to test translateFilter.
Sure. I can help.
They are end-to-end test cases.
If you can, we should also add such a unit test suite. In the future, we can add more unit test cases for verifying more complex cases.
I went ahead and added a new DataSourceStrategySuite to test translateFilter. Please feel free to let me know of any further comments. Thanks!
good catch! It's a long-standing bug and I think we should backport it all the way to 2.0
import org.apache.spark.sql.types._


class DataSourceStrategySuite extends QueryTest with SharedSQLContext {
extends PlanTest
Fixed. Thanks!
assertResult(Some(sources.EqualTo("cint", 1))) {
  DataSourceStrategy.translateFilter(
    expressions.EqualTo(attrInt, Literal(1)))
No need to call Literal here. It will be implicitly cast to Literal:
expressions.EqualTo(attrInt, 1))
Fixed. Thanks!
test("translate simple expression") {
  val attrInt = AttributeReference("cint", IntegerType)()
  val attrStr = AttributeReference("cstr", StringType)()
You can simplify your test cases with

import org.apache.spark.sql.catalyst.dsl.expressions._

val attrInt = 'cint.int
val attrStr = 'cstr.string
Fixed. Thanks!
assertResult(None) {
  DataSourceStrategy.translateFilter(
    expressions.LessThanOrEqual(
      expressions.Subtract(expressions.Abs(attrInt), 2), 1))
would be better to add a comment to say that abs is not supported
expressions.And(
  expressions.GreaterThan(attrInt, 1),
  expressions.LessThan(
    expressions.Abs(attrInt), 10)
ditto
Test build #84055 has finished for PR 19776 at commit
Test build #84047 has finished for PR 19776 at commit
Test build #84053 has finished for PR 19776 at commit
"WHERE NOT((THEID < 0 OR NAME != 'mary') AND (THEID != 1 OR NAME != 'fred'))") | ||
val df9 = sql("SELECT * FROM foobar " + | ||
"WHERE NOT((THEID < 0 OR NAME != 'mary') AND (THEID != 1 OR TRIM(NAME) != 'fred'))") | ||
val df10 = sql("SELECT * FROM foobar " + |
why do we need to test so many cases? As an end-to-end test, I think we only need a typical case.
LGTM except a few minor comments
@cloud-fan Thank you for your comments! I have updated the test cases as you suggested.
assertResult(None) {
  DataSourceStrategy.translateFilter(
    expressions.LessThanOrEqual(
      expressions.Subtract(expressions.Abs(attrInt), 2), 1))
can we move the comment to this line? i.e.
// `Abs` expression cannot be pushed down
expressions.Subtract(expressions.Abs(attrInt), 2), 1))
Fixed. Thanks.
Test build #84063 has finished for PR 19776 at commit
@@ -296,9 +296,15 @@ class JDBCSuite extends SparkFunSuite
  // The older versions of spark have this kind of bugs in parquet data source.
  val df1 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2 AND NAME != 'mary')")
As I left a comment in #10468 (comment), the above test doesn't actually test against the SPARK-12218 issue. Maybe we can simply drop it.
Fixed.
assertResult(Some(sources.EqualTo("cint", 1))) {
  DataSourceStrategy.translateFilter(
    expressions.EqualTo(attrInt, 1))
}
Looks like we can have a small helper function:

def testTranslateFilter(catalystFilter: Expression, result: Option[sources.Filter]): Unit = {
  assertResult(result) {
    DataSourceStrategy.translateFilter(catalystFilter)
  }
}

So the tests can be rewritten as:

testTranslateFilter(expressions.EqualTo(attrInt, 1), Some(sources.EqualTo("cint", 1)))
Thanks! I've followed your suggestion and the test suite looks cleaner now.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
As you import expressions._, I think we can write EqualTo instead of expressions.EqualTo for the Catalyst predicates below? Because you always write sources.EqualTo, I think we won't confuse them.
Thanks for the suggestion. Fixed.
A few comments, otherwise LGTM.
LGTM otherwise too.
Thanks for everyone's comments! I have polished the test cases.
// ABS(cint) - 2 = 1
testTranslateFilter(LessThanOrEqual(
  // Expressions are not supported
?
good catch @_@ fixed the typo. Thanks!
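Taken together, the review comments above shape the new suite roughly as follows. This is a sketch under the assumptions discussed in this thread (PlanTest base class, the Catalyst DSL for attributes, the testTranslateFilter helper), not the exact committed file:

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.sources
import org.apache.spark.sql.test.SharedSQLContext

class DataSourceStrategySuite extends PlanTest with SharedSQLContext {

  test("translate simple expression") {
    val attrInt = 'cint.int
    val attrStr = 'cstr.string

    testTranslateFilter(EqualTo(attrInt, 1), Some(sources.EqualTo("cint", 1)))
    testTranslateFilter(Not(EqualTo(attrStr, "a")),
      Some(sources.Not(sources.EqualTo("cstr", "a"))))
  }

  test("translate complex expression") {
    val attrInt = 'cint.int

    // ABS(cint) - 2 <= 1: `Abs` expression cannot be pushed down
    testTranslateFilter(LessThanOrEqual(Subtract(Abs(attrInt), 2), 1), None)

    // cint > 1 AND ABS(cint) < 10: one leg of the AND is untranslatable,
    // so the whole conjunction must not be pushed down
    testTranslateFilter(And(GreaterThan(attrInt, 1), LessThan(Abs(attrInt), 10)), None)
  }

  // Compares the translated result of `catalystFilter` against `result`
  private def testTranslateFilter(
      catalystFilter: Expression, result: Option[sources.Filter]): Unit = {
    assertResult(result) {
      DataSourceStrategy.translateFilter(catalystFilter)
    }
  }
}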
LGTM
Test build #84086 has finished for PR 19776 at commit
LGTM
…C data source

## What changes were proposed in this pull request?

Let’s say I have a nested AND expression shown below and p2 can not be pushed down,

(p1 AND p2) OR p3

In current Spark code, during data source filter translation, (p1 AND p2) is returned as p1 only and p2 is simply lost. This issue occurs with JDBC data source and is similar to [SPARK-12218](#10362) for Parquet. When we have AND nested below another expression, we should either push both legs or nothing.

Note that:
- The current Spark code will always split conjunctive predicate before it determines if a predicate can be pushed down or not
- If I have (p1 AND p2) AND p3, it will be split into p1, p2, p3. There won't be nested AND expression.
- The current Spark code logic for OR is OK. It either pushes both legs or nothing.

The same translation method is also called by Data Source V2.

## How was this patch tested?

Added new unit test cases to JDBCSuite

gatorsmile

Author: Jia Li <[email protected]>

Closes #19776 from jliwork/spark-22548.

(cherry picked from commit 881c5c8)
Signed-off-by: gatorsmile <[email protected]>
Thanks! Merged to master/2.2/2.1
@gatorsmile @cloud-fan @viirya @HyukjinKwon Thanks a lot! =)
Test build #84087 has finished for PR 19776 at commit
What changes were proposed in this pull request?
Let’s say I have a nested AND expression shown below and p2 cannot be pushed down:
(p1 AND p2) OR p3
In the current Spark code, during data source filter translation, (p1 AND p2) is returned as p1 only and p2 is simply lost. This issue occurs with the JDBC data source and is similar to SPARK-12218 for Parquet. When we have AND nested below another expression, we should either push both legs or nothing.
Note that:

- The current Spark code will always split conjunctive predicate before it determines if a predicate can be pushed down or not
- If I have (p1 AND p2) AND p3, it will be split into p1, p2, p3. There won't be nested AND expression.
- The current Spark code logic for OR is OK. It either pushes both legs or nothing.

The same translation method is also called by Data Source V2.
How was this patch tested?
Added new unit test cases to JDBCSuite
@gatorsmile
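For reference, the end-to-end check added to JDBCSuite looks roughly like the sketch below, based on the snippets quoted above; the committed test may differ slightly. The TRIM(NAME) predicate is there precisely because it cannot be translated, so the nested AND must not be pushed down either:

// Inside JDBCSuite (assumes the suite's H2-backed `foobar` table with rows
// such as ("fred", 1) and ("mary", 2) is already registered)
val df = sql("SELECT * FROM foobar " +
  "WHERE NOT((THEID < 0 OR NAME != 'mary') AND (THEID != 1 OR TRIM(NAME) != 'fred'))")
// Before this fix, the nested AND lost its untranslatable leg, the pushed-down
// JDBC filter became too strict, and ("fred", 1) was dropped from the result.
assert(df.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))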