Spark 3.3, 3.4: use a deterministic where condition to make rewrite_data_files… #6760

Merged: 46 commits, May 20, 2023 (changes shown from 34 commits)

Commits
22b8722  spark: use a deterministic where condition to make rewrite_data_files… (ludlows, Feb 7, 2023)
f579cc7  Spark 3.3: Let rewrite_data_files exit without exceptions if where co… (ludlows, Feb 12, 2023)
6da89af  Merge branch 'master' into rewritr_data_file-exit-where-false (ludlows, Feb 26, 2023)
8e09dac  add a test case for the PR #6760 (use a deterministic where condition… (ludlows, Mar 5, 2023)
f67abe2  modified the PR #6760 by changing return type of function collectReso… (ludlows, Mar 11, 2023)
a27bf48  using the right indentation to pass the coding style check (ludlows, Mar 11, 2023)
1aa2838  add '{}'s for the 'if' statement (ludlows, Mar 11, 2023)
f31e4e3  change the indentation for the code style check in RewriteDataFilesPr… (ludlows, Mar 11, 2023)
304e52a  remove AnalysisException annotation (ludlows, Mar 11, 2023)
99f91c8  remove try-catch block for AnalysisException since it is not raised b… (ludlows, Mar 11, 2023)
54ebdc5  remove unused import statement (ludlows, Mar 11, 2023)
38145a6  update to pass java code style check (ludlows, Mar 11, 2023)
ed35030  provide helper function `filter` to get spark expression (ludlows, Mar 11, 2023)
24bdd88  leave a more precise comment to explain why we can terminate immediat… (ludlows, Mar 12, 2023)
4198b5f  function used to check if the spark expression is a deterministic tru… (ludlows, Mar 15, 2023)
e98a6e0  adapt procedure once we can distinguish the deterministic false and true (ludlows, Mar 15, 2023)
b8770b1  add 2 test cases for the rewrite_data_files procedure (ludlows, Mar 15, 2023)
22f0e83  Merge branch 'master' of https://github.com/ludlows/iceberg (Mar 16, 2023)
5570ed0  change to pass coding style check (Mar 16, 2023)
6d5b660  change CRLF to LF (Mar 16, 2023)
7d446e6  raise AnalysisException in the test case (ludlows, Mar 17, 2023)
47a8359  raise IllegalArgumentException in the test case (ludlows, Mar 17, 2023)
e865f06  Avoid block imports in Scala (Mar 17, 2023)
dc45f3d  coding style check (Mar 17, 2023)
9a5d87d  remove REMOVED_FILE_SIZE_PROP check (Mar 18, 2023)
24aa16a  Merge branch 'apache:master' into master (ludlows, Mar 18, 2023)
a298b97  Merge branch 'apache:master' into master (ludlows, Mar 21, 2023)
26c8aec  Merge branch 'apache:master' into master (ludlows, Apr 5, 2023)
d433101  use optimizedLogicalPlan.containsChild to check if a sparkExpression … (ludlows, Apr 5, 2023)
5faac1e  use collectResolvedIcebergExpression directly to get less changes (ludlows, Apr 8, 2023)
4a026d3  using match case (ludlows, Apr 16, 2023)
b600d11  collectSparkExpressionOption (ludlows, Apr 16, 2023)
5bbb179  change the way to distinguish alwaysTrue, alwaysFalse and undetermined (ludlows, May 5, 2023)
31588fe  Merge branch 'master' into rewritr_data_file-exit-where-false (ludlows, May 5, 2023)
0a4d2c2  verify rewritten bytes (ludlows, May 6, 2023)
2451995  format fix (ludlows, May 6, 2023)
5fc3614  Merge branch 'master' into rewritr_data_file-exit-where-false (ludlows, May 6, 2023)
fa865e8  we do not verify rewritten bytes since the number of rewritten files … (ludlows, May 6, 2023)
3b0c395  collectResolvedSparkExpression (ludlows, May 12, 2023)
bf413e1  format fix (ludlows, May 12, 2023)
0408626  remove checks for REMOVED_FILE_SIZE_PROP (ludlows, May 16, 2023)
e69ebdb  Merge branch 'apache:master' into master (ludlows, May 17, 2023)
c006716  remove unnecessary import (ludlows, May 17, 2023)
d500fe4  spark 3.4 implementation (ludlows, May 17, 2023)
02e2f76  format fix in TestRewriteDataFilesProcedure.java (ludlows, May 17, 2023)
63dfe2f  result output of rewriteDataFiles procedure has 4 elements in spark 3.4 (ludlows, May 17, 2023)
@@ -245,6 +245,51 @@ public void testRewriteDataFilesWithFilter() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithDeterministicTrueFilter() {
createTable();
// create 10 files under non-partitioned table
insertData(10);
List<Object[]> expectedRecords = currentData();
// select all 10 files for compaction
List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
catalogName, tableIdent);
assertEquals(
"Action should rewrite 10 data files and add 1 data files",
row(10, 1),
Arrays.copyOf(output.get(0), 2));

Collaborator:
Optional: I think the test is more understandable if we just put row(10, 1, Long.valueOf(snapshotSummary().get(...))). I do realize it's that way in other tests.
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithDeterministicFalseFilter() {
createTable();
// create 10 files under non-partitioned table
insertData(10);
List<Object[]> expectedRecords = currentData();
// select only 0 files for compaction
Collaborator:
Minor: "select no files".

Contributor Author:
Yes, let me fix it.

List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')",
catalogName, tableIdent);
assertEquals(
"Action should rewrite 0 data files and add 0 data files",
row(0, 0),
Arrays.copyOf(output.get(0), 2));
Collaborator:
Do we get output = 0, 0, 0? Can we just assert all 3 values instead of the first two in this case?

Contributor Author:
I think it is because the first two values are of type Integer and the last one is of type Long.

Collaborator (@szehon-ho, May 11, 2023):
You mean the assert fails? How about row(0, 0, 0L)?

Contributor Author:
Yes. snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP) becomes null here, which makes the test fail.

// verify rewritten bytes separately
Collaborator:
Seems no need for this comment, as we don't assert for bytes.

Contributor Author:
Yes, let me fix it.

assertThat(output.get(0)).hasSize(3);
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}
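
A note on the type question in the thread above: the procedure's output row mixes two Integer counts with a Long byte total, and in the always-false case the snapshot summary carries no removed-file-size entry, so the lookup returns null. A minimal Scala sketch of that behavior (the `RemovedBytesSketch` object and `table` handle are hypothetical, not part of the PR):

    import org.apache.iceberg.SnapshotSummary
    import org.apache.iceberg.Table

    object RemovedBytesSketch {
      // Yields None when the latest snapshot recorded no removed bytes,
      // e.g. after rewrite_data_files(where => '0=1') rewrote nothing;
      // Long.valueOf(null) would throw NumberFormatException instead,
      // which is why the test asserts only the first two columns there.
      def removedBytes(table: Table): Option[Long] =
        Option(table.currentSnapshot.summary.get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))
          .map(_.toLong)
    }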

@Test
public void testRewriteDataFilesWithFilterOnPartitionTable() {
createPartitionTable();
@@ -24,6 +24,7 @@
  import org.apache.iceberg.SortOrder;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.actions.RewriteDataFiles;
+ import org.apache.iceberg.expressions.Expression;
  import org.apache.iceberg.expressions.NamedReference;
  import org.apache.iceberg.expressions.Zorder;
  import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -33,7 +34,6 @@
  import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
  import org.apache.spark.sql.AnalysisException;
  import org.apache.spark.sql.catalyst.InternalRow;
- import org.apache.spark.sql.catalyst.expressions.Expression;
  import org.apache.spark.sql.connector.catalog.Identifier;
  import org.apache.spark.sql.connector.catalog.TableCatalog;
  import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
@@ -131,8 +131,8 @@ private RewriteDataFiles checkAndApplyFilter(
    if (where != null) {
      try {
        Expression expression =
-           SparkExpressionConverter.collectResolvedSparkExpression(spark(), tableName, where);
-       return action.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
+           SparkExpressionConverter.collectResolvedIcebergExpression(spark(), tableName, where);
+       return action.filter(expression);
      } catch (AnalysisException e) {
        throw new IllegalArgumentException("Cannot parse predicates in where option: " + where);
      }
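
The net effect of this change: the where clause is resolved and converted to an Iceberg expression in one step, and a predicate the optimizer proves always false now makes the rewrite exit cleanly instead of failing. A hypothetical usage sketch (catalog and table names are placeholders):

    // Spark 3.3+ with the Iceberg runtime configured (hypothetical names).
    // An always-false predicate selects no files, so the procedure returns
    // zero counts rather than throwing an exception.
    spark.sql(
      """CALL my_catalog.system.rewrite_data_files(
        |  table => 'db.sample',
        |  where => '0=1')""".stripMargin
    ).show()
    // expect rewritten_data_files_count = 0 and added_data_files_count = 0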
@@ -19,13 +19,15 @@

package org.apache.spark.sql.execution.datasources

+ import org.apache.iceberg.expressions.Expressions
  import org.apache.iceberg.spark.SparkFilters
  import org.apache.spark.sql.AnalysisException
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.expressions.Attribute
  import org.apache.spark.sql.catalyst.expressions.Expression
  import org.apache.spark.sql.catalyst.plans.logical.Filter
  import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
object SparkExpressionConverter {

@@ -37,13 +39,17 @@
}

  @throws[AnalysisException]
- def collectResolvedSparkExpression(session: SparkSession, tableName: String, where: String): Expression = {
+ def collectResolvedIcebergExpression(session: SparkSession,
+                                      tableName: String,
+                                      where: String): org.apache.iceberg.expressions.Expression = {
    val tableAttrs = session.table(tableName).queryExecution.analyzed.output
    val unresolvedExpression = session.sessionState.sqlParser.parseExpression(where)
    val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs))
    val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
    optimizedLogicalPlan.collectFirst {
-     case filter: Filter => filter.condition
+     case filter: Filter => convertToIcebergExpression(filter.condition)
+     case dummyRelation: DummyRelation => Expressions.alwaysTrue()
Collaborator:
For cleaner code, can we return Spark's Expression.TRUE, Expression.FALSE, and move the convertToIcebergExpression outside?

Contributor Author:
Hi @szehon-ho, do we need an Expression.TRUE as a Spark expression? In the end we only need an Iceberg one, but it seems possible to implement it in the following way:

    optimizedLogicalPlan.collectFirst {
      case filter: Filter => filter.condition
      case dummyRelation: DummyRelation => session.sessionState.sqlParser.parseExpression("true")
      case localRelation: LocalRelation => session.sessionState.sqlParser.parseExpression("false")
    }.getOrElse(throw new AnalysisException("Failed to find filter expression"))

What do you think about it?

Collaborator:
From Spark, Literal.TrueLiteral()

+     case localRelation: LocalRelation => Expressions.alwaysFalse()
}.getOrElse(throw new AnalysisException("Failed to find filter expression"))
}
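
For reference, a sketch of the reviewer's suggestion above, which is not the code that was merged: keep `collectFirst` in Spark's expression world using Catalyst's `Literal.TrueLiteral` and `Literal.FalseLiteral`, and convert to an Iceberg expression once at the call site (`DummyRelation` is the helper defined in this file):

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical.Filter
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Sketch: resolve to a Spark expression first, convert once outside.
    def collectResolvedSparkExpression(optimizedLogicalPlan: LogicalPlan): Expression =
      optimizedLogicalPlan.collectFirst {
        case filter: Filter => filter.condition
        case _: DummyRelation => Literal.TrueLiteral
        case _: LocalRelation => Literal.FalseLiteral
      }.getOrElse(throw new AnalysisException("Failed to find filter expression"))

    // caller: convertToIcebergExpression(collectResolvedSparkExpression(plan))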

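Why the three cases cover the plans this method can produce, as I read Catalyst's optimizer on Spark 3.x: constant folding reduces a deterministic condition to a literal; an always-true Filter is then dropped entirely, leaving the bare DummyRelation leaf, while PruneFilters rewrites an always-false Filter into an empty LocalRelation; anything else survives as a Filter node. A rough trace under those assumptions, reusing `session`, `tableAttrs`, and `DummyRelation` from the method above:

    // Shorthand for parsing a SQL predicate into an unresolved expression.
    val parse = (s: String) => session.sessionState.sqlParser.parseExpression(s)

    // where = '1=1': the Filter folds away, only DummyRelation remains,
    // so collectFirst hits the DummyRelation case -> Expressions.alwaysTrue()
    val truePlan = session.sessionState
      .executePlan(Filter(parse("1 = 1"), DummyRelation(tableAttrs))).optimizedPlan

    // where = '0=1': PruneFilters substitutes an empty LocalRelation,
    // so collectFirst hits that case -> Expressions.alwaysFalse()
    val falsePlan = session.sessionState
      .executePlan(Filter(parse("0 = 1"), DummyRelation(tableAttrs))).optimizedPlan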