Skip to content

Commit

Permalink
[SPARK-49044][SQL] ValidateExternalType should return child in error
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR extends the error message raised by `checkType` in `ValidateExternalType` with information about the child expression.

### Why are the changes needed?

When a row's values do not match the declared schema, the error message "{actual} is not a valid external type for schema of {expected}" does not identify which column is at fault. This PR adds the source column expression to the message.

After the fix, the error message also names the offending expression:

```
The external type [B is not valid for the type "STRING" at the expression "getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, f3)".
```

#### Example

```scala
/**
 * Reproduces the schema-mismatch error: column `f3` is declared as `StringType`,
 * but every row supplies a byte array for it.
 */
class ErrorMsgSuite extends AnyFunSuite with SharedSparkContext {
  test("shouldThrowSchemaError") {
    // The second value of each row is an Array[Byte], which does not match `f3: StringType`.
    val seq: Seq[Row] = Seq(
      Row(
        toBytes("0"),
        toBytes(""),
        1L,
      ),
      Row(
        toBytes("0"),
        toBytes(""),
        1L,
      ),
    )

    val schema: StructType = new StructType()
      .add("f1", BinaryType)
      .add("f3", StringType)
      .add("f2", LongType)

    val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(seq), schema)

    // The type check only runs when the rows are actually encoded, i.e. on an action.
    val exception = intercept[RuntimeException] {
      df.show()
    }

    // The wording asserted here is the pre-fix message, which does not name the column.
    assert(
      exception.getCause.getMessage
        .contains("[B is not a valid external type for schema of string")
    )
    assertResult(
      "[B is not a valid external type for schema of string"
    )(exception.getCause.getMessage)
  }

  /** Converts each character of `x` to a byte via `Char.toByte`. */
  def toBytes(x: String): Array[Byte] = x.toCharArray.map(_.toByte)
}
```

After the fix, the error message for this example also names the offending expression:

```
The external type [B is not valid for the type "STRING" at the expression "getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, f3)".
```

Example: https://github.com/mrk-andreev/example-spark-schema/blob/main/spark_4.0.0/src/test/scala/ErrorMsgSuite.scala

### Does this PR introduce _any_ user-facing change?

Yes. This change extends the error message raised in the "not a valid external type" case.

#### Example before

```
java.lang.Integer is not a valid external type for schema of double
```

#### Example after

```
The external type java.lang.Integer is not valid for the type "DOUBLE" at the expression "getexternalrowfield(input[0, org.apache.spark.sql.Row, true], 0, c0)".
```

### How was this patch tested?

These changes are covered by the new unit test `test("SPARK-49044 ValidateExternalType should return child in error")` and by the new integration test `test("SPARK-49044 ValidateExternalType should be user visible")`.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47522 from mrk-andreev/improve-error-for-cast-v2.

Authored-by: Mark Andreev <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
  • Loading branch information
