Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.4: Push down system functions by V2 filters for rewriting DataFiles and PositionDeleteFiles #8560

Merged
merged 3 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,37 @@ public void testRewriteDataFilesWithFilterOnPartitionTable() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithFilterOnOnBucketExpression() {
// The schema `system` cannot be found in spark_catalog
Assume.assumeFalse(catalogName.equals(SparkCatalogConfig.SPARK.catalogName()));
createBucketPartitionTable();
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
// create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
insertData(10);
List<Object[]> expectedRecords = currentData();

// select only 5 files for compaction (files in the partition c2 = 'bar')
List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table => '%s',"
+ " where => '%s.system.bucket(2, c2) = 0')",
catalogName, tableIdent, catalogName);

assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2 = bar) and add 1 data files",
row(5, 1),
Arrays.copyOf(output.get(0), 2));
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));

List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithInFilterOnPartitionTable() {
createPartitionTable();
Expand Down Expand Up @@ -480,7 +511,10 @@ public void testRewriteDataFilesWithAllPossibleFilters() {
sql(
"CALL %s.system.rewrite_data_files(table => '%s'," + " where => 'c2 like \"%s\"')",
catalogName, tableIdent, "car%");

// StringStartsWith
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose here? It seems to just copy from L510-L513.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this meant to be a bucket transform call?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah good catch. Initially I was about to add filter here for bucket transform (forgot to change) but I end up create a new method to test all V2Filters can be evaluated without exception.

sql(
"CALL %s.system.rewrite_data_files(table => '%s'," + " where => 'c2 like \"%s\"')",
catalogName, tableIdent, "car%");
// TODO: Enable when org.apache.iceberg.spark.SparkFilters have implementations for
// StringEndsWith & StringContains
// StringEndsWith
Expand Down Expand Up @@ -778,6 +812,17 @@ private void createPartitionTable() {
TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
}

private void createBucketPartitionTable() {
sql(
"CREATE TABLE %s (c1 int, c2 string, c3 string) "
+ "USING iceberg "
+ "PARTITIONED BY (bucket(2, c2)) "
+ "TBLPROPERTIES ('%s' '%s')",
tableName,
TableProperties.WRITE_DISTRIBUTION_MODE,
TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
}

private void insertData(int filesCount) {
insertData(tableName, filesCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,17 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
Expand Down Expand Up @@ -103,8 +100,6 @@ public InternalRow[] call(InternalRow args) {
return modifyIcebergTable(
tableIdent,
table -> {
String quotedFullIdentifier =
Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
RewriteDataFiles action = actions().rewriteDataFiles(table);

String strategy = args.isNullAt(1) ? null : args.getString(1);
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -120,7 +115,7 @@ public InternalRow[] call(InternalRow args) {

String where = args.isNullAt(4) ? null : args.getString(4);

action = checkAndApplyFilter(action, where, quotedFullIdentifier);
action = checkAndApplyFilter(action, where, tableIdent);

RewriteDataFiles.Result result = action.execute();

Expand All @@ -129,15 +124,10 @@ public InternalRow[] call(InternalRow args) {
}

private RewriteDataFiles checkAndApplyFilter(
RewriteDataFiles action, String where, String tableName) {
RewriteDataFiles action, String where, Identifier ident) {
if (where != null) {
try {
Expression expression =
SparkExpressionConverter.collectResolvedSparkExpression(spark(), tableName, where);
return action.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
} catch (AnalysisException e) {
throw new IllegalArgumentException("Cannot parse predicates in where option: " + where, e);
}
Expression expression = filterExpression(ident, where);
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
return action.filter(expression);
}
return action;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.spark.sql.execution.datasources

import org.apache.iceberg.spark.SparkFilters
import org.apache.iceberg.spark.SparkV2Filters
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
Expand All @@ -28,14 +28,15 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy

object SparkExpressionConverter {

def convertToIcebergExpression(sparkExpression: Expression): org.apache.iceberg.expressions.Expression = {
// Currently, it is a double conversion as we are converting Spark expression to Spark filter
// and then converting Spark filter to Iceberg expression.
// But these two conversions already exist and well tested. So, we are going with this approach.
SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true).get)
SparkV2Filters.convert(DataSourceV2Strategy.translateFilterV2(sparkExpression).get)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR either. The get here indeed is not a good idea, because the expression could fail to translate and the error message is not valuable. I have opened #8394 for it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there anything we have to worry about here in moving from SparkFilters to SparkV2Filters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @ConeyLiu , I will rebase my change if 8394 gets merged first.
Also added some comments where we are now convert Spark catalyst expression to Predicate instead of spark source filter. I ran all the unit tests to make sure old filter are working as expected.

}

@throws[AnalysisException]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,31 @@ public void testBinPackWithFilter() {
assertEquals("Rows must match", expectedRecords, actualRecords);
}

@Test
public void testBinPackWithFilterOnBucketExpression() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("c3", 2).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use the test utils class SystemFunctionPushDownHelper to build the table and data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you Coney, your test utils class is super helpful. However I realized this SparkAction tests was assumed to use hadoop catalog so the table creation is a bit different as it's by table location https://github.com/apache/iceberg/blob/master/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java#L1563. But I opted to use your SystemFunctionPushDownHelper in TestRewriteDataFilesProcedure.

Table table = TABLES.create(SCHEMA, spec, Maps.newHashMap(), tableLocation);

insertData(10, 2);

shouldHaveFiles(table, 4);
List<Object[]> expectedRecords = currentData();
long dataSizeBefore = testDataSize(table);

Expression expression = Expressions.equal(Expressions.bucket("c3", 2), 0);

Result result = basicRewrite(table).filter(expression).execute();

Assert.assertEquals("Action should rewrite 2 data files", 2, result.rewrittenDataFilesCount());
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFilesCount());
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);

shouldHaveFiles(table, 3);

List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);
}

@Test
public void testBinPackAfterPartitionChange() {
Table table = createTable();
Expand Down Expand Up @@ -260,7 +285,7 @@ public void testBinPackAfterPartitionChange() {
}

@Test
public void testBinPackWithDeletes() throws Exception {
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
public void testBinPackWithDeletes() {
Table table = createTablePartitioned(4, 2);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
shouldHaveFiles(table, 8);
Expand Down Expand Up @@ -1606,6 +1631,20 @@ private Table createTypeTestTable() {
return table;
}

private void insertData(int recordCount, int numDataFiles) {
dramaticlly marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < numDataFiles; i++) {
spark
.range(recordCount)
.withColumnRenamed("id", "c1")
.withColumn("c2", expr("CAST(c1 AS STRING)"))
.withColumn("c3", expr("CAST(c1 AS STRING)"))
.write()
.format("iceberg")
.mode("append")
.save(tableLocation);
}
}

protected int averageFileSize(Table table) {
table.refresh();
return (int)
Expand Down