Flink 1.17: Support alter table column #7628

Merged Sep 27, 2023 (15 commits)
FlinkCatalog.java

@@ -38,6 +38,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
+ import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -60,15 +61,14 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
- import org.apache.iceberg.Transaction;
- import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+ import org.apache.iceberg.flink.util.FlinkAlterTableUtil;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -91,7 +91,6 @@
* independent of the partition of Flink.
*/
public class FlinkCatalog extends AbstractCatalog {

private final CatalogLoader catalogLoader;
private final Catalog icebergCatalog;
private final Namespace baseNamespace;
@@ -439,14 +438,35 @@ private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2)
if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
&& Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
&& equalsPrimary)) {
throw new UnsupportedOperationException("Altering schema is not supported yet.");
throw new UnsupportedOperationException(
"Altering schema is not supported in the old alterTable API. "
+ "To alter schema, use the other alterTable API and provide a list of TableChange's.");
}

validateTablePartition(ct1, ct2);
}

private static void validateTablePartition(CatalogTable ct1, CatalogTable ct2) {
if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
}
}

/**
* This alterTable API only supports altering table properties.
*
* <p>Support for adding/removing/renaming columns cannot be done by comparing CatalogTable
* instances, unless the Flink schema contains Iceberg column IDs.
*
* <p>To alter columns, use the other alterTable API and provide a list of TableChange's.
*
* @param tablePath path of the table or view to be modified
* @param newTable the new table definition
* @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if set
*     to false, throw an exception; if set to true, do nothing.
* @throws CatalogException in case of any runtime exception
* @throws TableNotExistException if the table does not exist
*/
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws CatalogException, TableNotExistException {
@@ -464,12 +484,6 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
}

CatalogTable table = toCatalogTable(icebergTable);

- // Currently, Flink SQL only support altering table properties.
-
- // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by
- // comparing CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
validateTableSchemaAndPartition(table, (CatalogTable) newTable);

Map<String, String> oldProperties = table.getOptions();
@@ -507,7 +521,66 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
}
});

- commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+ FlinkAlterTableUtil.commitChanges(
+     icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
}
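For orientation, here is a minimal sketch of how property-only alterations typically reach this code from SQL. The catalog name, warehouse path, table name, and snapshot ID are placeholders, and depending on the planner version the statement arrives through this method or through the TableChange-based overload added below; in both paths the three special keys are intercepted rather than written as plain table properties.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AlterTablePropertiesSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Register an Iceberg catalog; name and warehouse path are placeholders.
    tEnv.executeSql(
        "CREATE CATALOG ice WITH ('type'='iceberg', "
            + "'catalog-type'='hadoop', 'warehouse'='file:///tmp/warehouse')");
    tEnv.executeSql("USE CATALOG ice");
    tEnv.executeSql("CREATE TABLE IF NOT EXISTS `default`.t (id BIGINT, data STRING)");

    // An ordinary option: stored on the Iceberg table as a table property.
    tEnv.executeSql("ALTER TABLE `default`.t SET ('write.format.default'='orc')");

    // 'location', 'current-snapshot-id', and 'cherry-pick-snapshot-id' are
    // split out and become location/snapshot operations instead of property
    // writes (the snapshot ID below is a placeholder).
    tEnv.executeSql("ALTER TABLE `default`.t SET ('current-snapshot-id'='123456789')");
  }
}
```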

@Override
public void alterTable(
ObjectPath tablePath,
CatalogBaseTable newTable,
List<TableChange> tableChanges,
boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
validateFlinkTable(newTable);

Table icebergTable;
try {
icebergTable = loadIcebergTable(tablePath);
} catch (TableNotExistException e) {
if (!ignoreIfNotExists) {
throw e;
} else {
return;
}
}

// Altering partitions is not supported yet.
validateTablePartition(toCatalogTable(icebergTable), (CatalogTable) newTable);

String setLocation = null;
String setSnapshotId = null;
String cherrypickSnapshotId = null;

List<TableChange> propertyChanges = Lists.newArrayList();
List<TableChange> schemaChanges = Lists.newArrayList();
for (TableChange change : tableChanges) {
if (change instanceof TableChange.SetOption) {
TableChange.SetOption set = (TableChange.SetOption) change;

if ("location".equalsIgnoreCase(set.getKey())) {
setLocation = set.getValue();
} else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) {
setSnapshotId = set.getValue();
} else if ("cherry-pick-snapshot-id".equalsIgnoreCase(set.getKey())) {
cherrypickSnapshotId = set.getValue();
} else {
propertyChanges.add(change);
}
} else if (change instanceof TableChange.ResetOption) {
propertyChanges.add(change);
} else {
schemaChanges.add(change);
}
}

FlinkAlterTableUtil.commitChanges(
icebergTable,
setLocation,
setSnapshotId,
cherrypickSnapshotId,
schemaChanges,
propertyChanges);
}
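A brief usage sketch for this new overload, continuing the hypothetical session from the earlier example (table and column names remain placeholders): Flink 1.17 parses each statement below into TableChange objects, which the loop above splits into schema changes and property changes before committing them through FlinkAlterTableUtil.

```java
// Column DDL that Flink 1.17 translates into TableChange objects and routes
// to alterTable(ObjectPath, CatalogBaseTable, List<TableChange>, boolean).
tEnv.executeSql("ALTER TABLE `default`.t ADD (note STRING COMMENT 'free-form note')");
tEnv.executeSql("ALTER TABLE `default`.t MODIFY (data STRING COMMENT 'payload')");
tEnv.executeSql("ALTER TABLE `default`.t RENAME note TO remark");
tEnv.executeSql("ALTER TABLE `default`.t DROP remark");

// Option changes arrive as SetOption/ResetOption; 'location' and the two
// snapshot keys are intercepted, everything else is a regular table property.
tEnv.executeSql("ALTER TABLE `default`.t SET ('commit.retry.num-retries'='5')");
tEnv.executeSql("ALTER TABLE `default`.t RESET ('commit.retry.num-retries')");
```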

private static void validateFlinkTable(CatalogBaseTable table) {
@@ -552,52 +625,6 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema)
return partitionKeysBuilder.build();
}

- private static void commitChanges(
-     Table table,
-     String setLocation,
-     String setSnapshotId,
-     String pickSnapshotId,
-     Map<String, String> setProperties) {
-   // don't allow setting the snapshot and picking a commit at the same time because order is
-   // ambiguous and choosing one order leads to different results
-   Preconditions.checkArgument(
-       setSnapshotId == null || pickSnapshotId == null,
-       "Cannot set the current snapshot ID and cherry-pick snapshot changes");
-
-   if (setSnapshotId != null) {
-     long newSnapshotId = Long.parseLong(setSnapshotId);
-     table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
-   }
-
-   // if updating the table snapshot, perform that update first in case it fails
-   if (pickSnapshotId != null) {
-     long newSnapshotId = Long.parseLong(pickSnapshotId);
-     table.manageSnapshots().cherrypick(newSnapshotId).commit();
-   }
-
-   Transaction transaction = table.newTransaction();
-
-   if (setLocation != null) {
-     transaction.updateLocation().setLocation(setLocation).commit();
-   }
-
-   if (!setProperties.isEmpty()) {
-     UpdateProperties updateProperties = transaction.updateProperties();
-     setProperties.forEach(
-         (k, v) -> {
-           if (v == null) {
-             updateProperties.remove(k);
-           } else {
-             updateProperties.set(k, v);
-           }
-         });
-     updateProperties.commit();
-   }
-
-   transaction.commitTransaction();
- }

static CatalogTable toCatalogTable(Table table) {
TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
FlinkSchemaUtil.java

@@ -134,6 +134,16 @@ public static LogicalType convert(Type type) {
return TypeUtil.visit(type, new TypeToFlinkType());
}

/**
* Convert a {@link LogicalType Flink type} to a {@link Type}.
*
* @param flinkType a Flink logical type
* @return the equivalent Iceberg type
*/
public static Type convert(LogicalType flinkType) {
return flinkType.accept(new FlinkTypeToType());
}
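With this addition, the conversion becomes bidirectional. A small sketch exercising the new overload next to the existing one, assuming only the standard Flink and Iceberg type classes shown in the imports:

```java
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class ConvertSketch {
  public static void main(String[] args) {
    // New direction: Flink LogicalType -> Iceberg Type (via FlinkTypeToType).
    Type icebergType = FlinkSchemaUtil.convert(new VarCharType(VarCharType.MAX_LENGTH));
    System.out.println(icebergType); // string

    // Existing direction: Iceberg Type -> Flink LogicalType (via TypeToFlinkType).
    LogicalType flinkType = FlinkSchemaUtil.convert(Types.TimestampType.withoutZone());
    System.out.println(flinkType); // TIMESTAMP(6)
  }
}
```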

/**
* Convert a {@link RowType} to a {@link TableSchema}.
*
FlinkTypeToType.java

@@ -49,6 +49,10 @@ class FlinkTypeToType extends FlinkTypeVisitor<Type> {
private final RowType root;
private int nextId;

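// Used by FlinkSchemaUtil.convert(LogicalType) to convert a standalone type
// that has no enclosing row schema.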
FlinkTypeToType() {
this.root = null;
}

FlinkTypeToType(RowType root) {
this.root = root;
// the root struct's fields use the first ids