Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Support DVs in DeleteFileIndex #11467

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 55 additions & 7 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
Expand Down Expand Up @@ -70,6 +71,7 @@ class DeleteFileIndex {
private final PartitionMap<EqualityDeletes> eqDeletesByPartition;
private final PartitionMap<PositionDeletes> posDeletesByPartition;
private final Map<String, PositionDeletes> posDeletesByPath;
private final Map<String, DeleteFile> dvByPath;
private final boolean hasEqDeletes;
private final boolean hasPosDeletes;
private final boolean isEmpty;
Expand All @@ -78,13 +80,16 @@ private DeleteFileIndex(
EqualityDeletes globalDeletes,
PartitionMap<EqualityDeletes> eqDeletesByPartition,
PartitionMap<PositionDeletes> posDeletesByPartition,
Map<String, PositionDeletes> posDeletesByPath) {
Map<String, PositionDeletes> posDeletesByPath,
Map<String, DeleteFile> dvByPath) {
this.globalDeletes = globalDeletes;
this.eqDeletesByPartition = eqDeletesByPartition;
this.posDeletesByPartition = posDeletesByPartition;
this.posDeletesByPath = posDeletesByPath;
this.dvByPath = dvByPath;
this.hasEqDeletes = globalDeletes != null || eqDeletesByPartition != null;
this.hasPosDeletes = posDeletesByPartition != null || posDeletesByPath != null;
this.hasPosDeletes =
posDeletesByPartition != null || posDeletesByPath != null || dvByPath != null;
this.isEmpty = !hasEqDeletes && !hasPosDeletes;
}

Expand Down Expand Up @@ -125,6 +130,10 @@ public Iterable<DeleteFile> referencedDeleteFiles() {
}
}

if (dvByPath != null) {
deleteFiles = Iterables.concat(deleteFiles, dvByPath.values());
}

return deleteFiles;
}

Expand All @@ -143,9 +152,16 @@ DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {

DeleteFile[] global = findGlobalDeletes(sequenceNumber, file);
DeleteFile[] eqPartition = findEqPartitionDeletes(sequenceNumber, file);
DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file);
DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
return concat(global, eqPartition, posPartition, posPath);
DeleteFile dv = findDV(sequenceNumber, file);
if (dv != null && global == null && eqPartition == null) {
return new DeleteFile[] {dv};
} else if (dv != null) {
return concat(global, eqPartition, new DeleteFile[] {dv});
} else {
DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file);
DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
return concat(global, eqPartition, posPartition, posPath);
}
}

