Skip to content

Commit

Permalink
clean
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuanshenbsj1 committed Nov 22, 2023
1 parent 3913dca commit fc27baa
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -118,17 +120,25 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {

context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());

Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
.stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
List<String> partitionsToDelete = new ArrayList<>();
for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
List<String> subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size()));
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(subPartitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
.stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));
Map<String, List<HoodieCleanFileInfo>> subCleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));

List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
.collect(Collectors.toList());
List<String> subPartitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
.collect(Collectors.toList());

cleanOps.putAll(subCleanOps);
partitionsToDelete.addAll(subPartitionsToDelete);
}

return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
// In other words, the file versions only apply to the active file groups.
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
boolean toDeletePartition = false;
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
int keepVersions = config.getCleanerFileVersionsRetained();
// do not cleanup slice required for pending compaction
Expand Down Expand Up @@ -329,7 +329,7 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
// all replaced file groups before earliestCommitToRetain are eligible to clean
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetain));
// add active files
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroupsStateless(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,19 @@ protected Map<Pair<String, Path>, FileStatus[]> listPartitions(
return fileStatusMap;
}

/**
 * Lists the files found under a single partition.
 *
 * <p>The partition path is resolved via {@code FSUtils.getPartitionPath} against
 * {@code metaClient.getBasePathV2()}, listed through {@code listPartition}, and the
 * number of entries plus the elapsed listing time are logged at debug level.
 *
 * @param partitionPathStr partition path handed to {@code FSUtils.getPartitionPath}
 * @return the statuses returned by {@code listPartition} for the resolved path
 * @throws IOException declared for failures of the underlying listing
 */
public FileStatus[] getFileStatusForPartition(String partitionPathStr) throws IOException {
  final Path resolvedPartitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr);
  // Only the listing call itself is timed; path resolution happens before the clock starts.
  final long listStartMs = System.currentTimeMillis();
  final FileStatus[] listedFiles = listPartition(resolvedPartitionPath);
  final long elapsedMs = System.currentTimeMillis() - listStartMs;
  LOG.debug("#files found in partition (" + partitionPathStr + ") =" + listedFiles.length + ", Time taken ="
      + elapsedMs);
  return listedFiles;
}

/**
* Allows lazily loading the partitions if needed.
*
Expand All @@ -449,15 +462,7 @@ private void ensurePartitionLoadedCorrectly(String partition) {
// Not loaded yet
try {
LOG.info("Building file system view for partition (" + partitionPathStr + ")");

Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr);
long beginLsTs = System.currentTimeMillis();
FileStatus[] statuses = listPartition(partitionPath);
long endLsTs = System.currentTimeMillis();
LOG.debug("#files found in partition (" + partitionPathStr + ") =" + statuses.length + ", Time taken ="
+ (endLsTs - beginLsTs));
List<HoodieFileGroup> groups = addFilesToView(statuses);

List<HoodieFileGroup> groups = addFilesToView(getFileStatusForPartition(partitionPathStr));
if (groups.isEmpty()) {
storePartitionView(partitionPathStr, new ArrayList<>());
}
Expand Down Expand Up @@ -706,7 +711,6 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitio
public final Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(String maxCommitTime) {
try {
readLock.lock();

List<String> formattedPartitionList = ensureAllPartitionsLoadedCorrectly();
return formattedPartitionList.stream().collect(Collectors.toMap(
Function.identity(),
Expand Down Expand Up @@ -1014,6 +1018,20 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
}

/**
 * Streams the non-replaced file groups of a partition without requiring the partition
 * to be loaded into this view's store.
 *
 * <p>When the partition is already available in the store, the stored file groups are
 * returned through {@link #getAllFileGroups(String)}. Otherwise the file groups are built
 * from a fresh listing of the partition. In both cases file groups for which
 * {@code isFileGroupReplaced} returns true are filtered out, so the result does not
 * depend on whether the partition happened to be cached.
 *
 * @param partitionStr partition path to read file groups for
 * @return stream of file groups in the partition, excluding replaced ones
 */
