
Spark 3.4: Rewrite procedure throw better exception when filter expression cannot translate #8394

Merged: 4 commits into apache:master, Sep 20, 2023

Conversation

@ConeyLiu (Contributor, Author) commented Aug 25, 2023:

For the rewrite procedure, we should throw a better exception when the filter condition cannot be translated to a Spark filter or converted to an Iceberg expression. For example:

scala> spark.sql("call local.system.rewrite_data_files(table => 'db.test_rewrite', where => 'substr(data, 2) = \"fo\"')").show()
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:529)
  at scala.None$.get(Option.scala:527)
  at org.apache.spark.sql.execution.datasources.SparkExpressionConverter$.convertToIcebergExpression(SparkExpressionConverter.scala:38)
  at org.apache.spark.sql.execution.datasources.SparkExpressionConverter.convertToIcebergExpression(SparkExpressionConverter.scala)
  at org.apache.iceberg.spark.procedures.RewriteDataFilesProcedure.checkAndApplyFilter(RewriteDataFilesProcedure.java:137)
  at org.apache.iceberg.spark.procedures.RewriteDataFilesProcedure.lambda$call$0(RewriteDataFilesProcedure.java:123)
  at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:107)
  at org.apache.iceberg.spark.procedures.BaseProcedure.modifyIcebergTable(BaseProcedure.java:88)
  at org.apache.iceberg.spark.procedures.RewriteDataFilesProcedure.call(RewriteDataFilesProcedure.java:103)

After this PR:

scala> spark.sql("call local.system.rewrite_data_files(table => 'db.test_rewrite', where => 'substr(data, 2) = \"fo\"')").show()
java.lang.IllegalArgumentException: Cannot convert Spark filter: (data IS NOT NULL) AND ((SUBSTRING(data, 2)) = 'fo') to Iceberg expression
  at org.apache.spark.sql.execution.datasources.SparkExpressionConverter$.convertToIcebergExpression(SparkExpressionConverter.scala:43)
  at org.apache.spark.sql.execution.datasources.SparkExpressionConverter.convertToIcebergExpression(SparkExpressionConverter.scala)
  at org.apache.iceberg.spark.procedures.BaseProcedure.filterExpression(BaseProcedure.java:171)
  at org.apache.iceberg.spark.procedures.RewriteDataFilesProcedure.checkAndApplyFilter(RewriteDataFilesProcedure.java:129)
  at org.apache.iceberg.spark.procedures.RewriteDataFilesProcedure.lambda$call$0(RewriteDataFilesProcedure.java:118)
  at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:107)
  at org.apache.iceberg.spark.procedures.BaseProcedure.modifyIcebergTable(BaseProcedure.java:88)
  at org.apache.iceberg.spark.procedures.RewriteDataFilesProcedure.call(RewriteDataFilesProcedure.java:100)

github-actions bot added the spark label Aug 25, 2023
@@ -35,7 +35,14 @@ object SparkExpressionConverter {
     // Currently, it is a double conversion as we are converting Spark expression to Spark filter
     // and then converting Spark filter to Iceberg expression.
     // But these two conversions already exist and well tested. So, we are going with this approach.
-    SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true).get)
+    DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true) match {
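The diff above is truncated at the new match. A minimal sketch of how the added branch plausibly continues; the branch bodies and message strings are inferred from the stack traces above rather than copied from the commit:

DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true) match {
  case Some(filter) =>
    // SparkFilters.convert returns null for filters it cannot map to an
    // Iceberg expression, so a null check is still needed even after a
    // successful translation.
    val converted = SparkFilters.convert(filter)
    assert(converted != null, s"Cannot convert Spark filter: $filter to Iceberg expression")
    converted
  case _ =>
    // translateFilter returned None: the Catalyst expression has no Spark
    // filter equivalent, so fail with a descriptive message instead of
    // calling .get on the None.
    throw new IllegalArgumentException(s"Cannot translate Spark expression: $sparkExpression to Iceberg expression")
}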
@ConeyLiu (Author) commented Aug 25, 2023 on the diff:

We should change this to the V2 translator and V2 filter; then we could convert the system functions to Iceberg expressions after #8088 or after apache/spark#42612.

A reviewer (Contributor) replied:

Thank you, this is much better than the None.get I hit in my change.


@ConeyLiu (Author) commented:

cc @dramaticlly @RussellSpitzer @nastra @advancedxy, the code has been rebased; please take a look when you are free.

@dramaticlly (Contributor) left a comment:

Thank you @ConeyLiu, LGTM!

@@ -828,6 +828,26 @@ public void testDefaultSortOrder() {
     assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
   }

+  @Test
+  public void testRewriteWithUntranslatedOrUnconvertedFilter() {
A reviewer (Contributor) commented:

nit: this could potentially be part of the existing test testRewriteDataFilesWithInvalidInputs, but I think it's also fine to leave it here.

@nastra (Contributor) left a comment:

LGTM, I think we should also apply this against Spark 3.5 now.

@advancedxy (Contributor) left a comment:

LGTM, except a minor wording comment.

@ConeyLiu (Author) commented:

> I think we should also apply this against Spark 3.5 now

@nastra, should we port the changes to the other Spark versions in a follow-up PR? The issue exists in all the other Spark versions.

@nastra (Contributor) commented Sep 19, 2023:

A follow-up PR is fine IMO (whatever you prefer).

DataSourceV2Strategy.translateFilterV2(sparkExpression) match {
  case Some(filter) =>
    val converted = SparkV2Filters.convert(filter)
    assert(converted != null, s"Cannot convert Spark filter: $filter to Iceberg expression")
A reviewer (Contributor) commented on the assert:

Sorry, I missed this. Is it normal to use assert in Scala? I'd prefer throwing an IllegalArgumentException here.

Another reviewer (Contributor) replied:

Looks like we have used assert quite often in Scala code: https://github.com/search?q=repo%3Aapache%2Ficeberg%20%20assert(&type=code

@ConeyLiu (Author) replied:

assert is common usage in Spark code. Anyway, I changed it to IllegalArgumentException to keep the same behavior as the "Cannot translate Spark expression" case.
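Putting the review thread together, a hedged sketch of the final shape of convertToIcebergExpression (method body only; the exact message strings and the null-return behavior of SparkV2Filters.convert are assumptions based on the snippet and stack trace above):

DataSourceV2Strategy.translateFilterV2(sparkExpression) match {
  case Some(filter) =>
    // Assumed: SparkV2Filters.convert returns null for predicates it
    // cannot map, so surface that as an IllegalArgumentException rather
    // than an assert, which the Scala compiler can elide.
    val converted = SparkV2Filters.convert(filter)
    if (converted == null) {
      throw new IllegalArgumentException(s"Cannot convert Spark filter: $filter to Iceberg expression")
    }
    converted
  case _ =>
    // No V2 translation exists for this Catalyst expression.
    throw new IllegalArgumentException(s"Cannot translate Spark expression: $sparkExpression to Iceberg expression")
}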

@nastra merged commit 09a5dbc into apache:master on Sep 20, 2023
37 checks passed
@ConeyLiu (Author) commented:

Thanks @nastra for merging this, and thanks @dramaticlly and @advancedxy for the review. I will submit a backport.
