From b7296a755ba9c3e9236a43f99225c580cd9abdc0 Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Tue, 26 Sep 2023 16:04:30 -0700
Subject: [PATCH] Spark 3.5: Use fanout writers for unsorted tables by default
 (#8621)
---
 .../TestRequiredDistributionAndOrdering.java  |   1 +
 .../spark/extensions/TestWriteAborts.java     |   1 +
 .../apache/iceberg/spark/SparkWriteConf.java  |  13 +-
 .../spark/source/SparkPositionDeltaWrite.java |  13 +-
 .../iceberg/spark/source/SparkWrite.java      |  14 +-
 .../TestSparkDistributionAndOrderingUtil.java | 409 +++++-------
 .../spark/source/TestFilteredScan.java        |  59 +--
 .../spark/source/TestPartitionValues.java     |   4 +
 .../TestRequiredDistributionAndOrdering.java  |   1 +
 9 files changed, 143 insertions(+), 372 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
index fcdf9bf992bb..809de08379f0 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
@@ -195,6 +195,7 @@ public void testDisabledDistributionAndOrdering() {
       inputDF
           .writeTo(tableName)
           .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+          .option(SparkWriteOptions.FANOUT_ENABLED, "false")
           .append();
     } catch (NoSuchTableException e) {
       throw new RuntimeException(e);
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
index 15484f45f895..4d87099572b8 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
@@ -112,6 +112,7 @@ public void testBatchAppend() throws Exception {
                     .sortWithinPartitions("id")
                     .writeTo(tableName)
                     .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+                    .option(SparkWriteOptions.FANOUT_ENABLED, "false")
                     .append())
         .isInstanceOf(SparkException.class)
         .hasMessageContaining("Encountered records that belong to already closed files");
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 28a586e3e919..834e839874f7 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -172,12 +172,21 @@ public long targetDataFileSize() {
         .parse();
   }
 
-  public boolean fanoutWriterEnabled() {
+  public boolean useFanoutWriter(SparkWriteRequirements writeRequirements) {
+    boolean defaultValue = !writeRequirements.hasOrdering();
+    return fanoutWriterEnabled(defaultValue);
+  }
+
+  private boolean fanoutWriterEnabled() {
+    return fanoutWriterEnabled(true /* enabled by default */);
+  }
+
+  private boolean fanoutWriterEnabled(boolean defaultValue) {
     return confParser
         .booleanConf()
         .option(SparkWriteOptions.FANOUT_ENABLED)
         .tableProperty(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED)
-        .defaultValue(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)
+        .defaultValue(defaultValue)
        .parse();
   }
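Note: the SparkWriteConf change above is the heart of the patch. An explicit write option still wins, then the table property, and only then does the new requirements-derived default apply. Below is a minimal plain-Java sketch of that resolution order, not Iceberg's actual ConfParser; the literal key strings are assumptions inferred from the constants used above.

import java.util.Map;

class FanoutDefaultSketch {
  // Resolution order mirrored from the patch: write option, then table property,
  // then a default that depends on whether the incoming rows must be ordered.
  static boolean useFanoutWriter(
      Map<String, String> writeOptions, Map<String, String> tableProperties, boolean hasOrdering) {
    String option = writeOptions.get("fanout-enabled"); // assumed key of FANOUT_ENABLED
    if (option != null) {
      return Boolean.parseBoolean(option);
    }

    // assumed key of SPARK_WRITE_PARTITIONED_FANOUT_ENABLED
    String property = tableProperties.get("write.spark.fanout.enabled");
    if (property != null) {
      return Boolean.parseBoolean(property);
    }

    return !hasOrdering; // new default: fanout writers for unsorted writes
  }
}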
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 9fea33948b3e..a8145c2abaa0 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -406,11 +406,10 @@ protected PartitioningWriter<InternalRow, DataWriteResult> newDataWriter(
         Table table, SparkFileWriterFactory writers, OutputFileFactory files, Context context) {
 
       FileIO io = table.io();
-      boolean fanoutEnabled = context.fanoutWriterEnabled();
-      boolean inputOrdered = context.inputOrdered();
+      boolean useFanoutWriter = context.useFanoutWriter();
       long targetFileSize = context.targetDataFileSize();
 
-      if (table.spec().isPartitioned() && fanoutEnabled && !inputOrdered) {
+      if (table.spec().isPartitioned() && useFanoutWriter) {
         return new FanoutDataWriter<>(writers, files, io, targetFileSize);
       } else {
         return new ClusteredDataWriter<>(writers, files, io, targetFileSize);
@@ -670,7 +669,7 @@ private static class Context implements Serializable {
     private final FileFormat deleteFileFormat;
     private final long targetDeleteFileSize;
     private final String queryId;
-    private final boolean fanoutWriterEnabled;
+    private final boolean useFanoutWriter;
     private final boolean inputOrdered;
 
     Context(
@@ -687,7 +686,7 @@
       this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
       this.metadataSparkType = info.metadataSchema().get();
       this.queryId = info.queryId();
-      this.fanoutWriterEnabled = writeConf.fanoutWriterEnabled();
+      this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
       this.inputOrdered = writeRequirements.hasOrdering();
     }
 
@@ -723,8 +722,8 @@ String queryId() {
       return queryId;
     }
 
-    boolean fanoutWriterEnabled() {
-      return fanoutWriterEnabled;
+    boolean useFanoutWriter() {
+      return useFanoutWriter;
     }
 
     boolean inputOrdered() {
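Note: the `newDataWriter` change above is the behavioral core. A clustered writer keeps a single open file and assumes rows arrive grouped by partition; a fanout writer keeps one open file per partition seen, so it tolerates unsorted input at the cost of more open files and memory. The toy class below (hypothetical, not Iceberg's writer API) shows why unsorted input trips a clustered writer with the same "already closed files" failure that TestWriteAborts expects.

import java.util.HashSet;
import java.util.Set;

class ClusteredWriterDemo {
  private final Set<String> completedPartitions = new HashSet<>();
  private String currentPartition = null;

  void write(String partition, String record) {
    if (!partition.equals(currentPartition)) {
      if (completedPartitions.contains(partition)) {
        // mirrors Iceberg's "Encountered records that belong to already closed files"
        throw new IllegalStateException("Already closed file for partition: " + partition);
      }
      if (currentPartition != null) {
        completedPartitions.add(currentPartition); // "close" the previous partition's file
      }
      currentPartition = partition; // "open" a file for the new partition
    }
    // append the record to the currently open file ...
  }
}

Feeding it partitions in the order a, b, a throws on the second a; a fanout writer would simply keep both files open, which is why fanout is now the safer default whenever the input has no required ordering.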
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 15881098e7a3..802c789ce84f 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -98,7 +98,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
   private final Schema writeSchema;
   private final StructType dsSchema;
   private final Map<String, String> extraSnapshotMetadata;
-  private final boolean partitionedFanoutEnabled;
+  private final boolean useFanoutWriter;
   private final SparkWriteRequirements writeRequirements;
   private final Map<String, String> writeProperties;
 
@@ -126,7 +126,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
     this.writeSchema = writeSchema;
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
-    this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+    this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
     this.writeRequirements = writeRequirements;
     this.outputSpecId = writeConf.outputSpecId();
     this.writeProperties = writeConf.writeProperties();
@@ -188,7 +188,7 @@ private WriterFactory createWriterFactory() {
         targetFileSize,
         writeSchema,
         dsSchema,
-        partitionedFanoutEnabled,
+        useFanoutWriter,
         writeProperties);
   }
 
@@ -617,7 +617,7 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
     private final long targetFileSize;
     private final Schema writeSchema;
     private final StructType dsSchema;
-    private final boolean partitionedFanoutEnabled;
+    private final boolean useFanoutWriter;
     private final String queryId;
     private final Map<String, String> writeProperties;
 
@@ -629,7 +629,7 @@ protected WriterFactory(
         long targetFileSize,
         Schema writeSchema,
         StructType dsSchema,
-        boolean partitionedFanoutEnabled,
+        boolean useFanoutWriter,
         Map<String, String> writeProperties) {
       this.tableBroadcast = tableBroadcast;
       this.format = format;
@@ -637,7 +637,7 @@ protected WriterFactory(
       this.targetFileSize = targetFileSize;
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
-      this.partitionedFanoutEnabled = partitionedFanoutEnabled;
+      this.useFanoutWriter = useFanoutWriter;
       this.queryId = queryId;
       this.writeProperties = writeProperties;
     }
 
@@ -678,7 +678,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
         writeSchema,
         dsSchema,
         targetFileSize,
-        partitionedFanoutEnabled);
+        useFanoutWriter);
     }
   }
 }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
index 79374edc3f16..7ed34d4016ba 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
@@ -241,6 +241,8 @@ public void testDefaultWritePartitionedUnsortedTable() {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
     Distribution expectedDistribution = Distributions.clustered(expectedClustering);
@@ -252,23 +254,8 @@
     };
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testDefaultWritePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
-    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, EMPTY_ORDERING);
   }
@@ -285,6 +272,8 @@ public void testHashWritePartitionedUnsortedTable() {
 
     table.updateProperties().set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
     Distribution expectedDistribution = Distributions.clustered(expectedClustering);
@@ -296,27 +285,8 @@
     };
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testHashWritePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
-    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, EMPTY_ORDERING);
   }
@@ -333,6 +303,8 @@ public void testRangeWritePartitionedUnsortedTable() {
 
     table.updateProperties().set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
@@ -342,31 +314,8 @@
     Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testRangeWritePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    SortOrder[] expectedOrdering =
-        new SortOrder[] {
-          Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
-          Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
-        };
-
-    Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
+    enableFanoutWriters(table);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, EMPTY_ORDERING);
   }
@@ -434,6 +383,8 @@ public void testRangeWritePartitionedSortedTable() {
 
     table.replaceSortOrder().asc("id").commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
@@ -443,29 +394,8 @@
     Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testRangeWritePartitionedSortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date)",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table.replaceSortOrder().asc("id").commit();
-
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit();
-
-    SortOrder[] expectedOrdering =
-        new SortOrder[] {
-          Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
-          Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING)
-        };
-    Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
+    enableFanoutWriters(table);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, expectedOrdering);
   }
@@ -642,6 +572,8 @@ public void testDefaultCopyOnWriteDeletePartitionedUnsortedTable() {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
     Distribution expectedDistribution = Distributions.clustered(expectedClustering);
@@ -653,23 +585,8 @@ public void testDefaultCopyOnWriteDeletePartitionedUnsortedTable() {
     };
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testDefaultCopyOnWriteDeletePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
-    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, EMPTY_ORDERING);
   }
@@ -686,6 +603,8 @@ public void testNoneCopyOnWriteDeletePartitionedUnsortedTable() {
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
@@ -694,23 +613,8 @@
 
     checkCopyOnWriteDistributionAndOrdering(
         table, DELETE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
-  }
-
-  @Test
-  public void testNoneCopyOnWriteDeletePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(
         table, DELETE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
@@ -728,6 +632,8 @@ public void testHashCopyOnWriteDeletePartitionedUnsortedTable() {
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
     Distribution expectedDistribution = Distributions.clustered(expectedClustering);
@@ -739,27 +645,8 @@
     };
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testHashCopyOnWriteDeletePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
-    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, EMPTY_ORDERING);
   }
@@ -776,6 +663,8 @@ public void testRangeCopyOnWriteDeletePartitionedUnsortedTable() {
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
@@ -785,30 +674,8 @@
     Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testRangeCopyOnWriteDeletePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    SortOrder[] expectedOrdering =
-        new SortOrder[] {
-          Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
-          Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
-        };
-    Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, expectedDistribution, EMPTY_ORDERING);
   }
@@ -1086,6 +953,8 @@ public void testDefaultCopyOnWriteUpdatePartitionedUnsortedTable() {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
     Distribution expectedDistribution = Distributions.clustered(expectedClustering);
@@ -1097,23 +966,8 @@ public void testDefaultCopyOnWriteUpdatePartitionedUnsortedTable() {
     };
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testDefaultCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
-    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, EMPTY_ORDERING);
   }
@@ -1130,6 +984,8 @@ public void testNoneCopyOnWriteUpdatePartitionedUnsortedTable() {
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
@@ -1138,23 +994,8 @@
 
     checkCopyOnWriteDistributionAndOrdering(
         table, UPDATE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
-  }
-
-  @Test
-  public void testNoneCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(
         table, UPDATE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
@@ -1172,6 +1013,8 @@ public void testHashCopyOnWriteUpdatePartitionedUnsortedTable() {
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
     Distribution expectedDistribution = Distributions.clustered(expectedClustering);
@@ -1183,27 +1026,8 @@
     };
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testHashCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
-    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, EMPTY_ORDERING);
   }
@@ -1220,6 +1044,8 @@ public void testRangeCopyOnWriteUpdatePartitionedUnsortedTable() {
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
@@ -1229,30 +1055,8 @@
     Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testRangeCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    SortOrder[] expectedOrdering =
-        new SortOrder[] {
-          Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
-          Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
-        };
-    Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, expectedDistribution, EMPTY_ORDERING);
   }
@@ -1526,6 +1330,8 @@ public void testDefaultCopyOnWriteMergePartitionedUnsortedTable() {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
     Distribution expectedDistribution = Distributions.clustered(expectedClustering);
@@ -1537,23 +1343,8 @@
     };
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testDefaultCopyOnWriteMergePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
-    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, EMPTY_ORDERING);
   }
@@ -1570,6 +1361,8 @@ public void testNoneCopyOnWriteMergePartitionedUnsortedTable() {
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
@@ -1578,23 +1371,8 @@
 
     checkCopyOnWriteDistributionAndOrdering(
         table, MERGE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
-  }
-
-  @Test
-  public void testNoneCopyOnWriteMergePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
   }
@@ -1611,6 +1389,8 @@ public void testHashCopyOnWriteMergePartitionedUnsortedTable() {
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
     Distribution expectedDistribution = Distributions.clustered(expectedClustering);
@@ -1622,27 +1402,8 @@
     };
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testHashCopyOnWriteMergePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), Expressions.days("ts")};
-    Distribution expectedDistribution = Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, EMPTY_ORDERING);
   }
@@ -1659,6 +1420,8 @@ public void testRangeCopyOnWriteMergePartitionedUnsortedTable() {
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
@@ -1668,30 +1431,8 @@ public void testRangeCopyOnWriteMergePartitionedUnsortedTable() {
     Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testRangeCopyOnWriteMergePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    SortOrder[] expectedOrdering =
-        new SortOrder[] {
-          Expressions.sort(Expressions.column("date"), SortDirection.ASCENDING),
-          Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
-        };
-    Distribution expectedDistribution = Distributions.ordered(expectedOrdering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, EMPTY_ORDERING);
   }
@@ -1838,6 +1579,8 @@ public void testDefaultPositionDeltaDeleteUnpartitionedTable() {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         DELETE,
@@ -1858,6 +1601,8 @@ public void testNonePositionDeltaDeleteUnpartitionedTable() {
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table, DELETE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
 
@@ -1875,6 +1620,8 @@ public void testHashPositionDeltaDeleteUnpartitionedTable() {
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         DELETE,
@@ -1895,6 +1642,8 @@ public void testRangePositionDeltaDeleteUnpartitionedTable() {
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     Distribution expectedDistribution = Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING);
 
     checkPositionDeltaDistributionAndOrdering(
@@ -1915,6 +1664,8 @@ public void testDefaultPositionDeltaDeletePartitionedTable() {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         DELETE,
@@ -1939,6 +1690,8 @@ public void testNonePositionDeltaDeletePartitionedTable() {
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table, DELETE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
 
@@ -1960,6 +1713,8 @@ public void testHashPositionDeltaDeletePartitionedTable() {
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         DELETE,
@@ -1984,6 +1739,8 @@ public void testRangePositionDeltaDeletePartitionedTable() {
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     Distribution expectedDistribution = Distributions.ordered(SPEC_ID_PARTITION_ORDERING);
 
     checkPositionDeltaDistributionAndOrdering(
@@ -2063,6 +1820,8 @@ public void testDefaultPositionDeltaUpdateUnpartitionedUnsortedTable() {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         UPDATE,
@@ -2083,6 +1842,8 @@ public void testNonePositionDeltaUpdateUnpartitionedUnsortedTable() {
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table, UPDATE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
 
@@ -2100,6 +1861,8 @@ public void testHashPositionDeltaUpdateUnpartitionedUnsortedTable() {
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         UPDATE,
@@ -2120,12 +1883,14 @@ public void testRangePositionDeltaUpdateUnpartitionedUnsortedTable() {
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     Distribution expectedDistribution = Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING);
 
     checkPositionDeltaDistributionAndOrdering(
         table, UPDATE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
 
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit();
+    enableFanoutWriters(table);
 
     checkPositionDeltaDistributionAndOrdering(table, UPDATE, expectedDistribution, EMPTY_ORDERING);
   }
@@ -2263,6 +2028,8 @@ public void testDefaultPositionDeltaUpdatePartitionedUnsortedTable() {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2306,6 +2073,8 @@ public void testNonePositionDeltaUpdatePartitionedUnsortedTable() {
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(
@@ -2341,6 +2110,8 @@ public void testHashPositionDeltaUpdatePartitionedUnsortedTable() {
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2384,6 +2155,8 @@ public void testRangePositionDeltaUpdatePartitionedUnsortedTable() {
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedDistributionOrdering =
         new SortOrder[] {
           Expressions.sort(
@@ -2647,6 +2420,8 @@ public void testDefaultPositionDeltaMergeUnpartitionedUnsortedTable() {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2671,6 +2446,8 @@ public void testNonePositionDeltaMergeUnpartitionedUnsortedTable() {
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table, MERGE, UNSPECIFIED_DISTRIBUTION, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
 
@@ -2688,6 +2465,8 @@ public void testHashPositionDeltaMergeUnpartitionedUnsortedTable() {
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2712,6 +2491,8 @@ public void testRangePositionDeltaMergeUnpartitionedUnsortedTable() {
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedDistributionOrdering =
         new SortOrder[] {
           Expressions.sort(
@@ -2877,6 +2658,8 @@ public void testDefaultPositionDeltaMergePartitionedUnsortedTable() {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2919,6 +2702,8 @@ public void testNonePositionDeltaMergePartitionedUnsortedTable() {
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(
@@ -2954,6 +2739,8 @@ public void testHashPositionDeltaMergePartitionedUnsortedTable() {
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2996,6 +2783,8 @@ public void testRangePositionDeltaMergePartitionedUnsortedTable() {
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedDistributionOrdering =
         new SortOrder[] {
           Expressions.sort(
@@ -3227,6 +3016,10 @@ private void checkPositionDeltaDistributionAndOrdering(
     Assert.assertArrayEquals("Ordering must match", expectedOrdering, ordering);
   }
 
+  private void disableFanoutWriters(Table table) {
+    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "false").commit();
+  }
+
   private void enableFanoutWriters(Table table) {
     table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true").commit();
   }
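Note: with the old Fanout test variants folded into their base tests via the disableFanoutWriters/enableFanoutWriters helpers above, two override points remain for users: per write and per table. A short usage sketch, built only from identifiers that appear in this patch; the helper names and table identifier are illustrative:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;

class FanoutOverrides {
  // Opt a single write out of the new fanout default (forces clustered writers).
  static void appendWithClusteredWriters(Dataset<Row> df, String tableName)
      throws NoSuchTableException {
    df.writeTo(tableName)
        .option(SparkWriteOptions.FANOUT_ENABLED, "false")
        .append();
  }

  // Opt a whole table out via the table property the tests above toggle.
  static void disableFanout(Table table) {
    table
        .updateProperties()
        .set(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "false")
        .commit();
  }
}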
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index e8af5e51ec44..0efec160e8f0 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -21,18 +21,13 @@
 import static org.apache.iceberg.Files.localOutput;
 import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
-import static org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp;
-import static org.apache.spark.sql.functions.callUDF;
-import static org.apache.spark.sql.functions.column;
 
 import java.io.File;
 import java.io.IOException;
-import java.sql.Timestamp;
 import java.time.OffsetDateTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
@@ -52,14 +47,13 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.data.GenericsHelpers;
-import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.api.java.UDF1;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.connector.expressions.filter.Predicate;
 import org.apache.spark.sql.connector.read.Batch;
@@ -73,9 +67,6 @@
 import org.apache.spark.sql.sources.LessThan;
 import org.apache.spark.sql.sources.Not;
 import org.apache.spark.sql.sources.StringStartsWith;
-import org.apache.spark.sql.types.IntegerType$;
-import org.apache.spark.sql.types.LongType$;
-import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
@@ -119,29 +110,6 @@ public class TestFilteredScan {
   @BeforeClass
   public static void startSpark() {
     TestFilteredScan.spark = SparkSession.builder().master("local[2]").getOrCreate();
-
-    // define UDFs used by partition tests
-    Function<Object, Integer> bucket4 = Transforms.bucket(4).bind(Types.LongType.get());
-    spark.udf().register("bucket4", (UDF1<Long, Integer>) bucket4::apply, IntegerType$.MODULE$);
-
-    Function<Object, Integer> day = Transforms.day().bind(Types.TimestampType.withZone());
-    spark
-        .udf()
-        .register(
-            "ts_day",
-            (UDF1<Timestamp, Integer>) timestamp -> day.apply(fromJavaTimestamp(timestamp)),
-            IntegerType$.MODULE$);
-
-    Function<Object, Integer> hour = Transforms.hour().bind(Types.TimestampType.withZone());
-    spark
-        .udf()
-        .register(
-            "ts_hour",
-            (UDF1<Timestamp, Integer>) timestamp -> hour.apply(fromJavaTimestamp(timestamp)),
-            IntegerType$.MODULE$);
-
-    spark.udf().register("data_ident", (UDF1<String, String>) data -> data, StringType$.MODULE$);
-    spark.udf().register("id_ident", (UDF1<Long, Long>) id -> id, LongType$.MODULE$);
   }
 
   @AfterClass
@@ -299,7 +267,7 @@ public void testUnpartitionedTimestampFilter() {
 
   @Test
   public void testBucketPartitionedIDFilters() {
-    Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID, "bucket4", "id");
+    Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID);
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
 
@@ -329,7 +297,7 @@ public void testBucketPartitionedIDFilters() {
   @SuppressWarnings("checkstyle:AvoidNestedBlocks")
   @Test
   public void testDayPartitionedTimestampFilters() {
-    Table table = buildPartitionedTable("partitioned_by_day", PARTITION_BY_DAY, "ts_day", "ts");
+    Table table = buildPartitionedTable("partitioned_by_day", PARTITION_BY_DAY);
 
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
 
     Batch unfiltered =
@@ -383,7 +351,7 @@ public void testDayPartitionedTimestampFilters() {
   @SuppressWarnings("checkstyle:AvoidNestedBlocks")
   @Test
   public void testHourPartitionedTimestampFilters() {
-    Table table = buildPartitionedTable("partitioned_by_hour", PARTITION_BY_HOUR, "ts_hour", "ts");
+    Table table = buildPartitionedTable("partitioned_by_hour", PARTITION_BY_HOUR);
 
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
@@ -479,8 +447,7 @@ public void testFilterByNonProjectedColumn() {
 
   @Test
   public void testPartitionedByDataStartsWithFilter() {
-    Table table =
-        buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, "data_ident", "data");
+    Table table = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA);
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
 
@@ -495,8 +462,7 @@ public void testPartitionedByDataNotStartsWithFilter() {
-    Table table =
-        buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, "data_ident", "data");
+    Table table = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA);
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
 
@@ -511,7 +477,7 @@ public void testPartitionedByDataNotStartsWithFilter() {
 
   @Test
   public void testPartitionedByIdStartsWith() {
-    Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID, "id_ident", "id");
+    Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID);
 
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
@@ -527,7 +493,7 @@ public void testPartitionedByIdStartsWith() {
 
   @Test
   public void testPartitionedByIdNotStartsWith() {
-    Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID, "id_ident", "id");
+    Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID);
 
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
@@ -623,8 +589,7 @@ private void pushFilters(ScanBuilder scan, Filter... filters) {
     filterable.pushPredicates(Arrays.stream(filters).map(Filter::toV2).toArray(Predicate[]::new));
   }
 
-  private Table buildPartitionedTable(
-      String desc, PartitionSpec spec, String udf, String partitionColumn) {
+  private Table buildPartitionedTable(String desc, PartitionSpec spec) {
     File location = new File(parent, desc);
     Table table = TABLES.create(SCHEMA, spec, location.toString());
 
@@ -640,12 +605,10 @@ private Table buildPartitionedTable(
             .option(SparkReadOptions.VECTORIZATION_ENABLED, String.valueOf(vectorized))
             .load(unpartitioned.toString());
 
+    // disable fanout writers to locally order records for future verifications
     allRows
-        .coalesce(1) // ensure only 1 file per partition is written
-        .withColumn("part", callUDF(udf, column(partitionColumn)))
-        .sortWithinPartitions("part")
-        .drop("part")
         .write()
+        .option(SparkWriteOptions.FANOUT_ENABLED, "false")
         .format("iceberg")
         .mode("append")
         .save(table.location());
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index ad0984ef4220..11153b3943b4 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -296,11 +296,13 @@ public void testPartitionValueTypes() throws Exception {
     Table table = tables.create(SUPPORTED_PRIMITIVES, spec, location.toString());
     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
 
+    // disable distribution/ordering and fanout writers to preserve the original ordering
     sourceDF
         .write()
         .format("iceberg")
        .mode(SaveMode.Append)
         .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+        .option(SparkWriteOptions.FANOUT_ENABLED, "false")
         .save(location.toString());
 
     List<Row> actual =
@@ -374,11 +376,13 @@ public void testNestedPartitionValues() throws Exception {
     Table table = tables.create(nestedSchema, spec, location.toString());
     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
 
+    // disable distribution/ordering and fanout writers to preserve the original ordering
     sourceDF
         .write()
         .format("iceberg")
         .mode(SaveMode.Append)
         .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+        .option(SparkWriteOptions.FANOUT_ENABLED, "false")
         .save(location.toString());
 
     List<Row> actual =
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index 6c96a33a2579..4b57dcd3c891 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -167,6 +167,7 @@ public void testDisabledDistributionAndOrdering() {
                 inputDF
                     .writeTo(tableName)
                     .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+                    .option(SparkWriteOptions.FANOUT_ENABLED, "false")
                     .append())
         .cause()
         .isInstanceOf(IllegalStateException.class)