private DeleteFile[] findGlobalDeletes(long seq, DataFile dataFile) {
Expand Down Expand Up @@ -180,6 +196,22 @@ private DeleteFile[] findPathDeletes(long seq, DataFile dataFile) {
return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
}

private DeleteFile findDV(long seq, DataFile dataFile) {
if (dvByPath == null) {
return null;
}

DeleteFile dv = dvByPath.get(dataFile.location());
if (dv != null) {
ValidationException.check(
dv.dataSequenceNumber() >= seq,
"DV data sequence number (%s) must be greater than or equal to data file sequence number (%s)",
dv.dataSequenceNumber(),
seq);
}
return dv;
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private static boolean canContainEqDeletesForFile(
DataFile dataFile, EqualityDeleteFile deleteFile) {
Expand Down Expand Up @@ -434,11 +466,16 @@ DeleteFileIndex build() {
PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById);
PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById);
Map<String, PositionDeletes> posDeletesByPath = Maps.newHashMap();
Map<String, DeleteFile> dvByPath = Maps.newHashMap();

for (DeleteFile file : files) {
switch (file.content()) {
case POSITION_DELETES:
add(posDeletesByPath, posDeletesByPartition, file);
if (ContentFileUtil.isDV(file)) {
add(dvByPath, file);
} else {
add(posDeletesByPath, posDeletesByPartition, file);
}
break;
case EQUALITY_DELETES:
add(globalDeletes, eqDeletesByPartition, file);
Expand All @@ -453,7 +490,18 @@ DeleteFileIndex build() {
globalDeletes.isEmpty() ? null : globalDeletes,
eqDeletesByPartition.isEmpty() ? null : eqDeletesByPartition,
posDeletesByPartition.isEmpty() ? null : posDeletesByPartition,
posDeletesByPath.isEmpty() ? null : posDeletesByPath);
posDeletesByPath.isEmpty() ? null : posDeletesByPath,
dvByPath.isEmpty() ? null : dvByPath);
}

private void add(Map<String, DeleteFile> dvByPath, DeleteFile dv) {
String path = dv.referencedDataFile();
DeleteFile existingDV = dvByPath.putIfAbsent(path, dv);
if (existingDV != null) {
throw new ValidationException(
"Can't index multiple DVs for %s: %s and %s",
path, ContentFileUtil.dvDesc(dv), ContentFileUtil.dvDesc(existingDV));
}
}

private void add(
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -84,4 +85,17 @@ public static String referencedDataFileLocation(DeleteFile deleteFile) {
CharSequence location = referencedDataFile(deleteFile);
return location != null ? location.toString() : null;
}

public static boolean isDV(DeleteFile deleteFile) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as I mentioned on the other PR. This isn't probably needed anymore, since it exists in ScanTaskUtil

Copy link
Contributor Author

@aokolnychyi aokolnychyi Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually intentional. I originally wanted to move ContentFileUtil to api but then we decided against that, as it required moving more classes and increasing the API surface. Therefore, we added a simple utility for what was needed in api. I do want ContentFileUtil to contain all the logic for content files, it is just a matter of one duplicate method. I made that method in ScanTaskUtil private on purpose.

return deleteFile.format() == FileFormat.PUFFIN;
}

public static String dvDesc(DeleteFile deleteFile) {
return String.format(
"DV{location=%s, offset=%s, length=%s, referencedDataFile=%s}",
deleteFile.location(),
deleteFile.contentOffset(),
deleteFile.contentSizeInBytes(),
deleteFile.referencedDataFile());
}
}
52 changes: 52 additions & 0 deletions core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,24 @@
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.DeleteFileIndex.EqualityDeletes;
import org.apache.iceberg.DeleteFileIndex.PositionDeletes;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.ContentFileUtil;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -624,4 +628,52 @@ public void testEqualityDeletesGroup() {
// it should not be possible to add more elements upon indexing
assertThatThrownBy(() -> group.add(SPEC, file1)).isInstanceOf(IllegalStateException.class);
}

@TestTemplate
public void testMixDeleteFilesAndDVs() {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

List<DeleteFile> deletes =
Arrays.asList(
withDataSequenceNumber(1, partitionedPosDeletes(SPEC, FILE_A.partition())),
withDataSequenceNumber(2, newDV(FILE_A)),
withDataSequenceNumber(1, partitionedPosDeletes(SPEC, FILE_B.partition())),
withDataSequenceNumber(2, partitionedPosDeletes(SPEC, FILE_B.partition())));

DeleteFileIndex index = DeleteFileIndex.builderFor(deletes).specsById(table.specs()).build();

DeleteFile[] fileADeletes = index.forDataFile(0, FILE_A);
assertThat(fileADeletes).as("Only DV should apply to FILE_A").hasSize(1);
assertThat(ContentFileUtil.isDV(fileADeletes[0])).isTrue();
assertThat(fileADeletes[0].referencedDataFile()).isEqualTo(FILE_A.location());

DeleteFile[] fileBDeletes = index.forDataFile(0, FILE_B);
assertThat(fileBDeletes).as("Two delete files should apply to FILE_B").hasSize(2);
assertThat(ContentFileUtil.isDV(fileBDeletes[0])).isFalse();
assertThat(ContentFileUtil.isDV(fileBDeletes[1])).isFalse();
}

@TestTemplate
public void testMultipleDVs() {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

DeleteFile dv1 = withDataSequenceNumber(1, newDV(FILE_A));
DeleteFile dv2 = withDataSequenceNumber(2, newDV(FILE_A));
List<DeleteFile> dvs = Arrays.asList(dv1, dv2);

assertThatThrownBy(() -> DeleteFileIndex.builderFor(dvs).specsById(table.specs()).build())
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Can't index multiple DVs for %s", FILE_A.location());
}

@TestTemplate
public void testInvalidDVSequenceNumber() {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
DeleteFile dv = withDataSequenceNumber(1, newDV(FILE_A));
List<DeleteFile> dvs = Collections.singletonList(dv);
DeleteFileIndex index = DeleteFileIndex.builderFor(dvs).specsById(table.specs()).build();
assertThatThrownBy(() -> index.forDataFile(2, FILE_A))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("must be greater than or equal to data file sequence number");
}
}