Skip to content

Commit

Permalink
Revert "[HUDI-3870] Add timeout rollback for flink online compaction (#…
Browse files Browse the repository at this point in the history
…5314)"

This reverts commit 6f9b02d.
  • Loading branch information
danny0405 committed May 18, 2022
1 parent a1017c6 commit 54ed452
Showing 1 changed file with 3 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public void notifyCheckpointComplete(long checkpointId) {
// when the earliest inflight instant has timed out, assumes it has failed
// already and just rolls it back.

CompactionUtil.rollbackEarliestCompaction(table, conf);
// comment out: do we really need the timeout rollback ?
// CompactionUtil.rollbackEarliestCompaction(table, conf);
scheduleCompaction(table, checkpointId);
} catch (Throwable throwable) {
// make it fail-safe
Expand All @@ -99,22 +100,14 @@ public void notifyCheckpointComplete(long checkpointId) {

private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
// the first instant takes the highest priority.
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
Option<HoodieInstant> firstRequested = pendingCompactionTimeline
Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
if (!firstRequested.isPresent()) {
// do nothing.
LOG.info("No compaction plan for checkpoint " + checkpointId);
return;
}

Option<HoodieInstant> firstInflight = pendingCompactionTimeline
.filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT).firstInstant();
if (firstInflight.isPresent()) {
LOG.warn("Waiting for pending compaction instant : " + firstInflight + " to complete, skip scheduling new compaction plans");
return;
}

String compactionInstantTime = firstRequested.get().getTimestamp();

// generate compaction plan
Expand Down

0 comments on commit 54ed452

Please sign in to comment.