Spark 3.3, 3.4: use a deterministic where condition to make rewrite_data_files… #6760

Merged (46 commits) on May 20, 2023
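
For context, a minimal usage sketch of the behavior this PR targets: with a deterministically false `where` clause the procedure should finish and report zero rewritten files instead of failing, and a deterministically true clause should consider every data file. This is a hedged sketch, not the PR's code; the catalog and table names are placeholders and the session setup assumes the Iceberg runtime and a configured catalog are available.

```scala
import org.apache.spark.sql.SparkSession

// Placeholder catalog/table names; assumes the Iceberg Spark runtime jar and a
// configured catalog named `my_catalog`.
val spark = SparkSession.builder()
  .appName("rewrite-data-files-where-demo")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .getOrCreate()

// Deterministically true predicate: all data files are candidates for compaction.
spark.sql(
  "CALL my_catalog.system.rewrite_data_files(table => 'db.sample', where => '1=1')").show()

// Deterministically false predicate: with this change the call returns a zero-count
// result row instead of throwing an exception.
spark.sql(
  "CALL my_catalog.system.rewrite_data_files(table => 'db.sample', where => '0=1')").show()
```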
Commits (46)

22b8722 - spark: use a deterministic where condition to make rewrite_data_files… (ludlows, Feb 7, 2023)
f579cc7 - Spark 3.3: Let rewrite_data_files exit without exceptions if where co… (ludlows, Feb 12, 2023)
6da89af - Merge branch 'master' into rewritr_data_file-exit-where-false (ludlows, Feb 26, 2023)
8e09dac - add a test case for the PR #6760 (use a deterministic where condition… (ludlows, Mar 5, 2023)
f67abe2 - modified the PR #6760 by changing return type of function collectReso… (ludlows, Mar 11, 2023)
a27bf48 - using the right indentation to pass the coding style check (ludlows, Mar 11, 2023)
1aa2838 - add '{}'s for the 'if' statement (ludlows, Mar 11, 2023)
f31e4e3 - change the indentation for the code style check in RewriteDataFilesPr… (ludlows, Mar 11, 2023)
304e52a - remove AnalysisException annotation (ludlows, Mar 11, 2023)
99f91c8 - remove try-catch block for AnalysisException since it is not raised b… (ludlows, Mar 11, 2023)
54ebdc5 - remove unused import statement (ludlows, Mar 11, 2023)
38145a6 - update to pass java code style check (ludlows, Mar 11, 2023)
ed35030 - provide helper function `filter` to get spark expression (ludlows, Mar 11, 2023)
24bdd88 - leave a more precise comment to explain why we can terminate immediat… (ludlows, Mar 12, 2023)
4198b5f - function used to check if the spark expression is a deterministic tru… (ludlows, Mar 15, 2023)
e98a6e0 - adapt procedure once we can distinguish the deterministic false and true (ludlows, Mar 15, 2023)
b8770b1 - add 2 test cases for the rewrite_data_files procedure (ludlows, Mar 15, 2023)
22f0e83 - Merge branch 'master' of https://github.com/ludlows/iceberg (Mar 16, 2023)
5570ed0 - change to pass coding style check (Mar 16, 2023)
6d5b660 - change CRLF to LF (Mar 16, 2023)
7d446e6 - raise AnalysisException in the test case (ludlows, Mar 17, 2023)
47a8359 - raise IllegalArgumentException in the test case (ludlows, Mar 17, 2023)
e865f06 - Avoid block imports in Scala (Mar 17, 2023)
dc45f3d - coding style check (Mar 17, 2023)
9a5d87d - remove REMOVED_FILE_SIZE_PROP check (Mar 18, 2023)
24aa16a - Merge branch 'apache:master' into master (ludlows, Mar 18, 2023)
a298b97 - Merge branch 'apache:master' into master (ludlows, Mar 21, 2023)
26c8aec - Merge branch 'apache:master' into master (ludlows, Apr 5, 2023)
d433101 - use optimizedLogicalPlan.containsChild to check if a sparkExpression … (ludlows, Apr 5, 2023)
5faac1e - use collectResolvedIcebergExpression directly to get less changes (ludlows, Apr 8, 2023)
4a026d3 - using match case (ludlows, Apr 16, 2023)
b600d11 - collectSparkExpressionOption (ludlows, Apr 16, 2023)
5bbb179 - change the way to distinguish alwaysTrue, alwaysFalse and undetermined (ludlows, May 5, 2023)
31588fe - Merge branch 'master' into rewritr_data_file-exit-where-false (ludlows, May 5, 2023)
0a4d2c2 - verify rewritten bytes (ludlows, May 6, 2023)
2451995 - format fix (ludlows, May 6, 2023)
5fc3614 - Merge branch 'master' into rewritr_data_file-exit-where-false (ludlows, May 6, 2023)
fa865e8 - we do not verify rewritten bytes since the number of rewritten files … (ludlows, May 6, 2023)
3b0c395 - collectResolvedSparkExpression (ludlows, May 12, 2023)
bf413e1 - format fix (ludlows, May 12, 2023)
0408626 - remove checks for REMOVED_FILE_SIZE_PROP (ludlows, May 16, 2023)
e69ebdb - Merge branch 'apache:master' into master (ludlows, May 17, 2023)
c006716 - remove unnecessary import (ludlows, May 17, 2023)
d500fe4 - spark 3.4 implementation (ludlows, May 17, 2023)
02e2f76 - format fix in TestRewriteDataFilesProcedure.java (ludlows, May 17, 2023)
63dfe2f - result output of rewriteDataFiles procedure has 4 elements in spark 3.4 (ludlows, May 17, 2023)
Files changed (changes from 5 commits)

@@ -19,7 +19,6 @@

package org.apache.spark.sql.execution.datasources

import org.apache.iceberg.expressions.Expressions
Collaborator: Is this unintentional?

Contributor Author: It seems we do not need this import statement, so I removed it here.

import org.apache.iceberg.spark.SparkFilters
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
@@ -253,6 +253,49 @@ public void testRewriteDataFilesWithFilter() {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithDeterministicTrueFilter() {
createTable();
// create 10 files under non-partitioned table
insertData(10);
List<Object[]> expectedRecords = currentData();
// select all 10 files for compaction
List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table => '%s', where => '1=1')",
catalogName, tableIdent);
assertEquals(
"Action should rewrite 10 data files and add 1 data files",
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithDeterministicFalseFilter() {
createTable();
// create 10 files under non-partitioned table
insertData(10);
List<Object[]> expectedRecords = currentData();
// select no files for compaction
List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')",
catalogName, tableIdent);
assertEquals(
"Action should rewrite 0 data files and add 0 data files",
row(0, 0),
Arrays.copyOf(output.get(0), 2));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testRewriteDataFilesWithFilterOnPartitionTable() {
createPartitionTable();
@@ -24,8 +24,10 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

object SparkExpressionConverter {

@@ -44,6 +46,8 @@
    val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
    optimizedLogicalPlan.collectFirst {
      case filter: Filter => filter.condition
      case dummyRelation: DummyRelation => Literal.TrueLiteral
      case localRelation: LocalRelation => Literal.FalseLiteral
    }.getOrElse(throw new AnalysisException("Failed to find filter expression"))
  }

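
To illustrate why the `collectFirst` above can rely on the shape of the optimized plan, here is a small, hedged demonstration that is not part of the PR: Spark's optimizer folds a deterministically false filter into an empty `LocalRelation`, while a deterministically true filter is eliminated entirely, so no `Filter` node remains in the plan.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}

val spark = SparkSession.builder().master("local[1]").appName("optimizer-folding-demo").getOrCreate()
val df = spark.range(10)

// A deterministically false predicate is constant-folded and the whole filter is
// pruned down to an empty LocalRelation.
val falsePlan = df.where("0 = 1").queryExecution.optimizedPlan
println(falsePlan.isInstanceOf[LocalRelation]) // expected: true

// A deterministically true predicate is simply removed, leaving no Filter node behind.
val truePlan = df.where("1 = 1").queryExecution.optimizedPlan
println(truePlan.collectFirst { case f: Filter => f }.isEmpty) // expected: true

spark.stop()
```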