Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-2422] Adding rollback plan and rollback requested instant #3651

Merged
merged 1 commit into from
Sep 16, 2021

Conversation

nsivabalan
Copy link
Contributor

@nsivabalan nsivabalan commented Sep 13, 2021

What is the purpose of the pull request

  • Adding rollback plan and rollback requested instant.
  • We are introducing RollbackPlanning phase which will collect all file info that needs to be deleted as part of the rollback action. This HoodieRollBackPlan will get serialized into commitInstant.rollback.requested.
    • As of of this, adding a new scheduleRollback() method to the HoodieTable. Any rollback, should first schedule and then trigger actual rollback.
  public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
                                                              String instantTime,
                                                              HoodieInstant instantToRollback,
                                                              boolean skipTimelinePublish);
  • During rollback action phase, state is transitioned to inflight and the original rollback plan will be used. In this action phase, no listing or marker based fetch is performed. All we do in this phase is, fetch all file info to be deleted from serialized rollback plan and delete them. For log file deletions, we make appends to log files.

Brief change log

  • Added a new avro model for RollbackPlan (which contains a list of HoodieRollbackRequests).
  • Rollback strategy interface is changes as below.
  interface RollbackStrategy extends Serializable {
    /**
     * Fetch list of {@link HoodieRollbackRequest}s to be added to rollback plan.
     * @param instantToRollback instant to be rolled back.
     * @return list of {@link HoodieRollbackRequest}s to be added to rollback plan
     */
    List<HoodieRollbackRequest> getRollbackRequest(HoodieInstant instantToRollback);
  }
  • As part of this change, RollBackStrategy is moved from RollbackActionExecutor to RollbackPlanExecutor. Also, have added separate strategy class for Listing bases strategy.
  • Classes added or modified relating to Planning phase
    • BaseRollbackPlanActionExecutor : Create the Rollback plan (leveraging the right strategy) and serialize into rollback.requested meta file.
      • Inherited by SparkCOW/SparkMORRollbackPlanActionExecutor, FlinkCOW/FlinkMORRollbackPlanActionExecutor, and JavaCOWRollbackPlanActionExecutor.
    • AbstractMarkerBasedRollbackStrategy and ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecutor.RollbackStrategy.
      • ListingBased: SparkCOW/SparkMORListingBasedRollbackStrategy, FlinkCOW/FlinkMORListingBasedRollbackStrategy and JavaCOWListingBasedRollbackStrategy.
      • BaseMarkerBasedRollbackStrategy(used by both Flink and Java) and SparkMarkerBasedRollbackStrategy
  • Classes modified for rollback action phase:
    • BaseRollbackActionExecutor: where the actual rollback happens.
      • BaseCopyOnWriteRollbackActionExecutor(used by all 3 engines) and BaseMergeOnReadRollbackActionExecutor (used by all 3 engines)
        • Spark/Flink/JavaCOWRollbackActionExecutor and Spark/FlinkMORRollbackActionExecutor
    • Have added RollbackHelper classes to re-use code as much as possible across all 3 engines.
      • BaseRollbackHelpers: Spark/Flink/JavaRollbackHelper
      • Spark/Java/FlinkListingBasedRollbackHelper classes contains methods used in listing based rollback strategy.
  • Have fixed upgrade classes to also use the new approach
  • Fixed all existing tests relating to rollback to invoke both rollbackPlan followed by rollback action.
  • Tracking few follow ups here https://issues.apache.org/jira/browse/HUDI-2422

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@hudi-bot
Copy link

hudi-bot commented Sep 13, 2021

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run travis re-run the last Travis build
  • @hudi-bot run azure re-run the last Azure build

Copy link
Member

@vinothchandar vinothchandar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some high level comments

@@ -590,6 +591,11 @@ public boolean rollback(final String commitInstantTime) throws HoodieRollbackExc
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
if (commitInstantOpt.isPresent()) {

LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
Option<HoodieRollbackPlan> rollbackPlan = createTable(config, hadoopConf)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this follows convention of how we do compaction scheduling, i think its good.

{
"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieListingBasedRollbackRequest",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does the plan care about listing vs marker based for the plan serialization itself?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be just store generically what file paths need to be deleted and where command blocks need to be logged? then the plan becomes the source of truth

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@nsivabalan nsivabalan force-pushed the rollbackPlan branch 5 times, most recently from 67a62e1 to 2687fdc Compare September 14, 2021 19:07
@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
boolean skipTimelinePublish) {
return null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yet to fix flink and java. will be fixing it in a day or two.

{
"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieListingBasedRollbackRequest",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -53,11 +54,6 @@

private static final Logger LOG = LogManager.getLogger(BaseRollbackActionExecutor.class);

interface RollbackStrategy extends Serializable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewer: RollbackStrategy is now moved to planActionExecutor and also interface methods have changed.

@@ -48,20 +48,23 @@ public SparkCopyOnWriteRestoreActionExecutor(HoodieSparkEngineContext context,

@Override
protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) {
if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved code from below to here.

@nsivabalan nsivabalan force-pushed the rollbackPlan branch 2 times, most recently from 87a2824 to 076a996 Compare September 14, 2021 21:28
Copy link
Contributor Author

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some notes for reviewer

@nsivabalan nsivabalan marked this pull request as ready for review September 14, 2021 21:56
@nsivabalan nsivabalan changed the title [HUDI-2422] Adding rollback plan to rollback requested instant [HUDI-2422] Adding rollback plan and rollback requested instant Sep 15, 2021
Copy link
Member

@vinothchandar vinothchandar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High level code structure makes sense to me.

@@ -590,6 +590,10 @@ public boolean rollback(final String commitInstantTime) throws HoodieRollbackExc
.filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
.findFirst());
if (commitInstantOpt.isPresent()) {
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
createTable(config, hadoopConf)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do something with the return value?

* @return Map<FileStatus, File size>
* @throws IOException
*/
protected Map<FileStatus, Long> getWrittenLogFileSizeMap(String partitionPath, String baseCommitTime, String fileId) throws IOException {
return Collections.EMPTY_MAP;
// collect all log files that is supposed to be deleted with this rollback
return FSUtils.getAllLogFiles(table.getMetaClient().getFs(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this go through metadata table for listing? would not doing so cause any issues? i.e we may log the rollback some place thats not reachable from metadata table and thus fail to interpret the rollback command

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully get this point. I have created a follow up ticket here. will follow up with you.
here is my understanding: of a scenario using cloud stores that does not support append.

If there was crash during a commit, when listing log files to be logged, the last one which got crashed may not be part of the rollback plan. but thats should be fine. anyways, its not available via listing. and so I assume even during compaction those will not be available. we will proceed on with rollback by adding another log block (file). and this will get replayed to metadata table.

If you are talking about the case, where a crash happens when rollback itself is being logged and crashed just before committing to metadata table.
we should be ok here too. we will retry the rollback which will redo the action phase. and will add new log blocks (with same old logs that were part of failed writes, just that it may not be able to successfully delete). and this will get applied to metadata table. We just have to ensure when applying changes to metadata table, we consider all files from the plan and not just the ones that got successfully deleted.

* @param rollbackPlan instance of {@link HoodieRollbackPlan} for which rollback needs to be executed.
* @return list of {@link HoodieRollbackStat}s.
*/
abstract List<HoodieRollbackStat> rollbackAndGetStats(HoodieInstant instantToRollback, HoodieRollbackPlan rollbackPlan);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executeRollback?

return undoCreate(WriteMarkers.stripMarkerSuffix(markerFilePath));
String fileToDelete = WriteMarkers.stripMarkerSuffix(markerFilePath);
Path fullDeletePath = new Path(basePath, fileToDelete);
String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), fullDeletePath.getParent());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move any helper that is not actually doing anything with an fs instance to somethihg like PathUtils

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HoodieRollbackPlan rollbackPlan = RollbackUtils.getRollbackPlan(table.getMetaClient(), rollbackInstant);
return runRollback(table, rollBackInstants.get(0), rollbackPlan);
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add more context to the exception?

import java.util.List;
import java.util.Map;

public class HoodieRollbackRequestInternal {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SerializableRollbackRequest

"items": "string"
}
},
{"name": "logFilesToBeDeleted",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename this? log blocks to rollback? or sth?

/**
* Performs Rollback of Hoodie Tables.
*/
public class ListingBasedRollbackHelper implements Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can bunch of this live in client-common?

Comment on lines 29 to 46
public class SparkCopyOnWriteListingBasedRollbackStrategy extends ListingBasedRollbackStrategy {

public SparkCopyOnWriteListingBasedRollbackStrategy(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable table,
String instantTime) {
super(table, context, config, instantTime);
}

@Override
public List<HoodieRollbackRequest> getRollbackRequest(HoodieInstant instantToRollback) {
List<ListingBasedRollbackRequest> rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
table.getMetaClient().getBasePath(), config);
List<HoodieRollbackRequest> listingBasedRollbackRequests = new SparkListingBasedRollbackHelper(table.getMetaClient(), config)
.getRollbackRequestsForRollbackPlan(context, instantToRollback, rollbackRequests);
return listingBasedRollbackRequests;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just have one class for COW and MOR ? and branch internally using table type?

}
}
return Option.of(rollbackPlan);
} catch (IOException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Option is never empty, there is no need to return Option here, just return rollbackPlan which is more straight-forward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just trying to keep in sync with other operations like clean, compaction etc.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants