Skip to content

Commit

Permalink
Core: Look up targeted position deletes by path (#9251)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Dec 18, 2023
1 parent 9342f64 commit ad3cf9d
Show file tree
Hide file tree
Showing 9 changed files with 686 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -150,6 +151,10 @@ public Set<Entry<CharSequence, V>> entrySet() {
return entrySet;
}

/**
 * Variant of {@code computeIfAbsent} that accepts a no-argument supplier instead of a key-based
 * mapping function.
 *
 * <p>Delegates to the function-based overload, wrapping the supplier in a function that
 * disregards the key it is handed.
 *
 * @param key the key to look up
 * @param valueSupplier called without arguments whenever the function-based overload invokes
 *     its mapping function
 * @return the result of the function-based {@code computeIfAbsent} call
 */
public V computeIfAbsent(CharSequence key, Supplier<V> valueSupplier) {
  // the mapping function receives the key, but the supplier has no use for it
  return computeIfAbsent(key, unusedKey -> valueSupplier.get());
}

@Override
public boolean equals(Object other) {
if (this == other) {
Expand Down
547 changes: 308 additions & 239 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/ArrayUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,4 +320,31 @@ public static boolean isStrictlyAscending(long[] array) {

return true;
}

/**
 * Concatenates the given arrays, in argument order, into one newly allocated array.
 *
 * <p>The varargs array is only read, never written to or exposed, hence {@code @SafeVarargs}.
 *
 * @param type component type of the resulting array
 * @param arrays arrays to concatenate; passing none yields an empty array
 * @return a new array of component type {@code type} holding every element of {@code arrays}
 * @throws ArithmeticException if the combined length does not fit in an {@code int}
 */
@SafeVarargs
@SuppressWarnings("unchecked")
public static <T> T[] concat(Class<T> type, T[]... arrays) {
  // the cast is safe: the array is created reflectively with component type T
  T[] result = (T[]) Array.newInstance(type, totalLength(arrays));

  int currentLength = 0;

  for (T[] array : arrays) {
    int length = array.length;
    if (length > 0) {
      System.arraycopy(array, 0, result, currentLength, length);
      currentLength += length;
    }
  }

  return result;
}

/**
 * Sums the lengths of the given arrays.
 *
 * @throws ArithmeticException if the sum overflows an {@code int}
 */
private static int totalLength(Object[][] arrays) {
  int totalLength = 0;

  for (Object[] array : arrays) {
    // fail fast on overflow instead of wrapping to a negative or undersized length
    totalLength = Math.addExact(totalLength, array.length);
  }

  return totalLength;
}
}
34 changes: 34 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@
*/
package org.apache.iceberg.util;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;

public class ContentFileUtil {
// private no-op constructor: keeps this class from being instantiated from outside
private ContentFileUtil() {}
Expand All @@ -41,4 +48,31 @@ public static <F extends ContentFile<K>, K> K copy(
return file.copyWithoutStats();
}
}

/**
 * Derives a data file path from the path-column bounds recorded in a delete file, if possible.
 *
 * <p>A path is returned only when both the lower and the upper bound for
 * {@code MetadataColumns.DELETE_FILE_PATH} are present and equal to each other; the shared bound
 * is then decoded using the column type.
 *
 * @param deleteFile the delete file whose content type and column bounds are inspected
 * @return the decoded path bound, or null for equality deletes, for a missing lower or upper
 *     bound, or when the two bounds differ
 */
public static CharSequence referencedDataFile(DeleteFile deleteFile) {
  // equality deletes never produce a path here
  if (FileContent.EQUALITY_DELETES == deleteFile.content()) {
    return null;
  }

  int pathFieldId = MetadataColumns.DELETE_FILE_PATH.fieldId();
  Type pathFieldType = MetadataColumns.DELETE_FILE_PATH.type();

  ByteBuffer lowerPath = null;
  Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
  if (lowers != null) {
    lowerPath = lowers.get(pathFieldId);
  }

  if (lowerPath == null) {
    return null;
  }

  ByteBuffer upperPath = null;
  Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
  if (uppers != null) {
    upperPath = uppers.get(pathFieldId);
  }

  if (upperPath == null) {
    return null;
  }

  // differing bounds mean the path column holds more than one value
  if (!lowerPath.equals(upperPath)) {
    return null;
  }

  return Conversions.fromByteBuffer(pathFieldType, lowerPath);
}
}
71 changes: 71 additions & 0 deletions core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@

import static org.apache.iceberg.expressions.Expressions.bucket;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.DeleteFileIndex.EqualityDeletes;
import org.apache.iceberg.DeleteFileIndex.PositionDeletes;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -514,4 +519,70 @@ public void testPartitionedTableWithExistingDeleteFile() {
Sets.newHashSet(FILE_A_EQ_1.path(), FILE_A_POS_1.path()),
Sets.newHashSet(Iterables.transform(task.deletes(), ContentFile::path)));
}

