From 3cec3f8cea64be28f341c1e8a9eaf60142d5e037 Mon Sep 17 00:00:00 2001
From: KnightChess <981159963@qq.com>
Date: Thu, 29 Sep 2022 10:02:21 +0800
Subject: [PATCH] [HUDI-4946] fix merge into with no preCombineField has dup
 row by only insert

---
 .../command/MergeIntoHoodieTableCommand.scala |  69 +++------
 .../spark/sql/hudi/TestMergeIntoTable2.scala  | 134 +++++++++++++++++-
 2 files changed, 152 insertions(+), 51 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f0394ad379e7..9919062cac70 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -185,19 +185,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     // Create the write parameters
     val parameters = buildMergeIntoConfig(hoodieCatalogTable)

+    executeUpsert(sourceDF, parameters)
-    if (mergeInto.matchedActions.nonEmpty) { // Do the upsert
-      executeUpsert(sourceDF, parameters)
-    } else { // If there is no match actions in the statement, execute insert operation only.
-      val targetDF = Dataset.ofRows(sparkSession, mergeInto.targetTable)
-      val primaryKeys = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp.split(",")
-      // Only records that are not included in the target table can be inserted
-      val insertSourceDF = sourceDF.join(targetDF, primaryKeys,"leftanti")
-
-      // column order changed after left anti join , we should keep column order of source dataframe
-      val cols = removeMetaFields(sourceDF).columns
-      executeInsertOnly(insertSourceDF.select(cols.head, cols.tail:_*), parameters)
-    }
     sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString)
     Seq.empty[Row]
   }
@@ -299,35 +288,30 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
    * expressions to the ExpressionPayload#getInsertValue.
    */
   private def executeUpsert(sourceDF: DataFrame, parameters: Map[String, String]): Unit = {
-    val updateActions = mergeInto.matchedActions.filter(_.isInstanceOf[UpdateAction])
-      .map(_.asInstanceOf[UpdateAction])
-    // Check for the update actions
-    checkUpdateAssignments(updateActions)
-
-    val deleteActions = mergeInto.matchedActions.filter(_.isInstanceOf[DeleteAction])
-      .map(_.asInstanceOf[DeleteAction])
-    assert(deleteActions.size <= 1, "Should be only one delete action in the merge into statement.")
-    val deleteAction = deleteActions.headOption
-
-    val insertActions =
-      mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction])
-
-    // Check for the insert actions
-    checkInsertAssignments(insertActions)
-
-    // Append the table schema to the parameters. In the case of merge into, the schema of sourceDF
-    // may be different from the target table, because the are transform logical in the update or
-    // insert actions.
     val operation = if (StringUtils.isNullOrEmpty(parameters.getOrElse(PRECOMBINE_FIELD.key, ""))) {
       INSERT_OPERATION_OPT_VAL
     } else {
       UPSERT_OPERATION_OPT_VAL
     }
+
+    // Append the table schema to the parameters. In the case of merge into, the schema of sourceDF
+    // may be different from the target table, because there may be transform logic in the update or
+    // insert actions.
     var writeParams = parameters +
       (OPERATION.key -> operation) +
       (HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString) +
       (DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType)
+    val updateActions = mergeInto.matchedActions.filter(_.isInstanceOf[UpdateAction])
+      .map(_.asInstanceOf[UpdateAction])
+    // Check for the update actions
+    checkUpdateAssignments(updateActions)
+
+    val deleteActions = mergeInto.matchedActions.filter(_.isInstanceOf[DeleteAction])
+      .map(_.asInstanceOf[DeleteAction])
+    assert(deleteActions.size <= 1, "Should be only one delete action in the merge into statement.")
+    val deleteAction = deleteActions.headOption
+
     // Map of Condition -> Assignments
     val updateConditionToAssignments =
       updateActions.map(update => {
@@ -352,28 +336,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       writeParams += (PAYLOAD_DELETE_CONDITION -> serializedDeleteCondition)
     }

-    // Serialize the Map[InsertCondition, InsertAssignments] to base64 string
-    writeParams += (PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS ->
-      serializedInsertConditionAndExpressions(insertActions))
-
-    // Remove the meta fields from the sourceDF as we do not need these when writing.
-    val sourceDFWithoutMetaFields = removeMetaFields(sourceDF)
-    HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDFWithoutMetaFields)
-  }
+    val insertActions =
+      mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction])

