Spark 3.5: Adapt PlanningBenchmark for DVs (#11531)
aokolnychyi authored Nov 15, 2024
Parent: 315e154 · Commit: 7e4fd1b
Showing 1 changed file with 57 additions and 1 deletion.
@@ -60,6 +60,7 @@
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
@@ -107,6 +108,9 @@ public class PlanningBenchmark {
   private SparkSession spark;
   private Table table;
 
+  @Param({"partition", "file", "dv"})
+  private String type;
+
   @Setup
   public void setupBenchmark() throws NoSuchTableException, ParseException {
     setupSpark();
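
The new @Param field is what fans the benchmark out: JMH injects each listed value into type before @Setup runs, so the whole planning suite executes once per delete layout and reports one result set per value. A minimal standalone sketch of the mechanism, using only JMH annotations; the class and benchmark method are illustrative, not part of this commit:

    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.Param;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.State;

    @State(Scope.Benchmark)
    public class ParamSketch {
      // JMH assigns each listed value to this field in turn and reruns
      // setup plus all benchmarks, yielding one result row per value.
      @Param({"partition", "file", "dv"})
      String type;

      @Benchmark
      public int work() {
        return type.length(); // stands in for real scan-planning work
      }
    }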
@@ -266,7 +270,7 @@ private void initTable() throws NoSuchTableException, ParseException {
         TableProperties.DELETE_MODE,
         RowLevelOperationMode.MERGE_ON_READ.modeName(),
         TableProperties.FORMAT_VERSION,
-        2);
+        type.equals("dv") ? 3 : 2);
 
     this.table = Spark3Util.loadIcebergTable(spark, TABLE_NAME);
   }
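
Deletion vectors are an Iceberg format v3 feature, so the table is created with format version 3 only for the dv run; the partition and file runs stay on v2, where position delete files apply. The ternary above is the entire switch; written out in isolation:

    // Same decision as in initTable() above: DVs require Iceberg
    // format version 3, the position-delete layouts run on version 2.
    int formatVersion = type.equals("dv") ? 3 : 2;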
@@ -276,6 +280,16 @@ private void dropTable() {
   }
 
   private void initDataAndDeletes() {
+    if (type.equals("partition")) {
+      initDataAndPartitionScopedDeletes();
+    } else if (type.equals("file")) {
+      initDataAndFileScopedDeletes();
+    } else {
+      initDataAndDVs();
+    }
+  }
+
+  private void initDataAndPartitionScopedDeletes() {
     for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
       StructLike partition = TestHelpers.Row.of(partitionOrdinal);
 
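
The dispatch selects among three delete layouts: partition-scoped position deletes (the pre-existing path, renamed to initDataAndPartitionScopedDeletes), file-scoped position deletes (one delete file per data file), and DVs (one deletion vector per data file). The two generators added in the next hunk differ from each other in a single line; a condensed sketch of that difference, reusing only the helpers visible in this diff and assuming the table, type, and partition variables from the surrounding loop:

    // One data file plus its matching delete artifact, per layout.
    RowDelta rowDelta = table.newRowDelta();
    DataFile dataFile = generateDataFile(partition, Integer.MIN_VALUE, Integer.MIN_VALUE);
    rowDelta.addRows(dataFile);
    if (type.equals("dv")) {
      rowDelta.addDeletes(FileGenerationUtil.generateDV(table, dataFile));
    } else if (type.equals("file")) {
      rowDelta.addDeletes(FileGenerationUtil.generatePositionDeleteFile(table, dataFile));
    } // the partition-scoped path (unchanged, shown below) attaches partition-wide deletes
    rowDelta.commit();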
@@ -299,6 +313,48 @@ private void initDataAndDeletes() {
     }
   }
 
+  private void initDataAndFileScopedDeletes() {
+    for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
+      StructLike partition = TestHelpers.Row.of(partitionOrdinal);
+
+      RowDelta rowDelta = table.newRowDelta();
+
+      for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        DataFile dataFile = generateDataFile(partition, Integer.MIN_VALUE, Integer.MIN_VALUE);
+        DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
+        rowDelta.addRows(dataFile);
+        rowDelta.addDeletes(deleteFile);
+      }
+
+      // add one data file that would match the sort key predicate
+      DataFile sortKeyDataFile = generateDataFile(partition, SORT_KEY_VALUE, SORT_KEY_VALUE);
+      rowDelta.addRows(sortKeyDataFile);
+
+      rowDelta.commit();
+    }
+  }
+
+  private void initDataAndDVs() {
+    for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
+      StructLike partition = TestHelpers.Row.of(partitionOrdinal);
+
+      RowDelta rowDelta = table.newRowDelta();
+
+      for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        DataFile dataFile = generateDataFile(partition, Integer.MIN_VALUE, Integer.MIN_VALUE);
+        DeleteFile dv = FileGenerationUtil.generateDV(table, dataFile);
+        rowDelta.addRows(dataFile);
+        rowDelta.addDeletes(dv);
+      }
+
+      // add one data file that would match the sort key predicate
+      DataFile sortKeyDataFile = generateDataFile(partition, SORT_KEY_VALUE, SORT_KEY_VALUE);
+      rowDelta.addRows(sortKeyDataFile);
+
+      rowDelta.commit();
+    }
+  }
+
   private DataFile generateDataFile(StructLike partition, int sortKeyMin, int sortKeyMax) {
     int sortKeyFieldId = table.schema().findField(SORT_KEY_COLUMN).fieldId();
     ByteBuffer lower = Conversions.toByteBuffer(Types.IntegerType.get(), sortKeyMin);
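
With the parameter in place, a plain JMH run covers all three layouts back to back, and a single layout can be pinned from the runner. A hedged sketch using the standard JMH runner API; the include pattern and wrapper class are illustrative:

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class RunPlanningBenchmark {
      public static void main(String[] args) throws RunnerException {
        Options opts =
            new OptionsBuilder()
                .include(".*PlanningBenchmark.*") // illustrative regex
                .param("type", "dv") // omit to sweep partition, file, and dv
                .build();
        new Runner(opts).run();
      }
    }

On the JMH command line, -p type=dv achieves the same pinning.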