better error message
ConeyLiu committed Aug 25, 2023
1 parent 1f8dbae commit 62d223f
Showing 2 changed files with 38 additions and 1 deletion.
@@ -42,6 +42,7 @@
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
 import org.apache.spark.sql.internal.SQLConf;
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -763,6 +764,35 @@ public void testDefaultSortOrder() {
     assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteDataFilesWithSystemFunctions() {
+    Assumptions.assumeThat(catalogName).isNotEqualTo("spark_catalog");
+
+    sql(
+        "CREATE TABLE %s (c1 INT, c2 STRING, c3 TIMESTAMP) "
+            + "USING iceberg "
+            + "PARTITIONED BY (days(c3)) "
+            + "TBLPROPERTIES ('%s' '%s')",
+        tableName,
+        TableProperties.WRITE_DISTRIBUTION_MODE,
+        TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
+
+    sql(
+        "INSERT INTO TABLE %s VALUES (0, 'data-0', CAST('2017-11-22T09:20:44.294658+00:00' AS TIMESTAMP))",
+        tableName);
+    sql(
+        "INSERT INTO TABLE %s VALUES (1, 'data-1', CAST('2017-11-23T03:15:32.194356+00:00' AS TIMESTAMP))",
+        tableName);
+    // years(c3) is a system function that cannot be translated to a data source filter
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.rewrite_data_files(table => '%s', where => '%s.system.years(c3) = 2017')",
+                    catalogName, tableIdent, catalogName))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot translate Spark expression to data source filter");
+  }
+
   private void createTable() {
     sql("CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg", tableName);
   }
@@ -35,7 +35,14 @@ object SparkExpressionConverter {
     // Currently, it is a double conversion as we are converting Spark expression to Spark filter
     // and then converting Spark filter to Iceberg expression.
     // But these two conversions already exist and are well tested, so we are going with this approach.
-    SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true).get)
+    DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true) match {
+      case Some(filter) =>
+        val converted = SparkFilters.convert(filter)
+        assert(converted != null, s"Cannot convert Spark filter to Iceberg expression: $filter")
+        converted
+      case _ =>
+        throw new IllegalArgumentException(s"Cannot translate Spark expression to data source filter: $sparkExpression")
+    }
   }
 
   @throws[AnalysisException]
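For context on why this is a "better error message": before this change, a failed translation hit Option.get on None, which surfaces an opaque java.util.NoSuchElementException: None.get with no hint about the offending expression. Below is a minimal, self-contained Scala sketch of the difference; translate is a hypothetical stand-in for DataSourceStrategy.translateFilter, and the example expression is invented, not taken from the commit.

  // Minimal sketch, plain Scala, no Spark required. `translate` is a
  // hypothetical stand-in for DataSourceStrategy.translateFilter; it returns
  // None when an expression (e.g. a system function call) has no data source
  // filter equivalent.
  object BetterErrorMessageSketch extends App {
    def translate(expr: String): Option[String] =
      if (expr.contains("system.years")) None else Some(expr)

    val expr = "cat.system.years(c3) = 2017" // invented example expression

    // Old behavior: translate(expr).get
    //   => java.util.NoSuchElementException: None.get (no hint at the cause)

    // New behavior: fail fast and name the untranslatable expression.
    val filter = translate(expr).getOrElse {
      throw new IllegalArgumentException(
        s"Cannot translate Spark expression to data source filter: $expr")
    }
    println(s"Translated filter: $filter")
  }

Running the sketch throws IllegalArgumentException with the message "Cannot translate Spark expression to data source filter: cat.system.years(c3) = 2017", which is the message shape the new test asserts via hasMessageContaining.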
