Skip to content

Commit

Permalink
Check table prefix empty for location conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve Zhang committed Aug 3, 2023
1 parent f9218f9 commit d13fb0f
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 28 deletions.
23 changes: 19 additions & 4 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
*/
package org.apache.iceberg;

import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -193,10 +196,14 @@ public Table create() {
String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
tableProperties.putAll(tableOverrideProperties());

if (Boolean.parseBoolean(tableProperties.get(TableProperties.UNIQUE_LOCATION))) {
boolean alreadyExists = ops.io().newInputFile(baseLocation).exists();
if (alreadyExists) {
throw new AlreadyExistsException("Table location already in use: %s", baseLocation);
if (Boolean.parseBoolean(
tableProperties.get(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED))) {
boolean conflictLocationDetected =
(ops.io() instanceof SupportsPrefixOperations)
? !prefixEmpty((SupportsPrefixOperations) ops.io(), baseLocation)
: ops.io().newInputFile(baseLocation).exists();
if (conflictLocationDetected) {
throw new AlreadyExistsException("Table location already exists: %s", baseLocation);
}
}

Expand All @@ -212,6 +219,14 @@ public Table create() {
return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
}

/**
 * Reports whether listing {@code prefix} through the given file IO yields no entries.
 *
 * <p>NOTE(review): every {@link UncheckedIOException} raised while listing is reported as "empty".
 * Presumably this is meant to cover a location that does not exist yet, but it also hides any
 * other I/O failure from the caller -- confirm this is intended.
 *
 * @param io file IO used to list the prefix
 * @param prefix location to list
 * @return {@code true} if the listing has no entries or the listing attempt throws {@link
 *     UncheckedIOException}; {@code false} otherwise
 */
private boolean prefixEmpty(SupportsPrefixOperations io, String prefix) {
  boolean nothingListed;
  try {
    // Obtaining the listing and probing it both stay inside the try so that a failure from
    // either step is handled the same way.
    nothingListed = Iterables.isEmpty(io.listPrefix(prefix));
  } catch (UncheckedIOException ignored) {
    // Deliberately ignored: a failed listing is treated as an empty prefix.
    nothingListed = true;
  }
  return nothingListed;
}

@Override
public Transaction createTransaction() {
TableOperations ops = newTableOps(identifier);
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ private TableProperties() {}
public static final String UPSERT_ENABLED = "write.upsert.enabled";
public static final boolean UPSERT_ENABLED_DEFAULT = false;

public static final String UNIQUE_LOCATION = "location.unique";
public static final boolean UNIQUE_LOCATION_DEFAULT = false;
public static final String LOCATION_CONFLICT_DETECTION_ENABLED =
"location.conflict-detection.enabled";
public static final boolean LOCATION_CONFLICT_DETECTION_ENABLED_DEFAULT = false;
}
30 changes: 18 additions & 12 deletions core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,28 +205,34 @@ public void testBasicCatalog() throws Exception {
}

@Test
public void testCreateTableWithUniqueLocation() throws IOException {
TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "ns2", "unique");
public void testCreateTableWithLocationConflict() throws IOException {
TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "tbl");
TableIdentifier tableIdentWithLocationConflict = TableIdentifier.of("db", "ns1");
ImmutableMap<String, String> catalogProps =
ImmutableMap.of(String.format("table-default.%s", TableProperties.UNIQUE_LOCATION), "true");
ImmutableMap.of(
String.format("table-default.%s", TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED),
"true");

HadoopCatalog catalog = hadoopCatalog(catalogProps);
String location = catalog.defaultWarehouseLocation(tableIdent);
FileSystem fs = Util.getFs(new Path(location), catalog.getConf());
Assertions.assertThat(fs.mkdirs(new Path(location))).isTrue();

Table original = catalog.buildTable(tableIdent, SCHEMA).create();
assertThat(original.location()).isEqualTo(catalog.defaultWarehouseLocation(tableIdent));

Assertions.assertThatThrownBy(
() -> catalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create())
() -> catalog.buildTable(tableIdentWithLocationConflict, SCHEMA).create())
.isInstanceOf(AlreadyExistsException.class)
.hasMessageStartingWith("Table location already in use");
.hasMessageStartingWith("Table location already exists");

Table recreated =
Table withLocationConflict =
catalog
.buildTable(tableIdent, SCHEMA)
.withProperty(TableProperties.UNIQUE_LOCATION, "false")
.buildTable(tableIdentWithLocationConflict, SCHEMA)
.withProperty(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED, "false")
.create();
assertThat(recreated.location()).isEqualTo(location);
assertThat(withLocationConflict.location())
.isEqualTo(catalog.defaultWarehouseLocation(tableIdentWithLocationConflict));

catalog.dropTable(tableIdent);
catalog.dropTable(tableIdentWithLocationConflict);
}

@Test
Expand Down
12 changes: 7 additions & 5 deletions core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,17 @@ public void testBasicCatalog() throws Exception {
}

@Test
public void testCreateTableWithUniqueLocation() throws IOException {
public void testCreateTableWithLocationConflict() throws IOException {
try (JdbcCatalog jdbcCatalog =
initCatalog(
"unique_jdbc_catalog",
ImmutableMap.of(
String.format("table-default.%s", TableProperties.UNIQUE_LOCATION), "true"))) {
String.format(
"table-default.%s", TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED),
"true"))) {
Namespace testNamespace = Namespace.of("testDb", "ns1", "ns2");
jdbcCatalog.createNamespace(testNamespace, Maps.newHashMap());
TableIdentifier tableIdent = TableIdentifier.of(testNamespace, "unique");
TableIdentifier tableIdent = TableIdentifier.of(testNamespace, "original");
TableIdentifier tableRenamed = TableIdentifier.of(testNamespace, "renamed");

Table table = jdbcCatalog.createTable(tableIdent, SCHEMA, PartitionSpec.unpartitioned());
Expand All @@ -314,15 +316,15 @@ public void testCreateTableWithUniqueLocation() throws IOException {
currentLocation,
ImmutableMap.of()))
.isInstanceOf(AlreadyExistsException.class)
.hasMessageStartingWith("Table location already in use");
.hasMessageStartingWith("Table location already exists");

Table recreated =
jdbcCatalog.createTable(
tableIdent,
SCHEMA,
PartitionSpec.unpartitioned(),
currentLocation,
ImmutableMap.of(TableProperties.UNIQUE_LOCATION, "false"));
ImmutableMap.of(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED, "false"));
assertThat(recreated.location()).isEqualTo(currentLocation);

