Skip to content

Commit

Permalink
Core: Optimize merging snapshot producer to use referenced manifest to determine if a given manifest needs to be rewritten or not
Browse files Browse the repository at this point in the history
  • Loading branch information
amogh-jahagirdar committed Oct 24, 2024
1 parent ec6c919 commit e137da1
Showing 1 changed file with 21 additions and 19 deletions.
40 changes: 21 additions & 19 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public String partition() {
private int duplicateDeleteCount = 0;
private boolean caseSensitive = true;
private boolean allDeletesReferenceManifests = true;
private boolean trustReferencedManifests = false;

// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
Expand Down Expand Up @@ -180,8 +179,7 @@ void delete(CharSequence path) {
}

boolean containsDeletes() {
return !manifestsReferencedForDeletes.isEmpty()
|| !deletePaths.isEmpty()
return !deletePaths.isEmpty()
|| !deleteFiles.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
Expand All @@ -200,12 +198,15 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife
return ImmutableList.of();
}

// The current set of referenced manifests can be trusted if it is a subset of the manifests
// being filtered. If a single referenced manifest is not in the set of manifests being filtered
// this indicates that the referenced manifests are stale and cannot be trusted.
// Use the current set of referenced manifests as a source of truth when it is a subset of all
// manifests and all removals that were performed reference manifests.
// If a manifest is not in the trusted referenced set, filtering can be skipped,
// assuming that the manifest does not have any live entries or aged-out deletes.
Set<String> manifestLocations =
manifests.stream().map(ManifestFile::path).collect(Collectors.toSet());
trustReferencedManifests = manifestLocations.containsAll(manifestsReferencedForDeletes);
boolean trustReferencedManifests =
allDeletesReferenceManifests
&& manifestLocations.containsAll(manifestsReferencedForDeletes);

ManifestFile[] filtered = new ManifestFile[manifests.size()];
// open all of the manifest files in parallel, use index to avoid reordering
Expand All @@ -215,7 +216,8 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife
.executeWith(workerPoolSupplier.get())
.run(
index -> {
ManifestFile manifest = filterManifest(tableSchema, manifests.get(index));
ManifestFile manifest =
filterManifest(tableSchema, manifests.get(index), trustReferencedManifests);
filtered[index] = manifest;
});

Expand Down Expand Up @@ -329,14 +331,15 @@ private void invalidateFilteredCache() {
/**
* @return a ManifestReader that is a filtered version of the input manifest.
*/
private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
private ManifestFile filterManifest(
Schema tableSchema, ManifestFile manifest, boolean trustReferencedManifests) {
ManifestFile cached = filteredManifests.get(manifest);
if (cached != null) {
return cached;
}

boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
if (!hasLiveFiles || !canContainDeletedFiles(manifest, trustReferencedManifests)) {
filteredManifests.put(manifest, manifest);
return manifest;
}
Expand All @@ -345,15 +348,14 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
PartitionSpec spec = reader.spec();
PartitionAndMetricsEvaluator evaluator =
new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression);
boolean hasDeletedFiles = manifestsReferencedForDeletes.contains(manifest.path());
if (hasDeletedFiles) {
if (manifestsReferencedForDeletes.contains(manifest.path())) {
return filterManifestWithDeletedFiles(evaluator, manifest, reader);
}

// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
if (!hasDeletedFiles) {
filteredManifests.put(manifest, manifest);
return manifest;
Expand All @@ -366,11 +368,11 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
}
}

private boolean canContainDeletedFiles(ManifestFile manifest) {
private boolean canContainDeletedFiles(ManifestFile manifest, boolean trustReferencedManifests) {
return canContainDropBySeq(manifest)
|| canContainDroppedFiles(manifest, trustReferencedManifests)
|| canContainExpressionDeletes(manifest)
|| canContainDroppedPartitions(manifest)
|| canContainDroppedFiles(manifest);
|| canContainDroppedPartitions(manifest);
}

private boolean canContainDropBySeq(ManifestFile manifest) {
Expand All @@ -397,10 +399,10 @@ private boolean canContainDroppedPartitions(ManifestFile manifest) {
return false;
}

private boolean canContainDroppedFiles(ManifestFile manifest) {
if (manifestsReferencedForDeletes.contains(manifest.path()) || !deletePaths.isEmpty()) {
private boolean canContainDroppedFiles(ManifestFile manifest, boolean trustReferencedManifests) {
if (!deletePaths.isEmpty() || manifestsReferencedForDeletes.contains(manifest.path())) {
return true;
} else if (allDeletesReferenceManifests && trustReferencedManifests) {
} else if (trustReferencedManifests) {
return false;
} else {
return ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
Expand Down

0 comments on commit e137da1

Please sign in to comment.