Skip to content

Commit

Permalink
Check for more results when the filtered result is empty
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

When loading data, check for more results when the filtered result of a partial listing is empty.

### Why are the changes needed?
Fix Alluxio#18043.

			pr-link: Alluxio#18133
			change-id: cid-a05d66db230971ba6585b732c7bb2990ba02f7f7
  • Loading branch information
Haoning-Sun authored and codings-dan committed Dec 21, 2023
1 parent e7c7d8a commit 64cfcbe
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -151,28 +152,38 @@ private void partialListFileInfos() {
if (!mStartAfter.isEmpty()) {
mListOptions.setDisableAreDescendantsLoadedCheck(true);
}
ListStatusContext context = ListStatusContext.create(ListStatusPartialPOptions
.newBuilder()
.setOptions(mListOptions)
.setBatchSize(PARTIAL_LISTING_BATCH_SIZE)
.setStartAfter(mStartAfter));
listFileInfos(context);
Supplier<ListStatusContext> context = () -> {
return ListStatusContext.create(ListStatusPartialPOptions
.newBuilder()
.setOptions(mListOptions)
.setBatchSize(PARTIAL_LISTING_BATCH_SIZE)
.setStartAfter(mStartAfter));
};

List<FileInfo> fileInfos;
while ((fileInfos = listStatus(context.get())) != null
&& (mFiles = fileInfos.stream().filter(mFilter).collect(Collectors.toList())).isEmpty()
&& !fileInfos.isEmpty()) {
mStartAfter = fileInfos.get(fileInfos.size() - 1).getPath();
mListOptions.setDisableAreDescendantsLoadedCheck(true);
}
if (mFiles.size() > 0) {
mStartAfter = mFiles
.get(mFiles.size() - 1)
.getPath();
}
updateIterator();
}

private void listFileInfos(ListStatusContext context) {
mFiles = listStatus(context).stream().filter(mFilter).collect(Collectors.toList());
updateIterator();
}

private List<FileInfo> listStatus(ListStatusContext context) {
try {
AuthenticatedClientUser.set(mUser.orElse(null));
mFiles = mFileSystemMaster
.listStatus(new AlluxioURI(mPath), context)
.stream()
.filter(mFilter)
.collect(Collectors.toList());
mFileInfoIterator = mFiles.iterator();
return mFileSystemMaster.listStatus(new AlluxioURI(mPath), context);
} catch (FileDoesNotExistException | InvalidPathException e) {
throw new NotFoundRuntimeException(e);
} catch (AccessControlException e) {
Expand All @@ -182,6 +193,10 @@ private void listFileInfos(ListStatusContext context) {
} finally {
AuthenticatedClientUser.remove();
}
}

private void updateIterator() {
mFileInfoIterator = mFiles.iterator();
mTotalFileCount.set(mFiles.size());
mTotalByteCount.set(mFiles
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import alluxio.grpc.Block;
import alluxio.grpc.JobProgressReportFormat;
import alluxio.master.file.FileSystemMaster;
import alluxio.master.file.contexts.ListStatusContext;
import alluxio.master.job.FileIterable;
import alluxio.master.job.LoadJob;
import alluxio.scheduler.job.JobState;
Expand Down Expand Up @@ -80,6 +81,64 @@ public void testGetNextBatch()
assertEquals(0, batch.size());
}


@Test
public void testIsHealthy()
throws FileDoesNotExistException, AccessControlException, IOException, InvalidPathException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,13 @@ private static FileInfo createFileInfo(int blockCount, long blockSizeLimit) {
Random random = new Random();
FileInfo info = new FileInfo();
String ufs = CommonUtils.randomAlphaNumString(6);
String filePath = CommonUtils.randomAlphaNumString(6);
long blockSize = Math.abs(random.nextLong() % blockSizeLimit);
List<Long> blockIds = LongStream.range(0, blockCount)
.map(i -> random.nextLong())
.boxed()
.collect(ImmutableList.toImmutableList());
info.setUfsPath(ufs)
info.setUfsPath(ufs).setPath(filePath)
.setBlockSizeBytes(blockSize)
.setLength(blockSizeLimit * blockCount)
.setBlockIds(blockIds)
Expand Down

0 comments on commit 64cfcbe

Please sign in to comment.