Skip to content

Commit

Permalink
[HUDI-7139] Fix operation type for bulk insert with row writer in Hudi Streamer (apache#10175)
Browse files Browse the repository at this point in the history

This commit fixes the bug which causes the `operationType` to be null in the commit metadata of a bulk insert operation with the row writer enabled in Hudi Streamer (`hoodie.datasource.write.row.writer.enable=true`).  `HoodieStreamerDatasetBulkInsertCommitActionExecutor` is updated so that `#preExecute` and `#afterExecute` run the same logic as the regular bulk insert operation without the row writer.
  • Loading branch information
yihua authored Nov 25, 2023
1 parent 332e7e8 commit 4f875ed
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Expand All @@ -44,12 +42,8 @@ public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig con

@Override
protected void preExecute() {
// no op
}

@Override
protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
// no op
table.validateInsertSchema();
writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,10 @@ private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List
if (i == 2 || i == 4) { // this validation reloads the timeline. So, we are validating only for first and last batch.
// validate commit metadata for all completed commits to have valid schema in extra metadata.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> assertValidSchemaInCommitMetadata(entry, metaClient));
metaClient.reloadActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().getInstants()
.forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata(
entry, metaClient, WriteOperationType.BULK_INSERT));
}
}
} finally {
Expand Down Expand Up @@ -1743,15 +1746,21 @@ private void testParquetDFSSource(boolean useSchemaProvider, List<String> transf
assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
// validate commit metadata for all completed commits to have valid schema in extra metadata.
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> assertValidSchemaInCommitMetadata(entry, metaClient));
metaClient.reloadActiveTimeline().getCommitsTimeline()
.filterCompletedInstants().getInstants()
.forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata(
entry, metaClient, WriteOperationType.INSERT));
testNum++;
}

private void assertValidSchemaInCommitMetadata(HoodieInstant instant, HoodieTableMetaClient metaClient) {
private void assertValidSchemaAndOperationTypeInCommitMetadata(HoodieInstant instant,
HoodieTableMetaClient metaClient,
WriteOperationType operationType) {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
assertFalse(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)));
assertEquals(operationType, commitMetadata.getOperationType());
} catch (IOException ioException) {
throw new HoodieException("Failed to parse commit metadata for " + instant.toString());
}
Expand Down

0 comments on commit 4f875ed

Please sign in to comment.