Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2422] Adding rollback plan and rollback requested instant #3651

Merged
merged 1 commit into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
Expand Down Expand Up @@ -590,12 +591,19 @@ public boolean rollback(final String commitInstantTime) throws HoodieRollbackExc
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
if (commitInstantOpt.isPresent()) {
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false);
if (rollbackPlanOption.isPresent()) {
// execute rollback
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
}
return true;
} else {
throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime);
}
return true;
} else {
LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback");
return false;
Expand Down Expand Up @@ -776,7 +784,9 @@ protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writ
* @param table Hoodie Table
*/
public void rollbackInflightCompaction(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
table.rollback(context, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
String commitTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, commitTime, inflightInstant, false);
table.rollback(context, commitTime, inflightInstant, false);
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}

Expand Down Expand Up @@ -978,7 +988,9 @@ protected Option<String> inlineCluster(Option<Map<String, String>> extraMetadata
}

protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
table.rollback(context, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false);
String commitTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, commitTime, inflightInstant, false);
table.rollback(context, commitTime, inflightInstant, false);
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
Expand Down Expand Up @@ -316,6 +317,13 @@ public HoodieTimeline getCleanTimeline() {
return getActiveTimeline().getCleanerTimeline();
}

/**
* Get rollback timeline.
*/
public HoodieTimeline getRollbackTimeline() {
vinothchandar marked this conversation as resolved.
Show resolved Hide resolved
return getActiveTimeline().getRollbackTimeline();
}

/**
* Get only the completed (no-inflights) savepoint timeline.
*/
Expand Down Expand Up @@ -417,6 +425,19 @@ public abstract Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext c
*/
public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime);

/**
* Schedule rollback for the instant time.
*
* @param context HoodieEngineContext
* @param instantTime Instant Time for scheduling rollback
* @param instantToRollback instant to be rolled back
* @return HoodieRollbackPlan containing info on rollback.
*/
public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
vinothchandar marked this conversation as resolved.
Show resolved Hide resolved
String instantTime,
HoodieInstant instantToRollback,
boolean skipTimelinePublish);

/**
* Rollback the (inflight/committed) record changes with the given commit time.
* <pre>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.table.action.rollback;

import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
Expand All @@ -43,7 +44,6 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand All @@ -53,11 +53,6 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,

private static final Logger LOG = LogManager.getLogger(BaseRollbackActionExecutor.class);

interface RollbackStrategy extends Serializable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewer: RollbackStrategy is now moved to planActionExecutor and also interface methods have changed.


List<HoodieRollbackStat> execute(HoodieInstant instantToRollback);
}

protected final HoodieInstant instantToRollback;
protected final boolean deleteInstants;
protected final boolean skipTimelinePublish;
Expand Down Expand Up @@ -92,30 +87,74 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
}
}

protected abstract RollbackStrategy getRollbackStrategy();
/**
* Execute actual rollback and fetch list of RollbackStats.
* @param hoodieRollbackPlan instance of {@link HoodieRollbackPlan} that needs to be executed.
* @return a list of {@link HoodieRollbackStat}s.
* @throws IOException
*/
protected abstract List<HoodieRollbackStat> executeRollback(HoodieRollbackPlan hoodieRollbackPlan) throws IOException;

private HoodieRollbackMetadata runRollback(HoodieTable<T, I, K, O> table, HoodieInstant rollbackInstant, HoodieRollbackPlan rollbackPlan) {
ValidationUtils.checkArgument(rollbackInstant.getState().equals(HoodieInstant.State.REQUESTED)
|| rollbackInstant.getState().equals(HoodieInstant.State.INFLIGHT));
try {
final HoodieInstant inflightInstant;
final HoodieTimer timer = new HoodieTimer();
timer.startTimer();
if (rollbackInstant.isRequested()) {
inflightInstant = table.getActiveTimeline().transitionRollbackRequestedToInflight(rollbackInstant,
TimelineMetadataUtils.serializeRollbackPlan(rollbackPlan));
} else {
inflightInstant = rollbackInstant;
}

HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackPlan);
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
instantTime,
Option.of(rollbackTimer.endTimer()),
Collections.singletonList(instantToRollback),
stats);
if (!skipTimelinePublish) {
finishRollback(inflightInstant, rollbackMetadata);
}

