[HUDI-3884] Support archival beyond savepoint commits (apache#5837)
Co-authored-by: sivabalan <[email protected]>
2 people authored and fengjian committed Apr 5, 2023
1 parent 3047532 commit d1159ad
Showing 14 changed files with 364 additions and 60 deletions.
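The new behavior is opt-in through a single write config key, hoodie.archive.beyond.savepoint (added below in HoodieArchivalConfig, default false). A minimal sketch of enabling it through the Spark datasource path — the table name and base path are hypothetical; only the config key comes from this commit:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class ArchiveBeyondSavepointExample {
  // Write df to a Hudi table, allowing archival to proceed past savepointed
  // commits instead of stopping at the earliest savepoint.
  static void write(Dataset<Row> df) {
    df.write().format("hudi")
        .option("hoodie.table.name", "trips")               // hypothetical table name
        .option("hoodie.archive.beyond.savepoint", "true")  // key introduced by this commit
        .mode(SaveMode.Append)
        .save("/tmp/trips");                                // hypothetical base path
  }
}

With the flag off (the default), archival keeps stopping at the first savepoint, so existing tables see no behavior change.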
@@ -64,6 +64,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -76,12 +77,14 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
  * Archiver to bound the growth of files under .hoodie meta path.
@@ -409,9 +412,11 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
         .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
         .filterInflights().firstInstant();
 
-    // We cannot have any holes in the commit timeline. We cannot archive any commits which are
-    // made after the first savepoint present.
+    // NOTE: We cannot have any holes in the commit timeline.
+    // We cannot archive any commits which are made after the first savepoint present,
+    // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
     Option<HoodieInstant> firstSavepoint = table.getCompletedSavepointTimeline().firstInstant();
+    Set<String> savepointTimestamps = table.getSavepointTimestamps();
     if (!commitTimeline.empty() && commitTimeline.countInstants() > maxInstantsToKeep) {
       // For Merge-On-Read table, inline or async compaction is enabled
       // We need to make sure that there are enough delta commits in the active timeline
@@ -428,28 +433,33 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() {
       // Actually do the commits
       Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
           .filter(s -> {
-            // if no savepoint present, then don't filter
-            return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+            if (config.shouldArchiveBeyondSavepoint()) {
+              // skip savepoint commits and proceed further
+              return !savepointTimestamps.contains(s.getTimestamp());
+            } else {
+              // if no savepoint present, then don't filter
+              // stop at first savepoint commit
+              return !(firstSavepoint.isPresent() && compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
+            }
           }).filter(s -> {
             // Ensure commits >= oldest pending compaction commit is retained
             return oldestPendingCompactionAndReplaceInstant
-                .map(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+                .map(instant -> compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                 .orElse(true);
           }).filter(s -> {
             // We need this to ensure that when multiple writers are performing conflict resolution, eligible instants don't
             // get archived, i.e, instants after the oldestInflight are retained on the timeline
             if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.LAZY) {
               return oldestInflightCommitInstant.map(instant ->
-                  HoodieTimeline.compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
+                  compareTimestamps(instant.getTimestamp(), GREATER_THAN, s.getTimestamp()))
                   .orElse(true);
             }
             return true;
           }).filter(s ->
               oldestInstantToRetainForCompaction.map(instantToRetain ->
-                  HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
+                  compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                   .orElse(true)
           );
 
       return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
     } else {
       return Stream.empty();
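To make the two branches of the savepoint filter above concrete, here is a tiny self-contained sketch with hypothetical instant timestamps (it mirrors the predicate logic only; real instants are HoodieInstant objects compared via HoodieTimeline.compareTimestamps):

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SavepointFilterSketch {
  public static void main(String[] args) {
    List<String> commits = List.of("001", "002", "003", "004", "005");
    Set<String> savepointTimestamps = Set.of("003");   // commit 003 is savepointed
    String firstSavepoint = "003";

    // shouldArchiveBeyondSavepoint() == false: stop at the first savepoint;
    // 003 and everything after it stays on the active timeline.
    List<String> legacyCandidates = commits.stream()
        .filter(ts -> ts.compareTo(firstSavepoint) < 0)
        .collect(Collectors.toList());                 // [001, 002]

    // shouldArchiveBeyondSavepoint() == true: only the savepointed commit
    // itself is skipped; later commits remain eligible for archival.
    List<String> beyondCandidates = commits.stream()
        .filter(ts -> !savepointTimestamps.contains(ts))
        .collect(Collectors.toList());                 // [001, 002, 004, 005]

    System.out.println(legacyCandidates + " / " + beyondCandidates);
  }
}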
@@ -479,26 +489,37 @@ private Stream<HoodieInstant> getInstantsToArchive() {
         instants = Stream.empty();
       } else {
         LOG.info("Limiting archiving of instants to latest compaction on metadata table at " + latestCompactionTime.get());
-        instants = instants.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+        instants = instants.filter(instant -> compareTimestamps(instant.getTimestamp(), LESSER_THAN,
             latestCompactionTime.get()));
       }
     } catch (Exception e) {
       throw new HoodieException("Error limiting instant archival based on metadata table", e);
     }
   }
 
-    // If this is a metadata table, do not archive the commits that live in data set
-    // active timeline. This is required by metadata table,
-    // see HoodieTableMetadataUtil#processRollbackMetadata for details.
     if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
       HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
           .setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
           .setConf(metaClient.getHadoopConf())
           .build();
-      Option<String> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp);
-      if (earliestActiveDatasetCommit.isPresent()) {
-        instants = instants.filter(instant ->
-            HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get()));
-      }
+      Option<HoodieInstant> earliestActiveDatasetCommit = dataMetaClient.getActiveTimeline().firstInstant();
+
+      if (config.shouldArchiveBeyondSavepoint()) {
+        // There are chances that there could be holes in the timeline due to archival and savepoint interplay.
+        // So, the first non-savepoint commit in the data timeline is considered as beginning of the active timeline.
+        Option<HoodieInstant> firstNonSavepointCommit = dataMetaClient.getActiveTimeline().getFirstNonSavepointCommit();
+        if (firstNonSavepointCommit.isPresent()) {
+          String firstNonSavepointCommitTime = firstNonSavepointCommit.get().getTimestamp();
+          instants = instants.filter(instant ->
+              compareTimestamps(instant.getTimestamp(), LESSER_THAN, firstNonSavepointCommitTime));
+        }
+      } else {
+        // Do not archive the commits that live in data set active timeline.
+        // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata for details.
+        if (earliestActiveDatasetCommit.isPresent()) {
+          instants = instants.filter(instant ->
+              compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get().getTimestamp()));
+        }
+      }
     }
 
@@ -589,7 +610,7 @@ private boolean deleteAllInstantsOlderOrEqualsInAuxMetaFolder(HoodieInstant thre
     }
 
     List<HoodieInstant> instantsToBeDeleted =
-        instants.stream().filter(instant1 -> HoodieTimeline.compareTimestamps(instant1.getTimestamp(),
+        instants.stream().filter(instant1 -> compareTimestamps(instant1.getTimestamp(),
             LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList());
 
     for (HoodieInstant deleteInstant : instantsToBeDeleted) {
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.config.HoodieConfig;
 
 import javax.annotation.concurrent.Immutable;
+
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -34,8 +35,8 @@
  */
 @Immutable
 @ConfigClassProperty(name = "Archival Configs",
-    groupName = ConfigGroups.Names.WRITE_CLIENT,
-    description = "Configurations that control archival.")
+        groupName = ConfigGroups.Names.WRITE_CLIENT,
+        description = "Configurations that control archival.")
 public class HoodieArchivalConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
Expand Down Expand Up @@ -92,6 +93,13 @@ public class HoodieArchivalConfig extends HoodieConfig {
.withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
+ " useful when storage scheme doesn't support append operation.");

public static final ConfigProperty<Boolean> ARCHIVE_BEYOND_SAVEPOINT = ConfigProperty
.key("hoodie.archive.beyond.savepoint")
.defaultValue(false)
.sinceVersion("0.12.0")
.withDocumentation("If enabled, archival will proceed beyond savepoint, skipping savepoint commits. "
+ "If disabled, archival will stop at the earliest savepoint commit.");

/**
* @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
*/
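Note that the default of false preserves the pre-0.12.0 semantics — archival halts at the earliest savepointed commit — so the flag has to be set explicitly to unlock the new behavior.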
@@ -107,7 +115,9 @@ public class HoodieArchivalConfig extends HoodieConfig {
    */
   @Deprecated
   public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
-  /** @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */
+  /**
+   * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
+   */
   @Deprecated
   private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue();
   /**
@@ -186,6 +196,11 @@ public HoodieArchivalConfig.Builder withCommitsArchivalBatchSize(int batchSize)
       return this;
     }
 
+    public Builder withArchiveBeyondSavepoint(boolean archiveBeyondSavepoint) {
+      archivalConfig.setValue(ARCHIVE_BEYOND_SAVEPOINT, String.valueOf(archiveBeyondSavepoint));
+      return this;
+    }
+
     public HoodieArchivalConfig build() {
       archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
       return archivalConfig;
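Programmatic configuration goes through the builder method added above. A sketch assuming the usual HoodieWriteConfig.newBuilder()/withArchivalConfig(...) wiring — those two calls predate this commit and are shown here as an assumption, not taken from the diff:

import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class ArchivalConfigBuilderExample {
  static HoodieWriteConfig buildConfig() {
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/trips")                        // hypothetical base path
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .withArchiveBeyondSavepoint(true)          // builder method added above
            .build())
        .build();
  }
}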
@@ -1209,7 +1209,11 @@ public boolean isAutoClean() {
   }
 
   public boolean getArchiveMergeEnable() {
-    return getBoolean(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
+    return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
+  }
+
+  public boolean shouldArchiveBeyondSavepoint() {
+    return getBooleanOrDefault(HoodieArchivalConfig.ARCHIVE_BEYOND_SAVEPOINT);
   }
 
   public long getArchiveMergeSmallFileLimitBytes() {
@@ -368,10 +368,10 @@ public HoodieTimeline getCompletedSavepointTimeline() {
   }
 
   /**
-   * Get the list of savepoints in this table.
+   * Get the list of savepoint timestamps in this table.
    */
-  public List<String> getSavepoints() {
-    return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+  public Set<String> getSavepointTimestamps() {
+    return getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
   }
 
   public HoodieActiveTimeline getActiveTimeline() {
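Returning a Set here (instead of the old List from getSavepoints) makes the per-instant savepointTimestamps.contains(...) check in the archiver an O(1) lookup; the CleanPlanner call sites below only stream over the collection, so they are unaffected by the change.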
@@ -104,7 +104,7 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
    * Get the list of data file names savepointed.
    */
   public Stream<String> getSavepointedDataFiles(String savepointTime) {
-    if (!hoodieTable.getSavepoints().contains(savepointTime)) {
+    if (!hoodieTable.getSavepointTimestamps().contains(savepointTime)) {
       throw new HoodieSavepointException(
           "Could not get data files for savepoint " + savepointTime + ". No such savepoint.");
     }
@@ -227,7 +227,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
+ " file versions. ");
List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());

@@ -295,7 +295,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
     List<CleanFileInfo> deletePaths = new ArrayList<>();
 
     // Collect all the datafiles savepointed by all the savepoints
-    List<String> savepointedFiles = hoodieTable.getSavepoints().stream()
+    List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
         .flatMap(this::getSavepointedDataFiles)
         .collect(Collectors.toList());
 