[HUDI-5242] Do not fail Meta sync in Deltastreamer when inline table service fails (#7243)

After the data files are written, table services like clustering and compaction can fail, which prevents the sync to the metaserver from happening. This patch adds a config that, when set to false, keeps the Deltastreamer from failing so that the sync to the metaserver still occurs; the exception that occurred is logged as a warning instead. To use this new behavior, set hoodie.fail.writes.on.inline.table.service.exception to false.
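For reference, a minimal sketch of opting into the new behavior by setting the key in a Properties object, mirroring how the test below configures it. The key and its default ("true") come from this commit; how the properties reach your Deltastreamer job (for example via a properties file or --hoodie-conf) depends on your setup and is an assumption here.

```java
import java.util.Properties;

// Sketch: opt out of failing the write when an inline table service
// (compaction/clustering) throws. Only the config key is from this commit;
// the surrounding job wiring is hypothetical.
public class TolerateInlineTableServiceFailure {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.fail.writes.on.inline.table.service.exception", "false");
    // pass `props` to the Deltastreamer / write client configuration as usual
    System.out.println(props.getProperty("hoodie.fail.writes.on.inline.table.service.exception"));
  }
}
```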

Co-authored-by: Jonathan Vexler <=>
jonvex authored Nov 29, 2022
1 parent b31ef12 commit 1cdbf68
Showing 3 changed files with 91 additions and 8 deletions.
@@ -243,9 +243,21 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
} finally {
this.txnManager.endTransaction(Option.of(inflightInstant));
}
// do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
runTableServicesInline(table, metadata, extraMetadata);

// We don't want to fail the commit if hoodie.fail.writes.on.inline.table.service.exception is false. We catch and log a warning instead.
try {
// do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
runTableServicesInline(table, metadata, extraMetadata);
} catch (Exception e) {
if (config.isFailOnInlineTableServiceExceptionEnabled()) {
throw e;
}
LOG.warn("Inline compaction or clustering failed with exception: " + e.getMessage()
+ ". Moving further since \"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.");
}

emitCommitMetrics(instantTime, metadata, commitActionType);

// callback if needed.
if (config.writeCommitCallbackOn()) {
if (null == commitCallback) {
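As a side note, the control flow added above boils down to a catch-and-warn guarded by the new flag. A standalone sketch of that pattern (hypothetical names, not Hudi internals) could look like this:

```java
import java.util.logging.Logger;

// Simplified illustration of the guard added in commitStats(): run the inline
// table services, then either rethrow or log-and-continue based on the flag.
public class InlineServiceGuardSketch {
  private static final Logger LOG = Logger.getLogger(InlineServiceGuardSketch.class.getName());

  static void commitWithTableServices(boolean failOnInlineTableServiceException) {
    try {
      runTableServicesInline(); // stand-in for inline compaction/clustering
    } catch (RuntimeException e) {
      if (failOnInlineTableServiceException) {
        throw e; // default: surface the failure to the writer
      }
      // opt-out: warn and keep going so later steps (metrics, callbacks, meta sync) still run
      LOG.warning("Inline table service failed: " + e.getMessage() + ". Continuing.");
    }
  }

  static void runTableServicesInline() {
    throw new RuntimeException("simulated clustering failure");
  }

  public static void main(String[] args) {
    commitWithTableServices(false); // logs a warning and returns normally
  }
}
```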
@@ -386,6 +386,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. "
+ "Controls whether or not, the write should be failed as well, if such archiving fails.");

public static final ConfigProperty<String> FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION = ConfigProperty
.key("hoodie.fail.writes.on.inline.table.service.exception")
.defaultValue("true")
.withDocumentation("Table services such as compaction and clustering can fail and prevent syncing to "
+ "the metaclient. Set this to true to fail writes when table services fail");

public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
.key("hoodie.consistency.check.initial_interval_ms")
.defaultValue(2000L)
@@ -1150,6 +1156,10 @@ public boolean isFailOnTimelineArchivingEnabled() {
return getBoolean(FAIL_ON_TIMELINE_ARCHIVING_ENABLE);
}

public boolean isFailOnInlineTableServiceExceptionEnabled() {
return getBoolean(FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION);
}

public int getMaxConsistencyChecks() {
return getInt(MAX_CONSISTENCY_CHECKS);
}
@@ -2354,6 +2364,11 @@ public Builder withDeleteParallelism(int parallelism) {
return this;
}

public Builder withFailureOnInlineTableServiceException(boolean fail) {
writeConfig.setValue(FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION, String.valueOf(fail));
return this;
}

public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
writeConfig.setValue(INSERT_PARALLELISM_VALUE, String.valueOf(insertShuffleParallelism));
writeConfig.setValue(UPSERT_PARALLELISM_VALUE, String.valueOf(upsertShuffleParallelism));
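If the flag is set programmatically rather than through properties, a minimal sketch using the builder method and accessor added above might look like the following; the base path and table name are placeholders, and a real config would also set schema and other options.

```java
import org.apache.hudi.config.HoodieWriteConfig;

public class InlineTableServiceConfigExample {
  public static void main(String[] args) {
    // Build a write config that does not fail the write when inline table services throw.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/example_table")              // placeholder base path
        .forTable("example_table")                        // placeholder table name
        .withFailureOnInlineTableServiceException(false)  // builder method added in this commit
        .build();

    // Accessor added in this commit; prints "false" here.
    System.out.println(config.isFailOnInlineTableServiceExceptionEnabled());
  }
}
```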
@@ -83,6 +83,7 @@
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -1754,30 +1755,62 @@ private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig cluste
return allRecords.getLeft().getLeft();
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFailWritesOnInlineTableServiceExceptions(boolean shouldFail) throws IOException {
try {
Properties properties = new Properties();
properties.setProperty("hoodie.fail.writes.on.inline.table.service.exception", String.valueOf(shouldFail));
properties.setProperty("hoodie.auto.commit", "false");
properties.setProperty("hoodie.clustering.inline.max.commits", "1");
properties.setProperty("hoodie.clustering.inline", "true");
testInsertTwoBatches(true, "2015/03/16", properties, true);
assertFalse(shouldFail);
} catch (HoodieException e) {
assertEquals(CLUSTERING_FAILURE, e.getMessage());
assertTrue(shouldFail);
}
}

private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
return testInsertTwoBatches(populateMetaFields, "2015/03/16");
}

private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
return testInsertTwoBatches(populateMetaFields, partitionPath, new Properties(), false);
}

/**
* This method returns following three items:
* 1. List of all HoodieRecord written in the two batches of insert.
* 2. Commit instants of the two batches.
* 3. List of new file group ids that were written in the two batches.
*/
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath, Properties props,
boolean failInlineClustering) throws IOException {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
populateMetaFields ? new Properties() : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config);
populateMetaFields ? props : getPropertiesForKeyGen());
SparkRDDWriteClient client;
if (failInlineClustering) {
if (null != writeClient) {
writeClient.close();
writeClient = null;
}
client = new WriteClientBrokenClustering(context, config);
} else {
client = getHoodieWriteClient(config);
}

dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields, failInlineClustering);
Set<HoodieFileGroupId> fileIds1 = getFileGroupIdsFromWriteStatus(statuses1);

String commitTime2 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields, failInlineClustering);
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
Set<HoodieFileGroupId> fileIdsUnion = new HashSet<>(fileIds1);
fileIdsUnion.addAll(fileIds2);
@@ -2075,11 +2108,20 @@ private void verifyRecordsWrittenWithPreservedMetadata(Set<String> commitTimes,
}

private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields) throws IOException {
return writeAndVerifyBatch(client, inserts, commitTime, populateMetaFields, false);
}

private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields, boolean autoCommitOff) throws IOException {
client.startCommitWithTime(commitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
JavaRDD<WriteStatus> statusRDD = client.upsert(insertRecordsRDD1, commitTime);
if (autoCommitOff) {
client.commit(commitTime, statusRDD);
}
List<WriteStatus> statuses = statusRDD.collect();
assertNoWriteErrors(statuses);
verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, client.getConfig());

return statuses;
}

@@ -2755,4 +2797,18 @@ protected void validateRecordsBeforeAndAfter(final Dataset<Row> before, final Da
}
}

public static class WriteClientBrokenClustering<T extends HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T> {

public WriteClientBrokenClustering(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
super(context, clientConfig);
}

@Override
protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
throw new HoodieException(CLUSTERING_FAILURE);
}

}

public static String CLUSTERING_FAILURE = "CLUSTERING FAILURE";
}
