Skip to content

Commit

Permalink
Core: Support commits with DVs (#11495)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Nov 11, 2024
1 parent 5530605 commit af3fbfe
Show file tree
Hide file tree
Showing 19 changed files with 506 additions and 212 deletions.
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ protected void validate(TableMetadata base, Snapshot parent) {
if (validateNewDeleteFiles) {
validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
}

validateAddedDVs(base, startingSnapshotId, conflictDetectionFilter, parent);
}
}
}
92 changes: 90 additions & 2 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -70,6 +72,9 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// delete files can be added in "overwrite" or "delete" operations
private static final Set<String> VALIDATE_ADDED_DELETE_FILES_OPERATIONS =
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE);
// DVs can be added in "overwrite", "delete", and "replace" operations
private static final Set<String> VALIDATE_ADDED_DVS_OPERATIONS =
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE, DataOperations.REPLACE);

private final String tableName;
private final TableOperations ops;
Expand All @@ -83,6 +88,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap();
private final Set<String> newDVRefs = Sets.newHashSet();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder();
Expand Down Expand Up @@ -245,13 +251,13 @@ private PartitionSpec spec(int specId) {

/** Add a delete file to the new snapshot. */
protected void add(DeleteFile file) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
validateNewDeleteFile(file);
add(new PendingDeleteFile(file));
}

/** Add a delete file to the new snapshot. */
protected void add(DeleteFile file, long dataSequenceNumber) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
validateNewDeleteFile(file);
add(new PendingDeleteFile(file, dataSequenceNumber));
}

Expand All @@ -268,9 +274,39 @@ private void add(PendingDeleteFile file) {
if (deleteFiles.add(file)) {
addedFilesSummary.addedFile(spec, file);
hasNewDeleteFiles = true;
if (ContentFileUtil.isDV(file)) {
newDVRefs.add(file.referencedDataFile());
}
}
}

protected void validateNewDeleteFile(DeleteFile file) {
Preconditions.checkNotNull(file, "Invalid delete file: null");
switch (formatVersion()) {
case 1:
throw new IllegalArgumentException("Deletes are supported in V2 and above");
case 2:
Preconditions.checkArgument(
file.content() == FileContent.EQUALITY_DELETES || !ContentFileUtil.isDV(file),
"Must not use DVs for position deletes in V2: %s",
ContentFileUtil.dvDesc(file));
break;
case 3:
Preconditions.checkArgument(
file.content() == FileContent.EQUALITY_DELETES || ContentFileUtil.isDV(file),
"Must use DVs for position deletes in V%s: %s",
formatVersion(),
file.location());
break;
default:
throw new IllegalArgumentException("Unsupported format version: " + formatVersion());
}
}

private int formatVersion() {
return ops.current().formatVersion();
}

