diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoColumnOrderSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoColumnOrderSuite.scala
index 4236c7d80d..08cf18088a 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoColumnOrderSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoColumnOrderSuite.scala
@@ -125,14 +125,17 @@ class DeltaInsertIntoColumnOrderSuite extends DeltaInsertIntoTest {
   for {
     (inserts: Set[Insert], expectedAnswer) <- Seq(
       // When there's a type mismatch and an implicit cast is required, then all inserts use position
-      // based resolution for struct fields, except for `INSERT OVERWRITE PARTITION (partition)` which
-      // uses name based resolution, and dataframe inserts by name which don't support implicit cast
-      // and fail - see negative test below.
+      // based resolution for struct fields, except for `INSERT OVERWRITE PARTITION (partition)` and
+      // streaming insert which use name based resolution, and dataframe inserts by name which don't
+      // support implicit cast and fail - see negative test below.
       insertsAppend - StreamingInsert ->
         TestData("a int, s struct <x int, y int>",
           Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""", """{ "a": 1, "s": { "x": 5, "y": 4 } }""")),
       insertsOverwrite - SQLInsertOverwritePartitionByPosition ->
         TestData("a int, s struct <x int, y int>", Seq("""{ "a": 1, "s": { "x": 5, "y": 4 } }""")),
+      Set(StreamingInsert) ->
+        TestData("a int, s struct <x int, y int>",
+          Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""", """{ "a": 1, "s": { "x": 4, "y": 5 } }""")),
       Set(SQLInsertOverwritePartitionByPosition) ->
         TestData("a int, s struct <x int, y int>", Seq("""{ "a": 1, "s": { "x": 4, "y": 5 } }"""))
     )
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoMissingColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoMissingColumnSuite.scala
index a9dbc4f438..006f6dc533 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoMissingColumnSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoMissingColumnSuite.scala
@@ -137,6 +137,26 @@ class DeltaInsertIntoMissingColumnSuite extends DeltaInsertIntoTest {
     confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
   )
 
+  testInserts(s"insert with implicit cast and missing nested field, " +
+    s"schemaEvolution=$schemaEvolution")(
+    initialData =
+      TestData("a int, s struct<x int, y int>", Seq("""{ "a": 1, "s": { "x": 2, "y": 3 } }""")),
+    partitionBy = Seq("a"),
+    overwriteWhere = "a" -> 1,
+    insertData =
+      TestData("a int, s struct<y long>", Seq("""{ "a": 1, "s": { "y": 5 } }""")),
+    // Missing nested fields are allowed when writing to a Delta streaming sink when there's a
+    // type mismatch, same as when there's no type mismatch.
+    expectedResult = ExpectedResult.Success(
+      expected = new StructType()
+        .add("a", IntegerType)
+        .add("s", new StructType()
+          .add("x", IntegerType)
+          .add("y", IntegerType))),
+    includeInserts = Set(StreamingInsert),
+    confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
+  )
+
   // Missing columns for all inserts by position and missing nested fields for all inserts by
   // position or SQL inserts are rejected. Whether the insert also includes a type mismatch
   // doesn't play a role.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoSchemaEvolutionSuite.scala
index 671ea442a9..77989cefef 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoSchemaEvolutionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoSchemaEvolutionSuite.scala
@@ -105,7 +105,7 @@ class DeltaInsertIntoSchemaEvolutionSuite extends DeltaInsertIntoTest {
         ))
       })
     },
-    includeInserts = insertsByPosition,
+    includeInserts = insertsByPosition + StreamingInsert,
     confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
   )
 
@@ -124,7 +124,7 @@
         "updateField" -> "b"
       ))
     }),
-    includeInserts = insertsDataframe.intersect(insertsByName),
+    includeInserts = insertsDataframe.intersect(insertsByName) - StreamingInsert,
     confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
   )
 
@@ -210,7 +210,7 @@
         ))
       })
     },
-    includeInserts = insertsSQL ++ insertsByPosition -- Seq(
+    includeInserts = insertsSQL ++ insertsByPosition + StreamingInsert -- Seq(
       // It's not possible to specify a column that doesn't exist in the target using SQL with an
       // explicit column list.
       SQLInsertColList(SaveMode.Append),
@@ -236,7 +236,7 @@
         "updateField" -> "s"
       ))
     }),
-    includeInserts = insertsDataframe.intersect(insertsByName),
+    includeInserts = insertsDataframe.intersect(insertsByName) - StreamingInsert,
     confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString)
   )
 }
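A note on the includeInserts arithmetic above: Scala resolves operator precedence by the operator's first character, and + and - share a precedence level, so insertsSQL ++ insertsByPosition + StreamingInsert -- Seq(...) groups strictly left to right. A minimal self-contained check of that grouping, using hypothetical stand-ins for the suite's Insert values (the real definitions live in DeltaInsertIntoTest, and some, like SQLInsertColList, take a SaveMode):

object IncludeInsertsPrecedenceSketch extends App {
  // Hypothetical stand-ins for the suite's insert kinds.
  sealed trait Insert
  case object StreamingInsert extends Insert
  case object SQLInsertByPosition extends Insert
  case object SQLInsertColListAppend extends Insert

  val insertsSQL: Set[Insert] = Set(SQLInsertByPosition, SQLInsertColListAppend)
  val insertsByPosition: Set[Insert] = Set(SQLInsertByPosition)

  // ++, +, and -- share one precedence level, so this groups as
  // ((insertsSQL ++ insertsByPosition) + StreamingInsert) -- Seq(...).
  val included = insertsSQL ++ insertsByPosition + StreamingInsert -- Seq(SQLInsertColListAppend)
  assert(included == Set[Insert](SQLInsertByPosition, StreamingInsert))
  println(included) // Set(SQLInsertByPosition, StreamingInsert)
}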
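To see the scenario the new StreamingInsert expectations encode end to end, here is a rough standalone sketch, not taken from the PR: the table name, checkpoint path, and source schema are made up, and it assumes a Delta build that includes this change. It streams a batch whose struct fields arrive in reverse order and as LONG, forcing an implicit cast; per the updated test comments, the streaming sink then resolves struct fields by name rather than by position.

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, struct}

object StreamingInsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("streaming-insert-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    // Target table, mirroring the suites' schema: a int, s struct<x int, y int>.
    spark.sql("CREATE TABLE t (a INT, s STRUCT<x: INT, y: INT>) USING delta")

    // Source rows carry s's fields in reverse order (y before x) and as LONG,
    // so writing them requires an implicit cast.
    val source = MemoryStream[(Int, Long, Long)]
    source.addData((1, 5L, 4L)) // (a, y, x)

    val query = source.toDS()
      .toDF("a", "y", "x")
      .select(col("a"), struct(col("y"), col("x")).as("s"))
      .writeStream
      .format("delta")
      .option("checkpointLocation", "/tmp/streaming-insert-sketch") // made up
      .toTable("t")
    query.processAllAvailable()
    query.stop()

    // Name based resolution stores s = { "x": 4, "y": 5 }; position based
    // resolution would have stored s = { "x": 5, "y": 4 }.
    spark.table("t").show()
  }
}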