From 869301b96bd3897ae11b4e8c5701f4091cd7914b Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Fri, 28 Jul 2023 14:58:16 -0700
Subject: [PATCH] Core, Spark 3.4: Adjust split size to benefit from
 parallelism (#7714)

---
 .../java/org/apache/iceberg/TableProperties.java   |  3 +++
 .../org/apache/iceberg/util/TableScanUtil.java     | 13 +++++++++++++
 .../org/apache/iceberg/util/TestTableScanUtil.java | 14 ++++++++++++++
 .../org/apache/iceberg/spark/SparkReadConf.java    |  8 ++++++++
 .../spark/source/SparkPartitioningAwareScan.java   |  4 ++--
 .../org/apache/iceberg/spark/source/SparkScan.java | 12 ++++++++++++
 6 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index b14354def6ac..a9116bc57f83 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -216,6 +216,9 @@ private TableProperties() {}
   public static final String SPLIT_OPEN_FILE_COST = "read.split.open-file-cost";
   public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB
 
+  public static final String ADAPTIVE_SPLIT_SIZE_ENABLED = "read.split.adaptive-size.enabled";
+  public static final boolean ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT = true;
+
   public static final String PARQUET_VECTORIZATION_ENABLED = "read.parquet.vectorization.enabled";
   public static final boolean PARQUET_VECTORIZATION_ENABLED_DEFAULT = true;
 
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index af3c28c81d6c..6e25e380dd21 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.util;
 
+import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -42,10 +43,13 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.types.Types;
 
 public class TableScanUtil {
 
+  private static final long MIN_SPLIT_SIZE = 16 * 1024 * 1024; // 16 MB
+
   private TableScanUtil() {}
 
   public static boolean hasDeletes(CombinedScanTask task) {
@@ -246,6 +250,15 @@ public static <T extends ScanTask> List<T> mergeTasks(List<T> tasks) {
     return mergedTasks;
   }
 
+  public static long adjustSplitSize(long scanSize, int parallelism, long splitSize) {
+    // use the configured split size if it produces at least one split per slot
+    // otherwise, adjust the split size to target parallelism with a reasonable minimum
+    // increasing the split size may cause expensive spills and is not done automatically
+    long splitCount = LongMath.divide(scanSize, splitSize, RoundingMode.CEILING);
+    long adjustedSplitSize = Math.max(scanSize / parallelism, Math.min(MIN_SPLIT_SIZE, splitSize));
+    return splitCount < parallelism ? adjustedSplitSize : splitSize;
+  }
+
   private static void validatePlanningArguments(long splitSize, int lookback, long openFileCost) {
     Preconditions.checkArgument(splitSize > 0, "Split size must be > 0: %s", splitSize);
     Preconditions.checkArgument(lookback > 0, "Split planning lookback must be > 0: %s", lookback);
diff --git a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
index ee454de00c18..0dff941616ab 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
@@ -240,6 +240,20 @@ public void testTaskGroupPlanningByPartition() {
         .hasMessageStartingWith("Cannot find field");
   }
 
+  @Test
+  public void testAdaptiveSplitSize() {
+    long scanSize = 500L * 1024 * 1024 * 1024; // 500 GB
+    int parallelism = 500;
+    long smallDefaultSplitSize = 128 * 1024 * 1024; // 128 MB
+    long largeDefaultSplitSize = 2L * 1024 * 1024 * 1024; // 2 GB
+
+    long adjusted1 = TableScanUtil.adjustSplitSize(scanSize, parallelism, smallDefaultSplitSize);
+    assertThat(adjusted1).isEqualTo(smallDefaultSplitSize);
+
+    long adjusted2 = TableScanUtil.adjustSplitSize(scanSize, parallelism, largeDefaultSplitSize);
+    assertThat(adjusted2).isEqualTo(scanSize / parallelism);
+  }
+
   private PartitionScanTask taskWithPartition(
       PartitionSpec spec, StructLike partition, long sizeBytes) {
     PartitionScanTask task = Mockito.mock(PartitionScanTask.class);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 421e7a07a162..85e368d8cf69 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -267,4 +267,12 @@ public boolean aggregatePushDownEnabled() {
         .defaultValue(SparkSQLProperties.AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT)
         .parse();
   }
+
+  public boolean adaptiveSplitSizeEnabled() {
+    return confParser
+        .booleanConf()
+        .tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
+        .defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
+        .parse();
+  }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index 6538268697e2..141dd4dcba0e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -200,7 +200,7 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
       CloseableIterable<ScanTaskGroup<T>> plannedTaskGroups =
           TableScanUtil.planTaskGroups(
               CloseableIterable.withNoopClose(tasks()),
-              scan.targetSplitSize(),
+              adjustSplitSize(tasks(), scan.targetSplitSize()),
               scan.splitLookback(),
               scan.splitOpenFileCost());
       this.taskGroups = Lists.newArrayList(plannedTaskGroups);
@@ -214,7 +214,7 @@
       List<ScanTaskGroup<T>> plannedTaskGroups =
           TableScanUtil.planTaskGroups(
               tasks(),
-              scan.targetSplitSize(),
+              adjustSplitSize(tasks(), scan.targetSplitSize()),
               scan.splitLookback(),
               scan.splitOpenFileCost(),
               groupingKeyType());
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 65c5e04f31a1..535d43853f3b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -51,6 +52,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.metric.CustomMetric;
@@ -221,4 +223,14 @@ public CustomMetric[] supportedCustomMetrics() {
       new SkippedDataFiles()
     };
   }
+
+  protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
+    if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) {
+      long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum();
+      int parallelism = sparkContext.defaultParallelism();
+      return TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
+    } else {
+      return splitSize;
+    }
+  }
 }
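
A quick sketch of how the new sizing behaves at runtime, reusing the numbers
from testAdaptiveSplitSize above. The wrapper class name below is only for
illustration; the only Iceberg API it calls is the TableScanUtil.adjustSplitSize
method added in this patch (iceberg-core on the classpath is assumed).

    import org.apache.iceberg.util.TableScanUtil;

    public class AdjustSplitSizeExample {
      public static void main(String[] args) {
        long scanSize = 500L * 1024 * 1024 * 1024; // 500 GB scanned by 500 slots
        int parallelism = 500;

        // 128 MB split size: ceil(500 GB / 128 MB) = 4000 splits >= 500 slots,
        // so the configured size is kept as-is
        long kept = TableScanUtil.adjustSplitSize(scanSize, parallelism, 128 * 1024 * 1024);
        System.out.println(kept); // 134217728 (128 MB)

        // 2 GB split size: ceil(500 GB / 2 GB) = 250 splits < 500 slots, so the
        // size is lowered to max(scanSize / parallelism, min(16 MB, 2 GB)) = 1 GB,
        // producing one split per slot; min(MIN_SPLIT_SIZE, splitSize) is the floor
        // that keeps the adjusted size from collapsing on very small scans
        long lowered = TableScanUtil.adjustSplitSize(scanSize, parallelism, 2L * 1024 * 1024 * 1024);
        System.out.println(lowered); // 1073741824 (1 GB)
      }
    }

On the Spark side this adjustment only kicks in when read.split.adaptive-size.enabled
is true and no split size override was supplied for the read
(readConf.splitSizeOption() == null), so an explicit per-read split size always wins.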