From efe7ab7aa325c0cfd585ac7822e7e056f959b36e Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Thu, 12 Sep 2024 13:38:07 -0600 Subject: [PATCH] Core: Optimize merging snapshot producer to use referenced manifest to determine if a given manifest needs to be rewritten or not --- .../iceberg/ReplaceDeleteFilesBenchmark.java | 49 +++++++++++++---- .../apache/iceberg/ManifestFilterManager.java | 30 +++++++++-- .../java/org/apache/iceberg/TestRowDelta.java | 53 +++++++++++++++++++ 3 files changed, 118 insertions(+), 14 deletions(-) diff --git a/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java index a899b870a90c..da6fe583a2e2 100644 --- a/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java +++ b/core/src/jmh/java/org/apache/iceberg/ReplaceDeleteFilesBenchmark.java @@ -20,7 +20,10 @@ import static org.apache.iceberg.types.Types.NestedField.required; +import com.google.common.collect.Maps; +import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -70,14 +73,17 @@ public class ReplaceDeleteFilesBenchmark { private static final HadoopTables TABLES = new HadoopTables(); private Table table; - private List deleteFiles; + private List deleteFilesToReplace; private List pendingDeleteFiles; - @Param({"50000", "100000", "500000", "1000000", "2500000"}) + @Param({"50000", "100000", "500000", "1000000", "2000000"}) private int numFiles; + @Param({"5", "25", "50", "100"}) + private int percentDeleteFilesReplaced; + @Setup - public void setupBenchmark() { + public void setupBenchmark() throws IOException { initTable(); initFiles(); } @@ -91,7 +97,7 @@ public void tearDownBenchmark() { @Threads(1) public void replaceDeleteFiles() { RowDelta rowDelta = table.newRowDelta(); - deleteFiles.forEach(rowDelta::removeDeletes); + deleteFilesToReplace.forEach(rowDelta::removeDeletes); pendingDeleteFiles.forEach(rowDelta::addDeletes); rowDelta.commit(); } @@ -104,27 +110,48 @@ private void dropTable() { TABLES.dropTable(TABLE_IDENT); } - private void initFiles() { + private void initFiles() throws IOException { List generatedDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles); List generatedPendingDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles); RowDelta rowDelta = table.newRowDelta(); - + int numDeleteFilesToReplace = (int) Math.ceil(numFiles * (percentDeleteFilesReplaced / 100.0)); + Map filesToReplace = + Maps.newHashMapWithExpectedSize(numDeleteFilesToReplace); for (int ordinal = 0; ordinal < numFiles; ordinal++) { DataFile dataFile = FileGenerationUtil.generateDataFile(table, null); rowDelta.addRows(dataFile); - DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile); rowDelta.addDeletes(deleteFile); generatedDeleteFiles.add(deleteFile); - - DeleteFile pendingDeleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile); - generatedPendingDeleteFiles.add(pendingDeleteFile); + if (numDeleteFilesToReplace > 0) { + filesToReplace.put(deleteFile.location(), deleteFile); + DeleteFile pendingDeleteFile = + FileGenerationUtil.generatePositionDeleteFile(table, dataFile); + generatedPendingDeleteFiles.add(pendingDeleteFile); + numDeleteFilesToReplace--; + } } rowDelta.commit(); - this.deleteFiles = generatedDeleteFiles; + List deleteFilesReadFromManifests = Lists.newArrayList(); + List deleteManifests = table.currentSnapshot().deleteManifests(table.io()); + for (ManifestFile deleteManifest : deleteManifests) { + try (ManifestReader manifestReader = + ManifestFiles.readDeleteManifest(deleteManifest, table.io(), table.specs())) { + manifestReader + .iterator() + .forEachRemaining( + file -> { + if (filesToReplace.containsKey(file.location())) { + deleteFilesReadFromManifests.add(file); + } + }); + } + } + this.pendingDeleteFiles = generatedPendingDeleteFiles; + this.deleteFilesToReplace = deleteFilesReadFromManifests; } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 106be74fa3ad..d12c83e2440e 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -78,9 +78,11 @@ public String partition() { private boolean failMissingDeletePaths = false; private int duplicateDeleteCount = 0; private boolean caseSensitive = true; + private boolean allDeletesReferenceManifests = true; // cache filtered manifests to avoid extra work when commits fail. private final Map filteredManifests = Maps.newConcurrentMap(); + private final Set manifestsWithDeletedFiles = Sets.newConcurrentHashSet(); // tracking where files were deleted to validate retries quickly private final Map> filteredManifestToDeletedFiles = @@ -120,6 +122,7 @@ protected void deleteByRowFilter(Expression expr) { Preconditions.checkNotNull(expr, "Cannot delete files using filter: null"); invalidateFilteredCache(); this.deleteExpression = Expressions.or(deleteExpression, expr); + this.allDeletesReferenceManifests = false; } /** Add a partition tuple to drop from the table during the delete phase. */ @@ -127,6 +130,7 @@ protected void dropPartition(int specId, StructLike partition) { Preconditions.checkNotNull(partition, "Cannot delete files in invalid partition: null"); invalidateFilteredCache(); dropPartitions.add(specId, partition); + this.allDeletesReferenceManifests = false; } /** @@ -153,7 +157,13 @@ void caseSensitive(boolean newCaseSensitive) { void delete(F file) { Preconditions.checkNotNull(file, "Cannot delete file: null"); invalidateFilteredCache(); - deletePaths.add(file.path()); + if (file.manifestLocation() == null) { + this.allDeletesReferenceManifests = false; + } else { + manifestsWithDeletedFiles.add(file.manifestLocation()); + } + + deletePaths.add(file.location()); deleteFilePartitions.add(file.specId(), file.partition()); } @@ -162,11 +172,13 @@ void delete(CharSequence path) { Preconditions.checkNotNull(path, "Cannot delete file path: null"); invalidateFilteredCache(); this.hasPathOnlyDeletes = true; + this.allDeletesReferenceManifests = false; deletePaths.add(path); } boolean containsDeletes() { - return !deletePaths.isEmpty() + return !manifestsWithDeletedFiles.isEmpty() + || !deletePaths.isEmpty() || deleteExpression != Expressions.alwaysFalse() || !dropPartitions.isEmpty(); } @@ -308,11 +320,15 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) { PartitionSpec spec = reader.spec(); PartitionAndMetricsEvaluator evaluator = new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression); + boolean hasDeletedFiles = manifestsWithDeletedFiles.contains(manifest.path()); + if (hasDeletedFiles) { + return filterManifestWithDeletedFiles(evaluator, manifest, reader); + } // this assumes that the manifest doesn't have files to remove and streams through the // manifest without copying data. if a manifest does have a file to remove, this will break // out of the loop and move on to filtering the manifest. - boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader); + hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader); if (!hasDeletedFiles) { filteredManifests.put(manifest, manifest); return manifest; @@ -325,7 +341,15 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) { } } + @SuppressWarnings("checkstyle:CyclomaticComplexity") private boolean canContainDeletedFiles(ManifestFile manifest) { + boolean manifestReferencedInDelete = manifestsWithDeletedFiles.contains(manifest.path()); + if (manifest.content() == ManifestContent.DELETES && allDeletesReferenceManifests) { + return manifestReferencedInDelete || manifest.minSequenceNumber() < minSequenceNumber; + } else if (manifestReferencedInDelete) { + return true; + } + boolean canContainExpressionDeletes; if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) { ManifestEvaluator manifestEvaluator = diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java index 1d67e48a2ce2..255da47b295f 100644 --- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java +++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java @@ -30,6 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -1456,6 +1457,58 @@ public void testRewrittenDeleteFiles() { statuses(Status.DELETED)); } + @TestTemplate + public void testRewrittenDeleteFilesReadFromManifest() throws IOException { + DataFile dataFile = newDataFile("data_bucket=0"); + DeleteFile deleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0"); + RowDelta baseRowDelta = table.newRowDelta().addRows(dataFile).addDeletes(deleteFile); + Snapshot baseSnapshot = commit(table, baseRowDelta, branch); + assertThat(baseSnapshot.operation()).isEqualTo(DataOperations.OVERWRITE); + List deleteManifests = baseSnapshot.deleteManifests(table.io()); + try (ManifestReader deleteReader = + ManifestFiles.readDeleteManifest(deleteManifests.get(0), table.io(), table.specs())) { + deleteFile = deleteReader.iterator().next(); + } + + assertThat(deleteFile.manifestLocation()).isEqualTo(deleteManifests.get(0).path()); + DeleteFile newDeleteFile = newDeleteFile(dataFile.specId(), "data_bucket=0"); + RowDelta rowDelta = + table + .newRowDelta() + .removeDeletes(deleteFile) + .addDeletes(newDeleteFile) + .validateFromSnapshot(baseSnapshot.snapshotId()); + Snapshot snapshot = commit(table, rowDelta, branch); + assertThat(snapshot.operation()).isEqualTo(DataOperations.DELETE); + + List dataManifests = snapshot.dataManifests(table.io()); + assertThat(dataManifests).hasSize(1); + validateManifest( + dataManifests.get(0), + dataSeqs(1L), + fileSeqs(1L), + ids(baseSnapshot.snapshotId()), + files(dataFile), + statuses(Status.ADDED)); + + deleteManifests = snapshot.deleteManifests(table.io()); + assertThat(deleteManifests).hasSize(2); + validateDeleteManifest( + deleteManifests.get(0), + dataSeqs(2L), + fileSeqs(2L), + ids(snapshot.snapshotId()), + files(newDeleteFile), + statuses(Status.ADDED)); + validateDeleteManifest( + deleteManifests.get(1), + dataSeqs(1L), + fileSeqs(1L), + ids(snapshot.snapshotId()), + files(deleteFile), + statuses(Status.DELETED)); + } + @TestTemplate public void testConcurrentDeletesRewriteSameDeleteFile() { DataFile dataFile = newDataFile("data_bucket=0");