[HUDI-4769] Option read.streaming.skip_compaction skips delta commit
danny0405 committed Oct 1, 2022
1 parent f3d4ce9 commit fcfc1ba
Showing 3 changed files with 151 additions and 13 deletions.
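For context, a minimal sketch of how a user might turn the new behavior on from the Flink SQL layer for a MERGE_ON_READ table (the table name, schema, and path below are made up for illustration; 'read.streaming.skip_compaction' is the option named in the commit title and 'read.streaming.enabled' is the existing streaming read switch):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SkipCompactionReadExample {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Hypothetical MOR table; only the two streaming read options matter here.
    tEnv.executeSql(
        "CREATE TABLE hudi_mor_source (\n"
            + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n"
            + "  name VARCHAR(10),\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` VARCHAR(20)\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi_mor_source',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'read.streaming.enabled' = 'true',\n"
            + "  'read.streaming.skip_compaction' = 'true'\n" // consume only the delta logs
            + ")");

    tEnv.executeSql("SELECT * FROM hudi_mor_source").print();
  }
}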
AbstractTableFileSystemView.java

@@ -713,6 +713,33 @@ public final Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partit
}
}

/**
* Streams all "merged" file-slices before or on the given instant time
* for a MERGE_ON_READ table with an index that can index log files (which means it writes pure logs first).
*
* <p>In the streaming read scenario, for better reading efficiency, the user can choose to skip the
* base files that are produced by compaction. That is to say, we allow the users to consume only from
* these partitioned log files; the log files keep the record sequence just like a normal message queue.
*
* <p>NOTE: only the local view is supported.
*
* @param partitionStr Partition Path
* @param maxInstantTime Max Instant Time
*/
public final Stream<FileSlice> getAllLogsMergedFileSliceBeforeOrOn(String partitionStr, String maxInstantTime) {
try {
readLock.lock();
String partition = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partition);
return fetchAllStoredFileGroups(partition)
.filter(fg -> !isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxInstantTime))
.map(fileGroup -> fetchAllLogsMergedFileSlice(fileGroup, maxInstantTime))
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
}
}

@Override
public final Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn) {
try {
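A rough usage sketch of the new view API above, assuming an already built file-system view and a completed instant time (the class and method names other than the view calls are hypothetical): it collects, per file group of a partition, the sorted log file paths of the log-only merged slice, which is essentially what the Flink split generator in the next file does.

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

public class AllLogsMergedSliceSketch {

  // One sorted list of log file paths per file group of the given partition.
  static List<List<String>> logPathsPerFileGroup(
      HoodieTableFileSystemView fsView, String partition, String maxInstantTime) {
    return fsView.getAllLogsMergedFileSliceBeforeOrOn(partition, maxInstantTime)
        .map(slice -> slice.getLogFiles()
            .sorted(HoodieLogFile.getLogFileComparator())
            .map(logFile -> logFile.getPath().toString())
            .collect(Collectors.toList()))
        .collect(Collectors.toList());
  }
}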
@@ -1076,6 +1103,29 @@ private FileSlice fetchMergedFileSlice(HoodieFileGroup fileGroup, FileSlice file
return fileSlice;
}

/**
* Returns a file slice with the log files of all the file slices merged.
*
* @param fileGroup File group to which the file slices belong
* @param maxInstantTime The max instant time
*/
private Option<FileSlice> fetchAllLogsMergedFileSlice(HoodieFileGroup fileGroup, String maxInstantTime) {
List<FileSlice> fileSlices = fileGroup.getAllFileSlicesBeforeOn(maxInstantTime).collect(Collectors.toList());
if (fileSlices.size() == 0) {
return Option.empty();
}
if (fileSlices.size() == 1) {
return Option.of(fileSlices.get(0));
}
final FileSlice latestSlice = fileSlices.get(0);
FileSlice merged = new FileSlice(latestSlice.getPartitionPath(), latestSlice.getBaseInstantTime(),
latestSlice.getFileId());

// add log files from the latest slice to the earliest
fileSlices.forEach(slice -> slice.getLogFiles().forEach(merged::addLogFile));
return Option.of(merged);
}

/**
* Default implementation for fetching latest base-file.
*
IncrementalInputSplits.java

@@ -20,6 +20,7 @@

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -220,7 +221,7 @@ public Result inputSplits(
: instants.get(instants.size() - 1).getTimestamp();

List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
- fileStatuses, readPartitions, endInstant, instantRange);
+ fileStatuses, readPartitions, endInstant, instantRange, false);

return Result.instance(inputSplits, endInstant);
}
@@ -268,6 +269,17 @@ public Result inputSplits(
return Result.EMPTY;
}
fileStatuses = fileIndex.getFilesInPartitions();

if (fileStatuses.length == 0) {
LOG.warn("No files found for reading under path: " + path);
return Result.EMPTY;
}

final String endInstant = instantToIssue.getTimestamp();
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
fileStatuses, readPartitions, endInstant, null, false);

return Result.instance(inputSplits, endInstant);
} else {
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
@@ -290,18 +302,18 @@
return Result.EMPTY;
}
fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
}

- if (fileStatuses.length == 0) {
- LOG.warn("No files found for reading in user provided path.");
- return Result.EMPTY;
- }
+ if (fileStatuses.length == 0) {
+ LOG.warn("No files found for reading under path: " + path);
+ return Result.EMPTY;
+ }

- final String endInstant = instantToIssue.getTimestamp();
- List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
- fileStatuses, readPartitions, endInstant, instantRange);
+ final String endInstant = instantToIssue.getTimestamp();
+ List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
+ fileStatuses, readPartitions, endInstant, instantRange, skipCompaction);

- return Result.instance(inputSplits, endInstant);
+ return Result.instance(inputSplits, endInstant);
}
}

/**
@@ -351,7 +363,7 @@ public Result inputSplitsCDC(

final String endInstant = instantToIssue.getTimestamp();
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
- fileStatuses, readPartitions, endInstant, null);
+ fileStatuses, readPartitions, endInstant, null, false);

return Result.instance(inputSplits, endInstant);
} else {
@@ -401,12 +413,13 @@ private List<MergeOnReadInputSplit> getInputSplits(
FileStatus[] fileStatuses,
Set<String> readPartitions,
String endInstant,
- InstantRange instantRange) {
+ InstantRange instantRange,
+ boolean skipBaseFiles) {
final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
return readPartitions.stream()
- .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
+ .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath, endInstant, skipBaseFiles)
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -421,6 +434,15 @@
.collect(Collectors.toList());
}

private static Stream<FileSlice> getFileSlices(
HoodieTableFileSystemView fsView,
String relPartitionPath,
String endInstant,
boolean skipBaseFiles) {
return skipBaseFiles ? fsView.getAllLogsMergedFileSliceBeforeOrOn(relPartitionPath, endInstant)
: fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant);
}
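For orientation, a hedged sketch of how the new flag is expected to reach this split generator from the table options (the FlinkOptions constant name READ_STREAMING_SKIP_COMPACT is an assumption here; the option key itself, read.streaming.skip_compaction, comes from the commit title). The new test below drives the builder the same way with skipCompaction(true).

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.types.logical.RowType;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.IncrementalInputSplits;

public class SkipCompactionSplitsSketch {

  static IncrementalInputSplits.Result pollSplits(
      Configuration conf,
      RowType rowType,
      Path path,
      HoodieTableMetaClient metaClient,
      org.apache.hadoop.conf.Configuration hadoopConf) {
    IncrementalInputSplits splitsGenerator = IncrementalInputSplits.builder()
        .rowType(rowType)
        .conf(conf)
        .path(path)
        // assumed FlinkOptions constant; the config key is "read.streaming.skip_compaction"
        .skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT))
        .build();
    // null issued instant, as in the test below
    return splitsGenerator.inputSplits(metaClient, hadoopConf, null);
  }
}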

private FileIndex getFileIndex() {
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
if (this.requiredPartitions != null) {
TestInputFormat.java

@@ -37,6 +37,7 @@
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
@@ -63,6 +64,7 @@
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;

/**
* Test cases for MergeOnReadInputFormat and ParquetInputFormat.
@@ -398,6 +400,70 @@ void testReadFromEarliestWithChangeLogCOW(HoodieCDCSupplementalLoggingMode mode)
assertThat(actual, is(expected));
}

@Test
void testReadSkipCompaction() throws Exception {
beforeEach(HoodieTableType.MERGE_ON_READ);

org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);

// write base first with compaction
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
TestData.writeData(TestData.DATA_SET_INSERT, conf);

InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat(true);
assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));

HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder()
.rowType(TestConfigurations.ROW_TYPE)
.conf(conf)
.path(FilePathUtils.toFlinkPath(metaClient.getBasePathV2()))
.requiredPartitions(new HashSet<>(Arrays.asList("par1", "par2", "par3", "par4")))
.skipCompaction(true)
.build();

// by default, read from the latest commit;
// the compaction base files are skipped
IncrementalInputSplits.Result splits1 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null);
assertFalse(splits1.isEmpty());
List<RowData> result1 = readData(inputFormat, splits1.getInputSplits().toArray(new MergeOnReadInputSplit[0]));

String actual1 = TestData.rowDataToString(result1);
String expected1 = TestData.rowDataToString(TestData.DATA_SET_INSERT);
assertThat(actual1, is(expected1));

// write another commit using logs and read again
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);

// read from the compaction commit
String secondCommit = TestUtils.getNthCompleteInstant(metaClient.getBasePath(), 0, false);
conf.setString(FlinkOptions.READ_START_COMMIT, secondCommit);

IncrementalInputSplits.Result splits2 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null);
assertFalse(splits2.isEmpty());
List<RowData> result2 = readData(inputFormat, splits2.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual2 = TestData.rowDataToString(result2);
String expected2 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
assertThat(actual2, is(expected2));

// write another commit using logs into a separate partition
// so the file group has only logs
TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);

// refresh the input format
this.tableSource.reset();
inputFormat = this.tableSource.getInputFormat(true);

IncrementalInputSplits.Result splits3 = incrementalInputSplits.inputSplits(metaClient, hadoopConf, null);
assertFalse(splits3.isEmpty());
List<RowData> result3 = readData(inputFormat, splits3.getInputSplits().toArray(new MergeOnReadInputSplit[0]));
String actual3 = TestData.rowDataToString(result3);
String expected3 = TestData.rowDataToString(TestData.DATA_SET_UPDATE_INSERT);
assertThat(actual3, is(expected3));
}

@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {