Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuanshenbsj1 committed Nov 20, 2023
1 parent 45443c1 commit 0d2b1cf
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ private void resetFileGroupsReplaced(HoodieTimeline timeline) {

@Override
public void close() {
writeLock.lock();
try {
writeLock.lock();
this.metaClient = null;
this.completionTimeQueryView = null;
this.visibleCommitsAndCompactionTimeline = null;
Expand Down Expand Up @@ -320,7 +320,6 @@ public void reset() {
* Clear the resource.
*/
protected void clear() {
assert globalLock.isWriteLockedByCurrentThread();
addedPartitions.clear();
resetViewState();
bootstrapIndex = null;
Expand Down Expand Up @@ -936,7 +935,7 @@ public final Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(Str
.map(this::addBootstrapBaseFileIfPresent)
));
} finally {
writeLock.lock();
writeLock.unlock();
}
}

Expand Down Expand Up @@ -1027,6 +1026,11 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
}

@Override
public boolean isPartitionAvailableInStoreForTest(String partitionStr) {
return isPartitionAvailableInStore(partitionStr);
}

@Override
public Void cleanFileGroupsInPartitions(List<String> partitionPaths) {
writeLock.lock();
Expand Down Expand Up @@ -1260,6 +1264,10 @@ protected abstract Option<Pair<String, CompactionOperation>> getPendingLogCompac
*/
abstract void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups);

/**
* Remove a partation view from store.
* @param partitionPath
*/
abstract void removePartitionView(String partitionPath);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,19 +451,14 @@ public Stream<FileSlice> fetchLatestFileSlicesIncludingInflight(String partition

@Override
public void close() {
writeLock.lock();
try {
super.close();
this.fgIdToPendingCompaction = null;
this.fgIdToPendingLogCompaction = null;
this.partitionToFileGroupsMap = null;
this.fgIdToBootstrapBaseFile = null;
this.fgIdToReplaceInstants = null;
this.fgIdToPendingClustering = null;
this.closed = true;
} finally {
writeLock.unlock();
}
super.close();
this.fgIdToPendingCompaction = null;
this.fgIdToPendingLogCompaction = null;
this.partitionToFileGroupsMap = null;
this.fgIdToBootstrapBaseFile = null;
this.fgIdToReplaceInstants = null;
this.fgIdToPendingClustering = null;
this.closed = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}

@Override
public boolean isPartitionAvailableInStoreForTest(String partitionPath) {
return execute(partitionPath, preferredView::isPartitionAvailableInStoreForTest, secondaryView::isPartitionAvailableInStoreForTest);
}

@Override
public Void cleanFileGroupsInPartitions(List<String> partitionPaths) {
return execute(partitionPaths, preferredView::cleanFileGroupsInPartitions, secondaryView::cleanFileGroupsInPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String ALL_FILEGROUPS_FOR_PARTITION_URL =
String.format("%s/%s", BASE_URL, "filegroups/all/partition/");

public static final String IS_PARTITION_AVAILABLE_IN_STORE =
String.format("%s/%s", BASE_URL, "partitions/available/");

public static final String CLEAN_FILEGROUPS_FOR_PARTITION_URL =
String.format("%s/%s", BASE_URL, "filegroups/clean/partitions/");

Expand Down Expand Up @@ -441,6 +444,16 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
}
}

@Override
public boolean isPartitionAvailableInStoreForTest(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
try {
return executeRequest(IS_PARTITION_AVAILABLE_IN_STORE, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.GET);
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
}

@Override
public Void cleanFileGroupsInPartitions(List<String> partitionPaths) {
Map<String, String> paramsMap = getParamsWithPartitionPaths(partitionPaths);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ protected void storePartitionView(String partitionPath, List<HoodieFileGroup> fi

@Override
protected void removePartitionView(String partitionPath) {
LOG.info("Removing partition (" + partitionPath + ") to ROCKSDB based file-system view at "
LOG.info("Removing partition (" + partitionPath + ") from ROCKSDB based file-system view at "
+ config.getRocksdbBasePath());

String lookupKey = schemaHelper.getKeyForPartitionLookup(partitionPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;

import java.util.List;
Expand Down Expand Up @@ -169,7 +170,15 @@ interface SliceView extends SliceViewWithLatestSlice {
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);

/**
* clean up the file groups info for given partitions.
* Checks if partition is pre-loaded and available in store.
*
* NOTE: This method could only be used in tests
*/
@VisibleForTesting
boolean isPartitionAvailableInStoreForTest(String partitionPath);

/**
* Clean up the file groups info for given partitions.
*/
Void cleanFileGroupsInPartitions(List<String> partitionPaths);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@

import static org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -328,6 +328,31 @@ public void testViewForFileSlicesWithPartitionMetadataFile() throws Exception {
assertEquals(2, fsView.getAllFileGroups(partitionPath).count());
}

@Test
public void testViewForCleanFileGroupsInPartitions() throws Exception {
String partitionPath = "2023/09/13";
new File(basePath + "/" + partitionPath).mkdirs();
new File(basePath + "/" + partitionPath + "/" + HOODIE_PARTITION_METAFILE_PREFIX + ".parquet").mkdirs();

// create 2 fileId in partition
String fileId1 = UUID.randomUUID().toString();
String fileId2 = UUID.randomUUID().toString();
String commitTime1 = "1";
String fileName1 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, fileId1);
String fileName2 = FSUtils.makeBaseFileName(commitTime1, TEST_WRITE_TOKEN, fileId2);
new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();

HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1);
saveAsComplete(commitTimeline, instant1, Option.empty());

fsView.getAllFileGroups(partitionPath);
assertTrue(fsView.isPartitionAvailableInStoreForTest(partitionPath));
fsView.cleanFileGroupsInPartitions(Collections.singletonList(partitionPath));
assertFalse(fsView.isPartitionAvailableInStoreForTest(partitionPath));
}

@Test
protected void testInvalidLogFiles() throws Exception {
String partitionPath = "2016/05/01";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,14 @@ private void registerFileSlicesAPI() {
writeValueAsString(ctx, dtos);
}, true));

app.post(RemoteHoodieTableFileSystemView.IS_PARTITION_AVAILABLE_IN_STORE, new ViewHandler(ctx -> {
metricsRegistry.add("IS_PARTITION_AVAILABLE_IN_STORE", 1);
boolean success = sliceHandler.isPartitionAvailableInStore(
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
writeValueAsString(ctx, success);
}, false));

app.post(RemoteHoodieTableFileSystemView.CLEAN_FILEGROUPS_FOR_PARTITION_URL, new ViewHandler(ctx -> {
metricsRegistry.add("CLEAN_FILEGROUPS_FOR_PARTITION", 1);
boolean success = sliceHandler.cleanFileGroupsInPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public List<FileGroupDTO> getAllFileGroups(String basePath, String partitionPath
return DTOUtils.fileGroupDTOsfromFileGroups(fileGroups);
}

public boolean isPartitionAvailableInStore(String basePath, String partitionPath) {
viewManager.getFileSystemView(basePath).isPartitionAvailableInStoreForTest(partitionPath);
return true;
}

public boolean cleanFileGroupsInPartitions(String basePath, List<String> partitionPaths) {
viewManager.getFileSystemView(basePath).cleanFileGroupsInPartitions(partitionPaths);
return true;
Expand Down

0 comments on commit 0d2b1cf

Please sign in to comment.