Core, Spark 3.4: Adjust split size to benefit from parallelism (#7714)
aokolnychyi authored Jul 28, 2023
1 parent a7a09d4 commit 869301b
Showing 6 changed files with 52 additions and 2 deletions.
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -216,6 +216,9 @@ private TableProperties() {}
public static final String SPLIT_OPEN_FILE_COST = "read.split.open-file-cost";
public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB

public static final String ADAPTIVE_SPLIT_SIZE_ENABLED = "read.split.adaptive-size.enabled";
public static final boolean ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT = true;

public static final String PARQUET_VECTORIZATION_ENABLED = "read.parquet.vectorization.enabled";
public static final boolean PARQUET_VECTORIZATION_ENABLED_DEFAULT = true;

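The new property follows the existing read.split.* naming and defaults to enabled. Outside of Spark, a caller could read it off table metadata with the PropertyUtil helper the codebase already uses elsewhere; a minimal sketch (the table handle is assumed to be loaded from a catalog, which is not shown):

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.util.PropertyUtil;

// table: an org.apache.iceberg.Table loaded elsewhere (assumption for illustration)
boolean adaptiveEnabled =
    PropertyUtil.propertyAsBoolean(
        table.properties(),
        TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED,          // "read.split.adaptive-size.enabled"
        TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT); // true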
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.util;

import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -42,10 +42,13 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.types.Types;

public class TableScanUtil {

private static final long MIN_SPLIT_SIZE = 16 * 1024 * 1024; // 16 MB

private TableScanUtil() {}

public static boolean hasDeletes(CombinedScanTask task) {
@@ -246,6 +250,15 @@ public static <T extends ScanTask> List<T> mergeTasks(List<T> tasks) {
return mergedTasks;
}

public static long adjustSplitSize(long scanSize, int parallelism, long splitSize) {
// use the configured split size if it produces at least one split per slot
// otherwise, adjust the split size to target parallelism with a reasonable minimum
// increasing the split size may cause expensive spills and is not done automatically
long splitCount = LongMath.divide(scanSize, splitSize, RoundingMode.CEILING);
long adjustedSplitSize = Math.max(scanSize / parallelism, Math.min(MIN_SPLIT_SIZE, splitSize));
return splitCount < parallelism ? adjustedSplitSize : splitSize;
}

private static void validatePlanningArguments(long splitSize, int lookback, long openFileCost) {
Preconditions.checkArgument(splitSize > 0, "Split size must be > 0: %s", splitSize);
Preconditions.checkArgument(lookback > 0, "Split planning lookback must be > 0: %s", lookback);
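The new adjustSplitSize helper only ever lowers the split size: if the configured size already yields at least one split per execution slot it is returned unchanged; otherwise it drops to scanSize / parallelism, floored at the 16 MB minimum (or at the configured size, if that is smaller still). A worked example with illustrative numbers, matching the scenario the new test below exercises:

import org.apache.iceberg.util.TableScanUtil;

public class AdjustSplitSizeExample {
  public static void main(String[] args) {
    long scanSize = 500L * 1024 * 1024 * 1024; // 500 GB to scan
    int parallelism = 500;                     // e.g. Spark's default parallelism
    long splitSize = 2L * 1024 * 1024 * 1024;  // configured 2 GB target split size

    // ceil(500 GB / 2 GB) = 250 splits, fewer than 500 slots, so the size is
    // lowered to scanSize / parallelism = 1 GB; min(MIN_SPLIT_SIZE, splitSize)
    // keeps the result from dropping below 16 MB for tiny scans.
    long adjusted = TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
    System.out.println(adjusted); // 1073741824, i.e. 1 GB
  }
}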
14 changes: 14 additions & 0 deletions core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
@@ -240,6 +240,20 @@ public void testTaskGroupPlanningByPartition() {
.hasMessageStartingWith("Cannot find field");
}

@Test
public void testAdaptiveSplitSize() {
long scanSize = 500L * 1024 * 1024 * 1024; // 500 GB
int parallelism = 500;
long smallDefaultSplitSize = 128 * 1024 * 1024; // 128 MB
long largeDefaultSplitSize = 2L * 1024 * 1024 * 1024; // 2 GB

long adjusted1 = TableScanUtil.adjustSplitSize(scanSize, parallelism, smallDefaultSplitSize);
assertThat(adjusted1).isEqualTo(smallDefaultSplitSize);

long adjusted2 = TableScanUtil.adjustSplitSize(scanSize, parallelism, largeDefaultSplitSize);
assertThat(adjusted2).isEqualTo(scanSize / parallelism);
}

private PartitionScanTask taskWithPartition(
PartitionSpec spec, StructLike partition, long sizeBytes) {
PartitionScanTask task = Mockito.mock(PartitionScanTask.class);
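The test covers the large-scan case; for completeness, here is the small-scan arithmetic (illustrative numbers, not from the commit's test), where the 16 MB floor takes over instead of producing hundreds of micro-splits:

long scanSize = 100L * 1024 * 1024;  // 100 MB scan
int parallelism = 1000;              // many idle slots
long splitSize = 128L * 1024 * 1024; // 128 MB configured split size

// ceil(100 MB / 128 MB) = 1 split < 1000 slots, so adjustment applies:
// max(100 MB / 1000 ≈ 0.1 MB, min(16 MB, 128 MB)) = 16 MB, giving ~7 splits
// rather than 1000 tiny ones.
long adjusted = TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
// adjusted == 16777216 (16 MB)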
@@ -267,4 +267,12 @@ public boolean aggregatePushDownEnabled() {
.defaultValue(SparkSQLProperties.AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT)
.parse();
}

public boolean adaptiveSplitSizeEnabled() {
return confParser
.booleanConf()
.tableProperty(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED)
.defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
.parse();
}
}
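The confParser pattern above indicates this hunk lives in Spark's read-configuration class, so adaptive sizing is on by default and honors the new table property. A sketch of opting a single table out through Iceberg's standard UpdateProperties API (the catalog handle and table name are assumptions for illustration):

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;

// catalog: an org.apache.iceberg.catalog.Catalog obtained elsewhere;
// "db.events" is a made-up identifier.
Table table = catalog.loadTable(TableIdentifier.parse("db.events"));
table
    .updateProperties()
    .set(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED, "false") // opt out
    .commit();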
@@ -200,7 +200,7 @@ protected synchronized List<ScanTaskGroup<T>> taskGroups() {
CloseableIterable<ScanTaskGroup<T>> plannedTaskGroups =
TableScanUtil.planTaskGroups(
CloseableIterable.withNoopClose(tasks()),
-  scan.targetSplitSize(),
+  adjustSplitSize(tasks(), scan.targetSplitSize()),
scan.splitLookback(),
scan.splitOpenFileCost());
this.taskGroups = Lists.newArrayList(plannedTaskGroups);
@@ -214,7 +214,7 @@
List<ScanTaskGroup<T>> plannedTaskGroups =
TableScanUtil.planTaskGroups(
tasks(),
-  scan.targetSplitSize(),
+  adjustSplitSize(tasks(), scan.targetSplitSize()),
scan.splitLookback(),
scan.splitOpenFileCost(),
groupingKeyType());
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -51,6 +52,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.metric.CustomMetric;
@@ -221,4 +223,14 @@ public CustomMetric[] supportedCustomMetrics() {
new SkippedDataFiles()
};
}

protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) {
long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum();
int parallelism = sparkContext.defaultParallelism();
return TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
} else {
return splitSize;
}
}
}
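Note the guard in adjustSplitSize: a split size supplied explicitly for the read (readConf.splitSizeOption()) always wins, and the adaptive path only runs when no such option is set. Assuming Iceberg's usual split-size read option name, pinning the size for one read looks roughly like this:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// With an explicit option, splitSizeOption() is non-null, so no adaptation:
// the scan is planned with 512 MB splits regardless of cluster parallelism.
Dataset<Row> df =
    spark
        .read()
        .format("iceberg")
        .option("split-size", String.valueOf(512L * 1024 * 1024))
        .load("db.events"); // made-up table name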