mrk-andreev authored and MaxGekk committed Sep 3, 2024
1 parent 2a6080c commit 270327d
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 13 deletions.
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -2177,6 +2177,12 @@
],
"sqlState" : "42001"
},
"INVALID_EXTERNAL_TYPE" : {
"message" : [
"The external type <externalType> is not valid for the type <type> at the expression <expr>."
],
"sqlState" : "42K0N"
},
"INVALID_EXTRACT_BASE_FIELD_TYPE" : {
"message" : [
"Can't extract a value from <base>. Need a complex type [STRUCT, ARRAY, MAP] but got <other>."
Expand Down
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-states.json
Original file line number Diff line number Diff line change
Expand Up @@ -4625,6 +4625,12 @@
"standard": "N",
"usedBy": ["Spark"]
},
"42K0N": {
"description": "Invalid external type.",
"origin": "Spark",
"standard": "N",
"usedBy": ["Spark"]
},
"42KD0": {
"description": "Ambiguous name reference.",
"origin": "Databricks",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2023,8 +2023,6 @@ case class ValidateExternalType(child: Expression, expected: DataType, externalD

override val dataType: DataType = externalDataType

private lazy val errMsg = s" is not a valid external type for schema of ${expected.simpleString}"

private lazy val checkType: (Any) => Boolean = expected match {
case _: DecimalType =>
(value: Any) => {
Expand Down Expand Up @@ -2057,14 +2055,12 @@ case class ValidateExternalType(child: Expression, expected: DataType, externalD
if (checkType(input)) {
input
} else {
throw new RuntimeException(s"${input.getClass.getName}$errMsg")
throw QueryExecutionErrors.invalidExternalTypeError(
input.getClass.getName, expected, child)
}
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
// Use unnamed reference that doesn't create a local field here to reduce the number of fields
// because errMsgField is used only when the type doesn't match.
val errMsgField = ctx.addReferenceObj("errMsg", errMsg)
val input = child.genCode(ctx)
val obj = input.value
def genCheckTypes(classes: Seq[Class[_]]): String = {
Expand All @@ -2090,14 +2086,22 @@ case class ValidateExternalType(child: Expression, expected: DataType, externalD
s"$obj instanceof ${CodeGenerator.boxedType(dataType)}"
}

// Use unnamed references that don't create local fields here to reduce the number of fields
// because these references are used only when the type doesn't match.
val expectedTypeField = ctx.addReferenceObj(
"expectedTypeField", expected)
val childExpressionMsgField = ctx.addReferenceObj(
"childExpressionMsgField", child)

val code = code"""
${input.code}
${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
if (!${input.isNull}) {
if ($typeCheck) {
${ev.value} = (${CodeGenerator.boxedType(dataType)}) $obj;
} else {
throw new RuntimeException($obj.getClass().getName() + $errMsgField);
throw QueryExecutionErrors.invalidExternalTypeError(
$obj.getClass().getName(), $expectedTypeField, $childExpressionMsgField);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,20 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
)
}

/**
 * Builds the `INVALID_EXTERNAL_TYPE` error for an external value whose runtime class
 * does not match the expected Catalyst type.
 *
 * @param actualType      name of the runtime class of the offending value, used verbatim
 * @param expectedType    the expected data type, rendered through `toSQLType`
 * @param childExpression the expression that produced the value, rendered through `toSQLExpr`
 * @return the exception to be thrown by the caller
 */
def invalidExternalTypeError(
    actualType: String,
    expectedType: DataType,
    childExpression: Expression): SparkRuntimeException = {
  val params = Map(
    "externalType" -> actualType,
    "type" -> toSQLType(expectedType),
    "expr" -> toSQLExpr(childExpression))
  new SparkRuntimeException(errorClass = "INVALID_EXTERNAL_TYPE", messageParameters = params)
}

def notOverrideExpectedMethodsError(
className: String, m1: String, m2: String): SparkRuntimeException = {
new SparkRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,29 +288,33 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
val encoder = ExpressionEncoder(schema)
toRow(encoder, Row(1.toShort))
}
assert(e1.getCause.getMessage.contains("java.lang.Short is not a valid external type"))
assert(e1.getCause.getMessage.contains("The external type java.lang.Short " +
"is not valid for the type \"INT\""))

val e2 = intercept[RuntimeException] {
val schema = new StructType().add("a", StringType)
val encoder = ExpressionEncoder(schema)
toRow(encoder, Row(1))
}
assert(e2.getCause.getMessage.contains("java.lang.Integer is not a valid external type"))
assert(e2.getCause.getMessage.contains("The external type java.lang.Integer " +
"is not valid for the type \"STRING\""))

val e3 = intercept[RuntimeException] {
val schema = new StructType().add("a",
new StructType().add("b", IntegerType).add("c", StringType))
val encoder = ExpressionEncoder(schema)
toRow(encoder, Row(1 -> "a"))
}
assert(e3.getCause.getMessage.contains("scala.Tuple2 is not a valid external type"))
assert(e3.getCause.getMessage.contains("The external type scala.Tuple2 is not valid " +
"for the type \"STRUCT<b: INT, c: STRING>\""))

val e4 = intercept[RuntimeException] {
val schema = new StructType().add("a", ArrayType(TimestampType))
val encoder = ExpressionEncoder(schema)
toRow(encoder, Row(Array("a")))
}
assert(e4.getCause.getMessage.contains("java.lang.String is not a valid external type"))
assert(e4.getCause.getMessage.contains("The external type java.lang.String is not valid " +
"for the type \"TIMESTAMP\""))
}

private def roundTripArray[T](dt: DataType, nullable: Boolean, data: Array[T]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,13 +547,55 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkObjectExprEvaluation(validateType, input, InternalRow.fromSeq(Seq(Row(input))))
}

checkExceptionInExpression[RuntimeException](
checkExceptionInExpression[SparkRuntimeException](
ValidateExternalType(
GetExternalRowField(inputObject, index = 0, fieldName = "c0"),
DoubleType,
DoubleType),
InternalRow.fromSeq(Seq(Row(1))),
"java.lang.Integer is not a valid external type for schema of double")
"The external type java.lang.Integer is not valid for the type \"DOUBLE\"")
}

test("SPARK-49044 ValidateExternalType should return child in error") {
  val rowRef = BoundReference(0, ObjectType(classOf[Row]), nullable = true)

  // Builds a fresh accessor for field 0 ("c0") of the external row on every use.
  def firstField = GetExternalRowField(rowRef, index = 0, fieldName = "c0")

  // Pairs of (external value, declared type) where the value matches the type.
  val matchingCases = Seq(
    (true, BooleanType),
    (2.toByte, ByteType),
    (5.toShort, ShortType),
    (23, IntegerType),
    (61L, LongType),
    (1.0f, FloatType),
    (10.0, DoubleType),
    ("abcd".getBytes, BinaryType),
    ("abcd", StringType),
    (BigDecimal.valueOf(10), DecimalType.IntDecimal),
    (IntervalUtils.stringToInterval(UTF8String.fromString("interval 3 day")),
      CalendarIntervalType),
    (java.math.BigDecimal.valueOf(10), DecimalType.BigIntDecimal),
    (Array(3, 2, 1), ArrayType(IntegerType))
  )

  // For matching pairs the input itself is passed as the expected evaluation result.
  matchingCases.foreach { case (input, dt) =>
    val enc = RowEncoder.encoderForDataType(dt, lenient = false)
    val externalType = EncoderUtils.lenientExternalDataTypeFor(enc)
    val validateType = ValidateExternalType(firstField, dt, externalType)
    val inputRow = InternalRow.fromSeq(Seq(Row(input)))
    checkObjectExprEvaluation(validateType, input, inputRow)
  }

  // An Integer supplied where DOUBLE is declared must be reported together with the child
  // expression that produced the value.
  val mismatched = ValidateExternalType(firstField, DoubleType, DoubleType)
  val expectedExpr =
    "\"getexternalrowfield(input[0, org.apache.spark.sql.Row, true], " +
      "0, c0)\""
  checkErrorInExpression[SparkRuntimeException](
    expression = mismatched,
    inputRow = InternalRow.fromSeq(Seq(Row(1))),
    errorClass = "INVALID_EXTERNAL_TYPE",
    parameters = Map[String, String](
      "externalType" -> "java.lang.Integer",
      "type" -> "\"DOUBLE\"",
      "expr" -> expectedExpr
    )
  )
}

private def javaMapSerializerFor(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StringType, StructType}

/**
 * End-to-end check that an external-type mismatch surfaces to the user as
 * `INVALID_EXTERNAL_TYPE`, including the expression that read the offending field.
 */
class ValidateExternalTypeSuite extends QueryTest with SharedSparkSession {
  test("SPARK-49044 ValidateExternalType should be user visible") {
    // Column `f3` is declared as a string, but the single row carries an (empty) byte array.
    val schema = new StructType().add("f3", StringType)
    val rows = Seq(Row(Array.empty[Byte]))
    val expectedExpr =
      "\"getexternalrowfield(assertnotnull(" +
        "input[0, org.apache.spark.sql.Row, true]), 0, f3)\""

    val thrown = intercept[SparkRuntimeException] {
      spark.createDataFrame(spark.sparkContext.parallelize(rows), schema).show()
    }

    // The error under test is the cause of the exception thrown by `show()`.
    checkError(
      exception = thrown.getCause.asInstanceOf[SparkRuntimeException],
      errorClass = "INVALID_EXTERNAL_TYPE",
      parameters = Map(
        "externalType" -> "[B",
        "type" -> "\"STRING\"",
        "expr" -> expectedExpr
      )
    )
  }
}

0 comments on commit 270327d

Please sign in to comment.