Skip to content

Commit

Permalink
Allow missing fields with implicit casting during streaming write
Browse files Browse the repository at this point in the history
  • Loading branch information
johanl-db committed Oct 29, 2024
1 parent fce003f commit 4c5609b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,17 @@ class DeltaInsertIntoColumnOrderSuite extends DeltaInsertIntoTest {

for { (inserts: Set[Insert], expectedAnswer) <- Seq(
// When there's a type mismatch and an implicit cast is required, then all inserts use position
// based resolution for struct fields, except for `INSERT OVERWRITE PARTITION (partition)` which
// uses name based resolution, and dataframe inserts by name which don't support implicit cast
// and fail - see negative test below.
// based resolution for struct fields, except for `INSERT OVERWRITE PARTITION (partition)` and
// streaming insert which use name based resolution, and dataframe inserts by name which don't
// support implicit cast and fail - see negative test below.
insertsAppend - StreamingInsert ->
TestData("a int, s struct <x int, y: int>",
Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""", """{ "a": 1, "s": { "x": 5, "y": 4 } }""")),
insertsOverwrite - SQLInsertOverwritePartitionByPosition ->
TestData("a int, s struct <x int, y: int>", Seq("""{ "a": 1, "s": { "x": 5, "y": 4 } }""")),
Set(StreamingInsert) ->
TestData("a int, s struct <x int, y: int>",
Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""", """{ "a": 1, "s": { "x": 4, "y": 5 } }""")),
Set(SQLInsertOverwritePartitionByPosition) ->
TestData("a int, s struct <x int, y: int>", Seq("""{ "a": 1, "s": { "x": 4, "y": 5 } }"""))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,26 @@ class DeltaInsertIntoMissingColumnSuite extends DeltaInsertIntoTest {
confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
)

// Streaming-only case: the target table has `s struct<x: int, y: int>`, while the inserted data
// omits nested field `s.x` and provides `s.y` as `long` instead of `int`. The write is expected to
// succeed and leave the table schema unchanged, for the `schemaEvolution` value used in the test
// name and in the DELTA_SCHEMA_AUTO_MIGRATE conf below.
testInserts(s"insert with implicit cast and missing nested field, " +
  s"schemaEvolution=$schemaEvolution")(
  initialData =
    TestData("a int, s struct<x: int, y: int>", Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""")),
  partitionBy = Seq("a"),
  overwriteWhere = "a" -> 1,
  insertData =
    TestData("a int, s struct<y: long>", Seq("""{ "a": 1, "s": { "y": 5 } }""")),
  // Missing nested fields are allowed when writing to a delta streaming sink when there's a
  // type mismatch, same as when there's no type mismatch.
  expectedResult = ExpectedResult.Success(
    expected = new StructType()
      .add("a", IntegerType)
      .add("s", new StructType()
        .add("x", IntegerType)
        .add("y", IntegerType))),
  // Only the streaming insert is exercised by this case.
  includeInserts = Set(StreamingInsert),
  confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
)

// Missing columns for all inserts by position and missing nested fields for all inserts by
// position or SQL inserts are rejected. Whether the insert also includes a type mismatch
// doesn't play a role.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class DeltaInsertIntoSchemaEvolutionSuite extends DeltaInsertIntoTest {
))
})
},
includeInserts = insertsByPosition,
includeInserts = insertsByPosition + StreamingInsert,
confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
)

Expand All @@ -124,7 +124,7 @@ class DeltaInsertIntoSchemaEvolutionSuite extends DeltaInsertIntoTest {
"updateField" -> "b"
))
}),
includeInserts = insertsDataframe.intersect(insertsByName),
includeInserts = insertsDataframe.intersect(insertsByName) - StreamingInsert,
confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
)

Expand Down Expand Up @@ -210,7 +210,7 @@ class DeltaInsertIntoSchemaEvolutionSuite extends DeltaInsertIntoTest {
))
})
},
includeInserts = insertsSQL ++ insertsByPosition -- Seq(
includeInserts = insertsSQL ++ insertsByPosition + StreamingInsert -- Seq(
// It's not possible to specify a column that doesn't exist in the target using SQL with an
// explicit column list.
SQLInsertColList(SaveMode.Append),
Expand All @@ -236,7 +236,7 @@ class DeltaInsertIntoSchemaEvolutionSuite extends DeltaInsertIntoTest {
"updateField" -> "s"
))
}),
includeInserts = insertsDataframe.intersect(insertsByName),
includeInserts = insertsDataframe.intersect(insertsByName) - StreamingInsert,
confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
)
}
Expand Down

0 comments on commit 4c5609b

Please sign in to comment.