-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core, Data, Spark 3.5: Support file and partition delete granularity #9384
Conversation
@@ -334,6 +335,9 @@ private TableProperties() {} | |||
public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms"; | |||
public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE; | |||
|
|||
public static final String DELETE_GRANULARITY = "write.delete.granularity"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am still debating the property name. As it stands today, it will be applicable only to position deletes but I am not sure the name has to reflect it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, we would probably want to always use the file granularity for Flink position deletes to solve compaction issues. This property becomes more like a recommendation then.
Any feedback is appreciated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just write.position-delete.granularity
? I prefer to use a more precise name and limit the scope of its usage.
A while ago I encountered an issue about adjusting the row-group size of Parquet position delete files.
I want to adjust the default row-group size of Parquet pos delete of the tables that I manage to speed up queries (more details are in issue #9149), however I found the parameter write.delete.parquet.row-group-size-bytes
that controls the row-group size of Parquet pos delete also controls the row-group size of equality delete files. But obviously the row-group sizes applicable to these two type of delete files are not the same.
Because we also use equality delete when the data size is small, I cannot directly set a default value of write.delete.parquet.row-group-size-bytes
for new tables. I can only adjust write.delete.parquet.row-group-size-bytes
according to the specific use of each table, which is inconvenient.
In fact, I think it is not appropriate to use one parameter to control the row-group size of both position delete files and equality delete files, so I created #9177 to add a separate parameter for the position delete file that only writes the file_path
and pos
columns.
Back to this, IIUC, If we later add a grouping granularity for equality delete, since position delete and equality delete have different characteristics, they will most likely apply different grouping granularity. So I think we'd better make the distinction right from the start, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be honest, I doubt we will ever support this property for equality deletes.
In general, I do get that we may want to configure position and equality deletes differently. We can explore adding an extra namespace. I am still not sure this use case falls into that bucket.
@rdblue @RussellSpitzer @zhongyujiang, thoughts? Do we want a prefix for this config to make it explicit that it only applies to position deletes? Currently, I only note that in the docs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This option makes no sense for equality deletes because they aren't targeted at a single file, so I agree that we won't support it for equality. This is also mostly advisory. It is unlikely that we will support it in Flink and will instead always use file-level granularity. Maybe we won't even want to support this in the long term, if we decide that Spark performs better with file granularity at all times.
I guess where I'm at for this is that I would probably not worry much about it -- but also not add it to documentation since we will probably not want people setting it themselves. I think I'd leave it as write.delete.granularity
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea not to document it for now is reasonable given that it acts like a recommendation and we are not sure we want to support it in the future. Let's keep the name as is then.
Adding a way to configure position and equality deletes separately is another discussion.
writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition); | ||
return new SortingPositionOnlyDeleteWriter<>(delegate); | ||
return new SortingPositionOnlyDeleteWriter<>( | ||
() -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like how this part is formatted but I don't have a better way. Ideas welcome.
* Despite the chosen granularity, regular delete compaction remains necessary. It is also possible | ||
* to use one granularity for ingestion and another one for table maintenance. | ||
*/ | ||
public enum DeleteGranularity { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the granularity currently ? file ? what is the impact to flink writers ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current behavior is partition granularity. The new default will match the existing behavior.
There is no immediate impact on Flink writes. Equality deletes can only be written with partition granularity at the moment. That said, we should make Flink write position deletes with file granularity no matter what to solve the concurrent data compaction issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Flink uses the old writer API right now. We will follow up to change that.
* by file and position as required by the spec. If there is no external process to order the | ||
* records, consider using {@link SortingPositionOnlyDeleteWriter} instead. | ||
*/ | ||
public class TargetedPositionDeleteWriter<T> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataFileTargetedPositionDeleteWriter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to prefer shorter names if possible given our new 100 line length limit. Do you think this name will be easier to understand?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the naming needs to somehow reflect the granularity which will make things more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, let me think a bit more about this. If you have more ideas, please share them as well!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ClusteredFilePosDeleteWriter
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe Clustered
is something we use for PartitioningWriter
implementations to indicate that incoming records are grouped by spec and partition. If we use that prefix in this context, it may be a bit misleading.
I renamed this class to FileScopedPositionDeleteWriter
. Let me know what you think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think FileScoped or if you want a whole new name PerFilePostionDeleteFileWriter?
return true; | ||
} | ||
|
||
for (int index = s1Length - 1; index >= 0; index--) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comparing the paths from the end as the prefix is usually the same and is long.
@@ -708,4 +709,15 @@ private long sparkAdvisoryPartitionSize() { | |||
private double shuffleCompressionRatio(FileFormat outputFileFormat, String outputCodec) { | |||
return SparkCompressionUtil.shuffleCompressionRatio(spark, outputFileFormat, outputCodec); | |||
} | |||
|
|||
public DeleteGranularity deleteGranularity() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not add a SQL property cause I am not sure it makes sense at the session level.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds reasonable to me. Config properties are more surface area to support.
@@ -334,6 +335,9 @@ private TableProperties() {} | |||
public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms"; | |||
public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE; | |||
|
|||
public static final String DELETE_GRANULARITY = "write.delete.granularity"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a document for this table property?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do, same for the write option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverted based on this discussion.
One question: |
@@ -334,6 +335,9 @@ private TableProperties() {} | |||
public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms"; | |||
public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE; | |||
|
|||
public static final String DELETE_GRANULARITY = "write.delete.granularity"; | |||
public static final String DELETE_GRANULARITY_DEFAULT = DeleteGranularity.PARTITION.toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I'd use a string so that we are forced to continue supporting it, like the other defaults. This would technically allow someone to change PARTITION
in the code without breaking although it would change the property value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see you override toString
so it's probably fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started with a string constant but then saw what we did for RowLevelOperationMode
and decided to follow that for consistency.
@@ -60,7 +72,7 @@ public void write(PositionDelete<T> positionDelete) { | |||
|
|||
@Override | |||
public long length() { | |||
return writer.length(); | |||
return result != null ? result.totalFileSizeInBytes() : 0L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you note this behavior in the Javadoc? I think it is correct to only produce the size once it has been closed an produces a result, since that would avoid any problem from wrapping this in a RollingFileWriter
. But it is still unexpected that this isn't accurate during the write.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll probably switch to not implementing it at all, just like we do in the other writer.
List<CharSequence> paths = Lists.newArrayList(positionsByPath.keySet()); | ||
paths.sort(Comparators.charSequences()); | ||
return paths; | ||
private Iterable<CharSequence> sort(Iterable<CharSequence> paths) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about using Collection
for the incoming so you can test its size? Then you could check whether the size is 1 and avoid copying the list.
} | ||
} | ||
|
||
private static boolean unequal(CharSequence s1, CharSequence s2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to put this in CharSeqComparator
for reuse. I think it is fine that it doesn't worry about high surrogates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our CharSeqComparator
is private. I've put this into a new utility class.
|
||
@Override | ||
public long length() { | ||
throw new UnsupportedOperationException(getClass().getName() + " does not implement length"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this doesn't need to be implemented, should we avoid implementing it in SortingPositionOnlyDeleteWriter
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need cause this writer wraps the rolling writer, not the other way around.
this.deleteFiles = Lists.newArrayList(); | ||
this.referencedDataFiles = CharSequenceSet.empty(); | ||
} | ||
|
||
@Override | ||
protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter( | ||
PartitionSpec spec, StructLike partition) { | ||
switch (granularity) { | ||
case FILE: | ||
return new TargetedPositionDeleteWriter<>(() -> newRollingWriter(spec, partition)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't need to use rolling writers, right? The sorting writer won't ever roll because its length is 0L
until it is closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can actually roll correctly here because this is the "clustered" path. We are not going to use the sorting writer and will not buffer deletes. We can also roll correctly in the "fanout" path cause the sorting writer acts as a wrapper on top of the rolling writer.
Looks good overall. Thanks for adding this! |
/** | ||
* An enum that represents the granularity of deletes. | ||
* | ||
* <p>Under partition granularity, delete writers are allowed to group deletes for different data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"are allowed"? Perhaps maybe we should say something like "are directed to group deletes". I think the text in this doc goes a bit back and forth between saying that the delete writers will do something and the delete writers may do something.
I think it may also help to kind of express these as (Many data files -> One Delete file) and (One data file -> One Delete File) or something like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
* | ||
* <p>Under partition granularity, delete writers are allowed to group deletes for different data | ||
* files into one delete file. This strategy tends to reduce the total number of delete files in the | ||
* table. However, it may lead to the assignment of irrelevant deletes to some data files during the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential Rewrite? Trying to make this a but more directly worded
However, a scan for a single data file will require reading delete information for multiple data files in the partition even if those other files are not required for the scan. This information will be ignored during the reads but reading this extra delete information will cause overhead. The overhead can potentially be mitigated via delete file caching (link here?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it, incorporated.
* | ||
* <p>Under file granularity, delete writers always organize deletes by their target data file, | ||
* creating separate delete files for each referenced data file. This strategy ensures the job | ||
* planning does not assign irrelevant deletes to data files. However, it also increases the total |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"to data files which means no possibly extranousious delete information will be read unlike in partition
granularity"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rewrote this part as well.
* <p>Currently, this configuration is only applicable to position deletes. | ||
* | ||
* <p>Each granularity has its own benefits and drawbacks and should be picked based on a use case. | ||
* Despite the chosen granularity, regular delete compaction remains necessary. It is also possible |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Despite -> Regardless of the
or maybe
"Regular delete compaction is still required regardless of which granularity is chosen."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switched.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, I think we can tighten up the Java doc a bit but I think all the code and tests are good
ad37f18
to
fbd206c
Compare
@jerqi, yes, it will. There is a new test in |
Thanks for reviewing, @jerqi @zinking @zhongyujiang @rdblue @RussellSpitzer! |
This change backports #9384 to Spark 3.4.
This change backports apache#9384 to Spark 3.4.
This PR adds support for file and partition delete granularity, allowing users to pick between the two.
Under partition granularity, delete writers are allowed to group deletes for different data files into one delete file. This strategy tends to reduce the total number of delete files in the table. However, it may lead to the assignment of irrelevant deletes to some data files during the job planning. All irrelevant deletes are filtered by readers but add extra overhead, which can be mitigated via caching.
Under file granularity, delete writers always organize deletes by their target data file, creating separate delete files for each referenced data file. This strategy ensures the job planning does not assign irrelevant deletes to data files. However, it also increases the total number of delete files in the table and may require a more aggressive approach for delete file compaction.
Currently, this configuration is only applicable to position deletes.
Each granularity has its own benefits and drawbacks and should be picked based on a use case. Despite the chosen granularity, regular delete compaction remains necessary. It is also possible to use one granularity for ingestion and another one for table maintenance.
After
Before