diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 801396718137..53ccab80a264 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -42,6 +42,7 @@
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
 import org.apache.spark.sql.internal.SQLConf;
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -763,6 +764,35 @@ public void testDefaultSortOrder() {
     assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesWithSystemFunctions() {
+    Assumptions.assumeThat(catalogName).isNotEqualTo("spark_catalog");
+
+    sql(
+        "CREATE TABLE %s (c1 INT, c2 STRING, c3 TIMESTAMP) "
+            + "USING iceberg "
+            + "PARTITIONED BY (days(c3)) "
+            + "TBLPROPERTIES ('%s' '%s')",
+        tableName,
+        TableProperties.WRITE_DISTRIBUTION_MODE,
+        TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
+
+    sql(
+        "INSERT INTO TABLE %s VALUES (0, 'data-0', CAST('2017-11-22T09:20:44.294658+00:00' AS TIMESTAMP))",
+        tableName);
+    sql(
+        "INSERT INTO TABLE %s VALUES (1, 'data-1', CAST('2017-11-23T03:15:32.194356+00:00' AS TIMESTAMP))",
+        tableName);
+    // the system function predicate in the where clause cannot be translated to a data source filter
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', where => '%s.system.years(c3) = 2017')",
+                    catalogName, tableIdent, catalogName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot translate Spark expression to data source filter");
+  }
+
   private void createTable() {
     sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", tableName);
   }
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 9f53eae60aba..2832de2ad91a 100644
--- a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -35,7 +35,14 @@ object SparkExpressionConverter {
     // Currently, it is a double conversion as we are converting Spark expression to Spark filter
     // and then converting Spark filter to Iceberg expression.
     // But these two conversions already exist and well tested. So, we are going with this approach.
-    SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true).get)
+    DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true) match {
+      case Some(filter) =>
+        val converted = SparkFilters.convert(filter)
+        assert(converted != null, s"Cannot convert Spark filter to Iceberg expression: $filter")
+        converted
+      case _ =>
+        throw new IllegalArgumentException(s"Cannot translate Spark expression to data source filter: $sparkExpression")
+    }
   }
 
   @throws[AnalysisException]
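
Reviewer note on the SparkExpressionConverter change: the previous code called .get on the Option returned by DataSourceStrategy.translateFilter, so an untranslatable predicate (such as the catalog system function call exercised by the new test) surfaced as a bare NoSuchElementException, and a filter that SparkFilters.convert could not handle came back as a silent null. The snippet below is a minimal, self-contained sketch of the new control flow only; translate and convertFilter are hypothetical stand-ins for DataSourceStrategy.translateFilter and SparkFilters.convert, not the real Spark or Iceberg APIs.

// Stand-alone sketch of the error handling introduced in the patch above.
// translate and convertFilter are toy stand-ins, not actual Spark/Iceberg calls.
object ConverterSketch {

  // Stand-in for translateFilter: returns None when a predicate cannot be
  // expressed as a data source filter (e.g. a system function call).
  private def translate(predicate: String): Option[String] =
    if (predicate.contains("system.")) None else Some(predicate)

  // Stand-in for SparkFilters.convert: returns null when a data source filter
  // has no Iceberg equivalent.
  private def convertFilter(filter: String): String =
    if (filter.contains("rlike")) null else filter.toUpperCase

  // New control flow: fail fast with a descriptive message instead of
  // Option.get's NoSuchElementException or a silent null result.
  def convertToIcebergExpression(predicate: String): String =
    translate(predicate) match {
      case Some(filter) =>
        val converted = convertFilter(filter)
        assert(converted != null, s"Cannot convert Spark filter to Iceberg expression: $filter")
        converted
      case _ =>
        throw new IllegalArgumentException(
          s"Cannot translate Spark expression to data source filter: $predicate")
    }

  def main(args: Array[String]): Unit = {
    println(convertToIcebergExpression("c1 = 0")) // prints "C1 = 0"
    try {
      convertToIcebergExpression("cat.system.years(c3) = 2017")
    } catch {
      // message names the offending predicate, mirroring the test's assertion
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}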