Skip to content

Commit

Permalink
Spark 3.5: Rework DeleteFileIndexBenchmark (apache#9165)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored and lisirrx committed Jan 4, 2024
1 parent 759c540 commit d71faa6
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 82 deletions.
191 changes: 191 additions & 0 deletions core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

public class FileGenerationUtil {

private FileGenerationUtil() {}

public static DataFile generateDataFile(Table table, StructLike partition) {
Schema schema = table.schema();
PartitionSpec spec = table.spec();
LocationProvider locations = table.locationProvider();
String path = locations.newDataLocation(spec, partition, generateFileName());
long fileSize = generateFileSize();
Metrics metrics = generateRandomMetrics(schema);
return DataFiles.builder(spec)
.withPath(path)
.withPartition(partition)
.withFileSizeInBytes(fileSize)
.withFormat(FileFormat.PARQUET)
.withMetrics(metrics)
.build();
}

public static DeleteFile generatePositionDeleteFile(Table table, StructLike partition) {
PartitionSpec spec = table.spec();
LocationProvider locations = table.locationProvider();
String path = locations.newDataLocation(spec, partition, generateFileName());
long fileSize = generateFileSize();
Metrics metrics = generatePositionDeleteMetrics();
return FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath(path)
.withPartition(partition)
.withFileSizeInBytes(fileSize)
.withFormat(FileFormat.PARQUET)
.withMetrics(metrics)
.build();
}

public static DeleteFile generatePositionDeleteFile(Table table, DataFile dataFile) {
PartitionSpec spec = table.spec();
StructLike partition = dataFile.partition();
LocationProvider locations = table.locationProvider();
String path = locations.newDataLocation(spec, partition, generateFileName());
long fileSize = generateFileSize();
Metrics metrics = generatePositionDeleteMetrics(dataFile);
return FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
.withPath(path)
.withPartition(partition)
.withFileSizeInBytes(fileSize)
.withFormat(FileFormat.PARQUET)
.withMetrics(metrics)
.build();
}

// mimics the behavior of OutputFileFactory
public static String generateFileName() {
int partitionId = random().nextInt(100_000);
int taskId = random().nextInt(100);
UUID operationId = UUID.randomUUID();
int fileCount = random().nextInt(1_000);
return String.format("%d-%d-%s-%d.parquet", partitionId, taskId, operationId, fileCount);
}

public static Metrics generateRandomMetrics(Schema schema) {
long rowCount = generateRowCount();
Map<Integer, Long> columnSizes = Maps.newHashMap();
Map<Integer, Long> valueCounts = Maps.newHashMap();
Map<Integer, Long> nullValueCounts = Maps.newHashMap();
Map<Integer, Long> nanValueCounts = Maps.newHashMap();
Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();

for (Types.NestedField column : schema.columns()) {
int fieldId = column.fieldId();
columnSizes.put(fieldId, generateColumnSize());
valueCounts.put(fieldId, generateValueCount());
nullValueCounts.put(fieldId, (long) random().nextInt(5));
nanValueCounts.put(fieldId, (long) random().nextInt(5));
byte[] lower = new byte[16];
random().nextBytes(lower);
lowerBounds.put(fieldId, ByteBuffer.wrap(lower));
byte[] upper = new byte[16];
random().nextBytes(upper);
upperBounds.put(fieldId, ByteBuffer.wrap(upper));
}

return new Metrics(
rowCount,
columnSizes,
valueCounts,
nullValueCounts,
nanValueCounts,
lowerBounds,
upperBounds);
}

private static Metrics generatePositionDeleteMetrics(DataFile dataFile) {
long rowCount = generateRowCount();
Map<Integer, Long> columnSizes = Maps.newHashMap();
Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();

for (Types.NestedField column : DeleteSchemaUtil.pathPosSchema().columns()) {
int fieldId = column.fieldId();
columnSizes.put(fieldId, generateColumnSize());
if (fieldId == MetadataColumns.DELETE_FILE_PATH.fieldId()) {
ByteBuffer bound = Conversions.toByteBuffer(Types.StringType.get(), dataFile.path());
lowerBounds.put(fieldId, bound);
upperBounds.put(fieldId, bound);
}
}

return new Metrics(
rowCount,
columnSizes,
null /* no value counts */,
null /* no NULL counts */,
null /* no NaN counts */,
lowerBounds,
upperBounds);
}

private static Metrics generatePositionDeleteMetrics() {
long rowCount = generateRowCount();
Map<Integer, Long> columnSizes = Maps.newHashMap();

for (Types.NestedField column : DeleteSchemaUtil.pathPosSchema().columns()) {
int fieldId = column.fieldId();
columnSizes.put(fieldId, generateColumnSize());
}

return new Metrics(
rowCount,
columnSizes,
null /* no value counts */,
null /* no NULL counts */,
null /* no NaN counts */,
null /* no lower bounds */,
null /* no upper bounds */);
}

private static long generateRowCount() {
return 100_000L + random().nextInt(1000);
}

private static long generateColumnSize() {
return 1_000_000L + random().nextInt(100_000);
}

private static long generateValueCount() {
return 100_000L + random().nextInt(100);
}

private static long generateFileSize() {
return random().nextInt(50_000);
}

private static Random random() {
return ThreadLocalRandom.current();
}
}
1 change: 1 addition & 0 deletions spark/v3.5/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation project(path: ':iceberg-parquet')
testImplementation project(path: ':iceberg-hive-metastore')
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.iceberg;

import static org.apache.spark.sql.functions.lit;

import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
Expand All @@ -29,25 +27,15 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkSessionCatalog;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.types.StructType;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -83,10 +71,8 @@ public class DeleteFileIndexBenchmark {
private static final String PARTITION_COLUMN = "ss_ticket_number";

private static final int NUM_PARTITIONS = 50;
private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 25;
private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
private static final int NUM_DELETE_FILES_PER_PARTITION = 100;
private static final int NUM_ROWS_PER_DATA_FILE = 500;

private final Configuration hadoopConf = new Configuration();
private SparkSession spark;
Expand Down Expand Up @@ -148,85 +134,26 @@ private DeleteFileIndex buildDeletes() {
.build();
}

private DataFile loadAddedDataFile() {
table.refresh();

Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
return Iterables.getOnlyElement(addedDataFiles);
}

private DeleteFile loadAddedDeleteFile() {
table.refresh();

Iterable<DeleteFile> addedDeleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
return Iterables.getOnlyElement(addedDeleteFiles);
}

private void initDataAndDeletes() throws NoSuchTableException {
Schema schema = table.schema();
PartitionSpec spec = table.spec();
LocationProvider locations = table.locationProvider();

private void initDataAndDeletes() {
for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
Dataset<Row> inputDF =
randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
.drop(PARTITION_COLUMN)
.withColumn(PARTITION_COLUMN, lit(partitionOrdinal));

for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
appendAsFile(inputDF);
}
StructLike partition = TestHelpers.Row.of(partitionOrdinal);

DataFile dataFile = loadAddedDataFile();

sql(
"DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);

DeleteFile deleteFile = loadAddedDeleteFile();

AppendFiles append = table.newFastAppend();
RowDelta rowDelta = table.newRowDelta();

for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
String replicaFileName = UUID.randomUUID() + "-replica.parquet";
DataFile replicaDataFile =
DataFiles.builder(spec)
.copy(dataFile)
.withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
.build();
append.appendFile(replicaDataFile);
for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
rowDelta.addRows(dataFile);
}

append.commit();

RowDelta rowDelta = table.newRowDelta();

for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
String replicaFileName = UUID.randomUUID() + "-replica.parquet";
DeleteFile replicaDeleteFile =
FileMetadata.deleteFileBuilder(spec)
.copy(deleteFile)
.withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
.build();
rowDelta.addDeletes(replicaDeleteFile);
DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, partition);
rowDelta.addDeletes(deleteFile);
}

rowDelta.commit();
}
}

private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
df.coalesce(1).writeTo(TABLE_NAME).append();
}

private Dataset<Row> randomDataDF(Schema schema, int numRows) {
Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
StructType rowSparkType = SparkSchemaUtil.convert(schema);
return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
}

private void setupSpark() {
this.spark =
SparkSession.builder()
Expand Down

0 comments on commit d71faa6

Please sign in to comment.