[HUDI-2550] Expand File-Group candidates list for appending for MOR tables (#3986)
Alexey Kudinkin authored Nov 23, 2021
1 parent fe57e9b commit 3bdab01
Showing 4 changed files with 124 additions and 57 deletions.
@@ -354,6 +354,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)."
+ " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained.");

public static final ConfigProperty<Integer> MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT = ConfigProperty
.key("hoodie.merge.small.file.group.candidates.limit")
.defaultValue(1)
.withDocumentation("Limits number of file groups, whose base file satisfies small-file limit, to consider for appending records during upsert operation. "
+ "Only applicable to MOR tables");

public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS = ConfigProperty
.key("hoodie.client.heartbeat.interval_in_ms")
.defaultValue(60 * 1000)
@@ -1035,6 +1041,10 @@ public boolean allowDuplicateInserts() {
return getBoolean(MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE);
}

public int getSmallFileGroupCandidatesLimit() {
return getInt(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT);
}

public EngineType getEngineType() {
return engineType;
}
@@ -2116,6 +2126,11 @@ public Builder withMergeAllowDuplicateOnInserts(boolean routeInsertsToNewFiles)
return this;
}

public Builder withMergeSmallFileGroupCandidatesLimit(int limit) {
writeConfig.setValue(MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT, String.valueOf(limit));
return this;
}
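
Editor's note: a minimal sketch (not from the commit) of the corresponding programmatic usage of the new builder method; the base path and Avro schema string are hypothetical placeholders.

import org.apache.hudi.config.HoodieWriteConfig;

public class CandidateLimitConfigExample {
  public static HoodieWriteConfig buildConfig(String basePath, String avroSchema) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)                          // hypothetical table base path
        .withSchema(avroSchema)                      // hypothetical Avro schema string
        .withMergeSmallFileGroupCandidatesLimit(3)   // consider up to 3 small file groups per partition
        .build();
  }
}
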

public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) {
writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS, String.valueOf(heartbeatIntervalInMs));
return this;
@@ -288,6 +288,10 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {
return smallFileLocations;
}

public List<BucketInfo> getBucketInfos() {
return Collections.unmodifiableList(new ArrayList<>(bucketInfoMap.values()));
}
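
Editor's note: the accessor just added hands back a snapshot wrapped as an unmodifiable list. A tiny, self-contained illustration (not from the commit) of what that contract means for callers:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class UnmodifiableCopySketch {
  public static void main(String[] args) {
    List<String> internal = new ArrayList<>(Arrays.asList("bucket-0", "bucket-1"));

    // Snapshot copy wrapped as unmodifiable, mirroring the getBucketInfos() pattern above
    List<String> view = Collections.unmodifiableList(new ArrayList<>(internal));

    internal.add("bucket-2");   // later changes to internal state do not leak into the snapshot
    System.out.println(view);   // prints [bucket-0, bucket-1]

    // view.add("bucket-3");    // would throw UnsupportedOperationException
  }
}
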

public BucketInfo getBucketInfo(int bucketNumber) {
return bucketInfoMap.get(bucketNumber);
}
@@ -26,15 +26,16 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;

import org.apache.hudi.table.action.commit.SmallFile;
import org.apache.hudi.table.action.commit.UpsertPartitioner;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

@@ -51,68 +52,68 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng

@Override
protected List<SmallFile> getSmallFiles(String partitionPath) {

// smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();

// Init here since this class (and member variables) might not have been initialized
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();

// Find out all eligible small file slices
if (!commitTimeline.empty()) {
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
// find smallest file in partition and append to it
List<FileSlice> allSmallFileSlices = new ArrayList<>();
// If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
// it. Doing this overtime for a partition, we ensure that we handle small file issues
if (!table.getIndex().canIndexLogFiles()) {
// TODO : choose last N small files since there can be multiple small files written to a single partition
// by different spark partitions in a single batch
Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
.filter(
fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
.getParquetSmallFileLimit())
.min((FileSlice left, FileSlice right) ->
left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
if (smallFileSlice.isPresent()) {
allSmallFileSlices.add(smallFileSlice.get());
}
if (commitTimeline.empty()) {
return Collections.emptyList();
}

HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();

// Find out all eligible small file slices, looking for
// smallest file in the partition to append to
List<FileSlice> smallFileSlicesCandidates = getSmallFileCandidates(partitionPath, latestCommitTime);
List<SmallFile> smallFileLocations = new ArrayList<>();

// Create SmallFiles from the eligible file slices
for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
SmallFile sf = new SmallFile();
if (smallFileSlice.getBaseFile().isPresent()) {
// TODO : Move logic of file name, file id, base commit time handling inside file slice
String filename = smallFileSlice.getBaseFile().get().getFileName();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
} else {
// If we can index log files, we can add more inserts to log files for fileIds NOT including those under
// pending compaction
List<FileSlice> allFileSlices =
table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
.collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(fileSlice)) {
allSmallFileSlices.add(fileSlice);
}
}
}
// Create SmallFiles from the eligible file slices
for (FileSlice smallFileSlice : allSmallFileSlices) {
SmallFile sf = new SmallFile();
if (smallFileSlice.getBaseFile().isPresent()) {
// TODO : Move logic of file name, file id, base commit time handling inside file slice
String filename = smallFileSlice.getBaseFile().get().getFileName();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
} else {
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
}
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
}
}
return smallFileLocations;
}

@Nonnull
private List<FileSlice> getSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
// If we can index log files, we can add more inserts to log files for fileIds NOT including those under
// pending compaction
if (table.getIndex().canIndexLogFiles()) {
return table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
.filter(this::isSmallFile)
.collect(Collectors.toList());
}

// If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
// it. Doing this overtime for a partition, we ensure that we handle small file issues
return table.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
.filter(
fileSlice ->
// NOTE: We can not pad slices with existing log-files w/o compacting these,
// hence skipping
fileSlice.getLogFiles().count() < 1
&& fileSlice.getBaseFile().get().getFileSize() < config.getParquetSmallFileLimit())
.sorted(Comparator.comparing(fileSlice -> fileSlice.getBaseFile().get().getFileSize()))
.limit(config.getSmallFileGroupCandidatesLimit())
.collect(Collectors.toList());
}
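
Editor's note: for intuition, a minimal, self-contained sketch (not part of the commit) of the selection logic above, reduced to plain numbers: base files under the small-file limit are kept, ordered smallest first, and capped at the configured candidates limit. The log-file filtering done by the real code is omitted here, and the sizes are hypothetical.

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class CandidateLimitSketch {
  public static void main(String[] args) {
    // Hypothetical base-file sizes (bytes) for four file groups in one partition
    List<Long> baseFileSizes = Arrays.asList(900L, 400L, 1500L, 700L);
    long smallFileLimit = 1024L;   // stands in for hoodie.parquet.small.file.limit
    int candidatesLimit = 2;       // stands in for hoodie.merge.small.file.group.candidates.limit

    List<Long> candidates = baseFileSizes.stream()
        .filter(size -> size < smallFileLimit)   // keep only files below the small-file threshold
        .sorted(Comparator.naturalOrder())       // smallest first, as in the partitioner
        .limit(candidatesLimit)                  // cap at the configured number of candidates
        .collect(Collectors.toList());

    System.out.println(candidates);              // prints [400, 700]
  }
}
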

public List<String> getSmallFileIds() {
return (List<String>) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId())
return smallFiles.stream().map(smallFile -> smallFile.location.getFileId())
.collect(Collectors.toList());
}

@@ -132,8 +133,12 @@ private boolean isSmallFile(FileSlice fileSlice) {

// TODO (NA) : Make this static part of utility
public long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles) {
long totalSizeOfLogFiles = hoodieLogFiles.stream().map(HoodieLogFile::getFileSize)
.filter(size -> size > 0).reduce(Long::sum).orElse(0L);
long totalSizeOfLogFiles =
hoodieLogFiles.stream()
.map(HoodieLogFile::getFileSize)
.filter(size -> size > 0)
.reduce(Long::sum)
.orElse(0L);
// Here we assume that if there is no base parquet file, all log files contain only inserts.
// We can then just get the parquet equivalent size of these log files, compare that with
// {@link config.getParquetMaxFileSize()} and decide if there is scope to insert more rows
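
Editor's note: the remainder of the method is collapsed in this view. As a rough, hypothetical sketch of the kind of conversion the comment describes (the actual implementation in the repository may differ), the accumulated log-file size would be scaled by an assumed log-to-parquet compression ratio:

// Hypothetical sketch only; the real method body is collapsed in this diff view.
public static long estimateParquetSizeFromLogs(long totalSizeOfLogFiles, double logToParquetRatio) {
  // Scale raw log bytes by the expected shrinkage once compacted into a columnar base file
  return (long) (totalSizeOfLogFiles * logToParquetRatio);
}
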
@@ -56,6 +56,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -336,7 +337,6 @@ public void testUpsertPartitionerWithSmallFileHandlingWithInflightCompactionWith
HoodieCompactionPlan plan = CompactionTestUtils.createCompactionPlan(metaClient, "001", "002", 1, true, false);
FileCreateUtils.createRequestedCompactionCommit(basePath, "002", plan);
// Simulate one more commit so that inflight compaction is considered when building file groups in file system view
//
FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "2", 1);
FileCreateUtils.createCommit(basePath, "003");

@@ -434,6 +434,49 @@ public void testUpsertPartitionerWithSmallFileHandlingWithCanIndexLogFiles() thr
"Insert should be assigned to fg1");
}

@Test
public void testUpsertPartitionerWithSmallFileHandlingPickingMultipleCandidates() throws Exception {
final String partitionPath = DEFAULT_PARTITION_PATHS[0];

HoodieWriteConfig config =
makeHoodieClientConfigBuilder()
.withMergeSmallFileGroupCandidatesLimit(3)
.withStorageConfig(
HoodieStorageConfig.newBuilder()
.parquetMaxFileSize(2048)
.build()
)
.build();

// Bootstrap base files ("small-file targets")
FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-1", 1024);
FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-2", 1024);
FileCreateUtils.createBaseFile(basePath, partitionPath, "002", "fg-3", 1024);

FileCreateUtils.createCommit(basePath, "002");

HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
// Default estimated record size will be 1024 based on last file group created.
// Only 1 record can be added to small file
WorkloadProfile profile =
new WorkloadProfile(buildProfile(jsc.parallelize(dataGenerator.generateInserts("003", 3))));

HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(this.metaClient);

HoodieSparkTable<?> table = HoodieSparkTable.create(config, context, reloadedMetaClient);

SparkUpsertDeltaCommitPartitioner<?> partitioner = new SparkUpsertDeltaCommitPartitioner<>(profile, context, table, config);

assertEquals(3, partitioner.numPartitions());
assertEquals(
Arrays.asList(
new BucketInfo(BucketType.UPDATE, "fg-1", partitionPath),
new BucketInfo(BucketType.UPDATE, "fg-2", partitionPath),
new BucketInfo(BucketType.UPDATE, "fg-3", partitionPath)
),
partitioner.getBucketInfos());
}

private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
// Prepare the AvroParquetIO
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString());
