Skip to content

Commit

Permalink
[HUDI-4130] Remove the upgrade/downgrade for flink #initTable (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 authored and yihua committed Jun 3, 2022
1 parent 59abb09 commit b0633f5
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1551,7 +1551,7 @@ private void setWriteTimer(HoodieTable table) {
}
}

private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
UpgradeDowngrade upgradeDowngrade =
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,14 +407,20 @@ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<Strin
return getHoodieTable();
}

@Override
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
// do nothing.
// flink executes the upgrade/downgrade once when initializing the first instant on start up,
// no need to execute the upgrade/downgrade on each write in streaming.
}

/**
* Upgrade downgrade the Hoodie table.
*
* <p>This action should only be executed once for each commit.
* The modification of the table properties is not thread safe.
*/
public void upgradeDowngrade(String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
public void upgradeDowngrade(String instantTime, HoodieTableMetaClient metaClient) {
new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.current(), instantTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ private void initInstant(String instant) {
// starts a new instant
startInstant();
// upgrade downgrade
this.writeClient.upgradeDowngrade(this.instant);
this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
}, "initialize instant %s", instant);
}

Expand Down

0 comments on commit b0633f5

Please sign in to comment.