Spark 3.4: Add write options to override the compression properties of the table #8313
Conversation
@chenjunjiedada @ConeyLiu Could you help me review this PR?
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
Thanks for your review. I have added the write option documentation for Spark.
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+1
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestCompressionSettings {
The project is currently in the process of moving away from JUnit 4, meaning that new tests should be written purely in JUnit 5 using AssertJ assertions. See also https://iceberg.apache.org/contribute/#testing for additional info.
Thanks for your review. I got it. I have migrated the test to the JUnit 5 framework.
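For context, a minimal sketch of the JUnit 5 + AssertJ style being asked for; the class name and parameter values below are illustrative, not the actual TestCompressionSettings code.

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class CompressionSettingsStyleSketch {

  // JUnit 5 parameterizes per method via @ParameterizedTest instead of @RunWith(Parameterized.class).
  @ParameterizedTest
  @ValueSource(strings = {"zstd", "gzip", "snappy"})
  void codecNameIsNotBlank(String codec) {
    // Stand-in assertion; the real test inspects the codec of the files it writes.
    assertThat(codec).isNotBlank();
  }
}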
I need to add some new unit tests for SparkPositionDeltaWrite and SparkPositionDeletesRewrite.java, which requires extending SparkCatalogTestBase. SparkCatalogTestBase still uses JUnit 4, so I will keep the tests on JUnit 4 to avoid conflicts.
@@ -63,4 +63,9 @@ private SparkSQLProperties() {}
// Controls the WAP branch used for write-audit-publish workflow.
// When set, new snapshots will be committed to this branch.
public static final String WAP_BRANCH = "spark.wap.branch";

// Controls write compress options
public static final String COMPRESSION_CODEC = "spark.sql.iceberg.write.compression-codec";
We rarely use extra write and read namespaces in SQL properties; the only exception is preserving grouping, as it was not clear otherwise. What about dropping write from all names?
spark.sql.iceberg.compression-codec
spark.sql.iceberg.compression-level
spark.sql.iceberg.compression-strategy
Fixed.
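For illustration, the renamed constants would presumably look like the following (a sketch matching the suggestion above, not a verbatim copy of the final SparkSQLProperties file):

// Write compression options, without the extra "write" namespace.
public static final String COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";
public static final String COMPRESSION_LEVEL = "spark.sql.iceberg.compression-level";
public static final String COMPRESSION_STRATEGY = "spark.sql.iceberg.compression-strategy";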
@@ -76,6 +79,12 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
this.dataSparkType = dataSparkType;
this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = positionDeleteSparkType;
Optional: What about inlining if it fits on one line?
...
this.equalityDeleteSparkType = equalityDeleteSparkType;
this.positionDeleteSparkType = positionDeleteSparkType;
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
Fixed.
.isEqualToIgnoringCase(properties.get(COMPRESSION_CODEC));
}

if (PARQUET.equals(format)) {
I find that we can't set a table property for the RewritePositionDeletes action. The RewritePositionDeletes action will always use Parquet as the format.
That seems like an issue worth looking into after this change.
cc @szehon-ho.
OK, I will investigate it.
I found the root cause of this case. The class BaseMetadataTable overrides the method properties:

@Override
public Map<String, String> properties() {
  return ImmutableMap.of();
}

Option 1: modify the class BaseMetadataTable. #8428
Option 2: modify the class PositionDeletesTable. #8429

I prefer option 1. I feel the properties of a metadata table should respect those of the base table, and we also don't have a way to modify the properties of a metadata table. @aokolnychyi @szehon-ho WDYT?
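A rough sketch of what option 1 could look like, assuming BaseMetadataTable exposes the underlying base table via table(); the actual change is in #8428:

// Sketch only: delegate to the base table's properties instead of returning an empty map.
@Override
public Map<String, String> properties() {
  return table().properties();
}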
Hi, yea I am ok with option 1 as well, @aokolnychyi what do you think?
@aokolnychyi I have addressed the comments. Could you take another look if you have time?
@nastra @aokolnychyi Gently ping. Sorry to bother you. Could you have another look if you have time?
Let me take another look.
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/WritePropertiesUtil.java
This change looks good to me. I am not sure about adding a new utility class vs just using an existing class.
The second solution seems more elegant. Thanks for your suggestion. I have refactored the code to follow the suggestion.
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
    writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
    break;
  default:
    throw new IllegalArgumentException(String.format("Unknown file format %s", format));
I wonder whether it is worth failing the query. Maybe, just do nothing here?
I referred to Flink's implementation in #6049. Flink chooses to fail the query. Failing the query also has benefits: we can find a wrong config option more easily.
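For reference, a hedged sketch of the pattern under discussion: build per-format write properties and fail fast on an unrecognized format. Constant and method names mirror the snippets in this thread; avroCompressionCodec() and orcCompressionCodec() are assumed here.

Map<String, String> writeProperties = Maps.newHashMap();

switch (format) {
  case PARQUET:
    writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
    break;
  case AVRO:
    writeProperties.put(AVRO_COMPRESSION, avroCompressionCodec());
    break;
  case ORC:
    writeProperties.put(ORC_COMPRESSION, orcCompressionCodec());
    writeProperties.put(ORC_COMPRESSION_STRATEGY, orcCompressionStrategy());
    break;
  default:
    // Failing here surfaces a misconfigured format immediately, mirroring the Flink behavior in #6049.
    throw new IllegalArgumentException(String.format("Unknown file format %s", format));
}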
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
Looks good to me. Left a few suggestions.
@@ -104,6 +104,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
private final Context context;

private boolean cleanupOnAbort = true;
private final Map<String, String> writeProperties;
Minor: This variable should be co-located with other final variables above. It is super minor but let's do this in a follow-up.
OK, I have raised a follow-up PR, #8421. The PR has been merged.
Thanks, @jerqi! Thanks for reviewing, @ConeyLiu @chenjunjiedada.
Thanks @aokolnychyi! Code master. Thanks @ConeyLiu @chenjunjiedada @nastra, too.
writeProperties.put(PARQUET_COMPRESSION, parquetCompressionCodec());
String parquetCompressionLevel = parquetCompressionLevel();
if (parquetCompressionLevel != null) {
  writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
Sorry for this mistake. I will check how to validate the correctness of this case.
If we don't set the delete compression codec, we reuse the data compression codec, so the test case passed.
I have raised PR #8438 to fix this issue.
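To illustrate the fallback being described (variable names are assumed for illustration; the actual fix is in #8438):

// If no delete-specific codec is configured, fall back to the data compression codec;
// this fallback is why the original assertion still passed.
String deleteCodec = deleteCompressionCodec != null ? deleteCompressionCodec : dataCompressionCodec;
writeProperties.put(DELETE_PARQUET_COMPRESSION, deleteCodec);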
What changes were proposed in this pull request?
Flink already supports using write options to override the compression properties of the table.
Referring to pull request #6049, this PR adds write options to override the compression properties of the table for Spark 3.4.
Why are the changes needed?
First, there exist some old tables using the gzip compression codec in the production environment. If we use the zstd compression codec to rewrite the data, we can reduce the data volume and the cost of storage.
Second, this PR is also meaningful after we choose zstd as the default compression codec, because with this PR we can choose different compression levels when writing Iceberg data and when rewriting Iceberg data.
Does this PR introduce any user-facing change?
Yes.
This PR introduces the write options compression-codec, compression-level, and compression-strategy. (Documentation added.)
This PR introduces the Spark config properties spark.sql.iceberg.write.compression-codec, spark.sql.iceberg.write.compression-level, and spark.sql.iceberg.write.compression-strategy. (I will add the documentation after this PR is merged.)
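For example, a minimal usage sketch of the new write options through the DataFrameWriterV2 API; the table names here are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CompressionWriteOptionExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Placeholder source; any DataFrame matching the target table's schema works.
    Dataset<Row> df = spark.table("db.source_table");

    // The write options override the table's compression properties for this write only.
    df.writeTo("db.target_iceberg_table")
        .option("compression-codec", "zstd")
        .option("compression-level", "3")
        .append();
  }
}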
How was this patch tested?
Added a new unit test and performed manual verification. The unit test verifies that the compression codec is correct; the compression config options were verified by hand.
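As an illustration of the kind of check such a unit test can perform (a sketch, not the actual TestCompressionSettings code), the codec of a written Parquet file can be read from its footer:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ParquetCodecCheck {
  // Returns the codec of the first column chunk of the first row group.
  static CompressionCodecName firstColumnCodec(String file) throws Exception {
    Configuration conf = new Configuration();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(file), conf))) {
      return reader.getFooter().getBlocks().get(0).getColumns().get(0).getCodec();
    }
  }
}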