-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink 1.17: Support alter table column #7628
Conversation
d4d75bc
to
fe3521b
Compare
fe3521b
to
2264787
Compare
For backward-compatibility, I'm currently remaining the old alterTable API as it is. Not sure if it's the right thing to do. |
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. @stevenzwu you may also be interested in this.
@chenjunjiedada Thanks for reviewing! I just fixed a bug on modifying the nullability of columns. You might want to check. |
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
Outdated
Show resolved
Hide resolved
pendingUpdate.deleteColumn(dropColumn.getColumnName()); | ||
|
||
} else if (change instanceof TableChange.AddWatermark) { | ||
throw new UnsupportedOperationException("Adding watermark specs is not supported yet. "); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: empty space in the end is not necessary. please check all places
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this (the extra whitespace in the end) needs to be addressed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also the error msg doesn't follow the coding style. you can referring to ArrowVectorAccessor
for examples. Please review all the error msgs in this class
E.g., it could be sth like Unsupported schema change: adding watermark specs
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
Outdated
Show resolved
Hide resolved
if (!schemaChanges.isEmpty()) { | ||
UpdateSchema updateSchema = transaction.updateSchema(); | ||
FlinkAlterTableUtil.applySchemaChanges(updateSchema, schemaChanges); | ||
updateSchema.commit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe commit
action can be moved inside FlinkAlterTableUtil
? then it would be more natural to move applyManageSnapshots
into FlinkAlterTableUtil
too as mentioned in a comment below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's reasonable.
throw new UnsupportedOperationException("Modifying watermark specs is not supported yet. "); | ||
|
||
} else if (change instanceof TableChange.DropWatermark) { | ||
throw new UnsupportedOperationException("Watermark specs is not supported yet. "); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the previous exception message indicated the action to be taken, such as add, modify, but there is no. Should unify it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this!
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
Show resolved
Hide resolved
} | ||
|
||
@Test | ||
public void testAlterTableAddColumn() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to add a case to test when the name of the field to be added already exists?
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
Show resolved
Hide resolved
@stevenzwu @hililiwei Thanks for reviewing! I added a few patches. Could you take a look? |
Hi @stevenzwu @hililiwei, do you have any more comments on this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@linyanghao this looks very close to me. just left some minor/nit comments
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
Show resolved
Hide resolved
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
Outdated
Show resolved
Hide resolved
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
Outdated
Show resolved
Hide resolved
@stevenzwu I addressed the comments. Regarding the warning message, I modified it so that users will only see it when they try altering schema using the old API. Could you take a look? Thanks! |
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
Outdated
Show resolved
Hide resolved
pendingUpdate.deleteColumn(dropColumn.getColumnName()); | ||
|
||
} else if (change instanceof TableChange.AddWatermark) { | ||
throw new UnsupportedOperationException("Adding watermark specs is not supported yet. "); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this (the extra whitespace in the end) needs to be addressed.
pendingUpdate.deleteColumn(dropColumn.getColumnName()); | ||
|
||
} else if (change instanceof TableChange.AddWatermark) { | ||
throw new UnsupportedOperationException("Adding watermark specs is not supported yet. "); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also the error msg doesn't follow the coding style. you can referring to ArrowVectorAccessor
for examples. Please review all the error msgs in this class
E.g., it could be sth like Unsupported schema change: adding watermark specs
@linyanghao I left a few comments. can you also rebase? |
b017a5c
to
bac300d
Compare
@stevenzwu Hi, I rebased and addressed the style issues you mentioned. Could you check? |
thanks @linyanghao for the contribution and @chenjunjiedada and @hililiwei for the review. @linyanghao can you create a back port PR? you can follow the example here for back port: #8228 |
@stevenzwu Sure. Thanks for reviewing! Also thanks @chenjunjiedada and @hililiwei for reviewing! |
@stevenzwu Wait, Flink does not support ALTER TABLE in 1.16 yet. I guess we don't need to backport? |
if that's the case, yeah no need to back port |
Resolves #7312