Spark 3.3, 3.4: use a deterministic where condition to make rewrite_data_files exit directly without exceptions #6760
Conversation
Commit: …exit directly without exceptions
Commit: …condition is always false
Hi @ludlows, I'm not too familiar with the Spark side, but wondering: doesn't the RewriteDataFiles procedure already check and skip if there are no matching data files? Ref: https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java#L177 Not sure if we get huge savings?
Hi @szehon-ho, thanks for your comments. That skip check exists (Line 120 in 18d45b4). However, if the where condition is always false, the optimizer removes the Filter node entirely, so the expression lookup (Line 136 in 18d45b4) throws an AnalysisException before the skip check is ever reached, e.g. for a call like the sketch below.
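A hedged reproduction sketch: the catalog and table names here are hypothetical, and the predicate is chosen so Catalyst can constant-fold it to false.

```scala
// Hypothetical names; assumes an existing SparkSession `spark` and an
// Iceberg catalog named `my_catalog`. Before this change, a where clause
// that folds to false made collectResolvedSparkExpression throw an
// AnalysisException instead of letting the procedure rewrite nothing.
spark.sql(
  """CALL my_catalog.system.rewrite_data_files(
    |  table => 'db.sample',
    |  where => '1 > 2')""".stripMargin)
```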
Thanks for the explanation, the problem makes sense to me now. To me, maybe modifying the original method to return an Option would be cleaner? But I think it will be nice for @aokolnychyi, @rdblue to take a look as well, as they are more familiar with the Spark side.
Hi @szehon-ho, thanks for your suggestions. And hi @aokolnychyi, @rdblue, do you have any suggestions about this new behavior of rewrite_data_files?
@ludlows can you please add a unit test to demonstrate the bug as well?
Commit: …to make rewrite_data_files exit without exceptions
Hi @szehon-ho, I added a test case for this PR. You can comment out these lines in RewriteDataFilesProcedure.java to reproduce the bug:

```java
// if (where != null && SparkExpressionConverter.checkWhereAlwaysFalse(spark(), quotedFullIdentifier, where)) {
//   RewriteDataFiles.Result result = new BaseRewriteDataFilesResult(Lists.newArrayList());
//   return toOutputRows(result);
// }
```

Let me know if you have any questions.
I walked through the code and I see the problem. I still think we should change the original method, collectResolvedSparkExpression, to not throw AnalysisException in this case. I feel it's not so useful to add a second method that does almost the same thing and make the code call both.
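For illustration, a minimal sketch of what that Option-returning variant could look like, reusing the DummyRelation helper that appears later in this thread. This is a sketch of the suggestion, not the merged implementation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.Filter

// Returns None when the optimizer eliminated the Filter node, i.e. the
// where clause folded to false and can never match any row.
// DummyRelation is the helper defined alongside SparkExpressionConverter.
def collectResolvedSparkExpressionOption(
    session: SparkSession,
    tableName: String,
    where: String): Option[Expression] = {
  val tableAttrs = session.table(tableName).queryExecution.analyzed.output
  val unresolvedExpression = session.sessionState.sqlParser.parseExpression(where)
  val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs))
  session.sessionState.executePlan(filter).optimizedPlan.collectFirst {
    case f: Filter => f.condition
  }
}
```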
Commit: …collectResolvedSparkExpression and renaming this function
Hi @szehon-ho, the caller side now looks like this:

```diff
- return action.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
+ Option<Expression> expressionOption =
+     SparkExpressionConverter.collectResolvedSparkExpressionOption(spark(), tableName, where);
+ if (expressionOption.isEmpty()) return action.filter(Expressions.alwaysFalse());
```
Nit: I'm not sure if checkstyle/spotless will fail here, but I think we need an extra newline in any case for the return.
Yes, you are right, the style check failed here. I need to add {} for each if statement.
Commit: …y collectResolvedSparkExpressionOption anymore.
Actually, one thing is bothering me. Can we check: if you pass in a filter that is always true, can we distinguish it from the always-false case?
We can now tell the always-true case from the always-false case by the shape of the optimized plan. On the other hand, we execute the rewrite action as normal once we are sure the condition is always true.
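A standalone sketch of why the optimized plan shape is enough to tell the cases apart. The session setup is illustrative, and the predicates are chosen so Catalyst can constant-fold them:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
spark.range(5).createOrReplaceTempView("t")

// A predicate that folds to false: PruneFilters replaces the whole
// subtree with an empty LocalRelation, so no Filter node survives.
println(spark.sql("SELECT * FROM t WHERE 1 > 2").queryExecution.optimizedPlan)

// A predicate that folds to true: the Filter is removed and only the
// underlying relation remains in the optimized plan.
println(spark.sql("SELECT * FROM t WHERE 1 = 1").queryExecution.optimizedPlan)
```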
@szehon-ho thanks for the comments. I am glad to work with you to make this feature available.
Commit: merge (conflicts in spark/v3.3/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala)
```diff
  val tableAttrs = session.table(tableName).queryExecution.analyzed.output
  val unresolvedExpression = session.sessionState.sqlParser.parseExpression(where)
  val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs))
  val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
  optimizedLogicalPlan.collectFirst {
-   case filter: Filter => filter.condition
+   case filter: Filter => convertToIcebergExpression(filter.condition)
+   case dummyRelation: DummyRelation => Expressions.alwaysTrue()
```
For cleaner code, can we return Spark's Expression.TRUE / Expression.FALSE here, and call convertToIcebergExpression outside?
Hi @szehon-ho, do we need an Expression.TRUE as a Spark expression? In the end we only need an Iceberg one. But it seems possible if we implement it in the following way:

```scala
optimizedLogicalPlan.collectFirst {
  case filter: Filter => filter.condition
  case dummyRelation: DummyRelation => session.sessionState.sqlParser.parseExpression("true")
  case localRelation: LocalRelation => session.sessionState.sqlParser.parseExpression("false")
}.getOrElse(throw new AnalysisException("Failed to find filter expression"))
```

What do you think?
From Spark, Literal.TrueLiteral.
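For illustration, the pattern match using Spark's built-in boolean literals could look like the sketch below. Literal.TrueLiteral and Literal.FalseLiteral are vals on Catalyst's Literal object, so no parser round-trip is needed; DummyRelation and optimizedLogicalPlan are assumed from the surrounding converter code.

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}

optimizedLogicalPlan.collectFirst {
  case filter: Filter => filter.condition
  case _: DummyRelation => Literal.TrueLiteral   // where clause folded to true
  case _: LocalRelation => Literal.FalseLiteral  // where clause folded to false
}.getOrElse(throw new AnalysisException("Failed to find filter expression"))
```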
```java
assertEquals(
    "Action should rewrite 0 data files and add 0 data files",
    row(0, 0),
    Arrays.copyOf(output.get(0), 2));
```
Do we get output = 0, 0, 0? Can we just assert all 3 values instead of the first two in this case?
I think it is because the first two values are of type int and the last one is of type long.
You mean the assert fails? How about row(0, 0, 0L)?
"Action should rewrite 0 data files and add 0 data files", | ||
row(0, 0), | ||
Arrays.copyOf(output.get(0), 2)); | ||
// verify rewritten bytes separately |
Seems there is no need for this comment, as we don't assert on bytes.
Yes, let me fix it.
```java
// create 10 files under non-partitioned table
insertData(10);
List<Object[]> expectedRecords = currentData();
// select only 0 files for compaction
```
Minor: "select no files".
Yes, let me fix it.
```java
    catalogName, tableIdent);
assertEquals(
    "Action should rewrite 10 data files and add 1 data files",
    row(10, 1),
```
Optional: I think the test is more understandable if we just put row(10, 1, Long.valueOf(snapshotSummary().get(...))). I do realize it's that way in other tests.
@ludlows did you have a chance to try this: #6760 (comment)?
@szehon-ho oh, yes. The current version uses the Spark expressions for true and false, so we do not need to modify …
Hi @ludlows, thanks for the change! Sorry, I just realized that since Spark 3.4 is now the active branch, could you duplicate the Spark 3.3 changes on the Spark 3.4 branch as well in this PR? (We usually make new changes on the active branch first, and then backport as necessary.)
```diff
@@ -19,7 +19,6 @@
 package org.apache.spark.sql.execution.datasources

-import org.apache.iceberg.expressions.Expressions
```
Is this unintentional?
It seems that we do not need this import statement, so I removed it here.
Merged, thanks @ludlows
The behavior is requested in issue #6759. Here I implement the evaluation procedure that checks whether the where condition is deterministically false; if so, rewrite_data_files exits directly.

Closes #6759
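After this change, a call with an always-false where clause should return a no-op result row instead of throwing. A usage sketch with hypothetical catalog and table names:

```scala
// Assumes an existing SparkSession `spark` and an Iceberg catalog
// named `my_catalog`; the predicate folds to false at optimization time.
val result = spark.sql(
  """CALL my_catalog.system.rewrite_data_files(
    |  table => 'db.sample',
    |  where => '1 > 2')""".stripMargin)
// Expected per the tests above: 0 rewritten files, 0 added files, 0 bytes.
result.show()
```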