@Test
public void testPositionDeletesGroup() {
  DeleteFile pos1 = withDataSequenceNumber(1, partitionedPosDeletes(SPEC, FILE_A.partition()));
  DeleteFile pos2 = withDataSequenceNumber(2, partitionedPosDeletes(SPEC, FILE_A.partition()));
  DeleteFile pos3 = withDataSequenceNumber(3, partitionedPosDeletes(SPEC, FILE_A.partition()));
  DeleteFile pos4 = withDataSequenceNumber(4, partitionedPosDeletes(SPEC, FILE_A.partition()));

  // files are added out of sequence-number order; the expectations below are in ascending order
  PositionDeletes deletes = new PositionDeletes();
  deletes.add(pos4);
  deletes.add(pos2);
  deletes.add(pos1);
  deletes.add(pos3);

  // a group with files in it must not report itself as empty
  assertThat(deletes.isEmpty()).isFalse();

  // every added file must show up among the referenced delete files
  CharSequenceSet referencedPaths =
      CharSequenceSet.of(
          Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
  assertThat(referencedPaths).contains(pos1.path(), pos2.path(), pos3.path(), pos4.path());

  // position deletes are indexed by their data sequence numbers
  // so that position deletes can apply to data files added in the same snapshot
  DeleteFile[] allFiles = {pos1, pos2, pos3, pos4};
  assertThat(deletes.filter(0)).isEqualTo(allFiles);
  assertThat(deletes.filter(1)).isEqualTo(allFiles);
  assertThat(deletes.filter(2)).isEqualTo(new DeleteFile[] {pos2, pos3, pos4});
  assertThat(deletes.filter(3)).isEqualTo(new DeleteFile[] {pos3, pos4});
  assertThat(deletes.filter(4)).isEqualTo(new DeleteFile[] {pos4});
  assertThat(deletes.filter(5)).isEqualTo(new DeleteFile[] {});

  // once the group has been queried, adding further files must be rejected
  assertThatThrownBy(() -> deletes.add(pos1)).isInstanceOf(IllegalStateException.class);
}

@Test
public void testEqualityDeletesGroup() {
  DeleteFile eq1 = withDataSequenceNumber(1, partitionedEqDeletes(SPEC, FILE_A.partition()));
  DeleteFile eq2 = withDataSequenceNumber(2, partitionedEqDeletes(SPEC, FILE_A.partition()));
  DeleteFile eq3 = withDataSequenceNumber(3, partitionedEqDeletes(SPEC, FILE_A.partition()));
  DeleteFile eq4 = withDataSequenceNumber(4, partitionedEqDeletes(SPEC, FILE_A.partition()));

  // files are added out of sequence-number order; the expectations below are in ascending order
  EqualityDeletes deletes = new EqualityDeletes();
  deletes.add(SPEC, eq4);
  deletes.add(SPEC, eq2);
  deletes.add(SPEC, eq1);
  deletes.add(SPEC, eq3);

  // a group with files in it must not report itself as empty
  assertThat(deletes.isEmpty()).isFalse();

  // every added file must show up among the referenced delete files
  CharSequenceSet referencedPaths =
      CharSequenceSet.of(
          Iterables.transform(deletes.referencedDeleteFiles(), ContentFile::path));
  assertThat(referencedPaths).contains(eq1.path(), eq2.path(), eq3.path(), eq4.path());

  // equality deletes are indexed by data sequence number - 1 to apply to next snapshots
  DeleteFile[] allFiles = {eq1, eq2, eq3, eq4};
  assertThat(deletes.filter(0, FILE_A)).isEqualTo(allFiles);
  assertThat(deletes.filter(1, FILE_A)).isEqualTo(new DeleteFile[] {eq2, eq3, eq4});
  assertThat(deletes.filter(2, FILE_A)).isEqualTo(new DeleteFile[] {eq3, eq4});
  assertThat(deletes.filter(3, FILE_A)).isEqualTo(new DeleteFile[] {eq4});
  assertThat(deletes.filter(4, FILE_A)).isEqualTo(new DeleteFile[] {});

  // once the group has been queried, adding further files must be rejected
  assertThatThrownBy(() -> deletes.add(SPEC, eq1)).isInstanceOf(IllegalStateException.class);
}
}
32 changes: 32 additions & 0 deletions core/src/test/java/org/apache/iceberg/util/TestArrayUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,36 @@ public void testStrictlyAscendingLongArrays() {
long[] ascendingArray = new long[] {1, 2, 2, 3};
assertThat(ArrayUtil.isStrictlyAscending(ascendingArray)).isFalse();
}