protected abstract List<HoodieRollbackStat> executeRollback() throws IOException;
// Finally, remove the markers post rollback.
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());

protected abstract List<HoodieRollbackStat> executeRollbackUsingFileListing(HoodieInstant instantToRollback);
return rollbackMetadata;
} catch (IOException e) {
throw new HoodieIOException("Failed to rollback commit ", e);
}
}

@Override
public HoodieRollbackMetadata execute() {
HoodieTimer rollbackTimer = new HoodieTimer().startTimer();
List<HoodieRollbackStat> stats = doRollbackAndGetStats();
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.convertRollbackMetadata(
instantTime,
Option.of(rollbackTimer.endTimer()),
Collections.singletonList(instantToRollback),
stats);
if (!skipTimelinePublish) {
finishRollback(rollbackMetadata);
table.getMetaClient().reloadActiveTimeline();
List<HoodieInstant> rollBackInstants = table.getRollbackTimeline()
.filterInflightsAndRequested().getInstants().collect(Collectors.toList());
if (rollBackInstants.isEmpty()) {
throw new HoodieRollbackException("No Requested Rollback Instants found to execute rollback ");
}
HoodieInstant rollbackInstant = null;
for (HoodieInstant instant : rollBackInstants) {
if (instantTime.equals(instant.getTimestamp())) {
rollbackInstant = instant;
break;
}
}
if (rollbackInstant != null) {
try {
HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(table.getMetaClient(), rollbackInstant);
return runRollback(table, rollBackInstants.get(0), rollbackPlan);
} catch (IOException e) {
throw new HoodieIOException("Failed to fetch rollback plan to rollback commit " + rollbackInstant.getTimestamp(), e);
}
} else {
throw new HoodieIOException("No inflight rollback instants found for commit time " + instantTime);
}

// Finally, remove the markers post rollback.
WriteMarkersFactory.get(config.getMarkersType(), table, instantToRollback.getTimestamp())
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());

return rollbackMetadata;
}

private void validateSavepointRollbacks() {
Expand Down Expand Up @@ -173,7 +212,7 @@ private void rollBackIndex() {
LOG.info("Index rolled back for commits " + instantToRollback);
}

public List<HoodieRollbackStat> doRollbackAndGetStats() {
public List<HoodieRollbackStat> doRollbackAndGetStats(HoodieRollbackPlan hoodieRollbackPlan) {
final String instantTimeToRollback = instantToRollback.getTimestamp();
final boolean isPendingCompaction = Objects.equals(HoodieTimeline.COMPACTION_ACTION, instantToRollback.getAction())
&& !instantToRollback.isCompleted();
Expand All @@ -186,7 +225,7 @@ public List<HoodieRollbackStat> doRollbackAndGetStats() {
}

try {
List<HoodieRollbackStat> stats = executeRollback();
List<HoodieRollbackStat> stats = executeRollback(hoodieRollbackPlan);
LOG.info("Rolled back inflight instant " + instantTimeToRollback);
if (!isPendingCompaction) {
rollBackIndex();
Expand All @@ -197,12 +236,19 @@ public List<HoodieRollbackStat> doRollbackAndGetStats() {
}
}

protected void finishRollback(HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
/**
* Execute rollback and fetch rollback stats.
* @param instantToRollback instant to be rolled back.
* @param rollbackPlan instance of {@link HoodieRollbackPlan} for which rollback needs to be executed.
* @return list of {@link HoodieRollbackStat}s.
*/
protected List<HoodieRollbackStat> executeRollback(HoodieInstant instantToRollback, HoodieRollbackPlan rollbackPlan) {
return new BaseRollbackHelper(table.getMetaClient(), config).performRollback(context, instantToRollback, rollbackPlan.getRollbackRequests());
}

protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetadata rollbackMetadata) throws HoodieIOException {
try {
table.getActiveTimeline().createNewInstant(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, instantTime));
table.getActiveTimeline().saveAsComplete(
new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, instantTime),
table.getActiveTimeline().transitionRollbackInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeRollbackMetadata(rollbackMetadata));
LOG.info("Rollback of Commits " + rollbackMetadata.getCommitsRollback() + " is complete");
} catch (IOException e) {
Expand Down
Loading