diff --git a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
new file mode 100644
index 000000000000..98a6eafaf8f6
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+
+public class FileGenerationUtil {
+
+  private FileGenerationUtil() {}
+
+  public static DataFile generateDataFile(Table table, StructLike partition) {
+    Schema schema = table.schema();
+    PartitionSpec spec = table.spec();
+    LocationProvider locations = table.locationProvider();
+    String path = locations.newDataLocation(spec, partition, generateFileName());
+    long fileSize = generateFileSize();
+    Metrics metrics = generateRandomMetrics(schema);
+    return DataFiles.builder(spec)
+        .withPath(path)
+        .withPartition(partition)
+        .withFileSizeInBytes(fileSize)
+        .withFormat(FileFormat.PARQUET)
+        .withMetrics(metrics)
+        .build();
+  }
+
+  public static DeleteFile generatePositionDeleteFile(Table table, StructLike partition) {
+    PartitionSpec spec = table.spec();
+    LocationProvider locations = table.locationProvider();
+    String path = locations.newDataLocation(spec, partition, generateFileName());
+    long fileSize = generateFileSize();
+    Metrics metrics = generatePositionDeleteMetrics();
+    return FileMetadata.deleteFileBuilder(table.spec())
+        .ofPositionDeletes()
+        .withPath(path)
+        .withPartition(partition)
+        .withFileSizeInBytes(fileSize)
+        .withFormat(FileFormat.PARQUET)
+        .withMetrics(metrics)
+        .build();
+  }
+
+  public static DeleteFile generatePositionDeleteFile(Table table, DataFile dataFile) {
+    PartitionSpec spec = table.spec();
+    StructLike partition = dataFile.partition();
+    LocationProvider locations = table.locationProvider();
+    String path = locations.newDataLocation(spec, partition, generateFileName());
+    long fileSize = generateFileSize();
+    Metrics metrics = generatePositionDeleteMetrics(dataFile);
+    return FileMetadata.deleteFileBuilder(table.spec())
+        .ofPositionDeletes()
+        .withPath(path)
+        .withPartition(partition)
+        .withFileSizeInBytes(fileSize)
+        .withFormat(FileFormat.PARQUET)
+        .withMetrics(metrics)
+        .build();
+  }
+
+  // mimics the behavior of OutputFileFactory
+  public static String generateFileName() {
+    int partitionId = random().nextInt(100_000);
+    int taskId = random().nextInt(100);
+    UUID operationId = UUID.randomUUID();
+    int fileCount = random().nextInt(1_000);
+    return String.format("%d-%d-%s-%d.parquet", partitionId, taskId, operationId, fileCount);
+  }
+
+  public static Metrics generateRandomMetrics(Schema schema) {
+    long rowCount = generateRowCount();
+    Map<Integer, Long> columnSizes = Maps.newHashMap();
+    Map<Integer, Long> valueCounts = Maps.newHashMap();
+    Map<Integer, Long> nullValueCounts = Maps.newHashMap();
+    Map<Integer, Long> nanValueCounts = Maps.newHashMap();
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+
+    for (Types.NestedField column : schema.columns()) {
+      int fieldId = column.fieldId();
+      columnSizes.put(fieldId, generateColumnSize());
+      valueCounts.put(fieldId, generateValueCount());
+      nullValueCounts.put(fieldId, (long) random().nextInt(5));
+      nanValueCounts.put(fieldId, (long) random().nextInt(5));
+      byte[] lower = new byte[16];
+      random().nextBytes(lower);
+      lowerBounds.put(fieldId, ByteBuffer.wrap(lower));
+      byte[] upper = new byte[16];
+      random().nextBytes(upper);
+      upperBounds.put(fieldId, ByteBuffer.wrap(upper));
+    }
+
+    return new Metrics(
+        rowCount,
+        columnSizes,
+        valueCounts,
+        nullValueCounts,
+        nanValueCounts,
+        lowerBounds,
+        upperBounds);
+  }
+
+  private static Metrics generatePositionDeleteMetrics(DataFile dataFile) {
+    long rowCount = generateRowCount();
+    Map<Integer, Long> columnSizes = Maps.newHashMap();
+    Map<Integer, ByteBuffer> lowerBounds = Maps.newHashMap();
+    Map<Integer, ByteBuffer> upperBounds = Maps.newHashMap();
+
+    for (Types.NestedField column : DeleteSchemaUtil.pathPosSchema().columns()) {
+      int fieldId = column.fieldId();
+      columnSizes.put(fieldId, generateColumnSize());
+      if (fieldId == MetadataColumns.DELETE_FILE_PATH.fieldId()) {
+        ByteBuffer bound = Conversions.toByteBuffer(Types.StringType.get(), dataFile.path());
+        lowerBounds.put(fieldId, bound);
+        upperBounds.put(fieldId, bound);
+      }
+    }
+
+    return new Metrics(
+        rowCount,
+        columnSizes,
+        null /* no value counts */,
+        null /* no NULL counts */,
+        null /* no NaN counts */,
+        lowerBounds,
+        upperBounds);
+  }
+
+  private static Metrics generatePositionDeleteMetrics() {
+    long rowCount = generateRowCount();
+    Map<Integer, Long> columnSizes = Maps.newHashMap();
+
+    for (Types.NestedField column : DeleteSchemaUtil.pathPosSchema().columns()) {
+      int fieldId = column.fieldId();
+      columnSizes.put(fieldId, generateColumnSize());
+    }
+
+    return new Metrics(
+        rowCount,
+        columnSizes,
+        null /* no value counts */,
+        null /* no NULL counts */,
+        null /* no NaN counts */,
+        null /* no lower bounds */,
+        null /* no upper bounds */);
+  }
+
+  private static long generateRowCount() {
+    return 100_000L + random().nextInt(1000);
+  }
+
+  private static long generateColumnSize() {
+    return 1_000_000L + random().nextInt(100_000);
+  }
+
+  private static long generateValueCount() {
+    return 100_000L + random().nextInt(100);
+  }
+
+  private static long generateFileSize() {
+    return random().nextInt(50_000);
+  }
+
+  private static Random random() {
+    return ThreadLocalRandom.current();
+  }
+}
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index 58edd4145602..2c58281904c9 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -158,6 +158,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
     testImplementation project(path: ':iceberg-parquet')
     testImplementation project(path: ':iceberg-hive-metastore')
     testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+    testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
     testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
     testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
 
