
Spark 3.5: Use fanout writers for unsorted tables by default #8621

Merged
merged 1 commit into from
Sep 26, 2023

Conversation

aokolnychyi
Contributor

@aokolnychyi aokolnychyi commented Sep 23, 2023

This PR enables fanout writers for unsorted tables by default to speed up writes. This should be safe as we distribute the incoming records using the transforms for all operations in partitioned tables (unless configured otherwise). The ability to skip local sorts reduces the probability of expensive spills. This becomes critical given that we consider requesting larger advisory partition sizes.

@github-actions github-actions bot added the spark label Sep 23, 2023
```diff
@@ -179,7 +179,7 @@ public boolean fanoutWriterEnabled() {
         .booleanConf()
         .option(SparkWriteOptions.FANOUT_ENABLED)
         .tableProperty(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED)
-        .defaultValue(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)
+        .defaultValue(true)
```
Contributor Author

@aokolnychyi aokolnychyi Sep 23, 2023

The proposed approach simply enables fanout writers by default. Our distribution and ordering code checks if the table is unsorted and whether fanout writers are enabled to skip a local sort. Previously, we only skipped local sorts if fanout writers were enabled explicitly.

Even though we enable fanout writers by default, it does not mean they will be used. Our code later checks whether the input is ordered. If it is, we still use clustered writers because we know the local sort includes the partition columns.
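The rule described above boils down to a small predicate. As a sketch in plain Java (all names here, such as `chooseWriter`, are illustrative, not Iceberg's actual API):

```java
// Simplified model of the writer-selection rule described above.
// These names are illustrative only, not the real Iceberg classes.
class WriterChoice {

  // Fanout is only used when the table is partitioned, fanout is enabled,
  // and the incoming data is NOT already locally ordered by partition.
  static String chooseWriter(boolean partitioned, boolean fanoutEnabled, boolean inputOrdered) {
    if (partitioned && fanoutEnabled && !inputOrdered) {
      return "fanout";     // keeps one open file per seen partition; no local sort needed
    } else {
      return "clustered";  // expects rows grouped by partition; one open file at a time
    }
  }

  public static void main(String[] args) {
    // Ordered input still picks the clustered writer even with fanout enabled by default.
    System.out.println(chooseWriter(true, true, true));   // clustered
    System.out.println(chooseWriter(true, true, false));  // fanout
  }
}
```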

Contributor

Should we just change the TableProperties default to true instead of hard-coding a different value here?

Contributor

Actually, see my comment below about making this optional and inferring only when it is not explicitly set on the table or in write options. I think that's much safer.

Contributor Author

I cannot change the value of the property because other Spark versions depend on it. I did change the logic to infer only if not set, however.

```java
  }

  @Test
  public void testDefaultWritePartitionedUnsortedTableFanout() {
```
Contributor Author

We previously had some tests duplicated: one with fanout writers and one without. I squashed them.

@aokolnychyi aokolnychyi force-pushed the skip-sort-by-default branch 2 times, most recently from 16aa547 to 53704f3, September 23, 2023 00:24
```diff
@@ -126,7 +126,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.writeSchema = writeSchema;
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
-    this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+    this.useFanoutWriters = writeConf.fanoutWriterEnabled() && !writeRequirements.hasOrdering();
```
Contributor Author

@aokolnychyi aokolnychyi Sep 23, 2023

This is similar to what we already do for merge-on-read writes.

```java
boolean fanoutEnabled = context.fanoutWriterEnabled();
boolean inputOrdered = context.inputOrdered();
long targetFileSize = context.targetDataFileSize();

if (table.spec().isPartitioned() && fanoutEnabled && !inputOrdered) {
  return new FanoutDataWriter<>(writers, files, io, targetFileSize);
} else {
  return new ClusteredDataWriter<>(writers, files, io, targetFileSize);
}
```

Contributor Author

@aokolnychyi aokolnychyi Sep 23, 2023

I will need to check our compaction, however. It should be fine because we write to one partition at a time.

Contributor

I'm trying to think through the implication of this change.

This assumes that if an ordering is present in writeRequirements then that ordering clusters data by partition. When clustered by partition, there's no need to use a fanout writer. I think it's always the case that an ordering will cluster by partition when generated by Iceberg, but this is not a guarantee of SparkWriteRequirements so it's a little risky.

Another implication is that fanoutWriterEnabled can disable fanout writes but not enable fanout writes as an override, which is a behavior change. If there is a case where incoming data isn't clustered for some reason -- whether that's a bug in Iceberg or a problem in Spark -- then there is no way to override the behavior and use fanout to fix the write. I think that's a problem because it would block writes to a table.

I think that this PR should instead make SPARK_WRITE_PARTITIONED_FANOUT_ENABLED optional. When present the explicit value should be used. When not present, then the logic should default to !writeRequirements.hasOrdering.
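The suggested fallback can be sketched as a tri-state lookup (a simplified model using `Optional`; the names are hypothetical, not the actual `SparkWriteConf` code):

```java
import java.util.Optional;

// Sketch of the suggested resolution order: an explicit setting
// (write option or table property) always wins; when unset, infer
// from whether the write requirements include an ordering.
class FanoutResolution {

  static boolean useFanoutWriter(Optional<Boolean> explicitSetting, boolean hasOrdering) {
    return explicitSetting.orElse(!hasOrdering);
  }

  public static void main(String[] args) {
    // Unset + unsorted input: infer fanout.
    System.out.println(useFanoutWriter(Optional.empty(), false));    // true
    // Explicit false disables fanout even for unsorted input.
    System.out.println(useFanoutWriter(Optional.of(false), false));  // false
    // Explicit true remains an escape hatch if incoming data is badly clustered.
    System.out.println(useFanoutWriter(Optional.of(true), true));    // true
  }
}
```

This keeps the explicit property as an override in both directions, addressing the concern that fanout could otherwise never be forced on.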

@aokolnychyi aokolnychyi added this to the Iceberg 1.4.0 milestone Sep 23, 2023
@aokolnychyi
Contributor Author

cc @RussellSpitzer @rdblue

@RussellSpitzer
Member

RussellSpitzer commented Sep 23, 2023

We talked about this before offline and I think it's probably the right thing to do.

I only wonder about the combination of this change and our inability to handle distribution and sort orders in older Spark versions. Currently we fail when folks attempt to write unsorted data during a CTAS statement on older Spark releases. This change would cause those writes to succeed but probably create a huge number of files.

For example before if I did a statement like

CREATE TABLE ... PARTITIONED BY (bucket(100, x)) ... AS SELECT ...

Previously, on Spark 3.4, this would fail because the data within the write tasks would not abide by the partitioning request. Users don't like this, but it does prevent them from accidentally making 100 files per Spark task because the distribution from the partitioning was ignored. With fanout, the write would succeed and we would end up writing those 100 files per Spark task.
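The file-count arithmetic behind this concern: if input is not clustered, every task can see rows for every bucket, so a fanout write can produce up to tasks × buckets files. A back-of-the-envelope sketch (illustrative numbers only):

```java
// Worst-case file count for a fanout write of unclustered input:
// each task sees rows for every partition and writes one file per partition.
class FileCountEstimate {

  static long worstCaseFiles(int numTasks, int numPartitions) {
    return (long) numTasks * numPartitions;
  }

  public static void main(String[] args) {
    // 200 Spark tasks writing into bucket(100, x) without a hash distribution:
    System.out.println(worstCaseFiles(200, 100)); // 20000 files
  }
}
```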

@aokolnychyi
Contributor Author

aokolnychyi commented Sep 23, 2023

> I only wonder about the combination of this change and our inability to handle distribution and sort orders in older Spark versions. Currently we fail when folks attempt to write unsorted data during a CTAS statement on older Spark releases. This change would cause those writes to succeed but probably create a huge number of files.

Spark 3.5 respects the requested distribution in CTAS and RTAS. I added that in SPARK-43088. There should be no use case where we would not request a hash distribution for a partitioned table by default (row-level operations, transforms, etc). That means users would have to explicitly override the distribution mode to get into such a situation. If it happens, they probably are OK with the number of produced files, which would be the same with both writers? The only difference is a local sort that can spill vs keeping a row group per seen partition.

@RussellSpitzer
Member

That's fine with me, so we will explicitly not be backporting this change.

@aokolnychyi
Contributor Author

@RussellSpitzer, correct. I'd keep it specific to 3.5. These local sorts will be more critical in 3.5 because of our plan to bump the default advisory partition size.

@aokolnychyi
Contributor Author

Another use case that would be affected is if someone disables distribution and ordering explicitly. Previously, such runs would go through clustered writers by default. However, I'd assume the user should know what they are doing if they disable it? If we were to enable fanout writers only if a distribution is requested, it would complicate the implementation. That said, we can do it if you think that's safer @RussellSpitzer @rdblue.

```java
    return useFanoutWriter(defaultValue);
  }

  private boolean useFanoutWriter(boolean defaultValue) {
```
Contributor Author

I can simply expose this version and get rid of the one above. However, it felt a bit safer to only expose the one accepting write requirements, to send the message that the default value depends on how the data is ordered.

Thoughts, @RussellSpitzer @rdblue?

Contributor

I think it's good to have useFanoutWriter(SparkWriteRequirements), but this method breaks the API convention of avoiding confusing boolean arguments. For example, the method may return true when called as useFanoutWriter(false).

Contributor Author

@aokolnychyi aokolnychyi Sep 26, 2023

Any ideas on how to avoid that? It is a default value in this case, not an abstract flag.

Contributor

An internal method with a boolean would be fine. Or maybe have a method like fanoutWriterEnabled() vs useFanoutWriter.

Contributor Author

This one is already private, let me play around with this locally.

```diff
@@ -174,12 +174,17 @@ public long targetDataFileSize() {
         .parse();
   }

-  public boolean fanoutWriterEnabled() {
+  public boolean useFanoutWriter(SparkWriteRequirements writeRequirements) {
+    boolean defaultValue = !writeRequirements.hasOrdering();
```
Contributor

Should this be slightly more complex? If the distribution mode was explicitly set to none, for example, then this is not considered safe. I think we should ensure that there is no ordering and that there is a requested distribution.

Contributor Author

I reasoned a bit about this here. I am open to that.

Contributor Author

@aokolnychyi aokolnychyi Sep 26, 2023

Actually, we may want to keep it this way.

Another use case that may benefit from the current approach is SPJ, where users set the distribution mode to none. They would get a super expensive local sort and spill without explicitly enabling fanout writers. I think that's a more realistic use case than setting to none and generating tons of files per task. If the user sets it to none explicitly, they are probably OK with the number of produced files, which hints it is not a crazy number. So why do a local sort for them?

We have generally preferred safe options over more performant ones, which meant more configs were required to improve performance. I'd say let's turn that around.

Contributor Author

What do you think, @rdblue?

Contributor

I think the argument against it is that this would blow up memory. A local sort is at least memory-safe but opening hundreds of Parquet files at once is not and could cause an OutOfMemoryError.

Contributor

Okay, Anton and I spoke directly and concluded that this is probably a good idea after all. Distributing with none is never done by default and must be explicitly set. In that case, I think it's reasonable to need to tune a bit more and possibly add a local sort to handle this case.

```diff
-      boolean fanoutEnabled) {
-    if (fanoutEnabled) {
+      boolean fanout) {
+    if (fanout) {
```
Contributor

Why rename this? It seems like fanoutEnabled was fine in context. I don't think it implied that a broader option was set.

Contributor Author

It is a leftover from the previous version where enabling the fanout writer did not mean using it. I can remove this one.

@rdblue
Contributor

rdblue commented Sep 26, 2023

Looks good to me. Thanks, @aokolnychyi!

@aokolnychyi aokolnychyi merged commit b7296a7 into apache:master Sep 26, 2023
37 checks passed
@aokolnychyi
Contributor Author

Thanks, @RussellSpitzer @rdblue!