jdbcCatalog.dropTable(tableRenamed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1163,13 +1163,15 @@ public void testDatabaseLocationWithSlashInWarehouseDir() {
}

@Test
public void testCreateTableWithUniqueLocation() {
public void testCreateTableWithLocationConflict() {
Schema schema = getTestSchema();
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "unique");
TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "original");
TableIdentifier tableRenamed = TableIdentifier.of(DB_NAME, "renamed");

ImmutableMap<String, String> catalogProps =
ImmutableMap.of(String.format("table-default.%s", TableProperties.UNIQUE_LOCATION), "true");
ImmutableMap.of(
String.format("table-default.%s", TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED),
"true");
Catalog hiveCatalog =
CatalogUtil.loadCatalog(
HiveCatalog.class.getName(),
Expand All @@ -1185,12 +1187,12 @@ public void testCreateTableWithUniqueLocation() {

assertThatThrownBy(() -> hiveCatalog.buildTable(tableIdent, schema).create())
.isInstanceOf(AlreadyExistsException.class)
.hasMessageStartingWith("Table location already in use");
.hasMessageStartingWith("Table location already exists");

Table recreated =
hiveCatalog
.buildTable(tableIdent, schema)
.withProperty(TableProperties.UNIQUE_LOCATION, "false")
.withProperty(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED, "false")
.create();
assertThat(recreated.location()).isEqualTo(currentLocation);
} finally {
Expand Down

0 comments on commit d13fb0f

Please sign in to comment.