[HUDI-5067] Merge the columns stats of multiple log blocks from the s… #7018

Merged: 1 commit, Nov 9, 2022
@@ -395,7 +395,7 @@ private void processAppendResult(AppendResult result, List<IndexedRecord> record
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
collectColumnRangeMetadata(recordList, fieldsToIndex, stat.getPath());

-stat.setRecordsStats(columnRangesMetadataMap);
+stat.putRecordsStats(columnRangesMetadataMap);
}

resetWriteCounts();
@@ -58,6 +58,7 @@
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
import org.apache.hudi.util.WriteStatMerger;

import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
@@ -115,7 +116,13 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
@Override
public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
-return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
+// with eager flush, multiple write stats may share one file path; merge them before committing.
+List<HoodieWriteStat> merged = writeStats.stream()
+    .collect(Collectors.groupingBy(writeStat -> writeStat.getPartitionPath() + writeStat.getPath()))
+    .values().stream()
+    .map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
+    .collect(Collectors.toList());
+return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds);
}

@Override
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.util;

import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Helper class to merge Hudi write stats that belong to the same file path.
 *
 * <p>CAUTION: the merge logic must be kept in sync with {@link HoodieWriteStat}; any field added
 * to the write stat also needs to be handled here.
 */
public class WriteStatMerger {
public static HoodieWriteStat merge(HoodieWriteStat stat1, HoodieWriteStat stat2) {
if (stat1 instanceof HoodieDeltaWriteStat) {
return mergeDeltaWriteStat((HoodieDeltaWriteStat) stat1, (HoodieDeltaWriteStat) stat2);
}
return mergeWriteStat(new HoodieWriteStat(), stat1, stat2);
}

private static HoodieDeltaWriteStat mergeDeltaWriteStat(
HoodieDeltaWriteStat stat1,
HoodieDeltaWriteStat stat2) {
HoodieDeltaWriteStat merged = new HoodieDeltaWriteStat();
mergeWriteStat(merged, stat1, stat2);
merged.setLogVersion(stat2.getLogVersion());
merged.setLogOffset(stat2.getLogOffset());
merged.setBaseFile(stat2.getBaseFile());
// log files
List<String> mergedLogFiles = new ArrayList<>(stat1.getLogFiles());
for (String logFile : stat2.getLogFiles()) {
if (!mergedLogFiles.contains(logFile)) {
mergedLogFiles.add(logFile);
}
}
merged.setLogFiles(mergedLogFiles);
// column stats
if (stat1.getColumnStats().isPresent()) {
merged.putRecordsStats(stat1.getColumnStats().get());
}
if (stat2.getColumnStats().isPresent()) {
merged.putRecordsStats(stat2.getColumnStats().get());
}
return merged;
}

private static HoodieWriteStat mergeWriteStat(HoodieWriteStat merged, HoodieWriteStat stat1, HoodieWriteStat stat2) {
merged.setFileId(stat2.getFileId());
merged.setPath(stat2.getPath());
// merge cdc stats
merged.setCdcStats(getMergedCdcStats(stat1.getCdcStats(), stat2.getCdcStats()));
// prev commit
merged.setPrevCommit(stat2.getPrevCommit());

merged.setNumWrites(stat2.getNumWrites() + stat1.getNumWrites());
merged.setNumDeletes(stat2.getNumDeletes() + stat1.getNumDeletes());
merged.setNumUpdateWrites(stat2.getNumUpdateWrites() + stat1.getNumUpdateWrites());
merged.setNumInserts(stat2.getNumInserts() + stat1.getNumInserts());
merged.setTotalWriteBytes(stat2.getTotalWriteBytes() + stat1.getTotalWriteBytes());
merged.setTotalWriteErrors(stat2.getTotalWriteErrors() + stat1.getTotalWriteErrors());

// -------------------------------------------------------------------------
// Nullable
// -------------------------------------------------------------------------

// tmp path
merged.setTempPath(stat2.getTempPath());
// partition path
merged.setPartitionPath(stat2.getPartitionPath());
// runtime stats
merged.setRuntimeStats(getMergedRuntimeStats(stat1.getRuntimeStats(), stat2.getRuntimeStats()));

// log statistics
merged.setTotalLogRecords(stat2.getTotalLogRecords() + stat1.getTotalLogRecords());
merged.setTotalLogFilesCompacted(stat2.getTotalLogFilesCompacted() + stat1.getTotalLogFilesCompacted());
merged.setTotalLogSizeCompacted(stat2.getTotalLogSizeCompacted() + stat1.getTotalLogSizeCompacted());
merged.setTotalUpdatedRecordsCompacted(stat2.getTotalUpdatedRecordsCompacted() + stat1.getTotalUpdatedRecordsCompacted());
merged.setTotalLogBlocks(stat2.getTotalLogBlocks() + stat1.getTotalLogBlocks());
merged.setTotalCorruptLogBlock(stat2.getTotalCorruptLogBlock() + stat1.getTotalCorruptLogBlock());
merged.setTotalRollbackBlocks(stat2.getTotalRollbackBlocks() + stat1.getTotalRollbackBlocks());
merged.setFileSizeInBytes(stat2.getFileSizeInBytes() + stat1.getFileSizeInBytes());
// event time
merged.setMinEventTime(minLong(stat1.getMinEventTime(), stat2.getMinEventTime()));
merged.setMaxEventTime(maxLong(stat1.getMaxEventTime(), stat2.getMaxEventTime()));
return merged;
}

private static HoodieWriteStat.RuntimeStats getMergedRuntimeStats(
HoodieWriteStat.RuntimeStats runtimeStats1,
HoodieWriteStat.RuntimeStats runtimeStats2) {
final HoodieWriteStat.RuntimeStats runtimeStats;
if (runtimeStats1 != null && runtimeStats2 != null) {
runtimeStats = new HoodieWriteStat.RuntimeStats();
runtimeStats.setTotalScanTime(runtimeStats1.getTotalScanTime() + runtimeStats2.getTotalScanTime());
runtimeStats.setTotalUpsertTime(runtimeStats1.getTotalUpsertTime() + runtimeStats2.getTotalUpsertTime());
runtimeStats.setTotalCreateTime(runtimeStats1.getTotalCreateTime() + runtimeStats2.getTotalCreateTime());
} else if (runtimeStats1 == null) {
runtimeStats = runtimeStats2;
} else {
runtimeStats = runtimeStats1;
}
return runtimeStats;
}

private static Map<String, Long> getMergedCdcStats(Map<String, Long> cdcStats1, Map<String, Long> cdcStats2) {
final Map<String, Long> cdcStats;
if (cdcStats1 != null && cdcStats2 != null) {
cdcStats = new HashMap<>();
cdcStats.putAll(cdcStats1);
cdcStats.putAll(cdcStats2);
} else if (cdcStats1 == null) {
cdcStats = cdcStats2;
} else {
cdcStats = cdcStats1;
}
return cdcStats;
}

private static Long minLong(Long v1, Long v2) {
if (v1 == null) {
return v2;
}
if (v2 == null) {
return v1;
}
return v1.compareTo(v2) < 0 ? v1 : v2;
}

private static Long maxLong(Long v1, Long v2) {
if (v1 == null) {
return v2;
}
if (v2 == null) {
return v1;
}
return v1.compareTo(v2) > 0 ? v1 : v2;
}
}
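
For reference, a minimal usage sketch of the merger; the file path and counter values below are illustrative, not taken from the PR:

import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.util.WriteStatMerger;

public class WriteStatMergerExample {
  public static void main(String[] args) {
    // Two stats produced for the same file by separate eager flushes.
    HoodieWriteStat first = new HoodieWriteStat();
    first.setPartitionPath("par1");
    first.setPath("par1/file-0001.parquet");
    first.setNumWrites(10);
    first.setTotalWriteBytes(1024);

    HoodieWriteStat second = new HoodieWriteStat();
    second.setPartitionPath("par1");
    second.setPath("par1/file-0001.parquet");
    second.setNumWrites(5);
    second.setTotalWriteBytes(512);

    // Counters are summed; path, partition path and prevCommit are taken from the second stat.
    HoodieWriteStat merged = WriteStatMerger.merge(first, second);
    System.out.println(merged.getNumWrites());       // 15
    System.out.println(merged.getTotalWriteBytes()); // 1536
  }
}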
@@ -148,4 +148,41 @@ public static HoodieColumnRangeMetadata<Comparable> stub(String filePath,
String columnName) {
return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1);
}

/**
* Merges the given two column range metadata.
*/
public static HoodieColumnRangeMetadata<Comparable> merge(
HoodieColumnRangeMetadata<Comparable> left,
HoodieColumnRangeMetadata<Comparable> right) {
String filePath = left.getFilePath();
String columnName = left.getColumnName();
Comparable min = minVal(left.getMinValue(), right.getMinValue());
Comparable max = maxVal(left.getMaxValue(), right.getMaxValue());
long nullCount = left.getNullCount() + right.getNullCount();
long valueCount = left.getValueCount() + right.getValueCount();
long totalSize = left.getTotalSize() + right.getTotalSize();
long totalUncompressedSize = left.getTotalUncompressedSize() + right.getTotalUncompressedSize();
return create(filePath, columnName, min, max, nullCount, valueCount, totalSize, totalUncompressedSize);
}

private static Comparable minVal(Comparable val1, Comparable val2) {
if (val1 == null) {
return val2;
}
if (val2 == null) {
return val1;
}
return val1.compareTo(val2) < 0 ? val1 : val2;
}

private static Comparable maxVal(Comparable val1, Comparable val2) {
if (val1 == null) {
return val2;
}
if (val2 == null) {
return val1;
}
return val1.compareTo(val2) > 0 ? val1 : val2;
}
}
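
A hypothetical snippet of how the new merge behaves, assuming the create factory used by merge above is publicly accessible with the same argument order (values are illustrative):

Comparable minLeft = 1L;
Comparable maxLeft = 5L;
Comparable minRight = 3L;
Comparable maxRight = 8L;
HoodieColumnRangeMetadata<Comparable> left =
    HoodieColumnRangeMetadata.create("par1/file-0001.parquet", "ts", minLeft, maxLeft, 0, 10, 100, 200);
HoodieColumnRangeMetadata<Comparable> right =
    HoodieColumnRangeMetadata.create("par1/file-0001.parquet", "ts", minRight, maxRight, 1, 12, 120, 240);

// Min/max are widened across both inputs; null/value counts and sizes are summed.
HoodieColumnRangeMetadata<Comparable> merged = HoodieColumnRangeMetadata.merge(left, right);
// merged: min = 1, max = 8, nullCount = 1, valueCount = 22, totalSize = 220, totalUncompressedSize = 440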
@@ -22,6 +22,7 @@
import org.apache.hudi.common.util.Option;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -74,11 +75,35 @@ public List<String> getLogFiles() {
return logFiles;
}

public void putRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
if (!recordsStats.isPresent()) {
recordsStats = Option.of(stats);
} else {
// in case there are multiple log blocks for one write process.
recordsStats = Option.of(mergeRecordsStats(recordsStats.get(), stats));
}
}

// keep for serialization efficiency
public void setRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
recordsStats = Option.of(stats);
}

public Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> getColumnStats() {
return recordsStats;
}

private static Map<String, HoodieColumnRangeMetadata<Comparable>> mergeRecordsStats(
Map<String, HoodieColumnRangeMetadata<Comparable>> stats1,
Map<String, HoodieColumnRangeMetadata<Comparable>> stats2) {
Map<String, HoodieColumnRangeMetadata<Comparable>> mergedStats = new HashMap<>(stats1);
for (Map.Entry<String, HoodieColumnRangeMetadata<Comparable>> entry : stats2.entrySet()) {
final String colName = entry.getKey();
final HoodieColumnRangeMetadata<Comparable> metadata = mergedStats.containsKey(colName)
? HoodieColumnRangeMetadata.merge(mergedStats.get(colName), entry.getValue())
: entry.getValue();
mergedStats.put(colName, metadata);
}
return mergedStats;
}
}
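
The difference between putRecordsStats and the old setRecordsStats is the core of HUDI-5067. A hypothetical sketch follows; the log file names and values are illustrative, and the HoodieColumnRangeMetadata.create factory is assumed to be accessible:

HoodieDeltaWriteStat stat = new HoodieDeltaWriteStat();

// Column stats collected for the first log block of this write.
Comparable min1 = 1L;
Comparable max1 = 4L;
Map<String, HoodieColumnRangeMetadata<Comparable>> block1 = new HashMap<>();
block1.put("ts", HoodieColumnRangeMetadata.create(".fileId_20221109.log.1", "ts", min1, max1, 0, 4, 40, 80));

// Column stats collected for a second log block written by the same handle.
Comparable min2 = 5L;
Comparable max2 = 8L;
Map<String, HoodieColumnRangeMetadata<Comparable>> block2 = new HashMap<>();
block2.put("ts", HoodieColumnRangeMetadata.create(".fileId_20221109.log.2", "ts", min2, max2, 0, 4, 40, 80));

stat.putRecordsStats(block1);
stat.putRecordsStats(block2);
// getColumnStats() now holds a single merged range for "ts": min 1, max 8, valueCount 8.
// The old setRecordsStats would have overwritten block1 with block2 and dropped the 1..4 range.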
@@ -1408,6 +1408,51 @@ void testWriteAndReadWithDataSkipping(HoodieTableType tableType) {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}

@Test
void testMultipleLogBlocksWithDataSkipping() {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.METADATA_ENABLED, true)
.option("hoodie.metadata.index.column.stats.enable", true)
.option("hoodie.metadata.index.column.stats.file.group.count", 2)
.option("hoodie.metadata.index.column.stats.column.list", "ts")
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
.option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
.option("hoodie.logfile.data.block.max.size", 1)
.end();
tableEnv.executeSql(hoodieTableDDL);

execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);

// apply filters
List<Row> result2 = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:04'").execute().collect());
assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
}

@Test
void testEagerFlushWithDataSkipping() {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.METADATA_ENABLED, true)
.option("hoodie.metadata.index.column.stats.enable", true)
.option("hoodie.metadata.index.column.stats.file.group.count", 2)
.option("hoodie.metadata.index.column.stats.column.list", "ts")
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
.option(FlinkOptions.WRITE_BATCH_SIZE, 0.00001)
.end();
tableEnv.executeSql(hoodieTableDDL);

execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);

// apply filters
List<Row> result2 = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:04'").execute().collect());
assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
}

@Test
void testBuiltinFunctionWithHMSCatalog() {
TableEnvironment tableEnv = batchTableEnv;