Spark 3.3, 3.4: use a deterministic where condition to make rewrite_data_files… #6760

Merged: 46 commits, May 20, 2023
Commits (the diff shown below reflects changes from 5 commits)
22b8722
spark: use a deterministic where condition to make rewrite_data_files…
ludlows Feb 7, 2023
f579cc7
Spark 3.3: Let rewrite_data_files exit without exceptions if where co…
ludlows Feb 12, 2023
6da89af
Merge branch 'master' into rewritr_data_file-exit-where-false
ludlows Feb 26, 2023
8e09dac
add a test case for the PR #6760 (use a deterministic where condition…
ludlows Mar 5, 2023
f67abe2
modified the PR #6760 by changing return type of function collectReso…
ludlows Mar 11, 2023
a27bf48
using the right indentation to pass the coding style check
ludlows Mar 11, 2023
1aa2838
add '{}'s for the 'if' statement
ludlows Mar 11, 2023
f31e4e3
change the indentation for the code style check in RewriteDataFilesPr…
ludlows Mar 11, 2023
304e52a
remove AnalysisException annotation
ludlows Mar 11, 2023
99f91c8
remove try-catch block for AnalysisException since it is not raised b…
ludlows Mar 11, 2023
54ebdc5
remove unused import statement
ludlows Mar 11, 2023
38145a6
update to pass java code style check
ludlows Mar 11, 2023
ed35030
provide helper function `filter` to get spark expression
ludlows Mar 11, 2023
24bdd88
leave a more precise comment to explain why we can terminate immediat…
ludlows Mar 12, 2023
4198b5f
function used to check if the spark expression is a deterministic tru…
ludlows Mar 15, 2023
e98a6e0
adapt procedure once we can distinguish the deterministic false and true
ludlows Mar 15, 2023
b8770b1
add 2 test cases for the rewrite_data_files procedure
ludlows Mar 15, 2023
22f0e83
Merge branch 'master' of https://github.com/ludlows/iceberg
Mar 16, 2023
5570ed0
change to pass coding style check
Mar 16, 2023
6d5b660
change CRLF to LF
Mar 16, 2023
7d446e6
raise AnalysisException in the test case
ludlows Mar 17, 2023
47a8359
raise IllegalArgumentException in the test case
ludlows Mar 17, 2023
e865f06
Avoid block imports in Scala
Mar 17, 2023
dc45f3d
coding style check
Mar 17, 2023
9a5d87d
remove REMOVED_FILE_SIZE_PROP check
Mar 18, 2023
24aa16a
Merge branch 'apache:master' into master
ludlows Mar 18, 2023
a298b97
Merge branch 'apache:master' into master
ludlows Mar 21, 2023
26c8aec
Merge branch 'apache:master' into master
ludlows Apr 5, 2023
d433101
use optimizedLogicalPlan.containsChild to check if a sparkExpression …
ludlows Apr 5, 2023
5faac1e
use collectResolvedIcebergExpression directly to get less changes
ludlows Apr 8, 2023
4a026d3
using match case
ludlows Apr 16, 2023
b600d11
collectSparkExpressionOption
ludlows Apr 16, 2023
5bbb179
change the way to distinguish alwaysTrue, alwaysFalse and undetermined
ludlows May 5, 2023
31588fe
Merge branch 'master' into rewritr_data_file-exit-where-false
ludlows May 5, 2023
0a4d2c2
verify rewritten bytes
ludlows May 6, 2023
2451995
format fix
ludlows May 6, 2023
5fc3614
Merge branch 'master' into rewritr_data_file-exit-where-false
ludlows May 6, 2023
fa865e8
we do not verify rewritten bytes since the number of rewritten files …
ludlows May 6, 2023
3b0c395
collectResolvedSparkExpression
ludlows May 12, 2023
bf413e1
format fix
ludlows May 12, 2023
0408626
remove checks for REMOVED_FILE_SIZE_PROP
ludlows May 16, 2023
e69ebdb
Merge branch 'apache:master' into master
ludlows May 17, 2023
c006716
remove unnecessary import
ludlows May 17, 2023
d500fe4
spark 3.4 implementation
ludlows May 17, 2023
02e2f76
format fix in TestRewriteDataFilesProcedure.java
ludlows May 17, 2023
63dfe2f
result output of rewriteDataFiles procedure has 4 elements in spark 3.4
ludlows May 17, 2023
@@ -245,6 +245,26 @@ public void testRewriteDataFilesWithFilter() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithFalseFilter() {
createTable();
List<Object[]> expectedRecords = currentData();
// select only 0 files for compaction
Collaborator: Minor: select no files..

Contributor Author: yes, let me fix it.

    List<Object[]> output = sql(
        "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')", catalogName, tableIdent);
    assertEquals(
        "Action should rewrite 0 data files and add 0 data files",
        row(0, 0),
        Arrays.copyOf(output.get(0), 2));
    // verify rewritten bytes separately
Collaborator: Seems no need for this comment, as we don't assert for bytes.

Contributor Author: yes, let me fix it.

    assertThat(output.get(0)).hasSize(3);
    assertThat(output.get(0)[2])
        .isInstanceOf(Long.class)
        .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
    List<Object[]> actualRecords = currentData();
    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
  }
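For reference, here is a minimal sketch of how this test body might read once both review comments are addressed and the later "remove checks for REMOVED_FILE_SIZE_PROP" commits are applied. It reuses only the helpers visible above (createTable, currentData, sql, row) and is an editor's reconstruction, not the merged code.

  @Test
  public void testRewriteDataFilesWithFalseFilter() {
    createTable();
    List<Object[]> expectedRecords = currentData();
    // select no files for compaction because the where clause is always false
    List<Object[]> output = sql(
        "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')", catalogName, tableIdent);
    // the procedure should report 0 rewritten and 0 added data files
    assertEquals(
        "Action should rewrite 0 data files and add 0 data files",
        row(0, 0),
        Arrays.copyOf(output.get(0), 2));
    // the table content must be unchanged
    List<Object[]> actualRecords = currentData();
    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
  }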

  @Test
  public void testRewriteDataFilesWithFilterOnPartitionTable() {
    createPartitionTable();
@@ -24,6 +24,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -42,6 +43,7 @@
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Option;
import scala.runtime.BoxedUnit;

/**
@@ -117,7 +119,6 @@ public InternalRow[] call(InternalRow args) {
    }

    String where = args.isNullAt(4) ? null : args.getString(4);

    action = checkAndApplyFilter(action, where, quotedFullIdentifier);

    RewriteDataFiles.Result result = action.execute();
@@ -130,9 +131,10 @@ private RewriteDataFiles checkAndApplyFilter(
      RewriteDataFiles action, String where, String tableName) {
    if (where != null) {
      try {
        Expression expression =
            SparkExpressionConverter.collectResolvedSparkExpression(spark(), tableName, where);
        return action.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
        Option<Expression> expressionOption =
            SparkExpressionConverter.collectResolvedSparkExpressionOption(spark(), tableName, where);
        if (expressionOption.isEmpty()) return action.filter(Expressions.alwaysFalse());
Collaborator: Nit: I'm not sure if checkstyle/spotless will fail here, but I think we need an extra newline for the return in any case.

Contributor Author: Yes, you are right. The style check failed here; I need to add {} for each if statement.

        return action.filter(SparkExpressionConverter.convertToIcebergExpression(expressionOption.get()));
      } catch (AnalysisException e) {
        throw new IllegalArgumentException("Cannot parse predicates in where option: " + where);
      }
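As a follow-up to the brace discussion above, here is a minimal sketch of the same short-circuit with {} around the single-statement if body, which the author says the style check requires. This is an assumption about the fix, not a quote of the merged code.

        Option<Expression> expressionOption =
            SparkExpressionConverter.collectResolvedSparkExpressionOption(spark(), tableName, where);
        if (expressionOption.isEmpty()) {
          // an always-false where clause selects no files, so filter on alwaysFalse() and exit cleanly
          return action.filter(Expressions.alwaysFalse());
        }
        return action.filter(
            SparkExpressionConverter.convertToIcebergExpression(expressionOption.get()));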
@@ -37,14 +37,15 @@ object SparkExpressionConverter {
  }

  @throws[AnalysisException]
  def collectResolvedSparkExpression(session: SparkSession, tableName: String, where: String): Expression = {
  def collectResolvedSparkExpressionOption(session: SparkSession,
                                           tableName: String, where: String): Option[Expression] = {
    val tableAttrs = session.table(tableName).queryExecution.analyzed.output
    val unresolvedExpression = session.sessionState.sqlParser.parseExpression(where)
    val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs))
    val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
    optimizedLogicalPlan.collectFirst {
      case filter: Filter => Some(filter.condition)
aokolnychyi (Contributor), Mar 15, 2023: Would it be fair to assume we would get back an empty local table scan if the condition evaluates to false? If so, what about modifying the logic in collectFirst to find that and then return false?

Contributor: I assume we should have 3 branches:

case filter: Filter => filter.condition
case relation: DummyRelation => Literal.TrueLiteral
case relation: LocalRelation => Literal.FalseLiteral

Contributor:
1st -> we have a real filter
2nd -> filter evaluates to true
3rd -> filter evaluates to false

Contributor Author: Hi @aokolnychyi, thanks for your suggestions. I am trying to combine collectResolvedSparkExpressionOption and collectDeterministicSparkExpression together. Could you explain your suggestions in more detail? Thanks.

Contributor: @ludlows, I think the underlying problem we are trying to solve is that the logic in the existing collectFirst is not exhaustive and does not cover cases when the condition evaluates to true or false. Instead of offering a new method to check if the condition evaluates to true or false, we should modify the existing method to take those scenarios into account.

I believe we only check this now:

case filter: Filter => filter.condition

I think it should be something like this instead:

case filter: Filter => filter.condition // the condition is non-trivial 
case relation: DummyRelation => Literal.TrueLiteral // the condition is always true and the filter was removed by the optimizer
case relation: LocalRelation => Literal.FalseLiteral // the condition is always false and the optimizer replaced the plan with an empty relation

Collaborator: I think @aokolnychyi's comment is the right way.

We just need to modify the method collectResolvedIcebergExpression and add the extra Scala pattern matching that Anton showed.

Then there is no need to modify the outside method; the converter will automatically convert those Spark true/false literals to Iceberg true/false expressions.

Collaborator: Hi @ludlows, actually, taking a look at this, why didn't we try just using @aokolnychyi's code suggestion directly?

  def collectResolvedSparkExpression(session: SparkSession, tableName: String, where: String): Expression = {
    val tableAttrs = session.table(tableName).queryExecution.analyzed.output
    val unresolvedExpression = session.sessionState.sqlParser.parseExpression(where)
    val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs))
    val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
    optimizedLogicalPlan.collectFirst {
      case filter: Filter => filter.condition // the condition is non-trivial
      case relation: DummyRelation => Literal.TrueLiteral // the condition is always true and the filter was removed by the optimizer 
      case relation: LocalRelation => Literal.FalseLiteral // the condition is always false and the optimizer replaced the plan with an empty relation
    }.getOrElse(throw new AnalysisException("Failed to find filter expression"))
  }

It looks like it should work. The ConstantFolding rule will probably get rid of the filter and make this match the second case (DummyRelation). And we have added test cases for alwaysTrue and alwaysFalse to catch anything we miss.

Contributor Author: Hi @szehon-ho, yes, @aokolnychyi and you are right. The current version now uses this method to distinguish alwaysTrue, alwaysFalse, and undetermined conditions. Thanks for your explanation. As I remember, I didn't use this method previously because I didn't understand the behavior of collectFirst.

    }.getOrElse(Option.empty)
  }

  case class DummyRelation(output: Seq[Attribute]) extends LeafNode
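If the three-branch collectFirst suggested in the discussion is what gets merged, the procedure side no longer needs the Option-based short circuit shown earlier: per the reviewers, convertToIcebergExpression turns the Spark true/false literals into Iceberg Expressions.alwaysTrue()/alwaysFalse(), so an always-false where clause simply selects no files. A minimal Java sketch of checkAndApplyFilter under that assumption (the trailing return of action when where is null is inferred from context, not quoted from the PR):

  private RewriteDataFiles checkAndApplyFilter(
      RewriteDataFiles action, String where, String tableName) {
    if (where != null) {
      try {
        // may resolve to a real predicate, Literal.TrueLiteral, or Literal.FalseLiteral
        Expression expression =
            SparkExpressionConverter.collectResolvedSparkExpression(spark(), tableName, where);
        // true/false literals convert to Expressions.alwaysTrue()/alwaysFalse()
        return action.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
      } catch (AnalysisException e) {
        throw new IllegalArgumentException("Cannot parse predicates in where option: " + where);
      }
    }
    return action;
  }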