diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 07086e18e7fd..94242a596d4d 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -80,7 +80,6 @@ public String partition() { private int duplicateDeleteCount = 0; private boolean caseSensitive = true; private boolean allDeletesReferenceManifests = true; - private boolean trustReferencedManifests = false; // cache filtered manifests to avoid extra work when commits fail. private final Map filteredManifests = Maps.newConcurrentMap(); @@ -180,8 +179,7 @@ void delete(CharSequence path) { } boolean containsDeletes() { - return !manifestsReferencedForDeletes.isEmpty() - || !deletePaths.isEmpty() + return !deletePaths.isEmpty() || !deleteFiles.isEmpty() || deleteExpression != Expressions.alwaysFalse() || !dropPartitions.isEmpty(); @@ -200,12 +198,15 @@ List filterManifests(Schema tableSchema, List manife return ImmutableList.of(); } - // The current set of referenced manifests can be trusted if it is a subset of the manifests - // being filtered. If a single referenced manifest is not in the set of manifests being filtered - // this indicates that the referenced manifests are stale and cannot be trusted. + // Use the current set of referenced manifests as a source of truth when it's a subset of all + // manifests and all removals which were performed reference manifests. + // If a manifest is not in the trusted referenced set, filtering can be skipped + // assuming that the manifest does not have any live entries or aged out deletes Set manifestLocations = manifests.stream().map(ManifestFile::path).collect(Collectors.toSet()); - trustReferencedManifests = manifestLocations.containsAll(manifestsReferencedForDeletes); + boolean trustReferencedManifests = + allDeletesReferenceManifests + && manifestLocations.containsAll(manifestsReferencedForDeletes); ManifestFile[] filtered = new ManifestFile[manifests.size()]; // open all of the manifest files in parallel, use index to avoid reordering @@ -215,7 +216,8 @@ List filterManifests(Schema tableSchema, List manife .executeWith(workerPoolSupplier.get()) .run( index -> { - ManifestFile manifest = filterManifest(tableSchema, manifests.get(index)); + ManifestFile manifest = + filterManifest(tableSchema, manifests.get(index), trustReferencedManifests); filtered[index] = manifest; }); @@ -329,12 +331,22 @@ private void invalidateFilteredCache() { /** * @return a ManifestReader that is a filtered version of the input manifest. */ - private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) { + private ManifestFile filterManifest( + Schema tableSchema, ManifestFile manifest, boolean trustReferencedManifests) { ManifestFile cached = filteredManifests.get(manifest); if (cached != null) { return cached; } + boolean manifestIsReferenced = manifestsReferencedForDeletes.contains(manifest.path()); + + // The manifest does not need to be rewritten if the referenced set can be trusted and the + // manifest is not referenced + if (trustReferencedManifests && !manifestIsReferenced) { + filteredManifests.put(manifest, manifest); + return manifest; + } + boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles(); if (!hasLiveFiles || !canContainDeletedFiles(manifest)) { filteredManifests.put(manifest, manifest); @@ -345,15 +357,14 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) { PartitionSpec spec = reader.spec(); PartitionAndMetricsEvaluator evaluator = new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression); - boolean hasDeletedFiles = manifestsReferencedForDeletes.contains(manifest.path()); - if (hasDeletedFiles) { + if (manifestIsReferenced) { return filterManifestWithDeletedFiles(evaluator, manifest, reader); } // this assumes that the manifest doesn't have files to remove and streams through the // manifest without copying data. if a manifest does have a file to remove, this will break // out of the loop and move on to filtering the manifest. - hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader); + boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader); if (!hasDeletedFiles) { filteredManifests.put(manifest, manifest); return manifest; @@ -367,15 +378,9 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) { } private boolean canContainDeletedFiles(ManifestFile manifest) { - return canContainDropBySeq(manifest) + return canContainDroppedFiles(manifest) || canContainExpressionDeletes(manifest) - || canContainDroppedPartitions(manifest) - || canContainDroppedFiles(manifest); - } - - private boolean canContainDropBySeq(ManifestFile manifest) { - return manifest.content() == ManifestContent.DELETES - && manifest.minSequenceNumber() < minSequenceNumber; + || canContainDroppedPartitions(manifest); } private boolean canContainExpressionDeletes(ManifestFile manifest) { @@ -398,10 +403,8 @@ private boolean canContainDroppedPartitions(ManifestFile manifest) { } private boolean canContainDroppedFiles(ManifestFile manifest) { - if (manifestsReferencedForDeletes.contains(manifest.path()) || !deletePaths.isEmpty()) { + if (!deletePaths.isEmpty()) { return true; - } else if (allDeletesReferenceManifests && trustReferencedManifests) { - return false; } else { return ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById); } diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 573a0abbfb69..6c2cf0e7a0f7 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -573,7 +573,10 @@ public void testDeleteByExpressionWithDeleteFile() { @TestTemplate public void testDeleteDataFileWithDeleteFile() { - commit(table, table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_DELETES), branch); + commit( + table, + table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES), + branch); long deltaSnapshotId = latestSnapshot(table, branch).snapshotId(); assertThat(latestSnapshot(table, branch).sequenceNumber()).isEqualTo(1); @@ -598,18 +601,18 @@ public void testDeleteDataFileWithDeleteFile() { assertThat(deleteSnap.deleteManifests(table.io())).hasSize(1); validateDeleteManifest( deleteSnap.deleteManifests(table.io()).get(0), - dataSeqs(1L), - fileSeqs(1L), - ids(deltaSnapshotId), - files(FILE_A_DELETES), - statuses(Status.ADDED)); + dataSeqs(1L, 1L), + fileSeqs(1L, 1L), + ids(deltaSnapshotId, deltaSnapshotId), + files(FILE_A_DELETES, FILE_B_DELETES), + statuses(Status.ADDED, Status.ADDED)); // the manifest that removed FILE_A will be dropped next commit, causing the min sequence number // of all data files // to be 2, the largest known sequence number. this will cause FILE_A_DELETES to be removed // because it is too old // to apply to any data files. - commit(table, table.newDelete().deleteFile("no-such-file"), branch); + commit(table, table.newRowDelta().removeDeletes(FILE_B_DELETES), branch); Snapshot nextSnap = latestSnapshot(table, branch); assertThat(nextSnap.sequenceNumber()).isEqualTo(3); @@ -619,11 +622,11 @@ public void testDeleteDataFileWithDeleteFile() { assertThat(nextSnap.deleteManifests(table.io())).hasSize(1); validateDeleteManifest( nextSnap.deleteManifests(table.io()).get(0), - dataSeqs(1L), - fileSeqs(1L), - ids(nextSnap.snapshotId()), - files(FILE_A_DELETES), - statuses(Status.DELETED)); + dataSeqs(1L, 1L), + fileSeqs(1L, 1L), + ids(nextSnap.snapshotId(), nextSnap.snapshotId()), + files(FILE_A_DELETES, FILE_B_DELETES), + statuses(Status.DELETED, Status.DELETED)); } @TestTemplate