[HUDI-2343] Fix the exception for mergeInto when the primaryKey and preCombineField of source table and target table differ in case only #3517

Merged 1 commit on Sep 21, 2021
@@ -196,9 +196,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
   }

   private def isEqualToTarget(targetColumnName: String, sourceExpression: Expression): Boolean = {
+    val sourceColNameMap = sourceDFOutput.map(attr => (attr.name.toLowerCase, attr.name)).toMap
+
     sourceExpression match {
-      case attr: AttributeReference if attr.name.equalsIgnoreCase(targetColumnName) => true
-      case Cast(attr: AttributeReference, _, _) if attr.name.equalsIgnoreCase(targetColumnName) => true
+      case attr: AttributeReference if sourceColNameMap(attr.name.toLowerCase).equals(targetColumnName) => true

Can we use sparkSession.sessionState.conf.resolver to compare the column name?

@dongkelun (Contributor, Author), Sep 21, 2021

Hi, do you mean something like this?

val resolver = sparkSession.sessionState.conf.resolver
case attr: AttributeReference if resolver(attr.name, targetColumnName) => true

I'm not sure I understand. The resolver is case-insensitive by default when it compares names, but the comparison here must be case-sensitive. That is why I use sourceColNameMap(attr.name.toLowerCase) to get the original column name of the source table, without any case conversion, and then compare it with targetColumnName for equality. If the two are not equal, the corresponding column name is added later with withColumn. The comparison has to be case-sensitive because sourceDF is case-sensitive when writing data.

Makes sense to me. +1 for this.
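The difference discussed in this thread can be shown with a small, Spark-free sketch. It is only an illustration under stated assumptions: `CaseSensitiveMatchSketch`, `sourceColumns`, and `referencedName` are hypothetical names, the `resolver` here is a plain stand-in for Spark's default case-insensitive resolver, and the lookup uses `get(...).contains(...)` instead of the `sourceColNameMap(...)` call in the diff so that an unknown name returns false rather than throwing.

```scala
object CaseSensitiveMatchSketch {
  // Stand-in for Spark's default resolver, which ignores case when
  // spark.sql.caseSensitive is false (the default). Not the real Spark object.
  val resolver: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)

  // `sourceColumns` is a hypothetical stand-in for sourceDFOutput.map(_.name).
  // `referencedName` is the column name as it appears in the expression.
  def isEqualToTarget(sourceColumns: Seq[String],
                      referencedName: String,
                      targetColumnName: String): Boolean = {
    // Map lower-cased name -> original name, as in the diff.
    val sourceColNameMap = sourceColumns.map(n => (n.toLowerCase, n)).toMap
    // Look up the original source name, then compare case-sensitively.
    // Assumption: get/contains is used here instead of Map.apply, so a
    // name missing from the source yields false rather than an exception.
    sourceColNameMap.get(referencedName.toLowerCase).contains(targetColumnName)
  }

  def main(args: Array[String]): Unit = {
    val upperSource = Seq("ID", "NAME", "PRICE", "TS", "FLAG")
    val lowerSource = Seq("id", "name", "price", "ts", "flag")

    // The resolver treats "id" and "ID" as the same column.
    println(resolver("id", "ID"))
    // The source column is really "ID", so it does not equal target "id".
    println(isEqualToTarget(upperSource, "id", "id"))
    // The source column is really "id", so it equals target "id".
    println(isEqualToTarget(lowerSource, "ID", "id"))
  }
}
```

With the upper-case source from the new test, the check reports a mismatch against the target column `id`, which is the situation where, according to the comment above, the column is added later with withColumn.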

+      case Cast(attr: AttributeReference, _, _) if sourceColNameMap(attr.name.toLowerCase).equals(targetColumnName) => true
       case _=> false
     }
   }
@@ -375,4 +375,73 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
}
}

test("Test MergeInto When PrimaryKey And PreCombineField Of Source Table And Target Table Differ In Case Only") {
withTempDir { tmp =>
val tableName = generateTableName
// Create table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| options (
| primaryKey ='id',
| preCombineField = 'ts'
| )
""".stripMargin)

spark.sql(
s"""
| merge into $tableName
| using (
| select 1 as ID, 'a1' as NAME, 10 as PRICE, 1000 as TS, '1' as FLAG
| ) s0
| on s0.ID = $tableName.id
| when matched and FLAG = '1' then update set
| id = s0.ID, name = s0.NAME, price = s0.PRICE, ts = s0.TS
| when not matched and FLAG = '1' then insert *
|""".stripMargin)
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)

// Test the case where the column names in the condition and action differ in case from those of the source table
spark.sql(
s"""
| merge into $tableName
| using (
| select 1 as ID, 'a1' as NAME, 11 as PRICE, 1001 as TS, '1' as FLAG
| ) s0
| on s0.id = $tableName.id
| when matched and FLAG = '1' then update set
| id = s0.id, name = s0.NAME, price = s0.PRICE, ts = s0.ts
| when not matched and FLAG = '1' then insert *
|""".stripMargin)
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 11.0, 1001)
)

// Test the case where the column name inside a cast in the condition differs in case from that of the source table
spark.sql(
s"""
| merge into $tableName
| using (
| select 2 as ID, 'a2' as NAME, 12 as PRICE, 1002 as TS, '1' as FLAG
| ) s0
| on cast(s0.id as int) = $tableName.id
| when matched and FLAG = '1' then update set
| id = s0.id, name = s0.NAME, price = s0.PRICE, ts = s0.ts
| when not matched and FLAG = '1' then insert *
|""".stripMargin)
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 11.0, 1001),
Seq(2, "a2", 12.0, 1002)
)
}
}

}