-  /**
-   * If there are not matched actions, we only execute the insert operation.
-   * @param sourceDF
-   * @param parameters
-   */
-  private def executeInsertOnly(sourceDF: DataFrame, parameters: Map[String, String]): Unit = {
-    val insertActions = mergeInto.notMatchedActions.map(_.asInstanceOf[InsertAction])
+    // Check for the insert actions
     checkInsertAssignments(insertActions)

-    var writeParams = parameters +
-      (OPERATION.key -> INSERT_OPERATION_OPT_VAL) +
-      (HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString)
-
+    // Serialize the Map[InsertCondition, InsertAssignments] to base64 string
     writeParams += (PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS ->
       serializedInsertConditionAndExpressions(insertActions))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index c0c996e5415b..7f23c9f7d8aa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -673,7 +673,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
     }
   }

-  test("Test Merge into with String cast to Double") {
+  test ("Test Merge into with String cast to Double") {
     withTempDir { tmp =>
       val tableName = generateTableName
       // Create a cow partitioned table.
@@ -750,4 +750,136 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
       )
     }
   }
+
+  test("Test only insert for source table in dup key with preCombineField") {
+    Seq("cow", "mor").foreach {
+      tableType => {
+        withTempDir { tmp =>
+          val tableName = generateTableName
+          // Create a cow partitioned table with preCombineField
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string
+               | ) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+               | partitioned by(dt)
+               | location '${tmp.getCanonicalPath}'
+             """.stripMargin)
+          // Insert data without matched condition
+          spark.sql(
+            s"""
+               | merge into $tableName as t0
+               | using (
+               |  select 1 as id, 'a1' as name, 10.1 as price, 1000 as ts, '2021-03-21' as dt
+               |  union all
+               |  select 1 as id, 'a2' as name, 10.2 as price, 1002 as ts, '2021-03-21' as dt
+               | ) as s0
+               | on t0.id = s0.id
+               | when not matched then insert *
+             """.stripMargin
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a2", 10.2, 1002, "2021-03-21")
+          )
+
+          // Insert data with matched condition
+          spark.sql(
+            s"""
+               | merge into $tableName as t0
+               | using (
+               |  select 1 as id, 'a1_new' as name, 10.1 as price, 1003 as ts, '2021-03-21' as dt
+               |  union all
+               |  select 3 as id, 'a3' as name, 10.3 as price, 1003 as ts, '2021-03-21' as dt
+               |  union all
+               |  select 3 as id, 'a3' as name, 10.3 as price, 1003 as ts, '2021-03-21' as dt
+               | ) as s0
+               | on t0.id = s0.id
+               | when matched then update set *
+               | when not matched then insert *
+             """.stripMargin
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1_new", 10.1, 1003, "2021-03-21"),
+            Seq(3, "a3", 10.3, 1003, "2021-03-21")
+          )
+        }
+      }
+    }
+  }
+
+  test("Test only insert for source table in dup key without preCombineField") {
+    Seq("cow", "mor").foreach {
+      tableType => {
+        withTempDir { tmp =>
+          val tableName = generateTableName
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string
+               | ) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id'
+               | )
+               | partitioned by(dt)
+               | location '${tmp.getCanonicalPath}'
+             """.stripMargin)
+          // Appending records to a small file goes through the update bucket, so set this conf to use the concat handle and keep duplicate inserts
+          spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true")
+
+          // Insert data without matched condition
+          spark.sql(
+            s"""
+               | merge into $tableName as t0
+               | using (
+               |  select 1 as id, 'a1' as name, 10.1 as price, 1000 as ts, '2021-03-21' as dt
+               |  union all
+               |  select 1 as id, 'a2' as name, 10.2 as price, 1002 as ts, '2021-03-21' as dt
+               | ) as s0
+               | on t0.id = s0.id
+               | when not matched then insert *
+             """.stripMargin
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1", 10.1, 1000, "2021-03-21"),
+            Seq(1, "a2", 10.2, 1002, "2021-03-21")
+          )
+
+          // Insert data with matched condition
+          spark.sql(
+            s"""
+               | merge into $tableName as t0
+               | using (
+               |  select 3 as id, 'a3' as name, 10.3 as price, 1003 as ts, '2021-03-21' as dt
+               |  union all
+               |  select 1 as id, 'a2' as name, 10.2 as price, 1002 as ts, '2021-03-21' as dt
+               | ) as s0
+               | on t0.id = s0.id
+               | when matched then update set *
+               | when not matched then insert *
+             """.stripMargin
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1", 10.1, 1000, "2021-03-21"),
+            Seq(1, "a2", 10.2, 1002, "2021-03-21"),
+            Seq(3, "a3", 10.3, 1003, "2021-03-21"),
+            Seq(1, "a2", 10.2, 1002, "2021-03-21")
+          )
+        }
+      }
+    }
+  }
 }