[minor] Some code refactoring for LogFileComparator and Instant instantiation (apache#5600)
danny0405 authored and yihua committed Jun 3, 2022
1 parent 7ad732e commit 1d7f9e4
Showing 5 changed files with 18 additions and 10 deletions.
@@ -25,7 +25,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
@@ -77,7 +76,7 @@ public void completeInflightCompaction(HoodieTable table, String compactionCommitTime,
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
try {
activeTimeline.transitionCompactionInflightToComplete(
-new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime),
+HoodieTimeline.getCompactionInflightInstant(compactionCommitTime),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
throw new HoodieCompactionException(
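A note on the pattern: these hunks replace direct HoodieInstant construction with the equivalent HoodieTimeline factory helpers, so the inflight state and the action constant are encoded in one place. A minimal standalone sketch of the equivalence (the instant time literal is made up for illustration):

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

public class InstantFactoryExample {
  public static void main(String[] args) {
    String commitTime = "20220603120000"; // made-up instant time
    // Before: spell out the state and action at every call site.
    HoodieInstant direct =
        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, commitTime);
    // After: the timeline helper encodes state and action once.
    HoodieInstant viaFactory = HoodieTimeline.getCompactionInflightInstant(commitTime);
    System.out.println(direct.equals(viaFactory)); // expected: true
  }
}

The clustering hunks further down apply the same idea via HoodieTimeline.getReplaceCommitInflightInstant, and also reuse the clusteringInstant local for the inflight-to-complete transition instead of rebuilding it.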
@@ -358,7 +358,7 @@ public void completeCompaction(
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
+final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
try {
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
@@ -305,7 +305,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata,
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
-final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
+final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
try {
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
@@ -382,7 +382,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
}

-final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
+final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
try {
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());

@@ -393,7 +393,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);

table.getActiveTimeline().transitionReplaceInflightToComplete(
-HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
+clusteringInstant,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
@@ -40,6 +40,9 @@ public class HoodieLogFile implements Serializable {
public static final String DELTA_EXTENSION = ".log";
public static final Integer LOGFILE_BASE_VERSION = 1;

+private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR = new LogFileComparator();
+private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR_REVERSED = new LogFileComparator().reversed();
+
private transient FileStatus fileStatus;
private final String pathStr;
private long fileLen;
@@ -129,11 +132,11 @@ public HoodieLogFile rollOver(FileSystem fs, String logWriteToken) throws IOException {
}

public static Comparator<HoodieLogFile> getLogFileComparator() {
-return new LogFileComparator();
+return LOG_FILE_COMPARATOR;
}

public static Comparator<HoodieLogFile> getReverseLogFileComparator() {
-return new LogFileComparator().reversed();
+return LOG_FILE_COMPARATOR_REVERSED;
}

/**
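The change above also caches a single comparator instance (plus its reversed view) instead of allocating a new LogFileComparator on each getLogFileComparator() or getReverseLogFileComparator() call; the comparator holds no state, so a shared instance is safe. A minimal usage sketch, assuming HoodieLogFile's String-path constructor and made-up log file names in Hudi's ".<fileId>_<baseInstant>.log.<version>_<writeToken>" layout:

import org.apache.hudi.common.model.HoodieLogFile;

import java.util.ArrayList;
import java.util.List;

public class LogFileSortExample {
  public static void main(String[] args) {
    List<HoodieLogFile> logFiles = new ArrayList<>();
    logFiles.add(new HoodieLogFile(".f1_20220603120000.log.2_1-0-1")); // hypothetical file names
    logFiles.add(new HoodieLogFile(".f1_20220603120000.log.1_1-0-1"));
    // Both getters now return shared instances rather than a fresh object per call.
    logFiles.sort(HoodieLogFile.getLogFileComparator());        // ascending: log version 1 first
    logFiles.sort(HoodieLogFile.getReverseLogFileComparator()); // descending: log version 2 first
  }
}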
@@ -23,10 +23,16 @@

import java.util.Map;

+/**
+ * Utilities for fetching hadoop configurations.
+ */
public class HadoopConfigurations {
private static final String HADOOP_PREFIX = "hadoop.";
private static final String PARQUET_PREFIX = "parquet.";

+/**
+ * Creates a merged hadoop configuration with given flink configuration and hadoop configuration.
+ */
public static org.apache.hadoop.conf.Configuration getParquetConf(
org.apache.flink.configuration.Configuration options,
org.apache.hadoop.conf.Configuration hadoopConf) {
@@ -37,12 +43,12 @@ public static org.apache.hadoop.conf.Configuration getParquetConf(
}

/**
-* Create a new hadoop configuration that is initialized with the given flink configuration.
+* Creates a new hadoop configuration that is initialized with the given flink configuration.
*/
public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration conf) {
org.apache.hadoop.conf.Configuration hadoopConf = FlinkClientUtil.getHadoopConf();
Map<String, String> options = FlinkOptions.getPropertiesWithPrefix(conf.toMap(), HADOOP_PREFIX);
-options.forEach((k, v) -> hadoopConf.set(k, v));
+options.forEach(hadoopConf::set);
return hadoopConf;
}
}
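The last hunk swaps a lambda for the equivalent method reference: both forms invoke Configuration#set(String, String) for every map entry. A standalone sketch of the equivalence (class name and option values are made up):

import org.apache.hadoop.conf.Configuration;

import java.util.HashMap;
import java.util.Map;

public class ForEachRefExample {
  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("parquet.compression", "SNAPPY"); // hypothetical option

    Configuration lambdaConf = new Configuration();
    options.forEach((k, v) -> lambdaConf.set(k, v)); // before

    Configuration refConf = new Configuration();
    options.forEach(refConf::set); // after: same calls, less noise

    // Both configurations end up with identical entries.
    System.out.println(refConf.get("parquet.compression").equals(lambdaConf.get("parquet.compression")));
  }
}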