@Override
public final Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionStr) {
  if (isPartitionAvailableInStore(partitionStr)) {
    return getAllFileGroups(partitionStr);
  }
  // The on-the-fly listing is not filtered for replaced file groups; apply the same
  // filter getAllFileGroups() uses so both branches return the same set.
  return getAllFileGroupsIncludingReplacedStateless(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
}

/**
* Reports whether the given partition is available in this view's store.
*
* <p>Thin public wrapper around {@code isPartitionAvailableInStore}; the interface
* declares this method as intended for tests only.
*
* @param partitionStr partition path to check
* @return the result of {@code isPartitionAvailableInStore(partitionStr)}
*/
@Override
public boolean isPartitionAvailableInStoreForTest(String partitionStr) {
return isPartitionAvailableInStore(partitionStr);
}

private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
try {
readLock.lock();
Expand All @@ -1027,14 +1045,28 @@ private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String p
}
}

/**
 * Builds the file groups of a partition directly from a listing of its files.
 *
 * <p>The partition key is normalised with {@code formatPartitionKey}, its files are
 * listed, and {@code buildFileGroups} is invoked against
 * {@code visibleCommitsAndCompactionTimeline}. No replaced-file-group filter is applied
 * here, and this method itself does not put the result into the view's store.
 *
 * @param partitionStr partition path to list
 * @return stream over the file groups built from the listing
 * @throws HoodieIOException if listing the partition fails with an {@link IOException}
 */
private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplacedStateless(final String partitionStr) {
  final String formattedPartition = formatPartitionKey(partitionStr);
  try {
    FileStatus[] partitionFiles = getFileStatusForPartition(formattedPartition);
    return buildFileGroups(partitionFiles, visibleCommitsAndCompactionTimeline, true).stream();
  } catch (IOException e) {
    throw new HoodieIOException("Failed to list base files in partition " + partitionStr, e);
  }
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
if (isReplacedFileExistWithinSpecifiedPartition(partitionPath)) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime));
}
return (new ArrayList<HoodieFileGroup>()).stream();
}

@Override
Expand All @@ -1044,7 +1076,10 @@ public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String minCommitTi

@Override
public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplaced(fg.getFileGroupId()));
if (isReplacedFileExistWithinSpecifiedPartition(partitionPath)) {
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplaced(fg.getFileGroupId()));
}
return (new ArrayList<HoodieFileGroup>()).stream();
}

@Override
Expand Down Expand Up @@ -1263,6 +1298,8 @@ protected abstract Option<Pair<String, CompactionOperation>> getPendingLogCompac
*/
protected abstract void removeReplacedFileIdsAtInstants(Set<String> instants);

/**
* Returns whether replaced file groups exist within the given partition.
*
* <p>Used by {@code getReplacedFileGroupsBefore} and {@code getAllReplacedFileGroups} as a
* pre-check: when this returns false they return an empty stream instead of scanning the
* partition's file groups.
*
* @param partitionPath partition path to check
*/
protected abstract boolean isReplacedFileExistWithinSpecifiedPartition(String partitionPath);

/**
* Track instant time for file groups replaced.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ protected void removeReplacedFileIdsAtInstants(Set<String> instants) {
fgIdToReplaceInstants.entrySet().removeIf(entry -> instants.contains(entry.getValue().getTimestamp()));
}

/**
 * Tells whether at least one file group id tracked in {@code fgIdToReplaceInstants}
 * has a partition path equal to the given one.
 *
 * @param partitionPath partition path compared against each tracked file group id
 * @return true on the first match, false when no tracked id belongs to the partition
 */
@Override
protected boolean isReplacedFileExistWithinSpecifiedPartition(String partitionPath) {
  for (HoodieFileGroupId replacedFileGroupId : fgIdToReplaceInstants.keySet()) {
    if (replacedFileGroupId.getPartitionPath().equals(partitionPath)) {
      return true;
    }
  }
  return false;
}

@Override
protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId fileGroupId) {
return Option.ofNullable(fgIdToReplaceInstants.get(fileGroupId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}

/**
* Stateless variant of {@code getAllFileGroups}: hands the partition path to the
* {@code execute} helper together with the preferred and secondary views'
* {@code getAllFileGroupsStateless} implementations.
*
* <p>NOTE(review): presumably {@code execute} falls back to the secondary view when the
* preferred one fails - confirm against the helper's definition.
*/
@Override
public Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroupsStateless, secondaryView::getAllFileGroupsStateless);
}

