
Commit fcbc64e

Core, Hive, Nessie: Use ResolvingFileIO as default instead of HadoopFileIO

nastra committed Aug 9, 2023
1 parent f5f543a commit fcbc64e
Showing 9 changed files with 98 additions and 36 deletions.
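
The user-visible change is the default of the io-impl catalog property (CatalogProperties.FILE_IO_IMPL): when the property is unset, the JDBC, Hive, and Nessie catalogs now load org.apache.iceberg.io.ResolvingFileIO, which selects a concrete FileIO per storage scheme, instead of always falling back to org.apache.iceberg.hadoop.HadoopFileIO. A minimal sketch of opting back into the old default; the JDBC URI and warehouse path below are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogProperties;
    import org.apache.iceberg.jdbc.JdbcCatalog;

    Map<String, String> props = new HashMap<>();
    props.put(CatalogProperties.URI, "jdbc:postgresql://host/db"); // placeholder
    props.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://bucket/warehouse"); // placeholder
    // Pin the pre-change default explicitly instead of relying on ResolvingFileIO:
    props.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.hadoop.HadoopFileIO");

    JdbcCatalog catalog = new JdbcCatalog();
    catalog.setConf(new Configuration());
    catalog.initialize("jdbc", props);
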
5 changes: 2 additions & 3 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
@@ -67,6 +67,7 @@ public class JdbcCatalog extends BaseMetastoreCatalog
   private static final String NAMESPACE_EXISTS_PROPERTY = "exists";
   private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class);
   private static final Joiner SLASH = Joiner.on("/");
+  private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";

   private FileIO io;
   private String catalogName = "jdbc";
@@ -112,9 +113,7 @@ public void initialize(String name, Map<String, String> properties) {
     if (null != ioBuilder) {
       this.io = ioBuilder.apply(properties);
     } else {
-      String ioImpl =
-          properties.getOrDefault(
-              CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.hadoop.HadoopFileIO");
+      String ioImpl = properties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL);
       this.io = CatalogUtil.loadFileIO(ioImpl, properties, conf);
     }

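With no io-impl set, the else branch above now loads ResolvingFileIO reflectively through CatalogUtil.loadFileIO. A sketch of what that amounts to; the s3 location is a placeholder, and resolving it to S3FileIO assumes the iceberg-aws module is on the classpath:

    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogUtil;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.InputFile;

    FileIO io =
        CatalogUtil.loadFileIO(
            "org.apache.iceberg.io.ResolvingFileIO", Collections.emptyMap(), new Configuration());
    // ResolvingFileIO picks its delegate lazily, per location scheme, at first use:
    InputFile in = io.newInputFile("s3://bucket/db/table/metadata/v1.metadata.json");
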
@@ -27,6 +27,7 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
@@ -35,6 +36,7 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -97,10 +99,19 @@ private static void validateCatalogLoader(CatalogLoader loader)
   private static void validateHadoopConf(Table table) {
     FileIO io = table.io();
     Assertions.assertThat(io)
-        .as("FileIO should be a HadoopFileIO")
-        .isInstanceOf(HadoopFileIO.class);
-    HadoopFileIO hadoopIO = (HadoopFileIO) io;
-    Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
+        .as("FileIO should be a HadoopFileIO or ResolvingFileIO")
+        .isInstanceOfAny(HadoopFileIO.class, ResolvingFileIO.class);
+
+    Configuration conf;
+    if (io instanceof ResolvingFileIO) {
+      ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
+      conf = resolvingFileIO.getConf();
+    } else {
+      HadoopFileIO hadoopIO = (HadoopFileIO) io;
+      conf = hadoopIO.conf();
+    }
+
+    Assert.assertEquals("my_value", conf.get("my_key"));
   }

   @SuppressWarnings("unchecked")
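The validateHadoopConf change above, repeated verbatim in the remaining test files of this commit, branches on the runtime type only to reach a Hadoop Configuration. A possible shortcut, assuming (as both classes currently do) that HadoopFileIO and ResolvingFileIO implement Hadoop's Configurable interface:

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;

    // Hypothetical alternative to the instanceof branch: getConf() is
    // available on either implementation through the shared interface.
    Configuration conf = ((Configurable) io).getConf();
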
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Schema;
@@ -32,6 +33,7 @@
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -94,10 +96,19 @@ private static void validateTableLoader(TableLoader loader)
   private static void validateHadoopConf(Table table) {
     FileIO io = table.io();
     Assertions.assertThat(io)
-        .as("FileIO should be a HadoopFileIO")
-        .isInstanceOf(HadoopFileIO.class);
-    HadoopFileIO hadoopIO = (HadoopFileIO) io;
-    Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
+        .as("FileIO should be a HadoopFileIO or ResolvingFileIO")
+        .isInstanceOfAny(HadoopFileIO.class, ResolvingFileIO.class);
+
+    Configuration conf;
+    if (io instanceof ResolvingFileIO) {
+      ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
+      conf = resolvingFileIO.getConf();
+    } else {
+      HadoopFileIO hadoopIO = (HadoopFileIO) io;
+      conf = hadoopIO.conf();
+    }
+
+    Assert.assertEquals("my_value", conf.get("my_key"));
   }

   @SuppressWarnings("unchecked")
@@ -27,6 +27,7 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
@@ -35,6 +36,7 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -97,10 +99,19 @@ private static void validateCatalogLoader(CatalogLoader loader)
   private static void validateHadoopConf(Table table) {
     FileIO io = table.io();
     Assertions.assertThat(io)
-        .as("FileIO should be a HadoopFileIO")
-        .isInstanceOf(HadoopFileIO.class);
-    HadoopFileIO hadoopIO = (HadoopFileIO) io;
-    Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
+        .as("FileIO should be a HadoopFileIO or ResolvingFileIO")
+        .isInstanceOfAny(HadoopFileIO.class, ResolvingFileIO.class);
+
+    Configuration conf;
+    if (io instanceof ResolvingFileIO) {
+      ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
+      conf = resolvingFileIO.getConf();
+    } else {
+      HadoopFileIO hadoopIO = (HadoopFileIO) io;
+      conf = hadoopIO.conf();
+    }
+
+    Assert.assertEquals("my_value", conf.get("my_key"));
   }

   @SuppressWarnings("unchecked")
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Schema;
@@ -32,6 +33,7 @@
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -94,10 +96,19 @@ private static void validateTableLoader(TableLoader loader)
   private static void validateHadoopConf(Table table) {
     FileIO io = table.io();
     Assertions.assertThat(io)
-        .as("FileIO should be a HadoopFileIO")
-        .isInstanceOf(HadoopFileIO.class);
-    HadoopFileIO hadoopIO = (HadoopFileIO) io;
-    Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
+        .as("FileIO should be a HadoopFileIO or ResolvingFileIO")
+        .isInstanceOfAny(HadoopFileIO.class, ResolvingFileIO.class);
+
+    Configuration conf;
+    if (io instanceof ResolvingFileIO) {
+      ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
+      conf = resolvingFileIO.getConf();
+    } else {
+      HadoopFileIO hadoopIO = (HadoopFileIO) io;
+      conf = hadoopIO.conf();
+    }
+
+    Assert.assertEquals("my_value", conf.get("my_key"));
   }

   @SuppressWarnings("unchecked")
@@ -27,6 +27,7 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
@@ -35,6 +36,7 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -97,10 +99,19 @@ private static void validateCatalogLoader(CatalogLoader loader)
   private static void validateHadoopConf(Table table) {
     FileIO io = table.io();
     Assertions.assertThat(io)
-        .as("FileIO should be a HadoopFileIO")
-        .isInstanceOf(HadoopFileIO.class);
-    HadoopFileIO hadoopIO = (HadoopFileIO) io;
-    Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
+        .as("FileIO should be a HadoopFileIO or ResolvingFileIO")
+        .isInstanceOfAny(HadoopFileIO.class, ResolvingFileIO.class);
+
+    Configuration conf;
+    if (io instanceof ResolvingFileIO) {
+      ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
+      conf = resolvingFileIO.getConf();
+    } else {
+      HadoopFileIO hadoopIO = (HadoopFileIO) io;
+      conf = hadoopIO.conf();
+    }
+
+    Assert.assertEquals("my_value", conf.get("my_key"));
   }

   @SuppressWarnings("unchecked")
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Schema;
@@ -32,6 +33,7 @@
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -94,10 +96,19 @@ private static void validateTableLoader(TableLoader loader)
   private static void validateHadoopConf(Table table) {
     FileIO io = table.io();
     Assertions.assertThat(io)
-        .as("FileIO should be a HadoopFileIO")
-        .isInstanceOf(HadoopFileIO.class);
-    HadoopFileIO hadoopIO = (HadoopFileIO) io;
-    Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
+        .as("FileIO should be a HadoopFileIO or ResolvingFileIO")
+        .isInstanceOfAny(HadoopFileIO.class, ResolvingFileIO.class);
+
+    Configuration conf;
+    if (io instanceof ResolvingFileIO) {
+      ResolvingFileIO resolvingFileIO = (ResolvingFileIO) io;
+      conf = resolvingFileIO.getConf();
+    } else {
+      HadoopFileIO hadoopIO = (HadoopFileIO) io;
+      conf = hadoopIO.conf();
+    }
+
+    Assert.assertEquals("my_value", conf.get("my_key"));
   }

   @SuppressWarnings("unchecked")
@@ -47,7 +47,6 @@
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
@@ -72,6 +71,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa
   static final String HIVE_CONF_CATALOG = "metastore.catalog.default";

   private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
+  private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";

   private String name;
   private Configuration conf;
@@ -104,11 +104,9 @@ public void initialize(String inputName, Map<String, String> properties) {
     this.listAllTables =
         Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));

-    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
-    this.fileIO =
-        fileIOImpl == null
-            ? new HadoopFileIO(conf)
-            : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
+    String fileIOImpl =
+        properties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL);
+    this.fileIO = CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

     this.clients = new CachedClientPool(conf, properties);
   }
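Dropping the HadoopFileIO import from HiveCatalog is safe because ResolvingFileIO itself falls back to a HadoopFileIO delegate for any scheme it does not map to a cloud-specific implementation (hdfs, file, and similar). An illustrative sketch under default settings:

    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.io.ResolvingFileIO;

    ResolvingFileIO io = new ResolvingFileIO();
    io.setConf(new Configuration());
    io.initialize(Collections.emptyMap());
    // "file" is not in the scheme-to-FileIO map, so this resolves to HadoopFileIO:
    InputFile in = io.newInputFile("file:///tmp/db/table/metadata/v1.metadata.json");
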
@@ -63,6 +63,7 @@ public class NessieCatalog extends BaseMetastoreCatalog
     implements AutoCloseable, SupportsNamespaces, Configurable<Object> {

   private static final Logger LOG = LoggerFactory.getLogger(NessieCatalog.class);
+  private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";
   private static final Joiner SLASH = Joiner.on("/");
   private static final String NAMESPACE_LOCATION_PROPS = "location";
   private NessieIcebergClient client;
@@ -79,9 +80,7 @@ public NessieCatalog() {}
   @Override
   public void initialize(String name, Map<String, String> options) {
     Map<String, String> catalogOptions = ImmutableMap.copyOf(options);
-    String fileIOImpl =
-        options.getOrDefault(
-            CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.hadoop.HadoopFileIO");
+    String fileIOImpl = options.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL);
     // remove nessie prefix
     final Function<String, String> removePrefix =
         x -> x.replace(NessieUtil.NESSIE_CONFIG_PREFIX, "");
