Skip to content

Commit

Permalink
Core: Optimize merging snapshot producer to use referenced manifest to determine if a given manifest needs to be rewritten or not
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed Oct 25, 2024
1 parent ec6c919 commit 6e71f7f
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 51 deletions.
53 changes: 29 additions & 24 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public String partition() {
private int duplicateDeleteCount = 0;
private boolean caseSensitive = true;
private boolean allDeletesReferenceManifests = true;
private boolean trustReferencedManifests = false;

// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
Expand Down Expand Up @@ -180,8 +179,7 @@ void delete(CharSequence path) {
}

boolean containsDeletes() {
return !manifestsReferencedForDeletes.isEmpty()
|| !deletePaths.isEmpty()
return !deletePaths.isEmpty()
|| !deleteFiles.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
Expand All @@ -200,12 +198,15 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife
return ImmutableList.of();
}

// The current set of referenced manifests can be trusted if it is a subset of the manifests
// being filtered. If a single referenced manifest is not in the set of manifests being filtered
// this indicates that the referenced manifests are stale and cannot be trusted.
// Use the current set of referenced manifests as a source of truth when it's a subset of all
// manifests and all removals which were performed reference manifests.
// If a manifest is not in the trusted referenced set, filtering can be skipped
// assuming that the manifest does not have any live entries or aged out deletes
Set<String> manifestLocations =
manifests.stream().map(ManifestFile::path).collect(Collectors.toSet());
trustReferencedManifests = manifestLocations.containsAll(manifestsReferencedForDeletes);
boolean trustReferencedManifests =
allDeletesReferenceManifests
&& manifestLocations.containsAll(manifestsReferencedForDeletes);

ManifestFile[] filtered = new ManifestFile[manifests.size()];
// open all of the manifest files in parallel, use index to avoid reordering
Expand All @@ -215,7 +216,8 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife
.executeWith(workerPoolSupplier.get())
.run(
index -> {
ManifestFile manifest = filterManifest(tableSchema, manifests.get(index));
ManifestFile manifest =
filterManifest(tableSchema, manifests.get(index), trustReferencedManifests);
filtered[index] = manifest;
});

Expand Down Expand Up @@ -329,12 +331,22 @@ private void invalidateFilteredCache() {
/**
* @return a ManifestFile that is a filtered version of the input manifest.
*/
private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
private ManifestFile filterManifest(
Schema tableSchema, ManifestFile manifest, boolean trustReferencedManifests) {
ManifestFile cached = filteredManifests.get(manifest);
if (cached != null) {
return cached;
}

boolean manifestIsReferenced = manifestsReferencedForDeletes.contains(manifest.path());

// The manifest does not need to be rewritten if the referenced set can be trusted and the
// manifest is not referenced
if (trustReferencedManifests && !manifestIsReferenced) {
filteredManifests.put(manifest, manifest);
return manifest;
}

boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
filteredManifests.put(manifest, manifest);
Expand All @@ -345,15 +357,14 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
PartitionSpec spec = reader.spec();
PartitionAndMetricsEvaluator evaluator =
new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression);
boolean hasDeletedFiles = manifestsReferencedForDeletes.contains(manifest.path());
if (hasDeletedFiles) {
if (manifestIsReferenced) {
return filterManifestWithDeletedFiles(evaluator, manifest, reader);
}

// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
if (!hasDeletedFiles) {
filteredManifests.put(manifest, manifest);
return manifest;
Expand All @@ -367,15 +378,9 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
}

private boolean canContainDeletedFiles(ManifestFile manifest) {
return canContainDropBySeq(manifest)
return canContainDroppedFiles(manifest)
|| canContainExpressionDeletes(manifest)
|| canContainDroppedPartitions(manifest)
|| canContainDroppedFiles(manifest);
}

private boolean canContainDropBySeq(ManifestFile manifest) {
return manifest.content() == ManifestContent.DELETES
&& manifest.minSequenceNumber() < minSequenceNumber;
|| canContainDroppedPartitions(manifest);
}

private boolean canContainExpressionDeletes(ManifestFile manifest) {
Expand All @@ -398,13 +403,13 @@ private boolean canContainDroppedPartitions(ManifestFile manifest) {
}

private boolean canContainDroppedFiles(ManifestFile manifest) {
if (manifestsReferencedForDeletes.contains(manifest.path()) || !deletePaths.isEmpty()) {
if (!deletePaths.isEmpty()) {
return true;
} else if (allDeletesReferenceManifests && trustReferencedManifests) {
return false;
} else {
} else if (!deleteFiles.isEmpty()) {
return ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
}

return false;
}

@SuppressWarnings({"CollectionUndefinedEquality", "checkstyle:CyclomaticComplexity"})
Expand Down
27 changes: 15 additions & 12 deletions core/src/test/java/org/apache/iceberg/TestRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,10 @@ public void testDeleteByExpressionWithDeleteFile() {

@TestTemplate
public void testDeleteDataFileWithDeleteFile() {
commit(table, table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_DELETES), branch);
commit(
table,
table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES),
branch);

long deltaSnapshotId = latestSnapshot(table, branch).snapshotId();
assertThat(latestSnapshot(table, branch).sequenceNumber()).isEqualTo(1);
Expand All @@ -598,18 +601,18 @@ public void testDeleteDataFileWithDeleteFile() {
assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1);
validateDeleteManifest(
deleteSnap.deleteManifests(table.io()).get(0),
dataSeqs(1L),
fileSeqs(1L),
ids(deltaSnapshotId),
files(FILE_A_DELETES),
statuses(Status.ADDED));
dataSeqs(1L, 1L),
fileSeqs(1L, 1L),
ids(deltaSnapshotId, deltaSnapshotId),
files(FILE_A_DELETES, FILE_B_DELETES),
statuses(Status.ADDED, Status.ADDED));

// the manifest that removed FILE_A will be dropped in the next commit, causing the min sequence
// number of all data files to be 2, the largest known sequence number. this will cause
// FILE_A_DELETES to be removed because it is too old to apply to any data files.
commit(table, table.newDelete().deleteFile("no-such-file"), branch);
commit(table, table.newRowDelta().removeDeletes(FILE_B_DELETES), branch);

Snapshot nextSnap = latestSnapshot(table, branch);
assertThat(nextSnap.sequenceNumber()).isEqualTo(3);
Expand All @@ -619,11 +622,11 @@ public void testDeleteDataFileWithDeleteFile() {
assertThat(nextSnap.deleteManifests(table.io())).hasSize(1);
validateDeleteManifest(
nextSnap.deleteManifests(table.io()).get(0),
dataSeqs(1L),
fileSeqs(1L),
ids(nextSnap.snapshotId()),
files(FILE_A_DELETES),
statuses(Status.DELETED));
dataSeqs(1L, 1L),
fileSeqs(1L, 1L),
ids(nextSnap.snapshotId(), nextSnap.snapshotId()),
files(FILE_A_DELETES, FILE_B_DELETES),
statuses(Status.DELETED, Status.DELETED));
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.PartitionStatisticsFile;
Expand Down Expand Up @@ -294,8 +295,8 @@ public void testExpireDeleteFiles() throws Exception {
"Should have 1 delete manifest", 1, TestHelpers.deleteManifests(table).size());
Assert.assertEquals("Should have 1 delete file", 1, TestHelpers.deleteFiles(table).size());
Path deleteManifestPath = new Path(TestHelpers.deleteManifests(table).iterator().next().path());
Path deleteFilePath =
new Path(String.valueOf(TestHelpers.deleteFiles(table).iterator().next().path()));
DeleteFile deleteFile = TestHelpers.deleteFiles(table).iterator().next();
Path deleteFilePath = new Path(String.valueOf(deleteFile.path()));

sql(
"CALL %s.system.rewrite_data_files("
Expand All @@ -306,9 +307,10 @@ public void testExpireDeleteFiles() throws Exception {
catalogName, tableIdent);
table.refresh();

sql(
"INSERT INTO TABLE %s VALUES (5, 'e')",
tableName); // this txn moves the file to the DELETED state
table
.newRowDelta()
.removeDeletes(deleteFile)
.commit(); // this txn moves the file to the DELETED state
sql("INSERT INTO TABLE %s VALUES (6, 'f')", tableName); // this txn removes the file reference
table.refresh();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.PartitionStatisticsFile;
Expand Down Expand Up @@ -285,8 +286,8 @@ public void testExpireDeleteFiles() throws Exception {
"Should have 1 delete manifest", 1, TestHelpers.deleteManifests(table).size());
Assert.assertEquals("Should have 1 delete file", 1, TestHelpers.deleteFiles(table).size());
Path deleteManifestPath = new Path(TestHelpers.deleteManifests(table).iterator().next().path());
Path deleteFilePath =
new Path(String.valueOf(TestHelpers.deleteFiles(table).iterator().next().path()));
DeleteFile deleteFile = TestHelpers.deleteFiles(table).iterator().next();
Path deleteFilePath = new Path(String.valueOf(deleteFile.path()));

sql(
"CALL %s.system.rewrite_data_files("
Expand All @@ -297,9 +298,10 @@ public void testExpireDeleteFiles() throws Exception {
catalogName, tableIdent);
table.refresh();

sql(
"INSERT INTO TABLE %s VALUES (5, 'e')",
tableName); // this txn moves the file to the DELETED state
table
.newRowDelta()
.removeDeletes(deleteFile)
.commit(); // this txn moves the file to the DELETED state
sql("INSERT INTO TABLE %s VALUES (6, 'f')", tableName); // this txn removes the file reference
table.refresh();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.PartitionStatisticsFile;
Expand Down Expand Up @@ -277,8 +278,8 @@ public void testExpireDeleteFiles() throws Exception {
assertThat(TestHelpers.deleteManifests(table)).as("Should have 1 delete manifest").hasSize(1);
assertThat(TestHelpers.deleteFiles(table)).as("Should have 1 delete file").hasSize(1);
Path deleteManifestPath = new Path(TestHelpers.deleteManifests(table).iterator().next().path());
Path deleteFilePath =
new Path(String.valueOf(TestHelpers.deleteFiles(table).iterator().next().path()));
DeleteFile deleteFile = TestHelpers.deleteFiles(table).iterator().next();
Path deleteFilePath = new Path(String.valueOf(deleteFile.path()));

sql(
"CALL %s.system.rewrite_data_files("
Expand All @@ -289,9 +290,10 @@ public void testExpireDeleteFiles() throws Exception {
catalogName, tableIdent);
table.refresh();

sql(
"INSERT INTO TABLE %s VALUES (5, 'e')",
tableName); // this txn moves the file to the DELETED state
table
.newRowDelta()
.removeDeletes(deleteFile)
.commit(); // this txn moves the file to the DELETED state
sql("INSERT INTO TABLE %s VALUES (6, 'f')", tableName); // this txn removes the file reference
table.refresh();

Expand Down

0 comments on commit 6e71f7f

Please sign in to comment.