[HUDI-6828] Fix wrong partitionToReplaceIds when insertOverwrite empty data into partitions (#9811)
beyond1920 authored Oct 9, 2023
1 parent ecd088d commit 66879cd
Showing 6 changed files with 99 additions and 42 deletions.
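In short: an INSERT OVERWRITE into a statically specified partition whose source query returns no rows produces no write statuses, so the commit previously recorded no replaced file ids and the partition's old file slices stayed visible after the "overwrite". The fix threads the statically resolved partition path from the SQL layer down to the insert-overwrite executors so they can still mark the partition's existing files as replaced. A minimal repro sketch, adapted from the test change at the bottom of this diff (table names are illustrative):

    // Scala sketch; `target` and `inputTable` are Hudi tables partitioned by dt,
    // with inputTable left empty.
    spark.sql("insert into target values (3, 'c1', 13, '2021-07-17')")
    spark.sql("insert into target values (1, 'a1', 10, '2021-07-18')")

    // Overwrite one partition from an empty source: zero rows are written.
    spark.sql("insert overwrite table target partition (dt = '2021-07-17') " +
      "select id, name, price from inputTable")

    // After this fix the partition comes back empty; before it, row (3, 'c1', 13)
    // was still returned.
    spark.sql("select id, name, price, dt from target where dt = '2021-07-17'").show()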
File 1 of 6: HoodieInternalConfig.java
@@ -46,6 +46,12 @@ public class HoodieInternalConfig extends HoodieConfig {
       .withDocumentation("For SQL operations, if enables bulk_insert operation, "
           + "this configure will take effect to decide overwrite whole table or partitions specified");
 
+  public static final ConfigProperty<String> STATIC_OVERWRITE_PARTITION_PATHS = ConfigProperty
+      .key("hoodie.static.overwrite.partition.paths")
+      .defaultValue("")
+      .markAdvanced()
+      .withDocumentation("Inner configure to pass static partition paths to executors for SQL operations.");
+
   /**
    * Returns if partition records are sorted or not.
    *
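The new property is internal plumbing rather than a user-facing knob: the SQL layer below resolves the static partition path at planning time and stashes it under hoodie.static.overwrite.partition.paths, and the insert-overwrite executors read it back when the write produced no statuses. A sketch of the round trip, using only calls that appear later in this diff (the literal path value is illustrative; its exact format depends on the table's partition-path style):

    // SQL layer: attach the resolved static partition path to the write options.
    val opts = Map(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS.key() -> "2021-07-17")
    // Executor: read it back; the "" default means "no static overwrite partition".
    val staticPath = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS)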
File 2 of 6: Spark insert-overwrite commit action executor (RDD write path, Java)
@@ -25,7 +25,9 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.table.HoodieTable;
@@ -34,6 +36,7 @@
 
 import org.apache.spark.Partitioner;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -78,8 +81,17 @@ protected String getCommitActionType() {
 
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
-    return HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
-        Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+    if (writeMetadata.getWriteStatuses().isEmpty()) {
+      String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+      if (StringUtils.isNullOrEmpty(staticOverwritePartition)) {
+        return Collections.emptyMap();
+      } else {
+        return Collections.singletonMap(staticOverwritePartition, getAllExistingFileIds(staticOverwritePartition));
+      }
+    } else {
+      return HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
+          Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+    }
   }
 
   protected List<String> getAllExistingFileIds(String partitionPath) {
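Why the guard matters: with an empty incoming dataset there are no WriteStatus entries, so the distinct-partition scan in the else branch yields an empty map and the resulting replacecommit retires nothing, which is exactly the bug in HUDI-6828. When the overwrite targeted a fully static partition, the executor now maps that partition to all of its existing file ids so the old file slices are replaced even though no new data was written; a genuinely empty map is returned only when no static partition path was passed down.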
File 3 of 6: insert-overwrite commit action executor for the row-writer path (Dataset<Row>, Java)
@@ -27,11 +27,13 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -58,8 +60,17 @@ public WriteOperationType getWriteOperationType() {
 
   @Override
   protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
-    return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
-        Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+    if (writeStatuses.isEmpty()) {
+      String staticOverwritePartition = writeConfig.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+      if (staticOverwritePartition == null || staticOverwritePartition.isEmpty()) {
+        return Collections.emptyMap();
+      } else {
+        return Collections.singletonMap(staticOverwritePartition, getAllExistingFileIds(staticOverwritePartition));
+      }
+    } else {
+      return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
+          Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+    }
   }
 
   protected List<String> getAllExistingFileIds(String partitionPath) {
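This is the same fallback applied to the row-writer path used by bulk_insert, which bypasses the RDD-based executor above; that is why the test below exercises the scenario both with and without bulk_insert enabled.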
File 4 of 6: ProvidesHoodieConfig.scala
@@ -164,7 +164,8 @@ trait ProvidesHoodieConfig extends Logging {
                                isOverwritePartition: Boolean,
                                isOverwriteTable: Boolean,
                                insertPartitions: Map[String, Option[String]] = Map.empty,
-                               extraOptions: Map[String, String]): Map[String, String] = {
+                               extraOptions: Map[String, String],
+                               staticOverwritePartitionPathOpt: Option[String] = Option.empty): Map[String, String] = {
 
     if (insertPartitions.nonEmpty &&
       (insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) {
@@ -256,6 +257,13 @@ trait ProvidesHoodieConfig extends Logging {
       Map()
     }
 
+    val staticOverwritePartitionPathOptions = staticOverwritePartitionPathOpt match {
+      case Some(staticOverwritePartitionPath) =>
+        Map(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS.key() -> staticOverwritePartitionPath)
+      case _ =>
+        Map()
+    }
+
     // try to use new insert dup policy instead of legacy insert mode to deduce payload class. If only insert mode is explicitly specified,
     // w/o specifying any value for insert dup policy, legacy configs will be honored. But on all other cases (i.e when neither of the configs is set,
     // or when both configs are set, or when only insert dup policy is set), we honor insert dup policy and ignore the insert mode.
@@ -304,7 +312,7 @@ trait ProvidesHoodieConfig extends Logging {
       RECORDKEY_FIELD.key -> recordKeyConfigValue,
       PRECOMBINE_FIELD.key -> preCombineField,
       PARTITIONPATH_FIELD.key -> partitionFieldsStr
-    ) ++ overwriteTableOpts ++ getDropDupsConfig(useLegacyInsertModeFlow, combinedOpts)
+    ) ++ overwriteTableOpts ++ getDropDupsConfig(useLegacyInsertModeFlow, combinedOpts) ++ staticOverwritePartitionPathOptions
 
     combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
       defaultOpts = defaultOpts, overridingOpts = overridingOpts)
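With this change the static partition path rides along in the same option map as every other write config, so no new plumbing is needed between the Spark SQL layer and the executors: buildHoodieInsertConfig simply folds staticOverwritePartitionPathOptions into the overriding options.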
File 5 of 6: InsertIntoHoodieTableCommand.scala
@@ -100,8 +100,8 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
         isOverWritePartition = true
       }
     }
-
-    val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions)
+    val staticOverwritePartitionPathOpt = getStaticOverwritePartitionPath(catalogTable, partitionSpec, isOverWritePartition)
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, staticOverwritePartitionPathOpt)
 
     val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf)
 
@@ -118,6 +118,22 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
     success
   }
 
+  private def getStaticOverwritePartitionPath(hoodieCatalogTable: HoodieCatalogTable,
+                                              partitionsSpec: Map[String, Option[String]],
+                                              isOverWritePartition: Boolean): Option[String] = {
+    if (isOverWritePartition) {
+      val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+      val isStaticOverwritePartition = staticPartitionValues.keys.size == hoodieCatalogTable.partitionFields.length
+      if (isStaticOverwritePartition) {
+        Option.apply(makePartitionPath(hoodieCatalogTable, staticPartitionValues))
+      } else {
+        Option.empty
+      }
+    } else {
+      Option.empty
+    }
+  }
+
   /**
    * Align provided [[query]]'s output with the expected [[catalogTable]] schema by
    *
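The helper returns a path only for a fully static overwrite, i.e. when every partition column is pinned to a literal in the partition spec; with a dynamic or partial spec it returns Option.empty, and only the partitions that actually receive rows are overwritten, matching the previous behavior. An illustrative sketch (table names t and src are hypothetical):

    // Fully static: the single partition column dt is pinned, so
    // getStaticOverwritePartitionPath yields Some(path) even if src is empty.
    spark.sql("insert overwrite table t partition (dt = '2021-07-17') select id, name, price from src")

    // Dynamic: dt comes from the query output, so the helper yields Option.empty
    // and an empty-source overwrite leaves existing partitions untouched.
    spark.sql("insert overwrite table t partition (dt) select id, name, price, dt from src")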
File 6 of 6: TestInsertTable.scala
@@ -880,30 +880,19 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test insert overwrite partitions with empty dataset") {
-    withSQLConf(SPARK_SQL_INSERT_INTO_OPERATION.key -> WriteOperationType.BULK_INSERT.value()) {
-      withRecordType()(withTempDir { tmp =>
-        Seq("cow", "mor").foreach { tableType =>
-          withTable(generateTableName) { inputTable =>
-            spark.sql(
-              s"""
-                 |create table $inputTable (
-                 | id int,
-                 | name string,
-                 | price double,
-                 | dt string
-                 |) using hudi
-                 | tblproperties (
-                 | type = '$tableType',
-                 | primaryKey = 'id'
-                 | )
-                 | partitioned by (dt)
-                 | location '${tmp.getCanonicalPath}/$inputTable'
-       """.stripMargin)
-
-            withTable(generateTableName) { target =>
-              spark.sql(
-                s"""
-                   |create table $target (
-                   | id int,
-                   | name string,
-                   | price double,
+    Seq(true, false).foreach { enableBulkInsert =>
+      val bulkInsertConf: Array[(String, String)] = if (enableBulkInsert) {
+        Array(SPARK_SQL_INSERT_INTO_OPERATION.key -> WriteOperationType.BULK_INSERT.value())
+      } else {
+        Array()
+      }
+      withSQLConf(bulkInsertConf: _*) {
+        withRecordType()(withTempDir { tmp =>
+          Seq("cow", "mor").foreach { tableType =>
+            withTable(generateTableName) { inputTable =>
+              spark.sql(
+                s"""
+                   |create table $inputTable (
+                   | id int,
+                   | name string,
+                   | price double,
@@ -914,21 +903,36 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
                    | primaryKey = 'id'
                    | )
                    | partitioned by (dt)
-                   | location '${tmp.getCanonicalPath}/$target'
+                   | location '${tmp.getCanonicalPath}/$inputTable'
        """.stripMargin)
-              spark.sql(s"insert into $target values(3, 'c1', 13, '2021-07-17')")
-              spark.sql(s"insert into $target values(1, 'a1', 10, '2021-07-18')")
-
-              // Insert overwrite a partition with empty record
-              spark.sql(s"insert overwrite table $target partition(dt='2021-07-17') select id, name, price from $inputTable")
-              // TODO enable result check after fix https://issues.apache.org/jira/browse/HUDI-6828
-              // checkAnswer(s"select id, name, price, dt from $target order by id")(
-              //  Seq(1, "a1", 10.0, "2021-07-18")
-              // )
+
+              withTable(generateTableName) { target =>
+                spark.sql(
+                  s"""
+                     |create table $target (
+                     | id int,
+                     | name string,
+                     | price double,
+                     | dt string
+                     |) using hudi
+                     | tblproperties (
+                     | type = '$tableType',
+                     | primaryKey = 'id'
+                     | )
+                     | partitioned by (dt)
+                     | location '${tmp.getCanonicalPath}/$target'
+       """.stripMargin)
+                spark.sql(s"insert into $target values(3, 'c1', 13, '2021-07-17')")
+                spark.sql(s"insert into $target values(1, 'a1', 10, '2021-07-18')")
+
+                // Insert overwrite a partition with empty record
+                spark.sql(s"insert overwrite table $target partition(dt='2021-07-17') select id, name, price from $inputTable")
+                checkAnswer(s"select id, name, price, dt from $target where dt='2021-07-17'")(Seq.empty: _*)
+              }
             }
           }
-        }
-      })
-    }
+        })
+      }
+    }
   }
 
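The reworked test now covers both write paths patched above, running with and without bulk_insert for both cow and mor tables, and it asserts that the overwritten partition actually ends up empty instead of deferring to the old TODO that pointed at HUDI-6828.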