/** Add all files in a manifest to the new snapshot. */
protected void add(ManifestFile manifest) {
Preconditions.checkArgument(
Expand Down Expand Up @@ -769,6 +805,58 @@ protected void validateDataFilesExist(
}
}

// validates there are no concurrently added DVs for referenced data files
protected void validateAddedDVs(
TableMetadata base,
Long startingSnapshotId,
Expression conflictDetectionFilter,
Snapshot parent) {
// skip if there is no current table state or this operation doesn't add new DVs
if (parent == null || newDVRefs.isEmpty()) {
return;
}

Pair<List<ManifestFile>, Set<Long>> history =
validationHistory(
base,
startingSnapshotId,
VALIDATE_ADDED_DVS_OPERATIONS,
ManifestContent.DELETES,
parent);
List<ManifestFile> newDeleteManifests = history.first();
Set<Long> newSnapshotIds = history.second();

Tasks.foreach(newDeleteManifests)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(workerPool())
.run(manifest -> validateAddedDVs(manifest, conflictDetectionFilter, newSnapshotIds));
}

private void validateAddedDVs(
ManifestFile manifest, Expression conflictDetectionFilter, Set<Long> newSnapshotIds) {
try (CloseableIterable<ManifestEntry<DeleteFile>> entries =
ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById())
.filterRows(conflictDetectionFilter)
.caseSensitive(caseSensitive)
.liveEntries()) {

for (ManifestEntry<DeleteFile> entry : entries) {
DeleteFile file = entry.file();
if (newSnapshotIds.contains(entry.snapshotId()) && ContentFileUtil.isDV(file)) {
ValidationException.check(
!newDVRefs.contains(file.referencedDataFile()),
"Found concurrently added DV for %s: %s",
file.referencedDataFile(),
ContentFileUtil.dvDesc(file));
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

// returns newly added manifests and snapshot IDs between the starting and parent snapshots
private Pair<List<ManifestFile>, Set<Long>> validationHistory(
TableMetadata base,
Long startingSnapshotId,
Expand Down
31 changes: 14 additions & 17 deletions core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,6 @@ public static List<Object> parameters() {
return Arrays.asList(2, 3);
}

static final DeleteFile FILE_A_POS_1 =
FileMetadata.deleteFileBuilder(SPEC)
.ofPositionDeletes()
.withPath("/path/to/data-a-pos-deletes.parquet")
.withFileSizeInBytes(10)
.withPartition(FILE_A.partition())
.withRecordCount(1)
.build();

static final DeleteFile FILE_A_EQ_1 =
FileMetadata.deleteFileBuilder(SPEC)
.ofEqualityDeletes()
Expand Down Expand Up @@ -311,7 +302,7 @@ public void testUnpartitionedTableScan() throws IOException {
public void testPartitionedTableWithPartitionPosDeletes() {
table.newAppend().appendFile(FILE_A).commit();

table.newRowDelta().addDeletes(FILE_A_POS_1).commit();
table.newRowDelta().addDeletes(fileADeletes()).commit();

List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
assertThat(tasks).as("Should have one task").hasSize(1);
Expand All @@ -323,7 +314,7 @@ public void testPartitionedTableWithPartitionPosDeletes() {
assertThat(task.deletes()).as("Should have one associated delete file").hasSize(1);
assertThat(task.deletes().get(0).path())
.as("Should have only pos delete file")
.isEqualTo(FILE_A_POS_1.path());
.isEqualTo(fileADeletes().path());
}

@TestTemplate
Expand All @@ -349,7 +340,7 @@ public void testPartitionedTableWithPartitionEqDeletes() {
public void testPartitionedTableWithUnrelatedPartitionDeletes() {
table.newAppend().appendFile(FILE_B).commit();

table.newRowDelta().addDeletes(FILE_A_POS_1).addDeletes(FILE_A_EQ_1).commit();
table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A_EQ_1).commit();

List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
assertThat(tasks).as("Should have one task").hasSize(1);
Expand All @@ -363,7 +354,9 @@ public void testPartitionedTableWithUnrelatedPartitionDeletes() {

@TestTemplate
public void testPartitionedTableWithOlderPartitionDeletes() {
table.newRowDelta().addDeletes(FILE_A_POS_1).addDeletes(FILE_A_EQ_1).commit();
assumeThat(formatVersion).as("DVs are not filtered using sequence numbers").isEqualTo(2);

table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A_EQ_1).commit();

table.newAppend().appendFile(FILE_A).commit();

Expand All @@ -379,6 +372,8 @@ public void testPartitionedTableWithOlderPartitionDeletes() {

@TestTemplate
public void testPartitionedTableScanWithGlobalDeletes() {
assumeThat(formatVersion).as("Requires V2 position deletes").isEqualTo(2);

table.newAppend().appendFile(FILE_A).commit();

TableMetadata base = table.ops().current();
Expand Down Expand Up @@ -407,6 +402,8 @@ public void testPartitionedTableScanWithGlobalDeletes() {

@TestTemplate
public void testPartitionedTableScanWithGlobalAndPartitionDeletes() {
assumeThat(formatVersion).as("Requires V2 position deletes").isEqualTo(2);

table.newAppend().appendFile(FILE_A).commit();

table.newRowDelta().addDeletes(FILE_A_EQ_1).commit();
Expand Down Expand Up @@ -437,7 +434,7 @@ public void testPartitionedTableScanWithGlobalAndPartitionDeletes() {

@TestTemplate
public void testPartitionedTableSequenceNumbers() {
table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_EQ_1).addDeletes(FILE_A_POS_1).commit();
table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_EQ_1).addDeletes(fileADeletes()).commit();

List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
assertThat(tasks).as("Should have one task").hasSize(1);
Expand All @@ -449,7 +446,7 @@ public void testPartitionedTableSequenceNumbers() {
assertThat(task.deletes()).as("Should have one associated delete file").hasSize(1);
assertThat(task.deletes().get(0).path())
.as("Should have only pos delete file")
.isEqualTo(FILE_A_POS_1.path());
.isEqualTo(fileADeletes().path());
}

@TestTemplate
Expand Down Expand Up @@ -501,7 +498,7 @@ public void testPartitionedTableWithExistingDeleteFile() {

table.newRowDelta().addDeletes(FILE_A_EQ_1).commit();

table.newRowDelta().addDeletes(FILE_A_POS_1).commit();
table.newRowDelta().addDeletes(fileADeletes()).commit();

table
.updateProperties()
Expand Down Expand Up @@ -557,7 +554,7 @@ public void testPartitionedTableWithExistingDeleteFile() {
assertThat(task.deletes()).as("Should have two associated delete files").hasSize(2);
assertThat(Sets.newHashSet(Iterables.transform(task.deletes(), ContentFile::path)))
.as("Should have expected delete files")
.isEqualTo(Sets.newHashSet(FILE_A_EQ_1.path(), FILE_A_POS_1.path()));
.isEqualTo(Sets.newHashSet(FILE_A_EQ_1.path(), fileADeletes().path()));
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void scanningWithDeletes() throws IOException {
reporter);

table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_C).commit();
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_DELETES).commit();
table.newRowDelta().addDeletes(fileADeletes()).addDeletes(fileBDeletes()).commit();
ScanT tableScan = newScan(table);

try (CloseableIterable<T> fileScanTasks = tableScan.planFiles()) {
Expand All @@ -208,12 +208,19 @@ public void scanningWithDeletes() throws IOException {
assertThat(result.totalDataManifests().value()).isEqualTo(1);
assertThat(result.totalDeleteManifests().value()).isEqualTo(1);
assertThat(result.totalFileSizeInBytes().value()).isEqualTo(30L);
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(20L);
assertThat(result.totalDeleteFileSizeInBytes().value())
.isEqualTo(contentSize(fileADeletes(), fileBDeletes()));
assertThat(result.skippedDataFiles().value()).isEqualTo(0);
assertThat(result.skippedDeleteFiles().value()).isEqualTo(0);
assertThat(result.indexedDeleteFiles().value()).isEqualTo(2);
assertThat(result.equalityDeleteFiles().value()).isEqualTo(0);
assertThat(result.positionalDeleteFiles().value()).isEqualTo(2);
if (formatVersion == 2) {
assertThat(result.positionalDeleteFiles().value()).isEqualTo(2);
assertThat(result.dvs().value()).isEqualTo(0);
} else {
assertThat(result.positionalDeleteFiles().value()).isEqualTo(0);
assertThat(result.dvs().value()).isEqualTo(2);
}
}

@TestTemplate
Expand Down Expand Up @@ -264,8 +271,8 @@ public void scanningWithSkippedDeleteFiles() throws IOException {
tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit();
table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_D2_DELETES).commit();
table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit();
table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_D2_DELETES).commit();
table.newRowDelta().addDeletes(fileBDeletes()).addDeletes(FILE_C2_DELETES).commit();
ScanT tableScan = newScan(table);

List<FileScanTask> fileTasks = Lists.newArrayList();
Expand Down Expand Up @@ -308,7 +315,7 @@ public void scanningWithEqualityAndPositionalDeleteFiles() throws IOException {
tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
table.newAppend().appendFile(FILE_A).commit();
// FILE_A_DELETES = positionalDelete / FILE_A2_DELETES = equalityDelete
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A2_DELETES).commit();
table.newRowDelta().addDeletes(fileADeletes()).addDeletes(FILE_A2_DELETES).commit();
ScanT tableScan = newScan(table);

try (CloseableIterable<T> fileScanTasks =
Expand All @@ -321,7 +328,13 @@ public void scanningWithEqualityAndPositionalDeleteFiles() throws IOException {
ScanMetricsResult result = scanReport.scanMetrics();
assertThat(result.indexedDeleteFiles().value()).isEqualTo(2);
assertThat(result.equalityDeleteFiles().value()).isEqualTo(1);
assertThat(result.positionalDeleteFiles().value()).isEqualTo(1);
if (formatVersion == 2) {
assertThat(result.positionalDeleteFiles().value()).isEqualTo(1);
assertThat(result.dvs().value()).isEqualTo(0);
} else {
assertThat(result.positionalDeleteFiles().value()).isEqualTo(0);
assertThat(result.dvs().value()).isEqualTo(1);
}
}

static class TestMetricsReporter implements MetricsReporter {
Expand Down
Loading

0 comments on commit af3fbfe

Please sign in to comment.