Skip to content

Commit

Permalink
Core: Extend ResolvingFileIO to support BulkOperations (apache#7976) (apache#139)

Browse files Browse the repository at this point in the history

Co-authored-by: Hongyue/Steve Zhang <[email protected]>
  • Loading branch information
bryanck and dramaticlly authored Aug 14, 2023
1 parent 54b5a96 commit 69ca9cb
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 44 deletions.
76 changes: 32 additions & 44 deletions core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,32 @@
package org.apache.iceberg.io;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.PeekingIterator;
import org.apache.iceberg.util.SerializableMap;
import org.apache.iceberg.util.SerializableSupplier;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* FileIO implementation that uses location scheme to choose the correct FileIO implementation.
* Delegate FileIO implementations should support the mixin interfaces {@link
* SupportsPrefixOperations} and {@link SupportsBulkOperations}.
*/
public class ResolvingFileIO
implements FileIO, SupportsPrefixOperations, SupportsBulkOperations, HadoopConfigurable {
/** FileIO implementation that uses location scheme to choose the correct FileIO implementation. */
public class ResolvingFileIO implements FileIO, HadoopConfigurable, SupportsBulkOperations {
private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
private static final int BATCH_SIZE = 100_000;
private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
private static final String GCS_FILE_IO_IMPL = "org.apache.iceberg.gcp.gcs.GCSFileIO";
Expand Down Expand Up @@ -95,40 +92,30 @@ public void deleteFile(String location) {

@Override
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
  // Process the paths in fixed-size batches so that grouping by FileIO never
  // materializes the whole (potentially very large) iterable in memory at once.
  Iterators.partition(pathsToDelete.iterator(), BATCH_SIZE)
      .forEachRemaining(
          partitioned -> {
            // Route each path to the delegate FileIO that owns its URI scheme.
            // NOTE: grouping by the resolved FileIO instance (not just the first
            // path's scheme) allows a single call to mix s3://, gs://, hdfs://, etc.
            Map<FileIO, List<String>> pathByFileIO =
                partitioned.stream().collect(Collectors.groupingBy(this::io));
            for (Map.Entry<FileIO, List<String>> entry : pathByFileIO.entrySet()) {
              FileIO io = entry.getKey();
              List<String> filePaths = entry.getValue();
              if (io instanceof SupportsBulkOperations) {
                // Delegate supports bulk deletes; use them directly.
                ((SupportsBulkOperations) io).deleteFiles(filePaths);
              } else {
                // Fall back to one-by-one deletion rather than failing: warn once
                // per batch, and suppress per-file failures so a single bad path
                // does not abort the remaining deletes.
                LOG.warn(
                    "IO {} does not support bulk operations. Using non-bulk deletes.",
                    io.getClass().getName());
                Tasks.foreach(filePaths)
                    .noRetry()
                    .suppressFailureWhenFinished()
                    .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
                    .run(io::deleteFile);
              }
            }
          });
}

@Override
Expand Down Expand Up @@ -175,7 +162,8 @@ public Configuration getConf() {
return hadoopConf.get();
}

private FileIO io(String location) {
@VisibleForTesting
FileIO io(String location) {
String impl = implFromLocation(location);
FileIO io = ioInstances.get(impl);
if (io != null) {
Expand Down
69 changes: 69 additions & 0 deletions core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,30 @@
*/
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.inmemory.InMemoryFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;

public class TestResolvingIO {

@TempDir private java.nio.file.Path temp;

@Test
public void testResolvingFileIOKryoSerialization() throws IOException {
FileIO testResolvingFileIO = new ResolvingFileIO();
Expand All @@ -48,4 +64,57 @@ public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotF

Assert.assertEquals(testResolvingFileIO.properties(), roundTripSerializedFileIO.properties());
}

@Test
public void resolveFileIOBulkDeletion() throws IOException {
  // Spy the resolver so that every location resolves to a local HadoopFileIO,
  // which implements SupportsBulkOperations and exercises the bulk-delete path.
  ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
  Configuration hadoopConf = new Configuration();
  FileSystem fs = FileSystem.getLocal(hadoopConf);
  Path parent = new Path(temp.toUri());
  HadoopFileIO delegation = new HadoopFileIO(hadoopConf);
  doReturn(delegation).when(resolvingFileIO).io(anyString());

  // Create nine uniquely named files under the temp dir and verify each one
  // is visible through the delegate IO before deletion.
  List<String> filePaths =
      IntStream.range(1, 10)
          .mapToObj(
              i -> new Path(parent, "random-" + i + "-" + UUID.randomUUID()).toUri().toString())
          .collect(Collectors.toList());
  for (String filePath : filePaths) {
    fs.createNewFile(new Path(filePath));
    assertThat(delegation.newInputFile(filePath).exists()).isTrue();
  }

  // Bulk-delete through the resolver; every file must be gone afterwards.
  resolvingFileIO.deleteFiles(filePaths);
  for (String filePath : filePaths) {
    assertThat(delegation.newInputFile(filePath).exists()).isFalse();
  }
}

@Test
public void resolveFileIONonBulkDeletion() {
  // Spy the resolver so that every location resolves to an InMemoryFileIO, which
  // does NOT implement SupportsBulkOperations — this forces the resolver onto
  // its file-by-file fallback delete path.
  ResolvingFileIO resolvingFileIO = spy(new ResolvingFileIO());
  InMemoryFileIO delegation = new InMemoryFileIO();
  doReturn(delegation).when(resolvingFileIO).io(anyString());

  // Register nine in-memory files and confirm each is present before deletion.
  String parentPath = "inmemory://foo.db/bar";
  byte[] someData = "some data".getBytes();
  List<String> filePaths =
      IntStream.range(1, 10)
          .mapToObj(i -> parentPath + "-" + i + "-" + UUID.randomUUID())
          .collect(Collectors.toList());
  for (String filePath : filePaths) {
    delegation.addFile(filePath, someData);
    assertThat(delegation.fileExists(filePath)).isTrue();
  }

  // Non-bulk deletion via the resolver; every file must be gone afterwards.
  resolvingFileIO.deleteFiles(filePaths);
  for (String filePath : filePaths) {
    assertThat(delegation.fileExists(filePath)).isFalse();
  }
}
}

0 comments on commit 69ca9cb

Please sign in to comment.