/**
* Not supported by this view.
*
* @throws UnsupportedOperationException always
*/
@Override
public boolean isPartitionAvailableInStoreForTest(String partitionStr) {
throw new UnsupportedOperationException("isPartitionAvailableInStoreForTest() is not supported for PriorityBasedFileSystemView!");
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String ALL_FILEGROUPS_FOR_PARTITION_URL =
String.format("%s/%s", BASE_URL, "filegroups/all/partition/");

public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL =
String.format("%s/%s", BASE_URL, "filegroups/all/partition/stateless");

public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON =
String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");

Expand All @@ -123,6 +126,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String LOAD_ALL_PARTITIONS_URL = String.format("%s/%s", BASE_URL, "loadallpartitions/");

public static final String PARTITION_PARAM = "partition";
public static final String PARTITIONS_PARAM = "partitions";
public static final String BASEPATH_PARAM = "basepath";
public static final String INSTANT_PARAM = "instant";
public static final String MAX_INSTANT_PARAM = "maxinstant";
Expand Down Expand Up @@ -202,6 +206,13 @@ private Map<String, String> getParamsWithPartitionPath(String partitionPath) {
return paramsMap;
}

/**
 * Builds the query parameters for a request that targets several partitions.
 *
 * <p>The map holds the base path under {@code BASEPATH_PARAM} and the partition paths,
 * joined with a comma, under {@code PARTITIONS_PARAM}.
 *
 * @param partitionPaths partition paths to encode
 * @return a new mutable parameter map
 */
private Map<String, String> getParamsWithPartitionPaths(List<String> partitionPaths) {
  final String[] partitionArray = partitionPaths.toArray(new String[0]);
  final String joinedPartitions = StringUtils.join(partitionArray, ",");
  final Map<String, String> requestParams = new HashMap<>();
  requestParams.put(BASEPATH_PARAM, basePath);
  requestParams.put(PARTITIONS_PARAM, joinedPartitions);
  return requestParams;
}

private Map<String, String> getParams() {
Map<String, String> paramsMap = new HashMap<>();
paramsMap.put(BASEPATH_PARAM, basePath);
Expand Down Expand Up @@ -430,6 +441,23 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
}
}

/**
 * Fetches the file groups of a partition through the stateless endpoint
 * ({@code ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL}) and converts the returned DTOs
 * into file groups.
 *
 * @param partitionPath partition path sent as the request's partition parameter
 * @return stream of file groups converted from the response
 * @throws HoodieRemoteException if the request fails with an {@link IOException}
 */
@Override
public Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionPath) {
  final Map<String, String> requestParams = getParamsWithPartitionPath(partitionPath);
  final TypeReference<List<FileGroupDTO>> responseType = new TypeReference<List<FileGroupDTO>>() {};
  try {
    List<FileGroupDTO> fileGroupDtos = executeRequest(
        ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, requestParams, responseType, RequestMethod.GET);
    return DTOUtils.fileGroupDTOsToFileGroups(fileGroupDtos, metaClient);
  } catch (IOException e) {
    throw new HoodieRemoteException(e);
  }
}

/**
* Not supported by the remote view.
*
* @throws UnsupportedOperationException always
*/
@Override
public boolean isPartitionAvailableInStoreForTest(String partitionStr) {
throw new UnsupportedOperationException("isPartitionAvailableInStoreForTest() is not supported for RemoteHoodieTableFileSystemView!");
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,10 @@ protected void removeReplacedFileIdsAtInstants(Set<String> instants) {
);
}

/**
 * Conservatively reports that the partition may contain replaced file groups.
 *
 * <p>The base-class lookups {@code getReplacedFileGroupsBefore} and
 * {@code getAllReplacedFileGroups} call this method before scanning a partition. Throwing
 * here would make those lookups fail for this view, so this implementation always
 * answers {@code true}; the callers then run their regular scan, which filters on the
 * actual replaced state of each file group.
 *
 * @param partitionPath partition path being queried (not inspected by this implementation)
 * @return always {@code true}
 */
@Override
protected boolean isReplacedFileExistWithinSpecifiedPartition(String partitionPath) {
  // No per-partition lookup is implemented for this view; returning true only skips the
  // caller's short-circuit and never hides replaced file groups.
  return true;
}

