Skip to content

Commit

Permalink
Optimize merging snapshot producer
Browse files Browse the repository at this point in the history
Run benchmark
  • Loading branch information
amogh-jahagirdar committed Sep 20, 2024
1 parent 79fd977 commit b37854d
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.hadoop.HadoopTables;
Expand Down Expand Up @@ -73,11 +74,14 @@ public class ReplaceDeleteFilesBenchmark {
private List<DeleteFile> deleteFiles;
private List<DeleteFile> pendingDeleteFiles;

@Param({"50000", "100000", "500000", "1000000", "2500000"})
@Param({"50000", "100000", "500000", "1000000", "2000000"})
private int numFiles;

@Param({"5", "25", "50", "100"})
private int percentDeleteFilesReplaced;

@Setup
public void setupBenchmark() {
public void setupBenchmark() throws IOException {
initTable();
initFiles();
}
Expand All @@ -104,27 +108,39 @@ private void dropTable() {
TABLES.dropTable(TABLE_IDENT);
}

private void initFiles() {
private void initFiles() throws IOException {
List<DeleteFile> generatedDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
List<DeleteFile> generatedPendingDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);

RowDelta rowDelta = table.newRowDelta();
int filesToDelete = (int) Math.ceil(numFiles * (percentDeleteFilesReplaced / 100.0));

for (int ordinal = 0; ordinal < numFiles; ordinal++) {
DataFile dataFile = FileGenerationUtil.generateDataFile(table, null);
rowDelta.addRows(dataFile);

DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
rowDelta.addDeletes(deleteFile);
generatedDeleteFiles.add(deleteFile);

DeleteFile pendingDeleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
generatedPendingDeleteFiles.add(pendingDeleteFile);
if (filesToDelete > 0) {
DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
rowDelta.addDeletes(deleteFile);
generatedDeleteFiles.add(deleteFile);
DeleteFile pendingDeleteFile =
FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
generatedPendingDeleteFiles.add(pendingDeleteFile);
filesToDelete--;
}
}

rowDelta.commit();

this.deleteFiles = generatedDeleteFiles;
List<DeleteFile> deleteFilesReadFromManifests = Lists.newArrayList();
List<ManifestFile> deleteManifests = table.currentSnapshot().deleteManifests(table.io());
for (ManifestFile deleteManifest : deleteManifests) {
try (ManifestReader<DeleteFile> manifestReader =
ManifestFiles.readDeleteManifest(deleteManifest, table.io(), table.specs())) {
manifestReader.iterator().forEachRemaining(deleteFilesReadFromManifests::add);
}
}

this.deleteFiles = deleteFilesReadFromManifests;
this.pendingDeleteFiles = generatedPendingDeleteFiles;
}
}
20 changes: 17 additions & 3 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public String partition() {

// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
private final Set<String> manifestDeletedPositions = Sets.newConcurrentHashSet();

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Expand Down Expand Up @@ -153,6 +154,10 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
  Preconditions.checkNotNull(file, "Cannot delete file: null");
  // any previously filtered manifests are dropped before the delete set changes
  invalidateFilteredCache();

  // when the file reports the manifest it came from, remember that manifest's location so the
  // manifest can be recognized by path (see the contains(manifest.path()) lookups) later on
  String manifestLocation = file.manifestLocation();
  if (manifestLocation != null) {
    manifestDeletedPositions.add(manifestLocation);
  }

  // always register the file by path and by (spec id, partition) as well
  deletePaths.add(file.path());
  deleteFilePartitions.add(file.specId(), file.partition());
}
Expand All @@ -166,7 +171,8 @@ void delete(CharSequence path) {
}

boolean containsDeletes() {
return !deletePaths.isEmpty()
return !manifestDeletedPositions.isEmpty()
|| !deletePaths.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
}
Expand Down Expand Up @@ -308,11 +314,15 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
PartitionSpec spec = reader.spec();
PartitionAndMetricsEvaluator evaluator =
new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression);
boolean hasDeletedFiles = manifestDeletedPositions.contains(manifest.path());
if (hasDeletedFiles) {
return filterManifestWithDeletedFiles(evaluator, manifest, reader);
}

// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
if (!hasDeletedFiles) {
filteredManifests.put(manifest, manifest);
return manifest;
Expand All @@ -326,6 +336,10 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
}

private boolean canContainDeletedFiles(ManifestFile manifest) {
if (manifestDeletedPositions.contains(manifest.path())) {
return true;
}

boolean canContainExpressionDeletes;
if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
ManifestEvaluator manifestEvaluator =
Expand Down Expand Up @@ -403,7 +417,7 @@ private boolean manifestHasDeletedFiles(
return false;
}

// Each suppressed check must be its own array element; a single combined "{a, b}" string
// matches no check name and therefore suppresses nothing.
@SuppressWarnings({"CollectionUndefinedEquality", "checkstyle:CyclomaticComplexity"})
private ManifestFile filterManifestWithDeletedFiles(
PartitionAndMetricsEvaluator evaluator, ManifestFile manifest, ManifestReader<F> reader) {
boolean isDelete = reader.isDeleteManifestReader();
Expand Down
53 changes: 53 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1456,6 +1457,58 @@ public void testRewrittenDeleteFiles() {
statuses(Status.DELETED));
}

@TestTemplate
public void testRewrittenDeleteFilesReadFromManifest() throws IOException {
  // base commit: one data file and one delete file, both created with data_bucket=0
  DataFile dataFile = newDataFile("data_bucket=0");
  DeleteFile originalDeleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
  Snapshot baseSnapshot =
      commit(table, table.newRowDelta().addRows(dataFile).addDeletes(originalDeleteFile), branch);
  assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);

  // read the delete file back from the committed delete manifest; the instance returned by the
  // reader is expected to report that manifest's path as its manifest location
  ManifestFile baseDeleteManifest = baseSnapshot.deleteManifests(table.io()).get(0);
  DeleteFile deleteFileFromManifest;
  try (ManifestReader<DeleteFile> reader =
      ManifestFiles.readDeleteManifest(baseDeleteManifest, table.io(), table.specs())) {
    deleteFileFromManifest = reader.iterator().next();
  }

  assertThat(deleteFileFromManifest.manifestLocation()).isEqualTo(baseDeleteManifest.path());

  // replace the manifest-read delete file with a freshly generated one
  DeleteFile replacementDeleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
  RowDelta rewrite =
      table
          .newRowDelta()
          .removeDeletes(deleteFileFromManifest)
          .addDeletes(replacementDeleteFile)
          .validateFromSnapshot(baseSnapshot.snapshotId());
  Snapshot rewriteSnapshot = commit(table, rewrite, branch);
  assertThat(rewriteSnapshot.operation()).isEqualTo(DataOperations.DELETE);

  // the data manifest still lists the data file as added by the base snapshot
  List<ManifestFile> dataManifests = rewriteSnapshot.dataManifests(table.io());
  assertThat(dataManifests).hasSize(1);
  validateManifest(
      dataManifests.get(0),
      dataSeqs(1L),
      fileSeqs(1L),
      ids(baseSnapshot.snapshotId()),
      files(dataFile),
      statuses(Status.ADDED));

  // two delete manifests: the first holds the added replacement, the second the removed original
  List<ManifestFile> rewrittenDeleteManifests = rewriteSnapshot.deleteManifests(table.io());
  assertThat(rewrittenDeleteManifests).hasSize(2);
  validateDeleteManifest(
      rewrittenDeleteManifests.get(0),
      dataSeqs(2L),
      fileSeqs(2L),
      ids(rewriteSnapshot.snapshotId()),
      files(replacementDeleteFile),
      statuses(Status.ADDED));
  validateDeleteManifest(
      rewrittenDeleteManifests.get(1),
      dataSeqs(1L),
      fileSeqs(1L),
      ids(rewriteSnapshot.snapshotId()),
      files(deleteFileFromManifest),
      statuses(Status.DELETED));
}

@TestTemplate
public void testConcurrentDeletesRewriteSameDeleteFile() {
DataFile dataFile = newDataFile("data_bucket=0");
Expand Down

0 comments on commit b37854d

Please sign in to comment.