
Spark 3.4: Push down system functions by V2 filters for rewriting DataFiles and PositionDeleteFiles #8560

Merged: 3 commits merged into apache:master on Sep 18, 2023

Conversation

@dramaticlly (Contributor) commented on Sep 14, 2023:

Inspired by #7886, this change pushes down V2 filters from the where clause used by both rewriteDataFiles and rewritePositionDeleteFiles:

  • Make sure the bucket transform works in the action
  • Make sure the conversion works in the procedure

CC @RussellSpitzer @aokolnychyi @ConeyLiu @rdblue

The following query is expected to work after this change:

```scala
spark.sql(
  """
  CALL iceberg.system.rewrite_data_files(
    table => 'foo.bar',
    where => 'iceberg.system.bucket(4, url) = 0')
  """
)
```
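The PR title also covers rewritePositionDeleteFiles; assuming the same table and predicate, the analogous procedure call would look like this (a sketch, not quoted from the PR):

```scala
// Sketch: the position-delete counterpart of the call above, using the same
// hypothetical table 'foo.bar' and bucket predicate.
spark.sql(
  """
  CALL iceberg.system.rewrite_position_delete_files(
    table => 'foo.bar',
    where => 'iceberg.system.bucket(4, url) = 0')
  """
)
```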


The core change in SparkExpressionConverter switches the conversion from V1 filters to V2 predicates:

```diff
 object SparkExpressionConverter {

   def convertToIcebergExpression(sparkExpression: Expression): org.apache.iceberg.expressions.Expression = {
     // Currently, it is a double conversion as we are converting a Spark expression to a Spark filter
     // and then converting the Spark filter to an Iceberg expression.
     // But these two conversions already exist and are well tested, so we are going with this approach.
-    SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true).get)
+    SparkV2Filters.convert(DataSourceV2Strategy.translateFilterV2(sparkExpression).get)
   }
 }
```
@ConeyLiu (Contributor): Not related to this PR either, but the `get` here is not a good idea: the expression could fail to translate, and the resulting error message is not helpful. I have opened #8394 for it.
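For illustration, a minimal sketch (not this PR's code, and the helper name `convertOrFail` is made up) of the safer handling #8394 is about: matching on the Option instead of calling `get`, so an untranslatable expression fails with a descriptive message.

```scala
import org.apache.iceberg.spark.SparkV2Filters
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy

// Sketch only: translate the catalyst expression to a V2 predicate, failing
// with a clear message instead of an opaque None.get error.
def convertOrFail(sparkExpression: Expression): org.apache.iceberg.expressions.Expression =
  DataSourceV2Strategy.translateFilterV2(sparkExpression) match {
    case Some(predicate) => SparkV2Filters.convert(predicate)
    case None =>
      throw new IllegalArgumentException(
        s"Cannot translate Spark expression to a V2 predicate: $sparkExpression")
  }
```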

@RussellSpitzer (Member): Is there anything we have to worry about here in moving from SparkFilters to SparkV2Filters?

@dramaticlly (Author): Thank you @ConeyLiu, I will rebase my change if #8394 gets merged first. I also added comments where we now convert the Spark catalyst expression to a Predicate instead of a Spark source filter, and I ran all the unit tests to make sure the old filters work as expected.

```diff
@@ -223,6 +223,31 @@ public void testBinPackWithFilter() {
     assertEquals("Rows must match", expectedRecords, actualRecords);
   }

+  @Test
+  public void testBinPackWithFilterOnBucketExpression() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("c3", 2).build();
```
@ConeyLiu (Contributor): Is it possible to use the test utils class SystemFunctionPushDownHelper to build the table and data?

@dramaticlly (Author): Thank you Coney, your test utils class is super helpful. However, I realized these SparkAction tests assume a Hadoop catalog, so the table creation is a bit different since it goes by table location (https://github.com/apache/iceberg/blob/master/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java#L1563). I opted to use your SystemFunctionPushDownHelper in TestRewriteDataFilesProcedure instead.
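For context, a hedged sketch of the action-level usage the new test targets: a rewrite filtered on a bucket transform term rather than a plain column predicate. The column name c3 and bucket width 2 come from the diff above; the rest is assumed, not the PR's exact test code.

```scala
import org.apache.iceberg.Table
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.spark.actions.SparkActions

// Rewrite only the data files that fall in bucket 0 of bucket(c3, 2);
// Expressions.bucket builds the transform term used in the filter.
def rewriteBucketZero(table: Table): Unit =
  SparkActions
    .get()
    .rewriteDataFiles(table)
    .filter(Expressions.equal(Expressions.bucket("c3", 2), 0))
    .execute()
```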

```diff
@@ -480,7 +511,10 @@ public void testRewriteDataFilesWithAllPossibleFilters() {
+    sql(
+        "CALL %s.system.rewrite_data_files(table => '%s'," + " where => 'c2 like \"%s\"')",
+        catalogName, tableIdent, "car%");

     // StringStartsWith
```
@ConeyLiu (Contributor): What's the purpose here? It seems to just copy from L510-L513.

@RussellSpitzer (Member): Is this meant to be a bucket transform call?

@dramaticlly (Author): Yeah, good catch. Initially I was about to add a filter here for the bucket transform (and forgot to change it), but I ended up creating a new method to test that all V2 filters can be evaluated without exception.
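A hypothetical sketch of that kind of check (table name and clauses illustrative, assuming an active SparkSession `spark`): each supported predicate is pushed through the procedure and must not raise a translation or conversion error, even when it matches no files.

```scala
// Sketch only: every where clause below should convert to an Iceberg
// expression without throwing when the procedure plans the rewrite.
Seq(
  "c2 like \"car%\"",                 // StringStartsWith
  "iceberg.system.bucket(2, c3) = 0"  // system function push-down
).foreach { where =>
  spark.sql(
    s"CALL iceberg.system.rewrite_data_files(table => 'foo.bar', where => '$where')")
}
```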

@ConeyLiu (Contributor) commented: Thanks @dramaticlly for this contribution. This is on my working list, but I'm happy to see it finished ahead of time. I have some minor comments.

@RussellSpitzer (Member) left a review: I just have some minor questions; this looks solid to me though. Thanks for adding all the tests as well!

@ConeyLiu (Contributor) left a review: +1

@RussellSpitzer merged commit 2817dd4 into apache:master on Sep 18, 2023. 31 checks passed.
@RussellSpitzer (Member): Thanks @dramaticlly for the PR, and thank you @ConeyLiu for the review! Merged.
