diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java index 230e201dcbe7..7f7eab70c2ad 100644 --- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java +++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java @@ -19,35 +19,32 @@ package org.apache.iceberg.io; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopConfigurable; import org.apache.iceberg.hadoop.SerializableConfiguration; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.PeekingIterator; import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; +import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * FileIO implementation that uses location scheme to choose the correct FileIO implementation. - * Delegate FileIO implementations should support the mixin interfaces {@link - * SupportsPrefixOperations} and {@link SupportsBulkOperations}. - */ -public class ResolvingFileIO - implements FileIO, SupportsPrefixOperations, SupportsBulkOperations, HadoopConfigurable { +/** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */ +public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations { private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class); + private static final int BATCH_SIZE = 100_000; private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO"; private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO"; private static final String GCS_FILE_IO_IMPL = "org.apache.iceberg.gcp.gcs.GCSFileIO"; @@ -95,40 +92,30 @@ public void deleteFile(String location) { @Override public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailureException { - // peek at the first element to determine the type of FileIO - Iterator originalIterator = pathsToDelete.iterator(); - if (!originalIterator.hasNext()) { - return; - } - - PeekingIterator iterator = Iterators.peekingIterator(originalIterator); - FileIO fileIO = io(iterator.peek()); - if (!(fileIO instanceof SupportsPrefixOperations)) { - throw new UnsupportedOperationException( - "FileIO doesn't support bulk operations: " + fileIO.getClass().getName()); - } - - ((SupportsBulkOperations) fileIO).deleteFiles(() -> iterator); - } - - @Override - public Iterable listPrefix(String prefix) { - FileIO fileIO = io(prefix); - if (!(fileIO instanceof SupportsPrefixOperations)) { - throw new UnsupportedOperationException( - "FileIO doesn't support prefix operations: " + fileIO.getClass().getName()); - } - return ((SupportsPrefixOperations) fileIO).listPrefix(prefix); - } - - @Override - public void deletePrefix(String prefix) { - FileIO fileIO = io(prefix); - if (!(fileIO instanceof SupportsPrefixOperations)) { - throw new UnsupportedOperationException( - "FileIO doesn't support prefix operations: " + fileIO.getClass().getName()); - } - ((SupportsPrefixOperations) fileIO).deletePrefix(prefix); + Iterators.partition(pathsToDelete.iterator(), BATCH_SIZE) + .forEachRemaining( + partitioned -> { + Map> pathByFileIO = + partitioned.stream().collect(Collectors.groupingBy(this::io)); + for (Map.Entry> entries : pathByFileIO.entrySet()) { + FileIO io = entries.getKey(); + List filePaths = entries.getValue(); + if (io instanceof SupportsBulkOperations) { + ((SupportsBulkOperations) io).deleteFiles(filePaths); + } else { + LOG.warn( + "IO {} does not support bulk operations. Using non-bulk deletes.", + io.getClass().getName()); + Tasks.Builder deleteTasks = + Tasks.foreach(filePaths) + .noRetry() + .suppressFailureWhenFinished() + .onFailure( + (file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)); + deleteTasks.run(io::deleteFile); + } + } + }); } @Override @@ -175,7 +162,8 @@ public Configuration getConf() { return hadoopConf.get(); } - private FileIO io(String location) { + @VisibleForTesting + FileIO io(String location) { String impl = implFromLocation(location); FileIO io = ioInstances.get(impl); if (io != null) { diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java index fccdffeab8a4..e5ba7b986998 100644 --- a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java +++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java @@ -18,14 +18,30 @@ */ package org.apache.iceberg.io; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.inmemory.InMemoryFileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.junit.Assert; import org.junit.Test; public class TestResolvingIO { + @TempDir private java.nio.file.Path temp; + @Test public void testResolvingFileIOKryoSerialization() throws IOException { FileIO testResolvingFileIO = new ResolvingFileIO(); @@ -48,4 +64,57 @@ public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotF Assert.assertEquals(testResolvingFileIO.properties(), roundTripSerializedFileIO.properties()); } + + @Test + public void resolveFileIOBulkDeletion() throws IOException { + ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO()); + Configuration hadoopConf = new Configuration(); + FileSystem fs = FileSystem.getLocal(hadoopConf); + Path parent = new Path(temp.toUri()); + // configure delegation IO + HadoopFileIO delegation = new HadoopFileIO(hadoopConf); + doReturn(delegation).when(resolvingFileIO).io(anyString()); + // write + List randomFilePaths = + IntStream.range(1, 10) + .mapToObj(i -> new Path(parent, "random-" + i + "-" + UUID.randomUUID())) + .collect(Collectors.toList()); + for (Path randomFilePath : randomFilePaths) { + fs.createNewFile(randomFilePath); + assertThat(delegation.newInputFile(randomFilePath.toUri().toString()).exists()).isTrue(); + } + // bulk deletion + List randomFilePathString = + randomFilePaths.stream().map(p -> p.toUri().toString()).collect(Collectors.toList()); + resolvingFileIO.deleteFiles(randomFilePathString); + + for (String path : randomFilePathString) { + assertThat(delegation.newInputFile(path).exists()).isFalse(); + } + } + + @Test + public void resolveFileIONonBulkDeletion() { + ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO()); + String parentPath = "inmemory://foo.db/bar"; + // configure delegation IO + InMemoryFileIO delegation = new InMemoryFileIO(); + doReturn(delegation).when(resolvingFileIO).io(anyString()); + // write + byte[] someData = "some data".getBytes(); + List randomFilePaths = + IntStream.range(1, 10) + .mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID()) + .collect(Collectors.toList()); + for (String randomFilePath : randomFilePaths) { + delegation.addFile(randomFilePath, someData); + assertThat(delegation.fileExists(randomFilePath)).isTrue(); + } + // non-bulk deletion + resolvingFileIO.deleteFiles(randomFilePaths); + + for (String path : randomFilePaths) { + assertThat(delegation.fileExists(path)).isFalse(); + } + } }