diff --git a/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java b/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
index aa310d8f7fc1..73bbbd85235d 100644
--- a/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
+++ b/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg;
 
-import static org.apache.spark.sql.functions.lit;
-
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 import java.io.IOException;
@@ -29,25 +27,15 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
-import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkSessionCatalog;
-import org.apache.iceberg.spark.data.RandomData;
 import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
 import org.apache.iceberg.util.ThreadPools;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.ParseException;
-import org.apache.spark.sql.types.StructType;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -83,10 +71,8 @@ public class DeleteFileIndexBenchmark {
   private static final String PARTITION_COLUMN = "ss_ticket_number";
 
   private static final int NUM_PARTITIONS = 50;
-  private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 25;
-  private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
+  private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
   private static final int NUM_DELETE_FILES_PER_PARTITION = 100;
-  private static final int NUM_ROWS_PER_DATA_FILE = 500;
 
   private final Configuration hadoopConf = new Configuration();
   private SparkSession spark;
@@ -148,85 +134,26 @@ private DeleteFileIndex buildDeletes() {
         .build();
   }
 
-  private DataFile loadAddedDataFile() {
-    table.refresh();
-
-    Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
-    return Iterables.getOnlyElement(addedDataFiles);
-  }
-
-  private DeleteFile loadAddedDeleteFile() {
-    table.refresh();
-
-    Iterable<DeleteFile> addedDeleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
-    return Iterables.getOnlyElement(addedDeleteFiles);
-  }
-
-  private void initDataAndDeletes() throws NoSuchTableException {
-    Schema schema = table.schema();
-    PartitionSpec spec = table.spec();
-    LocationProvider locations = table.locationProvider();
-
+  private void initDataAndDeletes() {
     for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
-      Dataset<Row> inputDF =
-          randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
-              .drop(PARTITION_COLUMN)
-              .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
-
-      for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
-        appendAsFile(inputDF);
-      }
+      StructLike partition = TestHelpers.Row.of(partitionOrdinal);
 
-      DataFile dataFile = loadAddedDataFile();
-
-      sql(
-          "DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
-          TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);
-
-      DeleteFile deleteFile = loadAddedDeleteFile();
-
-      AppendFiles append = table.newFastAppend();
+      RowDelta rowDelta = table.newRowDelta();
 
-      for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
-        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
-        DataFile replicaDataFile =
-            DataFiles.builder(spec)
-                .copy(dataFile)
-                .withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
-                .build();
-        append.appendFile(replicaDataFile);
+      for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
+        rowDelta.addRows(dataFile);
       }
 
-      append.commit();
-
-      RowDelta rowDelta = table.newRowDelta();
-
       for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
-        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
-        DeleteFile replicaDeleteFile =
-            FileMetadata.deleteFileBuilder(spec)
-                .copy(deleteFile)
-                .withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
-                .build();
-        rowDelta.addDeletes(replicaDeleteFile);
+        DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, partition);
+        rowDelta.addDeletes(deleteFile);
       }
 
       rowDelta.commit();
     }
   }
 
-  private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
-    df.coalesce(1).writeTo(TABLE_NAME).append();
-  }
-
-  private Dataset<Row> randomDataDF(Schema schema, int numRows) {
-    Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
-    JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
-    JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
-    StructType rowSparkType = SparkSchemaUtil.convert(schema);
-    return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
-  }
-
   private void setupSpark() {
     this.spark =
         SparkSession.builder()