@Override
protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId fileGroupId) {
String lookupKey = schemaHelper.getKeyForReplacedFileGroup(fileGroupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;

import java.util.List;
Expand Down Expand Up @@ -168,6 +169,19 @@ interface SliceView extends SliceViewWithLatestSlice {
*/
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);

/**
* Stream all the file groups for a given partition, without requiring the partition to be loaded (cached) in the file system view.
*/
Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionPath);

/**
* Checks if partition is pre-loaded and available in store.
*
* NOTE: This method must only be used in tests.
*/
@VisibleForTesting
boolean isPartitionAvailableInStoreForTest(String partitionPath);

/**
* Return Pending Compaction Operations.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@

import static org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -328,6 +328,46 @@ public void testViewForFileSlicesWithPartitionMetadataFile() throws Exception {
assertEquals(2, fsView.getAllFileGroups(partitionPath).count());
}

/**
* Checks the difference between the regular and the stateless file group lookups:
* after {@code getAllFileGroups} the partition is reported as available in the view's store,
* whereas after {@code getAllFileGroupsStateless} it is not.
*/
@Test
public void testViewForCleanFileGroupsInPartitions() throws Exception {
// Set up two partition directories, each with an entry named after the partition metafile prefix.
// NOTE(review): mkdirs() creates that ".parquet" entry as a directory rather than a file - confirm this is intended.
String partitionPath1 = "2023/11/22";
new File(basePath + "/" + partitionPath1).mkdirs();
new File(basePath + "/" + partitionPath1 + "/" + HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();
String partitionPath2 = "2023/11/23";
new File(basePath + "/" + partitionPath2).mkdirs();
new File(basePath + "/" + partitionPath2 + "/" + HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();

// Create two base files (two distinct file ids) in partition 1, written at commit time "1".
String fileId1 = UUID.randomUUID().toString();
String fileId2 = UUID.randomUUID().toString();
String commitTime1 = "1";
String fileName1 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, fileId1);
String fileName2 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, fileId2);
new File(basePath + "/" + partitionPath1 + "/" + fileName1).createNewFile();
new File(basePath + "/" + partitionPath1 + "/" + fileName2).createNewFile();

// Mark commit "1" as completed on the active timeline.
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1);
saveAsComplete(commitTimeline, instant1, Option.empty());

// Create two base files (two distinct file ids) in partition 2, written at commit time "2".
String fileId3 = UUID.randomUUID().toString();
String fileId4 = UUID.randomUUID().toString();
String commitTime2 = "2";
String fileName3 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, fileId3);
String fileName4 = FSUtils.makeBaseFileName(commitTime2, TEST_WRITE_TOKEN, fileId4);
new File(basePath + "/" + partitionPath2 + "/" + fileName3).createNewFile();
new File(basePath + "/" + partitionPath2 + "/" + fileName4).createNewFile();

// Mark commit "2" as completed on the active timeline.
HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime2);
saveAsComplete(commitTimeline, instant2, Option.empty());

// The regular lookup is expected to leave partition 1 available in the store;
// the returned stream itself is not inspected.
fsView.getAllFileGroups(partitionPath1);
assertTrue(fsView.isPartitionAvailableInStoreForTest(partitionPath1));
// The stateless lookup is expected to leave partition 2 absent from the store.
fsView.getAllFileGroupsStateless(partitionPath2);
assertFalse(fsView.isPartitionAvailableInStoreForTest(partitionPath2));
}

@Test
protected void testInvalidLogFiles() throws Exception {
String partitionPath = "2016/05/01";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
Expand Down Expand Up @@ -420,6 +420,14 @@ private void registerFileSlicesAPI() {
writeValueAsString(ctx, dtos);
}, true));

app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, new ViewHandler(ctx -> {
metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION_STATELESS", 1);
List<FileGroupDTO> dtos = sliceHandler.getAllFileGroupsStateless(
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
writeValueAsString(ctx, dtos);
}, true));

app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new ViewHandler(ctx -> {
metricsRegistry.add("REFRESH_TABLE", 1);
boolean success = sliceHandler
Expand Down
Loading

0 comments on commit fc27baa

Please sign in to comment.