[HUDI-6734] Add back HUDI-5409: Avoid file index and use fs view cache in COW input format

This reverts commit 2567ada.

 Conflicts:
	hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
	hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
aajisaka committed Aug 29, 2023
1 parent 759907e commit 343fd5d
Showing 3 changed files with 83 additions and 34 deletions.
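
In substance, the change gates Hive's snapshot listing in the copy-on-write input format on the metadata table: when the metadata table is enabled and its files partition is available, listing still goes through HiveHoodieTableFileIndex; otherwise it falls back to a cached in-memory HoodieTableFileSystemView. A minimal sketch of the gate, paraphrased from the COW diff below (the wrapper class ListingStrategy is illustrative, not part of the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;

final class ListingStrategy {

  // True when the metadata table is enabled in the job conf AND its "files"
  // partition has been bootstrapped; only then is HiveHoodieTableFileIndex used.
  static boolean useFileIndex(Configuration conf, HoodieTableMetaClient metaClient) {
    return conf.getBoolean(ENABLE.key(), ENABLE.defaultValue())
        && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient);
  }
}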
@@ -92,7 +92,7 @@ private Runnable getPreExecuteRunnable() {
@SuppressWarnings("unchecked")
@Test
@Timeout(value = 60)
public void testRecordReading() throws Exception {
public void testRecordReading() {

final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -20,18 +20,21 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableQueryType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -55,12 +58,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;

/**
* Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's
@@ -186,7 +190,8 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}

protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, Option<HoodieInstant> latestCompletedInstantOpt,
String tableBasePath, HoodieTableMetaClient metaClient) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();

if (baseFileOpt.isPresent()) {
@@ -232,31 +237,83 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job,
boolean shouldIncludePendingCommits =
HoodieHiveUtils.shouldIncludePendingCommits(job, tableMetaClient.getTableConfig().getTableName());

HiveHoodieTableFileIndex fileIndex =
new HiveHoodieTableFileIndex(
engineContext,
tableMetaClient,
props,
HoodieTableQueryType.SNAPSHOT,
partitionPaths,
queryCommitInstant,
shouldIncludePendingCommits);

Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();

targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, tableMetaClient))
.collect(Collectors.toList())
);
if (conf.getBoolean(ENABLE.key(), ENABLE.defaultValue()) && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) {
HiveHoodieTableFileIndex fileIndex =
new HiveHoodieTableFileIndex(
engineContext,
tableMetaClient,
props,
HoodieTableQueryType.SNAPSHOT,
partitionPaths,
queryCommitInstant,
shouldIncludePendingCommits);

Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();

targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex.getLatestCompletedInstant(),
fileIndex.getBasePath().toString(), tableMetaClient))
.collect(Collectors.toList())
);
} else {
String basePath = tableMetaClient.getBasePathV2().toString();
Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();
HoodieTimeline timeline = getActiveTimeline(tableMetaClient, shouldIncludePendingCommits);
Option<String> queryInstant = queryCommitInstant.or(() -> timeline.lastInstant().map(HoodieInstant::getTimestamp));
validateInstant(timeline, queryInstant);

try {
HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(tableMetaClient, hoodieTableMetaClient ->
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, hoodieTableMetaClient,
HoodieInputFormatUtils.buildMetadataConfig(job), timeline));

List<FileSlice> filteredFileSlices = new ArrayList<>();

for (Path p : entry.getValue()) {
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), p);

List<FileSlice> fileSlices = queryInstant.map(
instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, instant))
.orElse(fsView.getLatestFileSlices(relativePartitionPath))
.collect(Collectors.toList());

filteredFileSlices.addAll(fileSlices);
}

targetFiles.addAll(
filteredFileSlices.stream()
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, timeline.filterCompletedInstants().lastInstant(),
basePath, tableMetaClient))
.collect(Collectors.toList()));
} finally {
fsViewCache.forEach(((metaClient, fsView) -> fsView.close()));
}
}
}

return targetFiles;
}

private static HoodieTimeline getActiveTimeline(HoodieTableMetaClient metaClient, boolean shouldIncludePendingCommits) {
HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
if (shouldIncludePendingCommits) {
return timeline;
} else {
return timeline.filterCompletedAndCompactionInstants();
}
}

private static void validateInstant(HoodieTimeline activeTimeline, Option<String> queryInstant) {
if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) {
throw new HoodieIOException(String.format("Query instant (%s) not found in the timeline", queryInstant.get()));
}
}

protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
@@ -271,11 +328,6 @@ protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
}
}

private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
checkState(diff.isEmpty(), "Should be empty");
}

@Nonnull
protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
try {
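Pulled out of the diff context, the fallback branch works as follows. A minimal sketch, assuming a single partition path, no explicit query commit, pending commits excluded, and eliding the per-meta-client fsViewCache (the class and method names FsViewListingSketch and latestFileSlices are illustrative):

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;

final class FsViewListingSketch {

  // Lists the latest file slices of one partition without the file index:
  // build an in-memory view over the completed timeline, resolve the query
  // instant to the last completed one, and read the merged file slices
  // before/on that instant.
  static List<FileSlice> latestFileSlices(HoodieLocalEngineContext engineContext,
                                          HoodieTableMetaClient metaClient,
                                          HoodieMetadataConfig metadataConfig,
                                          Path partitionPath) {
    HoodieTimeline timeline =
        metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
    Option<String> queryInstant = timeline.lastInstant().map(HoodieInstant::getTimestamp);

    HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
        engineContext, metaClient, metadataConfig, timeline);
    try {
      String relativePartitionPath = FSUtils.getRelativePartitionPath(
          new Path(metaClient.getBasePathV2().toString()), partitionPath);
      return queryInstant
          .map(instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, instant))
          .orElse(fsView.getLatestFileSlices(relativePartitionPath))
          .collect(Collectors.toList());
    } finally {
      fsView.close();
    }
  }
}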
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -36,7 +36,6 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.RealtimeFileStatus;
@@ -92,14 +91,12 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
}

@Override
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, Option<HoodieInstant> latestCompletedInstantOpt,
String tableBasePath, HoodieTableMetaClient metaClient) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();

Option<HoodieInstant> latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant();
String tableBasePath = fileIndex.getBasePath().toString();

// Check if we're reading a MOR table
if (baseFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
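The signature change in both files exists because createFileStatusUnchecked previously pulled the latest completed instant and the table base path off a HiveHoodieTableFileIndex, which the fallback branch never constructs. With the widened signature, each branch derives those two values itself; a sketch of the two call-site derivations (the helper class CreateStatusInputs and the Pair wrapper are illustrative, not part of the commit):

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;

final class CreateStatusInputs {

  // File-index branch: both values come from the index itself.
  static Pair<Option<HoodieInstant>, String> fromFileIndex(HiveHoodieTableFileIndex fileIndex) {
    return Pair.of(fileIndex.getLatestCompletedInstant(), fileIndex.getBasePath().toString());
  }

  // Fallback branch: the instant comes from the completed timeline, the base
  // path from the meta client.
  static Pair<Option<HoodieInstant>, String> fromTimeline(HoodieTimeline timeline,
                                                          HoodieTableMetaClient metaClient) {
    return Pair.of(timeline.filterCompletedInstants().lastInstant(),
        metaClient.getBasePathV2().toString());
  }
}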
