[HUDI-5067] Merge the column stats of multiple log blocks from the same log file (apache#7018)
danny0405 authored and Alexey Kudinkin committed Dec 14, 2022
1 parent de14b95 commit 265fef6
Showing 6 changed files with 275 additions and 2 deletions.
@@ -362,7 +362,7 @@ private void processAppendResult(AppendResult result, List<IndexedRecord> record
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
collectColumnRangeMetadata(recordList, fieldsToIndex, stat.getPath());

stat.setRecordsStats(columnRangesMetadataMap);
stat.putRecordsStats(columnRangesMetadataMap);
}

resetWriteCounts();
@@ -65,6 +65,7 @@
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
import org.apache.hudi.util.WriteStatMerger;

import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
@@ -122,7 +123,13 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
@Override
public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
// for eager flush, multiple write stats may share the same file path.
List<HoodieWriteStat> merged = writeStats.stream()
.collect(Collectors.groupingBy(writeStat -> writeStat.getPartitionPath() + writeStat.getPath()))
.values().stream()
.map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
.collect(Collectors.toList());
return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds);
}

@Override
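With eager flush, one Flink write handle can emit several WriteStatus objects for the same file slice, so commit() above groups the stats by partition path plus file path and folds each group with WriteStatMerger::merge before calling commitStats. A minimal standalone sketch of that fold on two plain HoodieWriteStat instances (not part of the commit; the file path and counter values are made up):

import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.util.WriteStatMerger;

public class WriteStatMergerSketch {
  public static void main(String[] args) {
    // two write stats produced by two eager flushes into the same file (hypothetical path)
    HoodieWriteStat first = new HoodieWriteStat();
    first.setPartitionPath("par1");
    first.setPath("par1/f1_1-0-1_20221214.parquet");
    first.setNumWrites(3);
    first.setTotalWriteBytes(1024);

    HoodieWriteStat second = new HoodieWriteStat();
    second.setPartitionPath("par1");
    second.setPath("par1/f1_1-0-1_20221214.parquet");
    second.setNumWrites(2);
    second.setTotalWriteBytes(512);

    // counters are summed; identity fields (fileId, path, prevCommit, ...) are taken from the second stat
    HoodieWriteStat merged = WriteStatMerger.merge(first, second);
    System.out.println(merged.getNumWrites());       // 5
    System.out.println(merged.getTotalWriteBytes()); // 1536
  }
}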
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.util;

import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Helper class to merge hoodie write stats that belong to the same file path.
*
* <p>CAUTION: The merge can easily become lossy; any new field added to the write stat must also be handled here.
*/
public class WriteStatMerger {
public static HoodieWriteStat merge(HoodieWriteStat stat1, HoodieWriteStat stat2) {
if (stat1 instanceof HoodieDeltaWriteStat) {
return mergeDeltaWriteStat((HoodieDeltaWriteStat) stat1, (HoodieDeltaWriteStat) stat2);
}
return mergeWriteStat(new HoodieWriteStat(), stat1, stat2);
}

private static HoodieDeltaWriteStat mergeDeltaWriteStat(
HoodieDeltaWriteStat stat1,
HoodieDeltaWriteStat stat2) {
HoodieDeltaWriteStat merged = new HoodieDeltaWriteStat();
mergeWriteStat(merged, stat1, stat2);
merged.setLogVersion(stat2.getLogVersion());
merged.setLogOffset(stat2.getLogOffset());
merged.setBaseFile(stat2.getBaseFile());
// log files
List<String> mergedLogFiles = new ArrayList<>(stat1.getLogFiles());
for (String logFile : stat2.getLogFiles()) {
if (!mergedLogFiles.contains(logFile)) {
mergedLogFiles.add(logFile);
}
}
merged.setLogFiles(mergedLogFiles);
// column stats
if (stat1.getColumnStats().isPresent()) {
merged.putRecordsStats(stat1.getColumnStats().get());
}
if (stat2.getColumnStats().isPresent()) {
merged.putRecordsStats(stat2.getColumnStats().get());
}
return merged;
}

private static HoodieWriteStat mergeWriteStat(HoodieWriteStat merged, HoodieWriteStat stat1, HoodieWriteStat stat2) {
merged.setFileId(stat2.getFileId());
merged.setPath(stat2.getPath());
// merge cdc stats
merged.setCdcStats(getMergedCdcStats(stat1.getCdcStats(), stat2.getCdcStats()));
// prev commit
merged.setPrevCommit(stat2.getPrevCommit());

merged.setNumWrites(stat2.getNumWrites() + stat1.getNumWrites());
merged.setNumDeletes(stat2.getNumDeletes() + stat1.getNumDeletes());
merged.setNumUpdateWrites(stat2.getNumUpdateWrites() + stat1.getNumUpdateWrites());
merged.setNumInserts(stat2.getNumInserts() + stat1.getNumInserts());
merged.setTotalWriteBytes(stat2.getTotalWriteBytes() + stat1.getTotalWriteBytes());
merged.setTotalWriteErrors(stat2.getTotalWriteErrors() + stat1.getTotalWriteErrors());

// -------------------------------------------------------------------------
// Nullable
// -------------------------------------------------------------------------

// tmp path
merged.setTempPath(stat2.getTempPath());
// partition path
merged.setPartitionPath(stat2.getPartitionPath());
// runtime stats
merged.setRuntimeStats(getMergedRuntimeStats(stat1.getRuntimeStats(), stat2.getRuntimeStats()));

// log statistics
merged.setTotalLogRecords(stat2.getTotalLogRecords() + stat1.getTotalLogRecords());
merged.setTotalLogFilesCompacted(stat2.getTotalLogFilesCompacted() + stat1.getTotalLogFilesCompacted());
merged.setTotalLogSizeCompacted(stat2.getTotalLogSizeCompacted() + stat1.getTotalLogSizeCompacted());
merged.setTotalUpdatedRecordsCompacted(stat2.getTotalUpdatedRecordsCompacted() + stat1.getTotalUpdatedRecordsCompacted());
merged.setTotalLogBlocks(stat2.getTotalLogBlocks() + stat1.getTotalLogBlocks());
merged.setTotalCorruptLogBlock(stat2.getTotalCorruptLogBlock() + stat1.getTotalCorruptLogBlock());
merged.setTotalRollbackBlocks(stat2.getTotalRollbackBlocks() + stat1.getTotalRollbackBlocks());
merged.setFileSizeInBytes(stat2.getFileSizeInBytes() + stat1.getFileSizeInBytes());
// event time
merged.setMinEventTime(minLong(stat1.getMinEventTime(), stat2.getMinEventTime()));
merged.setMaxEventTime(maxLong(stat1.getMaxEventTime(), stat2.getMaxEventTime()));
return merged;
}

private static HoodieWriteStat.RuntimeStats getMergedRuntimeStats(
HoodieWriteStat.RuntimeStats runtimeStats1,
HoodieWriteStat.RuntimeStats runtimeStats2) {
final HoodieWriteStat.RuntimeStats runtimeStats;
if (runtimeStats1 != null && runtimeStats2 != null) {
runtimeStats = new HoodieWriteStat.RuntimeStats();
runtimeStats.setTotalScanTime(runtimeStats1.getTotalScanTime() + runtimeStats2.getTotalScanTime());
runtimeStats.setTotalUpsertTime(runtimeStats1.getTotalUpsertTime() + runtimeStats2.getTotalUpsertTime());
runtimeStats.setTotalCreateTime(runtimeStats1.getTotalCreateTime() + runtimeStats2.getTotalCreateTime());
} else if (runtimeStats1 == null) {
runtimeStats = runtimeStats2;
} else {
runtimeStats = runtimeStats1;
}
return runtimeStats;
}

private static Map<String, Long> getMergedCdcStats(Map<String, Long> cdcStats1, Map<String, Long> cdcStats2) {
final Map<String, Long> cdcStats;
if (cdcStats1 != null && cdcStats2 != null) {
cdcStats = new HashMap<>();
cdcStats.putAll(cdcStats1);
cdcStats.putAll(cdcStats2);
} else if (cdcStats1 == null) {
cdcStats = cdcStats2;
} else {
cdcStats = cdcStats1;
}
return cdcStats;
}

private static Long minLong(Long v1, Long v2) {
if (v1 == null) {
return v2;
}
if (v2 == null) {
return v1;
}
return v1.compareTo(v2) < 0 ? v1 : v2;
}

private static Long maxLong(Long v1, Long v2) {
if (v1 == null) {
return v2;
}
if (v2 == null) {
return v1;
}
return v1.compareTo(v2) > 0 ? v1 : v2;
}
}
@@ -148,4 +148,41 @@ public static HoodieColumnRangeMetadata<Comparable> stub(String filePath,
String columnName) {
return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1);
}

/**
* Merges the given two column range metadata.
*/
public static HoodieColumnRangeMetadata<Comparable> merge(
HoodieColumnRangeMetadata<Comparable> left,
HoodieColumnRangeMetadata<Comparable> right) {
String filePath = left.getFilePath();
String columnName = left.getColumnName();
Comparable min = minVal(left.getMinValue(), right.getMinValue());
Comparable max = maxVal(left.getMaxValue(), right.getMaxValue());
long nullCount = left.getNullCount() + right.getNullCount();
long valueCount = left.getValueCount() + right.getValueCount();
long totalSize = left.getTotalSize() + right.getTotalSize();
long totalUncompressedSize = left.getTotalUncompressedSize() + right.getTotalUncompressedSize();
return create(filePath, columnName, min, max, nullCount, valueCount, totalSize, totalUncompressedSize);
}

private static Comparable minVal(Comparable val1, Comparable val2) {
if (val1 == null) {
return val2;
}
if (val2 == null) {
return val1;
}
return val1.compareTo(val2) < 0 ? val1 : val2;
}

private static Comparable maxVal(Comparable val1, Comparable val2) {
if (val1 == null) {
return val2;
}
if (val2 == null) {
return val1;
}
return val1.compareTo(val2) > 0 ? val1 : val2;
}
}
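A worked example of the new merge helper: stats for the same column collected from two log blocks collapse into one range, with the min/max picked across both and the counters summed. (Illustrative snippet only; it assumes the existing static create(...) factory used above, the org.apache.hudi.common.model package location, and made-up numbers.)

import org.apache.hudi.common.model.HoodieColumnRangeMetadata;

public class ColumnRangeMergeSketch {
  public static void main(String[] args) {
    // "ts" ranges observed in two log blocks of the same log file
    HoodieColumnRangeMetadata<Comparable> block1 =
        HoodieColumnRangeMetadata.<Comparable>create(".f1_20221214.log.1", "ts", 1L, 4L, 0, 4, 64, 128);
    HoodieColumnRangeMetadata<Comparable> block2 =
        HoodieColumnRangeMetadata.<Comparable>create(".f1_20221214.log.1", "ts", 3L, 8L, 1, 5, 80, 150);

    HoodieColumnRangeMetadata<Comparable> merged = HoodieColumnRangeMetadata.merge(block1, block2);
    // min = 1, max = 8, nullCount = 1, valueCount = 9, totalSize = 144, totalUncompressedSize = 278
    System.out.println(merged.getMinValue() + " .. " + merged.getMaxValue());
  }
}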
@@ -22,6 +22,7 @@
import org.apache.hudi.common.util.Option;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -74,11 +75,35 @@ public List<String> getLogFiles() {
return logFiles;
}

public void putRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
if (!recordsStats.isPresent()) {
recordsStats = Option.of(stats);
} else {
// in case there are multiple log blocks for one write process.
recordsStats = Option.of(mergeRecordsStats(recordsStats.get(), stats));
}
}

// keep for serialization efficiency
public void setRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
recordsStats = Option.of(stats);
}

public Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> getColumnStats() {
return recordsStats;
}

private static Map<String, HoodieColumnRangeMetadata<Comparable>> mergeRecordsStats(
Map<String, HoodieColumnRangeMetadata<Comparable>> stats1,
Map<String, HoodieColumnRangeMetadata<Comparable>> stats2) {
Map<String, HoodieColumnRangeMetadata<Comparable>> mergedStats = new HashMap<>(stats1);
for (Map.Entry<String, HoodieColumnRangeMetadata<Comparable>> entry : stats2.entrySet()) {
final String colName = entry.getKey();
final HoodieColumnRangeMetadata<Comparable> metadata = mergedStats.containsKey(colName)
? HoodieColumnRangeMetadata.merge(mergedStats.get(colName), entry.getValue())
: entry.getValue();
mergedStats.put(colName, metadata);
}
return mergedStats;
}
}
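The new putRecordsStats hook is what lets one delta write stat accumulate stats from several log blocks: the first call stores the map, and every later call merges per column through HoodieColumnRangeMetadata.merge. A rough usage sketch with hypothetical values; it assumes recordsStats starts out as an empty Option and the common.model package locations:

import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;

import java.util.Collections;
import java.util.Map;

public class PutRecordsStatsSketch {
  public static void main(String[] args) {
    // per-block "ts" ranges, e.g. as produced by collectColumnRangeMetadata(...) in the append handle
    Map<String, HoodieColumnRangeMetadata<Comparable>> block1 = Collections.singletonMap(
        "ts", HoodieColumnRangeMetadata.<Comparable>create(".f1_20221214.log.1", "ts", 1L, 4L, 0, 4, 64, 128));
    Map<String, HoodieColumnRangeMetadata<Comparable>> block2 = Collections.singletonMap(
        "ts", HoodieColumnRangeMetadata.<Comparable>create(".f1_20221214.log.1", "ts", 5L, 8L, 0, 4, 64, 128));

    HoodieDeltaWriteStat stat = new HoodieDeltaWriteStat();
    stat.putRecordsStats(block1); // first block: stored as-is
    stat.putRecordsStats(block2); // second block: merged column by column, "ts" now spans [1, 8]

    System.out.println(stat.getColumnStats().get().get("ts").getMaxValue()); // 8
  }
}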
@@ -1373,6 +1373,51 @@ void testWriteAndReadWithDataSkipping(HoodieTableType tableType) {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}

@Test
void testMultipleLogBlocksWithDataSkipping() {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.METADATA_ENABLED, true)
.option("hoodie.metadata.index.column.stats.enable", true)
.option("hoodie.metadata.index.column.stats.file.group.count", 2)
.option("hoodie.metadata.index.column.stats.column.list", "ts")
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
.option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
.option("hoodie.logfile.data.block.max.size", 1)
.end();
tableEnv.executeSql(hoodieTableDDL);

execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);

// apply filters
List<Row> result2 = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:04'").execute().collect());
assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
}

@Test
void testEagerFlushWithDataSkipping() {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.METADATA_ENABLED, true)
.option("hoodie.metadata.index.column.stats.enable", true)
.option("hoodie.metadata.index.column.stats.file.group.count", 2)
.option("hoodie.metadata.index.column.stats.column.list", "ts")
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
.option(FlinkOptions.WRITE_BATCH_SIZE, 0.00001)
.end();
tableEnv.executeSql(hoodieTableDDL);

execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);

// apply filters
List<Row> result2 = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:04'").execute().collect());
assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
}

@Test
void testBuiltinFunctionWithHMSCatalog() {
TableEnvironment tableEnv = batchTableEnv;