now working with test
Jonathan Vexler committed Nov 22, 2022
1 parent afb5f8f commit 79e84bb
Showing 5 changed files with 83 additions and 15 deletions.
@@ -244,7 +244,7 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
this.txnManager.endTransaction(Option.of(inflightInstant));
}

// We don't want to fail the commit if hoodie.deltastreamer.fail.writes.on.inline.table.service.errors is false. We catch and log a warning if it is false
// We don't want to fail the commit if hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions is false. We catch and log a warning if it is false
try {
// do this outside of the lock since compaction and clustering can be time-consuming and we don't need a lock for the entire execution period
runTableServicesInline(table, metadata, extraMetadata);
@@ -387,7 +387,7 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "Controls whether or not the write should be failed as well, if such archiving fails.");

public static final ConfigProperty<String> FAIL_ON_TABLE_SERVICE_EXCEPTION_ENABLE = ConfigProperty
.key("hoodie.deltastreamer.fail.writes.on.inline.table.service.errors")
.key("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions")
.defaultValue("false")
.withDocumentation("Table services such as compaction and clustering can fail and prevent syncing to "
+ "the metaclient in Deltastreamer. Set this to true to fail writes when table services fail");
@@ -2365,6 +2365,11 @@ public Builder withDeleteParallelism(int parallelism) {
return this;
}

public Builder withFailureOnTableServiceException(boolean fail) {
writeConfig.setValue(FAIL_ON_TABLE_SERVICE_EXCEPTION_ENABLE, String.valueOf(fail));
return this;
}

public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
writeConfig.setValue(INSERT_PARALLELISM_VALUE, String.valueOf(insertShuffleParallelism));
writeConfig.setValue(UPSERT_PARALLELISM_VALUE, String.valueOf(upsertShuffleParallelism));
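For reference, a minimal sketch of how a job could opt in to the new behavior through the builder method added above. HoodieWriteConfig.newBuilder() and withPath() are part of the existing Hudi API, basePath is an illustrative placeholder, and only withFailureOnTableServiceException() comes from this commit:

// Illustrative sketch: fail the write if an inline table service (compaction/clustering) throws,
// instead of logging a warning and continuing.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath) // placeholder table base path
    .withFailureOnTableServiceException(true) // hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions
    .build();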
@@ -83,6 +83,7 @@
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -1754,30 +1755,52 @@ private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig cluste
return allRecords.getLeft().getLeft();
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testClusteringFailure(boolean shouldFail) throws IOException {
try {
Properties properties = new Properties();
properties.setProperty("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions", String.valueOf(shouldFail));
properties.setProperty("hoodie.auto.commit", "false");
properties.setProperty("hoodie.clustering.inline.max.commits", "1");
properties.setProperty("hoodie.clustering.inline", "true");
testInsertTwoBatches(true, "2015/03/16", properties, true);
} catch (HoodieException e) {
assertEquals("CLUSTERING IS BROKEN", e.getMessage());
assertTrue(shouldFail);
return;
}
assertFalse(shouldFail);
}

private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
return testInsertTwoBatches(populateMetaFields, "2015/03/16");
}

/**
 * This method returns the following three items:
 * 1. List of all HoodieRecords written in the two batches of inserts.
 * 2. Commit instants of the two batches.
 * 3. List of new file group ids that were written in the two batches.
 */
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
return testInsertTwoBatches(populateMetaFields, partitionPath, new Properties(), false);
}

/**
 * This method returns the following three items:
 * 1. List of all HoodieRecords written in the two batches of inserts.
 * 2. Commit instants of the two batches.
 * 3. List of new file group ids that were written in the two batches.
 */
private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath, Properties props, boolean makeClusteringBroken) throws IOException {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
populateMetaFields ? new Properties() : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config);
populateMetaFields ? props : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config, makeClusteringBroken);
dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields, makeClusteringBroken);
Set<HoodieFileGroupId> fileIds1 = getFileGroupIdsFromWriteStatus(statuses1);

String commitTime2 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields, makeClusteringBroken);
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
Set<HoodieFileGroupId> fileIdsUnion = new HashSet<>(fileIds1);
fileIdsUnion.addAll(fileIds2);
@@ -2075,11 +2098,19 @@ private void verifyRecordsWrittenWithPreservedMetadata(Set<String> commitTimes,
}

private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields) throws IOException {
return writeAndVerifyBatch(client, inserts, commitTime, populateMetaFields, false);
}

private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields, boolean autoCommitOff) throws IOException {
client.startCommitWithTime(commitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
JavaRDD<WriteStatus> statusRDD = client.upsert(insertRecordsRDD1, commitTime);
List<WriteStatus> statuses = statusRDD.collect();
assertNoWriteErrors(statuses);
verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, client.getConfig());
if (autoCommitOff) {
client.commit(commitTime, statusRDD);
}
return statuses;
}

@@ -0,0 +1,23 @@
package org.apache.hudi.client.functional;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;

import java.util.Map;

public class WriteClientBrokenClustering<T extends HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T> {


public WriteClientBrokenClustering(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
super(context, clientConfig);
}

@Override
protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
throw new HoodieException("CLUSTERING IS BROKEN");
}

}
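Note on this test double: only inlineClustering is overridden, so inserts and upserts still run through the real SparkRDDWriteClient code path, and the HoodieException message thrown here ("CLUSTERING IS BROKEN") is exactly what testClusteringFailure above asserts on when the failure flag is enabled.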
@@ -25,6 +25,7 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.functional.WriteClientBrokenClustering;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -489,12 +490,20 @@ public SparkRDDReadClient getHoodieReadClient(String basePath) {
}

public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
return getHoodieWriteClient(cfg, false);
}

public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, Boolean makeClusteringBroken) {
if (null != writeClient) {
writeClient.close();
writeClient = null;
}
writeClient = new SparkRDDWriteClient(context, cfg);
return writeClient;
if (makeClusteringBroken) {
writeClient = new WriteClientBrokenClustering(context, cfg);
} else {
writeClient = new SparkRDDWriteClient(context, cfg);
}
return writeClient;
}

public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) {
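Putting the pieces together, a condensed sketch (assuming JUnit 5's assertThrows and the test helpers changed above) of how the new failure path is exercised when the flag is enabled:

// Illustrative sketch: with the flag on, the injected clustering failure surfaces to the caller
// instead of being swallowed and logged.
Properties props = new Properties();
props.setProperty("hoodie.deltastreamer.fail.writes.on.inline.table.service.exceptions", "true");
props.setProperty("hoodie.auto.commit", "false");
props.setProperty("hoodie.clustering.inline", "true");
props.setProperty("hoodie.clustering.inline.max.commits", "1");

HoodieException e = assertThrows(HoodieException.class,
    () -> testInsertTwoBatches(true, "2015/03/16", props, true)); // true => WriteClientBrokenClustering
assertEquals("CLUSTERING IS BROKEN", e.getMessage());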
