Spark 3.4: Revise PlanningBenchmark (#8262)
aokolnychyi authored Aug 9, 2023
1 parent a7a55b5 commit 0794609
Showing 1 changed file with 76 additions and 34 deletions.
@@ -22,19 +22,29 @@
 
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.BatchScan;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.data.RandomData;
@@ -47,8 +57,6 @@
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.ParseException;
-import org.apache.spark.sql.connector.read.InputPartition;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation;
 import org.apache.spark.sql.types.StructType;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -87,15 +95,16 @@ public class PlanningBenchmark {
   private static final String SORT_KEY_COLUMN = "ss_sold_date_sk";
   private static final int SORT_KEY_VALUE = 5;
 
-  private static final String SORT_KEY_PREDICATE =
-      String.format("%s = %s", SORT_KEY_COLUMN, SORT_KEY_VALUE);
-  private static final String PARTITION_AND_SORT_KEY_PREDICATE =
-      String.format(
-          "%s = %d AND %s = %d",
-          PARTITION_COLUMN, PARTITION_VALUE, SORT_KEY_COLUMN, SORT_KEY_VALUE);
+  private static final Expression SORT_KEY_PREDICATE =
+      Expressions.equal(SORT_KEY_COLUMN, SORT_KEY_VALUE);
+  private static final Expression PARTITION_PREDICATE =
+      Expressions.equal(PARTITION_COLUMN, PARTITION_VALUE);
+  private static final Expression PARTITION_AND_SORT_KEY_PREDICATE =
+      Expressions.and(PARTITION_PREDICATE, SORT_KEY_PREDICATE);
 
   private static final int NUM_PARTITIONS = 30;
-  private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
+  private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 25;
+  private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
   private static final int NUM_DELETE_FILES_PER_PARTITION = 50;
   private static final int NUM_ROWS_PER_DATA_FILE = 500;
 
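The filter constants above are now Iceberg Expression objects instead of SQL string fragments, and a separate PARTITION_PREDICATE is introduced so the combined filter is just a conjunction of the two. What follows is a minimal standalone sketch of that Expression API, not part of the commit; the partition column name and value are placeholders, since only the sort-key constants are visible in this excerpt.

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class ExpressionSketch {
  public static void main(String[] args) {
    // equality predicate on the sort key column shown above
    Expression sortKeyEq = Expressions.equal("ss_sold_date_sk", 5);
    // placeholder partition predicate; the real column and value are defined earlier in the file
    Expression partitionEq = Expressions.equal("partition_col", 42);
    // conjunction, mirroring PARTITION_AND_SORT_KEY_PREDICATE
    Expression both = Expressions.and(partitionEq, sortKeyEq);
    // constant true filter, used by the unfiltered benchmarks below
    Expression unfiltered = Expressions.alwaysTrue();
    System.out.println(both);
    System.out.println(unfiltered);
  }
}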
@@ -119,22 +128,29 @@ public void tearDownBenchmark() {
   @Benchmark
   @Threads(1)
   public void localPlanningWithPartitionAndMinMaxFilter(Blackhole blackhole) {
-    InputPartition[] partitions = planInputPartitions(PARTITION_AND_SORT_KEY_PREDICATE);
-    blackhole.consume(partitions);
+    List<ScanTask> fileTasks = planFilesWithoutColumnStats(PARTITION_AND_SORT_KEY_PREDICATE);
+    blackhole.consume(fileTasks);
   }
 
   @Benchmark
   @Threads(1)
   public void localPlanningWithMinMaxFilter(Blackhole blackhole) {
-    InputPartition[] partitions = planInputPartitions(SORT_KEY_PREDICATE);
-    blackhole.consume(partitions);
+    List<ScanTask> fileTasks = planFilesWithoutColumnStats(SORT_KEY_PREDICATE);
+    blackhole.consume(fileTasks);
   }
 
   @Benchmark
   @Threads(1)
   public void localPlanningWithoutFilter(Blackhole blackhole) {
-    InputPartition[] partitions = planInputPartitions("true");
-    blackhole.consume(partitions);
+    List<ScanTask> fileTasks = planFilesWithoutColumnStats(Expressions.alwaysTrue());
+    blackhole.consume(fileTasks);
   }
 
+  @Benchmark
+  @Threads(1)
+  public void localPlanningWithoutFilterWithStats(Blackhole blackhole) {
+    List<ScanTask> fileTasks = planFilesWithColumnStats(Expressions.alwaysTrue());
+    blackhole.consume(fileTasks);
+  }
+
   private void setupSpark() {
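With the Spark relation planning path gone, each benchmark now measures Iceberg's own file planning and simply feeds the resulting task list to the JMH Blackhole. As a hedged illustration of how such a JMH class can be launched programmatically (Iceberg's build normally drives these benchmarks through its Gradle jmh task; the class and method names follow the excerpt, while the runner options below are assumptions):

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class RunPlanningBenchmark {
  public static void main(String[] args) throws RunnerException {
    Options opts =
        new OptionsBuilder()
            .include("PlanningBenchmark.localPlanning.*") // regex over benchmark method names
            .forks(1) // single fork to keep the illustrative run short
            .build();
    new Runner(opts).run();
  }
}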
@@ -184,10 +200,13 @@ private void initTable() throws NoSuchTableException, ParseException {
             + "USING iceberg "
             + "PARTITIONED BY (%s) "
             + "TBLPROPERTIES ("
+            + " '%s' '%b',"
             + " '%s' '%s',"
             + " '%s' '%d')",
         TABLE_NAME,
         PARTITION_COLUMN,
+        TableProperties.MANIFEST_MERGE_ENABLED,
+        false,
         TableProperties.DELETE_MODE,
         RowLevelOperationMode.MERGE_ON_READ.modeName(),
         TableProperties.FORMAT_VERSION,
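The CREATE TABLE statement now also disables manifest merging (commit.manifest-merge.enabled) alongside merge-on-read deletes, presumably so the appends used to seed the table keep their original manifest layout instead of being compacted on commit. For reference, here is a sketch of applying the same two properties to an existing table through the core API; this is not part of the commit, and the table parameter stands for any org.apache.iceberg.Table handle.

import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

public class TablePropertiesSketch {
  static void applyBenchmarkProperties(Table table) {
    table
        .updateProperties()
        .set(TableProperties.MANIFEST_MERGE_ENABLED, "false") // "commit.manifest-merge.enabled"
        .set(TableProperties.DELETE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName())
        .commit();
  }
}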
Expand Down Expand Up @@ -215,12 +234,21 @@ private DeleteFile loadAddedDeleteFile() {
}

private void initDataAndDeletes() throws NoSuchTableException {
Schema schema = table.schema();
PartitionSpec spec = table.spec();
LocationProvider locations = table.locationProvider();

for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
Dataset<Row> inputDF =
randomDataDF(table.schema(), NUM_ROWS_PER_DATA_FILE)
randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
.drop(PARTITION_COLUMN)
.withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
appendAsFile(inputDF);
.withColumn(PARTITION_COLUMN, lit(partitionOrdinal))
.drop(SORT_KEY_COLUMN)
.withColumn(SORT_KEY_COLUMN, lit(Integer.MIN_VALUE));

for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
appendAsFile(inputDF);
}

DataFile dataFile = loadAddedDataFile();

@@ -232,11 +260,12 @@ private void initDataAndDeletes() throws NoSuchTableException {
 
       AppendFiles append = table.newFastAppend();
 
-      for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+      for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
         DataFile replicaDataFile =
-            DataFiles.builder(table.spec())
+            DataFiles.builder(spec)
                 .copy(dataFile)
-                .withPath("replica-" + fileOrdinal + "-" + dataFile.path())
+                .withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
                 .build();
         append.appendFile(replicaDataFile);
       }
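The replica data files above are created by copying the metadata of an already committed data file and registering each copy under a fresh path from the table's LocationProvider, so table metadata grows to 50,000 entries per partition without writing any additional Parquet. Below is a condensed, standalone sketch of that trick, not taken from the commit; the class and parameter names are illustrative.

import java.util.UUID;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.LocationProvider;

public class ReplicaFilesSketch {
  static void appendReplicas(Table table, DataFile original, int copies) {
    PartitionSpec spec = table.spec();
    LocationProvider locations = table.locationProvider();
    AppendFiles append = table.newFastAppend();
    for (int i = 0; i < copies; i++) {
      // unique path for each copy; no file contents are ever written for it
      String fileName = UUID.randomUUID() + "-replica.parquet";
      DataFile replica =
          DataFiles.builder(spec)
              .copy(original)
              .withPath(locations.newDataLocation(spec, original.partition(), fileName))
              .build();
      append.appendFile(replica);
    }
    append.commit();
  }
}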
@@ -246,18 +275,19 @@ private void initDataAndDeletes() throws NoSuchTableException {
 
       RowDelta rowDelta = table.newRowDelta();
 
       for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
+        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
         DeleteFile replicaDeleteFile =
-            FileMetadata.deleteFileBuilder(table.spec())
+            FileMetadata.deleteFileBuilder(spec)
                 .copy(deleteFile)
-                .withPath("replica-" + fileOrdinal + "-" + deleteFile.path())
+                .withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
                 .build();
         rowDelta.addDeletes(replicaDeleteFile);
       }
 
       rowDelta.commit();
 
       Dataset<Row> sortedInputDF =
-          randomDataDF(table.schema(), NUM_ROWS_PER_DATA_FILE)
+          randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
               .drop(SORT_KEY_COLUMN)
               .withColumn(SORT_KEY_COLUMN, lit(SORT_KEY_VALUE))
               .drop(PARTITION_COLUMN)
@@ -282,16 +312,28 @@ private Dataset<Row> randomDataDF(Schema schema, int numRows) {
     return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
   }
 
-  private InputPartition[] planInputPartitions(String predicate) {
-    DataSourceV2ScanRelation relation =
-        (DataSourceV2ScanRelation)
-            spark
-                .sql(String.format("SELECT * FROM %s WHERE %s", TABLE_NAME, predicate))
-                .queryExecution()
-                .optimizedPlan()
-                .collectLeaves()
-                .head();
-    return relation.scan().toBatch().planInputPartitions();
+  private List<ScanTask> planFilesWithoutColumnStats(Expression predicate) {
+    return planFiles(predicate, false);
   }
+
+  private List<ScanTask> planFilesWithColumnStats(Expression predicate) {
+    return planFiles(predicate, true);
+  }
+
+  private List<ScanTask> planFiles(Expression predicate, boolean withColumnStats) {
+    table.refresh();
+
+    BatchScan scan = table.newBatchScan().filter(predicate);
+
+    if (withColumnStats) {
+      scan.includeColumnStats();
+    }
+
+    try (CloseableIterable<ScanTask> fileTasks = scan.planFiles()) {
+      return Lists.newArrayList(fileTasks);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
 
   @FormatMethod
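For context on what the new local planning helpers exercise: planFiles refreshes the table, builds a BatchScan with an Expression filter, optionally requests per-file column stats, and materializes the ScanTask iterable into a list. The following standalone sketch follows the same lines but is not the commit's code; note that this sketch reassigns the scan returned by includeColumnStats, and the byte-counting helper is purely illustrative.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class LocalPlanningSketch {
  static List<ScanTask> plan(Table table, Expression filter, boolean withColumnStats) {
    table.refresh();
    BatchScan scan = table.newBatchScan().filter(filter);
    if (withColumnStats) {
      scan = scan.includeColumnStats(); // scans are immutable; keep the returned instance
    }
    try (CloseableIterable<ScanTask> tasks = scan.planFiles()) {
      return Lists.newArrayList(tasks);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static long totalDataBytes(List<ScanTask> tasks) {
    long bytes = 0L;
    for (ScanTask task : tasks) {
      if (task.isFileScanTask()) {
        FileScanTask fileTask = task.asFileScanTask();
        bytes += fileTask.file().fileSizeInBytes(); // size recorded in table metadata
      }
    }
    return bytes;
  }
}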
