[HUDI-2793] Fixing deltastreamer checkpoint fetch/copy over #4034

Merged · 4 commits · Nov 24, 2021 · Changes from 3 commits
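Summary of the change (as reflected in the diff below): the deltastreamer previously resumed only from the checkpoint stored in the extra metadata of the most recent completed commit, and aborted when that commit lacked the checkpoint key (for example, after a compaction commit or a plain Spark datasource write). This PR makes both TransactionUtils.getLastCompletedTxnInstantAndMetadata and the sync service's checkpoint fetch walk the timeline in reverse order until a usable commit is found, skipping compaction and unknown-operation commits, and extends the tests to cover compaction instants and non-deltastreamer writes interleaved between deltastreamer commits.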
File 1 of 4: org.apache.hudi.client.utils.TransactionUtils
@@ -18,13 +18,11 @@
 
 package org.apache.hudi.client.utils;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.ConcurrentOperation;
+import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -34,9 +32,15 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class TransactionUtils {
@@ -45,6 +49,7 @@ public class TransactionUtils {
 
   /**
    * Resolve any write conflicts when committing data.
+   *
    * @param table
    * @param currentTxnOwnerInstant
    * @param thisCommitMetadata
@@ -54,11 +59,11 @@ public class TransactionUtils {
    * @throws HoodieWriteConflictException
    */
   public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
-        final HoodieTable table,
-        final Option<HoodieInstant> currentTxnOwnerInstant,
-        final Option<HoodieCommitMetadata> thisCommitMetadata,
-        final HoodieWriteConfig config,
-        Option<HoodieInstant> lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException {
+      final HoodieTable table,
+      final Option<HoodieInstant> currentTxnOwnerInstant,
+      final Option<HoodieCommitMetadata> thisCommitMetadata,
+      final HoodieWriteConfig config,
+      Option<HoodieInstant> lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException {
     if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
       ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
       Stream<HoodieInstant> instantStream = resolutionStrategy.getCandidateInstants(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
@@ -68,7 +73,7 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
         ConcurrentOperation otherOperation = new ConcurrentOperation(instant, table.getMetaClient());
         if (resolutionStrategy.hasConflict(thisOperation, otherOperation)) {
           LOG.info("Conflict encountered between current instant = " + thisOperation + " and instant = "
-                  + otherOperation + ", attempting to resolve it...");
+              + otherOperation + ", attempting to resolve it...");
           resolutionStrategy.resolveConflict(table, thisOperation, otherOperation);
         }
       } catch (IOException io) {
@@ -88,33 +93,42 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
 
   /**
    * Get the last completed transaction hoodie instant and {@link HoodieCommitMetadata#getExtraMetadata()}.
+   *
    * @param metaClient
    * @return
    */
   public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(
       HoodieTableMetaClient metaClient) {
-    Option<HoodieInstant> hoodieInstantOption = metaClient.getActiveTimeline().getCommitsTimeline()
-        .filterCompletedInstants().lastInstant();
-    try {
-      if (hoodieInstantOption.isPresent()) {
-        switch (hoodieInstantOption.get().getAction()) {
-          case HoodieTimeline.REPLACE_COMMIT_ACTION:
-            HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
-                .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(), HoodieReplaceCommitMetadata.class);
-            return Option.of(Pair.of(hoodieInstantOption.get(), replaceCommitMetadata.getExtraMetadata()));
-          case HoodieTimeline.DELTA_COMMIT_ACTION:
-          case HoodieTimeline.COMMIT_ACTION:
-            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-                .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstantOption.get()).get(), HoodieCommitMetadata.class);
-            return Option.of(Pair.of(hoodieInstantOption.get(), commitMetadata.getExtraMetadata()));
-          default:
-            throw new IllegalArgumentException("Unknown instant action" + hoodieInstantOption.get().getAction());
-        }
-      } else {
-        return Option.empty();
-      }
-    } catch (IOException io) {
-      throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstantOption.get(), io);
+    List<HoodieInstant> hoodieInstants = metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
+    if (!hoodieInstants.isEmpty()) {
+      for (HoodieInstant hoodieInstant : hoodieInstants) {
+        try {
+          switch (hoodieInstant.getAction()) {
+            case HoodieTimeline.REPLACE_COMMIT_ACTION:
+              HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
+                  .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
+              return Option.of(Pair.of(hoodieInstant, replaceCommitMetadata.getExtraMetadata()));
+            case HoodieTimeline.DELTA_COMMIT_ACTION:
+            case HoodieTimeline.COMMIT_ACTION:
+              HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                  .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
+              if (commitMetadata.getOperationType() != null
+                  && !commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)
+                  && !commitMetadata.getOperationType().equals(WriteOperationType.COMPACT)) { // skip compaction instants
+                return Option.of(Pair.of(hoodieInstant, commitMetadata.getExtraMetadata()));
+              }
+              break;
+            default:
+              throw new IllegalArgumentException("Unknown instant action" + hoodieInstant.getAction());
+          }
+        } catch (IOException io) {
+          throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstant, io);
+        }
+      }
+      return Option.empty();
+    } else {
+      return Option.empty();
     }
   }

Review comment (Contributor), on the commitMetadata.getOperationType() null check: If the operation type is null, should it still return the instant and the metadata?
Reply (Member): this probably is older commits?
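The guard discussed in that thread is the heart of the TransactionUtils change: per the reply above, a null operation type probably indicates older commits, and compaction commits never carry the deltastreamer checkpoint, so the scan keeps walking instead of returning them. A minimal, self-contained sketch of that rule with toy types (not the Hudi API):

// Sketch only: scan completed commits newest-to-oldest and return the first
// commit whose operation type is neither COMPACT nor UNKNOWN.
import java.util.List;
import java.util.Optional;

public class LastTxnMetadataSketch {
  enum OpType { INSERT, UPSERT, COMPACT, UNKNOWN }

  static class Commit {
    final String instantTime;
    final OpType op;
    Commit(String instantTime, OpType op) {
      this.instantTime = instantTime;
      this.op = op;
    }
  }

  static Optional<Commit> lastNonCompactionCommit(List<Commit> reverseOrdered) {
    for (Commit c : reverseOrdered) {
      if (c.op != OpType.COMPACT && c.op != OpType.UNKNOWN) {
        return Optional.of(c); // first eligible (i.e. most recent) commit wins
      }
    }
    return Optional.empty(); // e.g. the timeline holds only compaction commits
  }

  public static void main(String[] args) {
    List<Commit> commits = List.of(
        new Commit("003", OpType.COMPACT),  // newest: skipped
        new Commit("002", OpType.UPSERT),   // picked
        new Commit("001", OpType.INSERT));
    System.out.println(lastNonCompactionCommit(commits).get().instantTime); // 002
  }
}

For contrast, the pre-change code returned the extra metadata of the last completed instant unconditionally, so a compaction landing between deltastreamer runs surfaced metadata with no checkpoint in it.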

@@ -125,9 +139,10 @@ protected static void mergeCheckpointStateFromPreviousCommit(HoodieTableMetaClient
 
   /**
    * Generic method allowing us to override the current metadata with the metadata from
    * the latest instant for the specified key prefixes.
+   *
    * @param metaClient
    * @param thisMetadata
-   * @param keyPrefixes The key prefixes to merge from the previous commit
+   * @param keyPrefixes The key prefixes to merge from the previous commit
    */
   private static void overrideWithLatestCommitMetadata(HoodieTableMetaClient metaClient,
                                                        Option<HoodieCommitMetadata> thisMetadata,
File 2 of 4: unit test for TransactionUtils checkpoint state merge
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -30,7 +31,9 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.io.IOException;
@@ -49,8 +52,9 @@ public void init() throws Exception {
     metaClient.getFs().mkdirs(new Path(basePath));
   }
 
-  @Test
-  public void testCheckpointStateMerge() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testCheckpointStateMerge(boolean testCompaction) throws IOException {
     HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient);
 
     // Create completed commit with deltastreamer checkpoint state
@@ -62,13 +66,28 @@ public void testCheckpointStateMerge() throws IOException {
     timeline.createNewInstant(commitInstantWithCheckpointState);
 
     HoodieCommitMetadata metadataWithCheckpoint = new HoodieCommitMetadata();
+    metadataWithCheckpoint.setOperationType(WriteOperationType.INSERT);
     String checkpointVal = "00001";
     metadataWithCheckpoint.addMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, checkpointVal);
     timeline.saveAsComplete(
         commitInstantWithCheckpointState,
         Option.of(metadataWithCheckpoint.toJsonString().getBytes(StandardCharsets.UTF_8))
     );
 
+    if (testCompaction) {
+      HoodieInstant compactionInstant = new HoodieInstant(
+          true,
+          HoodieTimeline.COMPACTION_ACTION,
+          HoodieActiveTimeline.createNewInstantTime()
+      );
+      timeline.createNewInstant(compactionInstant);
+      HoodieCommitMetadata compactionMeta = new HoodieCommitMetadata(true);
+      compactionMeta.setOperationType(WriteOperationType.COMPACT);
+      timeline.saveAsComplete(
+          compactionInstant,
+          Option.of(compactionMeta.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    }
+
     // Inflight commit without checkpoint metadata
     HoodieInstant commitInstantWithoutCheckpointState = new HoodieInstant(
         true,
File 3 of 4: deltastreamer sync service (readFromSource / checkpoint handling)
@@ -50,6 +50,7 @@
 
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.keygen.KeyGenerator;
@@ -339,11 +340,17 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
       resumeCheckpointStr = Option.empty();
     } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
         HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
-      throw new HoodieDeltaStreamerException(
-          "Unable to find previous checkpoint. Please double check if this table "
-              + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
-              + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
-              + commitMetadata.toJsonString());
+      // if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
+      Option<String> prevCheckpoint = getPreviousCheckpoint(commitTimelineOpt.get());
+      if (prevCheckpoint.isPresent()) {
+        resumeCheckpointStr = prevCheckpoint;
+      } else {
+        throw new HoodieDeltaStreamerException(
+            "Unable to find previous checkpoint. Please double check if this table "
+                + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
+                + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
+                + commitMetadata.toJsonString());
+      }
     }
     // KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
     if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {

Review comment (Member), on the getPreviousCheckpoint call: anyway to do Option.orThrow or some pattern?
Reply (Contributor, author): I could not find an easier way to get this one. Have fixed all other feedback.
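The orElseThrow-style pattern the reviewer asks about exists on java.util.Optional; whether Hudi's own Option type offers an equivalent is exactly what the author could not find. A minimal sketch of how the if/else-throw above could collapse under that assumption:

// Sketch only: uses java.util.Optional, not org.apache.hudi.common.util.Option.
import java.util.Optional;

public class OrElseThrowSketch {
  public static void main(String[] args) {
    // Stand-in for getPreviousCheckpoint(commitTimelineOpt.get()).
    Optional<String> prevCheckpoint = Optional.of("00007");

    // One expression instead of if (present) { ... } else { throw ... };
    // an empty Optional would raise the exception here instead.
    String resumeCheckpointStr = prevCheckpoint.orElseThrow(
        () -> new IllegalStateException("Unable to find previous checkpoint."));
    System.out.println(resumeCheckpointStr); // prints 00007
  }
}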
@@ -451,6 +458,22 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
+  private Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws IOException {
+    return (Option<String>) timeline.getReverseOrderedInstants().map(instant -> {
+      try {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+            .fromBytes(commitTimelineOpt.get().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+        if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
+          return Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
+        } else {
+          return Option.empty();
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
+      }
+    }).filter(Option::isPresent).findFirst().orElse(Option.empty());
+  }
+
   /**
    * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed.
    *
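Note that getPreviousCheckpoint needs an unchecked cast because the lambda returns both Option.of(...) and Option.empty(), so the stream's element type degrades to a raw Option. A type-safe sketch of the same reverse-scan idea, written against java.util.Optional with toy stand-in types rather than the Hudi timeline API:

// Sketch only: commit metadata is modeled as a Map, newest commit first.
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class PreviousCheckpointSketch {
  static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; // hypothetical key name

  static Optional<String> getPreviousCheckpoint(List<Map<String, String>> reverseOrderedCommits) {
    return reverseOrderedCommits.stream()
        .map(meta -> Optional.ofNullable(meta.get(CHECKPOINT_KEY))) // empty if key missing
        .filter(Optional::isPresent)
        .map(Optional::get) // safe after the isPresent filter; no cast required
        .findFirst();
  }

  public static void main(String[] args) {
    List<Map<String, String>> commits = List.of(
        Map.of(),                          // newest commit: no checkpoint (spark datasource write)
        Map.of(CHECKPOINT_KEY, "00007"));  // older deltastreamer commit
    System.out.println(getPreviousCheckpoint(commits)); // Optional[00007]
  }
}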
File 4 of 4: deltastreamer multi-writer test
@@ -40,6 +40,7 @@
 
 import org.apache.spark.sql.SaveMode;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -155,7 +156,16 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception {
 
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"})
-  void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
+  public void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
+    testCheckpointCarryOver(tableType, false);
+  }
+
+  @Test
+  public void testGapsInDeltaStreamerCheckpoints() throws Exception {
+    testCheckpointCarryOver(HoodieTableType.COPY_ON_WRITE, true);
+  }
+
+  private void testCheckpointCarryOver(HoodieTableType tableType, boolean addGaps) throws Exception {
     // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
     setUpTestTable(tableType);
     prepareInitialConfigs(fs(), basePath, "foo");
@@ -196,34 +206,59 @@ void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType)
     HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
     backfillJob.sync();
 
+    meta.reloadActiveTimeline();
+    int totalCommits = meta.getCommitsTimeline().filterCompletedInstants().countInstants();
+
     // Save the checkpoint information from the deltastreamer run and perform next write
     String checkpointAfterDeltaSync = getLatestMetadata(meta).getMetadata(CHECKPOINT_KEY);
     // this writer will enable HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key() so that deltastreamer checkpoint will be carried over.
-    performWriteWithDeltastreamerStateMerge();
-
-    // Verify that the checkpoint is carried over
-    HoodieCommitMetadata commitMetaAfterDatasourceWrite = getLatestMetadata(meta);
-    Assertions.assertEquals(checkpointAfterDeltaSync, commitMetaAfterDatasourceWrite.getMetadata(CHECKPOINT_KEY));
+    doSparkWriteWithDeltastreamerStateMerge(true);
+    // Verify that the checkpoint is carried over
+    verifyCommitMetadataCheckpoint(meta, checkpointAfterDeltaSync);
+    if (addGaps) {
+      // add a spark data source write which will not carry over deltastreamer checkpoint.
+      doSparkWriteWithDeltastreamerStateMerge(false);
+      verifyCommitMetadataCheckpoint(meta, null);
+    }
+    meta.reloadActiveTimeline();
+    Assertions.assertEquals(totalCommits + ((addGaps) ? 2 : 1), meta.getCommitsTimeline().filterCompletedInstants().countInstants());
+
+    cfgBackfillJob.checkpoint = null;
+    new HoodieDeltaStreamer(cfgBackfillJob, jsc()).sync(); // nothing to commit, but checkpoint parsing will fail if not found in previous commits.
+    meta.reloadActiveTimeline();
+    Assertions.assertEquals(totalCommits + ((addGaps) ? 3 : 2), meta.getCommitsTimeline().filterCompletedInstants().countInstants());
+    verifyCommitMetadataCheckpoint(meta, "00008");
+  }
+
+  private void verifyCommitMetadataCheckpoint(HoodieTableMetaClient metaClient, String expectedCheckpoint) throws IOException {
+    HoodieCommitMetadata commitMeta = getLatestMetadata(metaClient);
+    if (expectedCheckpoint == null) {
+      Assertions.assertNull(commitMeta.getMetadata(CHECKPOINT_KEY));
+    } else {
+      Assertions.assertEquals(expectedCheckpoint, commitMeta.getMetadata(CHECKPOINT_KEY));
+    }
   }

   /**
    * Performs a hudi datasource write with deltastreamer state merge enabled.
    */
-  private void performWriteWithDeltastreamerStateMerge() {
+  private void doSparkWriteWithDeltastreamerStateMerge(boolean mergeState) {
     spark().read()
         .format("hudi")
         .load(tableBasePath + "/*/*.parquet")
         .limit(1)
         .write()
         .format("hudi")
-        .option(HoodieWriteConfig.TBL_NAME.key(), COW_TEST_TABLE_NAME)
+        .option(HoodieWriteConfig.TBL_NAME.key(), "hoodie_trips")
         .option(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
         .option(DataSourceWriteOptions.INSERT_DROP_DUPS().key(), "true")
         .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key")
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp")
-        .option(HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key(), "true")
+        .option(HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key(), Boolean.toString(mergeState))
         .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass")
         .option("hoodie.write.lock.filesystem.path", tableBasePath)
         .option("hoodie.cleaner.policy.failed.writes","LAZY")
         .option("hoodie.write.concurrency.mode","OPTIMISTIC_CONCURRENCY_CONTROL")
         .mode(SaveMode.Append)
-        .save(tableBasePath + "/*/*.parquet");
+        .save(tableBasePath);
   }

   private static HoodieCommitMetadata getLatestMetadata(HoodieTableMetaClient meta) throws IOException {