Skip to content

Commit

Permalink
Core: Optimize merging snapshot producer to use referenced manifest t…
Browse files Browse the repository at this point in the history
…o determine if a given manifest needs to be rewritten or not
  • Loading branch information
amogh-jahagirdar committed Sep 28, 2024
1 parent c0d73f4 commit efe7ab7
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import static org.apache.iceberg.types.Types.NestedField.required;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -70,14 +73,17 @@ public class ReplaceDeleteFilesBenchmark {
private static final HadoopTables TABLES = new HadoopTables();

private Table table;
private List<DeleteFile> deleteFiles;
private List<DeleteFile> deleteFilesToReplace;
private List<DeleteFile> pendingDeleteFiles;

@Param({"50000", "100000", "500000", "1000000", "2500000"})
@Param({"50000", "100000", "500000", "1000000", "2000000"})
private int numFiles;

@Param({"5", "25", "50", "100"})
private int percentDeleteFilesReplaced;

@Setup
public void setupBenchmark() {
public void setupBenchmark() throws IOException {
initTable();
initFiles();
}
Expand All @@ -91,7 +97,7 @@ public void tearDownBenchmark() {
@Threads(1)
public void replaceDeleteFiles() {
RowDelta rowDelta = table.newRowDelta();
deleteFiles.forEach(rowDelta::removeDeletes);
deleteFilesToReplace.forEach(rowDelta::removeDeletes);
pendingDeleteFiles.forEach(rowDelta::addDeletes);
rowDelta.commit();
}
Expand All @@ -104,27 +110,48 @@ private void dropTable() {
TABLES.dropTable(TABLE_IDENT);
}

private void initFiles() {
private void initFiles() throws IOException {
List<DeleteFile> generatedDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
List<DeleteFile> generatedPendingDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);

RowDelta rowDelta = table.newRowDelta();

int numDeleteFilesToReplace = (int) Math.ceil(numFiles * (percentDeleteFilesReplaced / 100.0));
Map<String, DeleteFile> filesToReplace =
Maps.newHashMapWithExpectedSize(numDeleteFilesToReplace);
for (int ordinal = 0; ordinal < numFiles; ordinal++) {
DataFile dataFile = FileGenerationUtil.generateDataFile(table, null);
rowDelta.addRows(dataFile);

DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
rowDelta.addDeletes(deleteFile);
generatedDeleteFiles.add(deleteFile);

DeleteFile pendingDeleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
generatedPendingDeleteFiles.add(pendingDeleteFile);
if (numDeleteFilesToReplace > 0) {
filesToReplace.put(deleteFile.location(), deleteFile);
DeleteFile pendingDeleteFile =
FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
generatedPendingDeleteFiles.add(pendingDeleteFile);
numDeleteFilesToReplace--;
}
}

rowDelta.commit();

this.deleteFiles = generatedDeleteFiles;
List<DeleteFile> deleteFilesReadFromManifests = Lists.newArrayList();
List<ManifestFile> deleteManifests = table.currentSnapshot().deleteManifests(table.io());
for (ManifestFile deleteManifest : deleteManifests) {
try (ManifestReader<DeleteFile> manifestReader =
ManifestFiles.readDeleteManifest(deleteManifest, table.io(), table.specs())) {
manifestReader
.iterator()
.forEachRemaining(
file -> {
if (filesToReplace.containsKey(file.location())) {
deleteFilesReadFromManifests.add(file);
}
});
}
}

this.pendingDeleteFiles = generatedPendingDeleteFiles;
this.deleteFilesToReplace = deleteFilesReadFromManifests;
}
}
30 changes: 27 additions & 3 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ public String partition() {
private boolean failMissingDeletePaths = false;
private int duplicateDeleteCount = 0;
private boolean caseSensitive = true;
private boolean allDeletesReferenceManifests = true;

// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
private final Set<String> manifestsWithDeletedFiles = Sets.newConcurrentHashSet();

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Expand Down Expand Up @@ -120,13 +122,15 @@ protected void deleteByRowFilter(Expression expr) {
Preconditions.checkNotNull(expr, "Cannot delete files using filter: null");
invalidateFilteredCache();
this.deleteExpression = Expressions.or(deleteExpression, expr);
this.allDeletesReferenceManifests = false;
}

/** Add a partition tuple to drop from the table during the delete phase. */
protected void dropPartition(int specId, StructLike partition) {
Preconditions.checkNotNull(partition, "Cannot delete files in invalid partition: null");
invalidateFilteredCache();
dropPartitions.add(specId, partition);
this.allDeletesReferenceManifests = false;
}

/**
Expand All @@ -153,7 +157,13 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
deletePaths.add(file.path());
if (file.manifestLocation() == null) {
this.allDeletesReferenceManifests = false;
} else {
manifestsWithDeletedFiles.add(file.manifestLocation());
}

deletePaths.add(file.location());
deleteFilePartitions.add(file.specId(), file.partition());
}

Expand All @@ -162,11 +172,13 @@ void delete(CharSequence path) {
Preconditions.checkNotNull(path, "Cannot delete file path: null");
invalidateFilteredCache();
this.hasPathOnlyDeletes = true;
this.allDeletesReferenceManifests = false;
deletePaths.add(path);
}

boolean containsDeletes() {
return !deletePaths.isEmpty()
return !manifestsWithDeletedFiles.isEmpty()
|| !deletePaths.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
}
Expand Down Expand Up @@ -308,11 +320,15 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
PartitionSpec spec = reader.spec();
PartitionAndMetricsEvaluator evaluator =
new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression);
boolean hasDeletedFiles = manifestsWithDeletedFiles.contains(manifest.path());
if (hasDeletedFiles) {
return filterManifestWithDeletedFiles(evaluator, manifest, reader);
}

// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
if (!hasDeletedFiles) {
filteredManifests.put(manifest, manifest);
return manifest;
Expand All @@ -325,7 +341,15 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
}
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean manifestReferencedInDelete = manifestsWithDeletedFiles.contains(manifest.path());
if (manifest.content() == ManifestContent.DELETES && allDeletesReferenceManifests) {
return manifestReferencedInDelete || manifest.minSequenceNumber() < minSequenceNumber;
} else if (manifestReferencedInDelete) {
return true;
}

boolean canContainExpressionDeletes;
if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
ManifestEvaluator manifestEvaluator =
Expand Down
53 changes: 53 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1456,6 +1457,58 @@ public void testRewrittenDeleteFiles() {
statuses(Status.DELETED));
}

@TestTemplate
public void testRewrittenDeleteFilesReadFromManifest() throws IOException {
DataFile dataFile = newDataFile("data_bucket=0");
DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile);
Snapshot baseSnapshot = commit(table, baseRowDelta, branch);
assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE);
List<ManifestFile> deleteManifests = baseSnapshot.deleteManifests(table.io());
try (ManifestReader<DeleteFile> deleteReader =
ManifestFiles.readDeleteManifest(deleteManifests.get(0), table.io(), table.specs())) {
deleteFile = deleteReader.iterator().next();
}

assertThat(deleteFile.manifestLocation()).isEqualTo(deleteManifests.get(0).path());
DeleteFile newDeleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0");
RowDelta rowDelta =
table
.newRowDelta()
.removeDeletes(deleteFile)
.addDeletes(newDeleteFile)
.validateFromSnapshot(baseSnapshot.snapshotId());
Snapshot snapshot = commit(table, rowDelta, branch);
assertThat(snapshot.operation()).isEqualTo(DataOperations.DELETE);

List<ManifestFile> dataManifests = snapshot.dataManifests(table.io());
assertThat(dataManifests).hasSize(1);
validateManifest(
dataManifests.get(0),
dataSeqs(1L),
fileSeqs(1L),
ids(baseSnapshot.snapshotId()),
files(dataFile),
statuses(Status.ADDED));

deleteManifests = snapshot.deleteManifests(table.io());
assertThat(deleteManifests).hasSize(2);
validateDeleteManifest(
deleteManifests.get(0),
dataSeqs(2L),
fileSeqs(2L),
ids(snapshot.snapshotId()),
files(newDeleteFile),
statuses(Status.ADDED));
validateDeleteManifest(
deleteManifests.get(1),
dataSeqs(1L),
fileSeqs(1L),
ids(snapshot.snapshotId()),
files(deleteFile),
statuses(Status.DELETED));
}

@TestTemplate
public void testConcurrentDeletesRewriteSameDeleteFile() {
DataFile dataFile = newDataFile("data_bucket=0");
Expand Down

0 comments on commit efe7ab7

Please sign in to comment.