
[HUDI-4792] Batch clean files to delete (apache#6580)
This patch uses a single batched call to fetch the file groups to delete during cleaning, instead of one call per partition.
This limits the number of calls to the file-system view and should fix the performance trouble seen with the metadata table when a table has a lot of partitions.
Fixes issue apache#6373
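
In miniature, the change swaps one view lookup per partition for a single lookup that takes the whole partition list. The sketch below illustrates the idea only; the `Function`-based lookups are hypothetical stand-ins for Hudi's real `CleanPlanner` and file-system view types:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class BatchLookupSketch {

  // Before: one view lookup per partition, i.e. O(partitions) calls.
  static Map<String, List<String>> perPartition(List<String> partitions,
      Function<String, List<String>> lookupOne) {
    Map<String, List<String>> result = new LinkedHashMap<>();
    partitions.forEach(p -> result.put(p, lookupOne.apply(p))); // N calls
    return result;
  }

  // After: a single batched lookup covering every partition.
  static Map<String, List<String>> batched(List<String> partitions,
      Function<List<String>, Map<String, List<String>>> lookupAll) {
    return lookupAll.apply(partitions); // 1 call
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("2022/01/01", "2022/01/02");
    Function<String, List<String>> lookupOne =
        p -> Arrays.asList(p + "/file-group-1", p + "/file-group-2");
    System.out.println(perPartition(partitions, lookupOne));
    // Fake batch implementation, just for the demo output.
    System.out.println(batched(partitions, ps -> perPartition(ps, lookupOne)));
  }
}
```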

Co-authored-by: sivabalan <[email protected]>
2 people authored and codope committed Oct 22, 2022
1 parent 4c110c1 commit 095f947
Showing 7 changed files with 189 additions and 144 deletions.
@@ -42,6 +42,7 @@
import org.apache.log4j.Logger;

import java.io.IOException;
+ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -116,9 +117,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());

Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
- .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
+ .parallelize(partitionsToClean, cleanerParallelism)
+ .mapPartitions(partitionIterator -> {
+ List<String> partitionList = new ArrayList<>();
+ partitionIterator.forEachRemaining(partitionList::add);
+ Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList);
+ return cleanResult.entrySet().iterator();
+ }, false).collectAsList()
.stream()
- .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
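The hunk above is the core of the patch: the engine context parallelizes the partition list and invokes the planner once per Spark partition via mapPartitions, so planner.getDeletePaths receives a batch of table partitions instead of a single one. A stripped-down sketch of that pattern on a bare JavaSparkContext follows; the per-batch computation is a stand-in for the real planner call, and Hudi itself goes through HoodieEngineContext rather than calling the Spark API directly:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class MapPartitionsBatchSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("batch-clean-sketch");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      List<String> partitions = Arrays.asList("p1", "p2", "p3", "p4");
      // One "planner" invocation per Spark partition, not per table partition.
      List<Map.Entry<String, Integer>> deleteCounts =
          jsc.parallelize(partitions, 2).mapPartitions(it -> {
            List<String> batch = new ArrayList<>();
            it.forEachRemaining(batch::add);
            // Stand-in for planner.getDeletePaths(batch): one batched lookup
            // producing a result entry per table partition in the batch.
            List<Map.Entry<String, Integer>> out = new ArrayList<>();
            batch.forEach(p -> out.add(new AbstractMap.SimpleEntry<>(p, p.length())));
            return out.iterator();
          }).collect();
      deleteCounts.forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
    }
  }
}
```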

[Large diff not rendered: one changed file is omitted here.]

@@ -57,7 +57,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
HoodieWriteConfig writeConfig = getConfigBuilder(true)
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
- .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
@@ -81,7 +81,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);

- // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit
+ // 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit
for (int i = 5; i < 9; i++) {
String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
client.startCommitWithTime(instantTime);
@@ -107,7 +107,7 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
this.metaClient = metaClient;
refreshTimeline(visibleActiveTimeline);
resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline);
this.bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient);
// Load Pending Compaction Operations
resetPendingCompactionOperations(CompactionUtils.getAllPendingCompactionOperations(metaClient).values().stream()
.map(e -> Pair.of(e.getKey(), CompactionOperation.convertFromAvroRecordInstance(e.getValue()))));
@@ -121,7 +121,7 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi

/**
* Refresh commits timeline.
*
* @param visibleActiveTimeline Visible Active Timeline
*/
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -163,13 +163,13 @@ public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
* Build FileGroups from passed in file-status.
*/
protected List<HoodieFileGroup> buildFileGroups(FileStatus[] statuses, HoodieTimeline timeline,
boolean addPendingCompactionFileSlice) {
return buildFileGroups(convertFileStatusesToBaseFiles(statuses), convertFileStatusesToLogFiles(statuses), timeline,
addPendingCompactionFileSlice);
}

protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieBaseFile> baseFileStream,
Stream<HoodieLogFile> logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) {
Map<Pair<String, String>, List<HoodieBaseFile>> baseFiles =
baseFileStream.collect(Collectors.groupingBy(baseFile -> {
String partitionPathStr = getPartitionPathFor(baseFile);
@@ -227,7 +227,7 @@ private void resetFileGroupsReplaced(HoodieTimeline timeline) {

// get replace instant mapping for each partition, fileId
return replaceMetadata.getPartitionToReplaceFileIds().entrySet().stream().flatMap(entry -> entry.getValue().stream().map(e ->
new AbstractMap.SimpleEntry<>(new HoodieFileGroupId(entry.getKey(), e), instant)));
} catch (HoodieIOException ex) {

if (ex.getIOException() instanceof FileNotFoundException) {
@@ -417,7 +417,7 @@ protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
* With async compaction, it is possible to see partial/complete base-files due to inflight-compactions, Ignore those
* base-files.
*
* @param fileSlice File Slice
* @param includeEmptyFileSlice include empty file-slice
*/
protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice fileSlice, boolean includeEmptyFileSlice) {
@@ -557,8 +557,8 @@ public final Option<HoodieBaseFile> getBaseFileOn(String partitionStr, String in
return Option.empty();
} else {
return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
.filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df));
}
} finally {
@@ -593,8 +593,8 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commi
return fetchAllStoredFileGroups()
.filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
.map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional(
fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
&& !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
.map(p -> addBootstrapBaseFileIfPresent(p.getKey(), p.getValue().get()));
} finally {
readLock.unlock();
@@ -624,9 +624,9 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
return fetchLatestFileSlices(partitionPath)
.filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true))
.map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
}
@@ -681,26 +681,26 @@ public final Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionSt

@Override
public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime,
boolean includeFileSlicesInPendingCompaction) {
try {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
Stream<Stream<FileSlice>> allFileSliceStream = fetchAllStoredFileGroups(partitionPath)
.filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
.map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
if (includeFileSlicesInPendingCompaction) {
return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
.map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
.map(this::addBootstrapBaseFileIfPresent);
} else {
return allFileSliceStream
.map(sliceStream ->
Option.fromJavaOptional(sliceStream
.filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
.filter(slice -> !slice.isEmpty())
.findFirst()))
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
}
} finally {
readLock.unlock();
@@ -792,6 +792,15 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
}

+ @Override
+ public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+ List<Pair<String, List<HoodieFileGroup>>> allFileGroups = new ArrayList<>();
+ partitionPaths.forEach(partitionPath -> {
+ allFileGroups.add(Pair.of(partitionPath, getAllFileGroups(partitionPath).collect(Collectors.toList())));
+ });
+ return allFileGroups.stream();
+ }

private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
try {
readLock.lock();
@@ -899,8 +908,8 @@ public final Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendi
protected abstract boolean isPendingClusteringScheduledForFileId(HoodieFileGroupId fgId);

/**
* Get pending clustering instant time for specified file group. Return None if file group is not in pending
* clustering operation.
*/
protected abstract Option<HoodieInstant> getPendingClusteringInstant(final HoodieFileGroupId fileGroupId);

@@ -1002,7 +1011,7 @@ protected abstract Option<Pair<String, CompactionOperation>> getPendingLogCompac
* Add a complete partition view to store.
*
* @param partitionPath Partition Path
* @param fileGroups File Groups for the partition path
*/
abstract void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups);

@@ -1122,7 +1131,7 @@ Stream<FileSlice> fetchLatestFileSlices(String partitionPath) {
/**
* Helper to merge last 2 file-slices. These 2 file-slices do not have compaction done yet.
*
* @param lastSlice Latest File slice for a file-group
* @param penultimateSlice Penultimate file slice for a file-group in commit timeline order
*/
private static FileSlice mergeCompactionPendingFileSlices(FileSlice lastSlice, FileSlice penultimateSlice) {
@@ -1187,7 +1196,7 @@ private Option<FileSlice> fetchAllLogsMergedFileSlice(HoodieFileGroup fileGroup,
* Default implementation for fetching latest base-file.
*
* @param partitionPath Partition path
* @param fileId File Id
* @return base File if present
*/
protected Option<HoodieBaseFile> fetchLatestBaseFile(String partitionPath, String fileId) {
@@ -1199,7 +1208,7 @@ protected Option<HoodieBaseFile> fetchLatestBaseFile(String partitionPath, Strin
* Default implementation for fetching file-slice.
*
* @param partitionPath Partition path
* @param fileId File Id
* @return File Slice if present
*/
public Option<FileSlice> fetchLatestFileSlice(String partitionPath, String fileId) {
@@ -204,6 +204,11 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}

+ @Override
+ public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+ return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
+ }

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
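As with the single-partition overload, the batched call is first attempted on the preferred view and served from the secondary view if it fails. A generic sketch of that fallback shape, with hypothetical names (the real execute(...) helper in PriorityBasedFileSystemView also logs the error and permanently switches over to the secondary view):

```java
import java.util.function.Function;

// Hypothetical illustration of the preferred/secondary delegation used above.
class FallbackExecutor {
  // Try the preferred (e.g. remote/timeline-server) view first; on failure,
  // serve the same request from the secondary (e.g. local in-memory) view.
  static <T, R> R execute(T request, Function<T, R> preferred, Function<T, R> secondary) {
    try {
      return preferred.apply(request);
    } catch (RuntimeException e) {
      return secondary.apply(request);
    }
  }
}
```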
@@ -51,9 +51,11 @@

import java.io.IOException;
import java.io.Serializable;
+ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+ import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
@@ -378,6 +380,16 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
}
}

+ @Override
+ public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+ ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
+ for (String partitionPath : partitionPaths) {
+ Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath);
+ fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList())));
+ }
+ return fileGroupPerPartitionList.stream();
+ }

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
@@ -109,18 +109,18 @@ interface SliceViewWithLatestSlice {
/**
* Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
*
* @param partitionPath Partition path
* @param maxCommitTime Max Instant Time
* @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
*/
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
boolean includeFileSlicesInPendingCompaction);

/**
* Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
* file-slice before and after compaction request instant is merged and returned.
*
* @param partitionPath Partition Path
* @param maxInstantTime Max Instant Time
* @return
*/
@@ -149,10 +149,12 @@ interface SliceView extends SliceViewWithLatestSlice {
*/
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);

+ Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths);

/**
* Return Pending Compaction Operations.
*
- * @return Pair<Pair<InstantTime,CompactionOperation>>
+ * @return Pair<Pair < InstantTime, CompactionOperation>>
*/
Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();

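For callers, the new overload yields one (partition path, file groups) pair per requested partition. A minimal usage sketch, assuming the enclosing interface is Hudi's TableFileSystemView and `view` is an already-initialized SliceView implementation for the table:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hudi.common.table.view.TableFileSystemView;

class BatchedViewUsage {
  // Prints a file-group count per partition with one batched view call.
  // The partition paths are illustrative; `view` must already be built for the table.
  static void printFileGroupCounts(TableFileSystemView.SliceView view) {
    List<String> partitions = Arrays.asList("2022/01/01", "2022/01/02");
    view.getAllFileGroups(partitions).forEach(pair ->
        System.out.println(pair.getKey() + " -> " + pair.getValue().size() + " file groups"));
  }
}
```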
