Skip to content

Commit

Permalink
[HUDI-4643] MergeInto syntax WHEN MATCHED is optional but must be set (
Browse files Browse the repository at this point in the history
  • Loading branch information
dongkelun authored and voonhous committed Oct 7, 2022
1 parent bffbf4a commit 71a1c16
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
if (mergeInto.matchedActions.nonEmpty) { // Do the upsert
executeUpsert(sourceDF, parameters)
} else { // If there is no match actions in the statement, execute insert operation only.
executeInsertOnly(sourceDF, parameters)
val targetDF = Dataset.ofRows(sparkSession, mergeInto.targetTable)
val primaryKeys = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp.split(",")
// Only records that are not included in the target table can be inserted
val insertSourceDF = sourceDF.join(targetDF, primaryKeys,"leftanti")
executeInsertOnly(insertSourceDF, parameters)
}
sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString)
Seq.empty[Row]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
| )
""".stripMargin)

// Insert data to source table
// Insert data
spark.sql(s"insert into $tableName select 1, 'a1', 1, 10, '2021-03-21'")
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 1.0, 10, "2021-03-21")
Expand Down Expand Up @@ -544,4 +544,96 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
}
}

// Insert-only MERGE INTO (no WHEN MATCHED clause) against a table that already holds one of
// the source keys: the existing record must stay untouched and only the new key is added.
test("Test only insert when source table contains history") {
  withTempDir { tmp =>
    val tableName = generateTableName
    val tablePath = s"${tmp.getCanonicalPath}/$tableName"
    val selectAllSql = s"select id, name, price, ts, dt from $tableName"

    // Row written up front; the merge below must leave it exactly as it is.
    val existingRow: Seq[Any] = Seq(1, "a1", 1.0, 10, "2022-08-18")
    // Row that only exists in the merge source and is therefore expected to be inserted.
    val newRow: Seq[Any] = Seq(2, "a2", 10.0, 100, "2022-08-18")

    // Single-key Hudi table: `id` is the record key, `ts` the pre-combine field.
    val createTableSql =
      s"""
         |create table $tableName (
         | id int,
         | name string,
         | price double,
         | ts long,
         | dt string
         |) using hudi
         | location '$tablePath'
         | tblproperties (
         | primaryKey ='id',
         | preCombineField = 'ts'
         | )
       """.stripMargin
    spark.sql(createTableSql)

    // Seed the target table with the "history" record (id = 1).
    spark.sql(s"insert into $tableName select 1, 'a1', 1, 10, '2022-08-18'")
    checkAnswer(selectAllSql)(existingRow)

    // The source carries a newer version of id = 1 (which matches the target) plus a brand-new
    // id = 2. Since the statement has no WHEN MATCHED clause, only id = 2 may be written.
    val mergeSql =
      s"""
         | merge into $tableName as t0
         | using (
         | select 1 as id, 'a1' as name, 11 as price, 110 as ts, '2022-08-19' as dt union all
         | select 2 as id, 'a2' as name, 10 as price, 100 as ts, '2022-08-18' as dt
         | ) as s0
         | on t0.id = s0.id
         | when not matched then insert *
       """.stripMargin
    spark.sql(mergeSql)

    // id = 1 keeps its original price/ts/dt; id = 2 has been added.
    checkAnswer(selectAllSql)(existingRow, newRow)
  }
}

// Insert-only MERGE INTO (no WHEN MATCHED clause) against a table with a composite record key
// (id1, id2): the existing record must stay untouched and only the new key combination is added.
test("Test only insert when source table contains history and target table has multiple keys") {
  withTempDir { tmp =>
    val tableName = generateTableName
    val allowDuplicateInsertsKey = "hoodie.merge.allow.duplicate.on.inserts"
    // The test flips a session-level setting. Capture whatever was configured before so it can be
    // restored afterwards; otherwise the override would leak into every later test that uses the
    // same Spark session.
    val previousAllowDuplicateInserts = spark.conf.getOption(allowDuplicateInsertsKey)
    try {
      // Create table with multiple keys
      spark.sql(
        s"""
           |create table $tableName (
           | id1 int,
           | id2 int,
           | name string,
           | price double,
           | ts long,
           | dt string
           |) using hudi
           | location '${tmp.getCanonicalPath}/$tableName'
           | tblproperties (
           | primaryKey ='id1,id2',
           | preCombineField = 'ts'
           | )
         """.stripMargin)
      spark.sql(s"set $allowDuplicateInsertsKey = true")

      // Seed the target table with the "history" record (id1 = 1, id2 = 1).
      spark.sql(s"insert into $tableName select 1, 1, 'a1', 1, 10, '2022-08-18'")
      checkAnswer(s"select id1, id2, name, price, ts, dt from $tableName")(
        Seq(1, 1, "a1", 1.0, 10, "2022-08-18")
      )

      // The source carries a newer version of key (1, 1) plus a new key (1, 2).
      // NOTE(review): the ON clause compares only id1 while the table's primaryKey is 'id1,id2'.
      // The expected rows below show that (1, 2) is still inserted, so the not-matched decision
      // appears to follow the full record key rather than the ON clause - confirm this is intended.
      spark.sql(
        s"""
           | merge into $tableName as t0
           | using (
           | select 1 as id1, 1 as id2, 'a1' as name, 11 as price, 110 as ts, '2022-08-19' as dt union all
           | select 1 as id1, 2 as id2, 'a2' as name, 10 as price, 100 as ts, '2022-08-18' as dt
           | ) as s0
           | on t0.id1 = s0.id1
           | when not matched then insert *
         """.stripMargin
      )

      // Key (1, 1) keeps its original price/ts/dt; key (1, 2) has been added.
      checkAnswer(s"select id1, id2, name, price, ts, dt from $tableName")(
        Seq(1, 1, "a1", 1.0, 10, "2022-08-18"),
        Seq(1, 2, "a2", 10.0, 100, "2022-08-18")
      )
    } finally {
      // Put the session back the way it was, even when an assertion above fails.
      previousAllowDuplicateInserts match {
        case Some(value) => spark.conf.set(allowDuplicateInsertsKey, value)
        case None => spark.conf.unset(allowDuplicateInsertsKey)
      }
    }
  }
}

}

0 comments on commit 71a1c16

Please sign in to comment.