Skip to content

Commit

Permalink
[HUDI-5409] Avoid file index and use fs view cache in COW input format (
Browse files Browse the repository at this point in the history
apache#7493)

- This PR falls back to the original code path using fs view cache as in 0.10.1 or earlier, instead of creating file index.

- Query engines using initial InputFormat based integration will not be using file index. Instead directly fetch file status from fs view cache.
  • Loading branch information
codope authored Dec 17, 2022
1 parent 7b17e6f commit cc1c1e7
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.spark.TaskContext$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import scala.Tuple2;
Expand Down Expand Up @@ -85,10 +86,11 @@ private Runnable getPreExecuteRunnable() {

// Test to ensure that we are reading all records from queue iterator in the same order
// without any exceptions.
@Disabled("Disabled for unblocking 0.12.2 release. Disruptor queue is not part of this minor release. Tracked in HUDI-5410")
@SuppressWarnings("unchecked")
@Test
@Timeout(value = 60)
public void testRecordReading() throws Exception {
public void testRecordReading() {

final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, 100);
ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,9 @@

package org.apache.hudi.hadoop;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
Expand All @@ -42,29 +30,51 @@
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.buildMetadataConfig;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getFileStatus;

/**
* Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's
Expand Down Expand Up @@ -190,7 +200,7 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}

protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, Option<HoodieInstant> instantOpt, String basePath, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();

if (baseFileOpt.isPresent()) {
Expand Down Expand Up @@ -223,6 +233,7 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job,

Map<HoodieTableMetaClient, List<Path>> groupedPaths =
HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();

for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
HoodieTableMetaClient tableMetaClient = entry.getKey();
Expand All @@ -236,33 +247,83 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job,
boolean shouldIncludePendingCommits =
HoodieHiveUtils.shouldIncludePendingCommits(job, tableMetaClient.getTableConfig().getTableName());

HiveHoodieTableFileIndex fileIndex =
new HiveHoodieTableFileIndex(
engineContext,
tableMetaClient,
props,
HoodieTableQueryType.SNAPSHOT,
partitionPaths,
queryCommitInstant,
shouldIncludePendingCommits);

Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();

Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);

targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
.collect(Collectors.toList())
);
// NOTE: Fetching virtual key info is a costly operation as it needs to load the commit metadata.
// This is only needed for MOR realtime splits. Hence, for COW tables, this can be avoided.
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = tableMetaClient.getTableType().equals(COPY_ON_WRITE) ? Option.empty() : getHoodieVirtualKeyInfo(tableMetaClient);
String basePath = tableMetaClient.getBasePathV2().toString();

if (conf.getBoolean(ENABLE.key(), DEFAULT_METADATA_ENABLE_FOR_READERS) && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) {
HiveHoodieTableFileIndex fileIndex =
new HiveHoodieTableFileIndex(
engineContext,
tableMetaClient,
props,
HoodieTableQueryType.SNAPSHOT,
partitionPaths,
queryCommitInstant,
shouldIncludePendingCommits);

Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();

targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex.getLatestCompletedInstant(), basePath, virtualKeyInfoOpt))
.collect(Collectors.toList())
);
} else {
HoodieTimeline timeline = getActiveTimeline(tableMetaClient, shouldIncludePendingCommits);
Option<String> queryInstant = queryCommitInstant.or(() -> timeline.lastInstant().map(HoodieInstant::getTimestamp));
validateInstant(timeline, queryInstant);

try {
HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(tableMetaClient, hoodieTableMetaClient ->
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, hoodieTableMetaClient, buildMetadataConfig(job), timeline));

List<FileSlice> filteredFileSlices = new ArrayList<>();

for (Path p : entry.getValue()) {
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), p);

List<FileSlice> fileSlices = queryInstant.map(
instant -> fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, instant))
.orElse(fsView.getLatestFileSlices(relativePartitionPath))
.collect(Collectors.toList());

filteredFileSlices.addAll(fileSlices);
}

targetFiles.addAll(
filteredFileSlices.stream()
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, timeline.filterCompletedInstants().lastInstant(), basePath, virtualKeyInfoOpt))
.collect(Collectors.toList()));
} finally {
fsViewCache.forEach(((metaClient, fsView) -> fsView.close()));
}
}
}

return targetFiles;
}

private static HoodieTimeline getActiveTimeline(HoodieTableMetaClient metaClient, boolean shouldIncludePendingCommits) {
HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
if (shouldIncludePendingCommits) {
return timeline;
} else {
return timeline.filterCompletedAndCompactionInstants();
}
}

private static void validateInstant(HoodieTimeline activeTimeline, Option<String> queryInstant) {
if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) {
throw new HoodieIOException(String.format("Query instant (%s) not found in the timeline", queryInstant.get()));
}
}

protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Expand All @@ -277,15 +338,10 @@ protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
}
}

private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
checkState(diff.isEmpty(), "Should be empty");
}

@Nonnull
protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
try {
return HoodieInputFormatUtils.getFileStatus(baseFile);
return getFileStatus(baseFile);
} catch (IOException ioe) {
throw new HoodieIOException("Failed to get file-status", ioe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,6 @@

package org.apache.hudi.hadoop.realtime;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
Expand All @@ -43,13 +33,23 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -86,14 +86,14 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
}

@Override
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
protected FileStatus createFileStatusUnchecked(FileSlice fileSlice,
Option<HoodieInstant> latestCompletedInstantOpt,
String tableBasePath,
Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();

Option<HoodieInstant> latestCompletedInstantOpt = fileIndex.getLatestCompletedInstant();
String tableBasePath = fileIndex.getBasePath().toString();

// Check if we're reading a MOR table
if (baseFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ public static HoodieMetadataConfig buildMetadataConfig(Configuration conf) {
* @param dataFile
* @return
*/
private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
public static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
Path dataPath = dataFile.getFileStatus().getPath();
try {
if (dataFile.getFileSize() == 0) {
Expand Down

0 comments on commit cc1c1e7

Please sign in to comment.