Skip to content

Commit

Permalink
perform rollback before initiating commit instant
Browse files Browse the repository at this point in the history
  • Loading branch information
puru committed Feb 4, 2023
1 parent f84a6df commit 2ec476d
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, H
@Deprecated
public boolean rollback(final String commitInstantTime, Option<HoodiePendingRollbackInfo> pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException {
LOG.info("Begin rollback of instant " + commitInstantTime);
final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(commitInstantTime);
final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, hadoopConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,15 @@ public String startCommit(String actionType, HoodieTableMetaClient metaClient) {
return instantTime;
}


/**
* Provides a new commit time for a write operation (insert/update/delete/insert_overwrite/insert_overwrite_table) with specified action.
*/
public String startCommit(String actionType) {
HoodieTableMetaClient metaClient = createMetaClient(true);
return startCommit(actionType, metaClient);
}

/**
* Provides a new commit time for a write operation (insert/update/delete/insert_overwrite/insert_overwrite_table) without specified action.
* @param instantTime Instant time to be generated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
Expand Down Expand Up @@ -759,10 +758,8 @@ private String startCommit() {
RuntimeException lastException = null;
while (retryNum <= maxRetries) {
try {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
writeClient.startCommitWithTime(instantTime, commitActionType);
return instantTime;
return writeClient.startCommit(commitActionType);
} catch (IllegalArgumentException ie) {
lastException = ie;
LOG.error("Got error trying to start a new commit. Retrying after sleeping for a sec", ie);
Expand Down

0 comments on commit 2ec476d

Please sign in to comment.