Skip to content

Commit

Permalink
[HUDI-1794] Moved static COMMIT_FORMATTER to thread local variable as…
Browse files Browse the repository at this point in the history
… SimpleDateFormat is not thread safe. (#2819)
  • Loading branch information
prashantwason authored Nov 5, 2021
1 parent 3af6568 commit b7ee341
Show file tree
Hide file tree
Showing 19 changed files with 194 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static long countNewRecords(HoodieTableMetaClient target, List<String> co

public static String getTimeDaysAgo(int numberOfDays) {
Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
return HoodieActiveTimeline.formatInstantTime(date);
}

/**
Expand All @@ -61,8 +61,8 @@ public static String getTimeDaysAgo(int numberOfDays) {
* b) hours: -1, returns 20200202010000
*/
public static String addHours(String compactionCommitTime, int hours) throws ParseException {
Instant instant = HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).toInstant();
Instant instant = HoodieActiveTimeline.parseInstantTime(compactionCommitTime).toInstant();
ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
return HoodieActiveTimeline.COMMIT_FORMATTER.format(Date.from(commitDateTime.plusHours(hours).toInstant()));
return HoodieActiveTimeline.formatInstantTime(Date.from(commitDateTime.plusHours(hours).toInstant()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String

if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime(), durationInMs,
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(instantTime).getTime(), durationInMs,
metadata, actionType);
writeTimer = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy)
private Long parsedToSeconds(String time) {
long timestamp;
try {
timestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime() / 1000;
timestamp = HoodieActiveTimeline.parseInstantTime(time).getTime() / 1000;
} catch (ParseException e) {
throw new HoodieCompactionException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void completeCompaction(
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
Expand Down Expand Up @@ -396,7 +396,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
if (clusteringTimer != null) {
long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(clusteringCommitTime).getTime(),
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(clusteringCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ public boolean rollbackCommit(String instantTime) {
BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS);

Long rollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime();
Long rollbackTime = HoodieActiveTimeline.parseInstantTime(instantTime).getTime();
Long currentTime = new Date().getTime();
Scan scan = new Scan();
scan.addFamily(SYSTEM_COLUMN_FAMILY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
Expand All @@ -59,8 +59,6 @@
*/
public class HoodieActiveTimeline extends HoodieDefaultTimeline {

public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss");

public static final Set<String> VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList(
COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION,
DELTA_COMMIT_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION,
Expand All @@ -72,28 +70,44 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
protected HoodieTableMetaClient metaClient;
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));

/**
* Returns next instant time in the {@link #COMMIT_FORMATTER} format.
* Parse the timestamp of an Instant and return it as a {@link Date}.
*/
public static Date parseInstantTime(String timestamp) throws ParseException {
  try {
    return HoodieInstantTimeGenerator.parseInstantTime(timestamp);
  } catch (java.time.format.DateTimeParseException e) {
    // The generator parses with java.time and reports malformed input through the unchecked
    // DateTimeParseException. This method advertises the checked ParseException, which is what
    // callers catch, so translate here while keeping the error offset and the original cause.
    ParseException translated = new ParseException(e.getMessage(), e.getErrorIndex());
    translated.initCause(e);
    throw translated;
  }
}

/**
 * Renders a {@link java.time.Instant} as the timestamp string of a Hoodie Instant.
 *
 * @param timestamp the point in time to render
 * @return the string produced by {@code HoodieInstantTimeGenerator.formatInstantTime(Instant)}
 */
public static String formatInstantTime(Instant timestamp) {
  final String instantTime = HoodieInstantTimeGenerator.formatInstantTime(timestamp);
  return instantTime;
}

/**
 * Renders a {@link Date} as the timestamp string of a Hoodie Instant.
 *
 * @param timestamp the date to render
 * @return the string produced by {@code HoodieInstantTimeGenerator.formatInstantTime(Date)}
 */
public static String formatInstantTime(Date timestamp) {
  final String instantTime = HoodieInstantTimeGenerator.formatInstantTime(timestamp);
  return instantTime;
}

/**
* Returns next instant time in the correct format.
* Ensures each instant time is at least 1 second apart since we create instant times at second granularity
*/
public static String createNewInstantTime() {
return createNewInstantTime(0);
return HoodieInstantTimeGenerator.createNewInstantTime(0);
}

/**
* Returns next instant time that adds N milliseconds in the {@link #COMMIT_FORMATTER} format.
* Returns next instant time that adds N milliseconds to current time.
* Ensures each instant time is at least 1 second apart since we create instant times at second granularity
*
* @param milliseconds Milliseconds to add to current time while generating the new instant time
*/
public static String createNewInstantTime(long milliseconds) {
return lastInstantTime.updateAndGet((oldVal) -> {
String newCommitTime;
do {
newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(System.currentTimeMillis() + milliseconds));
} while (HoodieTimeline.compareTimestamps(newCommitTime, LESSER_THAN_OR_EQUALS, oldVal));
return newCommitTime;
});
return HoodieInstantTimeGenerator.createNewInstantTime(milliseconds);
}

protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set<String> includedExtensions) {
Expand Down Expand Up @@ -129,6 +143,7 @@ public HoodieActiveTimeline(HoodieTableMetaClient metaClient, boolean applyLayou
*
* @deprecated
*/
@Deprecated
public HoodieActiveTimeline() {
}

Expand All @@ -137,6 +152,7 @@ public HoodieActiveTimeline() {
*
* @deprecated
*/
@Deprecated
private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.table.timeline;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;
import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Utility class to generate and parse timestamps used in Instants.
 *
 * <p>Timestamps are strings in the {@code yyyyMMddHHmmss} pattern. Every conversion between such a
 * string and a point in time ({@link Date} or {@link Instant}) goes through
 * {@link ZoneId#systemDefault()}.
 */
public class HoodieInstantTimeGenerator {
  // Format of the timestamp used for an Instant
  private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss";
  // Formatter to generate Instant timestamps (DateTimeFormatter is immutable, so one shared instance is enough)
  private static final DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT);
  // The last Instant timestamp generated
  private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
  // Timestamp that the formatter rejects and that is mapped to the epoch when parsing
  private static final String ALL_ZERO_TIMESTAMP = "00000000000000";

  /**
   * Returns next instant time that adds N milliseconds to the current time.
   * Ensures each instant time is at least 1 second apart since we create instant times at second granularity
   *
   * @param milliseconds Milliseconds to add to current time while generating the new instant time
   * @return the newly generated instant time, which is also recorded as the last generated one
   */
  public static String createNewInstantTime(long milliseconds) {
    return lastInstantTime.updateAndGet((oldVal) -> {
      String newCommitTime;
      // Keep regenerating until the formatted time is strictly greater than the previous one.
      do {
        Date d = new Date(System.currentTimeMillis() + milliseconds);
        newCommitTime = INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
      } while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal));
      return newCommitTime;
    });
  }

  /**
   * Parses an instant timestamp string into a {@link Date}, interpreting it in the system default zone.
   *
   * @param timestamp timestamp in the {@code yyyyMMddHHmmss} pattern
   * @return the parsed date; {@code new Date(0)} for the all-zero timestamp
   * @throws DateTimeParseException if the timestamp is neither parsable nor the all-zero timestamp
   */
  public static Date parseInstantTime(String timestamp) {
    try {
      LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER);
      return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
    } catch (DateTimeParseException e) {
      // Special handling for all zero timestamp which is not parsable by DateTimeFormatter
      if (ALL_ZERO_TIMESTAMP.equals(timestamp)) {
        return new Date(0);
      }

      throw e;
    }
  }

  /**
   * Formats an {@link Instant} as an instant timestamp string.
   *
   * <p>An {@code Instant} carries no calendar fields, so handing it directly to a formatter without
   * a zone fails with {@code UnsupportedTemporalTypeException}. It is therefore first converted to
   * local date-time in the system default zone, the same way {@link #formatInstantTime(Date)} does.
   *
   * @param timestamp the point in time to format
   * @return the timestamp in the {@code yyyyMMddHHmmss} pattern
   */
  public static String formatInstantTime(Instant timestamp) {
    return INSTANT_TIME_FORMATTER.format(toLocalDateTime(timestamp));
  }

  /**
   * Formats a {@link Date} as an instant timestamp string in the system default zone.
   *
   * @param timestamp the date to format
   * @return the timestamp in the {@code yyyyMMddHHmmss} pattern
   */
  public static String formatInstantTime(Date timestamp) {
    return INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(timestamp));
  }

  // Converts a Date to the local date-time the formatter can render.
  private static TemporalAccessor convertDateToTemporalAccessor(Date d) {
    return toLocalDateTime(d.toInstant());
  }

  // Views an Instant as local date-time in the system default zone.
  private static LocalDateTime toLocalDateTime(Instant instant) {
    return LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
* {@link org.apache.hudi.common.table.timeline.HoodieTimeline#INIT_INSTANT_TS}, such that the metadata table
* can be prepped even before bootstrap is done.
*/
String SOLO_COMMIT_TIMESTAMP = "0000000000000";
String SOLO_COMMIT_TIMESTAMP = "00000000000000";
// Key for the record which saves list of all partitions
String RECORDKEY_PARTITION_LIST = "__all_partitions__";
// The partition name used for non-partitioned tables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -51,7 +52,6 @@
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -79,14 +79,14 @@ public void setUp() throws IOException {

@Test
public void testMakeDataFileName() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String fileName = UUID.randomUUID().toString();
assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION);
}

@Test
public void testMaskFileName() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
int taskPartitionId = 2;
assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION);
}
Expand Down Expand Up @@ -154,7 +154,7 @@ public void testProcessFiles() throws Exception {

@Test
public void testGetCommitTime() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
Expand All @@ -165,7 +165,7 @@ public void testGetCommitTime() {

@Test
public void testGetFileNameWithoutMeta() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(fileName, FSUtils.getFileId(fullFileName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
package org.apache.hudi.common.model;

import org.apache.hudi.common.fs.FSUtils;

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;

import java.util.Date;
import java.util.UUID;

import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

Expand All @@ -37,7 +36,7 @@ public class TestHoodieWriteStat {

@Test
public void testSetPaths() {
String instantTime = COMMIT_FORMATTER.format(new Date());
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String basePathString = "/data/tables/some-hoodie-table";
String partitionPathString = "2017/12/31";
String fileName = UUID.randomUUID().toString();
Expand Down
Loading

0 comments on commit b7ee341

Please sign in to comment.