Skip to content

Commit

Permalink
Flink: Support modifying table properties for table with primary key. (
Browse files Browse the repository at this point in the history
  • Loading branch information
wuwenchi authored Apr 19, 2022
1 parent 5da8885 commit 5ded91e
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,30 @@ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
}

private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2) {
TableSchema ts1 = ct1.getSchema();
TableSchema ts2 = ct2.getSchema();
boolean equalsPrimary = false;

if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
equalsPrimary =
Objects.equals(ts1.getPrimaryKey().get().getType(), ts2.getPrimaryKey().get().getType()) &&
Objects.equals(ts1.getPrimaryKey().get().getColumns(), ts2.getPrimaryKey().get().getColumns());
} else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
equalsPrimary = true;
}

if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns()) &&
Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs()) &&
equalsPrimary)) {
throw new UnsupportedOperationException("Altering schema is not supported yet.");
}

if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
}
}

@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws CatalogException, TableNotExistException {
Expand All @@ -423,13 +447,7 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean

// For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by comparing
// CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
if (!table.getSchema().equals(newTable.getSchema())) {
throw new UnsupportedOperationException("Altering schema is not supported yet.");
}

if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
}
validateTableSchemaAndPartition(table, (CatalogTable) newTable);

Map<String, String> oldProperties = table.getOptions();
Map<String, String> setProperties = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,30 @@ public void testAlterTable() throws TableNotExistException {
Assert.assertEquals(properties, table("tl").properties());
}

@Test
public void testAlterTableWithPrimaryKey() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')");
Map<String, String> properties = Maps.newHashMap();
properties.put("oldK", "oldV");

// new
sql("ALTER TABLE tl SET('newK'='newV')");
properties.put("newK", "newV");
Assert.assertEquals(properties, table("tl").properties());

// update old
sql("ALTER TABLE tl SET('oldK'='oldV2')");
properties.put("oldK", "oldV2");
Assert.assertEquals(properties, table("tl").properties());

// remove property
CatalogTable catalogTable = catalogTable("tl");
properties.remove("oldK");
getTableEnv().getCatalog(getTableEnv().getCurrentCatalog()).get().alterTable(
new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false);
Assert.assertEquals(properties, table("tl").properties());
}

@Test
public void testRelocateTable() {
Assume.assumeFalse("HadoopCatalog does not support relocate table", isHadoopCatalog);
Expand Down

0 comments on commit 5ded91e

Please sign in to comment.