Spark 3.5: Use fanout writers for unsorted tables by default (#8621)
aokolnychyi authored Sep 26, 2023
1 parent 6b33b82 commit b7296a7
Showing 9 changed files with 143 additions and 372 deletions.
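
With this change, a write to a partitioned table whose plan carries no sort order defaults to the fanout writer instead of the clustered writer, so rows no longer need to arrive grouped by partition. The per-write SparkWriteOptions.FANOUT_ENABLED option and the TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED table property still win when set; only the fallback default is now derived from the write requirements. A minimal usage sketch (hypothetical table name, df is any input DataFrame) of opting back out for a single write:

df.writeTo("db.events")
    .option(SparkWriteOptions.FANOUT_ENABLED, "false") // force the clustered writer
    .append();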
@@ -195,6 +195,7 @@ public void testDisabledDistributionAndOrdering() {
       inputDF
           .writeTo(tableName)
           .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+          .option(SparkWriteOptions.FANOUT_ENABLED, "false")
           .append();
     } catch (NoSuchTableException e) {
       throw new RuntimeException(e);
@@ -112,6 +112,7 @@ public void testBatchAppend() throws Exception {
           .sortWithinPartitions("id")
           .writeTo(tableName)
           .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+          .option(SparkWriteOptions.FANOUT_ENABLED, "false")
           .append())
       .isInstanceOf(SparkException.class)
       .hasMessageContaining("Encountered records that belong to already closed files");
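
Both test updates above pin FANOUT_ENABLED to "false": these tests exercise the clustered writer's failure on input that is not grouped by partition, and with fanout now the default for unsorted writes, the expected "already closed files" error would otherwise no longer occur.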
@@ -172,12 +172,21 @@ public long targetDataFileSize() {
         .parse();
   }

-  public boolean fanoutWriterEnabled() {
+  public boolean useFanoutWriter(SparkWriteRequirements writeRequirements) {
+    boolean defaultValue = !writeRequirements.hasOrdering();
+    return fanoutWriterEnabled(defaultValue);
+  }
+
+  private boolean fanoutWriterEnabled() {
+    return fanoutWriterEnabled(true /* enabled by default */);
+  }
+
+  private boolean fanoutWriterEnabled(boolean defaultValue) {
     return confParser
         .booleanConf()
         .option(SparkWriteOptions.FANOUT_ENABLED)
         .tableProperty(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED)
-        .defaultValue(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)
+        .defaultValue(defaultValue)
         .parse();
   }
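
The resolution order is unchanged: an explicit write option wins, then the table property, and only then the default, which is now computed from the write requirements instead of the fixed table-property default. A standalone sketch of that precedence (illustrative method, not the Iceberg confParser API):

// Illustrative only: mirrors the precedence encoded by the confParser chain above.
static boolean resolveFanout(Boolean writeOption, Boolean tableProperty, boolean hasOrdering) {
  if (writeOption != null) {
    return writeOption; // per-write option wins
  }
  if (tableProperty != null) {
    return tableProperty; // then the table property
  }
  return !hasOrdering; // otherwise fanout, unless the input will be sorted
}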
@@ -406,11 +406,10 @@ protected PartitioningWriter<InternalRow, DataWriteResult> newDataWriter(
       Table table, SparkFileWriterFactory writers, OutputFileFactory files, Context context) {

     FileIO io = table.io();
-    boolean fanoutEnabled = context.fanoutWriterEnabled();
-    boolean inputOrdered = context.inputOrdered();
+    boolean useFanoutWriter = context.useFanoutWriter();
     long targetFileSize = context.targetDataFileSize();

-    if (table.spec().isPartitioned() && fanoutEnabled && !inputOrdered) {
+    if (table.spec().isPartitioned() && useFanoutWriter) {
       return new FanoutDataWriter<>(writers, files, io, targetFileSize);
     } else {
       return new ClusteredDataWriter<>(writers, files, io, targetFileSize);
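
The ordered-input check disappears from this method because useFanoutWriter(writeRequirements) already returns false when the plan sorts the input, so the condition collapses to a single flag. For intuition, a toy model (plain strings standing in for rows and partition keys; not Iceberg code) of why fanout tolerates any row order while clustered writes do not:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WriterSketch {
  public static void main(String[] args) {
    List<String> rows = List.of("a:1", "b:2", "a:3"); // partition "a" reappears

    // Fanout: one open buffer per partition key, so order is irrelevant.
    Map<String, List<String>> fanout = new HashMap<>();
    for (String row : rows) {
      fanout.computeIfAbsent(row.split(":")[0], p -> new ArrayList<>()).add(row);
    }
    System.out.println(fanout); // e.g. {a=[a:1, a:3], b=[b:2]}

    // Clustered: one open buffer at a time, closed on every partition change;
    // a row for a previously closed partition is an error.
    List<String> closed = new ArrayList<>();
    String current = null;
    for (String row : rows) {
      String partition = row.split(":")[0];
      if (partition.equals(current)) {
        continue; // still writing the current partition
      }
      if (closed.contains(partition)) {
        System.out.println("clustered write fails: " + partition + " was already closed");
        return;
      }
      if (current != null) {
        closed.add(current); // close the previous partition's file
      }
      current = partition;
    }
  }
}

The trade-off is memory: fanout keeps one open writer per partition seen by a task, which is presumably why it was opt-in before this change.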
@@ -670,7 +669,7 @@ private static class Context implements Serializable {
     private final FileFormat deleteFileFormat;
     private final long targetDeleteFileSize;
     private final String queryId;
-    private final boolean fanoutWriterEnabled;
+    private final boolean useFanoutWriter;
     private final boolean inputOrdered;

     Context(
@@ -687,7 +686,7 @@ private static class Context implements Serializable {
       this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
       this.metadataSparkType = info.metadataSchema().get();
       this.queryId = info.queryId();
-      this.fanoutWriterEnabled = writeConf.fanoutWriterEnabled();
+      this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
       this.inputOrdered = writeRequirements.hasOrdering();
     }

@@ -723,8 +722,8 @@ String queryId() {
       return queryId;
     }

-    boolean fanoutWriterEnabled() {
-      return fanoutWriterEnabled;
+    boolean useFanoutWriter() {
+      return useFanoutWriter;
     }

     boolean inputOrdered() {
@@ -98,7 +98,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private final Schema writeSchema;
   private final StructType dsSchema;
   private final Map<String, String> extraSnapshotMetadata;
-  private final boolean partitionedFanoutEnabled;
+  private final boolean useFanoutWriter;
   private final SparkWriteRequirements writeRequirements;
   private final Map<String, String> writeProperties;

@@ -126,7 +126,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.writeSchema = writeSchema;
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
-    this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+    this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
     this.writeRequirements = writeRequirements;
     this.outputSpecId = writeConf.outputSpecId();
     this.writeProperties = writeConf.writeProperties();
@@ -188,7 +188,7 @@ private WriterFactory createWriterFactory() {
         targetFileSize,
         writeSchema,
         dsSchema,
-        partitionedFanoutEnabled,
+        useFanoutWriter,
         writeProperties);
   }

@@ -617,7 +617,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
     private final long targetFileSize;
     private final Schema writeSchema;
     private final StructType dsSchema;
-    private final boolean partitionedFanoutEnabled;
+    private final boolean useFanoutWriter;
     private final String queryId;
     private final Map<String, String> writeProperties;

@@ -629,15 +629,15 @@ protected WriterFactory(
         long targetFileSize,
         Schema writeSchema,
         StructType dsSchema,
-        boolean partitionedFanoutEnabled,
+        boolean useFanoutWriter,
         Map<String, String> writeProperties) {
       this.tableBroadcast = tableBroadcast;
       this.format = format;
       this.outputSpecId = outputSpecId;
       this.targetFileSize = targetFileSize;
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
-      this.partitionedFanoutEnabled = partitionedFanoutEnabled;
+      this.useFanoutWriter = useFanoutWriter;
       this.queryId = queryId;
       this.writeProperties = writeProperties;
     }
@@ -678,7 +678,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
           writeSchema,
           dsSchema,
           targetFileSize,
-          partitionedFanoutEnabled);
+          useFanoutWriter);
     }
   }
 }