diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceMap.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceMap.java index 98da48ed590c..e9595b45d559 100644 --- a/api/src/main/java/org/apache/iceberg/util/CharSequenceMap.java +++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceMap.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -150,6 +151,10 @@ public Set> entrySet() { return entrySet; } + public V computeIfAbsent(CharSequence key, Supplier valueSupplier) { + return computeIfAbsent(key, ignored -> valueSupplier.get()); + } + @Override public boolean equals(Object other) { if (this == other) { diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index 02e55c595f3c..6c69a6e01370 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -32,8 +32,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -44,21 +42,19 @@ import org.apache.iceberg.metrics.ScanMetrics; import org.apache.iceberg.metrics.ScanMetricsUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; -import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.CharSequenceMap; +import org.apache.iceberg.util.ContentFileUtil; +import org.apache.iceberg.util.PartitionMap; import org.apache.iceberg.util.PartitionSet; -import org.apache.iceberg.util.StructLikeWrapper; import org.apache.iceberg.util.Tasks; /** @@ -69,28 +65,26 @@ * file. 
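// Illustrative usage of the Supplier-based computeIfAbsent overload added to
// CharSequenceMap above. A minimal sketch, assuming the CharSequenceMap.create()
// factory and Guava's Lists that appear elsewhere in this diff (generic parameters
// were elided by formatting above; they are restored here for clarity):
//
//   CharSequenceMap<List<DeleteFile>> deletesByPath = CharSequenceMap.create();
//   for (DeleteFile file : deleteFiles) {
//     // the supplier is invoked only when the path has no entry yet
//     deletesByPath.computeIfAbsent(file.path(), Lists::newArrayList).add(file);
//   }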
*/ class DeleteFileIndex { - private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0]; - private final Map partitionTypeById; - private final Map> wrapperById; - private final DeleteFileGroup globalDeletes; - private final Map, DeleteFileGroup> deletesByPartition; + private final EqualityDeletes globalDeletes; + private final PartitionMap eqDeletesByPartition; + private final PartitionMap posDeletesByPartition; + private final CharSequenceMap posDeletesByPath; private final boolean isEmpty; - private final boolean useColumnStatsFiltering; private DeleteFileIndex( - Map specs, - DeleteFileGroup globalDeletes, - Map, DeleteFileGroup> deletesByPartition, - boolean useColumnStatsFiltering) { - ImmutableMap.Builder builder = ImmutableMap.builder(); - specs.forEach((specId, spec) -> builder.put(specId, spec.partitionType())); - this.partitionTypeById = builder.build(); - this.wrapperById = wrappers(specs); + EqualityDeletes globalDeletes, + PartitionMap eqDeletesByPartition, + PartitionMap posDeletesByPartition, + CharSequenceMap posDeletesByPath) { this.globalDeletes = globalDeletes; - this.deletesByPartition = deletesByPartition; - this.isEmpty = globalDeletes == null && deletesByPartition.isEmpty(); - this.useColumnStatsFiltering = useColumnStatsFiltering; + this.eqDeletesByPartition = eqDeletesByPartition; + this.posDeletesByPartition = posDeletesByPartition; + this.posDeletesByPath = posDeletesByPath; + boolean noEqDeletes = globalDeletes == null && eqDeletesByPartition == null; + boolean noPosDeletes = posDeletesByPartition == null && posDeletesByPath == null; + this.isEmpty = noEqDeletes && noPosDeletes; } public boolean isEmpty() { @@ -104,28 +98,25 @@ public Iterable referencedDeleteFiles() { deleteFiles = Iterables.concat(deleteFiles, globalDeletes.referencedDeleteFiles()); } - for (DeleteFileGroup partitionDeletes : deletesByPartition.values()) { - deleteFiles = Iterables.concat(deleteFiles, partitionDeletes.referencedDeleteFiles()); + if (eqDeletesByPartition != null) { + for (EqualityDeletes deletes : eqDeletesByPartition.values()) { + deleteFiles = Iterables.concat(deleteFiles, deletes.referencedDeleteFiles()); + } } - return deleteFiles; - } - - // use HashMap with precomputed values instead of thread-safe collections loaded on demand - // as the cache is being accessed for each data file and the lookup speed is critical - private Map> wrappers(Map specs) { - Map> wrappers = Maps.newHashMap(); - specs.forEach((specId, spec) -> wrappers.put(specId, newWrapper(specId))); - return wrappers; - } + if (posDeletesByPartition != null) { + for (PositionDeletes deletes : posDeletesByPartition.values()) { + deleteFiles = Iterables.concat(deleteFiles, deletes.referencedDeleteFiles()); + } + } - private ThreadLocal newWrapper(int specId) { - return ThreadLocal.withInitial(() -> StructLikeWrapper.forType(partitionTypeById.get(specId))); - } + if (posDeletesByPath != null) { + for (PositionDeletes deletes : posDeletesByPath.values()) { + deleteFiles = Iterables.concat(deleteFiles, deletes.referencedDeleteFiles()); + } + } - private Pair partition(int specId, StructLike struct) { - ThreadLocal wrapper = wrapperById.get(specId); - return Pair.of(specId, wrapper.get().set(struct)); + return deleteFiles; } DeleteFile[] forEntry(ManifestEntry entry) { @@ -138,95 +129,50 @@ DeleteFile[] forDataFile(DataFile file) { DeleteFile[] forDataFile(long sequenceNumber, DataFile file) { if (isEmpty) { - return NO_DELETES; - } - - 
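// Illustrative before/after of the partition lookup, as a hedged sketch: the
// PartitionMap introduced above replaces the (specId, StructLikeWrapper) keys
// removed here, so per-file lookups no longer need ThreadLocal wrappers:
//
//   // before: wrap the partition struct on every lookup
//   Pair<Integer, StructLikeWrapper> key = partition(file.specId(), file.partition());
//   DeleteFileGroup group = deletesByPartition.get(key);
//
//   // after: PartitionMap resolves struct equality internally
//   PositionDeletes deletes = posDeletesByPartition.get(file.specId(), file.partition());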
Pair partition = partition(file.specId(), file.partition()); - DeleteFileGroup partitionDeletes = deletesByPartition.get(partition); - - if (globalDeletes == null && partitionDeletes == null) { - return NO_DELETES; - } else if (useColumnStatsFiltering) { - return limitWithColumnStatsFiltering(sequenceNumber, file, partitionDeletes); - } else { - return limitWithoutColumnStatsFiltering(sequenceNumber, partitionDeletes); - } - } - - // limits deletes using sequence numbers and checks whether columns stats overlap - private DeleteFile[] limitWithColumnStatsFiltering( - long sequenceNumber, DataFile file, DeleteFileGroup partitionDeletes) { - - Stream matchingDeletes; - if (partitionDeletes == null) { - matchingDeletes = globalDeletes.limit(sequenceNumber); - } else if (globalDeletes == null) { - matchingDeletes = partitionDeletes.limit(sequenceNumber); - } else { - Stream matchingGlobalDeletes = globalDeletes.limit(sequenceNumber); - Stream matchingPartitionDeletes = partitionDeletes.limit(sequenceNumber); - matchingDeletes = Stream.concat(matchingGlobalDeletes, matchingPartitionDeletes); + return EMPTY_DELETES; } - return matchingDeletes - .filter(deleteFile -> canContainDeletesForFile(file, deleteFile)) - .map(IndexedDeleteFile::wrapped) - .toArray(DeleteFile[]::new); + DeleteFile[] global = findGlobalDeletes(sequenceNumber, file); + DeleteFile[] eqPartition = findEqPartitionDeletes(sequenceNumber, file); + DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file); + DeleteFile[] posPath = findPathDeletes(sequenceNumber, file); + return concat(global, eqPartition, posPartition, posPath); } - // limits deletes using sequence numbers but skips expensive column stats filtering - private DeleteFile[] limitWithoutColumnStatsFiltering( - long sequenceNumber, DeleteFileGroup partitionDeletes) { - - if (partitionDeletes == null) { - return globalDeletes.filter(sequenceNumber); - } else if (globalDeletes == null) { - return partitionDeletes.filter(sequenceNumber); - } else { - DeleteFile[] matchingGlobalDeletes = globalDeletes.filter(sequenceNumber); - DeleteFile[] matchingPartitionDeletes = partitionDeletes.filter(sequenceNumber); - return ObjectArrays.concat(matchingGlobalDeletes, matchingPartitionDeletes, DeleteFile.class); - } + private DeleteFile[] findGlobalDeletes(long seq, DataFile dataFile) { + return globalDeletes == null ? EMPTY_DELETES : globalDeletes.filter(seq, dataFile); } - private static boolean canContainDeletesForFile(DataFile dataFile, IndexedDeleteFile deleteFile) { - switch (deleteFile.content()) { - case POSITION_DELETES: - return canContainPosDeletesForFile(dataFile, deleteFile); - - case EQUALITY_DELETES: - return canContainEqDeletesForFile(dataFile, deleteFile, deleteFile.spec().schema()); + private DeleteFile[] findPosPartitionDeletes(long seq, DataFile dataFile) { + if (posDeletesByPartition == null) { + return EMPTY_DELETES; } - return true; + PositionDeletes deletes = posDeletesByPartition.get(dataFile.specId(), dataFile.partition()); + return deletes == null ? 
EMPTY_DELETES : deletes.filter(seq); } - private static boolean canContainPosDeletesForFile( - DataFile dataFile, IndexedDeleteFile deleteFile) { - // check that the delete file can contain the data file's file_path - if (deleteFile.hasNoLowerOrUpperBounds()) { - return true; + private DeleteFile[] findEqPartitionDeletes(long seq, DataFile dataFile) { + if (eqDeletesByPartition == null) { + return EMPTY_DELETES; } - int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId(); - Comparator comparator = Comparators.charSequences(); - - CharSequence lower = deleteFile.lowerBound(pathId); - if (lower != null && comparator.compare(dataFile.path(), lower) < 0) { - return false; - } + EqualityDeletes deletes = eqDeletesByPartition.get(dataFile.specId(), dataFile.partition()); + return deletes == null ? EMPTY_DELETES : deletes.filter(seq, dataFile); + } - CharSequence upper = deleteFile.upperBound(pathId); - if (upper != null && comparator.compare(dataFile.path(), upper) > 0) { - return false; + private DeleteFile[] findPathDeletes(long seq, DataFile dataFile) { + if (posDeletesByPath == null) { + return EMPTY_DELETES; } - return true; + PositionDeletes deletes = posDeletesByPath.get(dataFile.path()); + return deletes == null ? EMPTY_DELETES : deletes.filter(seq); } @SuppressWarnings("checkstyle:CyclomaticComplexity") private static boolean canContainEqDeletesForFile( - DataFile dataFile, IndexedDeleteFile deleteFile, Schema schema) { + DataFile dataFile, EqualityDeleteFile deleteFile) { Map dataLowers = dataFile.lowerBounds(); Map dataUppers = dataFile.upperBounds(); @@ -241,8 +187,7 @@ private static boolean canContainEqDeletesForFile( Map deleteNullCounts = deleteFile.nullValueCounts(); Map deleteValueCounts = deleteFile.valueCounts(); - for (int id : deleteFile.equalityFieldIds()) { - Types.NestedField field = schema.findField(id); + for (Types.NestedField field : deleteFile.equalityFields()) { if (!field.type().isPrimitiveType()) { // stats are not kept for nested types. assume that the delete file may match continue; @@ -271,6 +216,7 @@ && allNonNull(dataNullCounts, field)) { continue; } + int id = field.fieldId(); ByteBuffer dataLower = dataLowers.get(id); ByteBuffer dataUpper = dataUppers.get(id); Object deleteLower = deleteFile.lowerBound(id); @@ -474,68 +420,66 @@ private Collection loadDeleteFiles() { DeleteFileIndex build() { Iterable files = deleteFiles != null ? 
filterDeleteFiles() : loadDeleteFiles(); - boolean useColumnStatsFiltering = false; + EqualityDeletes globalDeletes = new EqualityDeletes(); + PartitionMap eqDeletesByPartition = PartitionMap.create(specsById); + PartitionMap posDeletesByPartition = PartitionMap.create(specsById); + CharSequenceMap posDeletesByPath = CharSequenceMap.create(); - // build a map from (specId, partition) to delete file entries - Map wrappersBySpecId = Maps.newHashMap(); - ListMultimap, IndexedDeleteFile> deleteFilesByPartition = - Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList); for (DeleteFile file : files) { - int specId = file.specId(); - PartitionSpec spec = specsById.get(specId); - StructLikeWrapper wrapper = - wrappersBySpecId - .computeIfAbsent(specId, id -> StructLikeWrapper.forType(spec.partitionType())) - .copyFor(file.partition()); - IndexedDeleteFile indexedFile = new IndexedDeleteFile(spec, file); - deleteFilesByPartition.put(Pair.of(specId, wrapper), indexedFile); - - if (!useColumnStatsFiltering) { - useColumnStatsFiltering = indexedFile.hasLowerAndUpperBounds(); + switch (file.content()) { + case POSITION_DELETES: + add(posDeletesByPath, posDeletesByPartition, file); + break; + case EQUALITY_DELETES: + add(globalDeletes, eqDeletesByPartition, file); + break; + default: + throw new UnsupportedOperationException("Unsupported content: " + file.content()); } - ScanMetricsUtil.indexedDeleteFile(scanMetrics, file); } - // sort the entries in each map value by sequence number and split into sequence numbers and - // delete files lists - Map, DeleteFileGroup> sortedDeletesByPartition = - Maps.newHashMap(); - // also, separate out equality deletes in an unpartitioned spec that should be applied - // globally - DeleteFileGroup globalDeletes = null; - for (Pair partition : deleteFilesByPartition.keySet()) { - if (specsById.get(partition.first()).isUnpartitioned()) { - Preconditions.checkState( - globalDeletes == null, "Detected multiple partition specs with no partitions"); - - IndexedDeleteFile[] eqFilesSortedBySeq = - deleteFilesByPartition.get(partition).stream() - .filter(file -> file.content() == FileContent.EQUALITY_DELETES) - .sorted(Comparator.comparingLong(IndexedDeleteFile::applySequenceNumber)) - .toArray(IndexedDeleteFile[]::new); - if (eqFilesSortedBySeq.length > 0) { - globalDeletes = new DeleteFileGroup(eqFilesSortedBySeq); - } + return new DeleteFileIndex( + globalDeletes.isEmpty() ? null : globalDeletes, + eqDeletesByPartition.isEmpty() ? null : eqDeletesByPartition, + posDeletesByPartition.isEmpty() ? null : posDeletesByPartition, + posDeletesByPath.isEmpty() ? 
null : posDeletesByPath); + } - IndexedDeleteFile[] posFilesSortedBySeq = - deleteFilesByPartition.get(partition).stream() - .filter(file -> file.content() == FileContent.POSITION_DELETES) - .sorted(Comparator.comparingLong(IndexedDeleteFile::applySequenceNumber)) - .toArray(IndexedDeleteFile[]::new); - sortedDeletesByPartition.put(partition, new DeleteFileGroup(posFilesSortedBySeq)); - - } else { - IndexedDeleteFile[] filesSortedBySeq = - deleteFilesByPartition.get(partition).stream() - .sorted(Comparator.comparingLong(IndexedDeleteFile::applySequenceNumber)) - .toArray(IndexedDeleteFile[]::new); - sortedDeletesByPartition.put(partition, new DeleteFileGroup(filesSortedBySeq)); - } + private void add( + CharSequenceMap deletesByPath, + PartitionMap deletesByPartition, + DeleteFile file) { + CharSequence path = ContentFileUtil.referencedDataFile(file); + + PositionDeletes deletes; + if (path != null) { + deletes = deletesByPath.computeIfAbsent(path, PositionDeletes::new); + } else { + int specId = file.specId(); + StructLike partition = file.partition(); + deletes = deletesByPartition.computeIfAbsent(specId, partition, PositionDeletes::new); } - return new DeleteFileIndex( - specsById, globalDeletes, sortedDeletesByPartition, useColumnStatsFiltering); + deletes.add(file); + } + + private void add( + EqualityDeletes globalDeletes, + PartitionMap deletesByPartition, + DeleteFile file) { + PartitionSpec spec = specsById.get(file.specId()); + + EqualityDeletes deletes; + if (spec.isUnpartitioned()) { + deletes = globalDeletes; + } else { + int specId = spec.specId(); + StructLike partition = file.partition(); + deletes = deletesByPartition.computeIfAbsent(specId, partition, EqualityDeletes::new); + } + + deletes.add(spec, file); } private Iterable>> deleteManifestReaders() { @@ -582,97 +526,222 @@ private Iterable>> deleteManifestRea } } - // a group of indexed delete files sorted by the sequence number they apply to - private static class DeleteFileGroup { - private final long[] seqs; - private final IndexedDeleteFile[] files; - - DeleteFileGroup(IndexedDeleteFile[] files) { - this.seqs = Arrays.stream(files).mapToLong(IndexedDeleteFile::applySequenceNumber).toArray(); - this.files = files; + /** + * Finds an index in the sorted array of sequence numbers where the given sequence number should + * be inserted or is found. + * + *
<p>
If the sequence number is present in the array, this method returns the index of the first + * occurrence of the sequence number. If the sequence number is not present, the method returns + * the index where the sequence number would be inserted while maintaining the sorted order of the + * array. This returned index ranges from 0 (inclusive) to the length of the array (inclusive). + * + *
<p>
This method is used to determine the subset of delete files that apply to a given data file. + * + * @param seqs an array of sequence numbers sorted in ascending order + * @param seq the sequence number to search for + * @return the index of the first occurrence or the insertion point + */ + private static int findStartIndex(long[] seqs, long seq) { + int pos = Arrays.binarySearch(seqs, seq); + int start; + if (pos < 0) { + // the sequence number was not found, where it would be inserted is -(pos + 1) + start = -(pos + 1); + } else { + // the sequence number was found, but may not be the first + // find the first delete file with the given sequence number by decrementing the position + start = pos; + while (start > 0 && seqs[start - 1] >= seq) { + start -= 1; + } } - DeleteFileGroup(long[] seqs, IndexedDeleteFile[] files) { - this.seqs = seqs; - this.files = files; + return start; + } + + private static DeleteFile[] concat(DeleteFile[]... deletes) { + return ArrayUtil.concat(DeleteFile.class, deletes); + } + + // a group of position delete files sorted by the sequence number they apply to + static class PositionDeletes { + private static final Comparator SEQ_COMPARATOR = + Comparator.comparingLong(DeleteFile::dataSequenceNumber); + + // indexed state + private long[] seqs = null; + private DeleteFile[] files = null; + + // a buffer that is used to hold files before indexing + private volatile List buffer = Lists.newArrayList(); + + public void add(DeleteFile file) { + Preconditions.checkState(buffer != null, "Can't add files upon indexing"); + buffer.add(file); } public DeleteFile[] filter(long seq) { - int start = findStartIndex(seq); + indexIfNeeded(); + + int start = findStartIndex(seqs, seq); if (start >= files.length) { - return NO_DELETES; + return EMPTY_DELETES; } - DeleteFile[] matchingFiles = new DeleteFile[files.length - start]; - - for (int index = start; index < files.length; index++) { - matchingFiles[index - start] = files[index].wrapped(); + if (start == 0) { + return files; } + int matchingFilesCount = files.length - start; + DeleteFile[] matchingFiles = new DeleteFile[matchingFilesCount]; + System.arraycopy(files, start, matchingFiles, 0, matchingFilesCount); return matchingFiles; } - public Stream limit(long seq) { - int start = findStartIndex(seq); - return Arrays.stream(files, start, files.length); + public Iterable referencedDeleteFiles() { + indexIfNeeded(); + return Arrays.asList(files); } - private int findStartIndex(long seq) { - int pos = Arrays.binarySearch(seqs, seq); - int start; - if (pos < 0) { - // the sequence number was not found, where it would be inserted is -(pos + 1) - start = -(pos + 1); - } else { - // the sequence number was found, but may not be the first - // find the first delete file with the given sequence number by decrementing the position - start = pos; - while (start > 0 && seqs[start - 1] >= seq) { - start -= 1; + public boolean isEmpty() { + indexIfNeeded(); + return files.length == 0; + } + + private void indexIfNeeded() { + if (buffer != null) { + synchronized (this) { + if (buffer != null) { + this.files = indexFiles(buffer); + this.seqs = indexSeqs(files); + this.buffer = null; + } } } + } + + private static DeleteFile[] indexFiles(List list) { + DeleteFile[] array = list.toArray(EMPTY_DELETES); + Arrays.sort(array, SEQ_COMPARATOR); + return array; + } + + private static long[] indexSeqs(DeleteFile[] files) { + long[] seqs = new long[files.length]; + + for (int index = 0; index < files.length; index++) { + seqs[index] = 
files[index].dataSequenceNumber(); + } + + return seqs; + } + } + + // a group of equality delete files sorted by the sequence number they apply to + static class EqualityDeletes { + private static final Comparator SEQ_COMPARATOR = + Comparator.comparingLong(EqualityDeleteFile::applySequenceNumber); + private static final EqualityDeleteFile[] EMPTY_EQUALITY_DELETES = new EqualityDeleteFile[0]; + + // indexed state + private long[] seqs = null; + private EqualityDeleteFile[] files = null; + + // a buffer that is used to hold files before indexing + private volatile List buffer = Lists.newArrayList(); - return start; + public void add(PartitionSpec spec, DeleteFile file) { + Preconditions.checkState(buffer != null, "Can't add files upon indexing"); + buffer.add(new EqualityDeleteFile(spec, file)); + } + + public DeleteFile[] filter(long seq, DataFile dataFile) { + indexIfNeeded(); + + int start = findStartIndex(seqs, seq); + + if (start >= files.length) { + return EMPTY_DELETES; + } + + List matchingFiles = Lists.newArrayList(); + + for (int index = start; index < files.length; index++) { + EqualityDeleteFile file = files[index]; + if (canContainEqDeletesForFile(dataFile, file)) { + matchingFiles.add(file.wrapped()); + } + } + + return matchingFiles.toArray(EMPTY_DELETES); } public Iterable referencedDeleteFiles() { - return Arrays.stream(files).map(IndexedDeleteFile::wrapped).collect(Collectors.toList()); + indexIfNeeded(); + return Iterables.transform(Arrays.asList(files), EqualityDeleteFile::wrapped); + } + + public boolean isEmpty() { + indexIfNeeded(); + return files.length == 0; + } + + private void indexIfNeeded() { + if (buffer != null) { + synchronized (this) { + if (buffer != null) { + this.files = indexFiles(buffer); + this.seqs = indexSeqs(files); + this.buffer = null; + } + } + } + } + + private static EqualityDeleteFile[] indexFiles(List list) { + EqualityDeleteFile[] array = list.toArray(EMPTY_EQUALITY_DELETES); + Arrays.sort(array, SEQ_COMPARATOR); + return array; + } + + private static long[] indexSeqs(EqualityDeleteFile[] files) { + long[] seqs = new long[files.length]; + + for (int index = 0; index < files.length; index++) { + seqs[index] = files[index].applySequenceNumber(); + } + + return seqs; } } - // a delete file wrapper that caches the converted boundaries for faster boundary checks + // an equality delete file wrapper that caches the converted boundaries for faster boundary checks // this class is not meant to be exposed beyond the delete file index - private static class IndexedDeleteFile { + private static class EqualityDeleteFile { private final PartitionSpec spec; private final DeleteFile wrapped; private final long applySequenceNumber; + private volatile List equalityFields = null; private volatile Map convertedLowerBounds = null; private volatile Map convertedUpperBounds = null; - IndexedDeleteFile(PartitionSpec spec, DeleteFile file, long applySequenceNumber) { + EqualityDeleteFile(PartitionSpec spec, DeleteFile file) { this.spec = spec; this.wrapped = file; - this.applySequenceNumber = applySequenceNumber; + this.applySequenceNumber = wrapped.dataSequenceNumber() - 1; } - IndexedDeleteFile(PartitionSpec spec, DeleteFile file) { - this.spec = spec; - this.wrapped = file; - - if (file.content() == FileContent.EQUALITY_DELETES) { - this.applySequenceNumber = file.dataSequenceNumber() - 1; - } else { - this.applySequenceNumber = file.dataSequenceNumber(); - } + public DeleteFile wrapped() { + return wrapped; } public PartitionSpec spec() { return spec; } - 
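// Note on the seq - 1 offset in the constructor above: an equality delete written at
// data sequence number N is indexed under N - 1, so it applies only to data files with
// a strictly smaller sequence number, while position deletes are indexed under N itself
// and also apply to data files committed in the same snapshot. A worked trace of
// findStartIndex (defined earlier in this diff) over indexed sequence numbers:
//
//   long[] seqs = {1, 2, 2, 4};
//   findStartIndex(seqs, 2); // == 1, first occurrence of 2
//   findStartIndex(seqs, 3); // == 3, insertion point between 2 and 4
//   findStartIndex(seqs, 5); // == 4, past the end, so filter(5) matches nothing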
public DeleteFile wrapped() { - return wrapped; + public StructLike partition() { + return wrapped.partition(); } public long applySequenceNumber() { @@ -683,8 +752,21 @@ public FileContent content() { return wrapped.content(); } - public List equalityFieldIds() { - return wrapped.equalityFieldIds(); + public List equalityFields() { + if (equalityFields == null) { + synchronized (this) { + if (equalityFields == null) { + List fields = Lists.newArrayList(); + for (int id : wrapped.equalityFieldIds()) { + Types.NestedField field = spec.schema().findField(id); + fields.add(field); + } + this.equalityFields = fields; + } + } + } + + return equalityFields; } public Map valueCounts() { @@ -699,10 +781,6 @@ public Map nanValueCounts() { return wrapped.nanValueCounts(); } - public boolean hasNoLowerOrUpperBounds() { - return wrapped.lowerBounds() == null || wrapped.upperBounds() == null; - } - public boolean hasLowerAndUpperBounds() { return wrapped.lowerBounds() != null && wrapped.upperBounds() != null; } @@ -745,22 +823,13 @@ private Map convertBounds(Map bounds) { Map converted = Maps.newHashMap(); if (bounds != null) { - if (wrapped.content() == FileContent.POSITION_DELETES) { - Type pathType = MetadataColumns.DELETE_FILE_PATH.type(); - int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId(); - ByteBuffer bound = bounds.get(pathId); - if (bound != null) { - converted.put(pathId, Conversions.fromByteBuffer(pathType, bound)); - } - - } else { - for (int id : equalityFieldIds()) { - Type type = spec.schema().findField(id).type(); - if (type.isPrimitiveType()) { - ByteBuffer bound = bounds.get(id); - if (bound != null) { - converted.put(id, Conversions.fromByteBuffer(type, bound)); - } + for (Types.NestedField field : equalityFields()) { + int id = field.fieldId(); + Type type = spec.schema().findField(id).type(); + if (type.isPrimitiveType()) { + ByteBuffer bound = bounds.get(id); + if (bound != null) { + converted.put(id, Conversions.fromByteBuffer(type, bound)); } } } diff --git a/core/src/main/java/org/apache/iceberg/util/ArrayUtil.java b/core/src/main/java/org/apache/iceberg/util/ArrayUtil.java index fae8464c6765..4c9941b51677 100644 --- a/core/src/main/java/org/apache/iceberg/util/ArrayUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/ArrayUtil.java @@ -320,4 +320,31 @@ public static boolean isStrictlyAscending(long[] array) { return true; } + + @SuppressWarnings("unchecked") + public static T[] concat(Class type, T[]... 
arrays) { + T[] result = (T[]) Array.newInstance(type, totalLength(arrays)); + + int currentLength = 0; + + for (T[] array : arrays) { + int length = array.length; + if (length > 0) { + System.arraycopy(array, 0, result, currentLength, length); + currentLength += length; + } + } + + return result; + } + + private static int totalLength(Object[][] arrays) { + int totalLength = 0; + + for (Object[] array : arrays) { + totalLength += array.length; + } + + return totalLength; + } } diff --git a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java index d027567e069c..fc68f14d0971 100644 --- a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java @@ -18,8 +18,15 @@ */ package org.apache.iceberg.util; +import java.nio.ByteBuffer; +import java.util.Map; import java.util.Set; import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; public class ContentFileUtil { private ContentFileUtil() {} @@ -41,4 +48,31 @@ public static , K> K copy( return file.copyWithoutStats(); } } + + public static CharSequence referencedDataFile(DeleteFile deleteFile) { + if (deleteFile.content() == FileContent.EQUALITY_DELETES) { + return null; + } + + int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId(); + Type pathType = MetadataColumns.DELETE_FILE_PATH.type(); + + Map lowerBounds = deleteFile.lowerBounds(); + ByteBuffer lowerPathBound = lowerBounds != null ? lowerBounds.get(pathId) : null; + if (lowerPathBound == null) { + return null; + } + + Map upperBounds = deleteFile.upperBounds(); + ByteBuffer upperPathBound = upperBounds != null ? 
upperBounds.get(pathId) : null; + if (upperPathBound == null) { + return null; + } + + if (lowerPathBound.equals(upperPathBound)) { + return Conversions.fromByteBuffer(pathType, lowerPathBound); + } else { + return null; + } + } } diff --git a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java index 6f33be094892..6354c3ee18d5 100644 --- a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java +++ b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java @@ -20,16 +20,21 @@ import static org.apache.iceberg.expressions.Expressions.bucket; import static org.apache.iceberg.expressions.Expressions.equal; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.UUID; +import org.apache.iceberg.DeleteFileIndex.EqualityDeletes; +import org.apache.iceberg.DeleteFileIndex.PositionDeletes; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.CharSequenceSet; import org.junit.Assert; import org.junit.Test; @@ -514,4 +519,70 @@ public void testPartitionedTableWithExistingDeleteFile() { Sets.newHashSet(FILE_A_EQ_1.path(), FILE_A_POS_1.path()), Sets.newHashSet(Iterables.transform(task.deletes(), ContentFile::path))); } + + @Test + public void testPositionDeletesGroup() { + DeleteFile file1 = withDataSequenceNumber(1, partitionedPosDeletes(SPEC, FILE_A.partition())); + DeleteFile file2 = withDataSequenceNumber(2, partitionedPosDeletes(SPEC, FILE_A.partition())); + DeleteFile file3 = withDataSequenceNumber(3, partitionedPosDeletes(SPEC, FILE_A.partition())); + DeleteFile file4 = withDataSequenceNumber(4, partitionedPosDeletes(SPEC, FILE_A.partition())); + + PositionDeletes group = new PositionDeletes(); + group.add(file4); + group.add(file2); + group.add(file1); + group.add(file3); + + // the group must not be empty + assertThat(group.isEmpty()).isFalse(); + + // all files must be reported as referenced + CharSequenceSet paths = + CharSequenceSet.of(Iterables.transform(group.referencedDeleteFiles(), ContentFile::path)); + assertThat(paths).contains(file1.path(), file2.path(), file3.path(), file4.path()); + + // position deletes are indexed by their data sequence numbers + // so that position deletes can apply to data files added in the same snapshot + assertThat(group.filter(0)).isEqualTo(new DeleteFile[] {file1, file2, file3, file4}); + assertThat(group.filter(1)).isEqualTo(new DeleteFile[] {file1, file2, file3, file4}); + assertThat(group.filter(2)).isEqualTo(new DeleteFile[] {file2, file3, file4}); + assertThat(group.filter(3)).isEqualTo(new DeleteFile[] {file3, file4}); + assertThat(group.filter(4)).isEqualTo(new DeleteFile[] {file4}); + assertThat(group.filter(5)).isEqualTo(new DeleteFile[] {}); + + // it should not be possible to add more elements upon indexing + assertThatThrownBy(() -> group.add(file1)).isInstanceOf(IllegalStateException.class); + } + + @Test + public void testEqualityDeletesGroup() { + DeleteFile file1 = withDataSequenceNumber(1, partitionedEqDeletes(SPEC, FILE_A.partition())); + DeleteFile file2 = withDataSequenceNumber(2, 
partitionedEqDeletes(SPEC, FILE_A.partition())); + DeleteFile file3 = withDataSequenceNumber(3, partitionedEqDeletes(SPEC, FILE_A.partition())); + DeleteFile file4 = withDataSequenceNumber(4, partitionedEqDeletes(SPEC, FILE_A.partition())); + + EqualityDeletes group = new EqualityDeletes(); + group.add(SPEC, file4); + group.add(SPEC, file2); + group.add(SPEC, file1); + group.add(SPEC, file3); + + // the group must not be empty + assertThat(group.isEmpty()).isFalse(); + + // all files must be reported as referenced + CharSequenceSet paths = + CharSequenceSet.of(Iterables.transform(group.referencedDeleteFiles(), ContentFile::path)); + assertThat(paths).contains(file1.path(), file2.path(), file3.path(), file4.path()); + + // equality deletes are indexed by data sequence number - 1 to apply to next snapshots + assertThat(group.filter(0, FILE_A)).isEqualTo(new DeleteFile[] {file1, file2, file3, file4}); + assertThat(group.filter(1, FILE_A)).isEqualTo(new DeleteFile[] {file2, file3, file4}); + assertThat(group.filter(2, FILE_A)).isEqualTo(new DeleteFile[] {file3, file4}); + assertThat(group.filter(3, FILE_A)).isEqualTo(new DeleteFile[] {file4}); + assertThat(group.filter(4, FILE_A)).isEqualTo(new DeleteFile[] {}); + + // it should not be possible to add more elements upon indexing + assertThatThrownBy(() -> group.add(SPEC, file1)).isInstanceOf(IllegalStateException.class); + } } diff --git a/core/src/test/java/org/apache/iceberg/util/TestArrayUtil.java b/core/src/test/java/org/apache/iceberg/util/TestArrayUtil.java index dbacc9076e47..e013cc63bc70 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestArrayUtil.java +++ b/core/src/test/java/org/apache/iceberg/util/TestArrayUtil.java @@ -41,4 +41,36 @@ public void testStrictlyAscendingLongArrays() { long[] ascendingArray = new long[] {1, 2, 2, 3}; assertThat(ArrayUtil.isStrictlyAscending(ascendingArray)).isFalse(); } + + @Test + public void testConcatWithDifferentLengthArrays() { + Integer[] array1 = {1, 2, 3}; + Integer[] array2 = {4, 5}; + Integer[] array3 = {6, 7, 8, 9}; + + Integer[] result = ArrayUtil.concat(Integer.class, array1, array2, array3); + assertThat(result).isEqualTo(new Integer[] {1, 2, 3, 4, 5, 6, 7, 8, 9}); + } + + @Test + public void testConcatWithEmptyArrays() { + String[] array1 = {}; + String[] array2 = {"a", "b"}; + + String[] result = ArrayUtil.concat(String.class, array1, array2); + assertThat(result).isEqualTo(new String[] {"a", "b"}); + } + + @Test + public void testConcatWithSingleArray() { + Boolean[] array = {true, false}; + Boolean[] result = ArrayUtil.concat(Boolean.class, array); + assertThat(result).isEqualTo(new Boolean[] {true, false}); + } + + @Test + public void testConcatWithNoArray() { + Character[] result = ArrayUtil.concat(Character.class); + assertThat(result).isEqualTo(new Character[] {}); + } } diff --git a/core/src/test/java/org/apache/iceberg/util/TestPartitionMap.java b/core/src/test/java/org/apache/iceberg/util/TestPartitionMap.java index 268f7eada88d..63b6f49e0bcb 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestPartitionMap.java +++ b/core/src/test/java/org/apache/iceberg/util/TestPartitionMap.java @@ -293,4 +293,18 @@ public void testLookupArbitraryKeyTypes() { assertThat(map.get("some-string")).isNull(); assertThat(map.remove("some-string")).isNull(); } + + @Test + public void testComputeIfAbsent() { + PartitionMap map = PartitionMap.create(SPECS); + + String result1 = map.computeIfAbsent(BY_DATA_SPEC.specId(), Row.of("a"), () -> "v1"); + assertThat(result1).isEqualTo("v1"); + 
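// the lookup below deliberately switches from Row to CustomRow: PartitionMap compares
// partition keys structurally, so any StructLike with equal field values addresses the
// same entry (Row and CustomRow are distinct StructLike test helpers)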
assertThat(map.get(BY_DATA_SPEC.specId(), CustomRow.of("a"))).isEqualTo("v1"); + + // verify existing key is not affected + String result2 = map.computeIfAbsent(BY_DATA_SPEC.specId(), CustomRow.of("a"), () -> "v2"); + assertThat(result2).isEqualTo("v1"); + assertThat(map.get(BY_DATA_SPEC.specId(), CustomRow.of("a"))).isEqualTo("v1"); + } } diff --git a/data/src/test/java/org/apache/iceberg/data/TestDataFileIndexStatsFilters.java b/data/src/test/java/org/apache/iceberg/data/TestDataFileIndexStatsFilters.java index d87c2eeb4c98..7961755cdbfb 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestDataFileIndexStatsFilters.java +++ b/data/src/test/java/org/apache/iceberg/data/TestDataFileIndexStatsFilters.java @@ -18,19 +18,28 @@ */ package org.apache.iceberg.data; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + import java.io.File; import java.io.IOException; import java.util.List; import java.util.stream.Collectors; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Files; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers.Row; import org.apache.iceberg.TestTables; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; @@ -53,6 +62,8 @@ public class TestDataFileIndexStatsFilters { private Table table; private List records = null; + private List oddRecords = null; + private List evenRecords = null; private DataFile dataFile = null; private DataFile dataFileWithoutNulls = null; private DataFile dataFileOnlyNulls = null; @@ -74,6 +85,15 @@ public void createTableAndData() throws IOException { records.add(record.copy("id", 7, "data", "g", "category", "odd")); records.add(record.copy("id", 8, "data", null, "category", "even")); + this.oddRecords = + records.stream() + .filter(rec -> rec.getField("category").equals("odd")) + .collect(Collectors.toList()); + this.evenRecords = + records.stream() + .filter(rec -> rec.getField("category").equals("even")) + .collect(Collectors.toList()); + this.dataFile = FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), records); this.dataFileWithoutNulls = FileHelpers.writeDataFile( @@ -318,4 +338,149 @@ public void testEqualityDeletePlanningStatsSomeNullValuesWithSomeNullDeletes() Assert.assertEquals( "Should have one delete file, data and deletes have null values", 1, task.deletes().size()); } + + @Test + public void testDifferentDeleteTypes() throws IOException { + // init the table with an unpartitioned data file + table.newAppend().appendFile(dataFile).commit(); + + // add a matching global equality delete + DeleteFile globalEqDeleteFile1 = writeEqDeletes("id", 7, 8); + table.newRowDelta().addDeletes(globalEqDeleteFile1).commit(); + + // evolve the spec to partition by category + table.updateSpec().addField("category").commit(); + + StructLike evenPartition = Row.of("even"); + StructLike oddPartition = Row.of("odd"); + + // add 2 data files to "even" and "odd" partitions + 
DataFile dataFileWithEvenRecords = writeData(evenPartition, evenRecords); + DataFile dataFileWithOddRecords = writeData(oddPartition, oddRecords); + table + .newFastAppend() + .appendFile(dataFileWithEvenRecords) + .appendFile(dataFileWithOddRecords) + .commit(); + + // add 2 matching and 1 filterable partition-scoped equality delete files for "even" partition + DeleteFile partitionEqDeleteFile1 = writeEqDeletes(evenPartition, "id", 2); + DeleteFile partitionEqDeleteFile2 = writeEqDeletes(evenPartition, "id", 4); + DeleteFile partitionEqDeleteFile3 = writeEqDeletes(evenPartition, "id", 25); + table + .newRowDelta() + .addDeletes(partitionEqDeleteFile1) + .addDeletes(partitionEqDeleteFile2) + .addDeletes(partitionEqDeleteFile3) + .commit(); + + // add 1 matching partition-scoped position delete file for "even" partition + Pair partitionPosDeletes = + writePosDeletes( + evenPartition, + ImmutableList.of( + Pair.of(dataFileWithEvenRecords.path(), 0L), + Pair.of("some-other-file.parquet", 0L))); + table + .newRowDelta() + .addDeletes(partitionPosDeletes.first()) + .validateDataFilesExist(partitionPosDeletes.second()) + .commit(); + + // add 1 path-scoped position delete file for dataFileWithEvenRecords + Pair pathPosDeletes = + writePosDeletes( + evenPartition, + ImmutableList.of( + Pair.of(dataFileWithEvenRecords.path(), 1L), + Pair.of(dataFileWithEvenRecords.path(), 2L))); + table + .newRowDelta() + .addDeletes(pathPosDeletes.first()) + .validateDataFilesExist(pathPosDeletes.second()) + .commit(); + + // switch back to the unpartitioned spec + table.updateSpec().removeField("category").commit(); + + // add another global equality delete file that can be filtered using stats + DeleteFile globalEqDeleteFile2 = writeEqDeletes("id", 20, 21); + table.newRowDelta().addDeletes(globalEqDeleteFile2); + + List tasks = planTasks(); + + assertThat(tasks).hasSize(3); + + for (FileScanTask task : tasks) { + if (coversDataFile(task, dataFile)) { + assertDeletes(task, globalEqDeleteFile1); + + } else if (coversDataFile(task, dataFileWithEvenRecords)) { + assertDeletes( + task, + partitionEqDeleteFile1, + partitionEqDeleteFile2, + pathPosDeletes.first(), + partitionPosDeletes.first()); + + } else if (coversDataFile(task, dataFileWithOddRecords)) { + assertThat(task.deletes()).isEmpty(); + + } else { + fail("Unexpected task: " + task); + } + } + } + + private boolean coversDataFile(FileScanTask task, DataFile file) { + return task.file().path().toString().equals(file.path().toString()); + } + + private void assertDeletes(FileScanTask task, DeleteFile... expectedDeleteFiles) { + CharSequenceSet actualDeletePaths = deletePaths(task); + + assertThat(actualDeletePaths.size()).isEqualTo(expectedDeleteFiles.length); + + for (DeleteFile expectedDeleteFile : expectedDeleteFiles) { + assertThat(actualDeletePaths.contains(expectedDeleteFile.path())).isTrue(); + } + } + + private CharSequenceSet deletePaths(FileScanTask task) { + return CharSequenceSet.of(Iterables.transform(task.deletes(), ContentFile::path)); + } + + private List planTasks() throws IOException { + try (CloseableIterable tasksIterable = table.newScan().planFiles()) { + return Lists.newArrayList(tasksIterable); + } + } + + private DataFile writeData(StructLike partition, List data) throws IOException { + return FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), partition, data); + } + + private DeleteFile writeEqDeletes(String col, Object... 
values) throws IOException { + return writeEqDeletes(null /* unpartitioned */, col, values); + } + + private DeleteFile writeEqDeletes(StructLike partition, String col, Object... values) + throws IOException { + Schema deleteSchema = SCHEMA.select(col); + + Record delete = GenericRecord.create(deleteSchema); + List deletes = Lists.newArrayList(); + for (Object value : values) { + deletes.add(delete.copy(col, value)); + } + + OutputFile out = Files.localOutput(temp.newFile()); + return FileHelpers.writeDeleteFile(table, out, partition, deletes, deleteSchema); + } + + private Pair writePosDeletes( + StructLike partition, List> deletes) throws IOException { + OutputFile out = Files.localOutput(temp.newFile()); + return FileHelpers.writeDeleteFile(table, out, partition, deletes); + } } diff --git a/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java b/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java index 73bbbd85235d..365dff804c75 100644 --- a/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java +++ b/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/DeleteFileIndexBenchmark.java @@ -41,6 +41,7 @@ import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; @@ -80,6 +81,9 @@ public class DeleteFileIndexBenchmark { private List dataFiles; + @Param({"true", "false"}) + private boolean oneToOneMapping; + @Setup public void setupBenchmark() throws NoSuchTableException, ParseException { setupSpark(); @@ -88,6 +92,14 @@ public void setupBenchmark() throws NoSuchTableException, ParseException { loadDataFiles(); } + private void initDataAndDeletes() { + if (oneToOneMapping) { + initDataAndFileScopedDeletes(); + } else { + initDataAndPartitionScopedDeletes(); + } + } + @TearDown public void tearDownBenchmark() { dropTable(); @@ -134,7 +146,7 @@ private DeleteFileIndex buildDeletes() { .build(); } - private void initDataAndDeletes() { + private void initDataAndPartitionScopedDeletes() { for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) { StructLike partition = TestHelpers.Row.of(partitionOrdinal); @@ -154,6 +166,23 @@ private void initDataAndDeletes() { } } + private void initDataAndFileScopedDeletes() { + for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) { + StructLike partition = TestHelpers.Row.of(partitionOrdinal); + + RowDelta rowDelta = table.newRowDelta(); + + for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) { + DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition); + DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile); + rowDelta.addRows(dataFile); + rowDelta.addDeletes(deleteFile); + } + + rowDelta.commit(); + } + } + private void setupSpark() { this.spark = SparkSession.builder()
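// Note on the new @Param above: JMH now runs each benchmark twice, once with
// partition-scoped position deletes and once with file-scoped deletes (one delete per
// data file), which exercises the path-indexed lookup in the reworked DeleteFileIndex,
// assuming the generated file-scoped delete files carry exact file_path bounds.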