Skip to content

Commit

Permalink
[ENG-15363] Fixing issue of extractor stopping in case of zero batche…
Browse files Browse the repository at this point in the history
…s in current page (#127)
  • Loading branch information
karankm97 authored Oct 25, 2024
1 parent e123414 commit 16fe87e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public CompletableFuture<Checkpoint> paginatedBatchUploadWithCheckpoint(
// startAfter is used only in the first call to get the objects, post that continuation token is
// used
// Resetting the firstIncompleteCommitFile so that we do not process from the same commit again
// All commit files will be processed after firstIncompleteCommitFile, and the checkpoint will be
// updated accordingly
String startAfter = getStartAfterString(prefix, checkpoint, true);
return executePaginatedBatchUpload(
tableId,
Expand Down Expand Up @@ -283,6 +285,20 @@ private CompletableFuture<Checkpoint> uploadInstantsInSequentialBatches(
int numBatches = batches.size();

if (numBatches == 0) {
// In case of CONTINUE_ON_INCOMPLETE_COMMIT, the extractor also needs to check subsequent pages hence
// returning a non-null checkpoint to continue processing.
if (
CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE.equals(commitTimelineType) &&
extractorConfig
.getUploadStrategy()
.equals(MetadataExtractorConfig.UploadStrategy.CONTINUE_ON_INCOMPLETE_COMMIT)
) {
log.info(
"No batches found in current page for table {} timeline {}",
table,
commitTimelineType);
return CompletableFuture.completedFuture(checkpoint);
}
log.info(
"Could not create batches with completed commits for table {} timeline {}",
table,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static ai.onehouse.constants.MetadataExtractorConstants.HOODIE_PROPERTIES_FILE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
Expand All @@ -28,6 +29,7 @@
import ai.onehouse.storage.models.File;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.time.Instant;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -129,7 +131,12 @@ void testContinueOnIncompleteCommitApproach() {
TABLE_PREFIX + "/.hoodie/",
null,
"table/.hoodie/777.rollback", // last successful commit is used for checkpointing
new ArrayList<>());
new ArrayList<>(
Arrays.asList(
generateFileObj("888.deltacommit.requested"),
// No batches returned, however it will return based on null continuation token
generateFileObj("888.deltacommit.inflight")
)));

List<File> batch1 =
Stream.of(
Expand Down Expand Up @@ -196,6 +203,7 @@ void testContinueOnIncompleteCommitApproach() {
CommitTimelineType.COMMIT_TIMELINE_TYPE_ACTIVE)
.join();

assertNotNull(response);
verify(asyncStorageClient, times(2)).fetchObjectsByPage(anyString(), anyString(), any(), any());
verifyFilesUploaded(
batch1.stream()
Expand Down

0 comments on commit 16fe87e

Please sign in to comment.