Flink: Supports specifying comment for iceberg fields in create table and addcolumn syntax using flinksql (#9606)

Co-authored-by: huyuanfeng <[email protected]>
huyuanfeng2018 and huyuanfeng authored Mar 5, 2024
1 parent bcbcbb2 commit 05f99b6
Showing 5 changed files with 95 additions and 39 deletions.
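
At a glance, the change lets Flink SQL column comments flow through to the Iceberg schema for both CREATE TABLE and ALTER TABLE ADD. A minimal usage sketch follows; the catalog name and environment setup are illustrative, not part of the commit, and the SQL mirrors the tests below:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ColumnCommentExample {
  public static void main(String[] args) {
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // Assumes an Iceberg catalog named "iceberg_catalog" has been registered beforehand.
    env.useCatalog("iceberg_catalog");

    // With this commit, the COMMENT clauses below land in the Iceberg schema as field docs
    // instead of being dropped.
    env.executeSql(
        "CREATE TABLE tl (id BIGINT COMMENT 'comment - id', data STRING COMMENT 'comment - data')");
    env.executeSql("ALTER TABLE tl ADD (col1 STRING COMMENT 'comment for col1')");
  }
}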
FlinkCatalog.java
@@ -38,6 +38,7 @@
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -390,17 +391,16 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
               + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
               + "create table without 'connector'='iceberg' related properties in an iceberg table.");
     }

-    createIcebergTable(tablePath, table, ignoreIfExists);
+    Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");
+    createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
   }

-  void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+  void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolean ignoreIfExists)
       throws CatalogException, TableAlreadyExistException {
     validateFlinkTable(table);

-    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getResolvedSchema());
     PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);

     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
     String location = null;
     for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
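
The instanceof check and cast above are what unlock comment propagation: only ResolvedCatalogTable exposes a ResolvedSchema, whose Column objects carry the declared comment, while the legacy TableSchema path discards it. A small sketch of the accessor chain involved (the classes and getters are standard Flink catalog API; the helper itself is hypothetical):

import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;

class ResolvedSchemaComments {
  // Hypothetical helper: collect per-column comments (null for columns declared without one).
  static List<String> columnComments(CatalogBaseTable table) {
    ResolvedCatalogTable resolved = (ResolvedCatalogTable) table; // safe after the instanceof check
    return resolved.getResolvedSchema().getColumns().stream()
        .map(column -> column.getComment().orElse(null))
        .collect(Collectors.toList());
  }
}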
FlinkDynamicTableFactory.java
@@ -24,11 +24,10 @@
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -84,17 +83,17 @@ public FlinkDynamicTableFactory(FlinkCatalog catalog) {
   @Override
   public DynamicTableSource createDynamicTableSource(Context context) {
     ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
-    CatalogTable catalogTable = context.getCatalogTable();
-    Map<String, String> tableProps = catalogTable.getOptions();
-    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+    ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
+    Map<String, String> tableProps = resolvedCatalogTable.getOptions();
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(resolvedCatalogTable.getSchema());

     TableLoader tableLoader;
     if (catalog != null) {
       tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
     } else {
       tableLoader =
           createTableLoader(
-              catalogTable,
+              resolvedCatalogTable,
               tableProps,
               objectIdentifier.getDatabaseName(),
               objectIdentifier.getObjectName());
@@ -106,17 +105,17 @@ public DynamicTableSource createDynamicTableSource(Context context) {
   @Override
   public DynamicTableSink createDynamicTableSink(Context context) {
     ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
-    CatalogTable catalogTable = context.getCatalogTable();
-    Map<String, String> writeProps = catalogTable.getOptions();
-    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
+    ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
+    Map<String, String> writeProps = resolvedCatalogTable.getOptions();
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(resolvedCatalogTable.getSchema());

     TableLoader tableLoader;
     if (catalog != null) {
       tableLoader = createTableLoader(catalog, objectIdentifier.toObjectPath());
     } else {
       tableLoader =
           createTableLoader(
-              catalogTable,
+              resolvedCatalogTable,
               writeProps,
               objectIdentifier.getDatabaseName(),
               objectIdentifier.getObjectName());
@@ -147,7 +146,7 @@ public String factoryIdentifier() {
   }

   private static TableLoader createTableLoader(
-      CatalogBaseTable catalogBaseTable,
+      ResolvedCatalogTable resolvedCatalogTable,
       Map<String, String> tableProps,
       String databaseName,
       String tableName) {
@@ -187,7 +186,7 @@ private static TableLoader createTableLoader(
     // Create table if not exists in the external catalog.
     if (!flinkCatalog.tableExists(objectPath)) {
       try {
-        flinkCatalog.createIcebergTable(objectPath, catalogBaseTable, true);
+        flinkCatalog.createIcebergTable(objectPath, resolvedCatalogTable, true);
       } catch (TableAlreadyExistException e) {
         throw new AlreadyExistsException(
             e,
FlinkSchemaUtil.java
@@ -20,7 +20,10 @@

 import java.util.List;
 import java.util.Set;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -55,35 +58,69 @@ public class FlinkSchemaUtil {

   private FlinkSchemaUtil() {}

-  /** Convert the flink table schema to apache iceberg schema. */
+  /** @deprecated Use {@link #convert(ResolvedSchema)} instead. */
+  @Deprecated
   public static Schema convert(TableSchema schema) {
     LogicalType schemaType = schema.toRowDataType().getLogicalType();
     Preconditions.checkArgument(
-        schemaType instanceof RowType, "Schema logical type should be RowType.");
+        schemaType instanceof RowType, "Schema logical type should be row type.");

     RowType root = (RowType) schemaType;
     Type converted = root.accept(new FlinkTypeToType(root));

-    Schema iSchema = new Schema(converted.asStructType().fields());
-    return freshIdentifierFieldIds(iSchema, schema);
+    Schema icebergSchema = new Schema(converted.asStructType().fields());
+    if (schema.getPrimaryKey().isPresent()) {
+      return freshIdentifierFieldIds(icebergSchema, schema.getPrimaryKey().get().getColumns());
+    } else {
+      return icebergSchema;
+    }
   }

+  /** Convert the flink table schema to apache iceberg schema with column comment. */
+  public static Schema convert(ResolvedSchema flinkSchema) {
+    List<Column> tableColumns = flinkSchema.getColumns();
+    // copy from org.apache.flink.table.api.Schema#toRowDataType
+    DataTypes.Field[] fields =
+        tableColumns.stream()
+            .map(
+                column -> {
+                  if (column.getComment().isPresent()) {
+                    return DataTypes.FIELD(
+                        column.getName(), column.getDataType(), column.getComment().get());
+                  } else {
+                    return DataTypes.FIELD(column.getName(), column.getDataType());
+                  }
+                })
+            .toArray(DataTypes.Field[]::new);
+
+    LogicalType schemaType = DataTypes.ROW(fields).notNull().getLogicalType();
+    Preconditions.checkArgument(
+        schemaType instanceof RowType, "Schema logical type should be row type.");
+
+    RowType root = (RowType) schemaType;
+    Type converted = root.accept(new FlinkTypeToType(root));
+    Schema icebergSchema = new Schema(converted.asStructType().fields());
+    if (flinkSchema.getPrimaryKey().isPresent()) {
+      return freshIdentifierFieldIds(icebergSchema, flinkSchema.getPrimaryKey().get().getColumns());
+    } else {
+      return icebergSchema;
+    }
+  }
+
-  private static Schema freshIdentifierFieldIds(Schema iSchema, TableSchema schema) {
+  private static Schema freshIdentifierFieldIds(Schema icebergSchema, List<String> primaryKeys) {
     // Locate the identifier field id list.
     Set<Integer> identifierFieldIds = Sets.newHashSet();
-    if (schema.getPrimaryKey().isPresent()) {
-      for (String column : schema.getPrimaryKey().get().getColumns()) {
-        Types.NestedField field = iSchema.findField(column);
-        Preconditions.checkNotNull(
-            field,
-            "Cannot find field ID for the primary key column %s in schema %s",
-            column,
-            iSchema);
-        identifierFieldIds.add(field.fieldId());
-      }
+    for (String primaryKey : primaryKeys) {
+      Types.NestedField field = icebergSchema.findField(primaryKey);
+      Preconditions.checkNotNull(
+          field,
+          "Cannot find field ID for the primary key column %s in schema %s",
+          primaryKey,
+          icebergSchema);
+      identifierFieldIds.add(field.fieldId());
     }

-    return new Schema(iSchema.schemaId(), iSchema.asStruct().fields(), identifierFieldIds);
+    return new Schema(
+        icebergSchema.schemaId(), icebergSchema.asStruct().fields(), identifierFieldIds);
   }

   /**
@@ -109,7 +146,11 @@ public static Schema convert(Schema baseSchema, TableSchema flinkSchema) {

     // fix types that can't be represented in Flink (UUID)
     Schema fixedSchema = FlinkFixupTypes.fixup(schema, baseSchema);
-    return freshIdentifierFieldIds(fixedSchema, flinkSchema);
+    if (flinkSchema.getPrimaryKey().isPresent()) {
+      return freshIdentifierFieldIds(fixedSchema, flinkSchema.getPrimaryKey().get().getColumns());
+    } else {
+      return fixedSchema;
+    }
   }

   /**
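
To see the new overload end to end: ResolvedSchema.of and Column.physical(...).withComment(...) are standard Flink APIs, so a schema built this way should convert with the comment attached as the Iceberg field doc. A sketch under those assumptions, mirroring testCreateTableWithColumnComment below:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;

class ConvertWithComments {
  public static void main(String[] args) {
    ResolvedSchema flinkSchema =
        ResolvedSchema.of(
            Column.physical("id", DataTypes.BIGINT()).withComment("comment - id"),
            Column.physical("data", DataTypes.STRING())); // no comment -> null doc

    Schema icebergSchema = FlinkSchemaUtil.convert(flinkSchema);
    System.out.println(icebergSchema); // the id field prints with its doc attached
  }
}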
FlinkAlterTableUtil.java
@@ -132,9 +132,11 @@ public static void applySchemaChanges(
             flinkColumn.getName());
         Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
         if (flinkColumn.getDataType().getLogicalType().isNullable()) {
-          pendingUpdate.addColumn(flinkColumn.getName(), icebergType);
+          pendingUpdate.addColumn(
+              flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
         } else {
-          pendingUpdate.addRequiredColumn(flinkColumn.getName(), icebergType);
+          pendingUpdate.addRequiredColumn(
+              flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null));
         }
       } else if (change instanceof TableChange.ModifyColumn) {
         TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change;
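
The three-argument addColumn/addRequiredColumn overloads used here are Iceberg's existing public UpdateSchema API; the Flink column comment simply travels through as the field's doc string. The equivalent direct call looks like this (how the Table is obtained is up to the caller):

import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

class AddColumnWithDoc {
  static void addCommentedColumn(Table table) {
    table
        .updateSchema()
        .addColumn("col1", Types.StringType.get(), "comment for col1") // doc carries the comment
        .commit();
  }
}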
TestFlinkCatalogTable.java
@@ -226,6 +226,19 @@ public void testCreatePartitionTable() throws TableNotExistException {
     assertThat(catalogTable.getPartitionKeys()).isEqualTo(Collections.singletonList("dt"));
   }

+  @TestTemplate
+  public void testCreateTableWithColumnComment() {
+    sql("CREATE TABLE tl(id BIGINT COMMENT 'comment - id', data STRING COMMENT 'comment - data')");
+
+    Table table = table("tl");
+    assertThat(table.schema().asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(1, "id", Types.LongType.get(), "comment - id"),
+                    Types.NestedField.optional(2, "data", Types.StringType.get(), "comment - data"))
+                .asStruct());
+  }
+
   @TestTemplate
   public void testCreateTableWithFormatV2ThroughTableProperty() throws Exception {
     sql("CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')");
@@ -316,14 +329,15 @@ public void testAlterTableAddColumn() {
                 Types.NestedField.optional(2, "dt", Types.StringType.get()))
             .asStruct());
     // Add multiple columns
-    sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)");
+    sql("ALTER TABLE tl ADD (col1 STRING COMMENT 'comment for col1', col2 BIGINT)");
     Schema schemaAfter2 = table("tl").schema();
     assertThat(schemaAfter2.asStruct())
         .isEqualTo(
             new Schema(
                     Types.NestedField.optional(1, "id", Types.LongType.get()),
                     Types.NestedField.optional(2, "dt", Types.StringType.get()),
-                    Types.NestedField.optional(3, "col1", Types.StringType.get()),
+                    Types.NestedField.optional(
+                        3, "col1", Types.StringType.get(), "comment for col1"),
                     Types.NestedField.optional(4, "col2", Types.LongType.get()))
                 .asStruct());
     // Adding a required field should fail because Iceberg's SchemaUpdate does not allow