@Test
public void testConcatWithDifferentLengthArrays() {
  // three inputs of lengths 3, 2 and 4 must be joined in argument order
  Integer[] head = {1, 2, 3};
  Integer[] middle = {4, 5};
  Integer[] tail = {6, 7, 8, 9};
  Integer[] expected = {1, 2, 3, 4, 5, 6, 7, 8, 9};

  Integer[] actual = ArrayUtil.concat(Integer.class, head, middle, tail);

  assertThat(actual).isEqualTo(expected);
}

@Test
public void testConcatWithEmptyArrays() {
  // an empty input must contribute nothing to the result
  String[] empty = {};
  String[] letters = {"a", "b"};
  String[] expected = {"a", "b"};

  String[] actual = ArrayUtil.concat(String.class, empty, letters);

  assertThat(actual).isEqualTo(expected);
}

@Test
public void testConcatWithSingleArray() {
  // concatenating a single array must yield an array with equal contents
  Boolean[] flags = {true, false};
  Boolean[] expected = {true, false};

  Boolean[] actual = ArrayUtil.concat(Boolean.class, flags);

  assertThat(actual).isEqualTo(expected);
}

@Test
public void testConcatWithNoArray() {
  // with no input arrays at all the result must be an empty array
  Character[] expected = {};

  Character[] actual = ArrayUtil.concat(Character.class);

  assertThat(actual).isEqualTo(expected);
}
}
14 changes: 14 additions & 0 deletions core/src/test/java/org/apache/iceberg/util/TestPartitionMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,4 +293,18 @@ public void testLookupArbitraryKeyTypes() {
assertThat(map.get("some-string")).isNull();
assertThat(map.remove("some-string")).isNull();
}

@Test
public void testComputeIfAbsent() {
  PartitionMap<String> partitionMap = PartitionMap.create(SPECS);

  // absent key: the supplied value is returned and stored
  // (inserted with a Row key, then read back with a CustomRow key)
  String computed = partitionMap.computeIfAbsent(BY_DATA_SPEC.specId(), Row.of("a"), () -> "v1");
  assertThat(computed).isEqualTo("v1");
  assertThat(partitionMap.get(BY_DATA_SPEC.specId(), CustomRow.of("a"))).isEqualTo("v1");

  // verify existing key is not affected
  String retained =
      partitionMap.computeIfAbsent(BY_DATA_SPEC.specId(), CustomRow.of("a"), () -> "v2");
  assertThat(retained).isEqualTo("v1");
  assertThat(partitionMap.get(BY_DATA_SPEC.specId(), CustomRow.of("a"))).isEqualTo("v1");
}
}
Loading

0 comments on commit ad3cf9d

Please sign in to comment.